This topic describes how to use the Elasticsearch connector.
Background information
Alibaba Cloud Elasticsearch is compatible with the features of open source Elasticsearch, such as Security, Machine Learning, Graph, and Application Performance Monitoring (APM). Alibaba Cloud Elasticsearch is suitable for a variety of scenarios, such as data analysis and data search. Alibaba Cloud Elasticsearch provides enterprise-class services, such as access control, security monitoring and alerting, and automatic generation of reports.
The following table describes the capabilities supported by the Elasticsearch connector.
| Item | Description |
| --- | --- |
| Table type | Source table, dimension table, and result table |
| Running mode | Batch mode and streaming mode |
| Data format | JSON |
| Metric | **Note:** For more information about the metrics, see Metrics. |
| API type | DataStream API and SQL API |
| Data update or deletion in a result table | Supported |
Prerequisites
An Elasticsearch index is created. For more information, see the "Step 1: Create a cluster" section of the Getting started topic.
A public or private IP address whitelist is configured for the related Elasticsearch cluster. For more information, see Configure a public or private IP address whitelist for an Elasticsearch cluster.
Limits
The Elasticsearch connector can be used for source tables and dimension tables only if the Elasticsearch cluster version is V5.5 or later but earlier than V8.X.
The Elasticsearch connector can be used for result tables only if the related Elasticsearch cluster is of V6.X, V7.X, or V8.X.
Only Realtime Compute for Apache Flink that uses VVR 2.0.0 or later supports the Elasticsearch connector.
The Elasticsearch connector can be used only for full Elasticsearch source tables and cannot be used for incremental Elasticsearch source tables.
Syntax
Statement for creating a source table
```sql
CREATE TABLE elasticsearch_source (
  name STRING,
  location STRING,
  `value` FLOAT
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'indexName' = '<yourIndexName>'
);
```
Statement for creating a dimension table
```sql
CREATE TABLE es_dim (
  field1 STRING, -- If this field is used as a key to join the dimension table with another table, it must be of the STRING data type.
  field2 FLOAT,
  field3 BIGINT,
  PRIMARY KEY (field1) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'indexName' = '<yourIndexName>'
);
```
**Note**
- If you define a primary key for the dimension table, only one key can be used to join the dimension table with another table. The key is the ID of a document in your Elasticsearch index.
- If you do not define a primary key for the dimension table, one or more keys can be used to join the dimension table with another table. The keys are fields of a document in your Elasticsearch index.
- By default, the .keyword suffix is added to the names of fields of the STRING data type to ensure compatibility. If fields of the TEXT data type in the Elasticsearch table cannot be matched, set the ignoreKeywordSuffix parameter to true.
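The suffix behavior described in the note above can be sketched as a small helper. This is an illustration only; the function name and logic are not the connector's actual implementation:

```python
def lookup_field_name(field: str, data_type: str, ignore_keyword_suffix: bool = False) -> str:
    """Return the Elasticsearch field name used for a dimension-table lookup.

    By default the connector appends ".keyword" to STRING join fields so that
    exact matching works against the keyword sub-field of a text field.
    Setting ignoreKeywordSuffix=true skips the suffix, which is needed when
    the TEXT field has no .keyword sub-field.
    """
    if data_type == "STRING" and not ignore_keyword_suffix:
        return field + ".keyword"
    return field

print(lookup_field_name("name", "STRING"))        # name.keyword
print(lookup_field_name("name", "STRING", True))  # name
print(lookup_field_name("value", "FLOAT"))        # value
```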
Statement for creating a result table
```sql
CREATE TABLE es_sink (
  user_id STRING,
  user_name STRING,
  uv BIGINT,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7', -- If the Elasticsearch version is V6.X, use elasticsearch-6.
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>'
);
```
**Note**
- Whether the Elasticsearch result table works in upsert mode or append mode is determined by whether a primary key is defined.
- If a primary key is defined for the Elasticsearch result table, the primary key is used as the document ID and the result table works in upsert mode. In this mode, the result table can consume UPDATE and DELETE messages.
- If no primary key is defined, Elasticsearch automatically generates a random document ID and the result table works in append mode. In this mode, the result table can consume only INSERT messages.
- Specific data types, such as BYTES, ROW, ARRAY, and MAP, cannot be represented as strings. Therefore, fields of these data types cannot be used as primary key fields.
- The fields in the DDL statement correspond to the fields in an Elasticsearch document. Metadata, such as document IDs, is maintained on Elasticsearch clusters and cannot be written to Elasticsearch result tables.
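The difference between upsert mode and append mode can be sketched as a toy model. The class below is illustrative only (it is not connector code); it treats an index as a dictionary keyed by document ID and applies changelog messages (`+I` insert, `+U` update, `-D` delete):

```python
import uuid
from typing import Dict, Optional

class EsSinkSketch:
    """Toy model of upsert vs. append mode in the Elasticsearch result table."""

    def __init__(self, primary_key: Optional[str]):
        self.primary_key = primary_key  # None means no primary key, i.e. append mode
        self.index: Dict[str, dict] = {}

    def apply(self, op: str, row: dict) -> None:
        if self.primary_key is None:
            # Append mode: a random document ID is generated,
            # so only INSERT messages can be consumed.
            if op != "+I":
                raise ValueError("append mode consumes only INSERT messages")
            self.index[uuid.uuid4().hex] = dict(row)
            return
        # Upsert mode: the primary key value is the document ID,
        # so UPDATE and DELETE messages can be applied in place.
        doc_id = str(row[self.primary_key])
        if op == "-D":
            self.index.pop(doc_id, None)
        else:  # "+I" and "+U" both become an upsert on the document ID
            self.index[doc_id] = dict(row)

sink = EsSinkSketch(primary_key="user_id")
sink.apply("+I", {"user_id": "u1", "pv": 1})
sink.apply("+U", {"user_id": "u1", "pv": 2})
print(sink.index["u1"]["pv"])  # 2
sink.apply("-D", {"user_id": "u1", "pv": 2})
print(len(sink.index))  # 0
```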
Parameters in the WITH clause
Parameters only for source tables
| Parameter | Description | Data type | Required | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| connector | The type of the source table. | STRING | Yes | No default value | Set the value to `elasticsearch`. |
| endPoint | The endpoint of the Elasticsearch cluster. | STRING | Yes | No default value | Example: `http://127.0.0.1:XXXX`. |
| indexName | The name of the Elasticsearch index. | STRING | Yes | No default value | N/A. |
| accessId | The username that is used to access the Elasticsearch cluster. | STRING | No | No default value | By default, this parameter is empty, which indicates that permission verification is not required. If you configure the accessId parameter, you must also configure the accessKey parameter. **Important:** To protect your AccessKey pair, we recommend that you specify the username and password by using the key management method. For more information, see Manage variables and keys. |
| accessKey | The password that is used to access the Elasticsearch cluster. | STRING | No | No default value | N/A. |
| typeNames | The names of types. | STRING | No | _doc | We recommend that you do not configure this parameter if the version of your Elasticsearch cluster is later than V7.0. |
| batchSize | The maximum number of documents that can be obtained from the Elasticsearch cluster for each scroll request. | INT | No | 2000 | N/A. |
| keepScrollAliveSecs | The maximum retention period of the scroll context. | INT | No | 3600 | Unit: seconds. |
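The scroll-related parameters above (batchSize, keepScrollAliveSecs) describe paged reads: the source repeatedly asks Elasticsearch for the next batch of at most batchSize documents while the scroll context stays alive. The paging behavior can be sketched in a few lines; this is a simplified illustration, not the connector's actual read path:

```python
def scroll_read(docs, batch_size=2000):
    """Sketch of scroll-based reading: yield the index contents in
    batches of at most batch_size documents, like successive scroll
    requests (keepScrollAliveSecs bounds how long the scroll context
    may stay open between two requests)."""
    for start in range(0, len(docs), batch_size):
        yield docs[start:start + batch_size]

docs = list(range(4500))
print([len(batch) for batch in scroll_read(docs, batch_size=2000)])  # [2000, 2000, 500]
```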
Parameters only for result tables
| Parameter | Description | Data type | Required | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| connector | The type of the result table. | String | Yes | No default value | Set the value to `elasticsearch-6`, `elasticsearch-7`, or `elasticsearch-8`. **Note:** Only Realtime Compute for Apache Flink that uses VVR 8.0.5 or later allows you to set this parameter to `elasticsearch-8`. |
| hosts | The endpoint of the Elasticsearch cluster. | String | Yes | No default value | Example: `127.0.0.1:XXXX`. |
| index | The name of the Elasticsearch index. | String | Yes | No default value | The Elasticsearch result table supports both static and dynamic indexes. If you use a static index, the index option value must be a string, such as `myusers`; all records are written to the `myusers` index. If you use a dynamic index, you can use `{field_name}` to reference field values in the records to dynamically generate the destination index. You can also use `{field_name\|date_format_string}` to convert field values of the TIMESTAMP, DATE, and TIME data types into the format specified by `date_format_string`, which is compatible with DateTimeFormatter in Java. For example, if you set the dynamic index to `myusers-{log_ts\|yyyy-MM-dd}`, a record whose log_ts field value is `2020-03-27 12:25:55` is written to the `myusers-2020-03-27` index. |
| document-type | The type of a document. | String | Required if the connector parameter is set to elasticsearch-6; not supported if the connector parameter is set to elasticsearch-7. | No default value | If the connector parameter is set to `elasticsearch-6`, the value of this parameter must be the same as the value of the type parameter that is configured for Elasticsearch. |
| username | The username that is used to access the Elasticsearch cluster. | String | No | No default value | By default, this parameter is empty, which indicates that permission verification is not required. If you configure the username parameter, you must also configure the password parameter. **Important:** To protect your AccessKey pair, we recommend that you specify the username and password by using the key management method. For more information, see Manage variables and keys. |
| password | The password that is used to access the Elasticsearch cluster. | String | No | No default value | N/A. |
| document-id.key-delimiter | The delimiter that is used to concatenate multiple primary key fields into a document ID. | String | No | _ | The Elasticsearch result table uses the primary key to generate document IDs: all primary key fields are concatenated in the order defined in the DDL statement, separated by the delimiter specified by document-id.key-delimiter, to produce a document ID for each row. **Note:** A document ID is a string that contains a maximum of 512 bytes without spaces. |
| failure-handler | The failure handling policy that is used when an Elasticsearch request fails. | String | No | fail | Valid values: `fail`: the deployment fails if the request fails (default). `ignore`: the failure is ignored and the request is dropped. `retry_rejected`: the request is retried if the failure is caused by a full queue. Custom class name: the specified ActionRequestFailureHandler subclass handles the failure. |
| sink.flush-on-checkpoint | Specifies whether to trigger a flush operation during checkpointing. | Boolean | No | true | `true`: a flush operation is triggered during checkpointing (default). `false`: no flush operation is triggered during checkpointing. If you disable this feature, the Elasticsearch connector does not wait for pending requests to complete during checkpointing and therefore does not provide an at-least-once guarantee for requests. |
| sink.bulk-flush.backoff.strategy | The retry policy that is used when a flush operation fails due to a temporary request error. | Enum | No | DISABLED | Valid values: `DISABLED`: the flush operation is not retried and fails on the first request error (default). `CONSTANT`: the waiting time between retries is constant. `EXPONENTIAL`: the waiting time between retries increases exponentially. |
| sink.bulk-flush.backoff.max-retries | The maximum number of retries. | Int | No | No default value | N/A. |
| sink.bulk-flush.backoff.delay | The delay between retries. | Duration | No | No default value | If the sink.bulk-flush.backoff.strategy parameter is set to CONSTANT, this parameter is the delay between retries. If it is set to EXPONENTIAL, this parameter is the initial baseline delay. |
| sink.bulk-flush.max-actions | The maximum number of actions that are buffered for each bulk request. | Int | No | 1000 | The value 0 disables this feature. |
| sink.bulk-flush.max-size | The maximum memory size of the buffer in which requests are saved. | String | No | 2 MB | Default value: 2. Unit: MB. The value 0 disables this feature. |
| sink.bulk-flush.interval | The interval at which flush operations are performed. | Duration | No | 1s | Default value: 1. Unit: seconds. The value 0 disables this feature. |
| connection.path-prefix | The prefix that is added to each REST request. | String | No | No default value | N/A. |
| retry-on-conflict | The maximum number of retries that are allowed for version conflicts in an update operation. If this number is exceeded, an exception occurs and the deployment fails. | Int | No | 0 | This parameter takes effect only when a primary key is specified. **Note:** Only Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports this parameter. |
| routing-fields | One or more field names in the Elasticsearch result table that are used to route documents to specific shards of the Elasticsearch cluster. | String | No | No default value | Separate multiple field names with semicolons (;). If a field is empty, the field is set to null. **Note:** Only Realtime Compute for Apache Flink that uses VVR 8.0.6 or later supports this parameter when the connector parameter is set to elasticsearch-7 or elasticsearch-8. |
| sink.delete-strategy | The operation that is performed when a retraction message (UPDATE or DELETE) is received. | Enum | No | DELETE_ROW_ON_PK | Valid values: `DELETE_ROW_ON_PK`: ignores UPDATE messages; when a DELETE message is received, the document that matches the primary key value is deleted (default). `IGNORE_DELETE`: ignores both UPDATE and DELETE messages; no retraction occurs in the Elasticsearch sink. `NON_PK_FIELD_TO_NULL`: ignores UPDATE messages; when a DELETE message is received, the document that matches the primary key value is modified so that the primary key value remains unchanged and all non-primary-key fields in the table schema are set to NULL. Use this value to partially update data when multiple sinks write to the same Elasticsearch table. `CHANGELOG_STANDARD`: similar to DELETE_ROW_ON_PK, except that the matching document is also deleted when an UPDATE message is received. **Note:** Only Realtime Compute for Apache Flink that uses VVR 8.0.8 or later supports this parameter. |
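The document-id.key-delimiter and dynamic index behaviors described above can be sketched in Python. Both functions are illustrative only, not connector code; note that Java's DateTimeFormatter pattern `yyyy-MM-dd` corresponds to `%Y-%m-%d` in Python:

```python
from datetime import datetime

def document_id(row, pk_fields, delimiter="_"):
    """Concatenate the primary key fields, in DDL order, with the
    delimiter specified by document-id.key-delimiter."""
    return delimiter.join(str(row[field]) for field in pk_fields)

def dynamic_index(prefix, date_format, ts):
    """Format a timestamp field value into the destination index name,
    as a dynamic index such as myusers-{log_ts|yyyy-MM-dd} would."""
    return prefix + ts.strftime(date_format)

row = {"user_id": "u1", "region": "eu"}
print(document_id(row, ["user_id", "region"]))  # u1_eu
print(dynamic_index("myusers-", "%Y-%m-%d", datetime(2020, 3, 27, 12, 25, 55)))  # myusers-2020-03-27
```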
Parameters only for dimension tables
| Parameter | Description | Data type | Required | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| connector | The type of the dimension table. | STRING | Yes | No default value | Set the value to `elasticsearch`. |
| endPoint | The endpoint of the Elasticsearch cluster. | STRING | Yes | No default value | Example: `http://127.0.0.1:XXXX`. |
| indexName | The name of the Elasticsearch index. | STRING | Yes | No default value | N/A. |
| accessId | The username that is used to access the Elasticsearch cluster. | STRING | No | No default value | By default, this parameter is empty, which indicates that permission verification is not required. If you configure the accessId parameter, you must also configure the accessKey parameter. **Important:** To protect your AccessKey pair, we recommend that you specify the username and password by using the key management method. For more information, see Manage variables and keys. |
| accessKey | The password that is used to access the Elasticsearch cluster. | STRING | No | No default value | N/A. |
| typeNames | The names of types. | STRING | No | _doc | We recommend that you do not configure this parameter if the version of your Elasticsearch cluster is later than V7.0. |
| maxJoinRows | The maximum number of rows that can be joined. | INTEGER | No | 1024 | N/A. |
| cache | The cache policy. | STRING | No | ALL | Valid values: `ALL`: all data in the dimension table is cached. Before a deployment runs, the system loads all data in the dimension table into the cache, and all subsequent lookups are served from the cache; if a record is not found in the cache, the join key does not exist. The system reloads all data after cache entries expire. `LRU`: only part of the data in the dimension table is cached. The system searches the cache for each record read from the source table and queries the physical dimension table only if the data is not found in the cache. `None`: no data is cached. |
| cacheSize | The maximum number of rows that can be cached. | LONG | No | 100000 | The cacheSize parameter takes effect only when the cache parameter is set to LRU. |
| cacheTTLMs | The cache timeout period. | LONG | No | Long.MAX_VALUE | Unit: milliseconds. The effect of the cacheTTLMs parameter varies based on the cache parameter. If the cache parameter is set to LRU, cacheTTLMs specifies how long a cache entry remains valid; by default, cache entries do not expire. If the cache parameter is set to ALL, cacheTTLMs specifies the interval at which the system refreshes the cache; by default, the cache is not refreshed. |
| ignoreKeywordSuffix | Specifies whether to ignore the .keyword suffix that is automatically added to the names of fields of the STRING data type. | BOOLEAN | No | false | Realtime Compute for Apache Flink converts fields of the TEXT data type into fields of the STRING data type to ensure compatibility, and by default appends the .keyword suffix to the names of STRING fields. Valid values: `true`: the .keyword suffix is ignored. Set this parameter to true if fields of the TEXT data type in the Elasticsearch dimension table cannot be matched. `false`: the .keyword suffix is not ignored. |
| cacheEmpty | Specifies whether to cache empty lookup results from the physical dimension table. | BOOLEAN | No | true | The cacheEmpty parameter takes effect only when the cache parameter is set to LRU. |
| queryMaxDocs | The maximum number of documents that can be returned from the Elasticsearch server for each lookup against a non-primary-key dimension table. | Integer | No | 10000 | The default value 10000 is also the maximum value of this parameter. This parameter takes effect only for non-primary-key dimension tables, because data in primary key tables is unique. A large default value is used to ensure query correctness, but a large value increases memory usage during Elasticsearch queries. If you encounter insufficient memory, decrease the value to reduce memory usage. **Note:** Only Realtime Compute for Apache Flink that uses VVR 8.0.8 or later supports this parameter. |
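The interaction of cache=LRU with cacheSize and cacheTTLMs can be sketched with a small cache class. This is an illustrative model only (timestamps are passed explicitly so that the behavior is deterministic), not the connector's cache implementation:

```python
from collections import OrderedDict

class LruTtlCache:
    """Sketch of the cache=LRU policy: at most cache_size entries,
    each expiring cache_ttl_ms milliseconds after it was written."""

    def __init__(self, cache_size=100000, cache_ttl_ms=float("inf")):
        self.cache_size = cache_size      # corresponds to cacheSize
        self.cache_ttl_ms = cache_ttl_ms  # corresponds to cacheTTLMs
        self._data = OrderedDict()        # key -> (write_time_ms, value)

    def get(self, key, now_ms):
        item = self._data.get(key)
        if item is None:
            return None
        written_ms, value = item
        if now_ms - written_ms > self.cache_ttl_ms:
            del self._data[key]  # entry expired
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return value

    def put(self, key, value, now_ms):
        self._data[key] = (now_ms, value)
        self._data.move_to_end(key)
        if len(self._data) > self.cache_size:
            self._data.popitem(last=False)  # evict least recently used

cache = LruTtlCache(cache_size=2, cache_ttl_ms=1000)
cache.put("a", 1, now_ms=0)
cache.put("b", 2, now_ms=0)
cache.put("c", 3, now_ms=0)         # evicts "a", the least recently used entry
print(cache.get("a", now_ms=0))     # None
print(cache.get("b", now_ms=500))   # 2
print(cache.get("b", now_ms=2000))  # None (expired after 1000 ms)
```

A cache miss in the real connector triggers a query against the physical dimension table; with cacheEmpty=true, an empty result is also stored so repeated misses for the same key do not hit Elasticsearch again.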
Data type mappings
Realtime Compute for Apache Flink parses Elasticsearch data in the JSON format. For more information, see Data Type Mapping.
Sample code
Sample code for a source table
```sql
CREATE TEMPORARY TABLE elasticsearch_source (
  name STRING,
  location STRING,
  `value` FLOAT
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'indexName' = '<yourIndexName>',
  'typeNames' = '<yourTypeName>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  name STRING,
  location STRING,
  `value` FLOAT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT name, location, `value`
FROM elasticsearch_source;
```
Sample code for a dimension table
```sql
CREATE TEMPORARY TABLE datagen_source (
  id STRING,
  data STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE es_dim (
  id STRING,
  `value` FLOAT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'indexName' = '<yourIndexName>',
  'typeNames' = '<yourTypeName>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  id STRING,
  data STRING,
  `value` FLOAT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT e.*, w.*
FROM datagen_source AS e
JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
ON e.id = w.id;
```
Sample code for a result table 1
```sql
CREATE TEMPORARY TABLE datagen_source (
  id STRING,
  name STRING,
  uv BIGINT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE es_sink (
  user_id STRING,
  user_name STRING,
  uv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED -- The primary key is optional. If you specify a primary key, it is used as the document ID. Otherwise, a random value is used as the document ID.
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>',
  'document-type' = '<yourElasticsearch.types>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}'
);

INSERT INTO es_sink
SELECT id, name, uv
FROM datagen_source;
```
Sample code for a result table 2
```sql
CREATE TEMPORARY TABLE datagen_source (
  id STRING,
  details ROW<
    name STRING,
    ages ARRAY<INT>,
    attributes MAP<STRING, STRING>
  >
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE es_sink (
  id STRING,
  details ROW<
    name STRING,
    ages ARRAY<INT>,
    attributes MAP<STRING, STRING>
  >,
  PRIMARY KEY (id) NOT ENFORCED -- The primary key is optional. If you specify a primary key, it is used as the document ID. Otherwise, a random value is used as the document ID.
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>',
  'document-type' = '<yourElasticsearch.types>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}'
);

INSERT INTO es_sink
SELECT id, details
FROM datagen_source;
```