Shard操作
Shard操作分为Shard水平扩展和Shard分裂合并两种模式,应用场景如下
Shard水平扩展不允许合并Shard,分裂合并方式则允许
使用kafka方式消费Topic必须开启Shard水平扩展
开启Shard水平扩展后,key range无法使用, 所有Shard的BeginHashKey与EndHashKey是一样的,无法按HashKey和PartitionKey方式写入数据,需要自定义在应用层hash取模,并且需要注意扩容导致的写入shard发生变化
Shard水平扩展模式
DataHub支持Topic Shard水平扩展,创建Topic时开启Shard扩展模式即可
步骤一
开启Shard扩展模式
步骤二
点击图标(如下图所示),修改Shard数量
步骤三
查看水平扩展后的Shard
shard分裂合并
DataHub 支持为Topic动态扩容/缩容,通过SplitShard/MergeShard来实现。
使用场景
DataHub具有服务弹性伸缩功能,用户可根据实时的流量调整Shard数量,来应对突发性的流量增长或达到节约资源的目的。例如在双11大促期间,大部分Topic数据流量会激增,平时的Shard数量可能完全无法满足这样的流量增长,此时可以对其中一些Shard进行Split操作,一变二,二变四,最大可扩容至256个Shard,按目前的流控限制足以达到1280MB/s的流量。在双11大促后,流量下降,多余的Shard会占用没有必要的quota,因此可以进行Merge操作,每两个Shard合并为一个,直到合适为止。
Shard属性
可以通过ListShard接口获取所有Shard的信息,每个Shard拥有如下属性, 样例:
{
"ShardId": "string",
"State": "string",
"ClosedTime": uint64,
"BeginHashKey": "string",
"EndHashKey": "string",
"ParentShardIds": [string,string,],
"LeftShardId": "string",
"RightShardId": "string"
}
SplitShard
指定一个128 bit的HashKey以及一个ShardID,通过SDK或者Console进行操作.SplitShard操作会将指定的Shard分裂为两个ChildShard,并且返回ChildShard的ID以及Key信息,同时Parent Shard会被置为CLOSED状态。例如,Split之前存在如下一个Shard:
ShardId:0 Status:ACTIVE BeginHashKey:00000000000000000000000000000000
EndHashKey:FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
通过SDK进行Split操作:
String shardId = "0";
SplitShardRequest req = new SplitShardRequest(projectName, topicName, shardId, "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA");
SplitShardResult resp = client.splitShard(req);
最终将会变成如下3个Shard:
ShardId:0 Status:CLOSED BeginHashKey:00000000000000000000000000000000
EndHashKey:FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
ShardId:1 Status:ACTIVE BeginHashKey:00000000000000000000000000000000
EndHashKey:AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
ShardId:2 Status:ACTIVE BeginHashKey:AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
EndHashKey:FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
MergeShard
指定两个相邻的ShardID,通过SDK或者Console进行操作.MergeShard操作会将指定的两个Shard合并为一个新的Shard,并且返回新Shard的ID以及Key信息,同时两个ParentShard会被置为CLOSED状态。例如,Merge之前存在如下两个Shard:
ShardId:0 Status:ACTIVE BeginHashKey:00000000000000000000000000000000
EndHashKey:7FFFFFFFFFFFFFFF7FFFFFFFFFFFFFFF
ShardId:1 Status:ACTIVE BeginHashKey:7FFFFFFFFFFFFFFF7FFFFFFFFFFFFFFF
EndHashKey:FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
通过SDK进行Merge操作:
String shardId = "0";
String adjacentShardId = "1";
MergeShardRequest req = new MergeShardRequest(projectName, topicName, shardId, adjacentShardId);
MergeShardResult resp = client.mergeShard(req);
最终将会变成如下3个Shard:
ShardId:0 Status:CLOSED BeginHashKey:00000000000000000000000000000000
EndHashKey:7FFFFFFFFFFFFFFF7FFFFFFFFFFFFFFF
ShardId:1 Status:CLOSED BeginHashKey:7FFFFFFFFFFFFFFF7FFFFFFFFFFFFFFF
EndHashKey:FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
ShardId:2 Status:ACTIVE BeginHashKey:00000000000000000000000000000000
EndHashKey:FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
注意事项
当Shard进行Merge/Split后会被置为CLOSED状态,该状态可以继续消费读取数据,但是不可写入,也不可再次进行Merge/Split操作,当到达Topic的lifecycle后该Shard会被回收。如果配置了Connector,对应任务会在复制完该Shard数据后自动挂起,待该Shard回收后会自动删除任务。Topic在进行Merge/Split后新的Shard需要等待变为ACTIVE状态后方可正常使用,通常不会超过5秒。