Elasticsearch資料來源為您提供讀取和寫入Elasticsearch雙向通道的功能,本文為您介紹DataWorks的Elasticsearch資料同步的能力支援情況。
背景資訊
Elasticsearch在公用資源群組上支援Elasticsearch 5.x版本,在獨享Data Integration資源群組和新版資源群組(通用型資源群組)上支援Elasticsearch 5.x、6.x、7.x和8.x版本。
獨享Data Integration資源群組的詳情請參見新增和使用獨享Data Integration資源群組。
新版資源群組的詳情請參見新增和使用新版資源群組。
Elasticsearch是遵從Apache開源條款的一款開源產品,是當前主流的企業級搜尋引擎。Elasticsearch是一個基於Lucene的搜尋和資料分析工具,它提供分布式服務。Elasticsearch核心概念同資料庫核心概念的對應關係如下所示。
Relational DB(執行個體)-> Databases(資料庫)-> Tables(表)-> Rows(一行資料)-> Columns(一行資料的一列)
Elasticsearch -> Index -> Types -> Documents -> Fields
Elasticsearch中可以有多個索引或資料庫,每個索引可以包括多個類型或表,每個類型可以包括多個文檔或行,每個文檔可以包括多個欄位或列。Elasticsearch Writer外掛程式使用Elasticsearch的Rest API介面,批量把從Reader讀入的資料寫入Elasticsearch中。
支援的版本
DataWorks平台目前僅支援配置Elasticsearch 5.x、6.x、7.x和8.x版本資料來源,不支援配置自建Elasticsearch資料來源。
使用限制
離線讀寫
Elasticsearch Reader會擷取Server端shard資訊用於資料同步,需要確保在任務同步中Server端的shards處於存活狀態,否則會存在資料不一致風險。
如果您使用的是6.x及以上版本,僅支援使用獨享Data Integration資源群組或使用新版資源群組。
不支援同步scaled_float類型的欄位。
不支援同步欄位中帶有關鍵字
$ref
的索引。
支援的欄位類型
類型 | 離線讀(Elasticsearch Reader) | 離線寫(Elasticsearch Writer) | 即時寫 |
binary | 支援 | 支援 | 支援 |
boolean | 支援 | 支援 | 支援 |
keyword | 支援 | 支援 | 支援 |
constant_keyword | 不支援 | 不支援 | 不支援 |
wildcard | 不支援 | 不支援 | 不支援 |
long | 支援 | 支援 | 支援 |
integer | 支援 | 支援 | 支援 |
short | 支援 | 支援 | 支援 |
byte | 支援 | 支援 | 支援 |
double | 支援 | 支援 | 支援 |
float | 支援 | 支援 | 支援 |
half_float | 不支援 | 不支援 | 不支援 |
scaled_float | 不支援 | 不支援 | 不支援 |
unsigned_long | 不支援 | 不支援 | 不支援 |
date | 支援 | 支援 | 支援 |
date_nanos | 不支援 | 不支援 | 不支援 |
alias | 不支援 | 不支援 | 不支援 |
object | 支援 | 支援 | 支援 |
flattened | 不支援 | 不支援 | 不支援 |
nested | 支援 | 支援 | 支援 |
join | 不支援 | 不支援 | 不支援 |
integer_range | 支援 | 支援 | 支援 |
float_range | 支援 | 支援 | 支援 |
long_range | 支援 | 支援 | 支援 |
double_range | 支援 | 支援 | 支援 |
date_range | 支援 | 支援 | 支援 |
ip_range | 不支援 | 支援 | 支援 |
ip | 支援 | 支援 | 支援 |
version | 支援 | 支援 | 支援 |
murmur3 | 不支援 | 不支援 | 不支援 |
aggregate_metric_double | 不支援 | 不支援 | 不支援 |
histogram | 不支援 | 不支援 | 不支援 |
text | 支援 | 支援 | 支援 |
annotated-text | 不支援 | 不支援 | 不支援 |
completion | 支援 | 不支援 | 不支援 |
search_as_you_type | 不支援 | 不支援 | 不支援 |
token_count | 支援 | 不支援 | 不支援 |
dense_vector | 不支援 | 不支援 | 不支援 |
rank_feature | 不支援 | 不支援 | 不支援 |
rank_features | 不支援 | 不支援 | 不支援 |
geo_point | 支援 | 支援 | 支援 |
geo_shape | 支援 | 支援 | 支援 |
point | 不支援 | 不支援 | 不支援 |
shape | 不支援 | 不支援 | 不支援 |
percolator | 不支援 | 不支援 | 不支援 |
string | 支援 | 支援 | 支援 |
工作原理
Elasticsearch Reader的工作原理如下:
通過Elasticsearch的_searchscrollslice(即遊標分區)方式實現,slice結合Data Integration任務的task多線程分區機制使用。
根據Elasticsearch中的Mapping配置,轉換資料類型。
更多詳情請參見Elasticsearch官方文檔。
Elasticsearch Reader會擷取Server端shard資訊用於資料同步,需要確保在任務同步中Server端的shards處於存活狀態,否則會存在資料不一致風險。
基本配置
實際運行時,請刪除下述代碼中的注釋。
{
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
},
"setting":{
"errorLimit":{
"record":"0" //錯誤記錄數。
},
"jvmOption":"",
"speed":{
"concurrent":3,//並發數
"throttle":true,//
"mbps":"12",//限流,此處1mbps = 1MB/s。
}
},
"steps":[
{
"category":"reader",
"name":"Reader",
"parameter":{
"column":[ //讀取列。
"id",
"name"
],
"endpoint":"", //服務地址。
"index":"", //索引。
"password":"", //密碼。
"scroll":"", //scroll標誌。
"search":"", //查詢query參數,與Elasticsearch的query內容相同,使用_search api,重新命名為search。
"type":"default",
"username":"" //使用者名稱。
},
"stepType":"elasticsearch"
},
{
"stepType": "elasticsearch",
"parameter": {
"column": [ //寫入列
{
"name": "id",
"type": "integer"
},
{
"name": "name",
"type": "text"
}
],
"index": "test", //寫入索引
"indexType": "", //寫入索引類型,es7不填
"actionType": "index", //寫入方式
"cleanup": false, //是否重建索引
"datasource": "test", //資料來源名稱
"primaryKeyInfo": { //主鍵取值方式
"fieldDelimiterOrigin": ",",
"column": [
"id"
],
"type": "specific",
"fieldDelimiter": ","
},
"dynamic": false, //動態映射
"batchSize": 1024 //批量寫文檔數
},
"name": "Writer",
"category": "writer"
}
],
"type":"job",
"version":"2.0" //版本號碼。
}
進階功能
支援全量拉取
支援將Elasticsearch中一個文檔的所有內容拉取為一個欄位。配置詳情請參見情境一:全量拉取。
支援提取半結構化到結構化資料
分類
描述
相關文檔
產生背景
Elasticsearch中的資料特徵為欄位不固定,且有中文名、資料使用深層嵌套的形式。為更好地方便下遊業務對資料的計算和儲存需求,特推出從半結構化到結構化的轉換解決方案。
—
實現原理
將Elasticsearch擷取到的JSON資料,利用JSON工具的路徑擷取特性,將嵌套資料扁平化為一維結構的資料。然後將資料對應至結構化資料表中,拆分Elasticsearch複合結構資料至多個結構化資料表。
—
解決方案
JSON有嵌套的情況,通過path路徑來解決。
屬性
屬性.子屬性
屬性[0].子屬性
附屬資訊有一對多的情況,需要進行拆表拆行處理,進行遍曆。
屬性[*].子屬性
數組歸併,一個字串數組內容,歸併為一個屬性,並進行去重。
屬性[]
多屬性合一,將多個屬性合并為一個屬性。
屬性1,屬性2
多屬性選擇處理。
屬性1|屬性2
資料同步任務開發
Elasticsearch資料同步任務的配置入口和通用配置流程指導可參見下文的配置指導,詳細的配置參數解釋可在配置介面查看對應參數的文案提示。
建立資料來源
在進行資料同步任務開發時,您需要在DataWorks上建立一個對應的資料來源,操作流程請參見建立並管理資料來源。
單表離線同步任務配置指導
操作流程請參見通過嚮導模式配置離線同步任務、通過指令碼模式配置離線同步任務。
指令碼模式配置的全量參數和指令碼Demo請參見下文的附錄:指令碼Demo與參數說明。
單表即時寫同步任務配置指導
操作流程請參見DataStudio側即時同步任務配置。
整庫離線寫、單表/整庫全增量即時寫同步任務配置指導
操作流程請參見Data Integration側同步任務配置。
附錄:指令碼Demo與參數說明
附錄:離線任務指令碼配置方式
如果您配置離線任務時使用指令碼模式的方式進行配置,您需要在任務指令碼中按照指令碼的統一格式要求編寫指令碼中的reader參數和writer參數,指令碼模式的統一要求請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下的資料來源的Reader參數和Writer參數的指導詳情。
Reader指令碼Demo
{
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
},
"setting":{
"errorLimit":{
"record":"0" //錯誤記錄數。
},
"jvmOption":"",
"speed":{
"concurrent":3,
"throttle":false
}
},
"steps":[
{
"category":"reader",
"name":"Reader",
"parameter":{
"column":[ //讀取列。
"id",
"name"
],
"endpoint":"http://es-cn-xxx.elasticsearch.aliyuncs.com:9200", //服務地址。
"index":"aliyun_es_xx", //索引。
"password":"*******", //密碼。
"multiThread":true,
"scroll":"5m", //scroll標誌。
"pageSize":5000,
"connTimeOut":600000,
"readTimeOut":600000,
"retryCount":30,
"retrySleepTime":"10000",
"search":{
"range":{
"gmt_modified":{
"gte":0
}
}
}, //查詢query參數,與Elasticsearch的query內容相同,使用_search api,重新命名為search。
"type":"doc",
"username":"aliyun_di" //使用者名稱。
},
"stepType":"elasticsearch"
},
{
"category":"writer",
"name":"Writer",
"parameter":{ },
"stepType":"stream"
}
],
"type":"job",
"version":"2.0" //版本號碼。
}
Reader指令碼參數
參數 | 描述 | 是否必選 | 預設值 |
datasource | 資料來源名稱,指令碼模式支援添加資料來源,該配置項填寫的內容必須與添加的資料來源名稱保持一致。 | 是 | 無 |
index | Elasticsearch中的index名。 | 是 | 無 |
type | Elasticsearch中index的type名。 | 否 | index名 |
search | Elasticsearch的query參數。 | 是 | 無 |
pageSize | 每次讀取資料的條數。 | 否 | 100 |
scroll | Elasticsearch的分頁參數,設定遊標存放時間。
| 是 | 無 |
strictMode | 以strict 模式讀取Elasticsearch中的資料,當出現Elasticsearch的shard.failed時會停止讀取,避免讀取少資料。 | 否 | true |
sort | 返回結果的排序欄位。 | 否 | 無 |
retryCount | 失敗後重試的次數。 | 否 | 300 |
connTimeOut | 用戶端連線逾時時間。 | 否 | 600,000 |
readTimeOut | 用戶端讀取逾時時間。 | 否 | 600,000 |
multiThread | http請求,是否有多線程。 | 否 | true |
preemptiveAuth | http是否使用搶先模式請求 | 否 | false |
retrySleepTime | 失敗後重試的時間間隔。 | 否 | 1000 |
discovery | 是否開啟節點發現。
| 否 | false |
compression | 是否使用GZIP壓縮請求本文,使用時需要在es節點上啟用http.compression設定。 | 否 | false |
dateFormat | 待同步欄位存在date類型,且該欄位mapping沒有format配置時,需要配置dateFormat參數。配置形式如下: | 否 | 無 |
full | 是否將全文檔內容作為一個欄位同步至目標端,將Elasticsearch的查詢資料作為一個欄位,配置詳情請參見情境一:全量拉取。 | 否 | 無 |
multi | 該配置是一個進階功能具有五種用法,兩個子屬性分別為 | 否 | 無 |
Writer指令碼Demo
{
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1, //作業並發數。
"mbps":"12"//限流,此處1mbps = 1MB/s。
}
},
"steps": [
{
"category": "reader",
"name": "Reader",
"parameter": {
},
"stepType": "stream"
},
{
"category": "writer",
"name": "Writer",
"parameter": {
"datasource":"xxx",
"index": "test-1",
"type": "default",
"cleanup": true,
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"discovery": false,
"primaryKeyInfo":{
"type":"pk",
"fieldDelimiter":",",
"column":[]
},
"batchSize": 1000,
"dynamic":false,
"esPartitionColumn":[
{
"name":"col1",
"comment":"xx",
"type":"STRING"
}
],
"column": [
{
"name": "pk",
"type": "id"
},
{
"name": "col_ip",
"type": "ip"
},
{
"name": "col_array",
"type": "long",
"array": true,
},
{
"name": "col_double",
"type": "double"
},
{
"name": "col_long",
"type": "long"
},
{
"name": "col_integer",
"type": "integer"
{
"name": "col_keyword",
"type": "keyword"
},
{
"name": "col_text",
"type": "text",
"analyzer": "ik_max_word",
"other_params":
{
"doc_values": false
},
},
{
"name": "col_geo_point",
"type": "geo_point"
},
{
"name": "col_date",
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
},
{
"name": "col_nested1",
"type": "nested"
},
{
"name": "col_nested2",
"type": "nested"
},
{
"name": "col_object1",
"type": "object"
},
{
"name": "col_object2",
"type": "object"
},
{
"name": "col_integer_array",
"type": "integer",
"array": true
},
{
"name": "col_geo_shape",
"type": "geo_shape",
"tree": "quadtree",
"precision": "10m"
}
]
},
"stepType": "elasticsearch"
}
],
"type": "job",
"version": "2.0"
}
VPC環境的Elasticsearch運行在預設資源群組會存在網路不通的情況。您需要使用獨享Data Integration資源群組,才能連通VPC進行資料同步。添加資源的詳情請參見獨享Data Integration資源群組。
Writer指令碼參數
參數 | 描述 | 是否必選 | 預設值 |
datasource | 選擇需要同步的Elasticsearch資料來源,若還未在DataWorks建立該資料來源,請先建立,詳情請參見配置Elasticsearch資料來源。 | 是 | 無 |
index | Elasticsearch中的index名。 | 是 | 無 |
indexType | Elasticsearch中index的type名。 | 否 | Elasticsearch |
cleanup | 定義當前任務在索引index已存在的情況是否要刪除資料。
| 否 | false |
batchSize | 定義同步任務一次性插入Elasticsearch的Document條數。 | 否 | 1,000 |
trySize | 定義往Elasticsearch寫入資料失敗後的重試次數。 | 否 | 30 |
timeout | 用戶端逾時時間。 | 否 | 600,000 |
discovery | 任務是否啟動節點發現功能。
| 否 | false |
compression | HTTP請求,開啟壓縮。 | 否 | true |
multiThread | HTTP請求,是否有多線程。 | 否 | true |
ignoreWriteError | 忽略寫入錯誤,不重試,繼續寫入。 | 否 | false |
ignoreParseError | 忽略解析資料格式錯誤,繼續寫入。 | 否 | true |
alias | Elasticsearch的別名類似於資料庫的視圖機制,為索引my_index建立一個別名my_index_alias,對my_index_alias的操作與my_index的操作一致。 配置alias表示在資料匯入完成後,為指定的索引建立別名。 | 否 | 無 |
aliasMode | 資料匯入完成後增加別名的模式,包括append(增加模式)和exclusive(只留這一個):
後續會轉換別名為實際的索引名稱,別名可以用來進行索引遷移和多個索引的查詢統一,並可以用來實現視圖的功能。 | 否 | append |
settings | 建立index時的settings,與Elasticsearch官方一致。 | 否 | 無 |
column | column用來配置文檔的多個欄位Filed資訊,具體每個欄位項可以配置name(名稱)、type(類型)等基礎配置,以及Analyzer、Format和Array等擴充配置。 Elasticsearch所支援的欄位類型如下所示。
列類型的說明如下:
如果需要在column中配置除了type以外的屬性值,您可以使用other_params參數,該參數配置在column中,在update mappings時,用於描述column中除了type以外的Elasticsearch屬性資訊。
如果您希望源端資料寫入為Elasticsearch時按照數群組類型寫入,您可按照JSON格式或指定分隔字元的方式來解析源端資料。配置詳情請參見附錄:Elasticsearch寫入的格式期望是數群組類型。 | 是 | 無 |
dynamic | 定義當在文檔中發現未存在的欄位時,同步任務是否通過Elasticsearch動態映射機製為欄位添加映射。
Elasticsearch 7.x版本的預設type為_doc。使用Elasticsearch的自動mappings時,請配置_doc和esVersion為7。 您需要轉換為指令碼模式,添加一個版本參數: | 否 | false |
actionType | 表示Elasticsearch在資料寫出時的action類型,目前Data Integration支援index和update兩種actionType,預設值為index:
| 否 | index |
primaryKeyInfo | 定義當前寫入Elasticsearch的主鍵取值方式。
| 是 | specific |
esPartitionColumn | 定義寫入Elasticsearch時是否開啟分區,用於修改Elasticsearch中的routing的參數。
| 否 | false |
enableWriteNull | 該參數用於是否支援將來源端的空值欄位同步至Elasticsearch。取值如下:
| 否 | true |
情境一:全量拉取
背景說明:將Elasticsearch中文檔查詢的結果拉取為一個欄位。
配置樣本:
## 讀端:Elasticsearch中的未經處理資料 "hits": [ { "_index": "mutiltest_1", "_type": "_doc", "_id": "IXgdO4MB4GR_1DmrjTXP", "_score": 1.0, "_source": { "feature1": "value1", "feature2": "value2", "feature3": "value3" } }] ##Data IntegrationElasticsearch Reader外掛程式配置 "parameter": { "column": [ "content" ], "full":true } ##寫端結果:同步至目標端1行1列 {"_index":"mutiltest_1","_type":"_doc","_id":"IXgdO4MB4GR_1DmrjTXP","_source":{"feature1":"value1","feature2":"value2","feature3":"value3"},"sort":["IXgdO4MB4GR_1DmrjTXP"]}
情境二:嵌套或對象欄位屬性同步
背景說明:Object對象或nested嵌套欄位的屬性時,通過path路徑來解決。
配置形式:
屬性
屬性.子屬性
屬性[0].子屬性
指令碼配置:
"multi":{ "multi":true }
說明嚮導模式暫不支援配置。
配置樣本:
## 讀端:Elasticsearch中的未經處理資料 "hits": [ { "_index": "mutiltest_1", "_type": "_doc", "_id": "7XAOOoMB4GR_1Dmrrust", "_score": 1.0, "_source": { "level1": { "level2": [ { "level3": "testlevel3_1" }, { "level3": "testlevel3_2" } ] } } } ] ##Data IntegrationElasticsearch reader外掛程式配置 "parameter": { "column": [ "level1", "level1.level2", "level1.level2[0]", "level1.level2.level3" ], "multi":{ "multi":true } } ##寫端結果:1行資料4列 column1(level1): {"level2":[{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]} column2(level1.level2): [{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}] column3(level1.level2[0]): {"level3":"testlevel3_1"} column4(level1.level2.level3): null
說明擷取的節點上層有數組時結果為null,如上範例擷取level1.level2.level3不會報錯,同步結果為null,需要配置為level1.level2[0].level3或level1.level2[1].level3,當前不支援level1.level2[*].level3。
不支援key出現"."的資料, 如"level1.level2":{"level3":"testlevel3_1"}, 此時該條資料擷取結果為null。
情境三:數組屬性拆分為多行
背景說明:附屬資訊有一對多的情況,需要將數組列拆成多行。
配置形式:屬性[*].子屬性
效果示意:源端資料{ "splitKey" :[1,2,3,4,5]},拆完後寫到目標端為5行:{"splitKey[0]":1,"splitKey[1]":2,"splitKey[2]":3,"splitKey[3]":4,"splitKey[4]":5}
指令碼配置:
"multi":{ "multi":true, "key": "headers" }
說明嚮導模式下配置拆多行數組列名,會自動產生指令碼配置,具有相同效果。
value必須為List,否則會報錯。
配置樣本:
## 讀端:Elasticsearch中的未經處理資料 [ { "_index": "lmtestjson", "_type": "_doc", "_id": "nhxmIYMBKDL4VkVLyXRN", "_score": 1.0, "_source": { "headers": [ { "remoteip": "192.0.2.1" }, { "remoteip": "192.0.2.2" } ] } }, { "_index": "lmtestjson", "_type": "_doc", "_id": "wRxsIYMBKDL4VkVLcXqf", "_score": 1.0, "_source": { "headers": [ { "remoteip": "192.0.2.3" }, { "remoteip": "192.0.2.4" } ] } } ] ##Data IntegrationElasticsearch reader外掛程式配置 { "column":[ "headers[*].remoteip" ] "multi":{ "multi":true, "key": "headers" } } ##寫端結果:4行 192.0.2.1 192.0.2.2 192.0.2.3 192.0.2.4
情境四:數組屬性去重歸併
背景說明:數組去重歸併,將一個數組屬性去重歸併後寫入為字串屬性,數組屬性可以為子屬性如name1.name2,去重採用tostring結果作為標準。
配置形式:屬性[]。
column裡面帶有 [] 關鍵字就會認為對該屬性做去重歸併。
指令碼配置:
"multi":{ "multi":true }
說明嚮導模式暫不支援配置。
配置樣本:
## 讀端:Elasticsearch中的未經處理資料 "hits": [ { "_index": "mutiltest_1", "_type": "_doc", "_id": "4nbUOoMB4GR_1Dmryj8O", "_score": 1.0, "_source": { "feature1": [ "value1", "value1", "value2", "value2", "value3" ] } } ] ##Data IntegrationElasticsearch reader外掛程式配置 "parameter": { "column":[ "feature1[]" ], "multi":{ "multi":true } } ##寫端結果:1行1列資料 "value1,value2,value3"
情境五:多屬性合一同步
背景說明:多屬性選擇處理,返回第一個有值的屬性,都不存在時將寫入null。
配置形式:屬性1|屬性2|...
column裡面帶有 "|"關鍵字就會對該項做多屬性選擇。
指令碼配置:
"multi":{ "multi":true }
說明嚮導模式暫不支援該配置。
配置樣本:
##讀端:Elasticsearch中的未經處理資料 "hits": [ { "_index": "mutiltest_1", "_type": "_doc", "_id": "v3ShOoMB4GR_1DmrZN22", "_score": 1.0, "_source": { "feature1": "feature1", "feature2": [ 1, 2, 3 ], "feature3": { "child": "feature3" } } }] ##Data IntegrationElasticsearch reade外掛程式配置 "parameter": { "column":[ "feature1|feature2|feature3" ], "multi":{ "multi":true } } ##寫端結果:1行1列資料 "feature1"
情境六:多屬性選擇同步
背景說明:多屬性選擇處理 ,返回第一個有值的屬性,都不存在時寫入null。
配置形式:屬性1|屬性2|...
column裡面帶有 "|"關鍵字就會對該項做多屬性選擇
指令碼配置:
"multi":{ "multi":true }
說明嚮導模式暫不支援該配置。
配置樣本:
##讀端:Elasticsearch中的未經處理資料 "hits": [ { "_index": "mutiltest_1", "_type": "_doc", "_id": "v3ShOoMB4GR_1DmrZN22", "_score": 1.0, "_source": { "feature1": "feature1", "feature2": [ 1, 2, 3 ], "feature3": { "child": "feature3" } } }] ##Data IntegrationElasticsearch reader外掛程式配置 "parameter": { "column":[ "feature1,feature2,feature3" ], "multi":{ "multi":true } } ##寫端結果:1行1列資料 "feature1,[1,2,3],{"child":"feature3"}"
附錄:Elasticsearch寫入的格式期望是數群組類型
支援以下兩種方式將源端資料按照數群組類型寫入Elasticsearch。
按JSON格式解析源端資料
例如:源端資料為
"[1,2,3,4,5]"
,配置json_array=true對其進行解析,同步將以數組格式寫入Elasticsearch。"parameter" : { { "name":"docs_1", "type":"keyword", "json_array":true } }
按分隔字元解析源端資料
例如:源端資料為
"1,2,3,4,5"
, 配置分隔字元splitter=","對其進行解析,同步將以數組格式寫入Elasticsearch。說明一個任務僅支援配置一種分隔字元,splitter全域唯一,不支援多array欄位配置為不同的分隔字元。例如源端欄位列col1="1,2,3,4,5" , col2="6-7-8-9-10", splitter無法針對每列單獨配置使用。
"parameter" : { "column": [ { "name": "docs_2", "array": true, "type": "long" } ], "splitter":","//注意:splitter配置與column配置同級。 }
相關文檔
Data Integration支援其他更多資料來源接入,更多資訊,請參見支援的資料來源及同步方案。