本文為您介紹如何使用Elasticsearch連接器。
背景資訊
Elasticsearch相容開源Elasticsearch的功能,以及Security、Machine Learning、Graph、APM等商業功能,致力於資料分析、資料搜尋等情境服務。為您提供企業級許可權管控、安全監控警示、自動報表產生等情境服務。
Elasticsearch連接器支援的資訊如下:
類別 | 詳情 |
支援類型 | 源表、維表和結果表 |
運行模式 | 批模式和流模式 |
資料格式 | JSON |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API種類 | Datastream和SQL |
是否支援更新或刪除結果表資料 | 是 |
前提條件
已建立Elasticsearch索引,詳情請參見建立樣本。
已配置Elasticsearch公網或私網訪問白名單,詳情請參見配置執行個體公網或私網訪問白名單。
使用限制
源表和維表支援大於等於6.8.x,但小於8.x版本的Elasticsearch。
結果表僅支援Elasticsearch 6.x、7.x和8.x版本。
僅Flink計算引擎VVR 2.0.0及以上版本支援Elasticsearch連接器。
僅支援全量Elasticsearch源表,不支援增量Elasticsearch源表。
文法結構
源表
CREATE TABLE elasticsearch_source( name STRING, location STRING, value FLOAT ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'indexName' = '<yourIndexName>' );
維表
CREATE TABLE es_dim( field1 STRING, --作為JOIN時的Key,必須為STRING類型。 field2 FLOAT, field3 BIGINT, PRIMARY KEY (field1) NOT ENFORCED ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'indexName' = '<yourIndexName>' );
說明如果指定主鍵,則維表JOIN時的Key(欄位)有且只能有一個,且必須為Elasticsearch對應索引的文檔ID。
如果不指定主鍵,則維表JOIN時的Key可以有一個或多個,需要為Elasticsearch對應索引的文檔中的欄位。
對於String類型,為了保持相容性,預設會對錶中欄位名增加.keyword尾碼。如果因此無法匹配到Elasticsearch中的Text欄位,可以將配置項ignoreKeywordSuffix配置為true。
結果表
CREATE TABLE es_sink( user_id STRING, user_name STRING, uv BIGINT, pv BIGINT, PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-7', -- 如果是Elasticsearch 6.x版本,填寫elasticsearch-6 'hosts' = '<yourHosts>', 'index' = '<yourIndex>' );
說明Elasticsearch結果表會根據是否定義了主鍵,確定是在upsert模式或append模式下工作。
如果定義了主鍵,則主鍵必須為文檔ID,Elasticsearch結果表將在upsert模式下工作,該模式可以處理包含UPDATE和DELETE的訊息。
如果未定義主鍵,Elasticsearch將自動產生隨機的文檔ID,Elasticsearch結果表將在append模式工作,該模式只能消費INSERT訊息。
某些類型(例如BYTES、ROW、ARRAY和MAP等)由於沒有對應的字串表示形式,所以不允許其作為主鍵欄位。
DDL中的欄位均對應Elasticsearch文檔中的欄位,不支援將文檔ID等Meta資訊寫入Elasticsearch結果表中,因為文檔ID等Meta資訊由Elasticsearch執行個體側維護。
WITH參數
源表
參數
說明
資料類型
是否必填
預設值
備忘
connector
源表類型。
String
是
無
固定值為elasticsearch。
endPoint
Server地址。
String
是
無
例如:
http://127.0.0.1:XXXX
。indexName
索引名稱。
String
是
無
無。
accessId
Elasticsearch執行個體的使用者名稱。
String
否
無
預設為空白,不進行許可權驗證。如果定義了accessId,則必須定義非空的accessKey。
重要為了避免您的使用者名稱和密碼資訊泄露,建議您通過密鑰管理的方式填寫使用者名稱和密碼取值,詳情請參見變數管理。
accessKey
Elasticsearch執行個體的密碼。
String
否
無
typeNames
Type名稱。
String
否
_doc
Elasticsearch 7.0以上版本不建議設定該參數。
batchSize
每個scroll請求從Elasticsearch叢集擷取的最大文檔數。
Int
否
2000
無。
keepScrollAliveSecs
scroll上下文保留的最長時間。
Int
否
3600
單位為秒。
結果表
參數
說明
資料類型
是否必填
預設值
備忘
connector
結果表類型。
String
是
無
固定值為
elasticsearch-6
、elasticsearch-7
或elasticsearch-8
。說明僅Realtime Compute引擎VVR 8.0.5及以上版本支援配置為
elasticsearch-8
。hosts
Server地址。
String
是
無
例如:
127.0.0.1:XXXX
。index
索引名稱。
String
是
無
Elasticsearch結果表同時支援靜態索引和動態索引。在使用靜態和動態索引時,您需要注意以下幾點:
如果使用靜態索引,則索引選項值應為純字串,例如
myusers
,所有記錄都將被寫入myusers
索引。如果使用動態索引,可以使用
{field_name}
引用記錄中的欄位值以動態產生目標索引。您還可以使用{field_name|date_format_string}
將TIMESTAMP、DATE和TIME類型的欄位值轉換為date_format_string
指定的格式。date_format_string
與Java的DateTimeFormatter相容。例如,如果設定為myusers-{log_ts|yyyy-MM-dd}
,則log_ts欄位值為2020-03-27 12:25:55
的記錄將被寫入myusers-2020-03-27
索引。
document-type
文件類型。
String
elasticsearch-6:必填
elasticsearch-7:不支援
無
當連接器類型為
elasticsearch-6
時,此處參數取值需要和Elasticsearch側的type參數取值保持一致。username
使用者名稱。
String
否
空
預設為空白,不進行許可權驗證。如果定義了username,則必須定義非空的password。
重要為了避免您的使用者名稱和密碼資訊泄露,建議您通過密鑰管理的方式填寫使用者名稱和密碼取值,詳情請參見變數管理。
password
密碼。
String
否
空
document-id.key-delimiter
文檔ID的分隔字元。
String
否
_
在Elasticsearch結果表中,主鍵用於計算Elasticsearch的文檔ID。Elasticsearch結果表通過使用document-id.key-delimiter指定的鍵分隔字元,按照DDL中定義的順序串連所有主鍵欄位,從而為每一行產生一個文檔ID字串。
說明文檔ID為最多512個位元組但不包含空格的字串。
failure-handler
Elasticsearch請求失敗時的故障處理策略。
String
否
fail
可選策略如下:
fail(預設值):如果請求失敗,則作業失敗。
ignore:忽略失敗並刪除請求。
retry-rejected:重新添加由於隊列容量滿而失敗的請求。
custom class name:用於使用ActionRequestFailureHandler子類進行故障處理。
sink.flush-on-checkpoint
是否在checkpoint時執行flush。
Boolean
否
true
true:預設值。
false:禁用該功能後,在Elasticsearch進行Checkpoint時,連接器將不等待確認所有pending請求是否已完成,故連接器不會為請求提供At-least-once保證。
sink.bulk-flush.backoff.strategy
如果由於臨時請求錯誤導致flush操作失敗,則設定sink.bulk-flush.backoff.strategy指定重試策略。
Enum
否
DISABLED
DISABLED(預設值):不執行重試,即第一次請求錯誤後失敗。
CONSTANT:常量回退,即每次回退等待時間相同。
EXPONENTIAL:指數回退,即每次回退等待時間指數遞增。
sink.bulk-flush.backoff.max-retries
最大回退重試次數。
Int
否
無
無。
sink.bulk-flush.backoff.delay
每次回退嘗試之間的延遲。
Duration
否
無
對於CONSTANT回退策略:該值為每次重試之間的延遲。
對於EXPONENTIAL回退策略:該值為初始基準延遲。
sink.bulk-flush.max-actions
每個批量請求的最大緩衝運算元。
Int
否
1000
0表示禁用該功能。
sink.bulk-flush.max-size
存放請求的緩衝區記憶體最大值。
String
否
2 MB
單位為MB,預設值為2 MB,0 MB表示禁用該功能。
sink.bulk-flush.interval
flush的間隔。
Duration
否
1s
單位為秒,預設值為1s,0s表示禁用該功能。
connection.path-prefix
要添加到每個REST通訊中的前置詞字元串。
String
否
空
無。
retry-on-conflict
更新操作中,允許因版本衝突異常而重試的最大次數。超過該次數後將拋出異常導致作業失敗。
Int
否
0
說明僅Realtime Compute引擎VVR 4.0.13及以上版本支援該參數。
該參數僅在定義了主鍵的情況下生效。
routing-fields
指定一個或多個ES欄位名稱,用來將文檔路由到Elasticsearch的指定分區中。
String
否
無
多個欄位名以分號(;)進行分割。如果某個欄位資料為空白,則該欄位會被置為null。
說明僅Realtime Compute引擎VVR 8.0.6及以上版本,且elasticsearch-7和elasticsearch-8支援該參數。
sink.delete-strategy
用來配置收到回撤(-D/-U)類型訊息時的行為
Enum
否
DELETE_ROW_ON_PK
可選行為如下:
DELETE_ROW_ON_PK(預設值):忽略-U類型的訊息,但是在收到-D類型的訊息時刪除主鍵對應的行(文檔)。
IGNORE_DELETE:忽略-U和-D 類型的訊息,Elasticsearch Sink不發生回撤。
NON_PK_FIELD_TO_NULL:忽略 -U類型的訊息,但是在收到-D類型的訊息時,會修改主鍵對應的行(文檔),主索引值保持不變,表 Schema中其他非主索引值均置為 NULL。主要用在多個Sink同時寫入同一張Elasticsearch表時部分更新的情境。
CHANGELOG_STANDARD:和 DELETE_ROW_ON_PK類似,唯一的區別是該模式收到-U類型的訊息時也會刪除主鍵對應的行(文檔)。
說明僅Realtime Compute引擎VVR 8.0.8及以上版本支援該參數。
維表
參數
說明
資料類型
是否必填
預設值
備忘
connector
維表類型。
String
是
無
固定值為elasticsearch。
endPoint
Server地址。
String
是
無
例如:
http://127.0.0.1:XXXX
。indexName
索引名稱。
String
是
無
無。
accessId
Elasticsearch執行個體的使用者名稱。
String
否
無
預設為空白,不進行許可權驗證。如果定義了accessId,則必須定義非空的accessKey。
重要為了避免您的使用者名稱和密碼資訊泄露,建議您通過密鑰管理的方式填寫使用者名稱和密碼取值,詳情請參見變數管理。
accessKey
Elasticsearch執行個體的密碼。
String
否
無
typeNames
Type名稱。
String
否
_doc
Elasticsearch 7.0以上版本不建議設定該參數。
maxJoinRows
單行資料Join的最多行數。
Integer
否
1024
無。
cache
緩衝策略。
String
否
ALL
支援以下三種緩衝策略:
ALL:緩衝維表裡的所有資料。在Job運行前,系統會將維表中的所有資料載入到Cache中,之後所有的維表尋找資料都會通過Cache進行。如果在Cache中無法找到資料,則KEY不存在,並在Cache到期後重新載入一遍全量Cache。
LRU:緩衝維表裡的部分資料。源表的每條資料都會觸發系統先在Cache中尋找資料。如果沒有找到,則去物理維表中尋找。
None:無緩衝。
cacheSize
緩衝大小,即緩衝多少行資料。
Long
否
100000
僅當cache選擇LRU緩衝策略時,cacheSize參數生效。
cacheTTLMs
緩衝失效的逾時時間。
Long
否
Long.MAX_VALUE
單位為毫秒。cacheTTLMs配置和cache配置有關:
如果cache配置為LRU,則cacheTTLMs為緩衝失效的逾時時間,預設不到期。
如果cache配置為ALL,則cacheTTLMs為設定緩衝重新載入的間隔時間,預設不重新載入。
ignoreKeywordSuffix
是否忽略自動為String欄位添加的.keyword尾碼。
Boolean
否
false
為了保證相容性,Flink將Elasticsearch中的Text類型轉換為String,並預設在String類型欄位名後增加.keyword尾碼。
參數取值如下:
true:忽略。
如果因此無法匹配到Elasticsearch中的Text類型欄位,需要將該參數配置為true。
false:不忽略。
cacheEmpty
是否緩衝物理維表中尋找結果為空白的結果。
Boolean
否
true
僅當cache選擇LRU緩衝策略時,cacheEmpty參數生效。
queryMaxDocs
非主鍵維表的輸入端每條資料到來後,查詢Elasticsearch Server時返回的最大文檔條數。
Integer
否
10000
預設值10000是Elasticsearch Server返迴文檔條數的最大限制,該配置項的取值不能超過這個上限。
說明僅Realtime Compute引擎VVR 8.0.8及以上版本支援該參數。
該參數僅對非主鍵維表生效,因為主鍵表中資料是唯一的。
為了查詢的正確性,預設值給的比較大。但是該值會增大查詢Elasticsearch時的記憶體佔用,確實遇到記憶體問題後,可以適當降低該值來最佳化記憶體使用量。
類型映射
Flink以JSON來解析Elasticsearch資料,詳情請參見資料類型映射關係。
使用樣本
源表示例
CREATE TEMPORARY TABLE elasticsearch_source ( name STRING, location STRING, `value` FLOAT ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'indexName' = '<yourIndexName>', 'typeNames' = '<yourTypeName>' ); CREATE TEMPORARY TABLE blackhole_sink ( name STRING, location STRING, `value` FLOAT ) WITH ( 'connector' ='blackhole' ); INSERT INTO blackhole_sink SELECT name, location, `value` FROM elasticsearch_source;
維表示例
CREATE TEMPORARY TABLE datagen_source ( id STRING, data STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_dim ( id STRING, `value` FLOAT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'indexName' = '<yourIndexName>', 'typeNames' = '<yourTypeName>' ); CREATE TEMPORARY TABLE blackhole_sink ( id STRING, data STRING, `value` FLOAT ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT e.*, w.* FROM datagen_source AS e JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w ON e.id = w.id;
結果表示例1
CREATE TEMPORARY TABLE datagen_source ( id STRING, name STRING, uv BIGINT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_sink ( user_id STRING, user_name STRING, uv BIGINT, PRIMARY KEY (user_id) NOT ENFORCED -- 主鍵可選,如果定義了主鍵,則作為文檔ID,否則文檔ID將為隨機值。 ) WITH ( 'connector' = 'elasticsearch-6', 'hosts' = '<yourHosts>', 'index' = '<yourIndex>', 'document-type' = '<yourElasticsearch.types>', 'username' ='${secret_values.ak_id}', 'password' ='${secret_values.ak_secret}' ); INSERT INTO es_sink SELECT id, name, uv FROM datagen_source;
結果表示例2
CREATE TEMPORARY TABLE datagen_source( id STRING, details ROW< name STRING, ages ARRAY<INT>, attributes MAP<STRING, STRING> > ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_sink ( id STRING, details ROW< name STRING, ages ARRAY<INT>, attributes MAP<STRING, STRING> >, PRIMARY KEY (id) NOT ENFORCED -- 主鍵可選,如果定義了主鍵,則作為文檔ID,否則文檔ID將為隨機值。 ) WITH ( 'connector' = 'elasticsearch-6', 'hosts' = '<yourHosts>', 'index' = '<yourIndex>', 'document-type' = '<yourElasticsearch.types>', 'username' ='${secret_values.ak_id}', 'password' ='${secret_values.ak_secret}' ); INSERT INTO es_sink SELECT id, details FROM datagen_source;