
Realtime Compute for Apache Flink: Elasticsearch connector

Last Updated: Sep 20, 2024

This topic describes how to use the Elasticsearch connector.

Background information

Alibaba Cloud Elasticsearch is compatible with open source Elasticsearch features such as Security, Machine Learning, Graph, and Application Performance Monitoring (APM). It is suitable for a variety of scenarios, such as data analysis and data search, and provides enterprise-class services such as access control, security monitoring and alerting, and automatic report generation.

The Elasticsearch connector supports the following capabilities:

  • Table type: source table, dimension table, and result table

  • Running mode: batch mode and streaming mode

  • Data format: JSON

  • Metrics:

    • Metrics for source tables: pendingRecords, numRecordsIn, numRecordsInPerSecond, numBytesIn, and numBytesInPerSecond

    • Metrics for dimension tables: none

    • Metrics for result tables (only in Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.6 or later): numRecordsOut and numRecordsOutPerSecond

    Note: For more information about the metrics, see Metrics.

  • API type: DataStream API and SQL API

  • Data update or deletion in a result table: supported

Prerequisites

Limits

  • The Elasticsearch connector can be used for source tables and dimension tables only if the related Elasticsearch cluster is of V5.5 or later but earlier than V8.X.

  • The Elasticsearch connector can be used for result tables only if the related Elasticsearch cluster is of V6.X, V7.X, or V8.X.

  • Only Realtime Compute for Apache Flink that uses VVR 2.0.0 or later supports the Elasticsearch connector.

  • The Elasticsearch connector can be used only for full Elasticsearch source tables and cannot be used for incremental Elasticsearch source tables.

Syntax

  • Statement for creating a source table

    CREATE TABLE elasticsearch_source(
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' = 'elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );
  • Statement for creating a dimension table

    CREATE TABLE es_dim(
      field1 STRING, -- If this field is used as a key to join the dimension table with another table, it must be of the STRING data type.
      field2 FLOAT,
      field3 BIGINT,
      PRIMARY KEY (field1) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );
    Note
    • If you define a primary key for the dimension table, only that key can be used to join the dimension table with another table. The key is the document ID in your Elasticsearch index.

    • If you do not define a primary key for the dimension table, one or more keys can be used to join the dimension table with another table. The keys are fields of the documents in your Elasticsearch index.

    • By default, the .keyword suffix is appended to the names of STRING fields to ensure compatibility. If TEXT fields in the Elasticsearch table cannot be matched as a result, set the ignoreKeywordSuffix parameter to true.

  • Statement for creating a result table

    CREATE TABLE es_sink(
      user_id STRING,
      user_name STRING,
      uv BIGINT,
      pv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-7', -- If the Elasticsearch version is V6.X, set the value to elasticsearch-6.
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>'
    );
    Note
    • Whether the Elasticsearch result table works in upsert mode or append mode depends on whether a primary key is defined.

      • If a primary key is defined for the Elasticsearch result table, it is used as the document ID and the result table works in upsert mode. In this mode, the result table can consume UPDATE and DELETE messages.

      • If no primary key is defined, Elasticsearch automatically generates a random document ID for each record and the result table works in append mode. In this mode, the result table can consume only INSERT messages.

    • Specific data types, such as BYTES, ROW, ARRAY, and MAP, cannot be represented as strings. Therefore, fields of these data types cannot be used as primary key fields.

    • The fields in the DDL statement correspond to the fields of an Elasticsearch document. Metadata, such as the document ID, is maintained on the Elasticsearch cluster. Therefore, metadata cannot be written through Elasticsearch result tables.
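
  • Statement for creating a result table for an Elasticsearch V8 cluster (a minimal sketch for reference; it assumes VVR 8.0.5 or later and uses the same placeholder values as the preceding statements)

    CREATE TABLE es8_sink(
      user_id STRING,
      pv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED -- The primary key is used as the document ID, so the table works in upsert mode.
    ) WITH (
      'connector' = 'elasticsearch-8', -- Supported only in VVR 8.0.5 or later.
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>'
    );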

Parameters in the WITH clause

  • Parameters only for source tables

    • connector (STRING, required, no default value): The type of the source table. Set the value to elasticsearch.

    • endPoint (STRING, required, no default value): The endpoint of the Elasticsearch cluster. Example: http://127.0.0.1:XXXX.

    • indexName (STRING, required, no default value): The name of the Elasticsearch index.

    • accessId (STRING, optional, no default value): The username that is used to access the Elasticsearch cluster. By default, this parameter is empty, which indicates that permission verification is not required. If you configure the accessId parameter, you must also configure the accessKey parameter.

      Important: To protect your AccessKey pair, we recommend that you specify the username and password by using the key management method. For more information, see Manage variables and keys.

    • accessKey (STRING, optional, no default value): The password that is used to access the Elasticsearch cluster.

    • typeNames (STRING, optional, default value: _doc): The names of types. We recommend that you do not configure this parameter if the version of your Elasticsearch cluster is later than V7.0.

    • batchSize (INT, optional, default value: 2000): The maximum number of documents that can be obtained from the Elasticsearch cluster for each scroll request.

    • keepScrollAliveSecs (INT, optional, default value: 3600): The maximum retention period of the scroll context. Unit: seconds.

  • Parameters only for result tables

    • connector (String, required, no default value): The type of the result table. Set the value to elasticsearch-6, elasticsearch-7, or elasticsearch-8.

      Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.5 or later allows you to set this parameter to elasticsearch-8.

    • hosts (String, required, no default value): The endpoint of the Elasticsearch cluster. Example: 127.0.0.1:XXXX.

    • index (String, required, no default value): The name of the Elasticsearch index. The Elasticsearch result table supports both static and dynamic indexes. Take note of the following points:

      • If you use a static index, the index option value must be a string, such as myusers. All records are written to the myusers index.

      • If you use a dynamic index, you can use {field_name} to reference field values in the records to dynamically generate the destination index. You can also use {field_name|date_format_string} to convert field values of the TIMESTAMP, DATE, and TIME data types into the format specified by date_format_string, which is compatible with DateTimeFormatter in Java. For example, if the dynamic index is set to myusers-{log_ts|yyyy-MM-dd}, a record whose log_ts field value is 2020-03-27 12:25:55 is written to the myusers-2020-03-27 index. For a statement that combines a dynamic index with other result table parameters, see the example after the parameter lists.

    • document-type (String, no default value): The type of a document. This parameter is required if the connector parameter is set to elasticsearch-6 and is not supported if the connector parameter is set to elasticsearch-7. If the connector parameter is set to elasticsearch-6, the value must be the same as the value of the type parameter that is configured for Elasticsearch.

    • username (String, optional, no default value): The username that is used to access the Elasticsearch cluster. By default, this parameter is empty, which indicates that permission verification is not required. If you configure the username parameter, you must also configure the password parameter.

      Important: To protect your AccessKey pair, we recommend that you specify the username and password by using the key management method. For more information, see Manage variables and keys.

    • password (String, optional, no default value): The password that is used to access the Elasticsearch cluster.

    • document-id.key-delimiter (String, optional, default value: _): The delimiter that is used to concatenate primary key fields into document IDs. The Elasticsearch result table concatenates all primary key fields in the order defined in the DDL statement by using this delimiter and generates a document ID for each row.

      Note: A document ID is a string that contains a maximum of 512 bytes without spaces.

    • failure-handler (String, optional, default value: fail): The failure handling policy that is used when an Elasticsearch request fails. Valid values:

      • fail: The deployment fails if the request fails. This is the default value.

      • ignore: The failure is ignored and the request is dropped.

      • retry_rejected: The request is retried if the failure is caused by full queue capacity.

      • custom class name: A subclass of ActionRequestFailureHandler is used to handle the failure.

    • sink.flush-on-checkpoint (Boolean, optional, default value: true): Specifies whether the flush operation is triggered during checkpointing. Valid values:

      • true: The flush operation is triggered during checkpointing. This is the default value.

      • false: The flush operation is not triggered during checkpointing. In this case, the Elasticsearch connector does not wait for all pending requests to complete during checkpointing and therefore does not provide the at-least-once guarantee for requests.

    • sink.bulk-flush.backoff.strategy (Enum, optional, default value: DISABLED): The retry policy that is used when the flush operation fails due to a temporary request error. Valid values:

      • DISABLED: The flush operation is not retried and fails on the first request error. This is the default value.

      • CONSTANT: The waiting time between retries is constant.

      • EXPONENTIAL: The waiting time between retries increases exponentially.

    • sink.bulk-flush.backoff.max-retries (Int, optional, no default value): The maximum number of retries.

    • sink.bulk-flush.backoff.delay (Duration, optional, no default value): The delay between retries. If the sink.bulk-flush.backoff.strategy parameter is set to CONSTANT, this value is the fixed delay between retries. If the parameter is set to EXPONENTIAL, this value is the initial base delay.

    • sink.bulk-flush.max-actions (Int, optional, default value: 1000): The maximum number of actions that can be buffered for each bulk request before a flush is performed. The value 0 disables this feature.

    • sink.bulk-flush.max-size (String, optional, default value: 2 MB): The maximum memory size of the buffer in which requests are saved. The value 0 disables this feature.

    • sink.bulk-flush.interval (Duration, optional, default value: 1s): The interval at which flush operations are performed. The value 0 disables this feature.

    • connection.path-prefix (String, optional, no default value): The prefix that is added to every REST communication.

    • retry-on-conflict (Int, optional, default value: 0): The maximum number of retries that are allowed when a version conflict occurs in an update operation. If this number is exceeded, an exception is thrown and the deployment fails.

      Note:
      • Only Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports this parameter.
      • This parameter takes effect only when a primary key is specified.

    • routing-fields (String, optional, no default value): The names of one or more fields in the Elasticsearch result table whose values are used to route documents to the specified shards of the Elasticsearch cluster. Separate multiple field names with semicolons (;). If a field is empty, the routing value is set to null.

      Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.6 or later supports this parameter when the connector parameter is set to elasticsearch-7 or elasticsearch-8.

    • sink.delete-strategy (Enum, optional, default value: DELETE_ROW_ON_PK): The operation that is performed when a retraction message (DELETE or UPDATE) is received. Valid values:

      • DELETE_ROW_ON_PK: ignores UPDATE messages and deletes the row (document) that matches the primary key value when a DELETE message is received. This is the default value.

      • IGNORE_DELETE: ignores UPDATE and DELETE messages. No retraction occurs in the Elasticsearch sink.

      • NON_PK_FIELD_TO_NULL: ignores UPDATE messages and modifies the row (document) that matches the primary key value when a DELETE message is received. The primary key value remains unchanged, and the non-primary key fields in the table schema are set to NULL. This value is used to partially update data when multiple sinks write data to the same Elasticsearch table.

      • CHANGELOG_STANDARD: similar to DELETE_ROW_ON_PK, except that the row (document) that matches the primary key value is also deleted when an UPDATE message is received.

      Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.8 or later supports this parameter.

  • Parameters only for dimension tables

    • connector (STRING, required, no default value): The type of the dimension table. Set the value to elasticsearch.

    • endPoint (STRING, required, no default value): The endpoint of the Elasticsearch cluster. Example: http://127.0.0.1:XXXX.

    • indexName (STRING, required, no default value): The name of the Elasticsearch index.

    • accessId (STRING, optional, no default value): The username that is used to access the Elasticsearch cluster. By default, this parameter is empty, which indicates that permission verification is not required. If you configure the accessId parameter, you must also configure the accessKey parameter.

      Important: To protect your AccessKey pair, we recommend that you specify the username and password by using the key management method. For more information, see Manage variables and keys.

    • accessKey (STRING, optional, no default value): The password that is used to access the Elasticsearch cluster.

    • typeNames (STRING, optional, default value: _doc): The names of types. We recommend that you do not configure this parameter if the version of your Elasticsearch cluster is later than V7.0.

    • maxJoinRows (INTEGER, optional, default value: 1024): The maximum number of rows that can be joined.

    • cache (STRING, optional, default value: ALL): The cache policy. For a sample dimension table that uses an LRU cache, see the sample code at the end of this topic. Valid values:

      • ALL: All data in the dimension table is cached. Before a deployment runs, the system loads all data in the dimension table into the cache, and all subsequent lookups are served from the cache. If a record is not found in the cache, the join key does not exist. The system reloads all data into the cache after cache entries expire.

      • LRU: Only part of the data in the dimension table is cached. The system searches the cache each time a record is read from the source table. If the data is not found, the system queries the physical dimension table.

      • None: No data is cached.

    • cacheSize (LONG, optional, default value: 100000): The maximum number of rows that can be cached. This parameter takes effect only when the cache parameter is set to LRU.

    • cacheTTLMs (LONG, optional, default value: Long.MAX_VALUE): The cache timeout period. Unit: milliseconds. The effect of this parameter varies based on the cache parameter:

      • If the cache parameter is set to LRU, the cacheTTLMs parameter specifies the cache timeout period. By default, cache entries do not expire.

      • If the cache parameter is set to ALL, the cacheTTLMs parameter specifies the interval at which the cache is refreshed. By default, the cache is not refreshed.

    • ignoreKeywordSuffix (BOOLEAN, optional, default value: false): Specifies whether to ignore the .keyword suffix that is automatically added to the names of STRING fields. Realtime Compute for Apache Flink converts TEXT fields into STRING fields to ensure compatibility and, by default, appends the .keyword suffix to the names of the STRING fields. Valid values:

      • true: The .keyword suffix is ignored. Set this value if TEXT fields in the Elasticsearch table cannot be matched.

      • false: The .keyword suffix is not ignored.

    • cacheEmpty (BOOLEAN, optional, default value: true): Specifies whether to cache empty results that are returned from the physical dimension table. This parameter takes effect only when the cache parameter is set to LRU.

    • queryMaxDocs (Integer, optional, default value: 10000): The maximum number of documents that can be returned when the Elasticsearch server is queried for each record that is sent to a non-primary-key dimension table. The default value 10000 is also the maximum value of this parameter.

      Note:
      • Only Realtime Compute for Apache Flink that uses VVR 8.0.8 or later supports this parameter.
      • This parameter takes effect only for non-primary-key dimension tables, because data in primary key tables is unique.
      • The default value is large to ensure query correctness. However, a large value increases memory usage during Elasticsearch queries. If you run out of memory, decrease the value to reduce memory usage.
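
The following statement is a minimal sketch that shows how several of the result table parameters from the list above can be combined, including a dynamic index and bulk flush tuning. It assumes an Elasticsearch V7 cluster; the table name, field names, placeholder values, and tuning values are examples only, not recommendations.

    CREATE TEMPORARY TABLE es_sink_tuned (
      user_id STRING,
      log_ts TIMESTAMP(3),
      pv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-7',
      'hosts' = '<yourHosts>',
      'index' = 'myusers-{log_ts|yyyy-MM-dd}',          -- Dynamic index: one index per day, derived from the log_ts field.
      'sink.bulk-flush.max-actions' = '500',            -- Flush after 500 buffered actions.
      'sink.bulk-flush.interval' = '1s',                -- Also flush at least once per second.
      'sink.bulk-flush.backoff.strategy' = 'CONSTANT',  -- Retry temporary bulk failures with a fixed delay.
      'sink.bulk-flush.backoff.max-retries' = '3',
      'sink.bulk-flush.backoff.delay' = '1s',
      'retry-on-conflict' = '3'                         -- Allow up to 3 retries on version conflicts (VVR 4.0.13 or later).
    );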

Data type mappings

Realtime Compute for Apache Flink parses Elasticsearch data in the JSON format. For more information, see Data Type Mapping.
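
As a brief, hedged illustration of the mapping: assuming an index whose documents look like {"name": "a", "qty": 10, "price": 1.5, "valid": true} (the field names are hypothetical), the columns could be declared as follows.

    CREATE TEMPORARY TABLE es_typed_source (
      name STRING,      -- JSON string
      qty BIGINT,       -- JSON integer number
      price DOUBLE,     -- JSON floating-point number
      `valid` BOOLEAN   -- JSON true or false
    ) WITH (
      'connector' = 'elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );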

Sample code

  • Sample code for a source table

    CREATE TEMPORARY TABLE elasticsearch_source (
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' = 'elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'indexName' = '<yourIndexName>',
      'typeNames' = '<yourTypeName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT name, location, `value`
    FROM elasticsearch_source;
  • Sample code for a dimension table

    CREATE TEMPORARY TABLE datagen_source (
      id STRING, 
      data STRING,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'datagen' 
    );
    
    CREATE TEMPORARY TABLE es_dim (
      id STRING,
      `value` FLOAT,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'indexName' = '<yourIndexName>',
      'typeNames' = '<yourTypeName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
      id STRING,
      data STRING,
      `value` FLOAT
    ) WITH (
      'connector' = 'blackhole' 
    );
    
    INSERT INTO blackhole_sink
    SELECT e.*, w.*
    FROM datagen_source AS e
    JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
    ON e.id = w.id;
  • Sample code for a result table (example 1)

    CREATE TEMPORARY TABLE datagen_source (
      id STRING, 
      name STRING,
      uv BIGINT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE es_sink (
      user_id STRING,
      user_name STRING,
      uv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED -- The primary key is optional. If you specify a primary key, the primary key is used as the document ID. If you do not specify a primary key, a random value is used as the document ID. 
    ) WITH (
      'connector' = 'elasticsearch-6',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>',
      'document-type' = '<yourElasticsearch.types>',
      'username' = '${secret_values.ak_id}',
      'password' = '${secret_values.ak_secret}'
    );
    
    INSERT INTO es_sink
    SELECT id, name, uv
    FROM datagen_source;
  • Sample code for a result table (example 2)

    CREATE TEMPORARY TABLE datagen_source (
      id STRING,
      details ROW<
        name STRING,
        ages ARRAY<INT>,
        attributes MAP<STRING, STRING>
      >
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE es_sink (
      id STRING,
      details ROW<
        name STRING,
        ages ARRAY<INT>,
        attributes MAP<STRING, STRING>
      >,
      PRIMARY KEY (id) NOT ENFORCED -- The primary key is optional. If you specify a primary key, it is used as the document ID. If you do not specify a primary key, a random value is used as the document ID.
    ) WITH (
      'connector' = 'elasticsearch-6',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>',
      'document-type' = '<yourElasticsearch.types>',
      'username' = '${secret_values.ak_id}',
      'password' = '${secret_values.ak_secret}'
    );
    
    INSERT INTO es_sink
    SELECT id, details
    FROM datagen_source;
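  • Sample code for a dimension table that uses an LRU cache (a hedged variant of the dimension table example above; the cache values shown are illustrative assumptions, not recommendations)

    CREATE TEMPORARY TABLE es_dim_lru (
      id STRING,
      `value` FLOAT,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'indexName' = '<yourIndexName>',
      'cache' = 'LRU',          -- Cache only recently used rows instead of the full index.
      'cacheSize' = '50000',    -- Keep at most 50,000 rows in the cache.
      'cacheTTLMs' = '600000',  -- Expire cached rows after 10 minutes.
      'cacheEmpty' = 'false'    -- Do not cache lookups that return no document.
    );
    This table is joined in the same way as the es_dim table in the dimension table example above, by using a FOR SYSTEM_TIME AS OF clause.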