OpenSearchパイプライン
データ・プリッパーを使用してOpenSearchパイプラインを作成および管理し、OpenSearchクラスタにデータを取り込みます。
データ・プリッパーは、ダウンストリーム分析および可視化のためにデータをフィルタリング、エンリッチ、変換、正規化および集計できるオープン・ソース・データ・コレクタです。これは、大規模で複雑なデータセットを処理するための最も推奨されるデータ取り込みツールの1つです。
OpenSearch Cluster with Data Prepper機能は、現在OC1レルムで使用できます。
必要なポリシー
このトピックで説明するステップに進む前に、次のタスクを完了します。
テナンシの管理者以外の場合は、テナンシ管理者に連絡して、これらの権限を付与してください。管理者は、管理者以外のユーザーがパイプラインを管理およびCRUD操作できるように、次のユーザー権限を更新する必要があります
次のポリシーにより、管理者は各テナンシのすべてのユーザーに権限を付与できます:
Allow any-user to manage opensearch-cluster-pipeline in tenancy
次のポリシーにより、管理者はコンパートメント内のグループに権限を付与できます(推奨)
Allow group <group> to manage opensearch-cluster-pipeline in compartment <compartment>
ここで、<group>
は、そのグループ内のすべてのユーザーがリソースにアクセスできます。
次のポリシーにより、OpenSearchパイプラインはOracle Cloud Infrastructure Vaultからシークレットを読み取ることができます。
Allow any-user to read secret-bundles in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }' }
Allow any-user to read secrets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
オブジェクト・ストレージ・ポリシー
次のポリシーは、オブジェクト・ストレージ・ソースにのみ必要です:
次のポリシーを使用すると、OpenSearchパイプラインでオブジェクト・ストレージのバケットをソース・コーディネーションの永続性として使用できます。
Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<source-coordination-bucket-name>'}
次のポリシーにより、OpenSearchパイプラインはオブジェクト・ストレージからオブジェクトを収集できます:
Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
次のポリシーにより、OpenSearchパイプラインはオブジェクト・ストレージからバケットを読み取ることができます:
Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
次のポリシーにより、OpenSearchパイプラインはソース・コーディネーション・バケットからバケットを読み取ることができます。
Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
OCIストリーミングおよび自己管理Kafkaポリシー
これらのポリシーは、OCIストリーミング・サービスまたは自己管理Kafkaソースに必要です。
一般的なネットワーク・ポリシー
これらのポリシーは、パブリックOCIストリーミング・サービスには必要ありません。
OpenSearchサービスがカスタマ・サブネット内のプライベート・エンドポイントを作成、読取り、更新できるようにするポリシーが追加されます。
Allow group SearchOpenSearchAdmins to manage vnics in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage vcns in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage subnets in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to use network-security-groups in compartment <network_resources_compartment>
OCIストリーミング・サービス・ポリシー(パブリックおよびプライベート)
これらのポリシーは、OCIストリーミング・サービスにのみ必要です。
次のポリシーにより、OpenSearchパイプラインはOCIストリーミング・サービスのレコードを消費できます。
Allow ANY-USER TO {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in compartment '<compartment-name>'
where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target-stream-pool-ocid>'}
次のポリシーにより、OpenSearchパイプラインはOCIストリーミング・サービスからストリーム・プールを読み取ることができます
Allow ANY-USER TO read stream-pools in compartment '<compartment-name>' where ALL {request.principal.type='opensearchpipeline',
target.streampool.id = '<target-stream-pool-ocid>'}
自己管理Kafka権限
これらの権限は、自己管理Kafkaソースにのみ必要です。
次のリンクを選択して、トピックのリスト、トピックの説明、グループへの参加およびOpenSearchパイプラインによるトピックからのレコードの消費に必要な権限を追加します:
ネットワーク・セキュリティ・ルール
この構成は、プライベートOCIストリーミング・サービスおよび自己管理Kafkaにのみ必要です。パブリックOCIストリーミング・サービスの場合は、「なし」を選択します。
サブネットまたはネットワーク・セキュリティ・グループのセキュリティ・リストにイングレス・セキュリティ・ルールをすべてのOpenSearchパイプラインに追加して、サブネットで実行されているプライベートOCIストリーミング・サービスと通信します。
セキュリティ・ルールを追加するには、セキュリティ・ルールおよびアクセスとセキュリティを参照してください。
サブネットのCIDRを検索するには、サブネットの詳細の取得を参照してください。
次の図は、ネットワーク・セキュリティ・グループのイングレス・ルールを示しています。

Vaultでのシークレットの作成
OpenSearchパイプラインは、パイプラインyamlコード内のユーザー名やパスワードなどのプレーン・テキスト・シークレットを受け入れないため、OpenSearchパイプラインで必要なすべてのプレーン・テキスト・シークレットをVaultを介して渡す必要があります。
次のタスクを実行します。
- OpenSearchクラスタに書込み権限を持つ新しいユーザーを作成します。手順については、次のOpenSearchドキュメントのトピックを参照してください。
- 作成した新しいユーザーのシークレット(ユーザー名とパスワード)をVaultに作成します。手順については、次のOracle Cloud Infrastructure Vaultのトピックを参照してください:
- OpenSearchパイプラインがVaultからシークレットを読み取ることができるように、OpenSearchテナンシに次のリソース・プリンシパル・ポリシーを追加します。
ALLOW ANY-USER to read secrets in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' } ALLOW ANY-USER to read secret-bundles in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
Vaultのポリシーの作成の詳細は、Vaultサービスの詳細を参照してください。
次のOpenSearchパイプライン・タスクを実行できます。
サポートされるプロセッサ
次の表に、パイプラインでサポートされているプロセッサを示します。
オブジェクト・ストレージおよびソース・コーディネーションYAML
オブジェクト・ストレージ・ソースは、ソース調整構成に依存します。OpenSearchパイプラインは、オブジェクト・ストレージを永続性として使用するソース調整をサポートします。テナンシからオブジェクト・ストレージ・バケットの詳細を指定する必要があります。
次に、ソース調整の例を示します。
source_coordination:
store:
oci-object-bucket:
name: <OCI Object storage bucket-name details from their tenancy>
namespace: <namespace>
テナンシのオブジェクト・ストレージ・ネームスペースを取得する方法の詳細は、オブジェクト・ストレージ・ネームスペースの理解を参照してください。
次に、オブジェクト・ストレージ永続性を使用したソース調整の例を示します。
source_coordination:
store:
oci-object-bucket:
name: "dataprepper-test-pipelines" <-- bucket name
namespace: "idee4xpu3dvm". <-- namespace
オブジェクト・ストレージYAML
パイプライン構成YAMLの注意事項は次のとおりです。
- OCIシークレット: OpenSearchクラスタ資格証明を使用してVaultにシークレットを作成し、パイプラインYAMLでOpenSearchクラスタに接続するために使用できます。
- OpenSearchシンク: シンクには、取込み用の索引名を含むOpenSearchクラスタOCIDsが含まれます。
- oci-object source: データ・プリッパーでは、多数の構成がサポートされるオブジェクト・ストレージを使用したスキャンベースの取込みがサポートされます。オブジェクト・ストレージ・バケット内のオブジェクトをスケジュールされた頻度に基づいて取り込むか、スケジュールに基づかないかに基づいて取り込むようにソースを構成できます。次のスキャン・オプションがあります
- 1つ以上の時間スキャン: このオプションを使用すると、オブジェクトの最終変更時間に基づいてオブジェクト・ストレージ・バケット内のオブジェクトを1回以上読み取るパイプラインを構成できます。
- スキャンをスケジュール: このオプションを使用すると、パイプラインの作成後に定期的にスキャンをスケジュールできます。
次の表に、オブジェクト・ストレージ・ソースの構成に使用できるオプションを示します。
オプション | 必須 | タイプ | 摘要 |
---|---|---|---|
acknowledgments |
いいえ | ブール | true の場合、OpenSearchシンクによってイベントが受信されたときに、オブジェクト・ストレージ・オブジェクト・ソースがエンドツーエンドの確認を受信できるようにします。 |
buffer_timeout |
いいえ | 期間 | タイムアウトが発生するまでのデータ・プリッパー・バッファへのイベントの書込みに許可される時間。指定した時間内にOCIソースがバッファに書き込めないイベントは破棄されます。デフォルトは10s です。 |
codec |
はい | コーデック | 適用するdata-prepperの codec。 |
compression |
いいえ | 文字列 | 適用する圧縮アルゴリズム: none 、gzip 、snappy またはautomatic 。デフォルトはnone です。 |
delete_oci_objects_on_read |
いいえ | ブール | true の場合、オブジェクト・ストレージ・ソース・スキャンでは、オブジェクト・ストレージ・オブジェクトからのすべてのイベントがすべてのシンクによって正常に確認された後、オブジェクト・ストレージ・オブジェクトの削除が試行されます。オブジェクト・ストレージ・オブジェクトを削除するときは、acknowledgments を有効にする必要があります。デフォルトはfalse です。エンドツーエンドのacknowledgments が有効になっていない場合、削除は機能しません。 |
oci |
いいえ | OCI | OCI構成。詳細については、次のOCIの項を参照してください。 |
records_to_accumulate |
いいえ | 整数 | バッファに書き込まれる前に蓄積されるメッセージの数。デフォルトは100 です。 |
workers |
いいえ | 整数 | ソースがOCIバケットからデータを読み取るために使用するワーカー・スレッドの数を構成します。オブジェクト・ストレージ・オブジェクトが1MB未満の場合を除き、この値はデフォルトのままにします。より大きなオブジェクト・ストレージ・オブジェクトのパフォーマンスが低下する場合があります。デフォルトは1 です。 |
オブジェクト・ストレージ・パイプライン構成YAML
オブジェクト・ストレージ・パイプライン構成YAMLの例を次に示します:
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
compression: none
scan:
start_time: 2024-11-18T08:01:59.363Z
buckets:
- bucket:
namespace: <namespace>
name: <bucket-name>
sink:
- opensearch:
hosts: [ <cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
構成例
個々のObject Storageバケット・レベルまたはスキャン・レベルで適用できる1回限りのスキャン・オプションを次に示します
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "<namespace>"
name: "data-prepper-object-storage-testing"
start_time: 2023-01-01T00:00:00Z
compression: "none"
終了時間
終了時間の例を次に示します。
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
end_time: 2024-12-01T00:00:00Z
compression: "none"
開始時刻と終了時刻
開始時間と終了時間の例を次に示します。
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
end_time: 2024-12-01T00:00:00Z
compression: "none"
レンジ
次に、範囲の例を示します。
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
range: "PT12H"
compression: "none"
開始時間、終了時間および範囲
開始時間、終了時間および範囲の例を次に示します。
oci-object:
codec:
newline:
scan:
start_time: 2023-01-01T00:00:00Z
end_time: 2024-12-01T00:00:00Z
range: "PT12H"
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
include_prefix
フィルタ
include_prefix
フィルタの例を次に示します。
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["newtest1", "10-05-2024"]
compression: "none"
フォルダからのファイルのフィルタ
次に、フォルダのフィルタ・ファイルの例を示します。特定のフォルダからのみファイルを読み取るには、フィルタを使用してフォルダを指定します。次に、include_prefix
を使用して、folder1内のfolder2からのファイルを含める例を示します。
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["folder1/folder2"]
compression: "none"
exclude_prefix
フィルタ
exclude_prefix
フィルタの例を次に示します。
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["newtest", "10-05-2024"]
exclude_suffix: [".png"]
compression: "none"
JSONのコーデックのサポート
次に、JSONのコーデック・サポートの例を示します:
source:
oci-object:
acknowledgments: true
codec:
json: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
CSVに対するコーデックのサポート
次に、CSVのコーデック・サポートの例を示します。
source:
oci-object:
acknowledgments: true
codec:
csv: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
Newlineでのコーデックのサポート
改行のコーデックサポートの例を次に示します。
source:
oci-object:
acknowledgments: true
codec:
newline: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
棚卸なしの取込みオプションのスケジュール
次に、カウントなしで取り込みオプションをスケジュールする例を示します。
simple-sample-pipeline:
source:
oci-object:
codec:
newline: null
scan:
scheduling:
interval: PT40S
buckets:
- bucket:
namespace: idee4xpu3dvm
name: data-prepper-object-storage-testing
compression: none
棚卸による取込みオプションのスケジュール
次に、カウントを含む取込みオプションのスケジュールの例を示します。
simple-sample-pipeline:
source:
oci-object:
codec:
newline: null
scan:
scheduling:
interval: PT40S
count: 10
buckets:
- bucket:
namespace: idee4xpu3dvm
name: data-prepper-object-storage-testing
compression: none
開始時間による取込みオプションのスケジューリング
開始時間を使用して取り込みオプションをスケジュールする例を次に示します。
oci-object:
codec:
newline:
scan:
scheduling:
interval: "PT40S"
count: 10
start_time: 2023-01-01T00:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
compression: "none"
終了時間のある取込みオプションのスケジューリング
次に、終了時間を含む取込みオプションのスケジュールの例を示します。
oci-object:
codec:
newline:
scan:
scheduling:
interval: "PT40S"
count: 10
end_time: 2023-01-01T00:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
compression: "none"
Kafka YAML
Kafkaソースにソース調整は必要ありません。
Kafkaソースで使用可能なすべての構成の詳細は、次のリンクにアクセスしてください:
https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/
OCIストリーミング・サービスをKafkaソースとして使用して、OpenSearchクラスタに取り込むことができます。この実行方法の詳細は、Kafka APIの使用を参照してください。
OCIストリーミング・パブリック・アクセスYAML
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-pipeline:
source:
kafka:
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target-stream-pool-ocid>
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
パイプラインOCIストリーミング・サービス・プライベート・アクセスYAML
OCIストリーミング・サービスのOpenSearchパイプラインのYAMLの例を次に示します:
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-pipeline:
source:
kafka:
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target-stream-pool-ocid>
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
自己管理Kafka YAML
次に、OpenSearchの自己管理Kafka YAMLの例を示します:
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-credentials:
secret_id: <secret-ocid>
simple-sample-pipeline:
source:
kafka:
bootstrap_servers:
- "https://<bootstrap_server_fqdn>:9092"
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
certificate: <certificate-in-pem-format>
authentication:
sasl:
plaintext:
username: ${{oci_secrets:kafka-credentials:username}}
password: ${{oci_secrets:kafka-credentials:password}}
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>