使用消息模型时,您需要完成Factory、MetaStore和TimelineStore的初始化。本文介绍如何完成初始化的配置和操作。
前提条件
已配置访问密钥。具体操作,请参考配置访问密钥。
已获取表格存储实例的Endpoint。具体操作,请参考获取实例Endpoint。
初始化Factory
将SyncClient作为参数,初始化StoreFactory。通过Store工厂创建Meta数据和Timeline数据的管理Store。
实现错误重试需要依赖SyncClient的重试策略,您可以通过配置SyncClient实现重试。如果有特殊需求,可自定义策略(只需实现RetryStrategy接口)。
/**
* 重试策略配置。
* Code: configuration.setRetryStrategy(new DefaultRetryStrategy());
**/
ClientConfiguration configuration = new ClientConfiguration();
SyncClient client = new SyncClient(
"http://instanceName.cn-shanghai.ots.aliyuncs.com",
System.getenv("OTS_AK_ENV"),
System.getenv("OTS_SK_ENV"),
"instanceName", configuration);
TimelineStoreFactory serviceFactory = new TimelineStoreFactoryImpl(client);
初始化MetaStore
构建Meta表的Schema,包含Identifier、MetaIndex等参数,通过Store工厂创建并获取Meta的管理Store,配置参数包含Meta表名、索引名、主键字段、索引类型等。
TimelineIdentifierSchema idSchema = new TimelineIdentifierSchema.Builder()
.addStringField("timeline_id").build();
IndexSchema metaIndex = new IndexSchema();
metaIndex.setFieldSchemas(Arrays.asList(//配置索引字段以及字段类型。
new FieldSchema("group_name", FieldType.TEXT).setIndex(true).setAnalyzer(FieldSchema.Analyzer.MaxWord),
new FieldSchema("create_time", FieldType.LONG).setIndex(true)
));
TimelineMetaSchema metaSchema = new TimelineMetaSchema("groupMeta", idSchema)
.withIndex("metaIndex", metaIndex); //配置索引。
TimelineMetaStore timelineMetaStore = serviceFactory.createMetaStore(metaSchema);
建表
根据metaSchema的参数创建表。如果metaSchema中配置了索引,建表成功后会创建索引。
timelineMetaStore.prepareTables();
删表
当表存在索引时,在删除表前,系统会先删除索引,再删除Store相应表。
timelineMetaStore.dropAllTables();
初始化TimelineStore
构建timeline表的Schema配置,包含Identifier、TimelineIndex等参数,通过Store工厂创建并获取Timeline的管理Store;配置参数包含Timeline表名、索引名、主键字段、索引类型等。
消息的批量写入,基于Tablestore的DefaultTableStoreWriter提升并发,用户可以根据自己需求设置线程池数目。
TimelineIdentifierSchema idSchema = new TimelineIdentifierSchema.Builder()
.addStringField("timeline_id").build();
IndexSchema timelineIndex = new IndexSchema();
timelineIndex.setFieldSchemas(Arrays.asList(//配置索引的字段以及字段类型。
new FieldSchema("text", FieldType.TEXT).setIndex(true).setAnalyzer(FieldSchema.Analyzer.MaxWord),
new FieldSchema("receivers", FieldType.KEYWORD).setIndex(true).setIsArray(true)
));
TimelineSchema timelineSchema = new TimelineSchema("timeline", idSchema)
.autoGenerateSeqId() //SequenceId 设置为自增列方式。
.setCallbackExecuteThreads(5) //设置Writer初始线程数为5。
.withIndex("metaIndex", timelineIndex); //设置索引。
TimelineStore timelineStore = serviceFactory.createTimelineStore(timelineSchema);
建表
根据TimelineSchema的参数创建表。如果TimelineSchema中配置了索引,建表成功后会创建索引。
timelineStore.prepareTables();
删表
当表存在索引时,在删除表前,系统会先删除索引,再删除Store相应的表。
timelineStore.dropAllTables();