Elasticsearch資料來源為您提供讀取和寫入Elasticsearch雙向通道的功能,本文為您介紹DataWorks的Elasticsearch資料同步的能力支援情況。
背景資訊
Elasticsearch在公用資源群組上支援Elasticsearch 5.x版本,在Serverless資源群組(推薦)和獨享Data Integration資源群組上支援Elasticsearch 5.x、6.x、7.x和8.x版本。
Serverless資源群組的詳情請參見新增和使用Serverless資源群組。
獨享Data Integration資源群組的詳情請參見新增和使用獨享Data Integration資源群組。
Elasticsearch是遵從Apache開源條款的一款開源產品,是當前主流的企業級搜尋引擎。Elasticsearch是一個基於Lucene的搜尋和資料分析工具,它提供分布式服務。Elasticsearch核心概念同資料庫核心概念的對應關係如下所示。
Relational DB(執行個體)-> Databases(資料庫)-> Tables(表)-> Rows(一行資料)-> Columns(一行資料的一列)
Elasticsearch -> Index -> Types -> Documents -> Fields
Elasticsearch中可以有多個索引或資料庫,每個索引可以包括多個類型或表,每個類型可以包括多個文檔或行,每個文檔可以包括多個欄位或列。Elasticsearch Writer外掛程式使用Elasticsearch的Rest API介面,批量把從Reader讀入的資料寫入Elasticsearch中。
支援的版本
DataWorks平台目前僅支援配置Elasticsearch 5.x、6.x、7.x和8.x版本資料來源,不支援配置自建Elasticsearch資料來源。
使用限制
離線讀寫
Elasticsearch Reader會擷取Server端shard資訊用於資料同步,需要確保在任務同步中Server端的shards處於存活狀態,否則會存在資料不一致風險。
如果您使用的是6.x及以上版本,支援使用Serverless資源群組(推薦)和獨享Data Integration資源群組。
不支援同步scaled_float類型的欄位。
不支援同步欄位中帶有關鍵字
$ref
的索引。
支援的欄位類型
類型 | 離線讀(Elasticsearch Reader) | 離線寫(Elasticsearch Writer) | 即時寫 |
binary | 支援 | 支援 | 支援 |
boolean | 支援 | 支援 | 支援 |
keyword | 支援 | 支援 | 支援 |
constant_keyword | 不支援 | 不支援 | 不支援 |
wildcard | 不支援 | 不支援 | 不支援 |
long | 支援 | 支援 | 支援 |
integer | 支援 | 支援 | 支援 |
short | 支援 | 支援 | 支援 |
byte | 支援 | 支援 | 支援 |
double | 支援 | 支援 | 支援 |
float | 支援 | 支援 | 支援 |
half_float | 不支援 | 不支援 | 不支援 |
scaled_float | 不支援 | 不支援 | 不支援 |
unsigned_long | 不支援 | 不支援 | 不支援 |
date | 支援 | 支援 | 支援 |
date_nanos | 不支援 | 不支援 | 不支援 |
alias | 不支援 | 不支援 | 不支援 |
object | 支援 | 支援 | 支援 |
flattened | 不支援 | 不支援 | 不支援 |
nested | 支援 | 支援 | 支援 |
join | 不支援 | 不支援 | 不支援 |
integer_range | 支援 | 支援 | 支援 |
float_range | 支援 | 支援 | 支援 |
long_range | 支援 | 支援 | 支援 |
double_range | 支援 | 支援 | 支援 |
date_range | 支援 | 支援 | 支援 |
ip_range | 不支援 | 支援 | 支援 |
ip | 支援 | 支援 | 支援 |
version | 支援 | 支援 | 支援 |
murmur3 | 不支援 | 不支援 | 不支援 |
aggregate_metric_double | 不支援 | 不支援 | 不支援 |
histogram | 不支援 | 不支援 | 不支援 |
text | 支援 | 支援 | 支援 |
annotated-text | 不支援 | 不支援 | 不支援 |
completion | 支援 | 不支援 | 不支援 |
search_as_you_type | 不支援 | 不支援 | 不支援 |
token_count | 支援 | 不支援 | 不支援 |
dense_vector | 不支援 | 不支援 | 不支援 |
rank_feature | 不支援 | 不支援 | 不支援 |
rank_features | 不支援 | 不支援 | 不支援 |
geo_point | 支援 | 支援 | 支援 |
geo_shape | 支援 | 支援 | 支援 |
point | 不支援 | 不支援 | 不支援 |
shape | 不支援 | 不支援 | 不支援 |
percolator | 不支援 | 不支援 | 不支援 |
string | 支援 | 支援 | 支援 |
工作原理
Elasticsearch Reader的工作原理如下:
通過Elasticsearch的_searchscrollslice(即遊標分區)方式實現,slice結合Data Integration任務的task多線程分區機制使用。
根據Elasticsearch中的Mapping配置,轉換資料類型。
更多詳情請參見Elasticsearch官方文檔。
Elasticsearch Reader會擷取Server端shard資訊用於資料同步,需要確保在任務同步中Server端的shards處於存活狀態,否則會存在資料不一致風險。
基本配置
實際運行時,請刪除下述代碼中的注釋。
{
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
},
"setting":{
"errorLimit":{
"record":"0" //錯誤記錄數。
},
"jvmOption":"",
"speed":{
"concurrent":3,//並發數
"throttle":true,//
"mbps":"12",//限流,此處1mbps = 1MB/s。
}
},
"steps":[
{
"category":"reader",
"name":"Reader",
"parameter":{
"column":[ //讀取列。
"id",
"name"
],
"endpoint":"", //服務地址。
"index":"", //索引。
"password":"", //密碼。
"scroll":"", //scroll標誌。
"search":"", //查詢query參數,與Elasticsearch的query內容相同,使用_search api,重新命名為search。
"type":"default",
"username":"" //使用者名稱。
},
"stepType":"elasticsearch"
},
{
"stepType": "elasticsearch",
"parameter": {
"column": [ //寫入列
{
"name": "id",
"type": "integer"
},
{
"name": "name",
"type": "text"
}
],
"index": "test", //寫入索引
"indexType": "", //寫入索引類型,es7不填
"actionType": "index", //寫入方式
"cleanup": false, //是否重建索引
"datasource": "test", //資料來源名稱
"primaryKeyInfo": { //主鍵取值方式
"fieldDelimiterOrigin": ",",
"column": [
"id"
],
"type": "specific",
"fieldDelimiter": ","
},
"dynamic": false, //動態映射
"batchSize": 1024 //批量寫文檔數
},
"name": "Writer",
"category": "writer"
}
],
"type":"job",
"version":"2.0" //版本號碼。
}
進階功能
支援全量拉取
支援將Elasticsearch中一個文檔的所有內容拉取為一個欄位。配置詳情請參見情境一:全量拉取。
支援提取半結構化到結構化資料
分類
描述
相關文檔
產生背景
Elasticsearch中的資料特徵為欄位不固定,且有中文名、資料使用深層嵌套的形式。為更好地方便下遊業務對資料的計算和儲存需求,特推出從半結構化到結構化的轉換解決方案。
—
實現原理
將Elasticsearch擷取到的JSON資料,利用JSON工具的路徑擷取特性,將嵌套資料扁平化為一維結構的資料。然後將資料對應至結構化資料表中,拆分Elasticsearch複合結構資料至多個結構化資料表。
—
解決方案
JSON有嵌套的情況,通過path路徑來解決。
屬性
屬性.子屬性
屬性[0].子屬性
附屬資訊有一對多的情況,需要進行拆表拆行處理,進行遍曆。
屬性[*].子屬性
數組歸併,一個字串數組內容,歸併為一個屬性,並進行去重。
屬性[]
多屬性合一,將多個屬性合并為一個屬性。
屬性1,屬性2
多屬性選擇處理。
屬性1|屬性2
建立資料來源
在進行資料同步任務開發時,您需要在DataWorks上建立一個對應的資料來源,操作流程請參見建立並管理資料來源,詳細的配置參數解釋可在配置介面查看對應參數的文案提示。
資料同步任務開發
資料同步任務的配置入口和通用配置流程可參見下文的配置指導。
單表離線同步任務配置指導
操作流程請參見通過嚮導模式配置離線同步任務、通過指令碼模式配置離線同步任務。
指令碼模式配置的全量參數和指令碼Demo請參見下文的附錄一:指令碼Demo與參數說明。
單表即時寫同步任務配置指導
操作流程請參見DataStudio側即時同步任務配置。
整庫離線寫、單表/整庫全增量即時寫同步任務配置指導
操作流程請參見Data Integration側同步任務配置。
附錄一:指令碼Demo與參數說明
離線任務指令碼配置方式
如果您配置離線任務時使用指令碼模式的方式進行配置,您需要按照統一的指令碼格式要求,在任務指令碼中編寫相應的參數,詳情請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下資料來源的參數配置詳情。
Reader指令碼Demo
Reader指令碼參數
Writer指令碼Demo
Writer指令碼參數
附錄二:Elasticsearch寫入的格式期望是數群組類型
支援以下兩種方式將源端資料按照數群組類型寫入Elasticsearch。
按JSON格式解析源端資料
例如:源端資料為
"[1,2,3,4,5]"
,配置json_array=true對其進行解析,同步將以數組格式寫入Elasticsearch。"parameter" : { { "name":"docs_1", "type":"keyword", "json_array":true } }
按分隔字元解析源端資料
例如:源端資料為
"1,2,3,4,5"
, 配置分隔字元splitter=","對其進行解析,同步將以數組格式寫入Elasticsearch。說明一個任務僅支援配置一種分隔字元,splitter全域唯一,不支援多array欄位配置為不同的分隔字元。例如源端欄位列col1="1,2,3,4,5" , col2="6-7-8-9-10", splitter無法針對每列單獨配置使用。
"parameter" : { "column": [ { "name": "docs_2", "array": true, "type": "long" } ], "splitter":","//注意:splitter配置與column配置同級。 }
附錄三:情境樣本
情境一:全量拉取
情境二:嵌套或對象欄位屬性同步
情境三:數組屬性拆分為多行
情境四:數組屬性去重歸併
情境五:多屬性合一同步
情境六:多屬性選擇同步
相關文檔
Data Integration支援其他更多資料來源接入,更多資訊,請參見支援的資料來源及同步方案。