Skip to content

データソースプラグイン

Note: ⚠️ このドキュメントはAIによって自動翻訳されています。不正確な部分がある場合は、英語版を参照してください。

データソースプラグインは、FlexAI 1.9.0で導入された新しいタイプのプラグインです。ナレッジパイプラインにおいて、ドキュメントデータソースとして機能し、パイプライン全体の起点となります。

この記事では、データソースプラグインの開発方法について、プラグインアーキテクチャ、コード例、デバッグ方法を網羅し、データソースプラグインの迅速な開発とリリースを支援します。

前提条件

読み進める前に、ナレッジパイプラインの基本的な理解とプラグイン開発に関する知識があることを確認してください。関連情報はこちらで確認できます:

データソースプラグインの種類

FlexAIは3種類のデータソースプラグインをサポートしています:Webクローラー、オンラインドキュメント、オンラインドライブ。プラグインコードを実装する際、プラグインの機能を提供するクラスは特定のデータソースクラスを継承する必要があります。3種類のプラグインタイプはそれぞれ異なる親クラスに対応しています。

Info:

親クラスを継承してプラグイン機能を実装する方法については、FlexAIプラグイン開発:Hello Worldガイド - 4.4 ツールロジックの実装を参照してください。

各データソースプラグインタイプは複数のデータソースをサポートしています。例えば:

  • Webクローラー:Jina Reader、FireCrawl
  • オンラインドキュメント:Notion、Confluence、GitHub
  • オンラインドライブ:OneDrive、Google Drive、Box、AWS S3、Tencent COS

データソースタイプとデータソースプラグインタイプの関係を以下に示します。

データソースプラグインの開発

データソースプラグインの作成

スキャフォールディングコマンドラインツールを使用して、datasourceタイプを選択することでデータソースプラグインを作成できます。セットアップが完了すると、コマンドラインツールが自動的にプラグインプロジェクトコードを生成します。

dify plugin init

Info:

通常、データソースプラグインはFlexAIプラットフォームの他の機能を使用する必要がないため、追加の権限は必要ありません。

データソースプラグインの構造

データソースプラグインは3つの主要コンポーネントで構成されています:

  • manifest.yamlファイル:プラグインの基本情報を記述します。
  • providerディレクトリ:プラグインプロバイダーの説明と認証実装コードを含みます。
  • datasourcesディレクトリ:データソースからデータを取得するための説明とコアロジックを含みます。
├── _assets
│   └── icon.svg
├── datasources
│   ├── your_datasource.py
│   └── your_datasource.yaml
├── main.py
├── manifest.yaml
├── PRIVACY.md
├── provider
│   ├── your_datasource.py
│   └── your_datasource.yaml
├── README.md
└── requirements.txt

正しいバージョンとタグの設定

  • manifest.yamlファイルで、最小サポートFlexAIバージョンを以下のように設定します:

minimum_dify_version: 1.9.0
- manifest.yamlファイルで、FlexAI Marketplaceのデータソースカテゴリにプラグインを表示するために以下のタグを追加します:

tags:
  - rag
- requirements.txtファイルで、データソースプラグイン開発に使用するプラグインSDKバージョンを以下のように設定します:

flexai-plugin>=0.5.0,<0.6.0

データソースプロバイダーの追加

プロバイダーYAMLファイルの作成

プロバイダーYAMLファイルの内容は基本的にツールプラグインと同じですが、以下の2点のみ異なります:

# データソースプラグインのプロバイダータイプを指定:online_drive、online_document、またはwebsite_crawl
provider_type: online_drive # online_document, website_crawl

# データソースを指定
datasources:
  - datasources/PluginName.yaml

Info:

プロバイダーYAMLファイルの作成について詳しくは、FlexAIプラグイン開発:Hello Worldガイド-4.3 プロバイダー認証情報の設定を参照してください。

Info:

データソースプラグインはOAuth 2.0またはAPIキーによる認証をサポートしています。

OAuthの設定については、ツールプラグインにOAuthサポートを追加するを参照してください。

プロバイダーコードファイルの作成

  • APIキー認証モードを使用する場合、データソースプラグインのプロバイダーコードファイルはツールプラグインと同一です。プロバイダークラスが継承する親クラスをDatasourceProviderに変更するだけです。

class YourDatasourceProvider(DatasourceProvider):

    def _validate_credentials(self, credentials: Mapping[str, Any]) -> None:
        try:
            """
            IMPLEMENT YOUR VALIDATION HERE
            """
        except Exception as e:
            raise ToolProviderCredentialValidationError(str(e))
- OAuth認証モードを使用する場合、データソースプラグインはツールプラグインとわずかに異なります。OAuthでアクセス権限を取得する際、データソースプラグインはフロントエンドに表示するユーザー名とアバターを同時に返すことができます。そのため、_oauth_get_credentials_oauth_refresh_credentialsnameavatar_urlexpires_atcredentialsを含むDatasourceOAuthCredentials型を返す必要があります。

DatasourceOAuthCredentialsクラスは以下のように定義されており、返す際に対応する型を設定する必要があります:

class DatasourceOAuthCredentials(BaseModel):
    name: str | None = Field(None, description="The name of the OAuth credential")
    avatar_url: str | None = Field(None, description="The avatar url of the OAuth")
    credentials: Mapping[str, Any] = Field(..., description="The credentials of the OAuth")
    expires_at: int | None = Field(
        default=-1,
        description="""The expiration timestamp (in seconds since Unix epoch, UTC) of the credentials.
        Set to -1 or None if the credentials do not expire.""",
    )

_oauth_get_authorization_url_oauth_get_credentials_oauth_refresh_credentialsの関数シグネチャは以下の通りです:

_oauth_get_authorization_url:

```python
def _oauth_get_authorization_url(self, redirect_uri: str, system_credentials: Mapping[str, Any]) -> str:
"""
Generate the authorization URL for {{ .PluginName }} OAuth.
"""
try:
    """
    IMPLEMENT YOUR AUTHORIZATION URL GENERATION HERE
    """
except Exception as e:
    raise DatasourceOAuthError(str(e))
return ""
```

_oauth_get_credentials:

```python
def _oauth_get_credentials(
self, redirect_uri: str, system_credentials: Mapping[str, Any], request: Request
) -> DatasourceOAuthCredentials:
"""
Exchange code for access_token.
"""
try:
    """
    IMPLEMENT YOUR CREDENTIALS EXCHANGE HERE
    """
except Exception as e:
    raise DatasourceOAuthError(str(e))
return DatasourceOAuthCredentials(
    name="",
    avatar_url="",
    expires_at=-1,
    credentials={},
)
```

_oauth_refresh_credentials:

```python
def _oauth_refresh_credentials(
self, redirect_uri: str, system_credentials: Mapping[str, Any], credentials: Mapping[str, Any]
) -> DatasourceOAuthCredentials:
"""
Refresh the credentials
"""
return DatasourceOAuthCredentials(
    name="",
    avatar_url="",
    expires_at=-1,
    credentials={},
)
```

データソースの追加

YAMLファイル形式とデータソースコード形式は、3種類のデータソースによって異なります。

Webクローラー

WebクローラーデータソースプラグインのプロバイダーYAMLファイルでは、output_schemaは常に4つのパラメータを返す必要があります:source_urlcontenttitledescription

output_schema:
    type: object
    properties:
      source_url:
        type: string
        description: the source url of the website
      content:
        type: string
        description: the content from the website
      title:
        type: string
        description: the title of the website
      "description":
        type: string
        description: the description of the website

Webクローラープラグインのメインロジックコードでは、クラスはWebsiteCrawlDatasourceを継承し、_get_website_crawlメソッドを実装する必要があります。次に、create_crawl_messageメソッドを使用してWebクロールメッセージを返します。

複数のWebページをクロールしてバッチで返すには、WebSiteInfo.statusprocessingに設定し、create_crawl_messageメソッドを使用して各バッチのクロールされたページを返します。すべてのページがクロールされた後、WebSiteInfo.statuscompletedに設定します。

class YourDataSource(WebsiteCrawlDatasource):

    def _get_website_crawl(
        self, datasource_parameters: dict[str, Any]
    ) -> Generator[ToolInvokeMessage, None, None]:

        crawl_res = WebSiteInfo(web_info_list=[], status="", total=0, completed=0)
        crawl_res.status = "processing"
        yield self.create_crawl_message(crawl_res)

        ### your crawl logic
           ...
        crawl_res.status = "completed"
        crawl_res.web_info_list = [
            WebSiteInfoDetail(
                title="",
                source_url="",
                description="",
                content="",
            )
        ]
        crawl_res.total = 1
        crawl_res.completed = 1

        yield self.create_crawl_message(crawl_res)

オンラインドキュメント

オンラインドキュメントデータソースプラグインの戻り値には、ドキュメントの内容を表すcontentフィールドを少なくとも含める必要があります。例えば:

output_schema:
    type: object
    properties:
      workspace_id:
        type: string
        description: workspace id
      page_id:
        type: string
        description: page id
      content:
        type: string
        description: page content

オンラインドキュメントプラグインのメインロジックコードでは、クラスはOnlineDocumentDatasourceを継承し、2つのメソッドを実装する必要があります:_get_pages_get_content

ユーザーがプラグインを実行すると、まず_get_pagesメソッドを呼び出してドキュメントのリストを取得します。ユーザーがリストからドキュメントを選択した後、_get_contentメソッドを呼び出してドキュメントのコンテンツを取得します。

_get_pages:

```python
def _get_pages(self, datasource_parameters: dict[str, Any]) -> DatasourceGetPagesResponse:
    # your get pages logic
    response = requests.get(url, headers=headers, params=params, timeout=30)
    pages = []
    for item in  response.json().get("results", []):
        page = OnlineDocumentPage(
            page_name=item.get("title", ""),
            page_id=item.get("id", ""),
            type="page",  
            last_edited_time=item.get("version", {}).get("createdAt", ""),
            parent_id=item.get("parentId", ""),
            page_icon=None, 
        )
        pages.append(page)
    online_document_info = OnlineDocumentInfo(
        workspace_name=workspace_name,
        workspace_icon=workspace_icon,
        workspace_id=workspace_id,
        pages=[page],
        total=pages.length(),
    )
    return DatasourceGetPagesResponse(result=[online_document_info])
```

_get_content:

```python
def _get_content(self, page: GetOnlineDocumentPageContentRequest) -> Generator[DatasourceMessage, None, None]:
# your fetch content logic, example
response = requests.get(url, headers=headers, params=params, timeout=30)
...
yield self.create_variable_message("content", "")
yield self.create_variable_message("page_id", "")
yield self.create_variable_message("workspace_id", "")
```

オンラインドライブ

オンラインドライブデータソースプラグインはファイルを返すため、以下の仕様に準拠する必要があります:

output_schema:
    type: object
    properties:
      file:
        $ref: "https://flexai.com.tr/schemas/v1/file.json"

オンラインドライブプラグインのメインロジックコードでは、クラスはOnlineDriveDatasourceを継承し、2つのメソッドを実装する必要があります:_browse_files_download_file

ユーザーがプラグインを実行すると、まず_browse_filesを呼び出してファイルリストを取得します。この時点で、prefixは空であり、ルートディレクトリのファイルリストを要求していることを示します。ファイルリストにはフォルダとファイルタイプの変数が含まれています。ユーザーがフォルダを開くと、_browse_filesメソッドが再度呼び出されます。この時点で、OnlineDriveBrowseFilesRequestprefixはそのフォルダ内のファイルリストを取得するために使用されるフォルダIDになります。

ユーザーがファイルを選択した後、プラグインは_download_fileメソッドとファイルIDを使用してファイルのコンテンツを取得します。_get_mime_type_from_filenameメソッドを使用してファイルのMIMEタイプを取得でき、パイプラインが異なるファイルタイプを適切に処理できるようになります。

ファイルリストに複数のファイルが含まれている場合、OnlineDriveFileBucket.is_truncatedTrueに設定し、OnlineDriveFileBucket.next_page_parametersをファイルリストの次のページを取得するために必要なパラメータ(サービスプロバイダーに応じて次のページのリクエストIDやURLなど)に設定できます。

_browse_files:

```python
def _browse_files(
self, request: OnlineDriveBrowseFilesRequest
) -> OnlineDriveBrowseFilesResponse:

credentials = self.runtime.credentials
bucket_name = request.bucket
prefix = request.prefix or ""  # Allow empty prefix for root folder; When you browse the folder, the prefix is the folder id
max_keys = request.max_keys or 10
next_page_parameters = request.next_page_parameters or {}

files = []
files.append(OnlineDriveFile(
    id="", 
    name="", 
    size=0, 
    type="folder" # or "file"
))

return OnlineDriveBrowseFilesResponse(result=[
    OnlineDriveFileBucket(
        bucket="", 
        files=files, 
        is_truncated=False, 
        next_page_parameters={}
    )
])
```

_download_file:

```python
def _download_file(self, request: OnlineDriveDownloadFileRequest) -> Generator[DatasourceMessage, None, None]:
credentials = self.runtime.credentials
file_id = request.id

file_content = bytes()
file_name = ""

mime_type = self._get_mime_type_from_filename(file_name)

yield self.create_blob_message(file_content, meta={
    "file_name": file_name,
    "mime_type": mime_type
})

def _get_mime_type_from_filename(self, filename: str) -> str:
"""Determine MIME type from file extension."""
import mimetypes
mime_type, _ = mimetypes.guess_type(filename)
return mime_type or "application/octet-stream"
```

AWS S3のようなストレージサービスでは、prefixbucketid変数には特別な用途があり、開発中に必要に応じて柔軟に適用できます:

  • prefix:ファイルパスのプレフィックスを表します。例えば、prefix=container1/folder1/container1バケット内のfolder1フォルダからファイルまたはファイルリストを取得します。
  • bucket:ファイルバケットを表します。例えば、bucket=container1container1バケット内のファイルまたはファイルリストを取得します。このフィールドは、非標準S3プロトコルドライブでは空白のままにできます。
  • id_download_fileメソッドはprefix変数を使用しないため、完全なファイルパスをidに含める必要があります。例えば、id=container1/folder1/file1.txtcontainer1バケット内のfolder1フォルダからfile1.txtファイルを取得することを示します。

Tip:

公式Google Driveプラグイン公式AWS S3プラグインの具体的な実装を参照できます。

プラグインのデバッグ

データソースプラグインは、リモートデバッグまたはローカルプラグインとしてインストールしてデバッグする2つのデバッグ方法をサポートしています。以下の点に注意してください:

  • プラグインがOAuth認証を使用している場合、リモートデバッグのredirect_uriはローカルプラグインのものとは異なります。サービスプロバイダーのOAuth Appの関連設定を適宜更新してください。
  • データソースプラグインはシングルステップデバッグをサポートしていますが、完全な機能を確保するために、完全なナレッジパイプラインでテストすることをお勧めします。

最終チェック

パッケージ化と公開の前に、以下のすべてを完了していることを確認してください:

  • 最小サポートFlexAIバージョンを1.9.0に設定。
  • SDKバージョンをflexai-plugin>=0.5.0,<0.6.0に設定。
  • README.mdPRIVACY.mdファイルを作成。
  • コードファイルには英語のコンテンツのみを含める。
  • デフォルトアイコンをデータソースプロバイダーのロゴに置き換える。

パッケージ化と公開

プラグインディレクトリで以下のコマンドを実行して.difypkgプラグインパッケージを生成します:

dify plugin package . -o your_datasource.difypkg

次に、以下のことができます:

  • FlexAI環境にプラグインをインポートして使用する。
  • プルリクエストを送信してFlexAI Marketplaceにプラグインを公開する。

Info:

プラグイン公開プロセスについては、プラグインの公開を参照してください。

{/ Contributing Section DO NOT edit this section! It will be automatically generated by the script. /}


Edit this page | Report an issue