全部產品
Search
文件中心

Tablestore:TableStoreWriter配置項

更新時間:Dec 06, 2024

初始化TableStoreWriter時,您可以按需自訂TableStoreWriter的配置以及回呼函數邏輯。本文介紹TableStoreWriter支援自訂的配置和Callback樣本。

配置項

初始化TableStoreWriter時,您可以通過修改WriterConfig自訂TableStoreWriter的配置。

配置樣本

WriterConfig的配置樣本如下:

WriterConfig config = new WriterConfig();
config.setBucketCount(3);
config.setBufferSize(1024);
config.setEnableSchemaCheck(true);
config.setDispatchMode(DispatchMode.HASH_PARTITION_KEY);
config.setBatchRequestType(BatchRequestType.BATCH_WRITE_ROW);
config.setConcurrency(10);
config.setWriteMode(WriteMode.PARALLEL);
config.setAllowDuplicatedRowInBatchRequest(true);
config.setMaxBatchSize(4 * 1024 * 1024);
config.setMaxBatchRowsCount(200);
config.setCallbackThreadCount(16);
config.setCallbackThreadPoolQueueSize(1024);
config.setMaxColumnsCount(128);
config.setMaxAttrColumnSize(2 * 1024 * 1024);
config.setMaxPKColumnSize(1024);
config.setFlushInterval(10000);
config.setLogInterval(10000);
config.setClientMaxConnections(300);
config.setWriterRetryStrategy(WriterRetryStrategy.CERTAIN_ERROR_CODE_NOT_RETRY);

配置說明

參數

類型

說明

bucketCount

整型

Writer內部的分桶數。預設值為3。一個分桶相當於一個緩衝空間,用於緩衝部分資料。

此參數可用於提升按序寫並發,當未達機器瓶頸時,分桶數與寫入速率正相關。

當分桶內的寫入模式為並發寫時,保持預設配置即可。

bufferSize

整型

記憶體中緩衝隊列的大小。預設值為1024行。此參數值必須為2的指數倍。

enableSchemaCheck

布爾值

在資料寫入到緩衝區時,是否進行schema檢查。取值範圍如下:

  • true(預設值):開啟schema檢查。在行資料寫入緩衝區前,TableStoreWriter會對該行資料進行如下檢查:

    • 該行的主鍵的schema是否與表定義的相同。

    • 該行的主鍵列或屬性列的值大小是否超過限制。

    • 該行的屬性列的個數是否超過限制。

    • 屬性列中是否有列名與主鍵列相同。

    • 該行的大小是否超過一次批量請求匯入的最巨量資料量限制。

    如果行資料未通過上述檢查,則TableStoreWriter會判定行資料為髒資料,不會寫入到緩衝區中。

  • false:在行資料寫入緩衝區時不檢查schema。

    如果緩衝區中的部分行資料為髒資料,則TableStoreWriter將行資料寫入到Table Store時,對應行資料會寫入失敗。

dispatchMode

DispatchMode

當資料寫入到緩衝區時,將資料分發到分桶內的模式。當分桶數大於等於2時,此參數才有效。取值範圍如下:

  • HASH_PARTITION_KEY(預設值):基於分區鍵雜湊值做分桶進行分發,保證同分區的資料處於一桶內按序寫入。

  • HASH_PRIMARY_KEY:基於主鍵雜湊值做分桶進行分發,保證同主鍵的資料處於一個桶內按序寫入。

  • ROUND_ROBIN:迴圈遍曆每個分桶進行分發。資料隨機分散在不同分桶中。

batchRequestType

BatchRequestType

Writer將緩衝區資料發送到Table Store時,構建的請求類型。取值範圍如下:

  • BATCH_WRITE_ROW(預設值):構建BatchWriteRowRequest。

  • BULK_IMPORT:構建BulkImportRequest。

concurrency

整型

Writer將緩衝區資料發送到Table Store時的最大請求並發數。預設值為10。

writeMode

WriteMode

Writer將緩衝區資料寫入到Table Store時,每個分桶內資料寫入到Table Store中的模式。取值範圍如下:

  • PARALLEL(預設值):並發寫。不同桶間並發,同一個桶內也會並行請求。

  • SEQUENTIAL:串列寫。不同桶間並發,同一個桶內串列請求。

allowDuplicatedRowInBatchRequest

布爾值

構建批量請求將資料寫入Table Store時,是否允許有主鍵相同的行。預設值為true。

當資料表上存在二級索引時,Table Store會忽略此參數的配置,不允許有主鍵相同的行。此時TableStoreWriter在構建請求時會將主鍵相同的行加入到不同請求中。

maxBatchSize

整型

一次批量請求寫入Table Store的最巨量資料量。預設值為4 MB。單位為位元組。

maxBatchRowsCount

整型

一次批量請求寫入Table Store的最大行數。預設值為200。最大值為200。

callbackThreadCount

整型

Writer內部Callback啟動並執行線程池線程數。預設值為處理器個數。

callbackThreadPoolQueueSize

整型

Writer內部Callback啟動並執行線程池隊列大小。預設值為1024。

maxColumnsCount

整型

當寫入資料到緩衝區時,一行最大的列數限制。預設值為128列。

maxAttrColumnSize

整型

當寫入資料到緩衝區時,單一屬性列值的最大大小,預設值為2 MB。單位為位元組。

maxPKColumnSize

整型

當寫入資料到緩衝區時,單一主鍵列值的最大大小。預設值為1 KB。單位為位元組。

flushInterval

整型

Writer將緩衝區資料發送到Table Store時,自動flush緩衝區的時間間隔。預設值為10000。單位為毫秒。

logInterval

整型

Writer將緩衝區資料發送到Table Store時,自動列印任務狀態的時間間隔。預設值為10000。單位為毫秒。

clientMaxConnections

整型

內部構建Client時使用的最大串連數配置。預設值為300。

writerRetryStrategy

WriterRetryStrategy

內部構建Client時使用的重試策略。取值範圍如下:

  • CERTAIN_ERROR_CODE_NOT_RETRY(預設值):給定的錯誤碼不做重試,其他錯誤均會重試。

    不做重試的錯誤碼包括OTSParameterInvalidOTSConditionCheckFailOTSRequestBodyTooLargeOTSInvalidPKOTSOutOfColumnCountLimitOTSOutOfRowSizeLimit

  • CERTAIN_ERROR_CODE_RETRY:只對給定的錯誤碼進行重試,其他錯誤均不重試。

    進行重試的錯誤碼包括OTSInternalServerErrorOTSRequestTimeoutOTSPartitionUnavailableOTSTableNotReadyOTSRowOperationConflictOTSTimeoutOTSServerUnavailableOTSServerBusy

Callback

TableStoreWriter通過Callback來反饋寫入行資料的成功或者失敗資訊。如果行資料寫入成功,則Writer會調用onCompleted()函數,如果行資料寫入失敗,則Writer會根據異常的類別調用對應的onFailed()函數。

以下樣本用於統計成功和失敗的行資料數量。

private static AtomicLong succeedRows = new AtomicLong();
private static AtomicLong failedRows = new AtomicLong();
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
    @Override
    public void onCompleted(RowChange rowChange, RowWriteResult cc) {
        //統計成功行數。
        succeedRows.incrementAndGet();
    }

    @Override
    public void onFailed(RowChange rowChange, Exception ex) {
        //統計失敗行數。
        failedRows.incrementAndGet();
    }
};