SDK for Pythonストリーミング・クイックスタート
このクイックスタートでは、Oracle Cloud Infrastructure (OCI) SDK for PythonおよびOracle Cloud Infrastructure Streamingを使用して、メッセージを公開および消費する方法を示します。
主要概念およびストリーミングの詳細は、ストリーミングの概要を参照してください。OCI SDKの使用の詳細は、SDKガイドを参照してください。
前提条件
-
SDK for Pythonを使用するには、次が必要です:
- ストリームのメッセージ・エンドポイントおよびOCIDを収集します。ストリームの詳細を表示する手順は、ストリームおよびストリーム・プールのリストを参照してください。このクイックスタートの目的では、ストリームでパブリック・エンドポイントを使用し、Oracleで暗号化を管理する必要があります。既存のストリームがない場合は、「ストリームの作成」および「ストリーム・プールの作成」を参照してください。
- Python 3.6以降(PIPがインストールおよび更新済)。
- Visual Code Studio (推奨)またはその他の統合開発環境(IDE)。
-
次のコマンドを使用して、Python用の
oci-sdk
パッケージをインストールします:pip install oci
- 有効なSDK構成ファイルがあることを確認します。本番環境では、インスタンス・プリンシパル認可を使用する必要があります。
メッセージの生成
wd
ディレクトリから、Visual Studio Codeなどのお気に入りのエディタを開きます。前提条件を満たした後、現在のPython環境にPython用のoci-sdk
パッケージがすでにインストールされている必要があります。-
次のコードを使用して、
wd
ディレクトリにProducer.py
という名前のファイルを作成します。次のコード・スニペットの変数ociConfigFilePath
、ociProfileName
、ociStreamOcid
およびociMessageEndpoint
の値は、テナンシに適用可能な値で置き換えてください。import oci from base64 import b64encode ociMessageEndpoint = "<stream_message_endpoint>" ociStreamOcid = "<stream_OCID>" ociConfigFilePath = "<config_file_path>" ociProfileName = "<config_file_profile_name>" def produce_messages(client, stream_id): # Build up a PutMessagesDetails and publish some messages to the stream message_list = [] for i in range(100): key = "messageKey" + str(i) value = "messageValue " + str(i) encoded_key = b64encode(key.encode()).decode() encoded_value = b64encode(value.encode()).decode() message_list.append(oci.streaming.models.PutMessagesDetailsEntry(key=encoded_key, value=encoded_value)) print("Publishing {} messages to the stream {} ".format(len(message_list), stream_id)) messages = oci.streaming.models.PutMessagesDetails(messages=message_list) put_message_result = client.put_messages(stream_id, messages) # The put_message_result can contain some useful metadata for handling failures for entry in put_message_result.data.entries: if entry.error: print("Error ({}) : {}".format(entry.error, entry.error_message)) else: print("Published message to partition {} , offset {}".format(entry.partition, entry.offset)) config = oci.config.from_file(ociConfigFilePath, ociProfileName) stream_client = oci.streaming.StreamClient(config, service_endpoint=ociMessageEndpoint) # Publish some messages to the stream produce_messages(stream_client, ociStreamOcid)
-
wd
ディレクトリから、次のコマンドを実行します:python Producer.py
- コンソールを使用して、ストリームに送信された最新のメッセージを表示し、生成が成功したことを確認します。
メッセージの消費
- 最初に、メッセージを消費するストリームにメッセージが含まれていることを確認します。コンソールを使用してテスト・メッセージを生成するか、このクイックスタートで作成したストリームおよびメッセージを使用できます。
wd
ディレクトリから、Visual Studio Codeなどのお気に入りのエディタを開きます。前提条件を満たしていることを確認した後、現在のPython環境にPython用のoci-sdk
パッケージがすでにインストールされている必要があります。-
次のコードを使用して、ディレクトリ
wd
にConsumer.py
という名前のファイルを作成します。次のコード・スニペットの変数ociConfigFilePath
、ociProfileName
、ociStreamOcid
およびociMessageEndpoint
の値は、テナンシに適用可能な値で置き換えてください。import oci import time from base64 import b64decode ociMessageEndpoint = "<stream_message_endpoint>" ociStreamOcid = "<stream_OCID>" ociConfigFilePath = "<config_file_path>" ociProfileName = "<config_file_profile_name>" def get_cursor_by_group(sc, sid, group_name, instance_name): print(" Creating a cursor for group {}, instance {}".format(group_name, instance_name)) cursor_details = oci.streaming.models.CreateGroupCursorDetails(group_name=group_name, instance_name=instance_name, type=oci.streaming.models. CreateGroupCursorDetails.TYPE_TRIM_HORIZON, commit_on_get=True) response = sc.create_group_cursor(sid, cursor_details) return response.data.value def simple_message_loop(client, stream_id, initial_cursor): cursor = initial_cursor while True: get_response = client.get_messages(stream_id, cursor, limit=10) # No messages to process. return. if not get_response.data: return # Process the messages print(" Read {} messages".format(len(get_response.data))) for message in get_response.data: if message.key is None: key = "Null" else: key = b64decode(message.key.encode()).decode() print("{}: {}".format(key, b64decode(message.value.encode()).decode())) # get_messages is a throttled method; clients should retrieve sufficiently large message # batches, as to avoid too many http requests. time.sleep(1) # use the next-cursor for iteration cursor = get_response.headers["opc-next-cursor"] config = oci.config.from_file(ociConfigFilePath, ociProfileName) stream_client = oci.streaming.StreamClient(config, service_endpoint=ociMessageEndpoint) # A cursor can be created as part of a consumer group. # Committed offsets are managed for the group, and partitions # are dynamically balanced amongst consumers in the group. group_cursor = get_cursor_by_group(stream_client, ociStreamOcid, "example-group", "example-instance-1") simple_message_loop(stream_client, ociStreamOcid, group_cursor)
-
wd
ディレクトリから、次のコマンドを実行します:python Consumer.py
-
次のようなメッセージが表示されます:
Starting a simple message loop with a group cursor Creating a cursor for group example-group, instance example-instance-1 Read 2 messages Null: Example Test Message 0 Null: Example Test Message 0 Read 2 messages Null: Example Test Message 0 Null: Example Test Message 0 Read 1 messages Null: Example Test Message 0 Read 10 messages key 0: value 0 key 1: value 1
次のステップ
詳細は、次のリソースを参照してください: