全部產品
Search
文件中心

DataWorks:Elasticsearch資料來源

更新時間:Jul 04, 2024

Elasticsearch資料來源為您提供讀取和寫入Elasticsearch雙向通道的功能,本文為您介紹DataWorks的Elasticsearch資料同步的能力支援情況。

背景資訊

Elasticsearch在公用資源群組上支援Elasticsearch 5.x版本,在獨享Data Integration資源群組和新版資源群組(通用型資源群組)上支援Elasticsearch 5.x、6.x、7.x和8.x版本。

說明

Elasticsearch是遵從Apache開源條款的一款開源產品,是當前主流的企業級搜尋引擎。Elasticsearch是一個基於Lucene的搜尋和資料分析工具,它提供分布式服務。Elasticsearch核心概念同資料庫核心概念的對應關係如下所示。

Relational DB(執行個體)-> Databases(資料庫)-> Tables(表)-> Rows(一行資料)-> Columns(一行資料的一列)
Elasticsearch        -> Index              -> Types       -> Documents       -> Fields

Elasticsearch中可以有多個索引或資料庫,每個索引可以包括多個類型或表,每個類型可以包括多個文檔或行,每個文檔可以包括多個欄位或列。Elasticsearch Writer外掛程式使用Elasticsearch的Rest API介面,批量把從Reader讀入的資料寫入Elasticsearch中。

支援的版本

DataWorks平台目前僅支援配置Elasticsearch 5.x、6.x、7.x和8.x版本資料來源,不支援配置自建Elasticsearch資料來源。

使用限制

離線讀寫

  • Elasticsearch Reader會擷取Server端shard資訊用於資料同步,需要確保在任務同步中Server端的shards處於存活狀態,否則會存在資料不一致風險。

  • 如果您使用的是6.x及以上版本,僅支援使用獨享Data Integration資源群組使用新版資源群組

  • 不支援同步scaled_float類型的欄位。

  • 不支援同步欄位中帶有關鍵字 $ref的索引。

支援的欄位類型

類型

離線讀(Elasticsearch Reader)

離線寫(Elasticsearch Writer)

即時寫

binary

支援

支援

支援

boolean

支援

支援

支援

keyword

支援

支援

支援

constant_keyword

不支援

不支援

不支援

wildcard

不支援

不支援

不支援

long

支援

支援

支援

integer

支援

支援

支援

short

支援

支援

支援

byte

支援

支援

支援

double

支援

支援

支援

float

支援

支援

支援

half_float

不支援

不支援

不支援

scaled_float

不支援

不支援

不支援

unsigned_long

不支援

不支援

不支援

date

支援

支援

支援

date_nanos

不支援

不支援

不支援

alias

不支援

不支援

不支援

object

支援

支援

支援

flattened

不支援

不支援

不支援

nested

支援

支援

支援

join

不支援

不支援

不支援

integer_range

支援

支援

支援

float_range

支援

支援

支援

long_range

支援

支援

支援

double_range

支援

支援

支援

date_range

支援

支援

支援

ip_range

不支援

支援

支援

ip

支援

支援

支援

version

支援

支援

支援

murmur3

不支援

不支援

不支援

aggregate_metric_double

不支援

不支援

不支援

histogram

不支援

不支援

不支援

text

支援

支援

支援

annotated-text

不支援

不支援

不支援

completion

支援

不支援

不支援

search_as_you_type

不支援

不支援

不支援

token_count

支援

不支援

不支援

dense_vector

不支援

不支援

不支援

rank_feature

不支援

不支援

不支援

rank_features

不支援

不支援

不支援

geo_point

支援

支援

支援

geo_shape

支援

支援

支援

point

不支援

不支援

不支援

shape

不支援

不支援

不支援

percolator

不支援

不支援

不支援

string

支援

支援

支援

工作原理

Elasticsearch Reader的工作原理如下:

  • 通過Elasticsearch的_searchscrollslice(即遊標分區)方式實現,slice結合Data Integration任務的task多線程分區機制使用。

  • 根據Elasticsearch中的Mapping配置,轉換資料類型。

更多詳情請參見Elasticsearch官方文檔

說明

Elasticsearch Reader會擷取Server端shard資訊用於資料同步,需要確保在任務同步中Server端的shards處於存活狀態,否則會存在資料不一致風險。

基本配置

重要

實際運行時,請刪除下述代碼中的注釋。

{
 "order":{
  "hops":[
   {
    "from":"Reader",
    "to":"Writer"
   }
  ]
 },
 "setting":{
  "errorLimit":{
   "record":"0" //錯誤記錄數。
  },
  "jvmOption":"",
  "speed":{
   "concurrent":3,//並發數
   "throttle":true,//
                     "mbps":"12",//限流,此處1mbps = 1MB/s。
  }
 },
 "steps":[
  {
   "category":"reader",
   "name":"Reader",
   "parameter":{
    "column":[ //讀取列。
     "id",
     "name"
    ],
    "endpoint":"", //服務地址。
    "index":"",  //索引。
    "password":"",  //密碼。
    "scroll":"",  //scroll標誌。
    "search":"",  //查詢query參數,與Elasticsearch的query內容相同,使用_search api,重新命名為search。
    "type":"default",
    "username":""  //使用者名稱。
   },
   "stepType":"elasticsearch"
  },
  {
   "stepType": "elasticsearch",
            "parameter": {
                "column": [ //寫入列
                    {
                        "name": "id",
                        "type": "integer"
                    },
                    {
                        "name": "name",
                        "type": "text"
                    }
                ],
                "index": "test",   //寫入索引
                 "indexType": "",   //寫入索引類型,es7不填
                "actionType": "index",  //寫入方式
                "cleanup": false,         //是否重建索引
                "datasource": "test",   //資料來源名稱
                "primaryKeyInfo": {     //主鍵取值方式
                    "fieldDelimiterOrigin": ",",
                    "column": [
                        "id"
                    ],
                    "type": "specific",
                    "fieldDelimiter": ","
                },
                "dynamic": false,  //動態映射
                "batchSize": 1024   //批量寫文檔數
            },
            "name": "Writer",
            "category": "writer"
  }
 ],
 "type":"job",
 "version":"2.0" //版本號碼。
}

進階功能

  • 支援全量拉取

    支援將Elasticsearch中一個文檔的所有內容拉取為一個欄位。配置詳情請參見情境一:全量拉取

  • 支援提取半結構化到結構化資料

    分類

    描述

    相關文檔

    產生背景

    Elasticsearch中的資料特徵為欄位不固定,且有中文名、資料使用深層嵌套的形式。為更好地方便下遊業務對資料的計算和儲存需求,特推出從半結構化到結構化的轉換解決方案。

    實現原理

    將Elasticsearch擷取到的JSON資料,利用JSON工具的路徑擷取特性,將嵌套資料扁平化為一維結構的資料。然後將資料對應至結構化資料表中,拆分Elasticsearch複合結構資料至多個結構化資料表。

    解決方案

    JSON有嵌套的情況,通過path路徑來解決。

    • 屬性

    • 屬性.子屬性

    • 屬性[0].子屬性

    情境二:嵌套或對象欄位屬性同步

    附屬資訊有一對多的情況,需要進行拆表拆行處理,進行遍曆。

    屬性[*].子屬性

    情境三:數組屬性拆分為多行

    數組歸併,一個字串數組內容,歸併為一個屬性,並進行去重。

    屬性[]

    情境四:數組屬性去重歸併

    多屬性合一,將多個屬性合并為一個屬性。

    屬性1,屬性2

    情境五:多屬性合一同步

    多屬性選擇處理。

    屬性1|屬性2

    情境六:多屬性選擇同步

資料同步任務開發

Elasticsearch資料同步任務的配置入口和通用配置流程指導可參見下文的配置指導,詳細的配置參數解釋可在配置介面查看對應參數的文案提示。

建立資料來源

在進行資料同步任務開發時,您需要在DataWorks上建立一個對應的資料來源,操作流程請參見建立並管理資料來源

單表離線同步任務配置指導

單表即時寫同步任務配置指導

操作流程請參見DataStudio側即時同步任務配置

整庫離線寫、單表/整庫全增量即時寫同步任務配置指導

操作流程請參見Data Integration側同步任務配置

附錄:指令碼Demo與參數說明

附錄:離線任務指令碼配置方式

如果您配置離線任務時使用指令碼模式的方式進行配置,您需要在任務指令碼中按照指令碼的統一格式要求編寫指令碼中的reader參數和writer參數,指令碼模式的統一要求請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下的資料來源的Reader參數和Writer參數的指導詳情。

Reader指令碼Demo

{
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    },
    "setting":{
        "errorLimit":{
            "record":"0" //錯誤記錄數。
        },
        "jvmOption":"",
        "speed":{
            "concurrent":3,
            "throttle":false
        }
    },
    "steps":[
        {
            "category":"reader",
            "name":"Reader",
            "parameter":{
                "column":[ //讀取列。
                    "id",
                    "name"
                ],
                "endpoint":"http://es-cn-xxx.elasticsearch.aliyuncs.com:9200", //服務地址。
                "index":"aliyun_es_xx",  //索引。
                "password":"*******",  //密碼。
                "multiThread":true,
                "scroll":"5m",  //scroll標誌。
                "pageSize":5000,
                "connTimeOut":600000,
                "readTimeOut":600000,
                "retryCount":30,
                "retrySleepTime":"10000",
                "search":{
                            "range":{
                                "gmt_modified":{
                                    "gte":0
                                }
                            }
                        },  //查詢query參數,與Elasticsearch的query內容相同,使用_search api,重新命名為search。
                "type":"doc",
                "username":"aliyun_di"  //使用者名稱。
            },
            "stepType":"elasticsearch"
        },
        {
            "category":"writer",
            "name":"Writer",
            "parameter":{ },
            "stepType":"stream"
        }
    ],
    "type":"job",
    "version":"2.0" //版本號碼。
}

Reader指令碼參數

參數

描述

是否必選

預設值

datasource

資料來源名稱,指令碼模式支援添加資料來源,該配置項填寫的內容必須與添加的資料來源名稱保持一致。

index

Elasticsearch中的index名。

type

Elasticsearch中indextype名。

index名

search

Elasticsearch的query參數。

pageSize

每次讀取資料的條數。

100

scroll

Elasticsearch的分頁參數,設定遊標存放時間。

  • 設定的過小時,如果擷取兩頁資料間隔時間超出scroll,會導致遊標到期,進而遺失資料。

  • 設定的過大時,如果同一時刻發起的查詢過多,超出服務端max_open_scroll_context配置時,會導致資料查詢報錯。

strictMode

以strict 模式讀取Elasticsearch中的資料,當出現Elasticsearch的shard.failed時會停止讀取,避免讀取少資料。

true

sort

返回結果的排序欄位。

retryCount

失敗後重試的次數。

300

connTimeOut

用戶端連線逾時時間。

600,000

readTimeOut

用戶端讀取逾時時間。

600,000

multiThread

http請求,是否有多線程。

true

preemptiveAuth

http是否使用搶先模式請求

false

retrySleepTime

失敗後重試的時間間隔。

1000

discovery

是否開啟節點發現。

  • true:與叢集中隨機一個節點進行串連。啟用節點發現將輪詢並定期更新客戶機中的伺服器列表,並對發現的節點發起查詢請求。

  • false:對配置的endpoint發起查詢請求。

false

compression

是否使用GZIP壓縮請求本文,使用時需要在es節點上啟用http.compression設定。

false

dateFormat

待同步欄位存在date類型,且該欄位mapping沒有format配置時,需要配置dateFormat參數。配置形式如下: "dateFormat" : "yyyy-MM-dd||yyyy-MM-dd HH:mm:ss",該配置需要包含同步date類型欄位的所有格式。

full

是否將全文檔內容作為一個欄位同步至目標端,將Elasticsearch的查詢資料作為一個欄位,配置詳情請參見情境一:全量拉取

multi

該配置是一個進階功能具有五種用法,兩個子屬性分別為multi.keymulti.mult,配置詳情請參見進階功能中表格內容。

Writer指令碼Demo

{
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
            "concurrent":1, //作業並發數。
            "mbps":"12"//限流,此處1mbps = 1MB/s。
        }
    },
    "steps": [
        {
            "category": "reader",
            "name": "Reader",
            "parameter": {

            },
            "stepType": "stream"
        },
        {
            "category": "writer",
            "name": "Writer",
            "parameter": {
                "datasource":"xxx",
                "index": "test-1",
                "type": "default",
                "cleanup": true,
                "settings": {
                        "number_of_shards": 1,
                        "number_of_replicas": 0
                },
                "discovery": false,
                "primaryKeyInfo":{
                    "type":"pk",    
                     "fieldDelimiter":",",
                     "column":[]
                    },
                "batchSize": 1000,
                "dynamic":false,
                "esPartitionColumn":[
                    {
                        "name":"col1",  
                        "comment":"xx", 
                        "type":"STRING" 
                        }
                     ],
                "column": [
                    {
                        "name": "pk",
                        "type": "id"
                    },
                    {
                        "name": "col_ip",
                        "type": "ip"
                    },
                    {
                        "name": "col_array",
                        "type": "long",
                        "array": true,
                    },
                    {
                        "name": "col_double",
                        "type": "double"
                    },
                    {
                        "name": "col_long",
                        "type": "long"
                    },
                    {
                        "name": "col_integer",
                        "type": "integer"
                    {
                        "name": "col_keyword",
                        "type": "keyword"
                    },
                    {
                        "name": "col_text",
                        "type": "text",
                        "analyzer": "ik_max_word",
                        "other_params":
                            {
                                "doc_values": false
                            },
                    },
                    {
                        "name": "col_geo_point",
                        "type": "geo_point"
                    },
                    {
                        "name": "col_date",
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss"
                    },
                    {
                        "name": "col_nested1",
                        "type": "nested"
                    },
                    {
                        "name": "col_nested2",
                        "type": "nested"
                    },
                    {
                        "name": "col_object1",
                        "type": "object"
                    },
                    {
                        "name": "col_object2",
                        "type": "object"
                    },
                    {
                        "name": "col_integer_array",
                        "type": "integer",
                        "array": true
                    },
                    {
                        "name": "col_geo_shape",
                        "type": "geo_shape",
                        "tree": "quadtree",
                        "precision": "10m"
                    }
                ]
            },
            "stepType": "elasticsearch"
        }
    ],
    "type": "job",
    "version": "2.0"
}
說明

VPC環境的Elasticsearch運行在預設資源群組會存在網路不通的情況。您需要使用獨享Data Integration資源群組,才能連通VPC進行資料同步。添加資源的詳情請參見獨享Data Integration資源群組

Writer指令碼參數

參數

描述

是否必選

預設值

datasource

選擇需要同步的Elasticsearch資料來源,若還未在DataWorks建立該資料來源,請先建立,詳情請參見配置Elasticsearch資料來源

index

Elasticsearch中的index名。

indexType

Elasticsearch中index的type名。

Elasticsearch

cleanup

定義當前任務在索引index已存在的情況是否要刪除資料。

  • 是(true):匯入資料前刪除原來的索引並重建同名索引,此操作會刪除該索引下的資料。

  • 否(false):匯入資料前保留索引中已存在的資料。

false

batchSize

定義同步任務一次性插入Elasticsearch的Document條數。

1,000

trySize

定義往Elasticsearch寫入資料失敗後的重試次數。

30

timeout

用戶端逾時時間。

600,000

discovery

任務是否啟動節點發現功能。

  • true:與叢集中隨機一個節點進行串連。啟用節點發現將輪詢並定期更新客戶機中的伺服器列表。

  • false:與Elasticsearch叢集進行串連。

false

compression

HTTP請求,開啟壓縮。

true

multiThread

HTTP請求,是否有多線程。

true

ignoreWriteError

忽略寫入錯誤,不重試,繼續寫入。

false

ignoreParseError

忽略解析資料格式錯誤,繼續寫入。

true

alias

Elasticsearch的別名類似於資料庫的視圖機制,為索引my_index建立一個別名my_index_alias,對my_index_alias的操作與my_index的操作一致。

配置alias表示在資料匯入完成後,為指定的索引建立別名。

aliasMode

資料匯入完成後增加別名的模式,包括append(增加模式)和exclusive(只留這一個):

  • aliasModeappend時,表示追加當前索引至別名alias映射中(一個別名對應多個索引)。

  • aliasModeexclusive時,表示首先刪除別名alias,再添加當前索引至別名alias映射中(一個別名對應一個索引)。

後續會轉換別名為實際的索引名稱,別名可以用來進行索引遷移和多個索引的查詢統一,並可以用來實現視圖的功能。

append

settings

建立index時的settings,與Elasticsearch官方一致。

column

column用來配置文檔的多個欄位Filed資訊,具體每個欄位項可以配置name(名稱)、type(類型)等基礎配置,以及AnalyzerFormatArray等擴充配置。

Elasticsearch所支援的欄位類型如下所示。

- id  //type id對應Elasticsearch中的_id,可以理解為唯一主鍵。寫入時,相同id的資料會被覆蓋,且不會被索引。
- string
- text
- keyword
- long
- integer
- short
- byte
- double
- float
- date
- boolean
- binary
- integer_range
- float_range
- long_range
- double_range
- date_range
- geo_point
- geo_shape
- ip
- token_count
- array
- object
- nested

列類型的說明如下:

  • 列類型為text類型時,可以配置analyzer(分詞器)、normsindex_options等參數,樣本如下。

    {
        "name": "col_text",
        "type": "text",
        "analyzer": "ik_max_word"
        }
  • 列類型為Date類型時,您可配置如下兩種方式解析源端資料,配置方式請保持一致。

    • 方式一:根據reader端讀取欄位的內容直接寫入es data欄位:

      • 配置origin:true必填,讓讀取欄位的內容直接寫入es data

      • 配置"format",表示在通過es writer建立mapping時,該欄位需要設定format屬性。樣本如下:

          {
             "parameter":{
               "column":[{
                   "name": "col_date",
                   "type": "date",
                   "format": "yyyy-MM-dd HH:mm:ss",
                   "origin": true
                }]
           }
        }
    • 方式二:(時區轉換)如果需要Data Integration協助您進行時區轉換,可添加Timezone參數。樣本如下:

      配置的"format"表示Data Integration在做時區轉換時,解析的時間格式如下:

        {
           "parameter" :{
             "column": [{
                "name": "col_date",
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss",
               "Timezone": "UTC"
             }]
         }
      }
  • 列類型為地理形狀geo_shape時,可以配置tree(geohash或quadtree)、precision(精度)屬性,樣本如下。

    {
        "name": "col_geo_shape",
        "type": "geo_shape",
        "tree": "quadtree",
        "precision": "10m"
        }

如果需要在column中配置除了type以外的屬性值,您可以使用other_params參數,該參數配置在column中,在update mappings時,用於描述column中除了type以外的Elasticsearch屬性資訊。

 {
   "name": "guid",
   "other_params":
    {
       "doc_values": false
      },
    "type": "text"
  }

如果您希望源端資料寫入為Elasticsearch時按照數群組類型寫入,您可按照JSON格式或指定分隔字元的方式來解析源端資料。配置詳情請參見附錄:Elasticsearch寫入的格式期望是數群組類型

dynamic

定義當在文檔中發現未存在的欄位時,同步任務是否通過Elasticsearch動態映射機製為欄位添加映射。

  • true:保留Elasticsearch的自動mappings映射。

  • false:預設值,不填寫預設為false,根據同步任務配置的column產生並更新Elasticsearch的mappings映射。

Elasticsearch 7.x版本的預設type_doc。使用Elasticsearch的自動mappings時,請配置_docesVersion為7。

您需要轉換為指令碼模式,添加一個版本參數:"esVersion": "7"

false

actionType

表示Elasticsearch在資料寫出時的action類型,目前Data Integration支援indexupdate兩種actionType,預設值為index

  • index:底層使用了Elasticsearch SDK的Index.Builder構造批量請求。Elasticsearch index插入時,需要首先判斷插入的文檔資料中是否指定ID:

    • 如果沒有指定ID,Elasticsearch會預設產生一個唯一ID。該情況下會直接添加文檔至Elasticsearch中。

    • 如果已指定ID,會進行更新(替換整個文檔),且不支援針對特定Field進行修改。

      說明

      此處的更新並非Elasticsearch中的更新(替換部分指定列替換)。

  • update:根據使用者指定的ID進行文檔更新,如果ID值在索引中不存在則插入文檔,存在則更新指定的column欄位內容(其他文檔欄位內容不變)。每次update完成都會擷取整個文檔資訊,從而實現針對特定欄位進行修改。這裡update不支援條件式篩選,僅根據指定ID值進行更新操作。由於每次更新都需要擷取一遍原始文檔,因此對效能同步能會有較大影響。

    說明

    設定action類型為update時,您需要設定主鍵primaryKeyInfo

index

primaryKeyInfo

定義當前寫入Elasticsearch的主鍵取值方式。

  • 業務主鍵(pk):_id 的值指定為某一個欄位。

    "parameter":{
    "primaryKeyInfo":{
    "type":"pk",
    "column":["id"]}
    }
  • 聯合主鍵(specific):_id 的值指定為某幾個欄位的值拼接,分隔字元為您設定的主鍵分隔字元fieldDelimiter

    說明

    其中欄位名為eswriter的待寫入欄位,嚮導模式拉取主鍵列配置時只包含Elasticsearch索引中已存在的欄位。

    "parameter":{
    "primaryKeyInfo":{
    "type":"specific",
    "fieldDelimiter":",",
    "column":["col1","col2"]}
    }
  • 無主鍵(nopk):_id在寫入Elasticsearch時系統自動產生。

    "primaryKeyInfo":{
    "type":"nopk"
    }

specific

esPartitionColumn

定義寫入Elasticsearch時是否開啟分區,用於修改Elasticsearch中的routing的參數。

  • 開啟分區:把指定列的value通過分隔字元空串聯接指定為routing的值,在寫入時,插入或更新指定shard中的doc,開啟分區的情況下您需要指定分區列。

    {    "esPartitionColumn": [
            {
                "name":"col1",
                "comment":"xx",
                "type":"STRING"
                }
            ],
        }
  • 不開啟分區:不填寫該參數,預設使用_id作為routing起到將文檔均勻分布到多個分區上防止資料扭曲的作用。

false

enableWriteNull

該參數用於是否支援將來源端的空值欄位同步至Elasticsearch。取值如下:

  • true:支援。同步後,Elasticsearch中對應欄位的value為空白。

  • false:不支援。來源端的空值欄位無法同步至Elasticsearch,即在Elasticsearch中不顯示該欄位。

true

情境一:全量拉取

  • 背景說明:將Elasticsearch中文檔查詢的結果拉取為一個欄位。

  • 配置樣本:

    
    ## 讀端:Elasticsearch中的未經處理資料
    "hits": [
        {
            "_index": "mutiltest_1",
            "_type": "_doc",
            "_id": "IXgdO4MB4GR_1DmrjTXP",
            "_score": 1.0,
            "_source": {
                "feature1": "value1",
                "feature2": "value2",
                "feature3": "value3"
            }
        }]
    
    ##Data IntegrationElasticsearch Reader外掛程式配置
    "parameter": {
      "column": [
          "content"
      ],
      "full":true
    }
    
    ##寫端結果:同步至目標端1行1列
    {"_index":"mutiltest_1","_type":"_doc","_id":"IXgdO4MB4GR_1DmrjTXP","_source":{"feature1":"value1","feature2":"value2","feature3":"value3"},"sort":["IXgdO4MB4GR_1DmrjTXP"]}

情境二:嵌套或對象欄位屬性同步

  • 背景說明:Object對象或nested嵌套欄位的屬性時,通過path路徑來解決。

  • 配置形式:

    • 屬性

    • 屬性.子屬性

    • 屬性[0].子屬性

  • 指令碼配置:

    "multi":{
        "multi":true
    }
    說明

    嚮導模式暫不支援配置。

  • 配置樣本:

    ## 讀端:Elasticsearch中的未經處理資料
    "hits": [
        {
            "_index": "mutiltest_1",
            "_type": "_doc",
            "_id": "7XAOOoMB4GR_1Dmrrust",
            "_score": 1.0,
            "_source": {
                "level1": {
                    "level2": [
                        {
                            "level3": "testlevel3_1"
                        },
                        {
                            "level3": "testlevel3_2"
                        }
                    ]
                }
            }
        }
    ]
    ##Data IntegrationElasticsearch reader外掛程式配置
    "parameter": {
      "column": [
          "level1",
          "level1.level2",
          "level1.level2[0]",
          "level1.level2.level3"
      ],
      "multi":{
            "multi":true
        }
    }
    
    ##寫端結果:1行資料4列
    column1(level1):            {"level2":[{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]}
    column2(level1.level2):     [{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]
    column3(level1.level2[0]):  {"level3":"testlevel3_1"}
    column4(level1.level2.level3):  null
    說明
    • 擷取的節點上層有數組時結果為null,如上範例擷取level1.level2.level3不會報錯,同步結果為null,需要配置為level1.level2[0].level3或level1.level2[1].level3,當前不支援level1.level2[*].level3。

    • 不支援key出現"."的資料, 如"level1.level2":{"level3":"testlevel3_1"}, 此時該條資料擷取結果為null。

情境三:數組屬性拆分為多行

  • 背景說明:附屬資訊有一對多的情況,需要將數組列拆成多行。

  • 配置形式:屬性[*].子屬性

  • 效果示意:源端資料{ "splitKey" :[1,2,3,4,5]},拆完後寫到目標端為5行:{"splitKey[0]":1,"splitKey[1]":2,"splitKey[2]":3,"splitKey[3]":4,"splitKey[4]":5}

  • 指令碼配置:

    "multi":{   
           "multi":true,    
            "key": "headers"
    }
    說明
    • 嚮導模式下配置拆多行數組列名,會自動產生指令碼配置,具有相同效果。

    • value必須為List,否則會報錯。

  • 配置樣本:

    ## 讀端:Elasticsearch中的未經處理資料
    [
        {
            "_index": "lmtestjson",
            "_type": "_doc",
            "_id": "nhxmIYMBKDL4VkVLyXRN",
            "_score": 1.0,
            "_source": {
                "headers": [
                    {
                        "remoteip": "192.0.2.1"
                    },
                    {
                        "remoteip": "192.0.2.2"
                    }
                ]
            }
        },
        {
            "_index": "lmtestjson",
            "_type": "_doc",
            "_id": "wRxsIYMBKDL4VkVLcXqf",
            "_score": 1.0,
            "_source": {
                "headers": [
                    {
                        "remoteip": "192.0.2.3"
                    },
                    {
                        "remoteip": "192.0.2.4"
                    }
                ]
            }
        }
    ]
    ##Data IntegrationElasticsearch reader外掛程式配置
    {
       "column":[
          "headers[*].remoteip"
      ]
      "multi":{
          "multi":true,
          "key": "headers"
      }
    }
    
    ##寫端結果:4行
    192.0.2.1
    192.0.2.2
    192.0.2.3
    192.0.2.4

情境四:數組屬性去重歸併

  • 背景說明:數組去重歸併,將一個數組屬性去重歸併後寫入為字串屬性,數組屬性可以為子屬性如name1.name2,去重採用tostring結果作為標準。

  • 配置形式:屬性[]。

    column裡面帶有 [] 關鍵字就會認為對該屬性做去重歸併。

  • 指令碼配置:

    "multi":{
        "multi":true
    }
    說明

    嚮導模式暫不支援配置。

  • 配置樣本:

    ## 讀端:Elasticsearch中的未經處理資料
    "hits": [
    {
        "_index": "mutiltest_1",
        "_type": "_doc",
        "_id": "4nbUOoMB4GR_1Dmryj8O",
        "_score": 1.0,
        "_source": {
            "feature1": [
                "value1",
                "value1",
                "value2",
                "value2",
                "value3"
            ]
        }
    }
    ]
    ##Data IntegrationElasticsearch reader外掛程式配置
    "parameter": {
      "column":[
            "feature1[]"
      ],
      "multi":{
            "multi":true
        }
    }
    
    ##寫端結果:1行1列資料
    "value1,value2,value3"

情境五:多屬性合一同步

  • 背景說明:多屬性選擇處理,返回第一個有值的屬性,都不存在時將寫入null。

  • 配置形式:屬性1|屬性2|...

    column裡面帶有 "|"關鍵字就會對該項做多屬性選擇。

  • 指令碼配置:

    "multi":{    
        "multi":true
    }
    說明

    嚮導模式暫不支援該配置。

  • 配置樣本:

    ##讀端:Elasticsearch中的未經處理資料
    "hits": [
        {
            "_index": "mutiltest_1",
            "_type": "_doc",
            "_id": "v3ShOoMB4GR_1DmrZN22",
            "_score": 1.0,
            "_source": {
                "feature1": "feature1",
                "feature2": [
                    1,
                    2,
                    3
                ],
                "feature3": {
                    "child": "feature3"
                }
            }
        }]
    
    ##Data IntegrationElasticsearch reade外掛程式配置
    "parameter": {
      "column":[
            "feature1|feature2|feature3"
      ],
      "multi":{
            "multi":true
        }
    }
    
    ##寫端結果:1行1列資料
    "feature1"

情境六:多屬性選擇同步

  • 背景說明:多屬性選擇處理 ,返回第一個有值的屬性,都不存在時寫入null。

  • 配置形式:屬性1|屬性2|...

    column裡面帶有 "|"關鍵字就會對該項做多屬性選擇

  • 指令碼配置:

    "multi":{
        "multi":true
    }
    說明

    嚮導模式暫不支援該配置。

  • 配置樣本:

    ##讀端:Elasticsearch中的未經處理資料
    "hits": [
        {
            "_index": "mutiltest_1",
            "_type": "_doc",
            "_id": "v3ShOoMB4GR_1DmrZN22",
            "_score": 1.0,
            "_source": {
                "feature1": "feature1",
                "feature2": [
                    1,
                    2,
                    3
                ],
                "feature3": {
                    "child": "feature3"
                }
            }
        }]
    ##Data IntegrationElasticsearch reader外掛程式配置
    "parameter": {
      "column":[
            "feature1,feature2,feature3"
      ],
      "multi":{
            "multi":true
        }
    }
    
    ##寫端結果:1行1列資料
    "feature1,[1,2,3],{"child":"feature3"}"

附錄:Elasticsearch寫入的格式期望是數群組類型

支援以下兩種方式將源端資料按照數群組類型寫入Elasticsearch。

  • 按JSON格式解析源端資料

    例如:源端資料為"[1,2,3,4,5]",配置json_array=true對其進行解析,同步將以數組格式寫入Elasticsearch。

    "parameter" : {
      {
        "name":"docs_1",
        "type":"keyword",
        "json_array":true
      }
    }
  • 按分隔字元解析源端資料

    例如:源端資料為"1,2,3,4,5", 配置分隔字元splitter=","對其進行解析,同步將以數組格式寫入Elasticsearch。

    說明

    一個任務僅支援配置一種分隔字元,splitter全域唯一,不支援多array欄位配置為不同的分隔字元。例如源端欄位列col1="1,2,3,4,5" , col2="6-7-8-9-10", splitter無法針對每列單獨配置使用。

    "parameter" : {
          "column": [
            {
              "name": "docs_2",
              "array": true,
              "type": "long"
            }
          ],
          "splitter":","//注意:splitter配置與column配置同級。
    }

相關文檔

Data Integration支援其他更多資料來源接入,更多資訊,請參見支援的資料來源及同步方案