SDK for Goストリーミング・クイックスタート
このクイックスタートでは、Oracle Cloud Infrastructure (OCI) SDK for GoおよびOracle Cloud Infrastructure Streamingを使用して、メッセージを公開および消費する方法を示します。
主要概念およびストリーミングの詳細は、ストリーミングの概要を参照してください。OCI SDKの使用の詳細は、SDKガイドを参照してください。
前提条件
-
SDK for Goを使用するには、次が必要です:
- Oracle Cloud Infrastructureアカウント。
- そのアカウントで作成され、必要な権限を付与するポリシーがあるグループに含まれるユーザー。このユーザーは、自分自身であるか、またはAPIをコールする必要がある別の個人/システムです。新しいユーザー、グループ、コンパートメントおよびポリシーの設定方法の例は、ユーザーの追加を参照してください。使用する一般的なポリシーのリストは、共通ポリシーを参照してください。
- APIリクエストの署名に使用されるキー・ペア(公開キーがOracleにアップロードされている)。秘密キーはAPIをコールするユーザーのみが所有する必要があります。詳細は、SDK構成ファイルを参照してください。
- ストリームのメッセージ・エンドポイントおよびOCIDを収集します。ストリームの詳細を表示する手順は、ストリームおよびストリーム・プールのリストを参照してください。このクイックスタートの目的では、ストリームでパブリック・エンドポイントを使用し、Oracleで暗号化を管理する必要があります。既存のストリームがない場合は、「ストリームの作成」および「ストリーム・プールの作成」を参照してください。
- Goがローカルにインストール済。必要に応じて、これらの手順に従います。
go
がPATH
内にあることを確認します。 -
Visual Studio Code (推奨)またはその他の統合開発環境(IDE)やテキスト・エディタ。
- 有効なSDK構成ファイルがあることを確認します。本番環境では、インスタンス・プリンシパル認可を使用する必要があります。
メッセージの生成
- 空の作業ディレクトリ
wd
から、Visual Studio Codeなどのお気に入りのエディタを開きます。 - このディレクトリに
Producer.go
という名前のファイルを作成します。 -
Producer.go
に、次のコードを追加します。次のコード・スニペットの変数ociConfigFilePath
、ociProfileName
、ociStreamOcid
およびociMessageEndpoint
の値は、テナンシに適用可能な値で置き換えてください。package main import ( "context" "fmt" "strconv" "github.com/oracle/oci-go-sdk/v36/common" "github.com/oracle/oci-go-sdk/v36/example/helpers" "github.com/oracle/oci-go-sdk/v36/streaming" ) const ociMessageEndpoint = "<stream_message_endpoint>" const ociStreamOcid = "<stream_OCID>" const ociConfigFilePath = "<config_file_path>" const ociProfileName = "<config_file_profile_name>" func main() { fmt.Println("Go oci oss sdk example producer") putMsgInStream(ociMessageEndpoint, ociStreamOcid) } func putMsgInStream(streamEndpoint string, streamOcid string) { fmt.Println("Stream endpoint for put msg api is: " + streamEndpoint) provider, err := common.ConfigurationProviderFromFileWithProfile(ociConfigFilePath, ociProfileName, "") helpers.FatalIfError(err) streamClient, err := streaming.NewStreamClientWithConfigurationProvider(provider, streamEndpoint) helpers.FatalIfError(err) // Create a request and dependent object(s). for i := 0; i < 5; i++ { putMsgReq := streaming.PutMessagesRequest{StreamId: common.String(streamOcid), PutMessagesDetails: streaming.PutMessagesDetails{ // we are batching 2 messages for each Put Request Messages: []streaming.PutMessagesDetailsEntry{ {Key: []byte("key dummy-0-" + strconv.Itoa(i)), Value: []byte("value dummy-" + strconv.Itoa(i))}, {Key: []byte("key dummy-1-" + strconv.Itoa(i)), Value: []byte("value dummy-" + strconv.Itoa(i))}}}, } // Send the request using the service client putMsgResp, err := streamClient.PutMessages(context.Background(), putMsgReq) helpers.FatalIfError(err) // Retrieve value from the response. fmt.Println(putMsgResp) } }
Producer.go
を保存します。-
ターミナルを開き、
wd
ディレクトリにcd
で移動して、次のコマンドを順番に実行します:-
このコマンドは、
go.mod
ファイルをwd
ディレクトリに作成します:go mod init oss_producer_example/v0
-
このコマンドは、OCI SDK for Goおよびストリーミング用のSDKをインストールします:
go mod tidy
-
このコマンドは、例を実行します:
go run Producer.go
-
- コンソールを使用して、ストリームに送信された最新のメッセージを表示し、生成が成功したことを確認します。
メッセージの消費
- 最初に、メッセージを消費するストリームにメッセージが含まれていることを確認します。コンソールを使用してテスト・メッセージを生成するか、このクイックスタートで作成したストリームおよびメッセージを使用できます。
-
Consumer.go
に、次のコードを追加します。次のコード・スニペットの変数ociConfigFilePath
、ociProfileName
、ociStreamOcid
およびociMessageEndpoint
の値は、テナンシに適用可能な値で置き換えてください。package main import ( "context" "fmt" "github.com/oracle/oci-go-sdk/v36/common" "github.com/oracle/oci-go-sdk/v36/example/helpers" "github.com/oracle/oci-go-sdk/v36/streaming" ) const ociMessageEndpoint = "<stream_message_endpoint>" const ociStreamOcid = "<stream_OCID>" const ociConfigFilePath = "<config_file_path>" const ociProfileName = "<config_file_profile_name>" func main() { fmt.Println("Go oci oss sdk example for consumer") getMsgWithGroupCursor(ociMessageEndpoint, ociStreamOcid) } func getMsgWithGroupCursor(streamEndpoint string, streamOcid string) { client, err := streaming.NewStreamClientWithConfigurationProvider(common.DefaultConfigProvider(), streamEndpoint) helpers.FatalIfError(err) grpCursorCreateReq0 := streaming.CreateGroupCursorRequest{ StreamId: common.String(streamOcid), CreateGroupCursorDetails: streaming.CreateGroupCursorDetails{Type: streaming.CreateGroupCursorDetailsTypeTrimHorizon, CommitOnGet: common.Bool(true), GroupName: common.String("Go-groupname-0"), InstanceName: common.String("Go-groupname-0-instancename-0"), TimeoutInMs: common.Int(1000), }} // Send the request using the service client grpCursorResp0, err := client.CreateGroupCursor(context.Background(), grpCursorCreateReq0) helpers.FatalIfError(err) // Retrieve value from the response. fmt.Println(grpCursorResp0) simpleGetMsgLoop(client, streamOcid, *grpCursorResp0.Value) } func simpleGetMsgLoop(streamClient streaming.StreamClient, streamOcid string, cursorValue string) { for i := 0; i < 5; i++ { getMsgReq := streaming.GetMessagesRequest{Limit: common.Int(3), StreamId: common.String(streamOcid), Cursor: common.String(cursorValue)} // Send the request using the service client getMsgResp, err := streamClient.GetMessages(context.Background(), getMsgReq) helpers.FatalIfError(err) // Retrieve value from the response. if len(getMsgResp.Items) > 0 { fmt.Println("Key : " + string(getMsgResp.Items[0].Key) + ", value : " + string(getMsgResp.Items[0].Value) + ", Partition " + *getMsgResp.Items[0].Partition) } if len(getMsgResp.Items) > 1 { fmt.Println("Key : " + string(getMsgResp.Items[1].Key) + ", value : " + string(getMsgResp.Items[1].Value) + ", Partition " + *getMsgResp.Items[1].Partition) } cursorValue = *getMsgResp.OpcNextCursor } }
Consumer.go
を保存します。-
ターミナルを開き、
wd
ディレクトリにcd
で移動して、次のコマンドを順番に実行します:-
このコマンドは、
go.mod
ファイルをwd
ディレクトリに作成します:go mod init oss_consumer_example/v0
-
このコマンドは、OCI SDK for Goおよびストリーミング用のSDKをインストールします:
go mod tidy
-
このコマンドは、例を実行します:
go run Consumer.go
-
-
次のようなメッセージが表示されます:
Go oci oss sdk example for consumer { RawResponse={200 OK 200 HTTP/1.1 1 1 map[Access-Control-Allow-Credentials:[true] ... } Key : , value : Example Test Message 0, Partition 0 Key : , value : Example Test Message 0, Partition 0 Key : , value : Example Test Message 0, Partition 0 Key : , value : Example Test Message 0, Partition 0 Key : , value : Example Test Message 0, Partition 0
次のステップ
詳細は、次のリソースを参照してください: