DataWorks provides Elasticsearch Reader and Elasticsearch Writer for you to read data from and write data to Elasticsearch data sources. This topic describes the capabilities of synchronizing data from or to Elasticsearch data sources.
Background information
You can use a shared resource group to run a synchronization task to read data from or write data to an Elasticsearch V5.X cluster. You can use an exclusive resource group for Data Integration or a new-version resource group to run a synchronization task to read data from or write data to an Elasticsearch V5.X, V6.X, V7.X, or V8.X cluster.
For information about exclusive resource groups for Data Integration, see Create and use an exclusive resource group for Data Integration.
For information about new-version resource groups, see Create and use a new-version resource group.
Elasticsearch is an open source service that is released based on the Apache License. The service is a popular search engine for enterprises. Elasticsearch is a distributed search and analytics engine built on top of Apache Lucene. The following description provides the mappings between the core concepts of Elasticsearch and those of a relational database:
Relational database instance -> Database -> Table -> Row -> Column
Elasticsearch -> Index -> Types -> Documents -> Fields
An Elasticsearch cluster can contain multiple indexes (databases). Each index can contain multiple types (tables). Each type can contain multiple documents (rows). Each document can contain multiple fields (columns). Elasticsearch Writer obtains data records from a reader and uses the RESTful API of Elasticsearch to write the data records to Elasticsearch at a time.
Supported Elasticsearch versions
DataWorks allows you to add only Alibaba Cloud Elasticsearch V5.X, V6.X, V7.X, and V8.X clusters as data sources. Self-managed Elasticsearch clusters are not supported.
Limits
Batch data read and write
Elasticsearch Reader obtains shard information on the server for data synchronization. You must make sure that the shards on the server are in the active state during data synchronization. Otherwise, data inconsistency may occur.
If you add an Alibaba Cloud Elasticsearch V6.X, V7.X, or V8.X cluster to DataWorks as a data source and configure a synchronization task for the data source, you can use only an exclusive resource group for Data Integration or a new-version resource group to run the synchronization task.
Fields of the scaled_float data type cannot be synchronized.
Indexes that contain fields with the keyword
$ref
in field names cannot be synchronized.
Supported data types
Data type | Elasticsearch Reader for batch data read | Elasticsearch Writer for batch data write | Elasticsearch Writer for real-time data write |
binary | Supported | Supported | Supported |
boolean | Supported | Supported | Supported |
keyword | Supported | Supported | Supported |
constant_keyword | Not supported | Not supported | Not supported |
wildcard | Not supported | Not supported | Not supported |
long | Supported | Supported | Supported |
integer | Supported | Supported | Supported |
short | Supported | Supported | Supported |
byte | Supported | Supported | Supported |
double | Supported | Supported | Supported |
float | Supported | Supported | Supported |
half_float | Not supported | Not supported | Not supported |
scaled_float | Not supported | Not supported | Not supported |
unsigned_long | Not supported | Not supported | Not supported |
date | Supported | Supported | Supported |
date_nanos | Not supported | Not supported | Not supported |
alias | Not supported | Not supported | Not supported |
object | Supported | Supported | Supported |
flattened | Not supported | Not supported | Not supported |
nested | Supported | Supported | Supported |
join | Not supported | Not supported | Not supported |
integer_range | Supported | Supported | Supported |
float_range | Supported | Supported | Supported |
long_range | Supported | Supported | Supported |
double_range | Supported | Supported | Supported |
date_range | Supported | Supported | Supported |
ip_range | Not supported | Supported | Supported |
ip | Supported | Supported | Supported |
version | Supported | Supported | Supported |
murmur3 | Not supported | Not supported | Not supported |
aggregate_metric_double | Not supported | Not supported | Not supported |
histogram | Not supported | Not supported | Not supported |
text | Supported | Supported | Supported |
annotated-text | Not supported | Not supported | Not supported |
completion | Supported | Not supported | Not supported |
search_as_you_type | Not supported | Not supported | Not supported |
token_count | Supported | Not supported | Not supported |
dense_vector | Not supported | Not supported | Not supported |
rank_feature | Not supported | Not supported | Not supported |
rank_features | Not supported | Not supported | Not supported |
geo_point | Supported | Supported | Supported |
geo_shape | Supported | Supported | Supported |
point | Not supported | Not supported | Not supported |
shape | Not supported | Not supported | Not supported |
percolator | Not supported | Not supported | Not supported |
string | Supported | Supported | Supported |
How Elasticsearch Reader works
Elasticsearch Reader works in the following way:
Uses the _search, scroll, and slice APIs of Elasticsearch. The slices are processed by multiple threads of a synchronization task in Data Integration.
Converts data types based on the mapping configuration of Elasticsearch.
For more information, see the documentation for open source Elasticsearch.
Elasticsearch Reader obtains shard information on the server for data synchronization. You must make sure that the shards on the server are in the active state during data synchronization. Otherwise, data inconsistency may occur.
Basic configuration
You must delete the comments from the following code before you run the code.
{
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
},
"setting":{
"errorLimit":{
"record":"0" // The maximum number of dirty data records allowed.
},
"jvmOption":"",
"speed":{
"concurrent":3,// The maximum number of parallel threads.
"throttle":true,//
"mbps":"12",// The maximum transmission rate. Unit: MB/s.
}
},
"steps":[
{
"category":"reader",
"name":"Reader",
"parameter":{
"column":[ // The names of the fields.
"id",
"name"
],
"endpoint":"", // The endpoint of the Elasticsearch cluster.
"index":"", // The name of the index from which you want to read data.
"password":"", // The password of the Elasticsearch cluster.
"scroll":"", // The scroll ID.
"search":"", // The search criteria. The value is the same as the Elasticsearch query that uses the _search API.
"type":"default",
"username":"" // The username of the Elasticsearch cluster.
},
"stepType":"elasticsearch"
},
{
"stepType": "elasticsearch",
"parameter": {
"column": [ // The names of the fields to which you want to write data.
{
"name": "id",
"type": "integer"
},
{
"name": "name",
"type": "text"
}
],
"index": "test", // The name of the index to which you want to write data.
"indexType": "", // The type of the index to which you want to write data. If you use an Elasticsearch V7.X cluster, leave this parameter empty.
"actionType": "index", // The write mode.
"cleanup": false, // Specifies whether to create an index before data write.
"datasource": "test", // The name of the data source.
"primaryKeyInfo": { // The value assignment method of the primary key.
"fieldDelimiterOrigin": ",",
"column": [
"id"
],
"type": "specific",
"fieldDelimiter": ","
},
"dynamic": false, // Specifies whether to use the dynamic mapping mechanism to establish mappings between fields.
"batchSize": 1024 // The number of documents to write at a time.
},
"name": "Writer",
"category": "writer"
}
],
"type":"job",
"version":"2.0" // The version number.
}
Advanced features
Extracts all data.
You can extract all fields in a document in an Elasticsearch cluster to a field. For more information, see Scenario 1: Extract all data.
Converts semi-structured data to structured data.
Item
Description
References
Background information
Data in Elasticsearch is deeply nested, has various field types and lengths, and may use Chinese characters. To facilitate data computing and storage for downstream services, Elasticsearch can convert semi-structured data to structured data.
None
Principles
Elasticsearch Reader uses a JSON tool to obtain data paths and flatten nested JSON-formatted data obtained from an Elasticsearch cluster to single-dimensional data. Then, Elasticsearch Reader maps the data to structured tables. This way, Elasticsearch data in a complex structure is converted into multiple structured tables.
None
Solution
You can use paths to parse nested JSON-formatted data.
Property
Property.Child property
Property[0].Child property
Scenario 2: Synchronize nested JSON-formatted data or object properties
You can traverse all data of a property that has multiple child properties and split the data into multiple tables or multiple rows.
Property[*].Child property
You can merge data in a string array into one property and remove duplicates.
Property []
Scenario 4: Merge data in an array into one property and remove duplicates
You can merge multiple properties into one property.
Property 1,Property 2
You can select properties from multiple properties for processing.
Property 1|Property 2
Scenario 6: Select properties from multiple properties for processing
Develop a data synchronization task
For information about the entry point for and the procedure of configuring a data synchronization task, see the following sections. For information about the parameter settings, view the infotip of each parameter on the configuration tab of the task.
Add a data source
Before you configure a data synchronization task to synchronize data from or to a specific data source, you must add the data source to DataWorks. For more information, see Add and manage data sources.
Configure a batch synchronization task to synchronize data of a single table
For more information about the configuration procedure, see Configure a batch synchronization task by using the codeless UI and Configure a batch synchronization task by using the code editor.
For information about all parameters that are configured and the code that is run when you use the code editor to configure a batch synchronization task, see Appendix: Code and parameters.
Configure a real-time synchronization task to synchronize data of a single table
For more information about the configuration procedure, see Configure a real-time synchronization task in DataStudio.
Configure synchronization settings to implement batch synchronization of all data in a database or one-time synchronization of full data and real-time synchronization of incremental data in a single table or a database
For more information about the configuration procedure, see Configure a synchronization task in Data Integration.
Appendix: Code and parameters
Appendix: Configure a batch synchronization task by using the code editor
If you use the code editor to configure a batch synchronization task, you must configure parameters for the reader and writer of the related data source based on the format requirements in the code editor. For more information about the format requirements, see Configure a batch synchronization task by using the code editor. The following information describes the configuration details of parameters for the reader and writer in the code editor.
Code for Elasticsearch Reader
{
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
},
"setting":{
"errorLimit":{
"record":"0" // The maximum number of dirty data records allowed.
},
"jvmOption":"",
"speed":{
"concurrent":3,
"throttle":false
}
},
"steps":[
{
"category":"reader",
"name":"Reader",
"parameter":{
"column":[ // The names of the fields.
"id",
"name"
],
"endpoint":"http://es-cn-xxx.elasticsearch.aliyuncs.com:9200",// The endpoint of the Elasticsearch cluster.
"index":"aliyun_es_xx", // The name of the index.
"password":"*******", // The password of the Elasticsearch cluster.
"multiThread":true,
"scroll":"5m", // The scroll ID.
"pageSize":5000,
"connTimeOut":600000,
"readTimeOut":600000,
"retryCount":30,
"retrySleepTime":"10000",
"search":{
"range":{
"gmt_modified":{
"gte":0
}
}
}, // The search criteria. The value is the same as the Elasticsearch query that calls the _search API.
"type":"doc",
"username":"aliyun_di" // The username of the Elasticsearch cluster.
},
"stepType":"elasticsearch"
},
{
"category":"writer",
"name":"Writer",
"parameter":{ },
"stepType":"stream"
}
],
"type":"job",
"version":"2.0" // The version number.
}
Parameters in code for Elasticsearch Reader
Parameter | Description | Required | Default value |
datasource | The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor. | Yes | No default value |
index | The name of the index from which you want to read data. | Yes | No default value |
type | The name of the index type in the Elasticsearch cluster. | No | Index name |
search | The query parameter of Elasticsearch. | Yes | No default value |
pageSize | The number of data records to read at a time. | No | 100 |
scroll | The parameter that is used to specify the timestamp of the snapshot taken for a scroll.
| Yes | No default value |
strictMode | Specifies whether to read data from the Elasticsearch cluster in strict mode. In strict mode, if a shard-related error is reported, Elasticsearch Reader stops reading data to prevent some data from failing to be read. | No | true |
sort | The field based on which the returned results are sorted. | No | No default value |
retryCount | The number of retries after a failure. | No | 300 |
connTimeOut | The connection timeout period of the client. | No | 600,000 |
readTimeOut | The timeout period of data read of the client. | No | 600,000 |
multiThread | Specifies whether to use multiple threads for an HTTP request. | No | true |
preemptiveAuth | Specifies whether to use the preemptive request mode for an HTTP request. | No | false |
retrySleepTime | The interval between retries after a failure. | No | 1000 |
discovery | Specifies whether to enable the node discovery mechanism.
| No | false |
compression | Specifies whether to compress a request body in the GZIP format. If you set this parameter to true, you must enable the http.compression settings on the Elasticsearch cluster. | No | false |
dateFormat | The dateFormat parameter is required if the fields to be synchronized include fields of a date data type and the format setting is not configured for mappings of the fields. You must specify all formats that are required to synchronize the fields of a date data type in this parameter, such as | No | No default value |
full | Specifies whether to synchronize all fields in a document in an Elasticsearch cluster to the destination as a field, and use the queried data in an Elasticsearch cluster as a field. For more information, see Scenario 1: Extract all data. | No | No default value |
multi | You can configure this parameter to enable the advanced feature that supports five solutions to help Elasticsearch Reader convert semi-structured data into structured data. The two child properties are | No | No default value |
Code for Elasticsearch Writer
{
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
"concurrent":1, // The maximum number of parallel threads.
"mbps":"12",// The maximum transmission rate. Unit: MB/s.
}
},
"steps": [
{
"category": "reader",
"name": "Reader",
"parameter": {
},
"stepType": "stream"
},
{
"category": "writer",
"name": "Writer",
"parameter": {
"datasource":"xxx",
"index": "test-1",
"type": "default",
"cleanup": true,
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"discovery": false,
"primaryKeyInfo":{
"type":"pk",
"fieldDelimiter":",",
"column":[]
},
"batchSize": 1000,
"dynamic":false,
"esPartitionColumn":[
{
"name":"col1",
"comment":"xx",
"type":"STRING"
}
],
"column": [
{
"name": "pk",
"type": "id"
},
{
"name": "col_ip",
"type": "ip"
},
{
"name": "col_array",
"type": "long",
"array": true,
},
{
"name": "col_double",
"type": "double"
},
{
"name": "col_long",
"type": "long"
},
{
"name": "col_integer",
"type": "integer"
{
"name": "col_keyword",
"type": "keyword"
},
{
"name": "col_text",
"type": "text",
"analyzer": "ik_max_word",
"other_params":
{
"doc_values": false
},
},
{
"name": "col_geo_point",
"type": "geo_point"
},
{
"name": "col_date",
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
},
{
"name": "col_nested1",
"type": "nested"
},
{
"name": "col_nested2",
"type": "nested"
},
{
"name": "col_object1",
"type": "object"
},
{
"name": "col_object2",
"type": "object"
},
{
"name": "col_integer_array",
"type": "integer",
"array": true
},
{
"name": "col_geo_shape",
"type": "geo_shape",
"tree": "quadtree",
"precision": "10m"
}
]
},
"stepType": "elasticsearch"
}
],
"type": "job",
"version": "2.0"
}
A connection failure may occur if you use the shared resource group for Data Integration to connect to an Elasticsearch cluster that is deployed in a virtual private cloud (VPC). To connect to an Elasticsearch cluster that is deployed in a VPC and synchronize data from or to the Elasticsearch cluster, use an exclusive resource group for Data Integration. For information about exclusive resource groups for Data Integration, see Exclusive resource groups for Data Integration.
Parameters in code for Elasticsearch Writer
Parameter | Description | Required | Default value |
datasource | The name of the data source. If no data sources are available, add an Elasticsearch cluster to DataWorks as a data source. For more information, see Add an Elasticsearch data source. | Yes | No default value |
index | The name of the index to which you want to write data. | Yes | No default value |
indexType | The name of the index type in the destination Elasticsearch cluster. | No | Elasticsearch |
cleanup | Specifies whether to delete the existing data from the index before data write. Valid values:
| No | false |
batchSize | The number of data records to write at a time. | No | 1,000 |
trySize | The maximum number of retries that are allowed after a write failure occurs. | No | 30 |
timeout | The connection timeout of the client. | No | 600,000 |
discovery | Specifies whether to enable node discovery.
| No | false |
compression | Specifies whether to enable compression for an HTTP request. | No | true |
multiThread | Specifies whether to use multiple threads for an HTTP request. | No | true |
ignoreWriteError | Specifies whether to ignore write errors and proceed with data write operations without retries. | No | false |
ignoreParseError | Specifies whether to ignore format parsing errors and proceed with data write operations. | No | true |
alias | The alias of the index to which you want to write data. The alias feature of Elasticsearch is similar to the view feature of a database. For example, if you create an alias named my_index_alias for the index my_index, the operations that are performed on my_index_alias also take effect on my_index. If you configure the alias parameter, the alias that you specify is created for the index after data is written to the index. | No | No default value |
aliasMode | The mode in which an alias is added after data is written to the index. Valid values: append and exclusive.
Elasticsearch Writer can convert aliases to actual index names. You can use aliases to migrate data from one index to another index, search for data across multiple indexes in a unified manner, and create a view on a subset of data in an index. | No | append |
settings | The settings of the index. The settings must follow the specifications of open source Elasticsearch. | No | No default value |
column | The fields of the document. The parameters for each field include basic parameters such as name and type, and advanced parameters such as analyzer, format, and array. Elasticsearch Writer supports the following types of fields:
The following information describes the field types:
If you want to define properties related to the Elasticsearch data source in addition to the field type when you configure the column parameter, you can configure the other_params parameter and define the properties in the other_params parameter. The configurations in the other_params parameter are used when Elasticsearch Writer updates the mappings of the destination index in the Elasticsearch data source.
If you want to write source data to Elasticsearch as arrays, you can enable Elasticsearch Writer to parse the source data in the JSON format or based on a specified delimiter. For more information, see Appendix: Write data to Elasticsearch as arrays. | Yes | No default value |
dynamic | Specifies whether to use the dynamic mapping mechanism of Elasticsearch to establish mappings for fields that are written to the index.
In Elasticsearch V7.X, the default value of the type parameter is _doc. If you set this parameter to true to use the dynamic mapping mechanism, set the type parameter to _doc and the esVersion parameter to 7. You must add the following parameter configuration that specifies the version information to the code: | No | false |
actionType | The type of the action that can be performed when data is written to the destination Elasticsearch cluster. Data Integration supports only the following types of actions: index and update. Default value: index.
| No | index |
primaryKeyInfo | Specifies the value assignment method of the _id column that is used as the primary key to write data to Elasticsearch.
| Yes | specific |
esPartitionColumn | Specifies whether to enable partitioning for the index when data is written to Elasticsearch. This parameter is used to change the routing setting of the Elasticsearch cluster.
| No | false |
enableWriteNull | Specifies whether source fields whose values are NULL can be synchronized to Elasticsearch. Valid values:
| No | true |
Scenario 1: Extract all data
Description: You can extract all fields in a document in an Elasticsearch cluster to a field.
Example:
## Source: raw data in an Elasticsearch cluster "hits": [ { "_index": "mutiltest_1", "_type": "_doc", "_id": "IXgdO4MB4GR_1DmrjTXP", "_score": 1.0, "_source": { "feature1": "value1", "feature2": "value2", "feature3": "value3" } }] ## Configurations of Elasticsearch Reader "parameter": { "column": [ "content" ], "full":true } ## Destination: Data is synchronized to one column of a row in the destination table. {"_index":"mutiltest_1","_type":"_doc","_id":"IXgdO4MB4GR_1DmrjTXP","_source":{"feature1":"value1","feature2":"value2","feature3":"value3"},"sort":["IXgdO4MB4GR_1DmrjTXP"]}
Scenario 2: Synchronize nested JSON-formatted data or object properties
Description: You can use paths to parse nested JSON-formatted data or object properties.
Configuration format:
Property
Property.Child property
Property[0].Child property
Configuration in the code editor:
"multi":{ "multi":true }
NoteThis configuration is not supported in the codeless UI.
Example:
## Source: raw data in an Elasticsearch cluster "hits": [ { "_index": "mutiltest_1", "_type": "_doc", "_id": "7XAOOoMB4GR_1Dmrrust", "_score": 1.0, "_source": { "level1": { "level2": [ { "level3": "testlevel3_1" }, { "level3": "testlevel3_2" } ] } } } ] ## Configurations of Elasticsearch Reader "parameter": { "column": [ "level1", "level1.level2", "level1.level2[0]", "level1.level2.level3" ], "multi":{ "multi":true } } ## Destination: four columns of a row column1(level1): {"level2":[{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]} column2(level1.level2): [{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}] column3(level1.level2[0]): {"level3":"testlevel3_1"} column4(level1.level2.level3): null
NoteIf an upper-level property of a child property that you configure as a column contains an array, you must specify the upper-level property in the format of Property[N]. Otherwise, null is synchronized as the value of the column. In the preceding example, level2 contains an array, and level1.level2.level3 is specified as a column. As a result, no error message is returned, but null is synchronized as the value of the level1.level2.level3 column. In this example, to obtain the data of level3, you must specify level1.level2[0].level3 or level1.level2[1].level3 in the column parameter. level1.level2[*].level3 is not supported.
Data that contains periods (.) in the key cannot be read. For example, if the data to read is "level1.level2":{"level3":"testlevel3_1"}, null is returned.
Scenario 3: Split an array into multiple rows
Description: If data of a property has multiple child properties, you need to split the data to multiple rows.
Configuration format: Property[*].Child property.
Example: After the source data { "splitKey" :[1,2,3,4,5]} is split, the source data is written to five rows in the destination: { "splitKey[0]":1,"splitKey[1]":2,"splitKey[2]":3,"splitKey[3]":4,"splitKey[4]":5}.
Configuration in the code editor:
"multi":{ "multi":true, "key": "headers" }
NoteIf you configure Split array fields name in the codeless UI, a script is automatically generated, and the code in the script has the same effect as the code configured in the code editor.
The value of the source data must be in the List format. Otherwise, an error is reported.
Example:
## Source: raw data in an Elasticsearch cluster [ { "_index": "lmtestjson", "_type": "_doc", "_id": "nhxmIYMBKDL4VkVLyXRN", "_score": 1.0, "_source": { "headers": [ { "remoteip": "192.0.2.1" }, { "remoteip": "192.0.2.2" } ] } }, { "_index": "lmtestjson", "_type": "_doc", "_id": "wRxsIYMBKDL4VkVLcXqf", "_score": 1.0, "_source": { "headers": [ { "remoteip": "192.0.2.3" }, { "remoteip": "192.0.2.4" } ] } } ] ## Configurations of Elasticsearch Reader { "column":[ "headers[*].remoteip" ] "multi":{ "multi":true, "key": "headers" } } ## Destination: four rows 192.0.2.1 192.0.2.2 192.0.2.3 192.0.2.4
Scenario 4: Merge data in an array into one property and remove duplicates
Description: You can merge data in an array into one property and remove duplicates. The child properties can be name1.name 2, and the data is deduplicated based on the toString result.
Configuration format: Property []
If the data in a column contains the keyword [], the data is merged, and duplicates are removed.
Configuration in the code editor:
"multi":{ "multi":true }
NoteThis configuration is not supported in the codeless UI.
Example:
## Source: raw data in an Elasticsearch cluster "hits": [ { "_index": "mutiltest_1", "_type": "_doc", "_id": "4nbUOoMB4GR_1Dmryj8O", "_score": 1.0, "_source": { "feature1": [ "value1", "value1", "value2", "value2", "value3" ] } } ] ## Configurations of Elasticsearch Reader "parameter": { "column":[ "feature1[]" ], "multi":{ "multi":true } } ## Destination: one column of a row "value1,value2,value3"
Scenario 5: Merge multiple properties into one property
Description: Multiple properties are selectively processed, and the first property that has values is returned. If no value exists, null is returned.
Configuration format: property 1|property 2|...
You can use this method if the value that you specify for the column parameter contains vertical bars (|).
Configuration in the code editor:
"multi":{ "multi":true }
NoteThis configuration is not supported in the codeless UI.
Example:
## Source: raw data in an Elasticsearch cluster "hits": [ { "_index": "mutiltest_1", "_type": "_doc", "_id": "v3ShOoMB4GR_1DmrZN22", "_score": 1.0, "_source": { "feature1": "feature1", "feature2": [ 1, 2, 3 ], "feature3": { "child": "feature3" } } }] ## Configurations of Elasticsearch Reader "parameter": { "column":[ "feature1|feature2|feature3" ], "multi":{ "multi":true } } ## Destination: one column of a row "feature1"
Scenario 6: Select properties from multiple properties for processing
Description: If you select properties from multiple properties for processing, the first property that has values is returned. If no value exists, null is returned.
Configuration format: property 1|property 2|...
This method is used if the value that you specify for the column parameter contains vertical bars (|).
Configuration in the code editor:
"multi":{ "multi":true }
NoteThis configuration is not supported in the codeless UI.
Example:
## Source: raw data in an Elasticsearch cluster "hits": [ { "_index": "mutiltest_1", "_type": "_doc", "_id": "v3ShOoMB4GR_1DmrZN22", "_score": 1.0, "_source": { "feature1": "feature1", "feature2": [ 1, 2, 3 ], "feature3": { "child": "feature3" } } }] ## Configurations of Elasticsearch Reader "parameter": { "column":[ "feature1,feature2,feature3" ], "multi":{ "multi":true } } ## Destination: one column of a row "feature1,[1,2,3],{"child":"feature3"}"
Appendix: Write data to Elasticsearch as arrays
You can use one of the following methods to configure Elasticsearch Writer to write data to Elasticsearch as arrays.
Enable Elasticsearch Writer to parse source data in the JSON format
For example, the value of a source field is
"[1,2,3,4,5]"
. If you want to enable Elasticsearch Writer to write the value to Elasticsearch as an array, you can set the json_array parameter to true in the code of the synchronization task. This way, Elasticsearch Writer writes the source data record to Elasticsearch as an array."parameter" : { { "name":"docs_1", "type":"keyword", "json_array":true } }
Enable Elasticsearch Writer to parse source data based on a specified delimiter
For example, the value of a source field is
"1,2,3,4,5"
. If you want to enable Elasticsearch Writer to write the value to Elasticsearch as an array, you can set the splitter parameter to a comma (,) in the code of the synchronization task. This way, Elasticsearch Writer parses the value based on the delimiter and writes the value to Elasticsearch as an array.NoteA synchronization task supports only one type of delimiter. You cannot specify different delimiters for different fields that you want to write to Elasticsearch as arrays. For example, you cannot specify a comma (,) as a delimiter for the source field col1="1,2,3,4,5" and a hyphen (-) as a delimiter for the source field col2="6-7-8-9-10".
"parameter" : { "column": [ { "name": "docs_2", "array": true, "type": "long" } ], "splitter":","// You must configure the splitter parameter at the same level as the column parameter. }
References
For information about data source types supported by Data Integration, see Supported data source types and synchronization operations.