全部產品
Search
文件中心

Realtime Compute for Apache Flink:Elasticsearch

更新時間:Aug 22, 2024

本文為您介紹如何使用Elasticsearch連接器。

背景資訊

Elasticsearch相容開源Elasticsearch的功能,以及Security、Machine Learning、Graph、APM等商業功能,致力於資料分析、資料搜尋等情境服務。為您提供企業級許可權管控、安全監控警示、自動報表產生等情境服務。

Elasticsearch連接器支援的資訊如下:

類別

詳情

支援類型

源表、維表和結果表

運行模式

批模式和流模式

資料格式

JSON

特有監控指標

  • 源表

    • pendingRecords

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerSecond

  • 維表

  • 結果表 ( VVR 6.0.6及以上 )

    • numRecordsOut

    • numRecordsOutPerSecond

說明

指標含義詳情,請參見監控指標說明

API種類

Datastream和SQL

是否支援更新或刪除結果表資料

前提條件

使用限制

  • 源表和維表支援大於等於5.5,但小於8.x版本的Elasticsearch。

  • 結果表僅支援Elasticsearch 6.x、7.x和8.x版本。

  • 僅Flink計算引擎VVR 2.0.0及以上版本支援Elasticsearch連接器。

  • 僅支援全量Elasticsearch源表,不支援增量Elasticsearch源表。

文法結構

  • 源表

    CREATE TABLE elasticsearch_source(
      name STRING,
      location STRING,
      value FLOAT
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );
  • 維表

    CREATE TABLE es_dim(
      field1 STRING, --作為JOIN時的Key,必須為STRING類型。
      field2 FLOAT,
      field3 BIGINT,
      PRIMARY KEY (field1) NOT ENFORCED
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );
    說明
    • 如果指定主鍵,則維表JOIN時的Key(欄位)有且只能有一個,且必須為Elasticsearch對應索引的文檔ID。

    • 如果不指定主鍵,則維表JOIN時的Key可以有一個或多個,需要為Elasticsearch對應索引的文檔中的欄位。

    • 對於String類型,為了保持相容性,預設會對錶中欄位名增加.keyword尾碼。如果因此無法匹配到Elasticsearch中的Text欄位,可以將配置項ignoreKeywordSuffix配置為true。

  • 結果表

    CREATE TABLE es_sink(
      user_id   STRING,
      user_name   STRING,
      uv BIGINT,
      pv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-7', -- 如果是Elasticsearch 6.x版本,填寫elasticsearch-6
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>'
    );
    說明
    • Elasticsearch結果表會根據是否定義了主鍵,確定是在upsert模式或append模式下工作。

      • 如果定義了主鍵,則主鍵必須為文檔ID,Elasticsearch結果表將在upsert模式下工作,該模式可以處理包含UPDATE和DELETE的訊息。

      • 如果未定義主鍵,Elasticsearch將自動產生隨機的文檔ID,Elasticsearch結果表將在append模式工作,該模式只能消費INSERT訊息。

    • 某些類型(例如BYTES、ROW、ARRAY和MAP等)由於沒有對應的字串表示形式,所以不允許其作為主鍵欄位。

    • DDL中的欄位均對應Elasticsearch文檔中的欄位,不支援將文檔ID等Meta資訊寫入Elasticsearch結果表中,因為文檔ID等Meta資訊由Elasticsearch執行個體側維護。

WITH參數

  • 源表

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    connector

    源表類型。

    String

    固定值為elasticsearch。

    endPoint

    Server地址。

    String

    例如:http://127.0.0.1:XXXX

    indexName

    索引名稱。

    String

    無。

    accessId

    Elasticsearch執行個體的使用者名稱。

    String

    預設為空白,不進行許可權驗證。如果定義了accessId,則必須定義非空的accessKey

    重要

    為了避免您的使用者名稱和密碼資訊泄露,建議您通過密鑰管理的方式填寫使用者名稱和密碼取值,詳情請參見變數和密鑰管理

    accessKey

    Elasticsearch執行個體的密碼。

    String

    typeNames

    Type名稱。

    String

    _doc

    Elasticsearch 7.0以上版本不建議設定該參數。

    batchSize

    每個scroll請求從Elasticsearch叢集擷取的最大文檔數。

    Int

    2000

    無。

    keepScrollAliveSecs

    scroll上下文保留的最長時間。

    Int

    3600

    單位為秒。

  • 結果表

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    connector

    結果表類型。

    String

    固定值為elasticsearch-6elasticsearch-7 elasticsearch-8

    說明

    僅Realtime Compute引擎VVR 8.0.5及以上版本支援配置為elasticsearch-8

    hosts

    Server地址。

    String

    例如:127.0.0.1:XXXX

    index

    索引名稱。

    String

    Elasticsearch結果表同時支援靜態索引和動態索引。在使用靜態和動態索引時,您需要注意以下幾點:

    • 如果使用靜態索引,則索引選項值應為純字串,例如myusers,所有記錄都將被寫入myusers索引。

    • 如果使用動態索引,可以使用{field_name}引用記錄中的欄位值以動態產生目標索引。您還可以使用{field_name|date_format_string}將TIMESTAMP、DATE和TIME類型的欄位值轉換為date_format_string指定的格式。date_format_string與Java的DateTimeFormatter相容。例如,如果設定為myusers-{log_ts|yyyy-MM-dd},則log_ts欄位值為2020-03-27 12:25:55的記錄將被寫入myusers-2020-03-27索引。

    document-type

    文件類型。

    String

    • elasticsearch-6:必填

    • elasticsearch-7:不支援

    當連接器類型為elasticsearch-6時,此處參數取值需要和Elasticsearch側的type參數取值保持一致。

    username

    使用者名稱。

    String

    預設為空白,不進行許可權驗證。如果定義了username,則必須定義非空的password。

    重要

    為了避免您的使用者名稱和密碼資訊泄露,建議您通過密鑰管理的方式填寫使用者名稱和密碼取值,詳情請參見變數和密鑰管理

    password

    密碼。

    String

    document-id.key-delimiter

    文檔ID的分隔字元。

    String

    _

    在Elasticsearch結果表中,主鍵用於計算Elasticsearch的文檔ID。Elasticsearch結果表通過使用document-id.key-delimiter指定的鍵分隔字元,按照DDL中定義的順序串連所有主鍵欄位,從而為每一行產生一個文檔ID字串。

    說明

    文檔ID為最多512個位元組但不包含空格的字串。

    failure-handler

    Elasticsearch請求失敗時的故障處理策略。

    String

    fail

    可選策略如下:

    • fail(預設值):如果請求失敗,則作業失敗。

    • ignore:忽略失敗並刪除請求。

    • retry-rejected:重新添加由於隊列容量滿而失敗的請求。

    • custom class name:用於使用ActionRequestFailureHandler子類進行故障處理。

    sink.flush-on-checkpoint

    是否在checkpoint時執行flush。

    Boolean

    true

    • true:預設值。

    • false:禁用該功能後,在Elasticsearch進行Checkpoint時,連接器將不等待確認所有pending請求是否已完成,故連接器不會為請求提供At-least-once保證。

    sink.bulk-flush.backoff.strategy

    如果由於臨時請求錯誤導致flush操作失敗,則設定sink.bulk-flush.backoff.strategy指定重試策略。

    Enum

    DISABLED

    • DISABLED(預設值):不執行重試,即第一次請求錯誤後失敗。

    • CONSTANT:常量回退,即每次回退等待時間相同。

    • EXPONENTIAL:指數回退,即每次回退等待時間指數遞增。

    sink.bulk-flush.backoff.max-retries

    最大回退重試次數。

    Int

    無。

    sink.bulk-flush.backoff.delay

    每次回退嘗試之間的延遲。

    Duration

    • 對於CONSTANT回退策略:該值為每次重試之間的延遲。

    • 對於EXPONENTIAL回退策略:該值為初始基準延遲。

    sink.bulk-flush.max-actions

    每個批量請求的最大緩衝運算元。

    Int

    1000

    0表示禁用該功能。

    sink.bulk-flush.max-size

    存放請求的緩衝區記憶體最大值。

    String

    2 MB

    單位為MB,預設值為2 MB,0 MB表示禁用該功能。

    sink.bulk-flush.interval

    flush的間隔。

    Duration

    1s

    單位為秒,預設值為1s,0s表示禁用該功能。

    connection.path-prefix

    要添加到每個REST通訊中的前置詞字元串。

    String

    無。

    retry-on-conflict

    更新操作中,允許因版本衝突異常而重試的最大次數。超過該次數後將拋出異常導致作業失敗。

    Int

    0

    說明
    • 僅Realtime Compute引擎VVR 4.0.13及以上版本支援該參數。

    • 該參數僅在定義了主鍵的情況下生效。

    routing-fields

    指定一個或多個ES欄位名稱,用來將文檔路由到Elasticsearch的指定分區中。

    String

    多個欄位名以分號(;)進行分割。如果某個欄位資料為空白,則該欄位會被置為null。

    說明

    僅Realtime Compute引擎VVR 8.0.6及以上版本,且elasticsearch-7和elasticsearch-8支援該參數。

    sink.delete-strategy

    用來配置收到回撤(-D/-U)類型訊息時的行為

    Enum

    DELETE_ROW_ON_PK

    可選行為如下:

    • DELETE_ROW_ON_PK(預設值):忽略-U類型的訊息,但是在收到-D類型的訊息時刪除主鍵對應的行(文檔)。

    • IGNORE_DELETE:忽略-U和-D 類型的訊息,Elasticsearch Sink不發生回撤。

    • NON_PK_FIELD_TO_NULL:忽略 -U類型的訊息,但是在收到-D類型的訊息時,會修改主鍵對應的行(文檔),主索引值保持不變,表 Schema中其他非主索引值均置為 NULL。主要用在多個Sink同時寫入同一張Elasticsearch表時部分更新的情境。

    • CHANGELOG_STANDARD:和 DELETE_ROW_ON_PK類似,唯一的區別是該模式收到-U類型的訊息時也會刪除主鍵對應的行(文檔)。

      說明

      僅Realtime Compute引擎VVR 8.0.8及以上版本支援該參數。

  • 維表

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    connector

    維表類型。

    String

    固定值為elasticsearch。

    endPoint

    Server地址。

    String

    例如:http://127.0.0.1:XXXX

    indexName

    索引名稱。

    String

    無。

    accessId

    Elasticsearch執行個體的使用者名稱。

    String

    預設為空白,不進行許可權驗證。如果定義了accessId,則必須定義非空的accessKey

    重要

    為了避免您的使用者名稱和密碼資訊泄露,建議您通過密鑰管理的方式填寫使用者名稱和密碼取值,詳情請參見變數和密鑰管理

    accessKey

    Elasticsearch執行個體的密碼。

    String

    typeNames

    Type名稱。

    String

    _doc

    Elasticsearch 7.0以上版本不建議設定該參數。

    maxJoinRows

    單行資料Join的最多行數。

    Integer

    1024

    無。

    cache

    緩衝策略。

    String

    ALL

    支援以下三種緩衝策略:

    • ALL:緩衝維表裡的所有資料。在Job運行前,系統會將維表中的所有資料載入到Cache中,之後所有的維表尋找資料都會通過Cache進行。如果在Cache中無法找到資料,則KEY不存在,並在Cache到期後重新載入一遍全量Cache。

    • LRU:緩衝維表裡的部分資料。源表的每條資料都會觸發系統先在Cache中尋找資料。如果沒有找到,則去物理維表中尋找。

    • None:無緩衝。

    cacheSize

    緩衝大小,即緩衝多少行資料。

    Long

    100000

    僅當cache選擇LRU緩衝策略時,cacheSize參數生效。

    cacheTTLMs

    緩衝失效的逾時時間。

    Long

    Long.MAX_VALUE

    單位為毫秒。cacheTTLMs配置和cache配置有關:

    • 如果cache配置為LRU,則cacheTTLMs為緩衝失效的逾時時間,預設不到期。

    • 如果cache配置為ALL,則cacheTTLMs為設定緩衝重新載入的間隔時間,預設不重新載入。

    ignoreKeywordSuffix

    是否忽略自動為String欄位添加的.keyword尾碼。

    Boolean

    false

    為了保證相容性,Flink將Elasticsearch中的Text類型轉換為String,並預設在String類型欄位名後增加.keyword尾碼。

    參數取值如下:

    • true:忽略。

      如果因此無法匹配到Elasticsearch中的Text類型欄位,需要將該參數配置為true。

    • false:不忽略。

    cacheEmpty

    是否緩衝物理維表中尋找結果為空白的結果。

    Boolean

    true

    僅當cache選擇LRU緩衝策略時,cacheEmpty參數生效。

    queryMaxDocs

    非主鍵維表的輸入端每條資料到來後,查詢Elasticsearch Server時返回的最大文檔條數。

    Integer

    10000

    預設值10000是Elasticsearch Server返迴文檔條數的最大限制,該配置項的取值不能超過這個上限。

    說明
    • 僅Realtime Compute引擎VVR 8.0.8及以上版本支援該參數。

    • 該參數僅對非主鍵維表生效,因為主鍵表中資料是唯一的。

    • 為了查詢的正確性,預設值給的比較大。但是該值會增大查詢Elasticsearch時的記憶體佔用,確實遇到記憶體問題後,可以適當降低該值來最佳化記憶體使用量。

類型映射

Flink以JSON來解析Elasticsearch資料,詳情請參見資料類型映射關係

使用樣本

  • 源表示例

    CREATE TEMPORARY TABLE elasticsearch_source (
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'indexName' = '<yourIndexName>',
      'typeNames' = '<yourTypeName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' ='blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT name, location, `value`
    FROM elasticsearch_source;
  • 維表示例

    CREATE TEMPORARY TABLE datagen_source (
      id STRING, 
      data STRING,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'datagen' 
    );
    
    CREATE TEMPORARY TABLE es_dim (
      id STRING,
      `value` FLOAT,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'indexName' = '<yourIndexName>',
      'typeNames' = '<yourTypeName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
      id STRING,
      data STRING,
      `value` FLOAT
    ) WITH (
      'connector' = 'blackhole' 
    );
    
    INSERT INTO blackhole_sink
    SELECT e.*, w.*
    FROM datagen_source AS e
    JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
    ON e.id = w.id;
  • 結果表示例1

    CREATE TEMPORARY TABLE datagen_source (
      id STRING, 
      name STRING,
      uv BIGINT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE es_sink (
      user_id STRING,
      user_name STRING,
      uv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED -- 主鍵可選,如果定義了主鍵,則作為文檔ID,否則文檔ID將為隨機值。
    ) WITH (
      'connector' = 'elasticsearch-6',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>',
      'document-type' = '<yourElasticsearch.types>',
      'username' ='${secret_values.ak_id}',
      'password' ='${secret_values.ak_secret}'
    );
    
    INSERT INTO es_sink
    SELECT id, name, uv
    FROM datagen_source;
  • 結果表示例2

    CREATE TEMPORARY TABLE datagen_source(  
      id STRING,
        details ROW<  
            name STRING,  
            ages ARRAY<INT>,  
            attributes MAP<STRING, STRING>  
        >
    ) WITH (  
        'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE es_sink (
      id STRING,
        details ROW<  
            name STRING,  
            ages ARRAY<INT>,  
            attributes MAP<STRING, STRING>  
        >, 
      PRIMARY KEY (id) NOT ENFORCED  -- 主鍵可選,如果定義了主鍵,則作為文檔ID,否則文檔ID將為隨機值。
    ) WITH (
      'connector' = 'elasticsearch-6',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>',
      'document-type' = '<yourElasticsearch.types>',
      'username' ='${secret_values.ak_id}',
      'password' ='${secret_values.ak_secret}'
    );
    
    INSERT INTO es_sink
    SELECT id, details
    FROM datagen_source;