After you configure a Kafka input component, you can read data from a Kafka data source into the storage system that is connected to the big data platform for data integration and further processing. This topic describes how to configure a Kafka input component.
Prerequisites
Before you begin, make sure that you have completed the following operations:
Created a Kafka data source. For more information, see Create a Kafka data source.
The account used to configure the Kafka input component must have read-through permission on the data source. If you do not have this permission, request it first. For more information, see Request, renew, and return data source permissions.
Procedure
In the top navigation bar of the Dataphin homepage, choose Develop > Data Integration.
In the top navigation bar of the Integration page, select a project (In Dev-Prod mode, you need to select an environment).
In the left-side navigation pane, click Batch Pipeline. In the Batch Pipeline list, click the offline pipeline that you want to develop to open its configuration page.
Click Component Library in the upper-right corner of the page to open the Component Library panel.
In the left-side navigation pane of the Component Library panel, select Inputs. Find the KAFKA component in the input component list on the right and drag it to the canvas.
Click the icon in the KAFKA input component card to open the KAFKA Input Configuration dialog box. In the dialog box, configure the parameters described in the following table.
Parameter
Description
Step Name
The name of the Kafka input component. Dataphin automatically generates a step name. You can also modify it based on your business scenario. The name must meet the following requirements:
It can contain only Chinese characters, letters, underscores (_), and digits.
It cannot exceed 64 characters in length.
Datasource
The data source dropdown list displays all Kafka data sources in the current Dataphin instance, including those for which you have read-through permission and those for which you do not. Click the icon to copy the name of the current data source. For data sources for which you do not have read-through permission, you can click Request next to the data source to request read-through permission. For more information, see Request data source permissions.
If you do not have a Kafka data source, click Create Data Source to create one. For more information, see Create a Kafka data source.
Topic
The Kafka topic. Click the dropdown list to select the Kafka topic name from which you want to read data.
Key Type
The type of the Kafka key, which determines the key.deserializer configuration when initializing the Kafka Consumer. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, SHORT, STRING, and KAFKA AVRO (available when schema.registry is configured for the data source).
Value Type
The type of the Kafka value, which determines the value.deserializer configuration when initializing the Kafka Consumer. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, SHORT, STRING, and KAFKA AVRO (available when schema.registry is configured for the data source).
Consumer Group ID
The group.id configuration when initializing the Kafka Consumer.
If you want a data synchronization node in Data Integration to consume data from the correct offset, you must set this parameter to a value that is unique to the data synchronization node. If you do not specify this parameter, a random string that starts with datax_ is automatically generated as the group.id each time synchronization is executed.
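For reference, the following is a minimal sketch, not Dataphin's exact internal configuration, of how the Key Type, Value Type, and Consumer Group ID settings roughly map to the properties used to initialize the Kafka Consumer. It assumes that STRING is selected for both the key and the value, and the group ID shown is a hypothetical example:
{
  "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
  "group.id": "my_pipeline_kafka_input"
}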
Start Time
The start time for reading data, which is the left boundary of the time range. Only a time string in the yyyyMMddHHmmss format is supported. This parameter must be used together with scheduling parameters. For example, if the scheduling parameter is configured as beginDateTime=${20220101000000}, set Start Time to ${beginDateTime}.
End Time
The end time for reading data, which is the right boundary of the time range. Only a time string in the yyyyMMddHHmmss format is supported. This parameter must be used together with scheduling parameters. For example, if the scheduling parameter is configured as endDateTime=${20220101000000}, set End Time to ${endDateTime}.
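For example, to read a hypothetical one-day window, the scheduling parameters and the component settings could be paired as follows. The parameter names and the value format follow the examples above; the actual values depend on your scheduling configuration:
Scheduling parameters: beginDateTime=${20220101000000} and endDateTime=${20220102000000}
Kafka input component: Start Time = ${beginDateTime}, End Time = ${endDateTime}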
Synchronization End Strategy
Select a synchronization end strategy. The following two strategies are available:
When No New Data Is Read For 1 Minute: If the consumer does not retrieve any data from Kafka for 1 minute (usually because all data in the topic has been read, or possibly because of network or Kafka cluster availability issues), the task ends immediately. Otherwise, the task continues to attempt to read data.
When The Specified End Offset Is Reached: If the business time or offset of the Kafka records read by the data integration task reaches the end offset configured above, the task ends. Otherwise, the task keeps trying to read Kafka records indefinitely.
Advanced Configuration
You can use the advanced configuration to set the offset reset strategy, the size of a single read, the duration of a single read, and the read timeout. If a schema registry is configured for the topic, you must also configure the keySchema and valueSchema parameters in the advanced configuration. This parameter is empty by default. The sample format is as follows:
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_number", "type": ["int", "null"]},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}
Output Fields
By default, six fields are displayed: __key__, __value__, __partition__, __headers__, __offset__, and __timestamp__. You can manually add output fields in the following ways:
Click Batch Add to configure in bulk using JSON or TEXT format.
JSON format example
[ { "index": 0, "name": "__key__", "type": "STRING" }, { "index": 1, "name": "__value__", "type": "STRING" }, { "index": 2, "name": "__partition__", "type": "INTEGER" }, { "index": 3, "name": "__headers__", "type": "STRING" }, { "index": 4, "name": "__offset__", "type": "LONG" }, { "index": 5, "name": "__timestamp__", "type": "LONG" } ]Noteindexindicates the column number of the specified object,nameindicates the field name after import, andtypeindicates the field type after import. For example,"index":3,"name":"user_id","type":"String"means importing the 4th column from the file, with the field nameuser_idand field typeString.TEXT format example
0,__key__,STRING
1,__value__,STRING
2,__partition__,INTEGER
3,__headers__,STRING
4,__offset__,LONG
5,__timestamp__,LONG
Note: The row delimiter separates the information of each field. The default is a line feed (\n). Line feed (\n), semicolon (;), and period (.) are supported.
The column delimiter is used to separate field names and field types. The default is a comma (,).
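For instance, the same six fields could be batch-added on a single line by using a semicolon (;) as the row delimiter and the default comma (,) as the column delimiter. This is only an illustration of the delimiter options described above, not output copied from the product:
0,__key__,STRING;1,__value__,STRING;2,__partition__,INTEGER;3,__headers__,STRING;4,__offset__,LONG;5,__timestamp__,LONG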
Click New Output Field, enter the Source Index and Column, and select the Type as prompted.
You can also configure a source table field as a string other than the six strings described above. In this case, the Kafka record is parsed as a JSON string, the string configured as the source table field is used as a JSON path, and the content at that path is read as the field value and written to the corresponding field in the target table. For example, if { "data": { "name": "bob", "age": 35 } } is the value of the Kafka record and the source table field is configured as data.name, "bob" is read as the value of this field and written to the corresponding target table. The field types that can be added are Java types and DataX mapping types.
You can also perform the following operations on added fields:
Click the edit icon in the Actions column to edit an existing field.
Click the delete icon in the Actions column to delete an existing field.
Click OK to complete the Kafka input component configuration.