このクイックスタートでは、Visual Studio Codeおよび.NET CLIを使用して、単純な.NETコンソール・アプリケーションを作成して実行します。プロジェクトの作成、コンパイル、実行などのプロジェクト・タスクは、.NET CLIを使用して行います。必要に応じて、別のIDEでこのチュートリアルに従い、ターミナルでコマンドを実行できます。
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Oci.Common.Auth;
using Oci.Common.Waiters;
using Oci.StreamingService;
using Oci.StreamingService.Models;
using Oci.StreamingService.Requests;
using Oci.StreamingService.Responses;
namespace OssConsumer
{
class Program
{
public static async Task Main(string[] args)
{
Console.WriteLine("Starting example for OSS Consumer");
string configurationFilePath = "<config_file_path>";
string profile = "<config_file_profile_name>";
string ociStreamOcid = "<stream_OCID>";
string ociMessageEndpoint = "<stream_message_endpoint>";
try
{
var provider = new ConfigFileAuthenticationDetailsProvider(configurationFilePath, profile);
StreamClient streamClient = new StreamClient(provider);
streamClient.SetEndpoint(ociMessageEndpoint);
// A cursor can be created as part of a consumer group.
// Committed offsets are managed for the group, and partitions
// are dynamically balanced amongst consumers in the group.
Console.WriteLine("Starting a simple message loop with a group cursor");
string groupCursor = await GetCursorByGroup(streamClient, ociStreamOcid, "exampleGroup", "exampleInstance-1");
await SimpleMessageLoop(streamClient, ociStreamOcid, groupCursor);
}
catch (Exception e)
{
Console.WriteLine($"Streaming example failed: {e}");
}
}
private static async Task<string> GetCursorByGroup(StreamClient streamClient, string streamId, string groupName, string instanceName)
{
Console.WriteLine($"Creating a cursor for group {groupName}, instance {instanceName}");
CreateGroupCursorDetails createGroupCursorDetails = new CreateGroupCursorDetails
{
GroupName = groupName,
InstanceName = instanceName,
Type = CreateGroupCursorDetails.TypeEnum.TrimHorizon,
CommitOnGet = true
};
CreateGroupCursorRequest createCursorRequest = new CreateGroupCursorRequest
{
StreamId = streamId,
CreateGroupCursorDetails = createGroupCursorDetails
};
CreateGroupCursorResponse groupCursorResponse = await streamClient.CreateGroupCursor(createCursorRequest);
return groupCursorResponse.Cursor.Value;
}
private static async Task SimpleMessageLoop(StreamClient streamClient, string streamId, string initialCursor)
{
string cursor = initialCursor;
for (int i = 0; i < 10; i++)
{
GetMessagesRequest getMessagesRequest = new GetMessagesRequest
{
StreamId = streamId,
Cursor = cursor,
Limit = 10
};
GetMessagesResponse getResponse = await streamClient.GetMessages(getMessagesRequest);
// process the messages
Console.WriteLine($"Read {getResponse.Items.Count}");
foreach (Message message in getResponse.Items)
{
string key = message.Key != null ? Encoding.UTF8.GetString(message.Key) : "Null";
Console.WriteLine($"{key} : {Encoding.UTF8.GetString(message.Value)}");
}
// getMessages is a throttled method; clients should retrieve sufficiently large message
// batches, as to avoid too many http requests.
await Task.Delay(1000);
// use the next-cursor for iteration
cursor = getResponse.OpcNextCursor;
}
}
}
}
wdディレクトリから、次のコマンドを実行します:
コピー
dotnet run
次のようなメッセージが表示されます:
Starting example for OSS Consumer
Starting a simple message loop with a group cursor
Creating a cursor for group exampleGroup, instance exampleInstance-1
Read 10
messagekey-0 : messageValue-0
messagekey-1 : messageValue-1
messagekey-2 : messageValue-2
messagekey-3 : messageValue-3
messagekey-4 : messageValue-4
messagekey-5 : messageValue-5
messagekey-6 : messageValue-6
messagekey-7 : messageValue-7
messagekey-8 : messageValue-8
messagekey-9 : messageValue-9
Read 10