このトピックでは、TCPクライアントSDK for Java 2.x. x.Finalのリリースノートについて説明します。使用状況、バージョン情報、環境要件、機能の変更について説明します。
使用上の注意
Java用TCPクライアントSDKをV2.x.x.Finalにアップグレードできるのは、中国 (杭州) 、中国 (青島) 、中国 (北京) 、中国 (張家口) 、中国 (フフホト) 、中国 (深セン) 、中国 (成都) 、中国 (香港) 、ドイツ (フランクフルト) 、インドネシア (ジャカルタ) のリージョンのみです。 他のリージョンでは、Java用TCPクライアントSDKをV2.x.x.Finalにアップグレードしないでください。 それ以外の場合、SDKを使用してApsaraMQ for RocketMQに接続することはできません。
TCPクライアントSDK for Java 2.xを使用できます。 x. 仮想プライベートクラウド (VPC) でのみApsaraMQ for RocketMQにアクセスする場合の最終。
既存のApsaraMQ for RocketMQインスタンスを使用し、クラシックネットワーク内のインスタンスにアクセスする場合、TCPクライアントSDK for Javaを2.x. x.Finalにアップグレードしないでください。 それ以外の場合、ApsaraMQ for RocketMQインスタンスにアクセスできません。
TCPクライアントSDK for Java 2.x. x.Finalを使用して、名前空間を含むインスタンスのみにアクセスできます。 使用するインスタンスに名前空間が含まれていない場合は、TCPクライアントSDK for Javaを2.x. x.Finalにアップグレードしないでください。
デフォルトでは、すべてのApsaraMQ for RocketMQ 5.xインスタンスに名前空間が含まれます。 ApsaraMQ for RocketMQ 4.xインスタンスを使用している場合、ApsaraMQ for RocketMQコンソールの インスタンスの詳細 ページの 基本情報 セクションで、インスタンスに名前空間が含まれているかどうかを確認できます。
バージョン情報
リリース日 | Version | ダウンロードリンク |
2023-02-23 | 2.0.5. ファイナル | |
2022-08-17 | 2.0.3。ファイナル | |
2022-06-16 | 2.0.2. ファイナル | |
2021-11-29 | 2.0.1. ファイナル | |
2021-10-18 | 2.0.0. ファイナル |
環境要件
SDK for Java V2.x.xを使用する場合は、Java 8以降をサポートするJava開発キット (JDK) を使用するようにしてください。
V2.0.5でのフィーチャーの変更
最適化された機能
非同期ログがサポートされています。
固定された問題
バッチ消費の待ち時間は指定できません。
特定のセキュリティ脆弱性が修正されました。
V2.0.3での機能の変更
修正された問題
スレッドプール内のスレッド数は、それ以降のJDKバージョンでは32を超える値に増やすことはできません。
V2.0.2での機能の変更
修正された問題
メッセージ送信中にデッドロックがトリガーされる場合があります。
V2.0.1での機能の変更
メッセージトレース
トレースクエリ結果で返されるデータが増えます。
V2.0.0での機能の変更
注文メッセージ
MaxReconsumeTimesパラメーターのデフォルト値は、Integer.MAXから16に変更されます。 このパラメーターは、順序付きメッセージの最大再試行回数を指定します。 再試行の最大数に達しても、コンシューマーがメッセージの消費に失敗した場合、メッセージはデットレターキューに配信されます。 MaxReconsumeTimesパラメーターの値を変更して、順序付きメッセージの最大再試行回数を変更できます。
トランザクションメッセージ
プロデューサーがメッセージを送信するとき、LocalTransactionExecutor
クラスがnullの場合、例外がスローされます。 以前のバージョンでは、このエラーに対して例外がスローされません。
放送消費量
ブロードキャスト消費モードでは、offsetStore
操作を呼び出して、消費を開始する消費者オフセットを指定できます。 消費者オフセットを指定しない場合、消費は最新の消費者オフセットから開始されます。 これは以前のバージョンと一致しています。
サンプルコード:
public class BroadcastingConsumerExample {
public static void main(String[] args) throws InterruptedException {
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "MyGroupId");
properties.put(PropertyKeyConst.AccessKey, "MyAccessKey");
properties.put(PropertyKeyConst.SecretKey, "MySecretKey");
// The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
// You can obtain the consumer offset by calling the offsetStore operations only in broadcasting consumption mode.
properties.put(PropertyKeyConst.MessageModel, MessageModel.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(properties);
// The frequency of calls for the AbstractOffsetStore method. The value 1 specifies that the system calls the AbstractOffsetStore method every second to set a persistent offset.
OffsetStore offsetStore = new AbstractOffsetStore(1) {
@Override
public Map<TopicPartition, Long> loadOffset() {
// The logic that is used to obtain the offset from an external storage system.
}
@Override
public void persistOffset(Map<TopicPartition, Long> offsetTable) {
// The logic that is used to persist the offset to an external storage system.
}
};
offsetStore.start();
consumer.setOffsetStore(offsetStore);
consumer.subscribe("testBroadcastingTopic", "TagA", new MessageListener() {
@Override
public Action consume(Message message, ConsumeContext context) {
// The logic that is used to consume messages.
return Action.CommitMessage;
}
});
consumer.start();
Thread.sleep(100000);
consumer.shutdown();
offsetStore.shutdown();
}
}
プッシュモードでの消費
通常のメッセージのバッチ消費はサポートされていません。
指定された消費スレッド数が1 ~ 1,000の有効範囲内にない場合、コンシューマーを作成するときにシステムは例外をスローします。
消費抑制機能が追加されました。 メッセージ消費率を制限するように消費抑制機能を設定できます。 これにより、コンシューマークライアントでのメッセージの突然の急増によって引き起こされるアプリケーションの例外を防ぐことができます。 次のコードは、カスタムメッセージ消費率を指定する方法の例を示しています。
説明消費抑制機能は、順序付けられたメッセージの再試行には適用されません。
public class RateLimitConsumerExample { public static void main(String[] args) throws InterruptedException { Properties properties = new Properties(); properties.put(PropertyKeyConst.GROUP_ID, "MyGroupId"); properties.put(PropertyKeyConst.AccessKey, "MyAccessKey"); properties.put(PropertyKeyConst.SecretKey, "MySecretKey"); // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX"); Consumer consumer = ONSFactory.createConsumer(properties); // Specify the message consumption rate in testTopicA. In this example, the consumer client can consume only 10 messages per second. consumer.rateLimit("testTopicA", 10); consumer.subscribe("testTopic", "TagA", new MessageListener() { @Override public Action consume(Message message, ConsumeContext context) { // The logic that is used to consume messages. return Action.CommitMessage; } }); consumer.start(); Thread.sleep(100000); consumer.shutdown(); } }
Pullモードでの消費
Pullモードでの消費はサポートされていません。
ログ設定
デフォルトのログパスは、~/logs/ons.logから ~/logs/ons/ons-client.logに変更されます。
ログレベルは、ログバックフレームワークでサポートされているログレベルと一致します。 OFF、TRACE、ALLのログレベルが追加されます。 以前のバージョンでは、ERROR、WARN、INFO、およびDEBUGログレベルのみがサポートされています。
環境変数は、ログ関連のパラメーターを追加するためにサポートされます。 以前のバージョンは
-D
のみをサポートしています。
クライアントの作成
無効なエンドポイントが指定されている場合、システムがプロデューサーまたはコンシューマーを作成しようとすると、例外がスローされます。
メッセージトレース
最新のSDKを使用してメッセージを送受信する場合、次のパラメーターがトレースクエリ結果に追加されます。
パラメーター | 説明 |
AccessKey | Alibaba CloudアカウントまたはRAM (Resource Access Management) ユーザーのAccessKey ID。 AccessKey IDは、ユーザーIDの検証に使用されます。 SDKを使用するか、API操作を呼び出してApsaraMQ for RocketMQリソースを取得する場合、認証にはAccessKey IDが必要です。 |
ReachServer | ApsaraMQ for RocketMQブローカーにメッセージが到着した時刻。 |
PresetDeliverAt | スケジュールされたメッセージが配信される予定時刻。 |
ActualAvailableAt | スケジュールされたメッセージが配信された時刻。 このパラメーターの値は、スケジュールされたメッセージが使用できるようになった時刻を示します。 |
利用可能時間 | メッセージが消費の準備ができた時間。 |
コミット /RollbackTime | トランザクションメッセージがコミットまたはロールバックされた時刻。 |
コンシューマーに到達 | メッセージがコンシューマークライアントに到着した時刻。 |
処理時間待ち | メッセージがコンシューマークライアントに到着した時間と、スレッドプールがメッセージのスレッドと処理リソースを割り当てた時間との間の待機時間。 |
V2.0.0の問題を修正
次の問題が修正されました。updateCredential操作が短期間に複数回呼び出されると、AccessKey ID、AccessKey secret、およびSTSトークンが更新されるとアトミック性が保証されないため、Security Token Service (STS) を使用したRAMロールのクロスアカウント認証は失敗します。
STS認証用のサンプルSDKコードの詳細については、「手順2: Alibaba Cloudアカウント間のリソースへのアクセス」をご参照ください。