This topic describes how to create an Elasticsearch sink connector to export data from a topic on an ApsaraMQ for Kafka instance to Elasticsearch.
Prerequisites
For information about the prerequisites, see Prerequisites.
Step 1: Create Elasticsearch resources
Create an instance and an index in the Elasticsearch console. For more information, see Getting started.
Add the CIDR block of the endpoint that you use to access Function Compute to the IP address whitelist of the Elasticsearch cluster. This step is required only if you access Elasticsearch over a virtual private cloud (VPC). For information about how to configure a whitelist, see Configure a public or private IP address whitelist for an Elasticsearch cluster.
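Before you save the whitelist, it can help to confirm that the address you expect traffic to come from actually falls inside the CIDR block you plan to add. The following is a minimal sketch that uses only the Python standard library. The function name `is_whitelisted` and all addresses are hypothetical values chosen for illustration; they are not taken from the console.

```python
import ipaddress
from typing import Iterable


def is_whitelisted(address: str, whitelist: Iterable[str]) -> bool:
    """Return True if `address` is covered by any IP address or CIDR block in `whitelist`."""
    ip = ipaddress.ip_address(address)
    # strict=False accepts entries such as 192.168.0.1/24 whose host bits are set.
    return any(ip in ipaddress.ip_network(entry, strict=False) for entry in whitelist)


# Hypothetical whitelist entries and client addresses, for illustration only.
whitelist = ["192.168.0.0/24", "10.0.0.5"]
print(is_whitelisted("192.168.0.17", whitelist))  # True
print(is_whitelisted("172.16.1.1", whitelist))    # False
```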
Step 2: Create an Elasticsearch sink connector
Log on to the ApsaraMQ for Kafka console. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.
In the left-side navigation pane, choose .
On the Tasks page, click Create Task.
On the Create Task page, configure the Task Name and Description parameters. Then, follow the on-screen instructions to configure other parameters.
Task Creation
In the Source step, set the Data Provider parameter to Message Queue for Apache Kafka and follow the on-screen instructions to configure other parameters. Then, click Next Step. The following table describes the parameters.
| Parameter | Description | Example |
| --- | --- | --- |
| Region | The region where the ApsaraMQ for Kafka instance resides. | China (Hangzhou) |
| Message Queue for Apache Kafka Instance | The ID of the ApsaraMQ for Kafka instance in which the data that you want to route is produced. | alikafka_post-cn-9hdsbdhd**** |
| Topic | The topic on the ApsaraMQ for Kafka instance in which the data that you want to route is produced. | guide-sink-topic |
| Group ID | The ID of the consumer group on the ApsaraMQ for Kafka instance that is used to consume the data that you want to route. Valid values: Quickly Create (the system automatically creates a group whose ID is in the GID_EVENTBRIDGE_xxx format) and Use Existing Group (select the ID of an existing group that is not in use; if you select a group that is in use, the publishing and subscription of existing messages are affected). | Use Existing Group |
| Consumer Offset | The offset from which messages are consumed. Valid values: Latest Offset (messages are consumed from the latest offset) and Earliest Offset (messages are consumed from the earliest offset). | Latest Offset |
| Network Configuration | If cross-border data transmission is required, select Internet. In other cases, select Basic Network. | Basic Network |
| Data Format | The format into which the binary data delivered from the source is encoded. If you have no special encoding requirements, select Json. Valid values: Json (default; binary data is encoded into JSON-formatted data based on UTF-8 encoding and then put into the payload), Text (binary data is encoded into strings based on UTF-8 encoding and then put into the payload), and Binary (binary data is encoded into strings based on Base64 encoding and then put into the payload). | Json |
| Messages | The maximum number of messages that can be sent in each function invocation. Requests are sent only when the number of messages in the backlog reaches the specified value. Valid values: 1 to 10000. | 2000 |
| Interval (Unit: Seconds) | The time interval at which the function is invoked. The system sends the aggregated messages to Function Compute at the specified time interval. Valid values: 0 to 15. Unit: seconds. The value 0 indicates that messages are sent immediately after aggregation. | 3 |
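The three Data Format options can be illustrated with a short sketch. This is not the service's implementation. It only mirrors the described behavior under two assumptions: the message value arrives as raw bytes, and for Json the bytes are valid UTF-8 JSON text. The function name `encode_payload` is hypothetical.

```python
import base64
import json


def encode_payload(raw: bytes, data_format: str = "Json"):
    """Encode a raw message value the way each Data Format option is described."""
    if data_format == "Json":
        # Assumption: the bytes hold valid UTF-8 JSON text.
        return json.loads(raw.decode("utf-8"))
    if data_format == "Text":
        # The bytes are decoded as a UTF-8 string and passed through unchanged.
        return raw.decode("utf-8")
    if data_format == "Binary":
        # Arbitrary bytes are made string-safe with Base64.
        return base64.b64encode(raw).decode("ascii")
    raise ValueError(f"unsupported data format: {data_format}")


raw = b'{"sku": "A-100", "qty": 2}'  # hypothetical message value
print(encode_payload(raw, "Json"))             # {'sku': 'A-100', 'qty': 2}
print(encode_payload(raw, "Text"))             # {"sku": "A-100", "qty": 2}
print(encode_payload(b"\x00\x01", "Binary"))   # AAE=
```

Json is the natural choice when downstream steps need to address individual fields, because Text and Binary both deliver the value as a single opaque string.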
In the Filtering step, define a data pattern to filter data. For more information, see Message filtering.
In the Transformation step, specify a data cleansing method to implement data processing capabilities such as splitting, mapping, enrichment, and dynamic routing. For more information, see Use Function Compute to perform message cleansing.
In the Sink step, set the Service Type parameter to Elasticsearch (acs.elasticSearch) and configure other parameters. The following table describes the parameters.
| Parameter | Description | Example |
| --- | --- | --- |
| Elasticsearch Cluster | The Alibaba Cloud Elasticsearch cluster that you created. | es-cn-pe336j0gj001e**** |
| Cluster Logon Name | The logon name that you specified when you created the Elasticsearch cluster. Default logon name: elastic. | elastic |
| Cluster Logon Password | The password that you specified when you created the Elasticsearch cluster. | ****** |
| Index Name | The name of the index that you created. For information about how to create an index, see Step 3: Create an index. You can specify a string constant or a variable extracted by using the JSONPath syntax. Examples: product_info and $.data.key. | product_info |
| Document Type | The document type. You can specify a string constant or a variable extracted by using the JSONPath syntax. Examples: _doc and $.data.key. Default value: _doc. Note: This parameter is available only when the version of the Elasticsearch instance is earlier than 7. | _doc |
| Document | Specifies whether to deliver all content of an event or specific content of an event to Elasticsearch. If you select Partial Event for this parameter, you must specify the JSONPath extraction rule. | Complete Event |
| Network Settings | Valid values: VPC (messages in ApsaraMQ for Kafka are delivered to Elasticsearch in a VPC) and Internet (messages in ApsaraMQ for Kafka are delivered to Elasticsearch over the Internet). | Internet |
| VPC | The VPC to which the Elasticsearch instance belongs. This parameter is required only if you set the Network Settings parameter to VPC. | vpc-bp17fapfdj0dwzjkd**** |
| vSwitch | The vSwitch to which the Elasticsearch instance belongs. This parameter is required only if you set the Network Settings parameter to VPC. | vsw-bp1gbjhj53hdjdkg**** |
| Security Group | The security group to which the Elasticsearch instance belongs. This parameter is required only if you set the Network Settings parameter to VPC. | test_group |
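The Index Name, Document Type, and Document parameters accept either a string constant or a value extracted with JSONPath. The sketch below shows the idea for simple dotted paths such as $.data.key. It is not a full JSONPath implementation, and the event structure, the function names `resolve` and `build_document`, and the sample values are all hypothetical.

```python
from typing import Any


def resolve(value: str, event: dict) -> Any:
    """Return `value` unchanged if it is a constant, or look it up in `event` if it starts with '$.'."""
    if not value.startswith("$."):
        return value  # string constant such as "product_info"
    node: Any = event
    # Walk one key per dotted segment; filters, wildcards, and array indexes are not handled.
    for key in value[2:].split("."):
        node = node[key]
    return node


def build_document(event: dict, mode: str = "Complete Event", rule: str = "$.data") -> Any:
    """Pick the whole event or only the part selected by the extraction rule."""
    return event if mode == "Complete Event" else resolve(rule, event)


# Hypothetical event, for illustration only.
event = {"data": {"key": "product_info", "value": {"sku": "A-100"}}}
print(resolve("product_info", event))                          # product_info
print(resolve("$.data.key", event))                            # product_info
print(build_document(event, "Partial Event", "$.data.value"))  # {'sku': 'A-100'}
```

Extracting the index name from the event makes it possible to route messages to different indexes from one task, while a constant keeps all documents in a single index.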
Task Property
Configure the retry policy that you want to use when events fail to be pushed and the method that you want to use to handle faults. For more information, see Retry policies and dead-letter queues.
Click Save. On the Tasks page, find the Elasticsearch sink connector that you created. When the status in the Status column changes from Starting to Running, the connector is created.
Step 3: Test the Elasticsearch sink connector
On the Tasks page, find the Elasticsearch sink connector that you created and click the source topic in the Event Source column.
On the Topic Details page, click Send Message.
In the Start to Send and Consume Message panel, configure the parameters based on the following figure and click OK.
Log on to the Elasticsearch console and use Kibana to access the instance. For more information, see Getting started.
In the Kibana console of the Elasticsearch cluster, run the following command to view the data insertion result.
GET /{Index name}/_search
The following figure shows the data insertion result.
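If you prefer to check the result from a script instead of Kibana, the same `_search` request can be sent to the Elasticsearch REST endpoint. The following is a minimal sketch that uses only the Python standard library and HTTP basic authentication. The endpoint URL, port, index name, and credentials are placeholders that you must replace, and the helper names are hypothetical.

```python
import base64
import json
import urllib.request


def build_search_request(endpoint: str, index: str, user: str, password: str,
                         size: int = 10) -> urllib.request.Request:
    """Build a POST {endpoint}/{index}/_search request that matches all documents."""
    url = f"{endpoint.rstrip('/')}/{index}/_search"
    body = json.dumps({"query": {"match_all": {}}, "size": size}).encode("utf-8")
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    headers = {"Content-Type": "application/json", "Authorization": f"Basic {token}"}
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


def search(request: urllib.request.Request) -> dict:
    """Send the request and return the parsed JSON response."""
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.load(response)


if __name__ == "__main__":
    # Placeholder endpoint and credentials; replace them with your own values.
    request = build_search_request("http://<es-endpoint>:9200", "product_info",
                                   "elastic", "<password>")
    print(request.full_url)
    # Uncomment after you fill in real values:
    # for hit in search(request)["hits"]["hits"]:
    #     print(hit["_source"])
```

Each message delivered by the connector should appear as one entry under `hits.hits` in the response.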