当您需要将数据从云消息队列 Kafka 版实例的数据源Topic导出至阿里云Elasticsearch的索引,您可以通过Elasticsearch Sink Connector实现。本文介绍如何创建Elasticsearch Sink Connector。
前提条件
在导出数据前,请确保您已完成以下操作:- 云消息队列 Kafka 版
- 为云消息队列 Kafka 版实例开启Connector。更多信息,请参见开启Connector。
- 为云消息队列 Kafka 版实例创建数据源Topic。更多信息,请参见步骤一:创建Topic。
- 函数计算
- Elasticsearch
- 在Elasticsearch管理控制台创建实例和索引。更多信息,请参见快速入门。
- 在Elasticsearch白名单配置中添加了函数计算服务地址所在的网段。更多信息,请参见配置实例公网或私网访问白名单。
说明- 函数计算使用的Elasticsearch客户端版本为7.7.0,为保持兼容,您需创建7.0或以上版本的Elasticsearch实例。
- 配置白名单时,您可以设置网段为0.0.0.0/0,代表整个VPC可访问,访问成功后根据需要修改为对应的网段。
注意事项
- 仅支持在同地域内,将数据从云消息队列 Kafka 版实例的数据源Topic导出至函数计算,再由函数计算导出至Elasticsearch。关于Connector的限制说明,请参见使用限制。
- 该功能基于函数计算服务提供。函数计算为您提供了一定的免费额度,超额部分将产生费用,请以函数计算的计费规则为准。计费详情,请参见计费概述。
- 函数计算的函数调用支持日志查询,以便您迅速排查问题。具体操作步骤,请参见配置日志。
- 消息转储时,云消息队列 Kafka 版中消息用UTF-8 String序列化,暂不支持二进制的数据格式。
- 如果Elasticsearch Sink Connector接入点是私网接入点,函数计算运行环境默认无法访问,为确保网络畅通,需在函数计算控制台为函数服务配置与Elasticsearch一致的VPC和vSwitch信息。更多信息,请参见更新服务。
创建并部署Elasticsearch Sink Connector
- 登录云消息队列 Kafka 版控制台。
- 在概览页面的资源分布区域,选择地域。
- 在左侧导航栏,单击Connector 任务列表。
- 在Connector 任务列表页面,从选择实例的下拉列表选择Connector所属的实例,然后单击创建 Connector。
- 在创建 Connector配置向导页面,完成以下操作。
- 创建完成后,在Connector 任务列表页面,找到创建的Connector ,单击其操作列的部署。
配置函数服务
您在云消息队列 Kafka 版控制台成功创建并部署Elasticsearch Sink Connector后,函数计算会自动为您创建给该Connector使用的函数服务,服务命名格式为kafka-service-<connector_name>-<随机String>
。
- 在Connector 任务列表页面,找到目标Connector,在其右侧操作列,选择 。页面跳转至函数计算控制台。
- 在函数计算控制台,找到自动创建的函数服务,并配置其VPC和交换机信息。请确保该信息和您阿里云Elasticsearch相同。配置的具体步骤,请参见更新服务。
发送测试消息
您可以向云消息队列 Kafka 版的数据源Topic发送消息,测试数据能否被导出至阿里云Elasticsearch。
- 在Connector 任务列表页面,找到目标Connector,在其右侧操作列,单击测试。
- 在发送消息面板,发送测试消息。
- 发送方式选择控制台。
- 在消息 Key文本框中输入消息的Key值,例如demo。
- 在消息内容文本框输入测试的消息内容,例如 {"key": "test"}。
- 设置发送到指定分区,选择是否指定分区。
- 单击是,在分区 ID文本框中输入分区的ID,例如0。如果您需查询分区的ID,请参见查看分区状态。
- 单击否,不指定分区。
- 发送方式选择Docker,执行运行 Docker 容器生产示例消息区域的Docker命令,发送消息。
- 发送方式选择SDK,根据您的业务需求,选择需要的语言或者框架的SDK以及接入方式,通过SDK发送消息。
- 发送方式选择控制台。
验证结果
向云消息队列 Kafka 版的数据源Topic发送消息后,登录Kibana控制台,执行GET /<index_name>/_search
查看索引,验证数据导出结果。
云消息队列 Kafka 版数据导出至Elasticsearch的格式示例如下:
{
"took" : 8,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "product_****",
"_type" : "_doc",
"_id" : "TX3TZHgBfHNEDGoZ****",
"_score" : 1.0,
"_source" : {
"msg_body" : {
"key" : "test",
"offset" : 2,
"overflowFlag" : false,
"partition" : 2,
"timestamp" : 1616599282417,
"topic" : "dv****",
"value" : "test1",
"valueSize" : 8
},
"doc_as_upsert" : true
}
}
]
}
}