本文介绍如何创建Elasticsearch Sink Connector,您可以通过Elasticsearch Sink Connector将数据从云消息队列 Kafka 版实例的数据源Topic导出至阿里云Elasticsearch中。
前提条件
详细步骤,请参见创建前提。
步骤一:创建目标服务资源
在Elasticsearch管理控制台创建实例和索引。更多信息,请参见快速入门。
在Elasticsearch白名单配置中添加函数计算服务地址所在的网段,通过专有网络连接需要配置,公网无需执行。更多信息,请参见配置实例公网或私网访问白名单。
步骤二:创建Elasticsearch Sink Connector
登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。
在左侧导航栏,选择 。
在任务列表页面,单击创建任务。
在创建任务面板,设置任务名称和描述,配置以下参数。
任务创建
在Source(源)配置向导,选择数据提供方为消息队列 Kafka 版,设置以下参数,然后单击下一步。
参数
说明
示例
地域
源Kafka实例所在的地域。
华东1(杭州)
kafka实例
数据源所在的Kafka实例ID。
alikafka_post-cn-9hdsbdhd****
Topic
数据源所在的Kafka实例Topic。
guide-sink-topic
Group ID
数据源所在的Kafka实例中的Group ID。
快速创建:自动创建以GID_EVENTBRIDGE_xxx命名的Group ID。
使用已有:选择已创建的Group,请选择独立的Group ID,不要和已有的业务混用,以免影响已有的消息收发。
使用已有
消费位点
最新位点:从最新位点开始消费。
最早位点:从最初位点开始消费。
最新位点
网络配置
有跨境传输数据需求时选择自建公网,其他情况可选择基础网络。
基础网络
数据格式
数据格式是针对支持二进制传递的数据源端推出的指定内容格式的编码能力。支持多种数据格式编码,如无特殊编码诉求可将格式设置为Json。
Json(默认Json格式编码,二进制数据按照utf-8 编码为Json格式放入Payload。)
Text(文本格式编码,二进制数据按照utf-8编码为字符串放入Payload。)
Binary(二进制格式编码,二进制数据按照Base64编码为字符串放入Payload。)
Json
批量推送条数
调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。
2000
批量推送间隔(单位:秒)
调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。
3
在Filtering(过滤)配置向导,定义数据模式过滤发送的请求。更多信息,请参见消息过滤。
在Transform(转换)配置向导,设置数据清洗,实现分割、映射、富化及动态路由等繁杂数据加工能力。更多信息,请参见使用函数计算实现消息数据清洗。
在Sink(目标)配置向导,选择服务类型为阿里云Elasticsearch acs.elasticSearch,配置以下参数。
参数
说明
示例
Elasticsearch实例
已创建的Elasticsearch实例。
es-cn-pe336j0gj001e****
实例登录名
创建实例时配置的登录名,默认为elastic。
elastic
实例登录密码
创建实例时配置的密码。
******
索引名称
已创建的索引名称。如何创建索引,请参见步骤三:创建索引。索引名称支持字符串常量或 JsonPath 规则提示变量,如:product_info、$.data.key。
product_info
数据文档类型
填写数据文档类型,支持字符串常量或 JsonPath 规则提取变量。
如:_doc、$.data.key。
说明仅当 Elasticsearch实例版本小于 7 时才可配置,默认为常量_doc。
_doc
文档 document
选择将完整事件或部分事件投递到Elasticsearch,如选择部分事件,需配置JsonPath 提取规则。
完整事件
网络配置
专有网络:通过专有网络VPC将Kafka消息投递到Elasticsearch。
公网:通过公网将Kafka消息投递到Elasticsearch。
公网
VPC
选择Elasticsearch实例所属的专有网络。仅当网络配置为专有网络时需配置此参数。
vpc-bp17fapfdj0dwzjkd****
交换机
选择Elasticsearch实例所属的专有交换机。仅当网络配置为专有网络时需配置此参数。
vsw-bp1gbjhj53hdjdkg****
安全组
选择安全组。仅当网络配置为专有网络时需配置此参数。
test_group
任务属性
配置事件推送失败时的重试策略及错误发生时的处理方式。更多信息,请参见重试和死信。
完成上述配置后,单击保存。在任务列表页面,找到刚创建的Elasticsearch Sink Connector任务,此时状态栏为启动中,当状态变为运行中时,Connector创建成功。
步骤三:测试Elasticsearch Sink Connector
在任务列表页面,在Elasticsearch Sink Connector任务的事件源列单击源Topic。
在Topic详情页面,单击体验发送消息。
在快速体验消息收发面板,按照下图配置消息内容,然后单击确定。
登录Elasticsearch管理控制台,通过Kibana访问实例。更多信息,请参见快速入门。
在Kibana控制台上执行以下命令,查看数据插入结果。
GET /{索引名称}/_search
数据插入结果如下所示: