Preparations
1. Create an index in Elasticsearch
DataHub allows you to synchronize data to indexes in clusters of Elasticsearch V5.X, V6.X, and V7.X.
DataHub can synchronize data to Elasticsearch only from topics of the TUPLE type. Before you start a DataConnector, make sure that the target index exists in Elasticsearch or that Elasticsearch is allowed to create indexes automatically. Otherwise, the DataConnector fails.
2. Prepare an account that has the required permissions for data synchronization
When you create a DataConnector to synchronize data to Elasticsearch, you must manually enter information related to Elasticsearch such as the endpoint, index name, account, and password. Make sure that the entered account information is authentic and valid. Otherwise, the DataConnector fails to be created.
Create a DataConnector
In the left-side navigation pane of the DataHub console, click Project Manager. On the Project List page, find a project and click View in the Actions column. On the details page of the project, find a topic and click View in the Actions column.
On the details page of the topic, click Connector in the upper-right corner.
In the Create Connector panel, click ElasticSearch, as shown in the following figure.

Parameter description
1. Endpoint
The endpoint of Elasticsearch. You must enter an internal endpoint and an internal port in the format of Internal endpoint:Internal port.
For example, if the internal endpoint is es-cn-xxx.elasticsearch.aliyuncs.com and the internal port is 9200, enter es-cn-xxx.elasticsearch.aliyuncs.com:9200.
2. Index
You can specify indexes in two ways: static indexes and dynamic indexes.
Static indexes
If you create an index in advance or allow Elasticsearch to automatically create an index, all data is written to the index.
Dynamic indexes
You must allow Elasticsearch to automatically create indexes. Otherwise, data fails to be written. You can allow Elasticsearch to generate indexes based on a time period or a field. Indexes can be generated based on only one field.
Supported time formats
Example 1: Create an index every day. Set the Index parameter to test_${%Y-%m-%d}. If the current date is March 31, 2021, data is written to the test_2021-03-31 index.
Example 2: Create indexes based on a field. To create indexes based on the col1 field, set the Index parameter to test_${col1}. If one record has the value AAA in the col1 field and another record has the value BBB, the two records are written to different indexes. The former is written to test_AAA and the latter is written to test_BBB.
As the number of indexes in an Elasticsearch cluster increases, data writes slow down. Too many indexes may cause timeouts when DataHub writes data to Elasticsearch. Therefore, when you use dynamic indexes, avoid creating too many indexes.
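The two dynamic index examples above can be sketched in Python as follows. This is only an illustration of the naming rule: the function name resolve_index is hypothetical, and treating the time placeholder as a strftime pattern is an assumption based on the test_${%Y-%m-%d} example, not a description of how DataHub evaluates the Index parameter internally.

```python
import re
from datetime import datetime

# Matches placeholders of the form ${...} in an Index value.
_PLACEHOLDER = re.compile(r"\$\{([^}]+)\}")


def resolve_index(template, record, now):
    """Return the index name for one record (hypothetical helper).

    template: Index value, for example test_${%Y-%m-%d} or test_${col1}
    record:   dict that maps field names to values
    now:      datetime used for time-based placeholders
    """
    def substitute(match):
        token = match.group(1)
        if token.startswith("%"):
            # Assumption: a time placeholder behaves like a strftime pattern.
            return now.strftime(token)
        # Otherwise the placeholder names a field of the record.
        return str(record[token])

    return _PLACEHOLDER.sub(substitute, template)


now = datetime(2021, 3, 31)
print(resolve_index("test_${%Y-%m-%d}", {}, now))           # test_2021-03-31
print(resolve_index("test_${col1}", {"col1": "AAA"}, now))  # test_AAA
print(resolve_index("test_${col1}", {"col1": "BBB"}, now))  # test_BBB
```

Because every distinct field value produces a separate index, a field with many distinct values quickly leads to the large index counts that the preceding paragraph warns about.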
3. User and Password
The username and password that are used to access Elasticsearch.
4. Type Fields
The types that are generated during data synchronization from DataHub to Elasticsearch vary with the versions of Elasticsearch clusters. In a cluster of Elasticsearch V5.X, you can create multiple types in one index. However, in a cluster of Elasticsearch V6.X, you can create only one type in one index. Therefore, the way in which DataHub synchronizes data to Elasticsearch varies. The Type Fields parameter is required.
When you create a DataConnector to synchronize data from DataHub to a cluster of Elasticsearch V5.X, the value of the field that you select is used as the type of a record. If multiple fields are selected, the values of the fields that are separated by vertical bars (|) are used together as the type of a record. The fields that you select from the Type Fields drop-down list cannot be null.
When you create a DataConnector to synchronize data from DataHub to a cluster of Elasticsearch V6.X, the name of the field that you select is used as the type of a record. If multiple fields are selected, the names of the fields that are separated by vertical bars (|) are used together as the type of a record. The cluster of Elasticsearch V6.X supports all field names.
Example:
DataHub schema : f1 string, f2 string, f3 string, f4 string
Data: ["test1","test2","test3",null]
Type field | Type in Elasticsearch V5.X | Type in Elasticsearch V6.X |
f1 | test1 | f1 |
f1,f3 | test1|test3 | f1|f3 |
ff | The type failed to be created. | ff |
f1,ff | The type failed to be created. | f1|ff |
f4 | The type is created but the data failed to be synchronized because dirty data occurs. | The type is created and the data is synchronized. |
Note:
The type name cannot be customized when you create a DataConnector to synchronize data to a cluster of Elasticsearch V6.X. If you want to customize the type name, you can use DataHub SDKs to create a DataConnector.
Default types are used in a cluster of Elasticsearch V7.X. Therefore, you do not need to set the Type Fields parameter when you create a DataConnector to synchronize data to a cluster of Elasticsearch V7.X.
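The version-dependent rules for the Type Fields parameter can be summarized in a short Python sketch. The function resolve_type and the two exception classes are hypothetical names introduced for illustration. The logic only mirrors the table above and is not taken from the DataHub implementation.

```python
class UnknownFieldError(Exception):
    """A selected field does not exist in the topic schema."""


class DirtyDataError(Exception):
    """A selected field is null in the record."""


def resolve_type(es_major, schema, record, type_fields):
    """Return the type of a record for the given Elasticsearch major version."""
    if es_major >= 7:
        # V7.X uses the default type, so Type Fields is not set.
        return None
    if es_major == 6:
        # V6.X: the field names are joined. All field names are accepted.
        return "|".join(type_fields)
    # V5.X: the field values are joined. Fields must exist and be non-null.
    values = []
    for name in type_fields:
        if name not in schema:
            raise UnknownFieldError(name)
        value = record[schema.index(name)]
        if value is None:
            raise DirtyDataError(name)
        values.append(str(value))
    return "|".join(values)


schema = ["f1", "f2", "f3", "f4"]
record = ["test1", "test2", "test3", None]
print(resolve_type(5, schema, record, ["f1", "f3"]))  # test1|test3
print(resolve_type(6, schema, record, ["f1", "f3"]))  # f1|f3
print(resolve_type(6, schema, record, ["f4"]))        # f4
```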
5. ID Fields
You can generate IDs for the data written to Elasticsearch based on the data written to DataHub. You can also select no ID fields. In this case, Elasticsearch generates a unique ID for each record. When you create a DataConnector to synchronize data from DataHub to Elasticsearch, the value of the field that you select is used as the ID of a record. If multiple fields are selected, the values of the fields that are separated by vertical bars (|) are used together as the ID of a record. The fields that you select from the ID Fields drop-down list cannot be null.
Example:
DataHub schema : f1 string, f2 string, f3 string, f4 string
Data: ["test1","test2","test3",null]
ID field | Data ID |
| Elasticsearch automatically generates a unique ID for each data record. |
f1 | test1 |
f1,f3 | test1|test3 |
ff | The ID failed to be generated. |
f4 | The ID is generated but the data record failed to be synchronized because dirty data occurs. |
6. Router attribute column
You can generate routers for the data written to Elasticsearch based on the data written to DataHub. You can also select no router fields. In this case, the router feature of Elasticsearch is not used. When you create a DataConnector to synchronize data from DataHub to Elasticsearch, the value of the field that you select is used as the router of a data record. If multiple fields are selected, the values of the fields that are separated by vertical bars (|) are used together as the router of a data record. The fields that you select from the Router attribute column drop-down list cannot be null.
For examples, see the description of the ID Fields parameter.
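The ID Fields and Router attribute column parameters follow the same rule: the values of the selected fields are joined with vertical bars, and a null value makes the record dirty. A minimal Python sketch of that rule is shown below. The helper name join_field_values and the choice of exception types are assumptions made for illustration.

```python
def join_field_values(schema, record, fields):
    """Join the values of the selected fields with '|'.

    Returns None when no field is selected. For ID Fields this means that
    Elasticsearch generates the ID. For the router it means that the router
    feature is not used.
    """
    if not fields:
        return None
    values = []
    for name in fields:
        if name not in schema:
            # Corresponds to "The ID failed to be generated." in the table.
            raise KeyError(f"field {name!r} is not in the topic schema")
        value = record[schema.index(name)]
        if value is None:
            # Corresponds to the dirty data case in the table.
            raise ValueError(f"dirty data: field {name!r} is null")
        values.append(str(value))
    return "|".join(values)


schema = ["f1", "f2", "f3", "f4"]
record = ["test1", "test2", "test3", None]
print(join_field_values(schema, record, []))            # None
print(join_field_values(schema, record, ["f1"]))        # test1
print(join_field_values(schema, record, ["f1", "f3"]))  # test1|test3
```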
7. Import Fields
The fields to be imported from DataHub to Elasticsearch. Fields that are not selected are not synchronized from DataHub to Elasticsearch. Fields that are used as IDs and fields that are used as types in a cluster of Elasticsearch V5.X cannot be selected as the fields to be imported. No examples are provided because the fields to be imported do not completely determine the finally generated data. The following part provides a complete data synchronization example.
8. Network Type
You must select the network type based on the type of the Elasticsearch cluster. DataConnectors used to synchronize data from DataHub on Alibaba Cloud support only the VPC network type because all Elasticsearch clusters on the Alibaba Cloud are virtual private cloud (VPC)-connected clusters. If you select the VPC network type, you must enter information such as the VPC ID and the ID of the Elasticsearch cluster. The following figure shows how to view the information.

Note: When you set the Instance ID parameter, you must append -worker to the ID of the Elasticsearch cluster. For example, if the ID of the Elasticsearch cluster is es-cn-xxx, enter es-cn-xxx-worker.
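When connector settings are prepared by a script, the two formatting rules from this section (the Endpoint value and the Instance ID value) are easy to get wrong. The following Python sketch shows one way to build both strings. The function names are hypothetical, and the port range check is an added assumption rather than a documented DataHub requirement.

```python
def format_endpoint(host, port):
    """Build the Endpoint value in the form 'Internal endpoint:Internal port'."""
    if not host:
        raise ValueError("the internal endpoint must not be empty")
    if not 0 < int(port) < 65536:
        raise ValueError("the internal port must be between 1 and 65535")
    return f"{host}:{int(port)}"


def format_instance_id(cluster_id):
    """Append -worker to the Elasticsearch cluster ID if it is missing."""
    suffix = "-worker"
    if cluster_id.endswith(suffix):
        return cluster_id
    return cluster_id + suffix


print(format_endpoint("es-cn-xxx.elasticsearch.aliyuncs.com", 9200))
# es-cn-xxx.elasticsearch.aliyuncs.com:9200
print(format_instance_id("es-cn-xxx"))
# es-cn-xxx-worker
```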
Data write example
This example assumes that the DataConnector has been created. If the DataConnector fails to be created, modify the configuration based on the preceding parameter descriptions.
The following table describes a topic schema in DataHub.
Field name | Data type |
f1 | BIGINT |
f2 | STRING |
f3 | BOOLEAN |
f4 | DOUBLE |
f5 | TIMESTAMP |
f6 | DECIMAL |
Example 1:
Type Fields = f1 (This parameter is unavailable when you create a DataConnector to synchronize data from DataHub to a cluster of Elasticsearch V7.X.)
ID Fields = f2
Import Fields = f1,f2,f3,f4,f5,f6
Data = v1,v2,v3,v4,v5,v6
Elasticsearch version | type | id | data |
ES5 | v1 | v2 | {f3:v3,f4:v4,f5:v5,f6:v6} |
ES6 | f1 | v2 | {f1:v1,f3:v3,f4:v4,f5:v5,f6:v6} |
ES7 | - | v2 | {f1:v1,f3:v3,f4:v4,f5:v5,f6:v6} |
Data = null,v2,v3,v4,v5,v6
Elasticsearch version | type | id | data |
ES5 | - | - | Dirty data occurs because the type field is null. |
ES6 | f1 | v2 | {f1:v1,f3:v3,f4:v4,f5:v5,f6:v6} |
ES7 | - | v2 | {f1:v1,f3:v3,f4:v4,f5:v5,f6:v6} |
Data = v1,null,v3,v4,v5,v6
Elasticsearch version | type | id | data |
ES5 | - | - | Dirty data occurs because the ID field is null. |
ES6 | - | - | Dirty data occurs because the ID field is null. |
ES7 | - | - | Dirty data occurs because the ID field is null. |
Example 2:
Type Fields = f1,f2 (This parameter is unavailable when you create a DataConnector to synchronize data from DataHub to a cluster of Elasticsearch V7.X.)
ID Fields = f3,f4
Import Fields = f1,f2,f3,f4,f5,f6
Data = v1,v2,v3,v4,v5,v6
Elasticsearch version | type | id | data |
ES5 | v1|v2 | v3|v4 | {f5:v5,f6:v6} |
ES6 | f1|f2 | v3|v4 | {f1:v1,f2:v2,f5:v5,f6:v6} |
ES7 | - | v3|v4 | {f1:v1,f2:v2,f5:v5,f6:v6} |
Data = v1,null,v3,v4,v5,v6
Elasticsearch version | type | id | data |
ES5 | - | - | Dirty data occurs because the type field is null. |
ES6 | f1|f2 | v3|v4 | {f1:v1,f2:v2,f5:v5,f6:v6} |
ES7 | - | v3|v4 | {f1:v1,f2:v2,f5:v5,f6:v6} |
Data = v1,v2,null,v4,v5,v6
Elasticsearch version | type | id | data |
ES5 | - | - | Dirty data occurs because the ID field is null. |
ES6 | - | - | Dirty data occurs because the ID field is null. |
ES7 | - | - | Dirty data occurs because the ID field is null. |
Example 3:
Type Fields = f1 (This parameter is unavailable when you create a DataConnector to synchronize data from DataHub to a cluster of Elasticsearch V7.X.)
ID Fields = f2
Router attribute column = f3
Import Fields = f1,f2,f3,f4,f5,f6
Data = v1,v2,v3,v4,v5,v6
Elasticsearch version | type | id | router | data |
ES5 | v1 | v2 | v3 | {f4:v4,f5:v5,f6:v6} |
ES6 | f1 | v2 | v3 | {f1:v1,f4:v4,f5:v5,f6:v6} |
ES7 | - | v2 | v3 | {f1:v1,f4:v4,f5:v5,f6:v6} |
Data = null,v2,v3,v4,v5,v6
Elasticsearch version | type | id | data |
ES5 | - | - | Dirty data occurs because the type field is null. |
ES6 | f1 | v2 | {f1:v1,f4:v4,f5:v5,f6:v6} |
ES7 | - | v2 | {f1:v1,f4:v4,f5:v5,f6:v6} |
Data = v1,null,v3,v4,v5,v6
Elasticsearch version | type | id | data |
ES5 | - | - | Dirty data occurs because the ID field is null. |
ES6 | - | - | Dirty data occurs because the ID field is null. |
ES7 | - | - | Dirty data occurs because the ID field is null. |
Data = v1,v2,null,v4,v5,v6
Elasticsearch version | type | id | data |
ES5 | - | - | Dirty data occurs because the router field is null. |
ES6 | - | - | Dirty data occurs because the router field is null. |
ES7 | - | - | Dirty data occurs because the router field is null. |
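The mapping shown in the three examples can be reproduced with a small Python model, which may help when you predict what a given configuration writes to Elasticsearch. This is a sketch derived only from the tables in this section. The function map_record and its return format are hypothetical and do not represent the DataHub implementation.

```python
def map_record(es_major, fields, values, import_fields,
               type_fields, id_fields, router_fields=()):
    """Model how one DataHub record maps to type, id, router, and data."""
    record = dict(zip(fields, values))

    def joined(names, what):
        # Join the values of the selected fields. Null values are dirty data.
        if not names:
            return None
        for name in names:
            if record[name] is None:
                raise ValueError(f"dirty data: the {what} field is null")
        return "|".join(str(record[name]) for name in names)

    doc_id = joined(id_fields, "ID")
    router = joined(router_fields, "router")
    if es_major == 5:
        doc_type = joined(type_fields, "type")  # field values
    elif es_major == 6:
        doc_type = "|".join(type_fields)        # field names
    else:
        doc_type = None                         # V7.X: default type

    # ID and router fields are left out of the document body.
    # Type fields are left out only for V5.X.
    excluded = set(id_fields) | set(router_fields)
    if es_major == 5:
        excluded |= set(type_fields)
    data = {k: record[k] for k in import_fields if k not in excluded}
    return {"type": doc_type, "id": doc_id, "router": router, "data": data}


fields = ["f1", "f2", "f3", "f4", "f5", "f6"]
values = ["v1", "v2", "v3", "v4", "v5", "v6"]
# Example 3: Type Fields = f1, ID Fields = f2, Router attribute column = f3
for version in (5, 6, 7):
    print(version, map_record(version, fields, values, fields,
                              ["f1"], ["f2"], ["f3"]))
```

Calling map_record with a null value in an ID field or a router field raises ValueError for every version, which corresponds to the dirty data rows in the tables above.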
Example
This example shows the complete procedure of synchronizing data from DataHub to Elasticsearch V6.7. All operations related to Elasticsearch are performed by using the development tools provided in the Kibana console. For more information about other operations, see the Elasticsearch documentation.
1. Create an index in Elasticsearch
By default, Elasticsearch can automatically create an index. Therefore, you can skip this step. If you do not allow Elasticsearch to automatically create an index, you must manually create an index. For more information about how to create an index, see Elasticsearch documentation.
2. Create a DataHub topic
Note: You can create DataConnectors to synchronize data to Elasticsearch only for the topics of the TUPLE type.
For more information, see the "Create a topic" section of the Manage topics topic.
View the created topic.

3. Create a DataConnector to synchronize data to Elasticsearch
In this example, when you create the DataConnector to synchronize data to Elasticsearch, f1 and f2 are used as the type fields, f3 and f4 as the ID fields, and all fields as the fields to be imported.

4. Write data to the DataHub topic
You can use a DataHub SDK or a DataHub plug-in to write data.
After a data record is written, sample the data and view the written data.
5. Verify the synchronized data
First, check the synchronization offset of the DataConnector that is used to synchronize data to Elasticsearch.
You can find that the synchronization offset and synchronization time of the DataConnector have changed. The synchronization time is the time when the data has just been written to DataHub, and the synchronization offset has changed to 1. The synchronization offset starts from 0, and an offset of 0 indicates that the first data record is written.
Then, check whether data is synchronized to Elasticsearch. You can view the synchronized data in the Kibana console.
