初始化TableStoreWriter時,您可以按需自訂TableStoreWriter的配置以及回呼函數邏輯。本文介紹TableStoreWriter支援自訂的配置和Callback樣本。
配置項
初始化TableStoreWriter時,您可以通過修改WriterConfig
自訂TableStoreWriter的配置。
配置樣本
WriterConfig的配置樣本如下:
WriterConfig config = new WriterConfig();
config.setBucketCount(3);
config.setBufferSize(1024);
config.setEnableSchemaCheck(true);
config.setDispatchMode(DispatchMode.HASH_PARTITION_KEY);
config.setBatchRequestType(BatchRequestType.BATCH_WRITE_ROW);
config.setConcurrency(10);
config.setWriteMode(WriteMode.PARALLEL);
config.setAllowDuplicatedRowInBatchRequest(true);
config.setMaxBatchSize(4 * 1024 * 1024);
config.setMaxBatchRowsCount(200);
config.setCallbackThreadCount(16);
config.setCallbackThreadPoolQueueSize(1024);
config.setMaxColumnsCount(128);
config.setMaxAttrColumnSize(2 * 1024 * 1024);
config.setMaxPKColumnSize(1024);
config.setFlushInterval(10000);
config.setLogInterval(10000);
config.setClientMaxConnections(300);
config.setWriterRetryStrategy(WriterRetryStrategy.CERTAIN_ERROR_CODE_NOT_RETRY);
配置說明
參數 | 類型 | 說明 |
bucketCount | 整型 | Writer內部的分桶數。預設值為3。一個分桶相當於一個緩衝空間,用於緩衝部分資料。 此參數可用於提升按序寫並發,當未達機器瓶頸時,分桶數與寫入速率正相關。 當分桶內的寫入模式為並發寫時,保持預設配置即可。 |
bufferSize | 整型 | 記憶體中緩衝隊列的大小。預設值為1024行。此參數值必須為2的指數倍。 |
enableSchemaCheck | 布爾值 | 在資料寫入到緩衝區時,是否進行schema檢查。取值範圍如下:
|
dispatchMode | 當資料寫入到緩衝區時,將資料分發到分桶內的模式。當分桶數大於等於2時,此參數才有效。取值範圍如下:
| |
batchRequestType | Writer將緩衝區資料發送到Table Store時,構建的請求類型。取值範圍如下:
| |
concurrency | 整型 | Writer將緩衝區資料發送到Table Store時的最大請求並發數。預設值為10。 |
writeMode | Writer將緩衝區資料寫入到Table Store時,每個分桶內資料寫入到Table Store中的模式。取值範圍如下:
| |
allowDuplicatedRowInBatchRequest | 布爾值 | 構建批量請求將資料寫入Table Store時,是否允許有主鍵相同的行。預設值為true。 當資料表上存在二級索引時,Table Store會忽略此參數的配置,不允許有主鍵相同的行。此時TableStoreWriter在構建請求時會將主鍵相同的行加入到不同請求中。 |
maxBatchSize | 整型 | 一次批量請求寫入Table Store的最巨量資料量。預設值為4 MB。單位為位元組。 |
maxBatchRowsCount | 整型 | 一次批量請求寫入Table Store的最大行數。預設值為200。最大值為200。 |
callbackThreadCount | 整型 | Writer內部Callback啟動並執行線程池線程數。預設值為處理器個數。 |
callbackThreadPoolQueueSize | 整型 | Writer內部Callback啟動並執行線程池隊列大小。預設值為1024。 |
maxColumnsCount | 整型 | 當寫入資料到緩衝區時,一行最大的列數限制。預設值為128列。 |
maxAttrColumnSize | 整型 | 當寫入資料到緩衝區時,單一屬性列值的最大大小,預設值為2 MB。單位為位元組。 |
maxPKColumnSize | 整型 | 當寫入資料到緩衝區時,單一主鍵列值的最大大小。預設值為1 KB。單位為位元組。 |
flushInterval | 整型 | Writer將緩衝區資料發送到Table Store時,自動flush緩衝區的時間間隔。預設值為10000。單位為毫秒。 |
logInterval | 整型 | Writer將緩衝區資料發送到Table Store時,自動列印任務狀態的時間間隔。預設值為10000。單位為毫秒。 |
clientMaxConnections | 整型 | 內部構建Client時使用的最大串連數配置。預設值為300。 |
writerRetryStrategy | 內部構建Client時使用的重試策略。取值範圍如下:
|
Callback
TableStoreWriter通過Callback來反饋寫入行資料的成功或者失敗資訊。如果行資料寫入成功,則Writer會調用onCompleted()
函數,如果行資料寫入失敗,則Writer會根據異常的類別調用對應的onFailed()
函數。
以下樣本用於統計成功和失敗的行資料數量。
private static AtomicLong succeedRows = new AtomicLong();
private static AtomicLong failedRows = new AtomicLong();
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
@Override
public void onCompleted(RowChange rowChange, RowWriteResult cc) {
//統計成功行數。
succeedRows.incrementAndGet();
}
@Override
public void onFailed(RowChange rowChange, Exception ex) {
//統計失敗行數。
failedRows.incrementAndGet();
}
};