SDK for Go Streaming Quickstart
This quickstart shows you how to use the Oracle Cloud Infrastructure (OCI) SDK for Go and Oracle Cloud Infrastructure Streaming to publish and consume messages.
Refer to the Overview of Streaming for key concepts and more Streaming details. For more information about using the OCI SDKs, see the SDK Guides.
Prerequisites
-
To use the SDK for Go, you must have the following:
- An Oracle Cloud Infrastructure account.
- A user created in that account, in a group with a policy that grants the required permissions. This user can be yourself, or another person/system that needs to call the API. For an example of how to set up a new user, group, compartment, and policy, see Adding Users. For a list of typical policies you may want to use, see Common Policies.
- A key pair used for signing API requests, with the public key uploaded to Oracle. Only the user calling the API should possess the private key. For more information, see SDK configuration file.
- Collect the Messages endpoint and OCID of a stream. See Listing Streams and Stream Pools for instructions on viewing stream details. For the purposes of this quickstart, the stream should use a public endpoint and let Oracle manage encryption. Refer to Creating a Stream and Creating a Stream Pool if you do not have an existing stream.
- Go installed locally. Follow these instructions if necessary. Ensure that
go
is in yourPATH
. -
Visual Studio Code (recommended) or any other integrated development environment (IDE) or text editor.
- Ensure that you have a valid SDK configuration file. For production environments, you should use instance principal authorization.
Producing Messages
- Open your favorite editor, such as Visual Studio Code, from the empty working
directory
wd
. - Create a file named
Producer.go
in this directory. -
Add the following code to
Producer.go
. Replace values of variablesociConfigFilePath
,ociProfileName
,ociStreamOcid
, andociMessageEndpoint
in the following code snippet with the values applicable for your tenancy.package main import ( "context" "fmt" "strconv" "github.com/oracle/oci-go-sdk/v36/common" "github.com/oracle/oci-go-sdk/v36/example/helpers" "github.com/oracle/oci-go-sdk/v36/streaming" ) const ociMessageEndpoint = "<stream_message_endpoint>" const ociStreamOcid = "<stream_OCID>" const ociConfigFilePath = "<config_file_path>" const ociProfileName = "<config_file_profile_name>" func main() { fmt.Println("Go oci oss sdk example producer") putMsgInStream(ociMessageEndpoint, ociStreamOcid) } func putMsgInStream(streamEndpoint string, streamOcid string) { fmt.Println("Stream endpoint for put msg api is: " + streamEndpoint) provider, err := common.ConfigurationProviderFromFileWithProfile(ociConfigFilePath, ociProfileName, "") helpers.FatalIfError(err) streamClient, err := streaming.NewStreamClientWithConfigurationProvider(provider, streamEndpoint) helpers.FatalIfError(err) // Create a request and dependent object(s). for i := 0; i < 5; i++ { putMsgReq := streaming.PutMessagesRequest{StreamId: common.String(streamOcid), PutMessagesDetails: streaming.PutMessagesDetails{ // we are batching 2 messages for each Put Request Messages: []streaming.PutMessagesDetailsEntry{ {Key: []byte("key dummy-0-" + strconv.Itoa(i)), Value: []byte("value dummy-" + strconv.Itoa(i))}, {Key: []byte("key dummy-1-" + strconv.Itoa(i)), Value: []byte("value dummy-" + strconv.Itoa(i))}}}, } // Send the request using the service client putMsgResp, err := streamClient.PutMessages(context.Background(), putMsgReq) helpers.FatalIfError(err) // Retrieve value from the response. fmt.Println(putMsgResp) } }
- Save
Producer.go
. -
Open the terminal and
cd
to thewd
directory, run the following commands, in order:-
This command creates the
go.mod
file in thewd
directory:go mod init oss_producer_example/v0
-
This command installs the OCI SDK for Go and for Streaming:
go mod tidy
-
This command runs the example:
go run Producer.go
-
- Use the Console to see the latest messages sent to the stream to verify that production was successful.
Consuming Messages
- First, ensure that the stream you want to consume messages from contains messages. You could use the Console to produce a test message, or use the stream and messages we created in this quickstart.
-
Add the following code to
Consumer.go
. Replace values of variablesociConfigFilePath
,ociProfileName
,ociStreamOcid
, andociMessageEndpoint
in the following code snippet with the values applicable for your tenancy.package main import ( "context" "fmt" "github.com/oracle/oci-go-sdk/v36/common" "github.com/oracle/oci-go-sdk/v36/example/helpers" "github.com/oracle/oci-go-sdk/v36/streaming" ) const ociMessageEndpoint = "<stream_message_endpoint>" const ociStreamOcid = "<stream_OCID>" const ociConfigFilePath = "<config_file_path>" const ociProfileName = "<config_file_profile_name>" func main() { fmt.Println("Go oci oss sdk example for consumer") getMsgWithGroupCursor(ociMessageEndpoint, ociStreamOcid) } func getMsgWithGroupCursor(streamEndpoint string, streamOcid string) { client, err := streaming.NewStreamClientWithConfigurationProvider(common.DefaultConfigProvider(), streamEndpoint) helpers.FatalIfError(err) grpCursorCreateReq0 := streaming.CreateGroupCursorRequest{ StreamId: common.String(streamOcid), CreateGroupCursorDetails: streaming.CreateGroupCursorDetails{Type: streaming.CreateGroupCursorDetailsTypeTrimHorizon, CommitOnGet: common.Bool(true), GroupName: common.String("Go-groupname-0"), InstanceName: common.String("Go-groupname-0-instancename-0"), TimeoutInMs: common.Int(1000), }} // Send the request using the service client grpCursorResp0, err := client.CreateGroupCursor(context.Background(), grpCursorCreateReq0) helpers.FatalIfError(err) // Retrieve value from the response. fmt.Println(grpCursorResp0) simpleGetMsgLoop(client, streamOcid, *grpCursorResp0.Value) } func simpleGetMsgLoop(streamClient streaming.StreamClient, streamOcid string, cursorValue string) { for i := 0; i < 5; i++ { getMsgReq := streaming.GetMessagesRequest{Limit: common.Int(3), StreamId: common.String(streamOcid), Cursor: common.String(cursorValue)} // Send the request using the service client getMsgResp, err := streamClient.GetMessages(context.Background(), getMsgReq) helpers.FatalIfError(err) // Retrieve value from the response. if len(getMsgResp.Items) > 0 { fmt.Println("Key : " + string(getMsgResp.Items[0].Key) + ", value : " + string(getMsgResp.Items[0].Value) + ", Partition " + *getMsgResp.Items[0].Partition) } if len(getMsgResp.Items) > 1 { fmt.Println("Key : " + string(getMsgResp.Items[1].Key) + ", value : " + string(getMsgResp.Items[1].Value) + ", Partition " + *getMsgResp.Items[1].Partition) } cursorValue = *getMsgResp.OpcNextCursor } }
- Save
Consumer.go
. -
Open the terminal and
cd
to thewd
directory, run the following commands, in order:-
This command creates the
go.mod
file in thewd
directory:go mod init oss_consumer_example/v0
-
This command installs the OCI SDK for Go and for Streaming:
go mod tidy
-
This command runs the example:
go run Consumer.go
-
-
You should see messages similar to the following:
Go oci oss sdk example for consumer { RawResponse={200 OK 200 HTTP/1.1 1 1 map[Access-Control-Allow-Credentials:[true] ... } Key : , value : Example Test Message 0, Partition 0 Key : , value : Example Test Message 0, Partition 0 Key : , value : Example Test Message 0, Partition 0 Key : , value : Example Test Message 0, Partition 0 Key : , value : Example Test Message 0, Partition 0
Next Steps
See the following resources for more information: