OpenSearchパイプライン
データ・プリッパーを使用してOpenSearchパイプラインを作成および管理し、データをOpenSearchクラスタに取り込みます。
データ・プリッパーは、ダウンストリーム分析および可視化のためにデータをフィルタ、エンリッチ、変換、正規化および集計できるオープン・ソース・データ・コレクタです。これは、大規模で複雑なデータセットを処理するための最も推奨されるデータ取込みツールの1つです。
OpenSearchデータ・プリッパー付きクラスタ機能は、現在OC1レルムで使用できます。
必要なポリシー
このトピックで説明するステップに進む前に、次のタスクを完了してください。
テナンシの管理者以外の場合は、テナンシ管理者に連絡して、これらの権限を付与してください。管理者は、管理者以外のユーザーがパイプラインを管理およびCRUD操作できるように、次のユーザー権限を更新する必要があります
次のポリシーにより、管理者は各テナンシ内のすべてのユーザーに権限を付与できます:
Allow any-user to manage opensearch-cluster-pipeline in tenancy
次のポリシーにより、管理者はコンパートメント内のグループに対する権限を付与できます(推奨)。
Allow group <group> to manage opensearch-cluster-pipeline in compartment <compartment>
ここで、<group>
は、そのグループ内のすべてのユーザーがリソースにアクセスできます。
次のポリシーでは、OpenSearchパイプラインがOracle Cloud Infrastructure Vaultからシークレットを読み取ることができます。
Allow any-user to read secret-bundles in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }' }
Allow any-user to read secrets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
オブジェクト・ストレージ・ポリシー
次のポリシーは、オブジェクト・ストレージ・ソースにのみ必要です:
次のポリシーにより、OpenSearchパイプラインは、オブジェクト・ストレージのバケットをソース・コーディネーションの永続性として使用できます:
Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<source-coordination-bucket-name>'}
次のポリシーにより、OpenSearchパイプラインはオブジェクト・ストレージからオブジェクトを収集できます:
Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
次のポリシーにより、OpenSearchパイプラインはオブジェクト・ストレージからバケットを読み取ることができます:
Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
次のポリシーにより、OpenSearchパイプラインはソース・コーディネーション・バケットからバケットを読み取ることができます。
Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
OCIストリーミングおよび自己管理のKafkaポリシー
これらのポリシーは、OCIストリーミング・サービスまたは自己管理Kafkaソースに必要です。
共通ネットワークポリシー
これらのポリシーは、パブリックOCIストリーミング・サービスには必要ありません。
OpenSearchサービスがカスタマ・サブネット内のプライベート・エンドポイントを作成、読取り、更新できるようにするために追加するポリシー。
Allow group SearchOpenSearchAdmins to manage vnics in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage vcns in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage subnets in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to use network-security-groups in compartment <network_resources_compartment>
OCIストリーミング・サービスのポリシー(パブリックおよびプライベート)
これらのポリシーは、OCIストリーミング・サービスにのみ必要です。
次のポリシーは、OpenSearchパイプラインがOCIストリーミング・サービスからレコードを使用することを許可します。
Allow ANY-USER TO {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in compartment '<compartment-name>'
where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target-stream-pool-ocid>'}
次のポリシーは、OpenSearchパイプラインがOCIストリーミング・サービスからストリーム・プールを読み取ることを許可します
Allow ANY-USER TO read stream-pools in compartment '<compartment-name>' where ALL {request.principal.type='opensearchpipeline',
target.streampool.id = '<target-stream-pool-ocid>'}
自己管理のKafka権限
これらの権限は、自己管理Kafkaソースにのみ必要です。
次のリンクを選択して、トピックのリスト、トピックの説明、グループへの参加、およびOpenSearchパイプラインによるトピックからのレコードの消費に必要な権限を追加します:
ネットワーク・セキュリティ・ルール
この構成は、プライベートOCIストリーミング・サービスおよび自己管理Kafkaでのみ必要です。パブリックOCIストリーミング・サービスの場合は、「なし」を選択します。
サブネットまたはネットワーク・セキュリティ・グループのセキュリティ・リストにイングレス・セキュリティ・ルールをすべてのOpenSearchパイプラインに追加して、サブネットで実行されているプライベートOCIストリーミング・サービスと通信します。
セキュリティ・ルールを追加するには、セキュリティ・ルールおよびアクセスおよびセキュリティを参照してください。
サブネットのCIDRを検索するには、サブネットの詳細の取得を参照してください。
次の図は、ネットワーク・セキュリティ・グループのイングレス・ルールを示しています。

Vaultでのシークレットの作成
OpenSearchパイプラインはパイプラインyamlコード内のユーザー名やパスワードなどのプレーン・テキスト・シークレットを受け入れないため、OpenSearchパイプラインで必要なすべてのプレーン・テキスト・シークレットをVaultを介して渡す必要があります。
次のタスクを実行します。
-
OpenSearchクラスタに、書込み権限を持つ新しいユーザーを作成します。手順については、次のOpenSearchドキュメント・トピックを参照してください。
-
作成した新しいユーザーのシークレット(ユーザー名とパスワード)をVaultに作成します。手順については、次のOracle Cloud Infrastructure Vaultのトピックを参照してください:
-
OpenSearchテナンシに次のリソース・プリンシパル・ポリシーを追加して、OpenSearchパイプラインがVaultからシークレットを読み取れるようにします。
ALLOW ANY-USER to read secrets in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' } ALLOW ANY-USER to read secret-bundles in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
Vaultのポリシーの作成の詳細は、Vaultサービスの詳細を参照してください。
次のOpenSearchパイプライン・タスクを実行できます:
サポートされているプロセッサ
次の表に、パイプラインでサポートされているプロセッサを示します。
オブジェクト・ストレージおよびソース・コーディネーションYAML
オブジェクト・ストレージ・ソースは、ソース・コーディネーション構成に依存します。OpenSearchパイプラインは、オブジェクト・ストレージを永続性として使用するソース・コーディネーションをサポートします。テナンシからオブジェクト・ストレージ・バケットの詳細を指定する必要があります。
次に、ソース調整の例を示します。
source_coordination:
store:
oci-object-bucket:
name: <OCI Object storage bucket-name details from their tenancy>
namespace: <namespace>
テナンシのオブジェクト・ストレージ・ネームスペースを取得する方法の詳細は、オブジェクト・ストレージ・ネームスペースの理解を参照してください。
次に、オブジェクト・ストレージ永続性を使用したソース・コーディネーションの例を示します:
source_coordination:
store:
oci-object-bucket:
name: "dataprepper-test-pipelines" <-- bucket name
namespace: "idee4xpu3dvm". <-- namespace
オブジェクト・ストレージYAML
次に示すのは、パイプライン構成YAMLの注意事項です。
-
OCIシークレット: OpenSearchクラスタ資格証明を使用してVaultにシークレットを作成し、パイプラインYAMLでOpenSearchクラスタに接続するために使用できます。
-
OpenSearchシンク: シンクには、取込み用の索引名を含むOpenSearchクラスタOCIDsが含まれます。
-
oci-object source: データ・プリッパーは、多くの構成がサポートされるオブジェクト・ストレージを使用したスキャンベースの取込みをサポートしています。スケジュールされた頻度に基づいて、またはスケジュールに基づいてではなく、オブジェクト・ストレージ・バケット内のオブジェクトを取り込むようにソースを構成できます。次のスキャン・オプションがあります
-
1つ以上の時間スキャン: このオプションを使用すると、オブジェクトの最終変更時間に基づいてオブジェクト・ストレージ・バケット内のオブジェクトを1回以上読み取るパイプラインを構成できます。
-
スケジューリング・ベース・スキャン: このオプションを使用すると、パイプラインの作成後に一定の間隔でスキャンをスケジュールできます。
-
次の表に、オブジェクト・ストレージ・ソースの構成に使用できるオプションを示します。
オプション | 必須 | タイプ | 内容 |
---|---|---|---|
acknowledgments |
No | Boolean | true の場合、オブジェクト・ストレージ・オブジェクト・ソースは、イベントがOpenSearchシンクによって受信されたときにエンドツーエンドの確認を受け取ることができます。 |
buffer_timeout |
No | 継続時間 | タイムアウトが発生するまでのデータ・プリッパー・バッファへのイベントの書込みに許可される時間。指定した時間内にOCIソースがバッファに書き込めないイベントは破棄されます。デフォルトは10s です。 |
codec |
Yes | コーデック | 適用するデータプリッパーの codec。 |
compression |
No | 文字列 | 適用する圧縮アルゴリズム: none 、gzip 、snappy またはautomatic 。デフォルトはnone です。 |
delete_oci_objects_on_read |
No | Boolean | true の場合、オブジェクト・ストレージ・ソース・スキャンは、オブジェクト・ストレージ・オブジェクトからのすべてのイベントがすべてのシンクによって正常に確認された後に、オブジェクト・ストレージ・オブジェクトの削除を試みます。オブジェクト・ストレージ・オブジェクトを削除する場合は、acknowledgments を有効にする必要があります。デフォルトはfalse エンドツーエンドのacknowledgments が有効になっていない場合、削除は機能しません。 |
oci |
No | OCI | OCI構成。詳細は、次のOCIの項を参照してください。 |
records_to_accumulate |
No | Integer | バッファに書き込まれる前に蓄積されるメッセージの数。デフォルトは100 です。 |
workers |
No | Integer | ソースがOCIバケットからのデータの読取りに使用するワーカー・スレッドの数を構成します。オブジェクト・ストレージ・オブジェクトが1MB未満でないかぎり、この値をデフォルトのままにします。大きいオブジェクト・ストレージ・オブジェクトのパフォーマンスが低下する可能性があります。デフォルトは1 です。 |
次に、オブジェクト・ストレージ・パイプライン構成YAMLの例を示します:
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
compression: none
scan:
start_time: 2024-11-18T08:01:59.363Z
buckets:
- bucket:
namespace: <namespace>
name: <bucket-name>
sink:
- opensearch:
hosts: [ <cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
構成例
個々のオブジェクト・ストレージ・バケット・レベルまたはスキャン・レベルで適用できるワンタイム・スキャン・オプションは次のとおりです
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "<namespace>"
name: "data-prepper-object-storage-testing"
start_time: 2023-01-01T00:00:00Z
compression: "none"
次に、終了時間の例を示します。
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
end_time: 2024-12-01T00:00:00Z
compression: "none"
開始時刻と終了時刻の例を次に示します。
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
end_time: 2024-12-01T00:00:00Z
compression: "none"
範囲の例を次に示します。
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
range: "PT12H"
compression: "none"
開始時間、終了時間および範囲の例を次に示します。
oci-object:
codec:
newline:
scan:
start_time: 2023-01-01T00:00:00Z
end_time: 2024-12-01T00:00:00Z
range: "PT12H"
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
include_prefix
フィルタの例を次に示します。
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["newtest1", "10-05-2024"]
compression: "none"
次に、フォルダからのフィルタ・ファイルの例を示します。特定のフォルダからのみファイルを読み取るには、フィルタを使用してフォルダを指定します。次に、include_prefix
を使用してfolder1内にfolder2からファイルをインクルードする例を示します。
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["folder1/folder2"]
compression: "none"
exclude_prefix
フィルタの例を次に示します。
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["newtest", "10-05-2024"]
exclude_suffix: [".png"]
compression: "none"
JSONのコーデック・サポートの例を次に示します。
source:
oci-object:
acknowledgments: true
codec:
json: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
CSVのコーデック・サポートの例を次に示します。
source:
oci-object:
acknowledgments: true
codec:
csv: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
次に、改行のコーデック・サポートの例を示します。
source:
oci-object:
acknowledgments: true
codec:
newline: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
次に、カウントなしの収集オプションのスケジュールの例を示します。
simple-sample-pipeline:
source:
oci-object:
codec:
newline: null
scan:
scheduling:
interval: PT40S
buckets:
- bucket:
namespace: idee4xpu3dvm
name: data-prepper-object-storage-testing
compression: none
次に、カウントを使用した収集オプションのスケジュールの例を示します。
simple-sample-pipeline:
source:
oci-object:
codec:
newline: null
scan:
scheduling:
interval: PT40S
count: 10
buckets:
- bucket:
namespace: idee4xpu3dvm
name: data-prepper-object-storage-testing
compression: none
開始時間を含む収集オプションのスケジューリングの例を次に示します:
oci-object:
codec:
newline:
scan:
scheduling:
interval: "PT40S"
count: 10
start_time: 2023-01-01T00:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
compression: "none"
次に、終了時間を含む収集オプションのスケジューリングの例を示します:
oci-object:
codec:
newline:
scan:
scheduling:
interval: "PT40S"
count: 10
end_time: 2023-01-01T00:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
compression: "none"
Kafka YAML
Kafkaソースにはソース調整は必要ありません。
Kafkaソースで使用可能なすべての構成の詳細は、次のリンクにアクセスしてください:
https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/
OCIストリーミング・サービスをKafkaソースとして使用して、OpenSearchクラスタに取り込むことができます。この実行方法の詳細は、Kafka APIの使用を参照してください。
OCIストリーミング・パブリック・アクセスYAML
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-pipeline:
source:
kafka:
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target-stream-pool-ocid>
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
パイプラインOCIストリーミング・サービス・プライベート・アクセスYAML
次に、OCIストリーミング・サービスのOpenSearchパイプラインのYAMLの例を示します:
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-pipeline:
source:
kafka:
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target-stream-pool-ocid>
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
自己管理のKafka YAML
次に、OpenSearchの自己管理Kafka YAMLの例を示します:
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-credentials:
secret_id: <secret-ocid>
simple-sample-pipeline:
source:
kafka:
bootstrap_servers:
- "https://<bootstrap_server_fqdn>:9092"
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
certificate: <certificate-in-pem-format>
authentication:
sasl:
plaintext:
username: ${{oci_secrets:kafka-credentials:username}}
password: ${{oci_secrets:kafka-credentials:password}}
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>