個々のコンシューマの使用
個々のコンシューマを使用して、ストリームからのメッセージを消費します。
コンシューマ・グループを使用するかわりに、個々のコンシューマを使用してストリームからのメッセージを消費する場合、Streamingの多くの利点(サービス管理の調整、水平スケーリング、オフセット管理など)を利用できません。アプリケーションでは、これらのシナリオに加え、多くのことをプログラム的に処理する必要があります。
これらの理由から、本番環境ではコンシューマ・グループを使用することをお薦めしますが、テストまたは概念実証アプリケーションのために個々のコンシューマを使用すると役立つかもしれません。
カーソルの使用
カーソルとは、ストリーム内のロケーションを指すポインタです。ロケーションは、パーティション内の特定のオフセットまたは時間です。
メッセージの消費を開始する前に、消費を開始するポイントを指定する必要があります。これには、カーソルを作成します。
サポートされているカーソル・タイプは5つあります:
TRIM_HORIZON
- ストリーム内で使用可能な最も古いメッセージから消費を開始します。ストリーム内のすべてのメッセージを消費する場合に、カーソルをTRIM_HORIZON
で作成します。AT_OFFSET
- 指定されたオフセットで消費を開始します。オフセットは、最も古いメッセージのオフセット以上かつ最新の公開済オフセット以下である必要があります。AFTER_OFFSET
- 指定されたオフセット後に使用を開始します。このカーソルには、AT_OFFSET
カーソルと同じ制限があります。AT_TIME
- 指定された時間から消費を開始します。戻されるメッセージのタイムスタンプは、指定された時間以降になります。LATEST
- カーソルの作成後に公開されたメッセージの消費を開始します。
個々のコンシューマに対してカーソルを作成する場合、カーソルが使用するストリーム内のパーティションを指定する必要があります。ストリームにメッセージを含むパーティションが複数ある場合は、複数のカーソルを作成してそれらを読み取る必要があります。
カーソルを作成した後、メッセージを取得して消費できます。
メッセージを消費し続けるかぎり、カーソルを再作成する必要はありません。ループの外部にカーソルを作成してメッセージを取得します。
メッセージの取得
カーソルの作成後、メッセージを取得(読取り)し、メッセージの消費を開始するカーソルを指定します。サービスは、メッセージおよびopc-next-cursor
ヘッダー値で応答します。戻されたヘッダー値は、次のGetMessagesコールで使用します。返されるカーソルはnullではありませんが、5分後に失効します。5分を超えてメッセージの消費を停止する場合は、カーソルを再作成する必要があります。
複数のコンシューマが同じパーティションから読み取っている場合、それらは同じメッセージを受信します。アプリケーションでこれらのメッセージをどのように処理するかを決定します。
パーティションにそれ以上未読メッセージが存在しない場合、ストリーミングはメッセージの空のリストを返します。
GetMessagesのバッチ・サイズは、ストリームに公開された平均メッセージ・サイズに基づきます。デフォルトでは、サービスはできるだけ多くのメッセージを返します。limit
パラメータを使用して最大10,000までの値を指定できますが、ストリームのスループットを超えないように、平均メッセージ・サイズを考慮してください。
遅延
コンシューマが遅延している(消費を上回る速度で生成している)かどうかを判断するには、メッセージのタイムスタンプを使用します。コンシューマが遅延している場合は、最初のコンシューマから一部のパーティションを取得するために、より多くのコンシューマを生成することを検討してください。1つのパーティションで遅延している場合、リカバリする方法はありません。
次のオプションを検討してください:
- より多くのパーティションを持つ新しいストリームを作成します。
- コンシューマ・グループを使用します。
- 問題の原因がホットスポットである場合は、メッセージ・キー戦略を変更します。
- メッセージ処理時間を短縮するか、リクエストを並列処理します。
特定のパーティションで消費するために残っているメッセージの数を確認するには、LATEST
タイプのカーソルを使用して、次に公開されるメッセージのオフセットを取得し、現在消費しているオフセットでデルタを作成します。
オフセットの管理
オフセットは、パーティション内のメッセージの場所を示します。コンシューマが再開する場合や、障害からリカバリする必要がある場合は、オフセットを使用してストリームからの読取りを再開できます。
個々のコンシューマを使用する場合は、コンシューマ・アプリケーションで処理されたオフセットを管理する必要があります(「オフセットの手動コミット」を参照)。コンシューマは、パーティションごとに到達または停止したオフセットを格納する責任があります。コンシューマが再開したら、処理した最後のメッセージのオフセットを読み取り、AFTER_OFFSET
タイプのカーソルを作成して、その取得したオフセットを指定します。処理した最後のメッセージのオフセットを格納するためのガイダンスは提供していません。別のストリーム、マシン上のファイル、オブジェクト・ストレージなどの任意の方法を使用できます。
メッセージのオフセットは密ではありません。オフセットは単調に増加する数値です。それらは減少せず、場合によっては2つ以上増加します。たとえば、同じパーティションに2つのメッセージを公開した場合、最初のメッセージのオフセットは42で、2番目のメッセージのオフセットは45になることがあります(オフセット43と44は存在しません)。