SDK for TypeScript Streaming Quickstart
This quickstart shows you how to use the Oracle Cloud Infrastructure (OCI) SDK for TypeScript and JavaScript and Oracle Cloud Infrastructure Streaming to publish and consume messages.
Refer to the Overview of Streaming for key concepts and more Streaming details. For more information about using the OCI SDKs, see the SDK Guides.
Prerequisites
-
To use the SDK for TypeScript and JavaScript, you must have the following:
- An Oracle Cloud Infrastructure account.
- A user created in that account, in a group with a policy that grants the required permissions. This user can be yourself, or another person/system that needs to call the API. For an example of how to set up a new user, group, compartment, and policy, see Adding Users. For a list of typical policies you may want to use, see Common Policies.
- A key pair used for signing API requests, with the public key uploaded to Oracle. Only the user calling the API should possess the private key. For more information, see Getting Started.
- Collect the Messages endpoint and OCID of a stream. See Listing Streams and Stream Pools for instructions on viewing stream details. For the purposes of this quickstart, the stream should use a public endpoint and let Oracle manage encryption. Refer to Creating a Stream and Creating a Stream Pool if you do not have an existing stream.
- Node.js version 8.x or later. Download the latest long-term support (LTS) version.
-
Install the TypeScript interpreter for NodeJS globally:
npm install -g typescript
- Visual Code Studio (recommended) or any other integrated development environment (IDE).
-
Open a command prompt that has
npm
in its path, change to the directory where you want to keep your code for this quickstart (wd
, for example), and then run the following command to install the OCI SDK for TypeScript:npm install oci-sdk
Alternatively, you can be more efficient with dependencies by installing just the OCI TypeScript SDK packages for authentication and Streaming:
npm install oci-common npm install oci-streaming
- Ensure that you have a valid SDK configuration file. For production environments, you should use instance principal authorization.
Producing Messages
- Open your favorite editor, such as Visual Studio Code, from the directory
wd
. You should already haveoci-sdk
packages for TypeScript installed in this directory after you've met the prerequisites. -
Create a file named
Producer.ts
in thewd
directory with following code. Replace values of variablesociConfigFile
,ociProfileName
,ociStreamOcid
, andociMessageEndpointForStream
in the following code snippet with the values applicable for your tenancy.const common = require("oci-common"); const st = require("oci-streaming"); // OCI SDK package for OSS const ociConfigFile = "<config_file_path>"; const ociProfileName = "<config_file_profile_name>"; const ociMessageEndpointForStream = "<stream_message_endpoint>"; // example value "https://cell-1.streaming.region.oci.oraclecloud.com" const ociStreamOcid = "<stream_OCID>"; // provide authentication for OCI and OSS const provider = new common.ConfigFileAuthenticationDetailsProvider(ociConfigFile, ociProfileName); async function main() { // OSS client to produce and consume messages from a Stream in OSS const client = new st.StreamClient({ authenticationDetailsProvider: provider }); client.endpoint = ociMessageEndpointForStream; // build up a putRequest and publish some messages to the stream let messages = []; for (let i = 1; i <= 3; i++) { let entry = { key: Buffer.from("messageKey" + i).toString("base64"), value: Buffer.from("messageValue" + i).toString("base64") }; messages.push(entry); } console.log("Publishing %s messages to stream %s.", messages.length, ociStreamOcid); const putMessageDetails = { messages: messages }; const putMessagesRequest = { putMessagesDetails: putMessageDetails, streamId: ociStreamOcid }; const putMessageResponse = await client.putMessages(putMessagesRequest); for (var entry of putMessageResponse.putMessagesResult.entries) console.log("Published messages to parition %s, offset %s", entry.partition, entry.offset); } main().catch((err) => { console.log("Error occurred: ", err); });
-
From the terminal in the
wd
directory, run the following command to compileProducer.ts
and generateProducer.js
:tsc Producer.ts
-
From the same directory, run the following command:
node run Producer.js
You should see terminal output similar to the following:
$:/path/to/directory/wd>node Producer.js Publishing 3 messages to stream ocid1.stream.oc1.exampleuniqueID. Published messages to parition 0, offset 1314 Published messages to parition 0, offset 1315 Published messages to parition 0, offset 1316
- Use the Console to see the latest messages sent to the stream to verify that production was successful.
Consuming Messages
- First, ensure that the stream you want to consume messages from contains messages. You could use the Console to produce a test message, or use the stream and messages we created in this quickstart.
- Open your favorite editor, such as Visual Studio Code, from the directory
wd
. You should already have theoci-sdk
packages for TypeScript installed in this directory after you've met the prerequisites. -
Create a file named
Consumer.ts
in directorywd
with following code. Replace values of variablesociConfigFile
,ociProfileName
,ociStreamOcid
, andociMessageEndpointForStream
in the following code snippet with the values applicable for your tenancy.const common = require("oci-common"); const st = require("oci-streaming"); // OCI SDK package for OSS const ociConfigFile = "<config_file_path>"; const ociProfileName = "<config_file_profile_name>"; const ociMessageEndpointForStream = "<stream_message_endpoint>"; // example value "https://cell-1.streaming.region.oci.oraclecloud.com" const ociStreamOcid = "<stream_OCID>"; // provide authentication for OCI and OSS const provider = new common.ConfigFileAuthenticationDetailsProvider(ociConfigFile, ociProfileName); async function main() { // OSS client to produce and consume messages from a Stream in OSS const client = new st.StreamClient({ authenticationDetailsProvider: provider }); client.endpoint = ociMessageEndpointForStream; // Use a cursor for getting messages; each getMessages call will return a next-cursor for iteration. // There are a couple kinds of cursors, we will use group cursors // Committed offsets are managed for the group, and partitions // are dynamically balanced amongst consumers in the group. console.log("Starting a simple message loop with a group cursor"); const groupCursor = await getCursorByGroup(client, ociStreamOcid, "exampleGroup01000", "exampleInstance-1"); await simpleMessageLoop(client, ociStreamOcid, groupCursor); } async function getCursorByGroup(client, streamId, groupName, instanceName) { console.log("Creating a cursor for group %s, instance %s.", groupName, instanceName); const cursorDetails = { groupName: groupName, instanceName: instanceName, type: st.models.CreateGroupCursorDetails.Type.TrimHorizon, commitOnGet: true }; const createCursorRequest = { createGroupCursorDetails: cursorDetails, streamId: streamId }; const response = await client.createGroupCursor(createCursorRequest); return response.cursor.value; } async function simpleMessageLoop(client, streamId, initialCursor) { let cursor = initialCursor; for (var i = 0; i < 5; i++) { const getRequest = { streamId: streamId, cursor: cursor, limit: 100 }; const response = await client.getMessages(getRequest); console.log("Read %s messages.", response.items.length); for (var message of response.items) { if (message.key !== null) { console.log("Key: %s, Value: %s, Partition: %s", Buffer.from(message.key, "base64").toString(), Buffer.from(message.value, "base64").toString(), Buffer.from(message.partition, "utf8").toString()); } else{ console.log("Key: Null, Value: %s, Partition: %s", Buffer.from(message.value, "base64").toString(), Buffer.from(message.partition, "utf8").toString()); } } // getMessages is a throttled method; clients should retrieve sufficiently large message // batches, as to avoid too many http requests. await delay(2); cursor = response.opcNextCursor; } } async function delay(s) { return new Promise(resolve => setTimeout(resolve, s * 1000)); } main().catch((err) => { console.log("Error occurred: ", err); });
-
From the terminal in the
wd
directory, run the following command to compileConsumer.ts
and generateConsumer.js
:tsc Consumer.ts
-
From the
wd
directory, run the following command:node run Consumer.js
-
You should see messages similar to the following:
Starting a simple message loop with a group cursor Creating a cursor for group exampleGroup01000, instance exampleInstance-1. Read 6 messages. Key: messageKey1, Value: messageValue1, Partition: 0 Key: messageKey2, Value: messageValue2, Partition: 0 Key: messageKey3, Value: messageValue3, Partition: 0 Key: Null, Value: message value and key null, Partition: 0 Key: Null, Value: message value and key null, Partition: 0 Key: Null, Value: message value and key null, Partition: 0 Read 0 messages. Read 0 messages. Read 0 messages. Read 0 messages.
Next Steps
See the following resources for more information:
- OCI SDK for TypeScript on GitHub
- OCI SDK for TypeScript examples