OpenSearchパイプライン

データ・プリッパーを使用してOpenSearchパイプラインを作成および管理し、データをOpenSearchクラスタに取り込みます。

データ・プリッパーは、ダウンストリーム分析および可視化のためにデータをフィルタ、エンリッチ、変換、正規化および集計できるオープン・ソース・データ・コレクタです。これは、大規模で複雑なデータセットを処理するための最も推奨されるデータ取込みツールの1つです。

PULLモデルを使用して、OpenSearch 2.xクラスタにデータを取り込み、Oracle Cloud Infrastructure Streamingサービス、自己管理型のKafkaおよびObject Storageと統合します。
ノート

OpenSearchデータ・プリッパー付きクラスタ機能は、現在OC1レルムで使用できます。

必要なポリシー

このトピックで説明するステップに進む前に、次のタスクを完了してください。

テナンシの管理者以外の場合は、テナンシ管理者に連絡して、これらの権限を付与してください。管理者は、管理者以外のユーザーがパイプラインを管理およびCRUD操作できるように、次のユーザー権限を更新する必要があります

次のポリシーにより、管理者は各テナンシ内のすべてのユーザーに権限を付与できます:

Allow any-user to manage opensearch-cluster-pipeline in tenancy

次のポリシーにより、管理者はコンパートメント内のグループに対する権限を付与できます(推奨)。

Allow group <group> to manage opensearch-cluster-pipeline in compartment <compartment>

ここで、<group>は、そのグループ内のすべてのユーザーがリソースにアクセスできます。

次のポリシーでは、OpenSearchパイプラインがOracle Cloud Infrastructure Vaultからシークレットを読み取ることができます。

Allow any-user to read secret-bundles in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }' }
Allow any-user to read secrets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }

オブジェクト・ストレージ・ポリシー

次のポリシーは、オブジェクト・ストレージ・ソースにのみ必要です:

次のポリシーにより、OpenSearchパイプラインは、オブジェクト・ストレージのバケットをソース・コーディネーションの永続性として使用できます:

Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<source-coordination-bucket-name>'}

次のポリシーにより、OpenSearchパイプラインはオブジェクト・ストレージからオブジェクトを収集できます:

Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

次のポリシーにより、OpenSearchパイプラインはオブジェクト・ストレージからバケットを読み取ることができます:

Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

次のポリシーにより、OpenSearchパイプラインはソース・コーディネーション・バケットからバケットを読み取ることができます。

Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

OCIストリーミングおよび自己管理のKafkaポリシー

これらのポリシーは、OCIストリーミング・サービスまたは自己管理Kafkaソースに必要です。

共通ネットワークポリシー

ノート

これらのポリシーは、パブリックOCIストリーミング・サービスには必要ありません。

OpenSearchサービスがカスタマ・サブネット内のプライベート・エンドポイントを作成、読取り、更新できるようにするために追加するポリシー。

Allow group SearchOpenSearchAdmins to manage vnics in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage vcns in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage subnets in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to use network-security-groups in compartment <network_resources_compartment>

OCIストリーミング・サービスのポリシー(パブリックおよびプライベート)

これらのポリシーは、OCIストリーミング・サービスにのみ必要です。

次のポリシーは、OpenSearchパイプラインがOCIストリーミング・サービスからレコードを使用することを許可します。

Allow ANY-USER TO {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in compartment '<compartment-name>' 
where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target-stream-pool-ocid>'}

次のポリシーは、OpenSearchパイプラインがOCIストリーミング・サービスからストリーム・プールを読み取ることを許可します

Allow ANY-USER TO read stream-pools in compartment '<compartment-name>' where ALL {request.principal.type='opensearchpipeline', 
target.streampool.id = '<target-stream-pool-ocid>'}

自己管理のKafka権限

これらの権限は、自己管理Kafkaソースにのみ必要です。

次のリンクを選択して、トピックのリスト、トピックの説明、グループへの参加、およびOpenSearchパイプラインによるトピックからのレコードの消費に必要な権限を追加します:

https://kafka.apache.org/documentation/#security_authz

ネットワーク・セキュリティ・ルール

この構成は、プライベートOCIストリーミング・サービスおよび自己管理Kafkaでのみ必要です。パブリックOCIストリーミング・サービスの場合は、「なし」を選択します。

サブネットまたはネットワーク・セキュリティ・グループのセキュリティ・リストにイングレス・セキュリティ・ルールをすべてのOpenSearchパイプラインに追加して、サブネットで実行されているプライベートOCIストリーミング・サービスと通信します。

セキュリティ・ルールを追加するには、セキュリティ・ルールおよびアクセスおよびセキュリティを参照してください。

サブネットのCIDRを検索するには、サブネットの詳細の取得を参照してください。

次の図は、ネットワーク・セキュリティ・グループのイングレス・ルールを示しています。

ネットワーク・セキュリティ・グループのイングレス・ルール

Vaultでのシークレットの作成

OpenSearchパイプラインはパイプラインyamlコード内のユーザー名やパスワードなどのプレーン・テキスト・シークレットを受け入れないため、OpenSearchパイプラインで必要なすべてのプレーン・テキスト・シークレットをVaultを介して渡す必要があります。

次のタスクを実行します。

  • OpenSearchクラスタに、書込み権限を持つ新しいユーザーを作成します。手順については、次のOpenSearchドキュメント・トピックを参照してください。

    デフォルトのアクション・グループ

    ユーザーおよびロール

    OpenSearch IAMポリシーを使用した検索

  • 作成した新しいユーザーのシークレット(ユーザー名とパスワード)をVaultに作成します。手順については、次のOracle Cloud Infrastructure Vaultのトピックを参照してください:

    ボールト・シークレットの管理

    Vaultでのシークレットの作成

  • OpenSearchテナンシに次のリソース・プリンシパル・ポリシーを追加して、OpenSearchパイプラインがVaultからシークレットを読み取れるようにします。

    ALLOW ANY-USER to read secrets in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' } 
    ALLOW ANY-USER to read secret-bundles in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }

    Vaultのポリシーの作成の詳細は、Vaultサービスの詳細を参照してください。

次のOpenSearchパイプライン・タスクを実行できます:

コンパートメント内のOpenSearchパイプラインをリストします

新しいOpenSearchパイプラインを作成します

OpenSearchパイプラインの詳細の取得

OpenSearchパイプラインの設定を編集します

テナンシからOpenSearchパイプラインを削除します

サポートされているプロセッサ

オブジェクト・ストレージおよびソース・コーディネーションYAML

オブジェクト・ストレージ・ソースは、ソース・コーディネーション構成に依存します。OpenSearchパイプラインは、オブジェクト・ストレージを永続性として使用するソース・コーディネーションをサポートします。テナンシからオブジェクト・ストレージ・バケットの詳細を指定する必要があります。

次に、ソース調整の例を示します。

source_coordination:
  store:
    oci-object-bucket:
      name: <OCI Object storage bucket-name details from their tenancy>
      namespace: <namespace>

テナンシのオブジェクト・ストレージ・ネームスペースを取得する方法の詳細は、オブジェクト・ストレージ・ネームスペースの理解を参照してください。

次に、オブジェクト・ストレージ永続性を使用したソース・コーディネーションの例を示します:

source_coordination:
  store:
    oci-object-bucket:
      name: "dataprepper-test-pipelines" <-- bucket name
      namespace: "idee4xpu3dvm".         <-- namespace

オブジェクト・ストレージYAML

次に示すのは、パイプライン構成YAMLの注意事項です。

  • OCIシークレット: OpenSearchクラスタ資格証明を使用してVaultにシークレットを作成し、パイプラインYAMLでOpenSearchクラスタに接続するために使用できます。

  • OpenSearchシンク: シンクには、取込み用の索引名を含むOpenSearchクラスタOCIDsが含まれます。

  • oci-object source: データ・プリッパーは、多くの構成がサポートされるオブジェクト・ストレージを使用したスキャンベースの取込みをサポートしています。スケジュールされた頻度に基づいて、またはスケジュールに基づいてではなく、オブジェクト・ストレージ・バケット内のオブジェクトを取り込むようにソースを構成できます。次のスキャン・オプションがあります

    • 1つ以上の時間スキャン: このオプションを使用すると、オブジェクトの最終変更時間に基づいてオブジェクト・ストレージ・バケット内のオブジェクトを1回以上読み取るパイプラインを構成できます。

    • スケジューリング・ベース・スキャン: このオプションを使用すると、パイプラインの作成後に一定の間隔でスキャンをスケジュールできます。

次の表に、オブジェクト・ストレージ・ソースの構成に使用できるオプションを示します。

オブジェクト・ストレージの構成
オプション 必須 タイプ 内容
acknowledgments No Boolean trueの場合、オブジェクト・ストレージ・オブジェクト・ソースは、イベントがOpenSearchシンクによって受信されたときにエンドツーエンドの確認を受け取ることができます。
buffer_timeout No 継続時間 タイムアウトが発生するまでのデータ・プリッパー・バッファへのイベントの書込みに許可される時間。指定した時間内にOCIソースがバッファに書き込めないイベントは破棄されます。デフォルトは10sです。
codec Yes コーデック 適用するデータプリッパーの codec
compression No 文字列 適用する圧縮アルゴリズム: nonegzipsnappyまたはautomatic。デフォルトはnoneです。
delete_oci_objects_on_read No Boolean trueの場合、オブジェクト・ストレージ・ソース・スキャンは、オブジェクト・ストレージ・オブジェクトからのすべてのイベントがすべてのシンクによって正常に確認された後に、オブジェクト・ストレージ・オブジェクトの削除を試みます。オブジェクト・ストレージ・オブジェクトを削除する場合は、acknowledgmentsを有効にする必要があります。デフォルトはfalseエンドツーエンドのacknowledgmentsが有効になっていない場合、削除は機能しません。
oci No OCI OCI構成。詳細は、次のOCIの項を参照してください。
records_to_accumulate No Integer バッファに書き込まれる前に蓄積されるメッセージの数。デフォルトは100です。
workers No Integer ソースがOCIバケットからのデータの読取りに使用するワーカー・スレッドの数を構成します。オブジェクト・ストレージ・オブジェクトが1MB未満でないかぎり、この値をデフォルトのままにします。大きいオブジェクト・ストレージ・オブジェクトのパフォーマンスが低下する可能性があります。デフォルトは1です。

次に、オブジェクト・ストレージ・パイプライン構成YAMLの例を示します:

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username: 
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      compression: none
      scan:
        start_time: 2024-11-18T08:01:59.363Z
        buckets:
          - bucket:
              namespace: <namespace>
              name: <bucket-name>
  sink:
    - opensearch:
        hosts: [ <cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>

構成例

個々のオブジェクト・ストレージ・バケット・レベルまたはスキャン・レベルで適用できるワンタイム・スキャン・オプションは次のとおりです

oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "<namespace>"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-01-01T00:00:00Z
      compression: "none"

次に、終了時間の例を示します。


simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               end_time: 2024-12-01T00:00:00Z
      compression: "none"

開始時刻と終了時刻の例を次に示します。

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               end_time: 2024-12-01T00:00:00Z
      compression: "none"

範囲の例を次に示します。

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               range: "PT12H"
      compression: "none"

開始時間、終了時間および範囲の例を次に示します。

oci-object:
      codec:
        newline:
      scan:
        start_time: 2023-01-01T00:00:00Z
        end_time: 2024-12-01T00:00:00Z
        range: "PT12H"
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"

include_prefixフィルタの例を次に示します。

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                 include_prefix: ["newtest1", "10-05-2024"]
      compression: "none"

次に、フォルダからのフィルタ・ファイルの例を示します。特定のフォルダからのみファイルを読み取るには、フィルタを使用してフォルダを指定します。次に、include_prefixを使用してfolder1内にfolder2からファイルをインクルードする例を示します。

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                include_prefix: ["folder1/folder2"]
      compression: "none"

exclude_prefixフィルタの例を次に示します。

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                 include_prefix: ["newtest", "10-05-2024"]
                 exclude_suffix: [".png"]
      compression: "none"

JSONのコーデック・サポートの例を次に示します。

source:
    oci-object:
      acknowledgments: true
      codec:
        json: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

CSVのコーデック・サポートの例を次に示します。


source:
    oci-object:
      acknowledgments: true
      codec:
        csv: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

次に、改行のコーデック・サポートの例を示します。

source:
    oci-object:
      acknowledgments: true
      codec:
        newline: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

次に、カウントなしの収集オプションのスケジュールの例を示します。

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline: null
      scan:
        scheduling:
          interval: PT40S
        buckets:
          - bucket:
              namespace: idee4xpu3dvm
              name: data-prepper-object-storage-testing
      compression: none

次に、カウントを使用した収集オプションのスケジュールの例を示します。

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline: null
      scan:
        scheduling:
          interval: PT40S
          count: 10
        buckets:
          - bucket:
              namespace: idee4xpu3dvm
              name: data-prepper-object-storage-testing
      compression: none

開始時間を含む収集オプションのスケジューリングの例を次に示します:


oci-object:
      codec:
        newline:
      scan:
        scheduling:
          interval: "PT40S"
          count: 10
        start_time: 2023-01-01T00:00:00Z
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
      compression: "none"

次に、終了時間を含む収集オプションのスケジューリングの例を示します:

oci-object:
      codec:
        newline:
      scan:
        scheduling:
          interval: "PT40S"
          count: 10
        end_time: 2023-01-01T00:00:00Z
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
      compression: "none"

Kafka YAML

Kafkaソースにはソース調整は必要ありません。

Kafkaソースで使用可能なすべての構成の詳細は、次のリンクにアクセスしてください:

https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/

OCIストリーミング・サービスをKafkaソースとして使用して、OpenSearchクラスタに取り込むことができます。この実行方法の詳細は、Kafka APIの使用を参照してください。

OCIストリーミング・パブリック・アクセスYAML

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - <bootstrap_servers>
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
      authentication:
        sasl:
          oci:
            stream_pool_id: <target-stream-pool-ocid>
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>

パイプラインOCIストリーミング・サービス・プライベート・アクセスYAML

次に、OCIストリーミング・サービスのOpenSearchパイプラインのYAMLの例を示します:

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - <bootstrap_servers>
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
      authentication:
        sasl:
          oci:
            stream_pool_id: <target-stream-pool-ocid>
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>

自己管理のKafka YAML

次に、OpenSearchの自己管理Kafka YAMLの例を示します:

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
      kafka-credentials:
        secret_id: <secret-ocid>
simple-sample-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - "https://<bootstrap_server_fqdn>:9092"
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
        certificate: <certificate-in-pem-format>
      authentication:
        sasl:
          plaintext:
            username: ${{oci_secrets:kafka-credentials:username}}
            password: ${{oci_secrets:kafka-credentials:password}}
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>