全部产品
Search
文档中心

实时计算Flink版:Elasticsearch

更新时间:Dec 19, 2024

本文为您介绍如何使用Elasticsearch连接器。

背景信息

阿里云Elasticsearch兼容开源Elasticsearch的功能,以及Security、Machine Learning、Graph、APM等商业功能,致力于数据分析、数据搜索等场景服务。为您提供企业级权限管控、安全监控告警、自动报表生成等场景服务。

Elasticsearch连接器支持的信息如下:

类别

详情

支持类型

源表、维表和结果表

运行模式

批模式和流模式

数据格式

JSON

特有监控指标

  • 源表

    • pendingRecords

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerSecond

  • 维表

  • 结果表 ( VVR 6.0.6及以上 )

    • numRecordsOut

    • numRecordsOutPerSecond

说明

指标含义详情,请参见监控指标说明

API种类

Datastream和SQL

是否支持更新或删除结果表数据

前提条件

使用限制

  • 源表和维表支持大于等于6.8.x,但小于8.x版本的Elasticsearch。

  • 结果表仅支持Elasticsearch 6.x、7.x和8.x版本。

  • 仅Flink计算引擎VVR 2.0.0及以上版本支持Elasticsearch连接器。

  • 仅支持全量Elasticsearch源表,不支持增量Elasticsearch源表。

语法结构

  • 源表

    CREATE TABLE elasticsearch_source(
      name STRING,
      location STRING,
      value FLOAT
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );
  • 维表

    CREATE TABLE es_dim(
      field1 STRING, --作为JOIN时的Key,必须为STRING类型。
      field2 FLOAT,
      field3 BIGINT,
      PRIMARY KEY (field1) NOT ENFORCED
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );
    说明
    • 如果指定主键,则维表JOIN时的Key(字段)有且只能有一个,且必须为Elasticsearch对应索引的文档ID。

    • 如果不指定主键,则维表JOIN时的Key可以有一个或多个,需要为Elasticsearch对应索引的文档中的字段。

    • 对于String类型,为了保持兼容性,默认会对表中字段名增加.keyword后缀。如果因此无法匹配到Elasticsearch中的Text字段,可以将配置项ignoreKeywordSuffix配置为true。

  • 结果表

    CREATE TABLE es_sink(
      user_id   STRING,
      user_name   STRING,
      uv BIGINT,
      pv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-7', -- 如果是Elasticsearch 6.x版本,填写elasticsearch-6
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>'
    );
    说明
    • Elasticsearch结果表会根据是否定义了主键,确定是在upsert模式或append模式下工作。

      • 如果定义了主键,则主键必须为文档ID,Elasticsearch结果表将在upsert模式下工作,该模式可以处理包含UPDATE和DELETE的消息。

      • 如果未定义主键,Elasticsearch将自动生成随机的文档ID,Elasticsearch结果表将在append模式工作,该模式只能消费INSERT消息。

    • 某些类型(例如BYTES、ROW、ARRAY和MAP等)由于没有对应的字符串表示形式,所以不允许其作为主键字段。

    • DDL中的字段均对应Elasticsearch文档中的字段,不支持将文档ID等Meta信息写入Elasticsearch结果表中,因为文档ID等Meta信息由Elasticsearch实例侧维护。

WITH参数

  • 源表

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    源表类型。

    String

    固定值为elasticsearch。

    endPoint

    Server地址。

    String

    例如:http://127.0.0.1:XXXX

    indexName

    索引名称。

    String

    无。

    accessId

    Elasticsearch实例的用户名。

    String

    默认为空,不进行权限验证。如果定义了accessId,则必须定义非空的accessKey

    重要

    为了避免您的用户名和密码信息泄露,建议您通过密钥管理的方式填写用户名和密码取值,详情请参见变量管理

    accessKey

    Elasticsearch实例的密码。

    String

    typeNames

    Type名称。

    String

    _doc

    Elasticsearch 7.0以上版本不建议设置该参数。

    batchSize

    每个scroll请求从Elasticsearch集群获取的最大文档数。

    Int

    2000

    无。

    keepScrollAliveSecs

    scroll上下文保留的最长时间。

    Int

    3600

    单位为秒。

  • 结果表

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    结果表类型。

    String

    固定值为elasticsearch-6elasticsearch-7 elasticsearch-8

    说明

    仅实时计算引擎VVR 8.0.5及以上版本支持配置为elasticsearch-8

    hosts

    Server地址。

    String

    例如:127.0.0.1:XXXX

    index

    索引名称。

    String

    Elasticsearch结果表同时支持静态索引和动态索引。在使用静态和动态索引时,您需要注意以下几点:

    • 如果使用静态索引,则索引选项值应为纯字符串,例如myusers,所有记录都将被写入myusers索引。

    • 如果使用动态索引,可以使用{field_name}引用记录中的字段值以动态生成目标索引。您还可以使用{field_name|date_format_string}将TIMESTAMP、DATE和TIME类型的字段值转换为date_format_string指定的格式。date_format_string与Java的DateTimeFormatter兼容。例如,如果设置为myusers-{log_ts|yyyy-MM-dd},则log_ts字段值为2020-03-27 12:25:55的记录将被写入myusers-2020-03-27索引。

    document-type

    文档类型。

    String

    • elasticsearch-6:必填

    • elasticsearch-7:不支持

    当连接器类型为elasticsearch-6时,此处参数取值需要和Elasticsearch侧的type参数取值保持一致。

    username

    用户名。

    String

    默认为空,不进行权限验证。如果定义了username,则必须定义非空的password。

    重要

    为了避免您的用户名和密码信息泄露,建议您通过密钥管理的方式填写用户名和密码取值,详情请参见变量管理

    password

    密码。

    String

    document-id.key-delimiter

    文档ID的分隔符。

    String

    _

    在Elasticsearch结果表中,主键用于计算Elasticsearch的文档ID。Elasticsearch结果表通过使用document-id.key-delimiter指定的键分隔符,按照DDL中定义的顺序连接所有主键字段,从而为每一行生成一个文档ID字符串。

    说明

    文档ID为最多512个字节但不包含空格的字符串。

    failure-handler

    Elasticsearch请求失败时的故障处理策略。

    String

    fail

    可选策略如下:

    • fail(默认值):如果请求失败,则作业失败。

    • ignore:忽略失败并删除请求。

    • retry-rejected:重新添加由于队列容量满而失败的请求。

    • custom class name:用于使用ActionRequestFailureHandler子类进行故障处理。

    sink.flush-on-checkpoint

    是否在checkpoint时执行flush。

    Boolean

    true

    • true:默认值。

    • false:禁用该功能后,在Elasticsearch进行Checkpoint时,连接器将不等待确认所有pending请求是否已完成,故连接器不会为请求提供At-least-once保证。

    sink.bulk-flush.backoff.strategy

    如果由于临时请求错误导致flush操作失败,则设置sink.bulk-flush.backoff.strategy指定重试策略。

    Enum

    DISABLED

    • DISABLED(默认值):不执行重试,即第一次请求错误后失败。

    • CONSTANT:常量回退,即每次回退等待时间相同。

    • EXPONENTIAL:指数回退,即每次回退等待时间指数递增。

    sink.bulk-flush.backoff.max-retries

    最大回退重试次数。

    Int

    无。

    sink.bulk-flush.backoff.delay

    每次回退尝试之间的延迟。

    Duration

    • 对于CONSTANT回退策略:该值为每次重试之间的延迟。

    • 对于EXPONENTIAL回退策略:该值为初始基准延迟。

    sink.bulk-flush.max-actions

    每个批量请求的最大缓冲操作数。

    Int

    1000

    0表示禁用该功能。

    sink.bulk-flush.max-size

    存放请求的缓冲区内存最大值。

    String

    2 MB

    单位为MB,默认值为2 MB,0 MB表示禁用该功能。

    sink.bulk-flush.interval

    flush的间隔。

    Duration

    1s

    单位为秒,默认值为1s,0s表示禁用该功能。

    connection.path-prefix

    要添加到每个REST通信中的前缀字符串。

    String

    无。

    retry-on-conflict

    更新操作中,允许因版本冲突异常而重试的最大次数。超过该次数后将抛出异常导致作业失败。

    Int

    0

    说明
    • 仅实时计算引擎VVR 4.0.13及以上版本支持该参数。

    • 该参数仅在定义了主键的情况下生效。

    routing-fields

    指定一个或多个ES字段名称,用来将文档路由到Elasticsearch的指定分片中。

    String

    多个字段名以分号(;)进行分割。如果某个字段数据为空,则该字段会被置为null。

    说明

    仅实时计算引擎VVR 8.0.6及以上版本,且elasticsearch-7和elasticsearch-8支持该参数。

    sink.delete-strategy

    用来配置收到回撤(-D/-U)类型消息时的行为

    Enum

    DELETE_ROW_ON_PK

    可选行为如下:

    • DELETE_ROW_ON_PK(默认值):忽略-U类型的消息,但是在收到-D类型的消息时删除主键对应的行(文档)。

    • IGNORE_DELETE:忽略-U和-D 类型的消息,Elasticsearch Sink不发生回撤。

    • NON_PK_FIELD_TO_NULL:忽略 -U类型的消息,但是在收到-D类型的消息时,会修改主键对应的行(文档),主键值保持不变,表 Schema中其他非主键值均置为 NULL。主要用在多个Sink同时写入同一张Elasticsearch表时部分更新的场景。

    • CHANGELOG_STANDARD:和 DELETE_ROW_ON_PK类似,唯一的区别是该模式收到-U类型的消息时也会删除主键对应的行(文档)。

      说明

      仅实时计算引擎VVR 8.0.8及以上版本支持该参数。

  • 维表

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    维表类型。

    String

    固定值为elasticsearch。

    endPoint

    Server地址。

    String

    例如:http://127.0.0.1:XXXX

    indexName

    索引名称。

    String

    无。

    accessId

    Elasticsearch实例的用户名。

    String

    默认为空,不进行权限验证。如果定义了accessId,则必须定义非空的accessKey

    重要

    为了避免您的用户名和密码信息泄露,建议您通过密钥管理的方式填写用户名和密码取值,详情请参见变量管理

    accessKey

    Elasticsearch实例的密码。

    String

    typeNames

    Type名称。

    String

    _doc

    Elasticsearch 7.0以上版本不建议设置该参数。

    maxJoinRows

    单行数据Join的最多行数。

    Integer

    1024

    无。

    cache

    缓存策略。

    String

    ALL

    支持以下三种缓存策略:

    • ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中的所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。

    • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据。如果没有找到,则去物理维表中查找。

    • None:无缓存。

    cacheSize

    缓存大小,即缓存多少行数据。

    Long

    100000

    仅当cache选择LRU缓存策略时,cacheSize参数生效。

    cacheTTLMs

    缓存失效的超时时间。

    Long

    Long.MAX_VALUE

    单位为毫秒。cacheTTLMs配置和cache配置有关:

    • 如果cache配置为LRU,则cacheTTLMs为缓存失效的超时时间,默认不过期。

    • 如果cache配置为ALL,则cacheTTLMs为设置缓存重新加载的间隔时间,默认不重新加载。

    ignoreKeywordSuffix

    是否忽略自动为String字段添加的.keyword后缀。

    Boolean

    false

    为了保证兼容性,Flink将Elasticsearch中的Text类型转换为String,并默认在String类型字段名后增加.keyword后缀。

    参数取值如下:

    • true:忽略。

      如果因此无法匹配到Elasticsearch中的Text类型字段,需要将该参数配置为true。

    • false:不忽略。

    cacheEmpty

    是否缓存物理维表中查找结果为空的结果。

    Boolean

    true

    仅当cache选择LRU缓存策略时,cacheEmpty参数生效。

    queryMaxDocs

    非主键维表的输入端每条数据到来后,查询ElasticSearch Server时返回的最大文档条数。

    Integer

    10000

    默认值10000是ElasticSearch Server返回文档条数的最大限制,该配置项的取值不能超过这个上限。

    说明
    • 仅实时计算引擎VVR 8.0.8及以上版本支持该参数。

    • 该参数仅对非主键维表生效,因为主键表中数据是唯一的。

    • 为了查询的正确性,默认值给的比较大。但是该值会增大查询Elasticsearch时的内存占用,确实遇到内存问题后,可以适当降低该值来优化内存使用。

类型映射

Flink以JSON来解析Elasticsearch数据,详情请参见数据类型映射关系

使用示例

  • 源表示例

    CREATE TEMPORARY TABLE elasticsearch_source (
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'indexName' = '<yourIndexName>',
      'typeNames' = '<yourTypeName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' ='blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT name, location, `value`
    FROM elasticsearch_source;
  • 维表示例

    CREATE TEMPORARY TABLE datagen_source (
      id STRING, 
      data STRING,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'datagen' 
    );
    
    CREATE TEMPORARY TABLE es_dim (
      id STRING,
      `value` FLOAT,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'indexName' = '<yourIndexName>',
      'typeNames' = '<yourTypeName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
      id STRING,
      data STRING,
      `value` FLOAT
    ) WITH (
      'connector' = 'blackhole' 
    );
    
    INSERT INTO blackhole_sink
    SELECT e.*, w.*
    FROM datagen_source AS e
    JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
    ON e.id = w.id;
  • 结果表示例1

    CREATE TEMPORARY TABLE datagen_source (
      id STRING, 
      name STRING,
      uv BIGINT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE es_sink (
      user_id STRING,
      user_name STRING,
      uv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED -- 主键可选,如果定义了主键,则作为文档ID,否则文档ID将为随机值。
    ) WITH (
      'connector' = 'elasticsearch-6',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>',
      'document-type' = '<yourElasticsearch.types>',
      'username' ='${secret_values.ak_id}',
      'password' ='${secret_values.ak_secret}'
    );
    
    INSERT INTO es_sink
    SELECT id, name, uv
    FROM datagen_source;
  • 结果表示例2

    CREATE TEMPORARY TABLE datagen_source(  
      id STRING,
        details ROW<  
            name STRING,  
            ages ARRAY<INT>,  
            attributes MAP<STRING, STRING>  
        >
    ) WITH (  
        'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE es_sink (
      id STRING,
        details ROW<  
            name STRING,  
            ages ARRAY<INT>,  
            attributes MAP<STRING, STRING>  
        >, 
      PRIMARY KEY (id) NOT ENFORCED  -- 主键可选,如果定义了主键,则作为文档ID,否则文档ID将为随机值。
    ) WITH (
      'connector' = 'elasticsearch-6',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>',
      'document-type' = '<yourElasticsearch.types>',
      'username' ='${secret_values.ak_id}',
      'password' ='${secret_values.ak_secret}'
    );
    
    INSERT INTO es_sink
    SELECT id, details
    FROM datagen_source;