Using Kafka APIs
This topic describes how to configure Apache Kafka for API compatibility with Oracle Cloud Infrastructure Streaming. When your producers use Kafka APIs to interact with Streaming the decision of which partition to publish a unique message to is handled client-side by Kafka.
Please refer to Kafka API Support for additional information.
Endpoints
For bootstrap servers, use your region endpoint on port 9092. For example:
streaming.us-phoenix-1.oci.oraclecloud.com:9092
Authentication
Authentication with the Kafka protocol uses auth tokens and the SASL/PLAIN mechanism. You can generate tokens in the Console user details page. See Working with Auth Tokens for more information.
Create a dedicated group/user and grant that group the permission to manage streams in the appropriate compartment or tenancy. The policy in Let streaming admins manage streaming resources lets the specified group do everything with streaming and related Streaming service resources. You then can generate an auth token for the user you created and use it in your Kafka client configuration.
Your username must be in the following format:
tenancyName/domain/username/streamPoolId
Kafka Configuration
Set the following properties for your Kafka client.
For the Java SDKRecommended settings for Java SDK:
Properties properties = new Properties();
properties.put("bootstrap.servers", "streaming.{region}.oci.oraclecloud.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{tenancyName}/{username}/{streamPoolId}\" password=\"{authToken}\";");
Recommended settings for Java SDK producers:
properties.put("retries", 5); // retries on transient errors and load balancing disconnection
properties.put("max.request.size", 1024 * 1024); // limit request size to 1MB
Recommended settings for Java SDK consumers:
properties.put("max.partition.fetch.bytes", 1024 * 1024); // limit request size to 1MB per partition
Recommended settings for Librdkafka SDK:
'metadata.broker.list': 'streaming.{region}.oci.oraclecloud.com:9092',
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'PLAIN',
'sasl.username': '{tenancyName}/{username}/{streamPoolID}',
'sasl.password': '{authToken}'
Recommended settings for Librdkafka SDK producers:
'message.send.max.retries': 5 // retries on transient errors and load balancing disconnection
'max.request.size': 1024 * 1024 // limit request size to 1 MB
Recommended settings for Librdkafka SDK consumers:
'max.partition.fetch.bytes': 1024 * 1024 // limit request size to 1 MB per partition
Instance Principal Authorization for the Java SDK
If you are using the Java SDK, you can authorize an instance to interact with Streaming instead of using auth tokens.
To configure the Java SDK for instance principal authorization:
- Verify that you have a valid Oracle Cloud Infrastructure (OCI) SDK and CLI configuration file.
- Import the Oracle Cloud Infrastructure SDK for Java into your project. See Getting Started with the SDK for Java for more information.
- Add the following Oracle Cloud Infrastructure SDK for Java
dependency:
<dependency> <groupId>com.oracle.oci.sdk</groupId> <artifactId>oci-apisdk-java-sdk-addons-sasl</artifactId> <optional>false</optional> <version>1.13.1</version> <!-- that's the minimum version to use --> </dependency>
- Modify the
sasl.mechanism
property of your Kafka client configuration:properties.put("sasl.mechanism", OciMechanism.OCI_RSA_SHA256.mechanismName());
- Modify the
sasl.jaas.config
property of your Kafka client configuration using one of the following options:properties.put("sasl.jaas.config", "com.oracle.bmc.auth.sasl.InstancePrincipalsLoginModule required intent=\"streamPoolId:<streamPoolId>\";");
properties.put("sasl.jaas.config", "com.oracle.bmc.auth.sasl.UserPrincipalsLoginModule required config=\"<pathToConfig>\" profile=\"<profile>\" intent=\"streamPoolId:<streamPoolId>\";");
- If
config
is not specified, the default config path is used (~/.oci/config
). - If
profile
is not specified, the default profile is used (DEFAULT).
- If