This topic provides the release notes for the TCP client SDK for C++ V3.x.x. The release notes cover usage notes, version information, environment requirements, compilation instructions, and feature changes.
Usage notes
You can use the TCP client SDK for C++ V3.x.x to access only instances that contain namespaces. If the instance that you use does not contain a namespace, do not upgrade the TCP client SDK for C++ to V3.x.x.
By default, all ApsaraMQ for RocketMQ 5.x instances contain namespaces. If you use an ApsaraMQ for RocketMQ 4.x instance, you can check whether the instance contains a namespace in the Basic Information section of the Instance Details page in the ApsaraMQ for RocketMQ console.
Version information
Release date | Version | Download link |
2021-10-18 | v3.x.x |
Environment requirements
ONS-Client-CPP is an open source client SDK that is native to Apache RocketMQ 5.0. Apache RocketMQ 5.0 adopts the gRPC framework, which is built on HTTP/2 and Protobuf. Therefore, gRPC is also required for the SDK for C++ V3.0.0. The following tables describe the version requirements that the dependencies and toolchains must meet.
Dependency
Dependency | Version |
grpc/grpc | 1.39.0 |
fmt | 8.0.1 |
spdlog | 1.9.2 |
filesystem | 1.5.0 |
asio | 1.18.2 |
cpp_httplib | 0.9.4 |
protobuf | 3.17.2 |
Toolchains
Operating system | Toolchain version |
Linux or macOS | GNU Compiler Collection (GCC) 4.9 or later, or Clang 3.4 or later |
Windows 7 or later | Visual Studio 2015 or later |
C++ Standard
The SDK uses the C++ 11 standard library. C++ 11 or later is required.
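The language-level requirement above can be enforced at compile time. The following is a minimal sketch, not part of the SDK: the macro name DEMO_CXX_STANDARD and the helper detectedCxxStandard are hypothetical names introduced for illustration. It accounts for the fact that MSVC reports __cplusplus as 199711L unless the /Zc:__cplusplus option is passed.

```cpp
// MSVC keeps __cplusplus at 199711L unless /Zc:__cplusplus is passed,
// so prefer _MSVC_LANG when the compiler defines it.
#if defined(_MSVC_LANG)
#define DEMO_CXX_STANDARD _MSVC_LANG
#else
#define DEMO_CXX_STANDARD __cplusplus
#endif

// Fails the build early if the compiler is not in C++11 mode or later.
static_assert(DEMO_CXX_STANDARD >= 201103L,
              "C++11 or later is required to build against the SDK");

// Hypothetical helper: exposes the detected language level to other code.
constexpr long detectedCxxStandard() { return DEMO_CXX_STANDARD; }
```

Placing a check like this in one of your own headers turns a toolchain mismatch into a single readable error instead of a long list of template errors.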
Compilation instructions
Compilation instructions for open source code
Install Bazel. For more information, see Installing Bazel.
Note: Python 3.x.x is required for Bazel 4.x.
Download and decompress the open source code. You can download the code by using one of the following methods:
Run the git clone https://github.com/aliyun-mq/ons-client-cpp.git command to clone the source code.
Download the code to your on-premises machine from the link in Version information.
Run the following command in the project folder. Bazel automatically downloads all third-party dependencies.
bazel build -c opt //dist/...
The following code provides a sample output:
INFO: From Action dist/libons_library.pic.a:
starting to run shell
INFO: Elapsed time: 39.480s, Critical Path: 38.89s
INFO: 2044 processes: 1796 remote cache hit, 241 internal, 7 processwrapper-sandbox.
INFO: Build completed successfully, 2044 total actions
When the code is compiled, the static library that contains all merged objects is saved to the bazel-bin/dist/ons-dist.tar.gz file.
root@a36849cf2f24:~/ons-client-cpp# ls -lah bazel-bin/dist/ons-dist.tar.gz
-r-xr-xr-x 1 root root 15M Oct 14 08:03 bazel-bin/dist/ons-dist.tar.gz
Compilation instructions for CentOS 7
By default, GCC 4.8.5 is installed in CentOS 7.x. This version does not meet the toolchain version requirement. You must install devtoolset-4, which provides GCC 5.3.1.
wget https://copr.fedorainfracloud.org/coprs/vbatts/bazel/repo/epel-7/vbatts-bazel-epel-7.repo
cp vbatts-bazel-epel-7.repo /etc/yum.repos.d/
yum install devtoolset-4-gcc devtoolset-4-gcc-c++ bazel4 python3 git -y
scl enable devtoolset-4 bash
unlink /usr/bin/python && ln -s /usr/bin/python3 /usr/bin/python
git clone git@github.com:aliyun-mq/ons-client-cpp.git
cd ons-client-cpp && bazel build //dist/...
Feature changes
Ordered messages
The default value of the MaxReconsumeTimes parameter is changed from Integer.MAX to 16. This parameter specifies the maximum number of retries for ordered messages. If a consumer fails to consume a message after the maximum number of retries is reached, the message is delivered to a dead-letter queue. You can specify a custom value for the MaxReconsumeTimes parameter.
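The retry limit described above can be pictured with a small model. This is a sketch under stated assumptions, not the SDK's implementation: the names Outcome, RetryResult, and deliverWithRetries are hypothetical, and the sketch assumes that the first delivery attempt does not count toward the retry limit, which this topic does not state.

```cpp
#include <cassert>
#include <functional>

// Hypothetical types that model the documented behavior only.
enum class Outcome { Consumed, DeadLettered };

struct RetryResult {
  Outcome outcome;
  int retries;  // number of retries performed after the first attempt
};

// Calls `consume` once, then retries up to `max_reconsume_times` more times.
// A message that still fails is reported as dead-lettered.
RetryResult deliverWithRetries(const std::function<bool()>& consume,
                               int max_reconsume_times = 16) {
  assert(max_reconsume_times >= 0);
  for (int attempt = 0; attempt <= max_reconsume_times; ++attempt) {
    if (consume()) {
      return RetryResult{Outcome::Consumed, attempt};
    }
  }
  return RetryResult{Outcome::DeadLettered, max_reconsume_times};
}
```

With the previous default of an effectively unlimited retry count, a message that always fails would never leave the loop. The finite default bounds the work spent on such a message.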
Broadcasting consumption
In broadcasting consumption mode, the offsetStore operation is supported. You can call this operation to specify the consumer offset from which a consumer starts to consume messages. If you do not specify the consumer offset, the consumer starts message consumption from the latest consumer offset. This is consistent with previous versions.
Sample code:
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>

#include "ons/MessageModel.h"
#include "ons/ONSFactory.h"
#include "rocketmq/Logger.h"

using namespace std;
using namespace ons;

std::mutex console_mtx;

class ExampleMessageListener : public MessageListener {
public:
  Action consume(const Message& message, ConsumeContext& context) noexcept override {
    std::lock_guard<std::mutex> lk(console_mtx);
    auto latency = std::chrono::system_clock::now() - message.getStoreTimestamp();
    auto latency2 = std::chrono::system_clock::now() - message.getBornTimestamp();
    std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: " << message.getMsgID()
              << ", Body-size: " << message.getBody().size()
              << ", Current - Store-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency).count()
              << "ms, Current - Born-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency2).count()
              << "ms" << std::endl;
    return Action::CommitMessage;
  }
};

int main(int argc, char* argv[]) {
  auto& logger = rocketmq::getLogger();
  logger.setLevel(rocketmq::Level::Debug);
  logger.init();

  std::cout << "=======Before consuming messages=======" << std::endl;
  ONSFactoryProperty factory_property;

  // Obtain the consumer offset by calling the offsetStore operation. This operation is supported only in broadcasting consumption mode.
  factory_property.setMessageModel(ONS_NAMESPACE::MessageModel::BROADCASTING);
  factory_property.setFactoryProperty(ons::ONSFactoryProperty::GroupId, "GID_cpp_sdk_standard");
  PushConsumer* consumer = ONSFactory::getInstance()->createPushConsumer(factory_property);

  const char* topic = "cpp_sdk_standard";
  const char* tag = "*";

  // Register your own listener here to handle the messages received.
  auto* messageListener = new ExampleMessageListener();
  consumer->subscribe(topic, tag);
  consumer->registerMessageListener(messageListener);

  // Start this consumer.
  consumer->start();

  // Keep the main thread running until the process finishes.
  std::this_thread::sleep_for(std::chrono::minutes(15));
  consumer->shutdown();

  std::cout << "=======After consuming messages======" << std::endl;
  return 0;
}
Push mode
If the specified number of consumption threads is not within the valid range of 1 to 1000, an exception is thrown when the system attempts to create a consumer. This type of exception is not thrown when the system attempts to start a consumer.
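The change in when the thread-count error surfaces can be illustrated with a small stand-in class. This is a minimal sketch, not SDK code: DemoConsumer, the constant names, and the use of std::invalid_argument are assumptions made for illustration, and the exception type that the SDK actually throws is not stated in this topic.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Valid range taken from the text above.
const int kMinConsumeThreads = 1;
const int kMaxConsumeThreads = 1000;

// Hypothetical stand-in for a consumer; not an SDK class.
class DemoConsumer {
public:
  // Validates the thread count in the constructor, so a bad value fails at
  // creation time rather than when start() is called.
  explicit DemoConsumer(int consume_thread_nums)
      : consume_thread_nums_(consume_thread_nums) {
    if (consume_thread_nums < kMinConsumeThreads || consume_thread_nums > kMaxConsumeThreads) {
      throw std::invalid_argument("consume thread count must be in [1, 1000], got " +
                                  std::to_string(consume_thread_nums));
    }
  }

  void start() {
    // By the time start() runs, the range has already been checked.
    assert(consume_thread_nums_ >= kMinConsumeThreads &&
           consume_thread_nums_ <= kMaxConsumeThreads);
    started_ = true;
  }

  bool started() const { return started_; }

private:
  int consume_thread_nums_;
  bool started_ = false;
};
```

Failing at creation means that a misconfigured consumer object never exists, so code that later calls start() does not need to handle this error.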
The consumption throttling feature is supported. You can configure the consumption throttling feature to limit the consumption rate of messages. This helps prevent application exceptions that are caused by sudden surges of messages on consumer clients.
Note: The consumption throttling feature does not apply to the retries of ordered messages.
The following sample code provides an example on how to configure the consumption throttling feature:
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>

#include "ons/MessageModel.h"
#include "ons/ONSFactory.h"
#include "rocketmq/Logger.h"

using namespace std;
using namespace ons;

std::mutex console_mtx;

class ExampleMessageListener : public MessageListener {
public:
  Action consume(const Message& message, ConsumeContext& context) noexcept override {
    std::lock_guard<std::mutex> lk(console_mtx);
    auto latency = std::chrono::system_clock::now() - message.getStoreTimestamp();
    auto latency2 = std::chrono::system_clock::now() - message.getBornTimestamp();
    std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: " << message.getMsgID()
              << ", Body-size: " << message.getBody().size() << ", Tag: " << message.getTag()
              << ", Current - Store-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency).count()
              << "ms, Current - Born-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency2).count()
              << "ms" << std::endl;
    return Action::CommitMessage;
  }
};

int main(int argc, char* argv[]) {
  auto& logger = rocketmq::getLogger();
  logger.setLevel(rocketmq::Level::Debug);
  logger.init();

  const char* topic = "cpp_sdk_standard";
  const char* tag = "*";

  std::cout << "=======Before consuming messages=======" << std::endl;
  ONSFactoryProperty factory_property;
  factory_property.setFactoryProperty(ons::ONSFactoryProperty::GroupId, "GID_cpp_sdk_standard");

  // Client-side throttling.
  factory_property.throttle(topic, 16);

  PushConsumer* consumer = ONSFactory::getInstance()->createPushConsumer(factory_property);

  // Register your own listener here to handle the messages received.
  auto* messageListener = new ExampleMessageListener();
  consumer->subscribe(topic, tag);
  consumer->registerMessageListener(messageListener);

  // Start this consumer.
  consumer->start();

  // Keep the main thread running until the process finishes.
  std::this_thread::sleep_for(std::chrono::minutes(15));
  consumer->shutdown();

  std::cout << "=======After consuming messages======" << std::endl;
  return 0;
}
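To give an intuition for what a client-side limit such as throttle(topic, 16) does, the following sketch shows a token bucket, which is one common way to cap a processing rate. It is an illustration only: this topic does not describe the algorithm that the SDK uses, the class DemoRateLimiter is hypothetical, and the sketch assumes that the limit is a per-second rate.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>

// Hypothetical token-bucket limiter; not the SDK's throttling implementation.
class DemoRateLimiter {
public:
  using Clock = std::chrono::steady_clock;

  DemoRateLimiter(double permits_per_second, Clock::time_point now)
      : rate_(permits_per_second),
        capacity_(permits_per_second),
        tokens_(permits_per_second),
        last_(now) {
    assert(permits_per_second > 0);
  }

  // Returns true if one message may be consumed at time `now`.
  // The caller passes the time in, which keeps the sketch easy to test.
  bool tryAcquire(Clock::time_point now) {
    const double elapsed = std::chrono::duration<double>(now - last_).count();
    // Refill in proportion to elapsed time, but never beyond the capacity.
    tokens_ = std::min(capacity_, tokens_ + elapsed * rate_);
    last_ = now;
    if (tokens_ < 1.0) {
      return false;
    }
    tokens_ -= 1.0;
    return true;
  }

private:
  double rate_;      // tokens added per second
  double capacity_;  // maximum burst size
  double tokens_;    // tokens currently available
  Clock::time_point last_;
};
```

In this model, a burst larger than the limit is not dropped. The excess messages wait until tokens are refilled, which smooths a sudden surge into a steady rate.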
Message traces
Parameter | Description |
AccessKey | The AccessKey ID of your Alibaba Cloud account or Resource Access Management (RAM) user. AccessKey IDs are used to verify user identities. When you use SDKs or call API operations to obtain ApsaraMQ for RocketMQ resources, the AccessKey ID is required for authentication. |
ReachServer | The time when the message arrived at the ApsaraMQ for RocketMQ broker. |
PresetDeliverAt | The scheduled point in time when the scheduled message was to be delivered. |
ActualAvailableAt | The time when the scheduled message was delivered. The value of this parameter indicates the time when the scheduled message became ready for consumption. |
Available Time | The time when the message became ready for consumption. |
Commit/RollbackTime | The time when the transactional message was committed or rolled back. |
Arrive at Consumer At | The time when the message arrived at the consumer client. |
Wait Duration before Processing | The wait duration between the time when the message arrived at the consumer client and the time when the thread pool allocated threads and processing resources for the message. |
Changes in API operations
The default log path is changed from ~/logs/rocketmqlogs/ons.log to ~/logs/rocketmqlogs/ons.log.
The enum Action class is moved from the global namespace to the ons namespace.
Header files are stored in the /ons path.
The return type of Message#getStartDeliverTime is changed from int64_t to std::chrono::system_clock::time_point or std::chrono::milliseconds.
The throw exception specifications on functions are removed because dynamic exception specifications are deprecated in C++ 11.
The Producer class provides a noexcept operation. You can call the operation to prevent exceptions from being thrown.
Enumeration types are changed to scoped enumerations, namely, enum class Type.
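Two of the changes above affect calling code directly: timestamps move from int64_t to std::chrono types, and enumerations become scoped. The following sketch shows how existing code might adapt. The demo namespace, the enumerator list, and the helper names are hypothetical and are not SDK declarations. The sketch also assumes that std::chrono::system_clock counts from the Unix epoch, which holds on common platforms but is guaranteed by the standard only from C++20.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

namespace demo {  // hypothetical namespace, standing in for ons

// Scoped enumeration: enumerators must be qualified, for example
// Action::CommitMessage, and do not convert implicitly to int.
enum class Action { CommitMessage, ReconsumeLater };

// Converts a legacy epoch-millisecond value to a time_point.
inline std::chrono::system_clock::time_point fromEpochMillis(std::int64_t millis) {
  return std::chrono::system_clock::time_point(std::chrono::milliseconds(millis));
}

// Converts a time_point back to epoch milliseconds for code that still
// stores int64_t values.
inline std::int64_t toEpochMillis(std::chrono::system_clock::time_point tp) {
  return std::chrono::duration_cast<std::chrono::milliseconds>(tp.time_since_epoch()).count();
}

// Usage: a millisecond value survives the round trip unchanged.
inline void roundTripExample() {
  const std::int64_t millis = 1634515200000;  // an arbitrary epoch-millisecond value
  assert(toEpochMillis(fromEpochMillis(millis)) == millis);
}

}  // namespace demo
```

Keeping the conversion in two small helpers limits the migration to the places where a raw integer crosses into or out of the new API.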
FAQ
Can I use an SDK of the latest version together with an SDK of an earlier version in the same process? Do the symbols conflict?
Symbols for the precompiled static library of the latest SDK exist in the ons namespace, which is the default namespace. These symbols conflict with the symbols that are used in earlier versions of the SDK. You can compile your SDK from the source code and make sure that the value of the ONS_NAMESPACE macro is not ons. This way, you can use your SDK together with an earlier version in the same process.
Bazel provides multiple methods for defining macros. For example, you can use .bazelrc, the defines property in cc_library rules, and the cc_library#copts property to define a macro.
How do I compile a static library that contains a symbol table when I debug the code?
Run the following command:
bazel build -c dbg //dist/...
For information about compilation options, see Bazel user guide.
How do I resolve dependency conflicts if my Protobuf dependency does not meet the version requirement?
ONS-Client-CPP builds its third-party dependencies from source code. You only need to specify versions of the ONS-Client-CPP dependencies that are consistent with your own dependency.
ONS-Client-CPP depends on RocketMQ-Client-CPP. You must fork the apache/rocketmq-client-cpp repository, and change the URL of the dependency in ons-client-cpp/bazel/deps.bzl to the URL of the forked repository.
Why do a large number of outbound messages time out after I enable an HTTP proxy on my on-premises machine and declare environment variables such as http_proxy and grpc_proxy?
The gRPC-based SDK honors proxy environment variables such as http_proxy, https_proxy, and grpc_proxy. If no proxy is required, you can configure the no_grpc_proxy or no_proxy environment variable to bypass the proxy for the specified hosts. For more information, see gRPC environment variables.
Does the SDK support the C++ 98 and C++ 03 standards?
No, the SDK does not support the C++ 98 or C++ 03 standard. The SDK uses gRPC as the core protocol and Protobuf as the key dependency. Neither gRPC nor Protobuf supports the C++ 98 or C++ 03 standard.