本文介绍TCP协议下Java SDK 2.x.x.Final的版本说明,包括使用限制、版本的基本信息、环境要求以及和历史版本相比各功能特性的变更内容。
使用限制
目前仅华东1(杭州)、华北1(青岛)、华北2(北京)、华北3(张家口)、华北5(呼和浩特)、华南1(深圳)、西南1(成都)、中国香港、德国(法兰克福)和印度尼西亚(雅加达)地域支持将客户端升级为2.x.x.Final版本,其他地域请勿将SDK升级到Java SDK 2.x.x Final版本,否则将无法访问云消息队列 RocketMQ 版服务。
Java SDK 2.x.x.Final仅支持通过VPC网络访问云消息队列 RocketMQ 版,不支持经典网络访问。
若您使用存量云消息队列 RocketMQ 版实例并通过经典网络访问,请勿将Java SDK升级到V2.x.x.Final版本,否则将无法访问云消息队列 RocketMQ 版实例。
Java SDK 2.x.x.Final仅支持有命名空间的实例,若您使用的实例无命名空间,请勿将客户端版本升级到Java SDK 2.x.x.Final。
5.x版本实例默认都有命名空间,4.x版本实例可在云消息队列 RocketMQ 版控制台实例详情页面的基础信息区域查看是否有命名空间。
版本信息
发布时间 | 版本号 | 下载链接 |
2023-02-23 | 2.0.5.Final | |
2022-08-17 | 2.0.3.Final | |
2022-06-16 | 2.0.2.Final | |
2021-11-29 | 2.0.1.Final | |
2021-10-18 | 2.0.0.Final |
环境要求
使用V2.x.x版本Java SDK,请确保您使用的JDK版本为Java 8及以上。
V2.0.5功能变更
功能优化
日志增加异步支持。
问题修复
修复批量消费等待时间无法指定的问题。
修复部分安全漏洞。
V2.0.3功能变更
问题修复
修复高版本JDK中,消费线程池无法调节至32个线程以上的问题。
V2.0.2功能变更
问题修复
修复消息发送时,有小概率触发死锁的问题。
V2.0.1功能变更
消息轨迹
补充消息轨迹数据。
V2.0.0功能变更
顺序消息
顺序消息的最大重试次数MaxReconsumeTimes参数的默认值从Integer.MAX变更为16次。超过最大重试次数消息还未被消费成功将直接被投递至死信队列。您可以通过自定义MaxReconsumeTimes参数值修改顺序消息的最大重试次数。
事务消息
若生产者发送消息时LocalTransactionExecuter
类为null,则系统会抛出异常,历史版本则不会抛出异常。
广播消费
广播消费模式下,支持使用offsetStore
接口的方式定制消费者启动时的消费位点。若未设置,默认和历史版本一致直接从最新消费位点开始消费。
示例代码如下:
public class BroadcastingConsumerExample {
public static void main(String[] args) throws InterruptedException {
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "MyGroupId");
properties.put(PropertyKeyConst.AccessKey, "MyAccessKey");
properties.put(PropertyKeyConst.SecretKey, "MySecretKey");
// 设置TCP协议的接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
// 从OffsetStore读取消费位点的功能仅支持广播消费模式。
properties.put(PropertyKeyConst.MessageModel, MessageModel.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(properties);
// 参数1表示1秒钟会调用一次AbstractOffsetStore方法来进行位点的持久化。
OffsetStore offsetStore = new AbstractOffsetStore(1) {
@Override
public Map<TopicPartition, Long> loadOffset() {
// 实现从外部存储读取位点的逻辑。
}
@Override
public void persistOffset(Map<TopicPartition, Long> offsetTable) {
// 持久化位点到外部存储的逻辑。
}
};
offsetStore.start();
consumer.setOffsetStore(offsetStore);
consumer.subscribe("testBroadcastingTopic", "TagA", new MessageListener() {
@Override
public Action consume(Message message, ConsumeContext context) {
// 实现消息消费相关逻辑。
return Action.CommitMessage;
}
});
consumer.start();
Thread.sleep(100000);
consumer.shutdown();
offsetStore.shutdown();
}
}
Push消费
暂不支持普通消息的批量消费功能。
如果设置的消费线程数不在合法区间[1,1000]内,系统会在创建消费者时抛出异常,而不是在启动消费者时抛出异常。
新增消费速度限流功能。为了避免消息洪峰可能对消费端应用产生应冲击,您可通过该功能限制消息的消费速度,保护消费端应用。自定义消费速度的示例代码如下:
说明顺序消息的消息重试不受限流控制。
public class RateLimitConsumerExample { public static void main(String[] args) throws InterruptedException { Properties properties = new Properties(); properties.put(PropertyKeyConst.GROUP_ID, "MyGroupId"); properties.put(PropertyKeyConst.AccessKey, "MyAccessKey"); properties.put(PropertyKeyConst.SecretKey, "MySecretKey"); // 设置TCP协议的接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。 properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX"); Consumer consumer = ONSFactory.createConsumer(properties); // 对testTopicA进行消费限流,限制消费端每秒钟消费10条消息。 consumer.rateLimit("testTopicA", 10); consumer.subscribe("testTopic", "TagA", new MessageListener() { @Override public Action consume(Message message, ConsumeContext context) { // 实现消息消费相关逻辑。 return Action.CommitMessage; } }); consumer.start(); Thread.sleep(100000); consumer.shutdown(); } }
Pull消费
暂不支持Pull消费方式。
日志配置
日志默认路径由~/logs/ons.log变更为~/logs/ons/ons-client.log。
日志级别与logback完全对齐。除已有的ERROR、WARN、INFO、DEBUG级别之外,增加对OFF、TRACE和ALL级别的支持。
所有日志相关参数的添加方式除了
-D
方式以外,增加对环境变量的支持。
客户端创建
设置非法的接入点时,系统会在创建发送者或消费者时抛出异常,而不是在启动发送者或消费者时抛出异常。
消息轨迹
使用最新版本SDK收发消息,查询消息轨迹时,新增返回如下轨迹参数。
参数 | 说明 |
AccessKey | 您的阿里云账号或RAM用户的AccessKey ID,用于标识用户。当您通过SDK或API调用云消息队列 RocketMQ 版资源时,需要使用AccessKey ID进行身份验证。 |
到达Server | 消息到达消息队列RocketMQ版服务端的时间。 |
预设DeliverAt | 定时消息的预计投递时间。 |
实际AvailableAt | 定时消息定时结束的时间。即消息可被消费者消费的开始时间。 |
Available Time | 消息可被消费者消费的开始时间。 |
提交/回滚时间 | 事务消息提交或回滚的时间。 |
到达消费端 | 消息到达消费者客户端的时间。 |
等待处理耗时 | 消息到达消费者客户端,等待线程池分配线程和分配处理资源的耗时。 |
V2.0.0问题修复
修复RAM角色实现跨云账号STS授权场景下,updateCredential方法调用频率较高时,三元组(AccessKey ID、AccessKey Secret和STS Token)更新不具备原子性而导致的鉴权失败问题。
STS授权的SDK示例代码,请参见STS Token配置示例。