通过通道服务功能,您可以消费处理表中数据。本文介绍如何使用 Java SDK 快速体验通道服务。
注意事项
TunnelWorkerConfig 中默认会启动读数据和处理数据的线程池。如果使用的是单台机器,当需要启动多个 TunnelWorker 时,建议共用一个 TunnelWorkerConfig。
TunnelWorker 的初始化需要预热时间,该值受 TunnelWorkerConfig 中的 heartbeatIntervalInSec 参数影响,可以通过 TunnelWorkerConfig 中的 setHeartbeatIntervalInSec 方法配置,默认为 30 s。
当客户端(TunnelWorker)没有被正常 shutdown 时(例如异常退出或者手动结束),TunnelWorker 会自动进行资源的回收,包括释放线程池,自动调用用户在 Channel 上注册的 shutdown 方法,关闭 Tunnel 连接等。
Tunnel 的增量日志保留时间,其数值与数据表中 Stream 的日志过期时长(最长时长为 7 天)保持一致,因此 Tunnel 的增量日志最多保留 7 天。
增量或者全量加增量类型 Tunnel 消费数据时,可能会出现以下情况:
当 Tunnel 处于全量阶段时,如果全量数据在增量日志保留时间内(最多保留 7 天)未能完成消费,将会触发
OTSTunnelExpired
错误,从而导致无法继续消费后续数据。如果您预计全量数据无法在指定时间内完成消费,请及时联系表格存储技术支持进行咨询。
当 Tunnel 处于增量阶段时,如果增量数据在增量日志保留时间内(最多保留 7 天)未能完成消费,Tunnel 将可能从最近可消费的数据处开始消费,因此存在漏消费数据的风险。
Tunnel 过期后,表格存储可能会禁用该 Tunnel。如果禁用状态持续超过 30 天,则该 Tunnel 将被彻底删除,删除后将无法恢复。
前提条件
在访问控制 RAM 服务侧完成如下操作:
已创建 RAM 用户并为 RAM 用户授予管理表格存储权限
AliyunOTSFullAccess
。具体操作,请参见创建 RAM 用户和为 RAM 用户授权。说明在实际业务环境中,建议您遵循最小化授权原则,避免权限过大带来的安全风险。
已为 RAM 用户创建 AccessKey。具体操作,请参见创建 AccessKey。
警告阿里云账号 AccessKey 泄露会威胁您所有资源的安全。建议您使用 RAM 用户 AccessKey 进行操作,这可以有效降低 AccessKey 泄露的风险。
在表格存储服务侧完成如下操作:
已创建数据表。具体操作,请参见使用控制台创建数据表、使用命令行工具创建数据表或使用SDK创建数据表。
已获取实例域名地址(Endpoint)。具体操作,请参见获取实例Endpoint。
已配置访问凭证。具体操作,请参见配置访问凭证。
体验通道服务
使用 Java SDK 最小化地体验通道服务。
初始化 TunnelClient。
说明在运行本代码示例之前,请确保已设置环境变量
TABLESTORE_ACCESS_KEY_ID
和TABLESTORE_ACCESS_KEY_SECRET
,这两个变量分别对应阿里云账号或 RAM 用户的 AccessKey ID 和 AccessKey Secret。//endPoint为表格存储实例的endPoint,例如https://instance.cn-hangzhou.ots.aliyuncs.com。 //accessKeyId和accessKeySecret分别为访问表格存储服务的AccessKey的Id和Secret。 //instanceName为实例名称。 final String endPoint = ""; final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID"); final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET"); final String instanceName = ""; TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
创建通道。
请提前创建一张测试表或者使用已有的一张数据表。如果需要新建测试表,可以使用 SyncClient 中的 createTable 方法或者使用官网控制台等方式创建。
//支持创建TunnelType.BaseData(全量)、TunnelType.Stream(增量)、TunnelType.BaseAndStream(全量加增量)三种类型的Tunnel。 //如下示例为创建全量加增量类型的Tunnel,如果需创建其它类型的Tunnel,则将CreateTunnelRequest中的TunnelType设置为相应的类型。 final String tableName = "testTable"; final String tunnelName = "testTunnel"; CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream); CreateTunnelResponse resp = tunnelClient.createTunnel(request); //tunnelId用于后续TunnelWorker的初始化,该值也可以通过ListTunnel或者DescribeTunnel获取。 String tunnelId = resp.getTunnelId(); System.out.println("Create Tunnel, Id: " + tunnelId);
根据业务自定义数据消费 Callback 函数,开始自动化的数据消费。 TunnelClient 的自定义配置请参见下面的表格。
//根据业务自定义数据消费Callback函数,即实现IChannelProcessor接口(process和shutdown)。 private static class SimpleProcessor implements IChannelProcessor { @Override public void process(ProcessRecordsInput input) { //ProcessRecordsInput中包含有拉取到的数据。 System.out.println("Default record processor, would print records count"); System.out.println( //NextToken用于Tunnel Client的翻页。 String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken())); try { //模拟消费处理。 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void shutdown() { System.out.println("Mock shutdown"); } } //TunnelWorkerConfig默认会启动读数据和处理数据的线程池。如果使用的是单台机器,则会启动多个TunnelWorker。 //建议共用一个TunnelWorkerConfig,TunnelWorkerConfig中包括更多的高级参数。 TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor()); //配置TunnelWorker,并启动自动化的数据处理任务。 TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config); try { worker.connectAndWorking(); } catch (Exception e) { e.printStackTrace(); config.shutdown(); worker.shutdown(); tunnelClient.shutdown(); }
配置 TunnelWorkerConfig
TunnelWorkerConfig 提供 Tunnel Client 的自定义配置,可根据实际需要配置参数,Java SDK 中的参数说明请参见下表。
配置 | 参数 | 说明 |
Heartbeat 的间隔和超时时间 | heartbeatTimeoutInSec | Heartbeat 的超时间隔。默认值为 300 s。 当 Heartbeat 发生超时,Tunnel 服务端会认为当前 TunnelClient 不可用(失活),客户端需要重新进行 ConnectTunnel。 |
heartbeatIntervalInSec | 进行 Heartbeat 的间隔。 默认值为 30 s,最小支持配置到 5 s,单位为 s。 Heartbeat 用于活跃 Channel 的探测、Channel 状态的更新、(自动化)数据拉取任务的初始化等。 | |
记录消费位点的时间间隔 | checkpointIntervalInMillis | 用户消费完数据后,向 Tunnel 服务端进行记录消费位点操作(checkpoint)的时间间隔。 默认值为 5000 ms,单位为 ms。 说明
|
客户端的自定义标识 | clientTag | 客户端的自定义标识,可以生成 Tunnel Client ID,用于区分 TunnelWorker。 |
数据处理的自定义 Callback | channelProcessor | 用户注册的处理数据的 Callback,包括 process 和 shutdown 方法。 |
数据读取和数据处理的线程池资源配置 | readRecordsExecutor | 用于数据读取的线程池资源。无特殊需求,建议使用默认的配置。 |
processRecordsExecutor | 用于处理数据的线程池资源。无特殊需求,建议使用默认的配置。 说明
| |
内存控制 | maxChannelParallel | 读取和处理数据的最大 Channel 并行度,可用于内存控制。 默认值为 -1,表示不限制最大并行度。 说明 仅 Java SDK 5.10.0 及以上版本支持此功能。 |
最大退避时间配置 | maxRetryIntervalInMillis | Tunnel 的最大退避时间基准值,最大退避时间在此基准值附近随机变化,具体范围为 默认值为 2000 ms,最小值为 200 ms。 说明
|
CLOSING 分区状态检测 | enableClosingChannelDetect | 是否开启 CLOSING 分区实时检测。默认值为 false,表示不开启 CLOSING 分区实时检测。 说明
|
附录:完整代码
import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
public class TunnelQuickStart {
private static class SimpleProcessor implements IChannelProcessor {
@Override
public void process(ProcessRecordsInput input) {
System.out.println("Default record processor, would print records count");
System.out.println(
//NextToken用于Tunnel Client的翻页。
String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
try {
//模拟消费处理。
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void shutdown() {
System.out.println("Mock shutdown");
}
}
public static void main(String[] args) throws Exception {
//1.初始化Tunnel Client。
final String endPoint = "";
final String accessKeyId = System.getenv("OTS_AK_ENV");
final String accessKeySecret = System.getenv("OTS_SK_ENV");
final String instanceName = "";
TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
//2.创建新通道(此步骤需要提前创建一张测试表,可以使用SyncClient的createTable或者使用官网控制台等方式创建)。
final String tableName = "testTable";
final String tunnelName = "testTunnel";
CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
CreateTunnelResponse resp = tunnelClient.createTunnel(request);
//tunnelId用于后续TunnelWorker的初始化,该值也可以通过ListTunnel或者DescribeTunnel获取。
String tunnelId = resp.getTunnelId();
System.out.println("Create Tunnel, Id: " + tunnelId);
//3.用户自定义数据消费Callback,开始自动化的数据消费。
//TunnelWorkerConfig中有更多的高级参数。
TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
try {
worker.connectAndWorking();
} catch (Exception e) {
e.printStackTrace();
config.shutdown();
worker.shutdown();
tunnelClient.shutdown();
}
}
}