使用訊息模型時,您需要完成Factory、MetaStore和TimelineStore的初始化。本文介紹如何完成初始化的配置和操作。
前提條件
已配置存取金鑰。具體操作,請參考配置存取金鑰。
已擷取Table Store執行個體的Endpoint。具體操作,請參考擷取執行個體Endpoint。
初始化Factory
將SyncClient作為參數,初始化StoreFactory。通過Store工廠建立Meta資料和Timeline資料的管理Store。
實現錯誤重試需要依賴SyncClient的重試策略,您可以通過配置SyncClient實現重試。如果有特殊需求,可自訂策略(只需實現RetryStrategy介面)。
/**
* 重試策略配置。
* Code: configuration.setRetryStrategy(new DefaultRetryStrategy());
**/
ClientConfiguration configuration = new ClientConfiguration();
SyncClient client = new SyncClient(
"http://instanceName.cn-shanghai.ots.aliyuncs.com",
System.getenv("OTS_AK_ENV"),
System.getenv("OTS_SK_ENV"),
"instanceName", configuration);
TimelineStoreFactory serviceFactory = new TimelineStoreFactoryImpl(client);
初始化MetaStore
構建Meta表的Schema,包含Identifier、MetaIndex等參數,通過Store工廠建立並擷取Meta的管理Store,配置參數包含Meta表名、索引名、主鍵欄位、索引類型等。
TimelineIdentifierSchema idSchema = new TimelineIdentifierSchema.Builder()
.addStringField("timeline_id").build();
IndexSchema metaIndex = new IndexSchema();
metaIndex.setFieldSchemas(Arrays.asList(//配置索引欄位以及欄位類型。
new FieldSchema("group_name", FieldType.TEXT).setIndex(true).setAnalyzer(FieldSchema.Analyzer.MaxWord),
new FieldSchema("create_time", FieldType.LONG).setIndex(true)
));
TimelineMetaSchema metaSchema = new TimelineMetaSchema("groupMeta", idSchema)
.withIndex("metaIndex", metaIndex); //配置索引。
TimelineMetaStore timelineMetaStore = serviceFactory.createMetaStore(metaSchema);
建表
根據metaSchema的參數建立表。如果metaSchema中配置了索引,建表成功後會建立索引。
timelineMetaStore.prepareTables();
刪表
當表存在索引時,在刪除表前,系統會先刪除索引,再刪除Store相應表。
timelineMetaStore.dropAllTables();
初始化TimelineStore
構建timeline表的Schema配置,包含Identifier、TimelineIndex等參數,通過Store工廠建立並擷取Timeline的管理Store;配置參數包含Timeline表名、索引名、主鍵欄位、索引類型等。
訊息的批量寫入,基於Tablestore的DefaultTableStoreWriter提升並發,使用者可以根據自己需求設定線程池數目。
TimelineIdentifierSchema idSchema = new TimelineIdentifierSchema.Builder()
.addStringField("timeline_id").build();
IndexSchema timelineIndex = new IndexSchema();
timelineIndex.setFieldSchemas(Arrays.asList(//配置索引的欄位以及欄位類型。
new FieldSchema("text", FieldType.TEXT).setIndex(true).setAnalyzer(FieldSchema.Analyzer.MaxWord),
new FieldSchema("receivers", FieldType.KEYWORD).setIndex(true).setIsArray(true)
));
TimelineSchema timelineSchema = new TimelineSchema("timeline", idSchema)
.autoGenerateSeqId() //SequenceId 設定為自增列方式。
.setCallbackExecuteThreads(5) //設定Writer初始線程數為5。
.withIndex("metaIndex", timelineIndex); //設定索引。
TimelineStore timelineStore = serviceFactory.createTimelineStore(timelineSchema);
建表
根據TimelineSchema的參數建立表。如果TimelineSchema中配置了索引,建表成功後會建立索引。
timelineStore.prepareTables();
刪表
當表存在索引時,在刪除表前,系統會先刪除索引,再刪除Store相應的表。
timelineStore.dropAllTables();