OpenSearchパイプライン

データ・プリッパーを使用してOpenSearchパイプラインを作成および管理し、OpenSearchクラスタにデータを取り込みます。

データ・プリッパーは、ダウンストリーム分析および可視化のためにデータをフィルタリング、エンリッチ、変換、正規化および集計できるオープン・ソース・データ・コレクタです。これは、大規模で複雑なデータセットを処理するための最も推奨されるデータ取り込みツールの1つです。

PULLモデルを使用して、OpenSearch 2.xクラスタにデータを取り込み、Oracle Cloud Infrastructure Streamingサービス、自己管理KafkaおよびObject Storageと統合します。
ノート

OpenSearch Cluster with Data Prepper機能は、現在OC1レルムで使用できます。

必要なポリシー

このトピックで説明するステップに進む前に、次のタスクを完了します。

テナンシの管理者以外の場合は、テナンシ管理者に連絡して、これらの権限を付与してください。管理者は、管理者以外のユーザーがパイプラインを管理およびCRUD操作できるように、次のユーザー権限を更新する必要があります

次のポリシーにより、管理者は各テナンシのすべてのユーザーに権限を付与できます:

Allow any-user to manage opensearch-cluster-pipeline in tenancy

次のポリシーにより、管理者はコンパートメント内のグループに権限を付与できます(推奨)

Allow group <group> to manage opensearch-cluster-pipeline in compartment <compartment>

ここで、<group>は、そのグループ内のすべてのユーザーがリソースにアクセスできます。

次のポリシーにより、OpenSearchパイプラインはOracle Cloud Infrastructure Vaultからシークレットを読み取ることができます。

Allow any-user to read secret-bundles in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }' }
Allow any-user to read secrets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }

オブジェクト・ストレージ・ポリシー

次のポリシーは、オブジェクト・ストレージ・ソースにのみ必要です:

次のポリシーを使用すると、OpenSearchパイプラインでオブジェクト・ストレージのバケットをソース・コーディネーションの永続性として使用できます。

Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<source-coordination-bucket-name>'}

次のポリシーにより、OpenSearchパイプラインはオブジェクト・ストレージからオブジェクトを収集できます:

Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

次のポリシーにより、OpenSearchパイプラインはオブジェクト・ストレージからバケットを読み取ることができます:

Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

次のポリシーにより、OpenSearchパイプラインはソース・コーディネーション・バケットからバケットを読み取ることができます。

Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

OCIストリーミングおよび自己管理Kafkaポリシー

これらのポリシーは、OCIストリーミング・サービスまたは自己管理Kafkaソースに必要です。

一般的なネットワーク・ポリシー

ノート

これらのポリシーは、パブリックOCIストリーミング・サービスには必要ありません。

OpenSearchサービスがカスタマ・サブネット内のプライベート・エンドポイントを作成、読取り、更新できるようにするポリシーが追加されます。

Allow group SearchOpenSearchAdmins to manage vnics in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage vcns in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage subnets in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to use network-security-groups in compartment <network_resources_compartment>

OCIストリーミング・サービス・ポリシー(パブリックおよびプライベート)

これらのポリシーは、OCIストリーミング・サービスにのみ必要です。

次のポリシーにより、OpenSearchパイプラインはOCIストリーミング・サービスのレコードを消費できます。

Allow ANY-USER TO {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in compartment '<compartment-name>' 
where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target-stream-pool-ocid>'}

次のポリシーにより、OpenSearchパイプラインはOCIストリーミング・サービスからストリーム・プールを読み取ることができます

Allow ANY-USER TO read stream-pools in compartment '<compartment-name>' where ALL {request.principal.type='opensearchpipeline', 
target.streampool.id = '<target-stream-pool-ocid>'}

自己管理Kafka権限

これらの権限は、自己管理Kafkaソースにのみ必要です。

次のリンクを選択して、トピックのリスト、トピックの説明、グループへの参加およびOpenSearchパイプラインによるトピックからのレコードの消費に必要な権限を追加します:

https://kafka.apache.org/documentation/#security_authz

ネットワーク・セキュリティ・ルール

この構成は、プライベートOCIストリーミング・サービスおよび自己管理Kafkaにのみ必要です。パブリックOCIストリーミング・サービスの場合は、「なし」を選択します。

サブネットまたはネットワーク・セキュリティ・グループのセキュリティ・リストにイングレス・セキュリティ・ルールをすべてのOpenSearchパイプラインに追加して、サブネットで実行されているプライベートOCIストリーミング・サービスと通信します。

セキュリティ・ルールを追加するには、セキュリティ・ルールおよびアクセスとセキュリティを参照してください。

サブネットのCIDRを検索するには、サブネットの詳細の取得を参照してください。

次の図は、ネットワーク・セキュリティ・グループのイングレス・ルールを示しています。

ネットワーク・セキュリティ・グループのイングレス・ルール

Vaultでのシークレットの作成

OpenSearchパイプラインは、パイプラインyamlコード内のユーザー名やパスワードなどのプレーン・テキスト・シークレットを受け入れないため、OpenSearchパイプラインで必要なすべてのプレーン・テキスト・シークレットをVaultを介して渡す必要があります。

次のタスクを実行します。

  • OpenSearchクラスタに書込み権限を持つ新しいユーザーを作成します。手順については、次のOpenSearchドキュメントのトピックを参照してください。

    デフォルトのアクショングループ

    ユーザーおよびロール

    OpenSearch IAMポリシーを使用した検索

  • 作成した新しいユーザーのシークレット(ユーザー名とパスワード)をVaultに作成します。手順については、次のOracle Cloud Infrastructure Vaultのトピックを参照してください:

    ボールト・シークレットの管理

    Vaultでのシークレットの作成

  • OpenSearchパイプラインがVaultからシークレットを読み取ることができるように、OpenSearchテナンシに次のリソース・プリンシパル・ポリシーを追加します。
    ALLOW ANY-USER to read secrets in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' } 
    ALLOW ANY-USER to read secret-bundles in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }

    Vaultのポリシーの作成の詳細は、Vaultサービスの詳細を参照してください。

次のOpenSearchパイプライン・タスクを実行できます。

コンパートメント内のOpenSearchパイプラインをリストします

新しいOpenSearchパイプラインを作成します

OpenSearchパイプラインの詳細を取得します

OpenSearchパイプラインの設定を編集します

テナンシからOpenSearchパイプラインを削除します

サポートされるプロセッサ

オブジェクト・ストレージおよびソース・コーディネーションYAML

オブジェクト・ストレージ・ソースは、ソース調整構成に依存します。OpenSearchパイプラインは、オブジェクト・ストレージを永続性として使用するソース調整をサポートします。テナンシからオブジェクト・ストレージ・バケットの詳細を指定する必要があります。

次に、ソース調整の例を示します。

source_coordination:
  store:
    oci-object-bucket:
      name: <OCI Object storage bucket-name details from their tenancy>
      namespace: <namespace>

テナンシのオブジェクト・ストレージ・ネームスペースを取得する方法の詳細は、オブジェクト・ストレージ・ネームスペースの理解を参照してください。

次に、オブジェクト・ストレージ永続性を使用したソース調整の例を示します。

source_coordination:
  store:
    oci-object-bucket:
      name: "dataprepper-test-pipelines" <-- bucket name
      namespace: "idee4xpu3dvm".         <-- namespace

オブジェクト・ストレージYAML

パイプライン構成YAMLの注意事項は次のとおりです。

  • OCIシークレット: OpenSearchクラスタ資格証明を使用してVaultにシークレットを作成し、パイプラインYAMLでOpenSearchクラスタに接続するために使用できます。
  • OpenSearchシンク: シンクには、取込み用の索引名を含むOpenSearchクラスタOCIDsが含まれます。
  • oci-object source: データ・プリッパーでは、多数の構成がサポートされるオブジェクト・ストレージを使用したスキャンベースの取込みがサポートされます。オブジェクト・ストレージ・バケット内のオブジェクトをスケジュールされた頻度に基づいて取り込むか、スケジュールに基づかないかに基づいて取り込むようにソースを構成できます。次のスキャン・オプションがあります
    • 1つ以上の時間スキャン: このオプションを使用すると、オブジェクトの最終変更時間に基づいてオブジェクト・ストレージ・バケット内のオブジェクトを1回以上読み取るパイプラインを構成できます。
    • スキャンをスケジュール: このオプションを使用すると、パイプラインの作成後に定期的にスキャンをスケジュールできます。

次の表に、オブジェクト・ストレージ・ソースの構成に使用できるオプションを示します。

オブジェクト・ストレージ構成
オプション 必須 タイプ 摘要
acknowledgments いいえ ブール trueの場合、OpenSearchシンクによってイベントが受信されたときに、オブジェクト・ストレージ・オブジェクト・ソースがエンドツーエンドの確認を受信できるようにします。
buffer_timeout いいえ 期間 タイムアウトが発生するまでのデータ・プリッパー・バッファへのイベントの書込みに許可される時間。指定した時間内にOCIソースがバッファに書き込めないイベントは破棄されます。デフォルトは10sです。
codec はい コーデック 適用するdata-prepperの codec
compression いいえ 文字列 適用する圧縮アルゴリズム: nonegzipsnappyまたはautomatic。デフォルトはnoneです。
delete_oci_objects_on_read いいえ ブール trueの場合、オブジェクト・ストレージ・ソース・スキャンでは、オブジェクト・ストレージ・オブジェクトからのすべてのイベントがすべてのシンクによって正常に確認された後、オブジェクト・ストレージ・オブジェクトの削除が試行されます。オブジェクト・ストレージ・オブジェクトを削除するときは、acknowledgmentsを有効にする必要があります。デフォルトはfalseです。エンドツーエンドのacknowledgmentsが有効になっていない場合、削除は機能しません。
oci いいえ OCI OCI構成。詳細については、次のOCIの項を参照してください。
records_to_accumulate いいえ 整数 バッファに書き込まれる前に蓄積されるメッセージの数。デフォルトは100です。
workers いいえ 整数 ソースがOCIバケットからデータを読み取るために使用するワーカー・スレッドの数を構成します。オブジェクト・ストレージ・オブジェクトが1MB未満の場合を除き、この値はデフォルトのままにします。より大きなオブジェクト・ストレージ・オブジェクトのパフォーマンスが低下する場合があります。デフォルトは1です。

オブジェクト・ストレージ・パイプライン構成YAML

オブジェクト・ストレージ・パイプライン構成YAMLの例を次に示します:

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username: 
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      compression: none
      scan:
        start_time: 2024-11-18T08:01:59.363Z
        buckets:
          - bucket:
              namespace: <namespace>
              name: <bucket-name>
  sink:
    - opensearch:
        hosts: [ <cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>

構成例

個々のObject Storageバケット・レベルまたはスキャン・レベルで適用できる1回限りのスキャン・オプションを次に示します

oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "<namespace>"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-01-01T00:00:00Z
      compression: "none"

終了時間

終了時間の例を次に示します。


simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               end_time: 2024-12-01T00:00:00Z
      compression: "none"

開始時刻と終了時刻

開始時間と終了時間の例を次に示します。

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               end_time: 2024-12-01T00:00:00Z
      compression: "none"

レンジ

次に、範囲の例を示します。

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               range: "PT12H"
      compression: "none"

開始時間、終了時間および範囲

開始時間、終了時間および範囲の例を次に示します。

oci-object:
      codec:
        newline:
      scan:
        start_time: 2023-01-01T00:00:00Z
        end_time: 2024-12-01T00:00:00Z
        range: "PT12H"
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"

include_prefixフィルタ

include_prefixフィルタの例を次に示します。

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                 include_prefix: ["newtest1", "10-05-2024"]
      compression: "none"

フォルダからのファイルのフィルタ

次に、フォルダのフィルタ・ファイルの例を示します。特定のフォルダからのみファイルを読み取るには、フィルタを使用してフォルダを指定します。次に、include_prefixを使用して、folder1内のfolder2からのファイルを含める例を示します。

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                include_prefix: ["folder1/folder2"]
      compression: "none"

exclude_prefixフィルタ

exclude_prefixフィルタの例を次に示します。

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                 include_prefix: ["newtest", "10-05-2024"]
                 exclude_suffix: [".png"]
      compression: "none"

JSONのコーデックのサポート

次に、JSONのコーデック・サポートの例を示します:

source:
    oci-object:
      acknowledgments: true
      codec:
        json: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

CSVに対するコーデックのサポート

次に、CSVのコーデック・サポートの例を示します。


source:
    oci-object:
      acknowledgments: true
      codec:
        csv: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

Newlineでのコーデックのサポート

改行のコーデックサポートの例を次に示します。

source:
    oci-object:
      acknowledgments: true
      codec:
        newline: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

棚卸なしの取込みオプションのスケジュール

次に、カウントなしで取り込みオプションをスケジュールする例を示します。

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline: null
      scan:
        scheduling:
          interval: PT40S
        buckets:
          - bucket:
              namespace: idee4xpu3dvm
              name: data-prepper-object-storage-testing
      compression: none

棚卸による取込みオプションのスケジュール

次に、カウントを含む取込みオプションのスケジュールの例を示します。

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline: null
      scan:
        scheduling:
          interval: PT40S
          count: 10
        buckets:
          - bucket:
              namespace: idee4xpu3dvm
              name: data-prepper-object-storage-testing
      compression: none

開始時間による取込みオプションのスケジューリング

開始時間を使用して取り込みオプションをスケジュールする例を次に示します。


oci-object:
      codec:
        newline:
      scan:
        scheduling:
          interval: "PT40S"
          count: 10
        start_time: 2023-01-01T00:00:00Z
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
      compression: "none"

終了時間のある取込みオプションのスケジューリング

次に、終了時間を含む取込みオプションのスケジュールの例を示します。

oci-object:
      codec:
        newline:
      scan:
        scheduling:
          interval: "PT40S"
          count: 10
        end_time: 2023-01-01T00:00:00Z
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
      compression: "none"

Kafka YAML

Kafkaソースにソース調整は必要ありません。

Kafkaソースで使用可能なすべての構成の詳細は、次のリンクにアクセスしてください:

https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/

OCIストリーミング・サービスをKafkaソースとして使用して、OpenSearchクラスタに取り込むことができます。この実行方法の詳細は、Kafka APIの使用を参照してください。

OCIストリーミング・パブリック・アクセスYAML

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - <bootstrap_servers>
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
      authentication:
        sasl:
          oci:
            stream_pool_id: <target-stream-pool-ocid>
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>

パイプラインOCIストリーミング・サービス・プライベート・アクセスYAML

OCIストリーミング・サービスのOpenSearchパイプラインのYAMLの例を次に示します:

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - <bootstrap_servers>
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
      authentication:
        sasl:
          oci:
            stream_pool_id: <target-stream-pool-ocid>
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>

自己管理Kafka YAML

次に、OpenSearchの自己管理Kafka YAMLの例を示します:

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
      kafka-credentials:
        secret_id: <secret-ocid>
simple-sample-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - "https://<bootstrap_server_fqdn>:9092"
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
        certificate: <certificate-in-pem-format>
      authentication:
        sasl:
          plaintext:
            username: ${{oci_secrets:kafka-credentials:username}}
            password: ${{oci_secrets:kafka-credentials:password}}
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>