Kafka .NET Client and Streaming Quickstart
This quickstart shows you how to use the Kafka .NET client with Oracle Cloud Infrastructure Streaming to publish and consume messages. The examples are written in C#.
See Using Streaming with Apache Kafka for more information. Refer to the Overview of Streaming for key concepts and more Streaming details.
Prerequisites
In this quickstart, we create and run a simple .NET console application by using Visual Studio Code and the .NET CLI. Project tasks, such as creating, compiling, and running a project, are done by using the .NET CLI. If you prefer, you can follow this tutorial with a different IDE and run commands in a terminal.
- To use the Kafka .NET client with Streaming, you must have the following:
  - An Oracle Cloud Infrastructure account.
  - A user created in that account, in a group with a policy that grants the required permissions. For an example of how to set up a new user, group, compartment, and policy, see Adding Users. For a list of typical policies you may want to use, see Common Policies.
- Collect the following details:
  - Stream OCID
  - Messages endpoint
  - Stream pool OCID
  - Stream pool FQDN
  - Kafka connection settings:
    - Bootstrap servers
    - SASL connection strings
    - Security protocol
  See Listing Streams and Stream Pools for instructions on viewing stream details. Refer to Creating a Stream and Creating a Stream Pool if you do not have an existing stream. Each stream corresponds to a Kafka topic.
- Install the .NET 5.0 SDK or later. Ensure that dotnet is set in your PATH environment variable.
- Visual Studio Code (recommended) with the C# extension installed. For information about how to install extensions on Visual Studio Code, see VS Code Extension Marketplace.
- Authentication with the Kafka protocol uses auth tokens and the SASL/PLAIN mechanism. Refer to Working with Auth Tokens for auth token generation. If you created the stream and stream pool in OCI, you are already authorized to use this stream according to OCI IAM, so you should create auth tokens for your OCI user.
  Note
  An OCI user auth token is visible only at the time of creation. Copy it and keep it somewhere safe for future use.
- Install the SSL CA root certificates on the host where you are developing and running this quickstart. The client uses CA certificates to verify the broker's certificate.
  For Windows, download the cacert.pem file distributed with curl (download cacert.pem). For other platforms, refer to Configure SSL trust store.
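As a rough sketch of how the collected details fit together, the following helper composes the SASL username in the tenancy/user/stream pool OCID form used later in this quickstart, and reads a required value from an environment variable so that the auth token need not be hardcoded. The class names, the sample values, and the environment variable name OCI_AUTH_TOKEN are hypothetical and chosen only for illustration; they are not defined by OCI or by the Kafka client.

```csharp
using System;

public static class KafkaSettings
{
    // Composes "<tenancy>/<user>/<stream pool OCID>", the username form used in this quickstart.
    public static string BuildSaslUsername(string tenancy, string user, string streamPoolOcid)
    {
        if (string.IsNullOrWhiteSpace(tenancy))
            throw new ArgumentException("Tenancy name is required.", nameof(tenancy));
        if (string.IsNullOrWhiteSpace(user))
            throw new ArgumentException("User name is required.", nameof(user));
        if (string.IsNullOrWhiteSpace(streamPoolOcid))
            throw new ArgumentException("Stream pool OCID is required.", nameof(streamPoolOcid));
        return $"{tenancy}/{user}/{streamPoolOcid}";
    }

    // Reads a required environment variable, or fails with a clear message.
    public static string RequireEnv(string name)
    {
        var value = Environment.GetEnvironmentVariable(name);
        if (string.IsNullOrEmpty(value))
            throw new InvalidOperationException($"Environment variable {name} is not set.");
        return value;
    }
}

public static class SettingsDemo
{
    public static void Main()
    {
        // Placeholder values (assumptions); substitute the details you collected.
        var username = KafkaSettings.BuildSaslUsername(
            "mytenancy", "alice@example.com", "ocid1.streampool.oc1..example");
        Console.WriteLine($"SASL username: {username}");

        // OCI_AUTH_TOKEN is a hypothetical variable name used only in this sketch.
        var tokenIsSet = !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("OCI_AUTH_TOKEN"));
        Console.WriteLine($"Auth token present in environment: {tokenIsSet}");
    }
}
```

In the producer and consumer programs that follow, the result of BuildSaslUsername could be assigned to SaslUsername and the result of RequireEnv to SaslPassword, in place of the literal placeholders.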
Producing Messages
- Open your favorite editor, such as Visual Studio Code, from the empty working directory wd.
- Open the terminal and cd into the wd directory.
- Create a C# .NET console application by running the following command in the terminal:
  dotnet new console
  You should see a message indicating that the application was created:
  The template "Console Application" was created successfully.
  This creates a Program.cs file with C# code for a simple "HelloWorld" application.
- To reference the confluent-kafka-dotnet library in your new .NET Core project, run the following command in your project's directory wd:
  dotnet add package Confluent.Kafka
- Replace the code in Program.cs in the wd directory with the following code. Replace the values of the variables in the ProducerConfig map and the name of topic with the details you gathered in the prerequisites:

    using System;
    using Confluent.Kafka;

    namespace OssProducerWithKafkaApi
    {
        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("Demo for using Kafka APIs seamlessly with OSS");

                var config = new ProducerConfig
                {
                    // Usually of the form cell-1.streaming.[region code].oci.oraclecloud.com:9092
                    BootstrapServers = "<bootstrap_servers_endpoint>",
                    // Verbatim string (@) so that backslashes in a Windows path are not read as escape sequences
                    SslCaLocation = @"<path\to\root\ca\certificate\*.pem>",
                    SecurityProtocol = SecurityProtocol.SaslSsl,
                    SaslMechanism = SaslMechanism.Plain,
                    SaslUsername = "<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>",
                    // Use the auth token you created in the Prerequisites section
                    SaslPassword = "<your_OCI_user_auth_token>",
                };

                Produce("<topic_stream_name>", config); // use the name of the stream you created
            }

            static void Produce(string topic, ClientConfig config)
            {
                using (var producer = new ProducerBuilder<string, string>(config).Build())
                {
                    int numProduced = 0;
                    int numMessages = 10;
                    for (int i = 0; i < numMessages; ++i)
                    {
                        var key = "messageKey" + i;
                        var val = "messageVal" + i;
                        Console.WriteLine($"Producing record: {key} {val}");

                        // The callback runs once the delivery of each message succeeds or fails.
                        producer.Produce(topic, new Message<string, string> { Key = key, Value = val },
                            (deliveryReport) =>
                            {
                                if (deliveryReport.Error.Code != ErrorCode.NoError)
                                {
                                    Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                                }
                                else
                                {
                                    Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}");
                                    numProduced += 1;
                                }
                            });
                    }

                    // Wait up to 10 seconds for queued messages to be delivered.
                    producer.Flush(TimeSpan.FromSeconds(10));
                    Console.WriteLine($"{numProduced} messages were produced to topic {topic}");
                }
            }
        }
    }
- From the wd directory, run the following command:
  dotnet run
- Use the Console to see the latest messages sent to the stream to verify that production was successful.
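The producer above reports delivery through a callback. As an alternative sketch, not part of the original quickstart, the same messages can be sent with the client's ProduceAsync method, which lets the code await each acknowledgement and catch a ProduceException on failure. The class name AsyncProducerSketch and the helpers BuildMessage and TrySendAsync are hypothetical names introduced for illustration; the configuration placeholders are the same ones used above and must be replaced before running.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class AsyncProducerSketch
{
    // Builds the same key/value pair that the quickstart producer sends for index i.
    public static Message<string, string> BuildMessage(int i) =>
        new Message<string, string> { Key = "messageKey" + i, Value = "messageVal" + i };

    // Sends one message and waits for the delivery result; returns false if delivery failed.
    public static async Task<bool> TrySendAsync(IProducer<string, string> producer, string topic, int i)
    {
        try
        {
            DeliveryResult<string, string> result = await producer.ProduceAsync(topic, BuildMessage(i));
            Console.WriteLine($"Produced message to: {result.TopicPartitionOffset}");
            return true;
        }
        catch (ProduceException<string, string> e)
        {
            Console.WriteLine($"Failed to deliver message: {e.Error.Reason}");
            return false;
        }
    }

    public static async Task Main()
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "<bootstrap_servers_endpoint>",
            SslCaLocation = @"<path\to\root\ca\certificate\*.pem>",
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslMechanism = SaslMechanism.Plain,
            SaslUsername = "<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>",
            SaslPassword = "<your_OCI_user_auth_token>",
        };

        using (var producer = new ProducerBuilder<string, string>(config).Build())
        {
            int delivered = 0;
            for (int i = 0; i < 10; ++i)
            {
                if (await TrySendAsync(producer, "<topic_stream_name>", i))
                {
                    delivered += 1;
                }
            }
            Console.WriteLine($"{delivered} messages were acknowledged");
        }
    }
}
```

Awaiting every message keeps the example easy to follow but sends one message at a time; the callback form shown in the quickstart is likely the better fit when throughput matters.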
Consuming Messages
- First, ensure that the stream you want to consume messages from contains messages. You could use the Console to produce a test message, or use the stream and messages we created in this quickstart.
- Open your favorite editor, such as Visual Studio Code, from the empty working directory wd.
- Create a C# .NET console application by running the following command in the terminal:
  dotnet new console
  You should see a message indicating that the application was created:
  The template "Console Application" was created successfully.
  This creates a Program.cs file with C# code for a simple "HelloWorld" application.
- To reference the confluent-kafka-dotnet library in your new .NET Core project, run the following command in your project's directory wd:
  dotnet add package Confluent.Kafka
- Replace the code in Program.cs in the wd directory with the following code. Replace the values of the variables in the ConsumerConfig map and the name of topic with the details you gathered in the prerequisites:

    using System;
    using System.Threading;
    using Confluent.Kafka;

    namespace OssKafkaConsumerDotnet
    {
        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("Demo for using Kafka APIs seamlessly with OSS");

                var config = new ConsumerConfig
                {
                    // Usually of the form cell-1.streaming.[region code].oci.oraclecloud.com:9092
                    BootstrapServers = "<bootstrap_servers_endpoint>",
                    // Verbatim string (@) so that backslashes in a Windows path are not read as escape sequences
                    SslCaLocation = @"<path\to\root\ca\certificate\*.pem>",
                    SecurityProtocol = SecurityProtocol.SaslSsl,
                    SaslMechanism = SaslMechanism.Plain,
                    SaslUsername = "<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>",
                    // Use the auth token you created in the Prerequisites section
                    SaslPassword = "<your_OCI_user_auth_token>",
                };

                Consume("<topic_stream_name>", config); // use the name of the stream you created
            }

            static void Consume(string topic, ClientConfig config)
            {
                var consumerConfig = new ConsumerConfig(config);
                consumerConfig.GroupId = "dotnet-oss-consumer-group";
                consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
                consumerConfig.EnableAutoCommit = true;

                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) =>
                {
                    e.Cancel = true; // prevent the process from terminating.
                    cts.Cancel();
                };

                using (var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build())
                {
                    consumer.Subscribe(topic);
                    try
                    {
                        while (true)
                        {
                            var cr = consumer.Consume(cts.Token);
                            string key = cr.Message.Key == null ? "Null" : cr.Message.Key;
                            Console.WriteLine($"Consumed record with key {key} and value {cr.Message.Value}");
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        // Expected when Ctrl-C is pressed and the token is cancelled.
                    }
                    finally
                    {
                        // Ensure the consumer leaves the group cleanly and final offsets are committed.
                        consumer.Close();
                    }
                }
            }
        }
    }
- From the wd directory, run the following command:
  dotnet run
- You should see messages similar to the following:
  Demo for using Kafka APIs seamlessly with OSS
  Consumed record with key messageKey0 and value messageVal0
  Consumed record with key messageKey1 and value messageVal1
  Consumed record with key Null and value Example test message
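The consume loop above stops on the first error other than cancellation. As a hedged sketch of one way to make it more tolerant, the variant below catches the client's ConsumeException inside the loop, logs it, and keeps polling unless the error is marked fatal. The class name ResilientConsumerSketch and the helpers FormatRecord and ConsumeLoop are hypothetical names introduced for illustration, and whether to continue after a given error is a design choice that depends on your application.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

public static class ResilientConsumerSketch
{
    // Formats a record the same way as the quickstart, printing "Null" for a missing key.
    public static string FormatRecord(string key, string value)
    {
        string shownKey = key ?? "Null";
        return $"Consumed record with key {shownKey} and value {value}";
    }

    // Polls until the token is cancelled; non-fatal consume errors are logged and skipped.
    public static void ConsumeLoop(IConsumer<string, string> consumer, CancellationToken token)
    {
        try
        {
            while (true)
            {
                try
                {
                    var cr = consumer.Consume(token);
                    Console.WriteLine(FormatRecord(cr.Message.Key, cr.Message.Value));
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Consume error: {e.Error.Reason}");
                    if (e.Error.IsFatal)
                    {
                        throw; // the client cannot recover from a fatal error
                    }
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the token is cancelled (for example, by Ctrl-C).
        }
        finally
        {
            // Leave the group cleanly and commit final offsets.
            consumer.Close();
        }
    }

    public static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "<bootstrap_servers_endpoint>",
            SslCaLocation = @"<path\to\root\ca\certificate\*.pem>",
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslMechanism = SaslMechanism.Plain,
            SaslUsername = "<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>",
            SaslPassword = "<your_OCI_user_auth_token>",
            GroupId = "dotnet-oss-consumer-group",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = true,
        };

        var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

        using (var consumer = new ConsumerBuilder<string, string>(config).Build())
        {
            consumer.Subscribe("<topic_stream_name>");
            ConsumeLoop(consumer, cts.Token);
        }
    }
}
```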
Next Steps
See the following resources for more information: