啟動Tablestore Sink Connector時,您需要通過索引值映射向Kafka Connect進程傳遞參數。通過本文您可以結合配置樣本和配置參數說明瞭解Tablestore Sink Connector的相關配置。

配置樣本

當從Kafka同步資料到資料表或者時序表時配置項不同,且不同工作模式下相應設定檔的樣本不同。此處以同步資料到資料表中為例介紹配置樣本。同步資料到時序表的配置樣本中需要增加時序相關配置項。

  • .properties格式設定檔的樣本,適用於standalone模式。
    # 設定連接器名稱。
    name=tablestore-sink
    # 指定連接器類。
    connector.class=TableStoreSinkConnector
    # 設定最大任務數。
    tasks.max=1
    # 指定匯出資料的Kafka的Topic列表。
    topics=test
    # 以下為Tablestore串連參數的配置。
    # Tablestore執行個體的Endpoint。 
    tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com
    # 填寫AccessKey ID和AccessKey Secret。
    tablestore.access.key.id=xxx
    tablestore.access.key.secret=xxx
    # Tablestore執行個體名稱。
    tablestore.instance.name=xxx
    
    # 以下為資料對應相關的配置。
    # 指定Kafka Record的解析器。
    # 預設的DefaulteEventParser已支援Struct和Map類型,您也可以使用自訂的EventParser。
    event.parse.class=com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser
    
    # 定義目標表名稱的格式字串,字串中可包含<topic>作為原始Topic的預留位置。
    # topics.assign.tables配置的優先順序更高,如果配置了topics.assign.tables,則忽略table.name.format的配置。
    # 例如當設定table.name.format為kafka_<topic>時,如果kafka中主題名稱為test,則將映射到Tablestore的表名為kafka_test。
    table.name.format=<topic>
    # 指定Topic與目標表的映射關係,以"<topic>:<tablename>"格式映射Topic和表名,Topic和表名之間的分隔字元為英文冒號(:),不同映射之間分隔字元為英文逗號(,)。
    # 如果預設,則採取table.name.format的配置。
    # topics.assign.tables=test:test_kafka
    
    # 指定主鍵模式,可選值包括kafka、record_key和record_value,預設值為kafka。
    # kafka表示以<connect_topic>_<connect_partition>和<connect_offset>作為資料表的主鍵。
    # record_key表示以Record Key中的欄位作為資料表的主鍵。
    # record_value表示以Record Value中的欄位作為資料表的主鍵。
    primarykey.mode=kafka
    
    # 定義匯入資料表的主鍵列名和資料類型。
    # 屬性名稱格式為tablestore.<tablename>.primarykey.name和tablestore.<tablename>.primarykey.type。
    # 其中<tablename>為資料表名稱的預留位置。
    # 當主鍵模式為kafka時,無需配置該屬性,預設主鍵列名為{"topic_partition","offset"},預設主鍵列資料類型為{string, integer}。
    # 當主鍵模式為record_key或record_value時,必須配置以下兩個屬性。
    # tablestore.test.primarykey.name=A,B
    # tablestore.test.primarykey.type=string,integer
    
    # 定義屬性列白名單,用於過濾Record Value中的欄位擷取所需屬性列。
    # 預設值為空白,使用Record Value中的所有欄位作為資料表的屬性列。
    # 屬性名稱格式為tablestore.<tablename>.columns.whitelist.name和tablestore.<tablename>.columns.whitelist.type。
    # 其中<tablename>為資料表名稱的預留位置。
    # tablestore.test.columns.whitelist.name=A,B
    # tablestore.test.columns.whitelist.type=string,integer
    
    # 以下為寫入Tablestore相關的配置。
    # 指定寫入模式,可選值包括put和update,預設值為put。
    # put表示覆蓋寫。
    # update表示更新寫。
    insert.mode=put
    # 是否需要保序,預設值為true。如果關閉保序模式,則有助於提升寫入效率。
    insert.order.enable=true
    # 是否自動建立目標表,預設值為false。
    auto.create=false
    
    # 指定刪除模式,可選值包括none、row、column和row_and_column,預設值為none。
    # none表示不允許進行任何刪除。
    # row表示允許刪除行。
    # column表示允許刪除屬性列。
    # row_and_column表示允許刪除行和屬性列。
    delete.mode=none
    
    # 寫入資料表時記憶體中緩衝隊列的大小,預設值為1024,單位為行數。此配置項的值必須為2的指數。
    buffer.size=1024
    # 寫入資料表時的回調線程數,預設值為核心數+1。
    # max.thread.count=
    # 寫入資料表時的最大請求並發數,預設值為10。
    max.concurrency=10
    # 寫入資料表時的分桶數,預設值為3。適當調大此配置項的值可提升並發寫入能力,但不應大於最大請求並發數。
    bucket.count=3
    # 寫入資料表時對緩衝區的重新整理時間間隔,預設值為10000,單位為毫秒。
    flush.Interval=10000
    
    # 以下為髒資料處理相關配置。
    # 在解析Kafka Record或者寫入資料表時可能發生錯誤,您可以可通過以下配置進行處理。
    # 指定容錯能力,可選值包括none和all,預設值為none。
    # none表示任何錯誤都將導致Sink Task立即失敗。
    # all表示跳過產生錯誤的Record,並記錄該Record。
    runtime.error.tolerance=none
    # 指定髒資料記錄模式,可選值包括ignore、kafka和tablestore,預設值為ignore。
    # ignore表示忽略所有錯誤。
    # kafka表示將產生錯誤的Record和錯誤資訊儲存在Kafka的另一個Topic中。
    # tablestore表示將產生錯誤的Record和錯誤資訊儲存在Tablestore另一張資料表中。
    runtime.error.mode=ignore
    
    # 當髒資料記錄模式為kafka時,需要配置Kafka叢集地址和Topic。
    # runtime.error.bootstrap.servers=localhost:9092
    # runtime.error.topic.name=errors
    
    # 當髒資料記錄模式為tablestore時,需要配置Tablestore中資料表名稱。
    # runtime.error.table.name=errors
  • .json格式設定檔的樣本,適用於distributed模式。
    {
      "name": "tablestore-sink",
      "config": {
        // 指定連接器類。
        "connector.class":"TableStoreSinkConnector",
        // 設定最大任務數。
        "tasks.max":"3",
        // 指定匯出資料的Kafka的Topic列表。
        "topics":"test",
        // 以下為Tablestore串連參數的配置。
        // Tablestore執行個體的Endpoint。
        "tablestore.endpoint":"https://xxx.xxx.ots.aliyuncs.com",
        // 填寫AccessKey ID和AccessKey Secret。
        "tablestore.access.key.id":"xxx",
        "tablestore.access.key.secret":"xxx",
        // Tablestore執行個體名稱。
        "tablestore.instance.name":"xxx",
    
        // 以下為資料對應相關的配置。
        // 指定Kafka Record的解析器。
        // 預設的DefaulteEventParser已支援Struct和Map類型,您也可以使用自訂的EventParser。
        "event.parse.class":"com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser",
    
        // 定義目標表名稱的格式字串,字串中可包含<topic>作為原始Topic的預留位置。
        // topics.assign.tables配置的優先順序更高。如果配置了topics.assign.tables,則忽略table.name.format的配置。
        // 例如當設定table.name.format為kafka_<topic>時,如果kafka中主題名稱為test,則將映射到Tablestore的表名為kafka_test。
        "table.name.format":"<topic>",
        // 指定Topic與目標表的映射關係,以"<topic>:<tablename>"格式映射Topic和表名,Topic和表名之間的分隔字元為英文冒號(:),不同映射之間分隔字元為英文逗號(,)。
        // 如果預設,則採取table.name.format的配置。
        // "topics.assign.tables":"test:test_kafka",
    
        // 指定主鍵模式,可選值包括kafka、record_key和record_value,預設值為kafka。
        // kafka表示以<connect_topic>_<connect_partition>和<connect_offset>作為資料表的主鍵。
        // record_key表示以Record Key中的欄位作為資料表的主鍵。
        // record_value表示以Record Value中的欄位作為資料表的主鍵。
        "primarykey.mode":"kafka",
    
        // 定義匯入資料表的主鍵列名和資料類型。
        // 屬性名稱格式為tablestore.<tablename>.primarykey.name和tablestore.<tablename>.primarykey.type。
        // 其中<tablename>為資料表名稱的預留位置。
        // 當主鍵模式為kafka時,無需配置該屬性,預設主鍵列名為{"topic_partition","offset"},預設主鍵列資料類型為{string, integer}。
        // 當主鍵模式為record_key或record_value時,必須配置以下兩個屬性。
        // "tablestore.test.primarykey.name":"A,B",
        // "tablestore.test.primarykey.type":"string,integer",
    
        // 定義屬性列白名單,用於過濾Record Value中的欄位擷取所需屬性列。
        // 預設值為空白,使用Record Value中的所有欄位作為資料表的屬性列。
        // 屬性名稱格式為tablestore.<tablename>.columns.whitelist.name和tablestore.<tablename>.columns.whitelist.type。
        // 其中<tablename>為資料表名稱的預留位置。
        // "tablestore.test.columns.whitelist.name":"A,B",
        // "tablestore.test.columns.whitelist.type":"string,integer",
    
        // 以下為寫入Tablestore相關的配置。
        // 指定寫入模式,可選值包括put和update,預設值為put。
        // put表示覆蓋寫。
        // update表示更新寫。
        "insert.mode":"put",
        // 是否需要保序,預設值為true。如果關閉保序模式,則有助於提升寫入效率。
        "insert.order.enable":"true",
        // 是否自動建立目標表,預設值為false。
        "auto.create":"false",
    
        // 指定刪除模式,可選值包括none、row、column和row_and_column,預設值為none。
        // none表示不允許進行任何刪除。
        // row表示允許刪除行。
        // column表示允許刪除屬性列。
        // row_and_column表示允許刪除行和屬性列。
        "delete.mode":"none",
    
        // 寫入資料表時記憶體中緩衝隊列的大小,預設值為1024,單位為行數。此配置項的值必須為2的指數。
        "buffer.size":"1024",
        // 寫入資料表時的回調線程數,預設值為核心數+1。
        // "max.thread.count":
        // 寫入資料表時的最大請求並發數,預設值為10。
        "max.concurrency":"10",
        // 寫入資料表時的分桶數,預設值為3。適當調大此配置項的值可提升並發寫入能力,但不應大於最大請求並發數。
        "bucket.count":"3",
        // 寫入資料表時對緩衝區的重新整理時間間隔,預設值為10000,單位為毫秒。
        "flush.Interval":"10000",
    
        // 以下為髒資料處理相關配置。
        // 在解析Kafka Record或者寫入資料表時可能發生錯誤,您可以通過以下配置進行處理。
        // 指定容錯能力,可選值包括none和all,預設值為none。
        // none表示任何錯誤都將導致Sink Task立即失敗。
        // all表示跳過產生錯誤的Record,並記錄該Record。
        "runtime.error.tolerance":"none",
        // 指定髒資料記錄模式,可選值包括ignore、kafka和tablestore,預設值為ignore。
        // ignore表示忽略所有錯誤。
        // kafka表示將產生錯誤的Record和錯誤資訊儲存在Kafka的另一個Topic中。
        // tablestore表示將產生錯誤的Record和錯誤資訊儲存在Tablestore另一張資料表中。
        "runtime.error.mode":"ignore"
    
        // 當髒資料記錄模式為kafka時,需要配置Kafka叢集地址和Topic。
        // "runtime.error.bootstrap.servers":"localhost:9092",
        // "runtime.error.topic.name":"errors",
    
        // 當髒資料記錄模式為tablestore時,需要配置Tablestore中資料表名稱。
        // "runtime.error.table.name":"errors",
      }

配置項說明

設定檔中的配置項說明請參見下表。其中時序相關配置項只有同步資料到時序表時才需要配置。

分類 配置項 類型 是否必選 樣本值 描述
Kafka Connect常見配置 name string tablestore-sink 連接器(Connector)名稱。連接器名稱必須唯一。
connector.class class TableStoreSinkConnector 連接器的Java類。
如果您要使用該連接器,請在connector.class配置項中指定Connector類的名稱,支援配置為Connector類的全名(com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector)或別名(TableStoreSinkConnector)。
connector.class=com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector
tasks.max integer 3 連接器支援建立的最大任務數。

如果連接器無法達到此並行度層級,則可能會建立較少的任務。

key.converter string org.apache.kafka.connect.json.JsonConverter 覆蓋worker設定的預設key轉換器。
value.converter string org.apache.kafka.connect.json.JsonConverter 覆蓋worker設定的預設value轉換器。
topics list test 連接器輸入的Kafka Topic列表,多個Topic之間以英文逗號(,)分隔。

您必須為連接器設定topics來控制連接器輸入的Topic。

連接器Connection配置 tablestore.endpoint string https://xxx.xxx.ots.aliyuncs.com Tablestore執行個體的服務地址。更多資訊,請參見服務地址
tablestore.mode string timeseries 根據資料同步到的表類型選擇模式。取值範圍如下:
  • normal(預設):同步資料到Tablestore的資料表。
  • timeseries:同步資料到Tablestore的時序表。
tablestore.access.key.id string LTAn******************** 登入帳號的AccessKey ID和AccessKey Secret,擷取方式請參見擷取AccessKey
tablestore.access.key.secret string zbnK**************************
tablestore.auth.mode string aksk 設定認證方式。取值範圍如下:
  • aksk(預設):使用阿里雲帳號或者RAM使用者的AccessKey ID和Access Secret進行認證。請使用此認證方式。
  • sts:使用STS臨時訪問憑證進行認證。對接雲kafka時使用。
tablestore.instance.name string myotstest Tablestore執行個體的名稱。
連接器Data Mapping配置 event.parse.class class DefaultEventParser 訊息解析器的Java類,預設值為DefaultEventParser。解析器用於從Kafka Record中解析出資料表的主鍵列和屬性列。
注意 Tablestore對列值大小有限制。string類型和binary類型的主鍵列列值限制均為1 KB,屬性列列值限制均為2 MB。更多資訊,請參見通用限制

如果資料類型轉換後列值超出對應限制,則將該Kafka Record作為髒資料處理。

如果使用預設的DefaultEventParser解析器,Kafka Record的Key或Value必須為Kafka Connect的Struct或Map類型。Struct中選擇的欄位必須為支援的資料類型,欄位會根據資料類型映射錶轉換為Tablestore資料類型寫入資料表。Map中的實值型別也必須為支援的資料類型,支援的資料類型同Struct,最終會被轉換為binary類型寫入資料表。Kafka和Tablestore的資料類型映射關係請參見附錄:Kafka和Tablestore資料類型映射

如果Kafka Record為不相容的資料格式,則您可以通過實現com.aliyun.tablestore.kafka.connect.parsers.EventParser定義的介面來自訂解析器。

table.name.format string kafka_<topic> 目標資料表名稱的格式字串,預設值為<topic>。字串中可包含<topic>作為原始Topic的預留位置。例如當設定table.name.format為kafka_<topic>時,如果kafka中主題名稱為test,則映射到Tablestore的表名為kafka_test。

此配置項的優先順序低於topics.assign.tables配置項,如果配置了topics.assign.tables,則忽略table.name.format的配置。

topics.assign.tables list test:destTable 指定topic與Tablestore表之間的映射關係,格式為<topic_1>:<tablename_1>,<topic_2>:<tablename_2>。多個映射關係之間以英文逗號(,)分隔,例如test:destTable表示將Topic名為test的訊息記錄寫入資料表destTable中。

此配置項的優先順序高於table.name.format配置項,如果配置了topics.assign.tables,則忽略table.name.format的配置。

primarykey.mode string kafka 資料表的主鍵模式。取值範圍如下:
  • kafka:以<connect_topic>_<connect_partition>(Kafka主題和分區,用底線"_"分隔)和<connect_offset>(該訊息記錄在分區中的位移量)作為資料表的主鍵。
  • record_key:以Record Key中的欄位(Struct類型)或者鍵(Map類型)作為資料表的主鍵。
  • record_value:以Record Value中的欄位(Struct類型)或者鍵(Map類型)作為資料表的主鍵。

請配合tablestore.<tablename>.primarykey.name和tablestore.<tablename>.primarykey.type使用。此配置項不區分大小寫。

tablestore.<tablename>.primarykey.name list A,B 資料表的主鍵列名,其中<tablename>為資料表名稱的預留位置,包含1~4個主鍵列,以英文逗號(,)分隔。
主鍵模式不同時,主鍵列名的配置不同。
  • 當設定主鍵模式為kafka時,以topic_partition,offset作為資料表主鍵列名稱。在該主鍵模式下,您可以不配置此主鍵列名。如果配置了主鍵列名,則不會覆蓋預設主鍵列名。
  • 當設定主鍵模式為record_key時,從Record Key中提取與配置的主鍵列名相同的欄位(Struct類型)或者鍵(Map類型)作為資料表的主鍵。在該主鍵模式下主鍵列名必須配置。
  • 當設定主鍵模式為record_value時,從Record Value中提取與配置的主鍵列名相同的欄位(Struct類型)或者鍵(Map類型)作為資料表的主鍵。在該主鍵模式下主鍵列名必須配置。

Tablestore資料表的主鍵列是有順序的,此屬性的配置應注意主鍵列名順序,例如PRIMARY KEY(A, B, C)與PRIMARY KEY(A, C, B)是不同的兩個主鍵結構。

tablestore.<tablename>.primarykey.type list string, integer 資料表的主鍵列資料類型,其中<tablename>為資料表名稱的預留位置,包含1~4個主鍵列,以英文逗號(,)分隔,順序必須與tablestore.<tablename>.primarykey.name相對應。此屬性配置不區分大小寫。資料類型的可選值包括integer、string、binary和auto_increment。
主鍵模式不同時,主鍵資料類型的配置不同。
  • 當主鍵模式為kafka時,以string, integer作為資料表主鍵資料類型。

    在該主鍵模式下,您可以不配置此主鍵列資料類型。如果配置了主鍵列資料類型,則不會覆蓋預設主鍵列資料類型。

  • 當主鍵模式為record_key或者record_value時,指定相應主鍵列的資料類型。在該主鍵模式下主鍵列資料類型必須配置。

    如果指定的資料類型與Kafka Schema中定義的資料類型發生衝突,則會產生解析錯誤,您可以配置Runtime Error相關屬性來應對解析錯誤。

    當配置此配置項為auto_increment時,表示主鍵自增列,此時Kafka Record中可以預設該欄位,寫入資料表時會自動插入自增列。

tablestore.<tablename>.columns.whitelist.name list A,B 資料表的屬性列白名單中屬性列名稱,其中<tablename>為資料表名稱的預留位置,以英文逗號(,)分隔。

如果配置為空白,則使用Record Value中的所有欄位(Struct類型)或者鍵(Map類型)作為資料表的屬性列,否則用於過濾得到所需屬性列。

tablestore.<tablename>.columns.whitelist.type list string, integer 資料表的屬性列白名單中屬性列資料類型,其中<tablename>為資料表名稱的預留位置,以英文逗號(,)分隔,順序必須與tablestore.<tablename>.columns.whitelist.name相對應。此屬性配置不區分大小寫。資料類型的可選值包括integer、string、binary、boolean和double。
連接器Write配置 insert.mode string put 寫入模式。取值範圍如下:
  • put(預設):對應Tablestore的PutRow操作,即新寫入一行資料會覆蓋原資料。
  • update:對應Tablestore的UpdateRow操作,即更新一行資料,可以增加一行中的屬性列,或者更新已存在的屬性列的值。

此屬性配置不區分大小寫。

insert.order.enable boolean true 寫入資料表時是否需要保序。取值範圍如下:
  • true(預設):寫入時保持Kafka訊息記錄的順序。
  • false:寫入順序無保證,但寫入效率會提升。
auto.create boolean false 是否需要自動建立目標表,支援自動建立資料表或者時序表。取值範圍如下:
  • true:自動建立目標表。
  • false(預設):不自動建立目標表。
delete.mode string none 刪除模式,僅當同步資料到資料表且主鍵模式為record_key時才有效。取值範圍如下:
  • none(預設):不允許進行任何刪除。
  • row:允許刪除行。當Record Value為空白時會刪除行。
  • column:允許刪除屬性列。當Record Value中欄位值(Struct類型)或者索引值(Map類型)為空白時會刪除屬性列。
  • row_and_column:允許刪除行和屬性列。

此屬性配置不區分大小寫。

刪除操作與insert.mode的配置相關。更多資訊,請參見附錄:刪除語義

buffer.size integer 1024 寫入資料表時記憶體中緩衝隊列的大小,預設值為1024,單位為行數。此配置項的值必須是2的指數。
max.thread.count integer 3 寫入資料表時的回調線程數,預設值為CPU核心數+1
max.concurrency integer 10 寫入資料表時的最大請求並發數。
bucket.count integer 3 寫入資料表時的分桶數,預設值為3。適當調大此配置項的值可提升並發寫入能力,但不應大於最大請求並發數。
flush.Interval integer 10000 寫入資料表時對緩衝區的重新整理時間間隔,預設值為10000,單位為毫秒。
連接器Runtime Error配置 runtime.error.tolerance string none 解析Kafka Record或者寫入表時產生錯誤的處理策略。取值範圍如下:
  • none(預設):任何錯誤都將導致Sink Task立即失敗。
  • all:跳過產生錯誤的Record,並記錄該Record。

此屬性配置不區分大小寫。

runtime.error.mode string ignore 解析Kafka Record或者寫入表時產生錯誤,對錯誤的Record的處理策略。取值範圍如下:
  • ignore(預設):忽略所有錯誤。
  • kafka:將產生錯誤的Record和錯誤資訊儲存在Kafka的另一個Topic中,此時需要配置runtime.error.bootstrap.servers和runtime.error.topic.name。記錄運行錯誤的Kafka Record的Key和Value與原Record一致,Header中增加ErrorInfo欄位來記錄運行錯誤資訊。
  • tablestore:將產生錯誤的Record和錯誤資訊儲存在Tablestore另一張資料表中,此時需要配置runtime.error.table.name。記錄運行錯誤的資料表主鍵列為topic_partition(string類型), offset(integer類型),並且屬性列為key(bytes類型)、value(bytes類型)和error_info(string類型)。

kafka模式下需要對Kafka Record的Header、Key和Value進行序列化轉換,tablestore模式下需要對Kafka Record的Key和Value進行序列化轉換,此處預設使用org.apache.kafka.connect.json.JsonConverter,並且配置schemas.enable為true,您可以通過JsonConverter還原序列化得到未經處理資料。關於Converter的更多資訊,請參見Kafka Converter

runtime.error.bootstrap.servers string localhost:9092 用於記錄運行錯誤的Kafka叢集地址。
runtime.error.topic.name string errors 用於記錄運行錯誤的Kafka Topic名稱。
runtime.error.table.name string errors 用於記錄運行錯誤的Tablestore表名稱。
時序相關配置項 tablestore.timeseries.<tablename>.measurement string mName 將JSON中的key值為指定值對應的value值作為_m_name欄位寫入對應時序表中。

如果設定此配置項為<topic>,則將kafka記錄的topic作為_m_name欄位寫入時序表中。

配置項名稱中<tablename>為時序表名稱的預留位置,請根據實際修改,例如時序表名稱為test,則配置項名稱為tablestore.timeseries.test.measurement。

tablestore.timeseries.<tablename>.dataSource string ds 將JSON中的key值為ds對應的value值作為_data_source欄位寫入對應時序表中。

配置項名稱中<tablename>為時序表名稱的預留位置,請根據實際修改。

tablestore.timeseries.<tablename>.tags list region,level 將JSON中key值為region和level所對應的value值作為tags欄位寫入對應時序表中。

配置項名稱中<tablename>為時序表名稱的預留位置,請根據實際修改。

tablestore.timeseries.<tablename>.time string timestamp 將JSON中key值為timestamp對應的value值作為_time欄位寫入對應時序表中。

配置項名稱中<tablename>為時序表名稱的預留位置,請根據實際修改。

tablestore.timeseries.<tablename>.time.unit string MILLISECONDS tablestore.timeseries.<tablename>.time值的時間戳記單位。取值範圍為SECONDS、MILLISECONDS、MICROSECONDS、NANOSECONDS。

配置項名稱中<tablename>為時序表名稱的預留位置,請根據實際修改。

tablestore.timeseries.<tablename>.field.name list cpu,io 將JSON中key值為cpu和io的索引值對作為_field_name以及_field_name的值寫入對應時序表。

配置項名稱中<tablename>為時序表名稱的預留位置,請根據實際修改。

tablestore.timeseries.<tablename>.field.type string double,integer tablestore.timeseries.<tablename>.field.name中欄位對應的資料類型。取值範圍為double、integer、string、binary、boolean。多個資料類型之間用半形冒號(,)分隔。

配置項名稱中<tablename>為時序表名稱的預留位置,請根據實際修改。

tablestore.timeseries.mapAll boolean false 將輸入JSON中的非主鍵欄位和時間欄位都作為field儲存到時序表中。

當配置項取值為false時,tablestore.timeseries.<tablename>.field.name和tablestore.timeseries.<tablename>.field.type必填。

tablestore.timeseries.toLowerCase boolean true 將field中的key(輸入資料中非主鍵或者時間的key,或者配置在tablestore.timeseries.<tablename>.field.name中的key)轉為小寫寫入時序表。
tablestore.timeseries.rowsPerBatch integer 50 寫入tablestore時,一次請求支援寫入的最大行數。最大值為200,預設值為200。

附錄:Kafka和Tablestore資料類型映射

Kafka和Tablestore資料類型映射關係請參見下表。

Kafka Schema Type Tablestore資料類型
STRING STRING
INT8、INT16、INT32、INT64 INTEGER
FLOAT32、FLOAT64 DOUBLE
BOOLEAN BOOLEAN
BYTES BINARY

附錄:刪除語義

说明 只有同步資料到資料表時才支援此功能。

當同步資料到資料表且Kafka訊息記錄的value中存在空值時,根據寫入模式(insert.mode)和刪除模式(delete.mode)的不同設定,資料寫入到Tablestore資料表中的處理方式不同,詳細說明請參見下表。

insert.mode put update
delete.mode none row column row_and_column none row column row_and_column
value為空白值 覆蓋寫 刪行 覆蓋寫 刪行 髒資料 刪行 髒資料 刪行
value所有欄位值均為空白值 覆蓋寫 覆蓋寫 覆蓋寫 覆蓋寫 髒資料 髒資料 刪列 刪列
value部分欄位值為空白值 覆蓋寫 覆蓋寫 覆蓋寫 覆蓋寫 忽略空值 忽略空值 刪列 刪列