全部產品
Search
文件中心

Tablestore:配置說明

更新時間:Mar 20, 2025

啟動Tablestore Sink Connector時,您需要通過索引值映射向Kafka Connect進程傳遞參數。通過本文您可以結合配置樣本和配置參數說明瞭解Tablestore Sink Connector的相關配置。

配置樣本

當從Kafka同步資料到資料表或者時序表時配置項不同,且不同工作模式下相應設定檔的樣本不同。此處以同步資料到資料表中為例介紹配置樣本。同步資料到時序表的配置樣本中需要增加時序相關配置項。

standalone模式配置樣本

如果使用的是standalone模式,您需要通過.properties格式檔案進行配置。配置樣本如下:

# 設定連接器名稱。
name=tablestore-sink
# 指定連接器類。
connector.class=TableStoreSinkConnector
# 設定最大任務數。
tasks.max=1
# 指定匯出資料的Kafka的Topic列表。
topics=test
# 以下為Tablestore串連參數的配置。
# Tablestore執行個體的Endpoint。 
tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com
# 填寫AccessKey ID和AccessKey Secret。
tablestore.access.key.id=xxx
tablestore.access.key.secret=xxx
# Tablestore執行個體名稱。
tablestore.instance.name=xxx

# 以下為資料對應相關的配置。
# 指定Kafka Record的解析器。
# 預設的DefaulteEventParser已支援Struct和Map類型,您也可以使用自訂的EventParser。
event.parse.class=com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser

# 定義目標表名稱的格式字串,字串中可包含<topic>作為原始Topic的預留位置。
# topics.assign.tables配置的優先順序更高,如果配置了topics.assign.tables,則忽略table.name.format的配置。
# 例如當設定table.name.format為kafka_<topic>時,如果kafka中主題名稱為test,則將映射到Tablestore的表名為kafka_test。
table.name.format=<topic>
# 指定Topic與目標表的映射關係,以"<topic>:<tablename>"格式映射Topic和表名,Topic和表名之間的分隔字元為英文冒號(:),不同映射之間分隔字元為英文逗號(,)。
# 如果預設,則採取table.name.format的配置。
# topics.assign.tables=test:test_kafka

# 指定主鍵模式,可選值包括kafka、record_key和record_value,預設值為kafka。
# kafka表示以<connect_topic>_<connect_partition>和<connect_offset>作為資料表的主鍵。
# record_key表示以Record Key中的欄位作為資料表的主鍵。
# record_value表示以Record Value中的欄位作為資料表的主鍵。
primarykey.mode=kafka

# 定義匯入資料表的主鍵列名和資料類型。
# 屬性名稱格式為tablestore.<tablename>.primarykey.name和tablestore.<tablename>.primarykey.type。
# 其中<tablename>為資料表名稱的預留位置。
# 當主鍵模式為kafka時,無需配置該屬性,預設主鍵列名為{"topic_partition","offset"},預設主鍵列資料類型為{string, integer}。
# 當主鍵模式為record_key或record_value時,必須配置以下兩個屬性。
# tablestore.test.primarykey.name=A,B
# tablestore.test.primarykey.type=string,integer

# 定義屬性列白名單,用於過濾Record Value中的欄位擷取所需屬性列。
# 預設值為空白,使用Record Value中的所有欄位作為資料表的屬性列。
# 屬性名稱格式為tablestore.<tablename>.columns.whitelist.name和tablestore.<tablename>.columns.whitelist.type。
# 其中<tablename>為資料表名稱的預留位置。
# tablestore.test.columns.whitelist.name=A,B
# tablestore.test.columns.whitelist.type=string,integer

# 以下為寫入Tablestore相關的配置。
# 指定寫入模式,可選值包括put和update,預設值為put。
# put表示覆蓋寫。
# update表示更新寫。
insert.mode=put
# 是否需要保序,預設值為true。如果關閉保序模式,則有助於提升寫入效率。
insert.order.enable=true
# 是否自動建立目標表,預設值為false。
auto.create=false

# 指定刪除模式,可選值包括none、row、column和row_and_column,預設值為none。
# none表示不允許進行任何刪除。
# row表示允許刪除行。
# column表示允許刪除屬性列。
# row_and_column表示允許刪除行和屬性列。
delete.mode=none

# 寫入資料表時記憶體中緩衝隊列的大小,預設值為1024,單位為行數。此配置項的值必須為2的指數。
buffer.size=1024
# 寫入資料表時的回調線程數,預設值為核心數+1。
# max.thread.count=
# 寫入資料表時的最大請求並發數,預設值為10。
max.concurrency=10
# 寫入資料表時的分桶數,預設值為3。適當調大此配置項的值可提升並發寫入能力,但不應大於最大請求並發數。
bucket.count=3
# 寫入資料表時對緩衝區的重新整理時間間隔,預設值為10000,單位為毫秒。
flush.Interval=10000

# 以下為髒資料處理相關配置。
# 在解析Kafka Record或者寫入資料表時可能發生錯誤,您可以可通過以下配置進行處理。
# 指定容錯能力,可選值包括none和all,預設值為none。
# none表示任何錯誤都將導致Sink Task立即失敗。
# all表示跳過產生錯誤的Record,並記錄該Record。
runtime.error.tolerance=none
# 指定髒資料記錄模式,可選值包括ignore、kafka和tablestore,預設值為ignore。
# ignore表示忽略所有錯誤。
# kafka表示將產生錯誤的Record和錯誤資訊儲存在Kafka的另一個Topic中。
# tablestore表示將產生錯誤的Record和錯誤資訊儲存在Tablestore另一張資料表中。
runtime.error.mode=ignore

# 當髒資料記錄模式為kafka時,需要配置Kafka叢集地址和Topic。
# runtime.error.bootstrap.servers=localhost:9092
# runtime.error.topic.name=errors

# 當髒資料記錄模式為tablestore時,需要配置Tablestore中資料表名稱。
# runtime.error.table.name=errors

distributed模式配置樣本

如果使用的是distributed模式,您需要通過JSON格式檔案進行配置。配置樣本如下:

{
  "name": "tablestore-sink",
  "config": {
    // 指定連接器類。
    "connector.class":"TableStoreSinkConnector",
    // 設定最大任務數。
    "tasks.max":"3",
    // 指定匯出資料的Kafka的Topic列表。
    "topics":"test",
    // 以下為Tablestore串連參數的配置。
    // Tablestore執行個體的Endpoint。
    "tablestore.endpoint":"https://xxx.xxx.ots.aliyuncs.com",
    // 填寫AccessKey ID和AccessKey Secret。
    "tablestore.access.key.id":"xxx",
    "tablestore.access.key.secret":"xxx",
    // Tablestore執行個體名稱。
    "tablestore.instance.name":"xxx",

    // 以下為資料對應相關的配置。
    // 指定Kafka Record的解析器。
    // 預設的DefaulteEventParser已支援Struct和Map類型,您也可以使用自訂的EventParser。
    "event.parse.class":"com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser",

    // 定義目標表名稱的格式字串,字串中可包含<topic>作為原始Topic的預留位置。
    // topics.assign.tables配置的優先順序更高。如果配置了topics.assign.tables,則忽略table.name.format的配置。
    // 例如當設定table.name.format為kafka_<topic>時,如果kafka中主題名稱為test,則將映射到Tablestore的表名為kafka_test。
    "table.name.format":"<topic>",
    // 指定Topic與目標表的映射關係,以"<topic>:<tablename>"格式映射Topic和表名,Topic和表名之間的分隔字元為英文冒號(:),不同映射之間分隔字元為英文逗號(,)。
    // 如果預設,則採取table.name.format的配置。
    // "topics.assign.tables":"test:test_kafka",

    // 指定主鍵模式,可選值包括kafka、record_key和record_value,預設值為kafka。
    // kafka表示以<connect_topic>_<connect_partition>和<connect_offset>作為資料表的主鍵。
    // record_key表示以Record Key中的欄位作為資料表的主鍵。
    // record_value表示以Record Value中的欄位作為資料表的主鍵。
    "primarykey.mode":"kafka",

    // 定義匯入資料表的主鍵列名和資料類型。
    // 屬性名稱格式為tablestore.<tablename>.primarykey.name和tablestore.<tablename>.primarykey.type。
    // 其中<tablename>為資料表名稱的預留位置。
    // 當主鍵模式為kafka時,無需配置該屬性,預設主鍵列名為{"topic_partition","offset"},預設主鍵列資料類型為{string, integer}。
    // 當主鍵模式為record_key或record_value時,必須配置以下兩個屬性。
    // "tablestore.test.primarykey.name":"A,B",
    // "tablestore.test.primarykey.type":"string,integer",

    // 定義屬性列白名單,用於過濾Record Value中的欄位擷取所需屬性列。
    // 預設值為空白,使用Record Value中的所有欄位作為資料表的屬性列。
    // 屬性名稱格式為tablestore.<tablename>.columns.whitelist.name和tablestore.<tablename>.columns.whitelist.type。
    // 其中<tablename>為資料表名稱的預留位置。
    // "tablestore.test.columns.whitelist.name":"A,B",
    // "tablestore.test.columns.whitelist.type":"string,integer",

    // 以下為寫入Tablestore相關的配置。
    // 指定寫入模式,可選值包括put和update,預設值為put。
    // put表示覆蓋寫。
    // update表示更新寫。
    "insert.mode":"put",
    // 是否需要保序,預設值為true。如果關閉保序模式,則有助於提升寫入效率。
    "insert.order.enable":"true",
    // 是否自動建立目標表,預設值為false。
    "auto.create":"false",

    // 指定刪除模式,可選值包括none、row、column和row_and_column,預設值為none。
    // none表示不允許進行任何刪除。
    // row表示允許刪除行。
    // column表示允許刪除屬性列。
    // row_and_column表示允許刪除行和屬性列。
    "delete.mode":"none",

    // 寫入資料表時記憶體中緩衝隊列的大小,預設值為1024,單位為行數。此配置項的值必須為2的指數。
    "buffer.size":"1024",
    // 寫入資料表時的回調線程數,預設值為核心數+1。
    // "max.thread.count":
    // 寫入資料表時的最大請求並發數,預設值為10。
    "max.concurrency":"10",
    // 寫入資料表時的分桶數,預設值為3。適當調大此配置項的值可提升並發寫入能力,但不應大於最大請求並發數。
    "bucket.count":"3",
    // 寫入資料表時對緩衝區的重新整理時間間隔,預設值為10000,單位為毫秒。
    "flush.Interval":"10000",

    // 以下為髒資料處理相關配置。
    // 在解析Kafka Record或者寫入資料表時可能發生錯誤,您可以通過以下配置進行處理。
    // 指定容錯能力,可選值包括none和all,預設值為none。
    // none表示任何錯誤都將導致Sink Task立即失敗。
    // all表示跳過產生錯誤的Record,並記錄該Record。
    "runtime.error.tolerance":"none",
    // 指定髒資料記錄模式,可選值包括ignore、kafka和tablestore,預設值為ignore。
    // ignore表示忽略所有錯誤。
    // kafka表示將產生錯誤的Record和錯誤資訊儲存在Kafka的另一個Topic中。
    // tablestore表示將產生錯誤的Record和錯誤資訊儲存在Tablestore另一張資料表中。
    "runtime.error.mode":"ignore"

    // 當髒資料記錄模式為kafka時,需要配置Kafka叢集地址和Topic。
    // "runtime.error.bootstrap.servers":"localhost:9092",
    // "runtime.error.topic.name":"errors",

    // 當髒資料記錄模式為tablestore時,需要配置Tablestore中資料表名稱。
    // "runtime.error.table.name":"errors",
  }

配置項說明

設定檔中的配置項說明請參見下表。其中時序相關配置項只有同步資料到時序表時才需要配置。

Kafka Connect常見配置

配置項

類型

是否必選

樣本值

描述

name

string

tablestore-sink

連接器(Connector)名稱。連接器名稱必須唯一。

connector.class

class

TableStoreSinkConnector

連接器的Java類。

如果您要使用該連接器,請在connector.class配置項中指定Connector類的名稱,支援配置為Connector類的全名com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector或別名TableStoreSinkConnector,例如connector.class=com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector

tasks.max

integer

3

連接器支援建立的最大任務數。

如果連接器無法達到此並行度層級,則可能會建立較少的任務。

key.converter

string

org.apache.kafka.connect.json.JsonConverter

覆蓋worker設定的預設key轉換器。

value.converter

string

org.apache.kafka.connect.json.JsonConverter

覆蓋worker設定的預設value轉換器。

topics

list

test

連接器輸入的Kafka Topic列表,多個Topic之間以半形逗號(,)分隔。

您必須為連接器設定topics來控制連接器輸入的Topic。

連接器Connection配置

配置項

類型

是否必選

樣本值

描述

tablestore.endpoint

string

https://xxx.xxx.ots.aliyuncs.com

Tablestore執行個體的服務地址

tablestore.mode

string

timeseries

根據資料同步到的表類型選擇模式。取值範圍如下:

  • normal(預設):同步資料到Table Store的資料表。

  • timeseries:同步資料到Table Store的時序表。

tablestore.access.key.id

string

LTAn********************

登入帳號的AccessKey ID和AccessKey Secret,擷取方式請參見建立AccessKey

tablestore.access.key.secret

string

zbnK**************************

tablestore.auth.mode

string

aksk

設定認證方式。取值範圍如下:

  • aksk:使用阿里雲帳號或者RAM使用者的AccessKey ID和Access Secret進行認證。請使用此認證方式。

  • sts(預設):使用STS臨時訪問憑證進行認證。對接雲Kafka時使用。

tablestore.instance.name

string

myotstest

Tablestore執行個體的名稱。

連接器Data Mapping配置

配置項

類型

是否必選

樣本值

描述

event.parse.class

class

DefaultEventParser

訊息解析器的Java類,預設值為DefaultEventParser。解析器用於從Kafka Record中解析出資料表的主鍵列和屬性列。

重要

Tablestore對列值大小有限制。string類型和binary類型的主鍵列值限制均為1 KB,屬性列列值限制均為2 MB。更多資訊,請參見使用限制

如果資料類型轉換後列值超出對應限制,則將該Kafka Record作為髒資料處理。

如果使用預設的DefaultEventParser解析器,Kafka Record的Key或Value必須為Kafka Connect的Struct或Map類型。

Struct中選擇的欄位必須為支援的資料類型,欄位會根據資料類型映射表轉換為Tablestore資料類型寫入資料表;Map中的實值型別也必須為支援的資料類型,支援的資料類型與Struct相同,最終會被轉換為binary類型寫入資料表。

如果Kafka Record為不相容的資料格式,則您可以通過實現com.aliyun.tablestore.kafka.connect.parsers.EventParser定義的介面來自訂解析器。

table.name.format

string

kafka_<topic>

目標資料表名稱的格式字串,預設值為<topic>。字串中可包含<topic>作為原始Topic的預留位置。例如當設定table.name.formatkafka_<topic>時,如果Kafka中Topic名為test,則映射到Tablestore的表名為kafka_test。

此配置項的優先順序低於topics.assign.tables配置項,如果配置了topics.assign.tables,則會忽略table.name.format的配置。

topics.assign.tables

list

test:destTable

指定topic與Tablestore表之間的映射關係,格式為<topic_1>:<tablename_1>,<topic_2>:<tablename_2>。多個映射關係之間以半形逗號(,)分隔,例如test:destTable表示將Topic名為test的訊息記錄寫入資料表destTable中。

此配置項的優先順序高於table.name.format配置項,如果配置了topics.assign.tables,則會忽略table.name.format的配置。

primarykey.mode

string

kafka

資料表的主鍵模式。取值範圍如下:

  • kafka:以<connect_topic>_<connect_partition>(Kafka主題和分區,用底線"_"分隔)和<connect_offset>(該訊息記錄在分區中的位移量)作為資料表的主鍵。

  • record_key:以Record Key中的欄位(Struct類型)或者鍵(Map類型)作為資料表的主鍵。

  • record_value:以Record Value中的欄位(Struct類型)或者鍵(Map類型)作為資料表的主鍵。

請配合tablestore.<tablename>.primarykey.nametablestore.<tablename>.primarykey.type使用。此配置項不區分大小寫。

tablestore.<tablename>.primarykey.name

list

A,B

資料表的主鍵列名,其中<tablename>為資料表名稱的預留位置,包含1~4個主鍵列,以半形逗號(,)分隔。

主鍵模式不同時,主鍵列名的配置不同。

  • 當設定主鍵模式為kafka時,以topic_partition,offset作為資料表主鍵列名稱。在該主鍵模式下,您可以不配置此主鍵列名。如果配置了主鍵列名,則不會覆蓋預設主鍵列名。

  • 當設定主鍵模式為record_key時,從Record Key中提取與配置的主鍵列名相同的欄位(Struct類型)或者鍵(Map類型)作為資料表的主鍵。在該主鍵模式下主鍵列名必須配置。

  • 當設定主鍵模式為record_value時,從Record Value中提取與配置的主鍵列名相同的欄位(Struct類型)或者鍵(Map類型)作為資料表的主鍵。在該主鍵模式下主鍵列名必須配置。

Tablestore資料表的主鍵列是有順序的,此屬性的配置應注意主鍵列名順序,例如PRIMARY KEY(A、B、C)與PRIMARY KEY(A、C、B)是不同的兩個主鍵結構。

tablestore.<tablename>.primarykey.type

list

string, integer

資料表的主鍵列資料類型,其中<tablename>為資料表名稱的預留位置,包含1~4個主鍵列,以半形逗號(,)分隔,順序必須與tablestore.<tablename>.primarykey.name相對應。此屬性配置不區分大小寫。資料類型的可選值包括integer、string、binary和auto_increment。

主鍵資料類型的配置與主鍵模式相關。

  • 當主鍵模式為kafka時,以string, integer作為資料表主鍵資料類型。

    在該主鍵模式下,您可以不配置此主鍵列資料類型。如果配置了主鍵列資料類型,則不會覆蓋預設主鍵列資料類型。

  • 當主鍵模式為record_key或record_value時,指定相應主鍵列的資料類型。在該主鍵模式下主鍵列資料類型必須配置。

    如果指定的資料類型與Kafka Schema中定義的資料類型發生衝突,則會產生解析錯誤,您可以配置Runtime Error相關屬性來應對解析錯誤。

    當配置此配置項為auto_increment時,表示主鍵自增列,此時Kafka Record中可以預設該欄位,寫入資料表時會自動插入自增列。

tablestore.<tablename>.columns.whitelist.name

list

A,B

資料表的屬性列白名單中屬性列名稱,其中<tablename>為資料表名稱的預留位置,以半形逗號(,)分隔。

如果配置為空白,則使用Record Value中的所有欄位(Struct類型)或者鍵(Map類型)作為資料表的屬性列,否則用於過濾得到所需屬性列。

tablestore.<tablename>.columns.whitelist.type

list

string, integer

資料表的屬性列白名單中屬性列資料類型,其中<tablename>為資料表名稱的預留位置,以半形逗號(,)分隔,順序必須與tablestore.<tablename>.columns.whitelist.name相對應。此屬性配置不區分大小寫。資料類型的可選值包括integer、string、binary、boolean和double。

連接器Write配置

配置項

類型

是否必選

樣本值

描述

insert.mode

string

put

寫入模式。取值範圍如下:

  • put(預設):對應Tablestore的PutRow操作,即新寫入一行資料會覆蓋原資料。

  • update:對應Tablestore的UpdateRow操作,即更新一行資料,可以增加一行中的屬性列,或更新已存在的屬性列的值。

此屬性配置不區分大小寫。

insert.order.enable

boolean

true

寫入資料表時是否需要保持順序。取值範圍如下:

  • true(預設):寫入時保持Kafka訊息記錄的順序。

  • false:寫入順序無保證,但寫入效率會提升。

auto.create

boolean

false

是否需要自動建立目標表,支援自動建立資料表或時序表。取值範圍如下:

  • true:自動建立目標表。

  • false(預設):不自動建立目標表。

delete.mode

string

none

刪除模式,僅當同步資料到資料表且主鍵模式為record_key時才有效。取值範圍如下:

  • none(預設):不允許進行任何刪除。

  • row:允許刪除行。當Record Value為空白時會刪除行。

  • column:允許刪除屬性列。當Record Value中欄位值(Struct類型)或者索引值(Map類型)為空白時會刪除屬性列。

  • row_and_column:允許刪除行和屬性列。

此屬性配置不區分大小寫。

刪除操作與insert.mode的配置相關。更多資訊,請參見附錄:刪除語義

buffer.size

integer

1024

寫入資料表時記憶體中緩衝隊列的大小,預設值為1024,單位為行數。此配置項的值必須是2的指數。

max.thread.count

integer

3

寫入資料表時的回調線程數,預設值為CPU核心數+1

max.concurrency

integer

10

寫入資料表時的最大請求並發數,預設值為10。

bucket.count

integer

3

寫入資料表時的分桶數,預設值為3。適當調大此配置項的值可提升並發寫入能力,但不應大於最大請求並發數。

flush.Interval

integer

10000

寫入資料表時對緩衝區的重新整理時間間隔,預設值為10000,單位為毫秒。

連接器Runtime Error配置

配置項

類型

是否必選

樣本值

描述

runtime.error.tolerance

string

none

解析Kafka Record或者寫入表時產生錯誤的處理策略。取值範圍如下:

  • none(預設):任何錯誤都將導致Sink Task立即失敗。

  • all:跳過產生錯誤的Record,並記錄該Record。

此屬性配置不區分大小寫。

runtime.error.mode

string

ignore

解析Kafka Record或者寫入表時產生錯誤,對錯誤的Record的處理策略。取值範圍如下:

  • ignore(預設):忽略所有錯誤。

  • kafka:將產生錯誤的Record和錯誤資訊儲存在Kafka的另一個Topic中,此時需要配置runtime.error.bootstrap.serversruntime.error.topic.name。記錄運行錯誤的Kafka Record的Key和Value與原Record一致,Header中增加ErrorInfo欄位來記錄運行錯誤資訊。

  • tablestore:將產生錯誤的Record和錯誤資訊儲存在Tablestore另一張資料表中,此時需要配置runtime.error.table.name。記錄運行錯誤的資料表主鍵列為topic_partition(string類型), offset(integer類型),並且屬性列為key(bytes類型)、value(bytes類型)和error_info(string類型)。

kafka模式下需要對Kafka Record的Header、Key和Value進行序列化轉換,tablestore模式下需要對Kafka Record的Key和Value進行序列化轉換,此處預設使用org.apache.kafka.connect.json.JsonConverter,並且配置schemas.enable為true,您可以通過JsonConverter還原序列化得到未經處理資料。關於Converter的更多資訊,請參見Kafka Converter

runtime.error.bootstrap.servers

string

localhost:9092

用於記錄運行錯誤的Kafka叢集地址。

runtime.error.topic.name

string

errors

用於記錄運行錯誤的Kafka Topic名稱。

runtime.error.table.name

string

errors

用於記錄運行錯誤的Tablestore表名稱。

時序相關配置

配置項

類型

是否必選

樣本值

描述

tablestore.timeseries.<tablename>.measurement

string

mName

將JSON中的key值為指定值對應的value值作為_m_name欄位寫入對應時序表中。

如果設定此配置項為<topic>,則將Kafka記錄的topic作為_m_name欄位寫入時序表中。

配置項名稱中<tablename>為時序表名稱的預留位置,請根據實際情況修改,例如時序表名稱為test,則配置項名稱為tablestore.timeseries.test.measurement

tablestore.timeseries.<tablename>.dataSource

string

ds

將JSON中的key值為ds對應的value值作為_data_source欄位寫入對應時序表中。

配置項名稱中<tablename>為時序表名稱的預留位置,請根據實際情況修改。

tablestore.timeseries.<tablename>.tags

list

region,level

將JSON中key值為region和level所對應的value值作為tags欄位寫入對應時序表中。

配置項名稱中<tablename>為時序表名稱的預留位置,請根據實際情況修改。

tablestore.timeseries.<tablename>.time

string

timestamp

將JSON中key值為timestamp對應的value值作為_time欄位寫入對應時序表中。

配置項名稱中<tablename>為時序表名稱的預留位置,請根據實際情況修改。

tablestore.timeseries.<tablename>.time.unit

string

MILLISECONDS

tablestore.timeseries.<tablename>.time值的時間戳記單位。取值範圍為SECONDS、MILLISECONDS、MICROSECONDS、NANOSECONDS。

配置項名稱中<tablename>為時序表名稱的預留位置,請根據實際情況修改。

tablestore.timeseries.<tablename>.field.name

list

cpu,io

將JSON中key值為cpu和io的索引值對作為_field_name以及_field_name的值寫入對應時序表。

配置項名稱中<tablename>為時序表名稱的預留位置,請根據實際情況修改。

tablestore.timeseries.<tablename>.field.type

string

double,integer

tablestore.timeseries.<tablename>.field.name中欄位對應的資料類型。取值範圍為double、integer、string、binary、boolean。多個資料類型之間用半形逗號(,)分隔。

配置項名稱中<tablename>為時序表名稱的預留位置,請根據實際情況修改。

tablestore.timeseries.mapAll

boolean

false

將輸入JSON中的非主鍵欄位和時間欄位都作為field儲存到時序表中。

當配置項取值為false時,tablestore.timeseries.<tablename>.field.nametablestore.timeseries.<tablename>.field.type必填。

tablestore.timeseries.toLowerCase

boolean

true

將field中的key(輸入資料中非主鍵或者時間的key,或配置在tablestore.timeseries.<tablename>.field.name中的key)轉換為小寫寫入時序表。

tablestore.timeseries.rowsPerBatch

integer

50

寫入tablestore時,一次請求支援寫入的最大行數。最大值為200,預設值為200。

附錄:Kafka和Tablestore資料類型映射

Kafka和Tablestore資料類型映射關係請參見下表。

Kafka Schema Type

Tablestore資料類型

STRING

STRING

INT8、INT16、INT32、INT64

INTEGER

FLOAT32、FLOAT64

DOUBLE

BOOLEAN

BOOLEAN

BYTES

BINARY

附錄:刪除語義

說明

只有同步資料到資料表時才支援此功能。

當同步資料到資料表且Kafka訊息記錄的value中存在空值時,根據寫入模式(insert.mode)和刪除模式(delete.mode)的不同設定,資料寫入到Table Store資料表的處理方式不同,詳細說明請參見下表。

insert.mode

put

update

delete.mode

none

row

column

row_and_column

none

row

column

row_and_column

value為空白值

覆蓋寫

刪行

覆蓋寫

刪行

髒資料

刪行

髒資料

刪行

value所有欄位值均為空白值

覆蓋寫

覆蓋寫

覆蓋寫

覆蓋寫

髒資料

髒資料

刪列

刪列

value部分欄位值為空白值

覆蓋寫

覆蓋寫

覆蓋寫

覆蓋寫

忽略空值

忽略空值

刪列

刪列