このトピックでは、TCPクライアントSDK for C ++ V3.x.xのリリースノートについて説明します。 リリースノートには、使用状況、バージョン情報、環境要件、コンパイル手順、機能の変更が含まれます。
使用上の注意
TCPクライアントSDK for C ++ V3.x.xを使用して、名前空間を含むインスタンスのみにアクセスできます。 使用するインスタンスに名前空間が含まれていない場合は、TCPクライアントSDK for C ++ をV3.x.xにアップグレードしないでください。
デフォルトでは、すべてのApsaraMQ forRocketMQ 5.xインスタンスに名前空間が含まれます。 ApsaraMQ for RocketMQ 4.xインスタンスを使用している場合、ApsaraMQ for RocketMQコンソールの インスタンスの詳細 ページの 基本情報 セクションで、インスタンスに名前空間が含まれているかどうかを確認できます。
バージョン情報
リリース日 | Version | ダウンロードリンク |
2021-10-18 | v3.x.x |
環境要件
ONS-Client-CPPは、Apache RocketMQ 5.0にネイティブなオープンソースのクライアントSDKです。 Apache RocketMQ 5.0は、HTTP 2.0とProtobufに基づいて開発されたgRPCフレームワークを採用しています。 したがって、gRPCはSDK for C ++ V3.0.0にも必要です。 次の表に、依存関係とツールチェーンが満たす必要のあるバージョン要件を示します。
依存関係
依存関係 | Version |
grpc/grpc | 1.39.0 |
fmt | 8.0.1 |
spdlog | 1.9.2 |
ファイルシステム | 1.5.0 |
asio | 1.18.2 |
cpp_httplib | 0.9.4 |
protobuf | 3.17.2 |
ツールチェーン
オペレーティングシステム | Toolchainのバージョン |
LinuxまたはmacOS | GNUコンパイラコレクション (GCC) 4.9以降およびClang 3.4以降 |
Windows 7以降 | Visual Studio 2015以降 |
C ++ スタンダード
SDKはC ++ 11標準ライブラリを使用します。 C ++ 11以降が必要です。
コンパイル手順
オープンソースコードのコンパイル手順
Bazelをインストールします。 詳細については、「Bazelのインストール」をご参照ください。
説明Bazel 4.xにはPython 3.x. xが必要です。
オープンソースコードをダウンロードして解凍します。 次のいずれかの方法を使用して、コードをダウンロードできます。
git clone https://github.com/aliyun-mq/ons-client-cpp.git
コマンドを実行して、ソースコードを複製します。バージョン情報のリンクからオンプレミスマシンにコードをダウンロードします。
プロジェクトフォルダーで次のコマンドを実行します。 Bazelは、すべてのサードパーティの依存関係を自動的にダウンロードします。
bazel -c opt // dist/...
次のコードは、サンプル出力を提供します。
INFO: From Action dist/libons_library.pic.a: starting to run shell INFO: Elapsed time: 39.480s, Critical Path: 38.89s INFO: 2044 processes: 1796 remote cache hit, 241 internal, 7 processwrapper-sandbox. INFO: Build completed successfully, 2044 total actions
が2044
コードがコンパイルされると、マージされたすべてのオブジェクトを含む静的ライブラリがbazel-bin/dist/ons-dist.tar.gzファイルに保存されます。
root@a36849cf2f24:~/ons-client-cpp# ls -lah bazel-bin/dist/ons-dist.tar.gz -r-xr-xr-x 1 root root 15M Oct 14 08:03 bazel-bin/dist/ons-dist.tar.gz
CentOS 7のコンパイル手順
デフォルトでは、GCC 4.8.5はCentOS 7.xにインストールされます。 GCCのバージョンは、ツールチェーンのバージョン要件を満たしていません。 GCC 5.3.1を提供するdevtoolset-4をインストールする必要があります。
wget https://copr.fedorainfracloud.org/coprs/vbatts/bazel/repo/epel-7/vbatts-bazel-epel-7.repo
cp vbatts-bazel-epel-7.repo /etc/yum.repos.d/
yum install devtoolset-4-gcc devtoolset-4-gcc-c++ bazel4 python3 git -y
scl enable devtoolset-4 bash
unlink /usr/bin/python && ln -s /usr/bin/python3 /usr/bin/python
git clone git@github.com:aliyun-mq/ons-client-cpp.git
cd ons-client-cpp && bazel build //dist/...
機能の変更
注文メッセージ
MaxReconsumeTimesパラメーターのデフォルト値は、Integer.MAXから16に変更されます。 このパラメーターは、順序付きメッセージの最大再試行回数を指定します。 再試行の最大数に達した後、コンシューマーがメッセージの消費に失敗した場合、メッセージはデットレターキューに配信されます。 MaxReconsumeTimesパラメーターにカスタム値を指定できます。
放送消費量
ブロードキャスト消費モードでは、offsetStore
操作がサポートされます。 この操作では、コンシューマーがメッセージの消費を開始するコンシューマーオフセットを指定できます。 コンシューマオフセットを指定しない場合、コンシューマは最新のコンシューマオフセットからメッセージの消費を開始します。 これは以前のバージョンと一致しています。
サンプルコード:
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>
#include "ons/MessageModel.h"
#include "ons/ONSFactory.h"
#include "rocketmq/Logger.h"
using namespace std;
using namespace ons;
std::mutex console_mtx;
class ExampleMessageListener : public MessageListener {
public:
Action consume(const Message& message, ConsumeContext& context) noexcept override {
std::lock_guard<std::mutex> lk(console_mtx);
auto latency = std::chrono::system_clock::now() - message.getStoreTimestamp();
auto latency2 = std::chrono::system_clock::now() - message.getBornTimestamp();
std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: " << message.getMsgID()
<< ", Body-size: " << message.getBody().size()
<< ", Current - Store-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency).count()
<< "ms, Current - Born-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency2).count()
<< "ms" << std::endl;
return Action::CommitMessage;
}
};
int main(int argc, char* argv[]) {
auto& logger = rocketmq::getLogger();
logger.setLevel(rocketmq::Level::Debug);
logger.init();
std::cout << "=======Before consuming messages=======" << std::endl;
ONSFactoryProperty factory_property;
// Obtain the consumer offset by calling the offsetStore operation. This operation is supported only in broadcasting consumption mode.
factory_property.setMessageModel(ONS_NAMESPACE::MessageModel::BROADCASTING);
factory_property.setFactoryProperty(ons::ONSFactoryProperty::GroupId, "GID_cpp_sdk_standard");
PushConsumer* consumer = ONSFactory::getInstance()->createPushConsumer(factory_property);
const char* topic = "cpp_sdk_standard";
const char* tag = "*";
// register your own listener here to handle the messages received.
auto* messageListener = new ExampleMessageListener();
consumer->subscribe(topic, tag);
consumer->registerMessageListener(messageListener);
// Start this consumer
consumer->start();
// Keep main thread running until process finished.
std::this_thread::sleep_for(std::chrono::minutes(15));
consumer->shutdown();
std::cout << "=======After consuming messages======" << std::endl;
return 0;
}
プッシュモード
指定された消費スレッド数が1 ~ 1000の有効範囲内にない場合、システムがコンシューマーを作成しようとすると例外がスローされます。 このタイプの例外は、システムがコンシューマーを起動しようとするとスローされません。
消費抑制機能がサポートされています。 消費抑制機能を設定して、メッセージの消費率を制限できます。 これにより、コンシューマークライアントでのメッセージの突然の急増によって引き起こされるアプリケーションの例外を防ぐことができます。
説明消費抑制機能は、順序付けられたメッセージの再試行には適用されません。
次のサンプルコードは、消費抑制機能を設定する方法の例を示しています。
#include <chrono> #include <iostream> #include <mutex> #include <thread> #include "ons/MessageModel.h" #include "ons/ONSFactory.h" #include "rocketmq/Logger.h" using namespace std; using namespace ons; std::mutex console_mtx; class ExampleMessageListener : public MessageListener { public: Action consume(const Message& message, ConsumeContext& context) noexcept override { std::lock_guard<std::mutex> lk(console_mtx); auto latency = std::chrono::system_clock::now() - message.getStoreTimestamp(); auto latency2 = std::chrono::system_clock::now() - message.getBornTimestamp(); std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: " << message.getMsgID() << ", Body-size: " << message.getBody().size() << ", Tag: " << message.getTag() << ", Current - Store-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency).count() << "ms, Current - Born-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency2).count() << "ms" << std::endl; return Action::CommitMessage; } }; int main(int argc, char* argv[]) { auto& logger = rocketmq::getLogger(); logger.setLevel(rocketmq::Level::Debug); logger.init(); const char* topic = "cpp_sdk_standard"; const char* tag = "*"; std::cout << "=======Before consuming messages=======" << std::endl; ONSFactoryProperty factory_property; factory_property.setFactoryProperty(ons::ONSFactoryProperty::GroupId, "GID_cpp_sdk_standard"); // Client-side throttling. factory_property.throttle(topic, 16); PushConsumer* consumer = ONSFactory::getInstance()->createPushConsumer(factory_property); // register your own listener here to handle the messages received. auto* messageListener = new ExampleMessageListener(); consumer->subscribe(topic, tag); consumer->registerMessageListener(messageListener); // Start this consumer. consumer->start(); // Keep main thread running until process finished. std::this_thread::sleep_for(std::chrono::minutes(15)); consumer->shutdown(); std::cout << "=======After consuming messages======" << std::endl; return 0; }
メッセージトレース
パラメーター | 説明 |
AccessKey | Alibaba CloudアカウントまたはRAM (Resource Access Management) ユーザーのAccessKey ID。 AccessKey IDは、ユーザーIDの検証に使用されます。 SDKを使用するか、API操作を呼び出してApsaraMQ for RocketMQリソースを取得する場合、認証にはAccessKey IDが必要です。 |
ReachServer | ApsaraMQ for RocketMQブローカーにメッセージが到着した時刻。 |
PresetDeliverAt | スケジュールされたメッセージが配信される予定時刻。 |
ActualAvailableAt | スケジュールされたメッセージが配信された時刻。 このパラメーターの値は、スケジュールされたメッセージが使用できるようになった時刻を示します。 |
利用可能時間 | メッセージが消費の準備ができた時間。 |
コミット /RollbackTime | トランザクションメッセージがコミットまたはロールバックされた時刻。 |
コンシューマーに到達 | メッセージがコンシューマークライアントに到着した時刻。 |
処理時間待ち | メッセージがコンシューマークライアントに到着した時間と、スレッドプールがメッセージのスレッドと処理リソースを割り当てた時間との間の待機時間。 |
API操作の変更
デフォルトのログパスは、~/logs/rocketmqlogs/ons.logから ~/logs/rocketmqlogs/ons.logに変更されます。
enum Actionクラスは、グローバル名前空間からons名前空間に移動されます。
ヘッダーファイルは /onsのパスに格納されます。
Message#getStartDeliverTimeの戻り値がint64_tからstd::chrono::system_clock::timepointまたはstd::chrono:: ミリ秒に変更されます。
throws宣言はC ++ 11ではサポートされなくなったため、関数のthrows宣言は削除されます。
Producer
クラスは、noexept
操作を提供します。 この操作を呼び出して、例外のスローを無効にできます。列挙型は
namespace enum
(enum class Type
) に変更されます。
よくある質問
最新バージョンのSDKと以前のバージョンのSDKを同じプロセスで使用できますか? シンボルは競合しますか?
最新のSDKのプリコンパイル済み静的ライブラリのシンボルは、デフォルトの名前空間であるons名前空間に存在します。 これらのシンボルは、以前のバージョンのSDKで使用されていたシンボルと競合します。 ソースコードからSDKをコンパイルし、
ONS_NAMESPACE
マクロの値がonでないことを確認できます。 これにより、同じプロセスでSDKを以前のバージョンと一緒に使用できます。Bazelは、マクロを定義するための複数のメソッドを提供します。 たとえば、を使用できます。. bazelrc、プロパティを定義しますcc_libraryルールで、
cc_library# コプト
プロパティを使用してマクロを定義します。コードをデバッグするときに、シンボルテーブルを含む静的ライブラリをコンパイルする方法を教えてください。
Protobuf依存関係がバージョン要件を満たしていない場合、依存関係の競合を解決するにはどうすればよいですか?
ONS-Client-CPPは、サードパーティの依存関係のソースコードを使用します。 依存関係と一致するONS-Client-CPP依存関係のバージョンのみを指定する必要があります。
ONS-Client-CPPはRocketMQ-Client-CPPに依存します。 apache/rocketmq-client-cppリポジトリをフォークし、ons-cilent-cpp/bazel/deps.bzlの依存関係のURLをフォークされたリポジトリのURLに変更する必要があります。
オンプレミスマシンでHTTPプロキシを有効にし、
http_proxy
やgrpc_proxy
などの環境変数を宣言した後、多数のアウトバウンドメッセージがタイムアウトするのはなぜですか。gRPCベースのSDKは、http_proxy、https_proxy、grpc_proxyなどのプロキシをサポートしています。 プロキシが必要ない場合は、プロキシサイトを無視するようにno_grpc_proxyまたはno_proxy環境変数を設定できます。 詳細については、「gRPC環境変数」をご参照ください。
SDKはC ++ 98およびC ++ 03標準をサポートしていますか?
いいえ、SDKはC ++ 98またはC ++ 03標準をサポートしていません。 SDKは、gRPCをコアプロトコルとして使用し、Protobufをキーの依存関係として使用します。 gRPCおよびProtobufプロトコルは、C ++ 98またはC ++ 03標準をサポートしていません。