如果需要实时消费表中数据,您需要调用CreateTunnel接口为数据表创建一个通道,一张数据表上可以创建多个通道。在创建通道时需要指定数据表名称、通道名称和通道类型。
前提条件
已初始化TunnelClient。
已创建数据表。具体操作,请参见创建数据表。
参数
请求参数
参数 | 说明 |
TableName | 数据表名称。 |
TunnelName | 通道的名称。 |
TunnelType | 通道的类型。取值范围如下:
创建增量或者全量加增量类型的通道时,系统默认创建通道后写入的数据为增量数据。如果要消费指定时间点后的增量数据,请配置增量数据的起始时间戳(startTime)。
|
响应参数
参数 | 说明 |
TunnelId | 通道的ID。 |
ResponseInfo | 返回的一些其它字段。 |
RequestId | 当次请求的Request ID。 |
示例
创建全量类型的通道
以下示例用于为数据表创建一个全量类型的通道。
//支持创建三种类型的通道TunnelType.BaseData(全量)、TunnelType.Stream(增量)和TunnelType.BaseAndStream(全量加增量)。
//本示例为创建全量类型的通道,如果需要创建其它类型的通道,则将CreateTunnelRequest中的TunnelType设置为相应的类型。
private static void createTunnel(TunnelClient client, String tableName, String tunnelName) {
CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseData);
CreateTunnelResponse resp = client.createTunnel(request);
System.out.println("RequestId: " + resp.getRequestId());
System.out.println("TunnelId: " + resp.getTunnelId());
}
创建增量或者全量加增量类型的通道
以下示例用于为数据表创建增量或全量加增量类型的通道,并指定读取的增量数据时间范围。
//创建增量或者全量加增量类型的通道,指定起始时间戳或结束时间戳,表示读取的增量数据时间范围。对于全量类型的通道,StreamTunnelConfig的配置不生效。
private static void createStreamTunnelByOffset(TunnelClient client,String tableName,String tunnelName, long startTime, long endTime){
CreateTunnelRequest createTunnelRequest = new CreateTunnelRequest(tableName,tunnelName, TunnelType.Stream);//创建增量类型通道。
//CreateTunnelRequest createTunnelRequest = new CreateTunnelRequest(tableName,tunnelName, TunnelType.BaseAndStream);//创建全量加增量类型通道。
StreamTunnelConfig streamTunnelConfig = new StreamTunnelConfig();
/*
指定增量数据的起始时间戳(startTime)和结束时间戳(endTime)。单位为毫秒,取值范围为[CurrentSystemTime - StreamExpiration + 5 minute, CurrentSystemTime)。
其中CurrentSystemTime为当前系统时间的毫秒单位时间戳;StreamExpiration为增量日志过期时间的毫秒单位时间戳,最大值为7天。您可以在为数据表开启Stream功能时设置。
结束时间戳的取值必须大于起始时间戳。
*/
streamTunnelConfig.setStartOffset(startTime);
streamTunnelConfig.setEndOffset(endTime);
createTunnelRequest.setStreamTunnelConfig(streamTunnelConfig);
CreateTunnelResponse resp = client.createTunnel(createTunnelRequest);
System.out.println("RequestId: " + resp.getRequestId());
System.out.println("TunnelId: " + resp.getTunnelId());
}
常见问题
使用通道服务消费数据时报错OTSTrimmedDataAccess Requested stream data is already trimmed or does not exist
相关文档
关于API说明的更多信息,请参见CreateTunnel。
如果要快速使用通道服务消费数据,请参见快速使用通道服务文档进行操作。
如果要查看指定表的所有通道信息,您可以通过获取表内的通道信息实现。更多信息,请参见获取表内的通道信息。
如果要查看指定通道的详细信息,您可以通过获取通道的具体信息实现。更多信息,请参见获取通道的具体信息。
如果不再使用某个通道,您可以删除相应通道。更多信息,请参见删除通道。
使用通道服务可以实现数据迁移。更多信息,请参见将表格存储数据表中数据同步到另一个数据表。
实时计算Flink能将通道服务的数据通道作为流式数据的输入,可实现通过Flink计算与分析表格存储数据,更多信息,请参见使用教程(宽表模型)和使用教程(时序模型)。
当前通道服务本身没有额外的费用开销。在消费通道服务数据时,表格存储会根据实际拉取的数据产生读吞吐量计量计费。更多信息,请参见计费概述。