全部产品
Search
文档中心

大数据开发治理平台 DataWorks:Kafka数据源

更新时间:Oct 23, 2024

Kafka数据源为您提供读取和写入Kafka的双向通道,本文为您介绍DataWorks的Kafka数据同步的能力支持情况。

支持的版本

支持阿里云Kafka,以及=0.10.2且<=2.2.x的自建Kafka版本。

说明

对于<0.10.2版本Kafka,由于Kafka不支持检索分区数据offset,并且Kafka数据结构可能不支持时间戳,进而无法支持数据同步。

资源评估

实时读取

  • 使用包年包月Serverless资源组时,请提前预估Serverless资源组规格,避免资源组规格不足影响任务运行:

    一个topic预估需要1 CU,除此之外,还需根据流量进行评估:

    • Kafka数据不压缩,按10 MB/s预估需要1 CU。

    • Kafka数据压缩,按10 MB/s预估需要2 CU。

    • Kafka数据压缩并且进行JSON解析,按10MB/s预估需要3 CU。

  • 使用包年包月Serverless资源组和旧版独享数据集成资源组时:

    • 对Failover容忍度高,集群槽位的水位建议不超过80%。

    • 对Failover容忍度低,集群槽位的水位建议不超过70%。

说明

实际占用和数据内容格式等相关,评估后您可以再根据实际运行情况进行调整。

使用限制

Kafka数据源目前支持使用Serverless资源组(推荐)旧版独享数据集成资源组

单表离线读

同时配置parameter.groupId和parameter.kafkaConfig.group.id时,parameter.groupId优先级高于kafkaConfig配置信息中的group.id。

单表实时写

写入数据不支持去重,即如果任务重置位点或者Failover后再启动,会导致出现重复数据写入。

整库实时写

  • 实时数据同步任务支持使用Serverless资源组(推荐)旧版独享数据集成资源组

  • 对于源端同步表有主键的场景,同步时会使用主键值作为kafka记录的key,确保同主键的变更有序写入kafka的同一分区。

  • 对于源端同步表无主键的场景,如果选择了支持无主键表同步选项,则同步时kafka记录的key为空。如果要确保表的变更有序写入kafka,则选择写入的kafka topic必须是单分区。如果选择了自定义同步主键,则同步时使用其他非主键的一个或几个字段的联合,代替主键作为kafka记录的key。

  • 如果在kafka集群发生响应异常的情况下,仍要确保有主键表同主键的变更有序写入kafka的同一分区,则需要在配置kafka数据源时,在扩展参数表单中加入如下配置。

    {"max.in.flight.requests.per.connection":1,"buffer.memory": 100554432}

    重要

    添加配置后同步性能会大幅下降,需要在性能和严格保序可靠性之间做好权衡。

  • 实时同步写入kafka的消息总体格式、同步任务心跳消息格式及源端更改数据对应的消息格式,详情请参见:附录:消息格式

支持的字段类型

Kafka的数据存储为非结构化的存储,通常Kafka记录的数据模块有key、value、offset、timestamp、headers、partition。DataWorks在对Kafka数据进行读写时,会按照以下的策略进行数据处理。

离线读数据

DataWorks读取Kafka数据时,支持对Kafka数据进行JSON格式的解析,各数据模块的处理方式如下。

Kafka记录数据模块

处理后的数据类型

key

取决于数据同步任务配置的keyType配置项,keyType参数介绍请参见下文的全量参数说明章节。

value

取决于数据同步任务配置的valueType配置项,valueType参数介绍请参见下文的全量参数说明章节。

offset

Long

timestamp

Long

headers

String

partition

Long

离线写数据

DataWorks将数据写入Kafka时,支持写入JSON格式或text格式的数据,不同的数据同步方案往Kafka数据源中写入数据时,对数据的处理策略不一致,详情如下。

重要
  • 写入text格式的数据时,不会写入字段名数据,使用分隔符来分隔字段取值。

  • 实时同步写入数据到Kafka时,写入的格式为内置的JSON格式,写入数据为包含数据库变更消息的数据、业务时间和DDL信息的所有数据,数据格式详情请参见附录:消息格式

同步任务类型

写入Kafka value的格式

源端字段类型

写入时的处理方式

离线同步

DataStudio的离线同步节点

json

字符串

UTF8编码字符串

布尔

转换为UTF8编码字符串"true"或者"false"

时间/日期

yyyy-MM-dd HH:mm:ss格式UTF8编码字符串

数值

UTF8编码数值字符串

字节流

字节流会被视为UTF8编码的字符串,被转换成字符串

text

字符串

UTF8编码字符串

布尔

转换为UTF8编码字符串"true"或者"false"

时间/日期

yyyy-MM-dd HH:mm:ss格式UTF8编码字符串

数值

UTF8编码数值字符串

字节流

字节流会被视为UTF8编码的字符串,被转换成字符串

实时同步:ETL实时同步至Kafka

DataStudio的实时同步节点

json

字符串

UTF8编码字符串

布尔

json布尔类型

时间/日期

  • 对于精确到毫秒以下精度的时间:转换成表示毫秒时间戳的13位JSON整数。

  • 对于精确到微秒或者纳秒精度的时间:转换成带有表示毫秒时间戳的13位整数,和表示纳秒时间戳的6位小数的JSON浮点数。

数值

json数值类型

字节流

字节流会进行Base64编码后转换成UTF8编码的字符串

text

字符串

UTF8编码字符串

布尔

转换为UTF8编码字符串"true"或者"false"

时间/日期

yyyy-MM-dd HH:mm:ss格式UTF8编码字符串

数值

UTF8编码数值字符串

字节流

字节流会进行Base64编码后转换成UTF8编码字符串

实时同步:整库实时同步至Kafka

纯实时同步增量数据

内置JSON格式

字符串

UTF8编码字符串

布尔

json布尔类型

时间/日期

13位毫秒时间戳

数值

json数值

字节流

字节流会进行Base64编码后转换成UTF8编码字符串

同步解决方案:一键实时同步至Kafka

离线全量+实时增量

内置JSON格式

字符串

UTF8编码字符串

布尔

json布尔类型

时间/日期

13位毫秒时间戳

数值

json数值

字节流

字节流会进行Base64编码后转换成UTF8编码字符串

创建数据源

在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源详细的配置参数解释可在配置界面查看对应参数的文案提示

数据同步任务开发

数据同步任务的配置入口和通用配置流程可参见下文的配置指导。

单表离线同步任务配置指导

单表、整库实时同步任务配置指导

操作流程请参见DataStudio侧实时同步任务配置

单表、整库全增量实时同步任务配置指导

操作流程请参见数据集成侧同步任务配置

启用认证配置说明

SSL

配置Kafka数据源时,特殊认证方式选择SSL或者SASL_SSL时,表明Kafka集群开启了SSL认证,您需要上传客户端truststore证书文件并填写truststore证书密码。

  • 如果Kafka集群为alikafka实例,可以参考SSL证书算法升级说明下载正确的truststore证书文件,truststore证书密码为KafkaOnsClient

  • 如果Kafka集群为EMR实例,可以参考使用SSL加密Kafka链接下载正确的truststore证书文件并获取truststore证书密码。

  • 如果是自建集群,请自行上传正确的truststore证书,填写正确的truststore证书密码。

keystore证书文件、keystore证书密码和SSL密钥密码只有在Kafka集群开启双向SSL认证时才需要进行配置,用于Kafka集群服务端认证客户端身份,Kafka集群server.propertiesssl.client.auth=required时开启双向SSL认证,详情请参见使用SSL加密Kafka链接

GSSAPI

配置Kafka数据源时,当Sasl机制选择GSSAPI时,需要上传三个认证文件,分别是JAAS配置文件Kerberos配置文件以及Keytab文件,并在独享资源组进行DNS/HOST设置,下面分别介绍三种文件以及独享资源组DNS、HOST的配置方式。

说明

Serverless资源组需要通过内网DNS解析配置Host地址信息,更多信息,请参见内网DNS解析(PrivateZone)

  • JAAS配置文件

    JAAS文件必须以KafkaClient开头,之后使用一个大括号包含所有配置项:

    • 大括号内第一行定义使用的登录组件类,对于各类Sasl认证机制,登录组件类是固定的,后续的每个配置项以key=value格式书写。

    • 除最后一个配置项,其他配置项结尾不能有分号。

    • 最后一个配置项结尾必须有分号,在大括号"}"之后也必须加上一个分号。

    不符合以上格式要求将导致JAAS配置文件解析出错,典型的JAAS配置文件格式如下(根据实际情况替换以下内容中的xxx):

    KafkaClient {
       com.sun.security.auth.module.Krb5LoginModule required
       useKeyTab=true
       keyTab="xxx"
       storeKey=true
       serviceName="kafka-server"
       principal="kafka-client@EXAMPLE.COM";
    };

    配置项

    说明

    登录模块

    必须配置com.sun.security.auth.module.Krb5LoginModule。

    useKeyTab

    必须指定为true。

    keyTab

    可以指定任意路径,在同步任务运行时会自动下载数据源配置时上传的keyTab文件到本地,并使用本地文件路径填充keyTab配置项。

    storeKey

    决定客户端是否保存密钥,配置true或者false均可,不影响数据同步。

    serviceName

    对应Kafka服务端server.properties配置文件中的sasl.kerberos.service.name配置项,请根据实际情况配置该项。

    principal

    Kafka客户端使用的kerberos principal,请根据实际情况配置该项,并确保上传的keyTab文件包含该principal的密钥。

  • Kerberos配置文件

    Kerberos配置文件必须包含两个模块[libdefaults]和[realms]

    • [libdefaults]模块指定Kerberos认证参数,模块中每个配置项以key=value格式书写。

    • [realms]模块指定kdc地址,可以包含多个realm子模块,每个realm子模块以realm名称=开头。

    后面紧跟一组用大括号包含配置项,每个配置项也以key=value格式书写,典型的Kerberos配置文件格式如下(根据实际情况替换以下内容中的xxx):

    [libdefaults]
      default_realm = xxx
    
    [realms]
      xxx = {
        kdc = xxx
      }

    配置项

    说明

    [libdefaults].default_realm

    访问Kafka集群节点时默认使用的realm,一般情况下与JAAS配置文件中指定客户端principal所在realm一致。

    [libdefaults]其他参数

    [libdefaults]模块可以指定其他一些kerberos认证参数,例如ticket_lifetime等,请根据实际需要配置。

    [realms].realm名称

    需要与JAAS配置文件中指定客户端principal所在realm,以及[libdefaults].default_realm一致,如果JAAS配置文件中指定客户端principal所在realm和[libdefaults].default_realm不一致,则需要包含两组realms子模块分别对应JAAS配置文件中指定客户端principal所在realm和[libdefaults].default_realm。

    [realms].realm名称.kdc

    以ip:port格式指定kdc地址和端口,例如kdc=10.0.0.1:88,端口如果省略默认使用88端口,例如kdc=10.0.0.1。

  • Keytab文件

    Keytab文件需要包含JAAS配置文件指定principal的密钥,并且能够通过kdc的验证。例如本地当前工作目录有名为client.keytab的文件,可以通过以下命令验证Keytab文件是否包含指定principal的密钥。

    klist -ket ./client.keytab
    
    Keytab name: FILE:client.keytab
    KVNO Timestamp           Principal
    ---- ------------------- ------------------------------------------------------
       7 2018-07-30T10:19:16 test@demo.com (des-cbc-md5)
  • 独享资源组DNS、HOST配置

    开启Kerberos认证的Kafka集群,会使用Kafka集群中节点的hostname作为节点在kdc(Kerberos的服务端程序,即密钥分发中心)中注册的principal的一部分,而客户端访问Kafka集群节点时,会根据本地的DNS、HOST设置,推导Kafka集群节点的principal,进而从kdc获取节点的访问凭证。使用独享资源组访问开启Kerberos认证的Kafka集群时,需要正确配置DNS、HOST,以确保顺利从kdc获取Kafka集群节点的访问凭证:

    • DNS设置

      当独享资源组绑定的VPC中,使用PrivateZone实例进行了Kafka集群节点的域名解析设置,则可以在DataWorks管控台,独享资源组对应的VPC绑定项增加100.100.2.136和100.100.2.138两个IP的自定义路由,即可使PrivateZone针对Kafka集群节点的域名解析设置对独享资源组生效。

    • HOST设置

      当独享资源组绑定的VPC中,未使用PrivateZone实例进行了Kafka集群节点的域名解析设置,则需要在DataWorks管控台,独享资源组网络设置中逐个将Kafka集群节点的IP地址与域名映射添加到Host配置中。

PLAIN

配置Kafka数据源时,当Sasl机制选择PLAIN时,JAAS文件必须以KafkaClient开头,之后使用一个大括号包含所有配置项。

  • 大括号内第一行定义使用的登录组件类,对于各类Sasl认证机制,登录组件类是固定的,后续的每个配置项以key=value格式书写。

  • 除最后一个配置项,其他配置项结尾不能有分号。

  • 最后一个配置项结尾必须有分号,在大括号"}"之后也必须加上一个分号。

不符合以上格式要求将导致JAAS配置文件解析出错,典型的JAAS配置文件格式如下(根据实际情况替换以下内容中的xxx):

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="xxx"
  password="xxx";
};

配置项

说明

登录模块

必须配置org.apache.kafka.common.security.plain.PlainLoginModul

username

用户名,请根据实际情况配置该项。

password

密码,请根据实际情况配置该项。

常见问题

附录:脚本Demo与参数说明

离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。

Reader脚本Demo

从Kafka读取数据的JSON配置,如下所示。

{
    "type": "job",
    "steps": [
        {
            "stepType": "kafka",
            "parameter": {
                "server": "host:9093",
                "column": [
                    "__key__",
                    "__value__",
                    "__partition__",
                    "__offset__",
                    "__timestamp__",
                    "'123'",
                    "event_id",
                    "tag.desc"
                ],
                "kafkaConfig": {
                    "group.id": "demo_test"
                },
                "topic": "topicName",
                "keyType": "ByteArray",
                "valueType": "ByteArray",
                "beginDateTime": "20190416000000",
                "endDateTime": "20190416000006",
                "skipExceedRecord": "true"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "stream",
            "parameter": {
                "print": false,
                "fieldDelimiter": ","
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "throttle": true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent": 1,//并发数
            "mbps":"12"//限流,此处1mbps = 1MB/s。
        }
    }
}

Reader脚本参数

参数

描述

是否必选

datasource

数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须要与添加的数据源名称保持一致。

server

Kafka的broker server地址,格式为ip:port

您可以只配置一个server,但请务必保证Kafka集群中所有broker的IP地址都可以连通DataWorks。

topic

Kafka的Topic,是Kafka处理资源的消息源(feeds of messages)的聚合。

column

需要读取的Kafka数据,支持常量列、数据列和属性列:

  • 常量列:使用单引号包裹的列为常量列,例如["'abc'", "'123'"]

  • 数据列

    • 如果您的数据是一个JSON,支持获取JSON的属性,例如["event_id"]

    • 如果您的数据是一个JSON,支持获取JSON的嵌套子属性,例如["tag.desc"]

  • 属性列

    • __key__表示消息的key。

    • __value__表示消息的完整内容 。

    • __partition__表示当前消息所在分区。

    • __headers__表示当前消息headers信息。

    • __offset__表示当前消息的偏移量。

    • __timestamp__表示当前消息的时间戳。

    完整示例如下。

    "column": [
        "__key__",
        "__value__",
        "__partition__",
        "__offset__",
        "__timestamp__",
        "'123'",
        "event_id",
        "tag.desc"
        ]

keyType

Kafka的Key的类型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。

valueType

Kafka的Value的类型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。

beginDateTime

数据消费的开始时间位点,为时间范围(左闭右开)的左边界。yyyymmddhhmmss格式的时间字符串,可以配合调度参数使用。详情请参见调度参数支持的格式

说明

Kafka 0.10.2及以上的版本支持该功能。

需要和beginOffset二选一。

说明

beginDateTimeendDateTime配合使用。

endDateTime

数据消费的结束时间位点,为时间范围(左闭右开)的右边界。yyyymmddhhmmss格式的时间字符串,可以配合调度参数使用。详情请参见调度参数支持的格式

说明

Kafka 0.10.2及以上的版本支持该功能。

需要和endOffset二选一。

说明

endDateTimebeginDateTime配合使用。

beginOffset

数据消费的开始时间位点,您可以配置以下形式:

  • 数字形式(例如15553274),表示开始消费的点位。

  • seekToBeginning:表示从开始点位消费数据。

  • seekToLast:表示从kafkaConfig配置中指定的group.id对应群组ID保存的位点开始读取数据,注意群组位点在客户端会定时自动提交到Kafka服务端,所以任务失败后,如果重跑任务时可能会有数据重复或者丢失,skipExceedRecord参数配置为true时,任务可能丢弃最后读取的一些记录,而这些丢弃数据的群组位点已经提交到服务端,在下一个周期任务运行时将无法读到这些丢弃的数据。

  • seekToEnd:表示从最后点位消费数据,会读取到空数据。

需要和beginDateTime二选一。

endOffset

数据消费的结束位点,用于控制结束数据消费任务退出的时间。

需要和endDateTime二选一。

skipExceedRecord

Kafka使用public ConsumerRecords<K, V> poll(final Duration timeout)消费数据,一次poll调用获取的数据可能在endOffset或者endDateTime之外。skipExceedRecord用于控制是否写出多余的数据至目的端。由于消费数据使用了自动点位提交,建议您:

  • Kafka 0.10.2之前版本:建议配置skipExceedRecord为false。

  • Kafka 0.10.2及以上版本:建议配置skipExceedRecord为true。

否,默认值为false

partition

Kafka的一个Topic有多个分区(partition),正常情况下数据同步任务是读取Topic(多个分区)一个点位区间的数据。您也可以指定partition,仅读取一个分区点位区间的数据。

否,无默认值。

kafkaConfig

创建Kafka数据消费客户端KafkaConsumer可以指定扩展参数,例如bootstrap.serversauto.commit.interval.mssession.timeout.ms等,您可以基于kafkaConfig控制KafkaConsumer消费数据的行为。

encoding

当keyType或valueType配置为STRING时,将使用该配置项指定的编码解析字符串。

否,默认为UTF-8。

waitTIme

消费者对象从Kafka拉取一次数据的最大等待时间,单位为秒。

否,默认为60。

stopWhenPollEmpty

该配置项可选值为true/false。当配置为true时,如果消费者从Kafka拉取数据返回为空(一般是已经读完主题中的全部数据,也可能是网络或者Kafka集群可用性问题),则立即停止任务,否则持续重试直到再次读到数据。

否,默认为true。

stopWhenReachEndOffset

该配置项只在stopWhenPollEmpty为true时生效,可选值为true/false。

  • 当配置为true时,如果消费者从Kafka拉取数据返回为空时,会检查当前是否读取到了Kafka Topic分区中的最新位点数据,如果已经读到了Kafka Topic所有分区中的最新位点数据,则立即停止任务,否则继续尝试从Kafka Topic中拉取数据。

  • 当配置为false时,如果消费者从Kafka拉取数据返回为空时,不会进行检查,立即停止任务。

否,默认为false。

说明

兼容历史逻辑,Kafka版本低于V0.10.2无法执行已经读取Kafka Topic所有分区中的最新位点数据检查,但线上可能存在个别脚本模式任务是读取的版本低于V0.10.2的Kafka数据。

kafkaConfig参数说明如下。

参数

描述

fetch.min.bytes

指定消费者从broker获取消息的最小字节数,即有足够的数据时,才将其返回给消费者。

fetch.max.wait.ms

等待broker返回数据的最大时间,默认500毫秒。fetch.min.bytesfetch.max.wait.ms先满足哪个条件,便按照该方式返回数据。

max.partition.fetch.bytes

指定broker从每个partition中返回给消费者的最大字节数,默认为1 MB。

session.timeout.ms

指定消费者不再接收服务之前,可以与服务器断开连接的时间,默认是30秒。

auto.offset.reset

消费者在读取没有偏移量或者偏移量无效的情况下(因为消费者长时间失效,包含偏移量的记录已经过时并被删除)的处理方式。默认为none(意味着不会自动重置位点),您可以更改为earliest(消费者从起始位置读取partition的记录)。

max.poll.records

单次调用poll方法能够返回的消息数量。

key.deserializer

消息Key的反序列化方法,例如org.apache.kafka.common.serialization.StringDeserializer

value.deserializer

数据Value的反序列化方法,例如org.apache.kafka.common.serialization.StringDeserializer

ssl.truststore.location

SSL根证书的路径。

ssl.truststore.password

根证书Store的密码。如果是Aliyun Kafka,则配置为KafkaOnsClient。

security.protocol

接入协议,目前支持使用SASL_SSL协议接入。

sasl.mechanism

SASL鉴权方式,如果是Aliyun Kafka,使用PLAIN。

java.security.auth.login.config

SASL鉴权文件路径。

Writer脚本Demo

向Kafka写入数据的JSON配置,如下所示。

{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"Kafka",//插件名。
"parameter":{
"server": "ip:9092", //Kafka的server地址。
"keyIndex": 0, //作为Key的列。需遵循驼峰命名规则,k小写
"valueIndex": 1, //作为Value的某列。目前只支持取来源端数据的一列或者该参数不填(不填表示取来源所有数据)
        //例如想取odps的第2、3、4列数据作为kafkaValue,请新建odps表将原odps表数据做清洗整合写新odps表后使用新表同步。
"keyType": "Integer", //Kafka的Key的类型。
"valueType": "Short", //Kafka的Value的类型。
"topic": "t08", //Kafka的topic。
"batchSize": 1024 //向kafka一次性写入的数据量,单位是字节。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
                     "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
                     "concurrent":1, //作业并发数。
                     "mbps":"12"//限流,此处1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}

Writer脚本参数

参数

描述

是否必选

datasource

数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须要与添加的数据源名称保持一致。

server

Kafka的server地址,格式为ip:port

topic

Kafka的topic,是Kafka处理资源的消息源(feeds of messages)的不同分类。

每条发布至Kafka集群的消息都有一个类别,该类别被称为topic,一个topic是对一组消息的归纳。

valueIndex

Kafka Writer中作为Value的那一列。如果不填写,默认将所有列拼起来作为Value,分隔符为fieldDelimiter

writeMode

当未配置valueIndex时,该配置项决定将源端读取记录的所有列拼接作为写入kafka记录Value的格式,可选值为text和JSON,默认值为text。

  • 配置为text,将所有列按照fieldDelimiter指定分隔符拼接。

  • 配置为JSON,将所有列按照column参数指定字段名称拼接为JSON字符串。

例如源端记录有三列,值为a、b和c,writeMode配置为text、fieldDelimiter配置为#时,写入kafka的记录Value为字符串a#b#c;writeMode配置为JSON、column配置为[{"name":"col1"},{"name":"col2"},{"name":"col3"}]时,写入kafka的记录Value为字符串{"col1":"a","col2":"b","col3":"c"}。

如果配置了valueIndex,该配置项无效。

column

目标表需要写入数据的字段,字段间用英文逗号分隔。例如:"column": ["id", "name", "age"]

当未配置valueIndex,并且writeMode选择JSON时,该配置项定义源端读取记录的列值在JSON结构中的字段名称。例如,"column": [{"name":id"}, {"name":"name"}, {"name":"age"}]。

  • 当源端读取记录列的个数多于column配置的字段名个数时,写入时进行截断。例如:

    源端记录有三列,值为a、b和c,column配置为[{"name":"col1"},{"name":"col2"}]时,写入kafka的记录Value为字符串{"col1":"a","col2":"b"}。

  • 当源端读取记录列的个数少于column配置的字段名个数时,多余column配置字段名填充null或者nullValueFormat指定的字符串。例如:

    源端记录有两列,值为a和b,column配置为[{"name":"col1"},{"name":"col2"},{"name":"col3"}]时,写入kafka的记录Value为字符串{"col1":"a","col2":"b","col3":null}。如果配置了valueIndex,或者writeMode配置为text,该配置项无效。

如果配置了valueIndex,或者writeMode配置为text,该配置项无效。

当未配置valueIndex,并且writeMode配置为JSON时必选

partition

指定写入Kafka topic指定分区的编号,是一个大于等于0的整数。

keyIndex

Kafka Writer中作为Key的那一列。

keyIndex参数取值范围是大于等于0的整数,否则任务会出错。

keyIndexes

源端读取记录中作为写入kafka记录Key的列的序号数组。

列序号从0开始,例如[0,1,2],会将配置的所有列序号的值用逗号连接作为写入kafka记录的Key。如果不填写,写入kafka记录Key为null,数据轮流写入topic的各个分区中,与keyIndex参数只能二选一。

fieldDelimiter

当writeMode配置为text,并且未配置valueIndex时,将源端读取记录的所有列按照该配置项指定列分隔符拼接作为写入kafka记录的Value,支持配置单个或者多个字符作为分隔符,支持以\u0001格式配置unicode字符,支持\t\n等转义字符。默认值为\t

如果writeMode未配置为text或者配置了valueIndex,该配置项无效。

keyType

Kafka的Key的类型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。

valueType

Kafka的Value的类型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。

nullKeyFormat

keyIndex或者keyIndexes指定的源端列值为null时,替换为该配置项指定的字符串,如果不配置不做替换。

nullValueFormat

当源端列值为null时,组装写入kafka记录Value时替换为该配置项指定的字符串,如果不配置不做替换。

acks

初始化Kafka Producer时的acks配置,决定写入成功的确认方式。默认acks参数为allacks取值如下:

  • 0:不进行写入成功确认。

  • 1:确认主副本写入成功。

  • all:确认所有副本写入成功。

附录:写入Kafka消息格式定义

完成配置实时同步任务的操作后,执行同步任务会将源端数据库读取的数据,以JSON格式写入到Kafka topic中。除了会将设置的源端表中已有数据全部写入Kafka对应Topic中,还会启动实时同步将增量数据持续写入Kafka对应Topic中,同时源端表增量DDL变更信息也会以JSON格式写入Kafka对应Topic中。您可以通过附录:消息格式获取写入Kafka的消息的状态及变更等信息。

说明

通过离线同步任务写入Kafka的数据JSON结构中的payload.sequenceId、payload.timestamp.eventTImepayload.timestamp.checkpointTime字段均设置为-1