This topic describes how to create an Elasticsearch sink connector to synchronize data from a source topic in your ApsaraMQ for Kafka instance to an index of your Elasticsearch cluster.
Prerequisites
Before you synchronize data, make sure that the following requirements are met:
- ApsaraMQ for Kafka
- The connector feature is enabled for your ApsaraMQ for Kafka instance. For more information, see Enable the connector feature.
- A topic is created in the ApsaraMQ for Kafka instance. For more information, see Step 1: Create a topic.
- Function Compute
- Function Compute is activated. For more information, see Activate Function Compute.
- Elasticsearch
- An instance and an index are created in the Elasticsearch console. For more information, see Getting started.
- The CIDR block of the Function Compute endpoint that you use is added to the whitelist of the Elasticsearch instance. For more information, see Configure a public or private IP address whitelist for an Elasticsearch cluster.
Note
- The version of the Elasticsearch client that is used by Function Compute is 7.7.0. To ensure compatibility, create an Elasticsearch cluster of version 7.0 or later.
- When you configure the whitelist, you can specify 0.0.0.0/0 as the CIDR block, which indicates that the Elasticsearch cluster can be accessed from all IP addresses in the virtual private cloud (VPC) that you use. After the access succeeds, change the CIDR block as needed.
Usage notes
- To synchronize data from ApsaraMQ for Kafka to Elasticsearch, the ApsaraMQ for Kafka instance that contains the source topic and the Elasticsearch cluster must be in the same region. ApsaraMQ for Kafka first synchronizes the data to Function Compute. Then, Function Compute synchronizes the data to Elasticsearch. For information about the limits on connectors, see Limits.
- Elasticsearch sink connectors export data by using Function Compute. Function Compute provides a certain amount of resources for free. When you use up this free quota, you are charged for the Function Compute resources that you use based on the billing rules. For more information, see Billing overview.
- Function Compute allows you to query the logs of function calls to troubleshoot issues. For more information, see Configure the logging feature.
- ApsaraMQ for Kafka serializes messages into UTF-8-encoded strings for transfer and does not support binary data.
- By default, if you specify the private endpoint of the Elasticsearch cluster for the Elasticsearch sink connector, Function Compute cannot access the Elasticsearch cluster. To ensure network connectivity, you must specify the same VPC and vSwitch as those of the Elasticsearch cluster for the related Function Compute service in the Function Compute console. For more information, see Update a service.
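Because the connector receives message payloads only as UTF-8 strings, binary data must be text-encoded before it is sent. The following minimal sketch uses only the Python standard library; Base64-wrapping binary data into a JSON field is a common workaround suggested here, not a documented product feature:

```python
import base64
import json

def to_utf8_payload(value: dict) -> bytes:
    """Serialize a message body to the UTF-8 string form that
    ApsaraMQ for Kafka uses when transferring data."""
    return json.dumps(value).encode("utf-8")

def wrap_binary(raw: bytes) -> dict:
    """Binary data is not supported directly; Base64-encoding it into
    a JSON field is one workaround (an assumption, not part of the
    product behavior)."""
    return {"data_b64": base64.b64encode(raw).decode("ascii")}

payload = to_utf8_payload({"key": "test"})          # text message body
wrapped = to_utf8_payload(wrap_binary(b"\x00\xff")) # binary data, wrapped
```

On the consumer or Elasticsearch side, the Base64 field would need to be decoded back to bytes by your own code.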
Create and deploy an Elasticsearch sink connector
- Log on to the ApsaraMQ for Kafka console.
- In the Resource Distribution section of the Overview page, select the region where your instance is deployed.
- In the left-side navigation pane, click Connectors.
- On the Connectors page, select the instance in which the data source topic resides from the Select Instance drop-down list and click Create Connector.
- Perform the following operations to complete the Create Connector wizard.
- Go to the Connectors page, find the connector that you created, and then click Deploy in the Actions column.
Configure the related Function Compute service
After you create and deploy the Elasticsearch sink connector in the ApsaraMQ for Kafka console, Function Compute automatically creates a Function Compute service for the connector and names the service in the kafka-service-<Connector_name>-<Random string> format.
- On the Connectors page, find the connector that you created. In the Actions column of the connector, choose . You are redirected to the Function Compute console.
- In the Function Compute console, find the automatically created service and configure a VPC and vSwitch for the service. Make sure that the VPC and vSwitch are the same as those specified for your Elasticsearch cluster. For more information, see Update a service.
Send messages
You can send a message to the source topic in your ApsaraMQ for Kafka instance to test whether the data can be synchronized to Elasticsearch.
- On the Connectors page, find the connector that you want to use and click Test in the Actions column.
- In the Send Message panel, configure the required parameters to send a test message.
- Set the Method of Sending parameter to Console.
- In the Message Key field, enter the key of the message. For example, you can enter demo as the key of the message.
- In the Message Content field, enter the content of the message. For example, you can enter {"key": "test"} as the content of the message.
- Configure the Send to Specified Partition parameter to specify whether to send the message to a specified partition.
- If you want to send the message to a specified partition, click Yes and enter the partition ID in the Partition ID field. For example, you can enter 0 as the partition ID. For information about how to query partition IDs, see View partition status.
- If you do not want to send the message to a specified partition, click No.
- Set the Method of Sending parameter to Docker and run the Docker commands that are provided in the Run the Docker container to produce a sample message section to send a test message.
- Set the Method of Sending parameter to SDK and click the link to the topic that describes how to obtain and use the SDK that you want to use. Then, use the SDK to send and consume a test message. Message Queue for Apache Kafka provides topics that describe how to use SDKs for different programming languages based on different connection types.
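Whichever sending method you choose, the record that reaches the source topic has the same shape. The following minimal sketch produces the sample message programmatically, assuming the kafka-python client is installed; the bootstrap endpoint and topic name are placeholders, not real values:

```python
import json

def encode_record(key: str, value: dict):
    """Serialize the key and value to the UTF-8 strings that
    ApsaraMQ for Kafka transfers; returns (key_bytes, value_bytes)."""
    return key.encode("utf-8"), json.dumps(value).encode("utf-8")

def send_test_message(bootstrap_servers: str, topic: str) -> None:
    # Assumption: the kafka-python client is available; it is imported
    # lazily so the helper above stays usable without it.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    key, value = encode_record("demo", {"key": "test"})
    # Partition 0 mirrors the console's Send to Specified Partition option.
    producer.send(topic, key=key, value=value, partition=0)
    producer.flush()

# Example call (placeholder endpoint and topic):
# send_test_message("your-kafka-endpoint:9092", "your-source-topic")
```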
Verify the results
After you send a message to the source topic in your ApsaraMQ for Kafka instance, log on to the Kibana console and run the GET /<index_name>/_search command to view the Elasticsearch index and verify whether the data is synchronized.
{
  "took" : 8,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "product_****",
        "_type" : "_doc",
        "_id" : "TX3TZHgBfHNEDGoZ****",
        "_score" : 1.0,
        "_source" : {
          "msg_body" : {
            "key" : "test",
            "offset" : 2,
            "overflowFlag" : false,
            "partition" : 2,
            "timestamp" : 1616599282417,
            "topic" : "dv****",
            "value" : "test1",
            "valueSize" : 8
          },
          "doc_as_upsert" : true
        }
      }
    ]
  }
}
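Instead of reading the raw response in Kibana, you can check it programmatically. The following minimal sketch parses a search response of the shape shown above and extracts the synchronized message bodies; the field names follow the sample response, and how you obtain the JSON (Kibana, curl, or an Elasticsearch client) is up to you:

```python
import json

def extract_synced_messages(search_response: str):
    """Return the msg_body objects from a GET /<index_name>/_search
    response, as written by the Elasticsearch sink connector."""
    body = json.loads(search_response)
    return [hit["_source"]["msg_body"] for hit in body["hits"]["hits"]]

# Trimmed sample matching the response shown above.
sample = json.dumps({
    "hits": {
        "total": {"value": 1, "relation": "eq"},
        "hits": [{
            "_index": "product_****",
            "_source": {
                "msg_body": {"key": "test", "partition": 2, "value": "test1"},
                "doc_as_upsert": True,
            },
        }],
    },
})

messages = extract_synced_messages(sample)
```

If the list is empty, the data has not been synchronized yet; check the connector status and the Function Compute logs.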