
ApsaraMQ for Kafka:Create Elasticsearch sink connectors

Last Updated:Dec 04, 2024

This topic describes how to create an Elasticsearch sink connector to export data from a topic on an ApsaraMQ for Kafka instance to Elasticsearch.

Prerequisites

For information about the prerequisites, see Prerequisites.

Step 1: Create Elasticsearch resources

Step 2: Create an Elasticsearch sink connector

  1. Log on to the ApsaraMQ for Kafka console. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

  2. In the left-side navigation pane, choose Connector Ecosystem Integration > Tasks.

  3. On the Tasks page, click Create Task.

  4. On the Create Task page, configure the Task Name and Description parameters. Then, follow the on-screen instructions to configure the other parameters.

    • Task Creation

      1. In the Source step, set the Data Provider parameter to Message Queue for Apache Kafka and follow the on-screen instructions to configure the other parameters. Then, click Next Step. The parameters are described below.

        • Region: The region where the ApsaraMQ for Kafka instance resides. Example: China (Hangzhou).

        • Message Queue for Apache Kafka Instance: The ID of the ApsaraMQ for Kafka instance that produces the data that you want to route. Example: alikafka_post-cn-9hdsbdhd****.

        • Topic: The topic on the ApsaraMQ for Kafka instance that produces the data that you want to route. Example: guide-sink-topic.

        • Group ID: The ID of the group on the ApsaraMQ for Kafka instance that is used to consume the data. Example: Use Existing Group.

          • Quickly Create: The system automatically creates a group whose ID is in the GID_EVENTBRIDGE_xxx format.

          • Use Existing Group: Select the ID of an existing group that is not in use. If you select a group that is in use, the publishing and subscription of existing messages are affected.

        • Consumer Offset: Example: Latest Offset.

          • Latest Offset: Messages are consumed from the latest offset.

          • Earliest Offset: Messages are consumed from the earliest offset.

        • Network Configuration: If cross-border data transmission is required, select Internet. Otherwise, select Basic Network. Example: Basic Network.

        • Data Format: Encodes the binary data delivered from the source into a specific format. Multiple data formats are supported. If you have no special encoding requirements, specify Json. Example: Json.

          • Json: Binary data is encoded into JSON-formatted data based on UTF-8 encoding and then put into the payload. This is the default value.

          • Text: Binary data is encoded into strings based on UTF-8 encoding and then put into the payload.

          • Binary: Binary data is encoded into strings based on Base64 encoding and then put into the payload.

        • Messages: The maximum number of messages that can be sent in each function invocation. A request is sent only when the number of messages in the backlog reaches the specified value. Valid values: 1 to 10000. Example: 2000.

        • Interval (Unit: Seconds): The time interval at which the function is invoked. The system sends the aggregated messages to Function Compute at this interval. Valid values: 0 to 15. Unit: seconds. A value of 0 indicates that messages are sent immediately after aggregation. Example: 3.

      2. In the Filtering step, define a data pattern to filter data. For more information, see Message filtering.

      3. In the Transformation step, specify a data cleansing method to implement data processing capabilities such as splitting, mapping, enrichment, and dynamic routing. For more information, see Use Function Compute to perform message cleansing.

      4. In the Sink step, set the Service Type parameter to Elasticsearch acs.elasticSearch and configure the other parameters. The parameters are described below.

        • Elasticsearch Cluster: The Alibaba Cloud Elasticsearch cluster that you created. Example: es-cn-pe336j0gj001e****.

        • Cluster Logon Name: The logon name that you specified when you created the Elasticsearch cluster. Default logon name: elastic. Example: elastic.

        • Cluster Logon Password: The password that you specified when you created the Elasticsearch cluster. Example: ******.

        • Index Name: The name of the index that you created. For information about how to create an index, see Step 3: Create an index. String constants and variables extracted by using the JSONPath syntax are supported, such as product_info and $.data.key. Example: product_info.

        • Document Type: The document type. String constants and variables extracted by using the JSONPath syntax are supported, such as _doc and $.data.key. Example: _doc.

          Note: This parameter is available only when the version of the Elasticsearch instance is earlier than 7. The default value of this parameter is _doc.

        • Document: Specifies whether to deliver the complete content of an event or only part of it to Elasticsearch. If you select Partial Event, you must specify a JSONPath extraction rule. Example: Complete Event.

        • Network Settings: Example: Internet.

          • VPC: Messages in ApsaraMQ for Kafka are delivered to Elasticsearch in a VPC.

          • Internet: Messages in ApsaraMQ for Kafka are delivered to Elasticsearch over the Internet.

        • VPC: The VPC to which the Elasticsearch instance belongs. This parameter is required only if you set the Network Settings parameter to VPC. Example: vpc-bp17fapfdj0dwzjkd****.

        • vSwitch: The vSwitch to which the Elasticsearch instance belongs. This parameter is required only if you set the Network Settings parameter to VPC. Example: vsw-bp1gbjhj53hdjdkg****.

        • Security Group: The security group to which the Elasticsearch instance belongs. This parameter is required only if you set the Network Settings parameter to VPC. Example: test_group.

    • Task Property

      Configure the retry policy that you want to use when events fail to be pushed and the method that you want to use to handle faults. For more information, see Retry policies and dead-letter queues.

  5. Click Save. On the Tasks page, find the Elasticsearch sink connector that you created. When the status in the Status column changes from Starting to Running, the connector is created.
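The Data Format options described in the Source step can be sketched in Python. The following is an illustrative approximation of the three encoding rules (Json, Text, and Binary), not connector code; the function name and message contents are hypothetical:

```python
import base64
import json

def encode_payload(data, fmt):
    """Approximate how the Data Format setting turns the binary
    message body into the event payload (illustrative sketch)."""
    if fmt == "Json":
        # Decode as UTF-8, then parse as JSON so the payload is structured.
        return json.loads(data.decode("utf-8"))
    if fmt == "Text":
        # Decode as a plain UTF-8 string.
        return data.decode("utf-8")
    if fmt == "Binary":
        # Base64-encode the raw bytes into an ASCII string.
        return base64.b64encode(data).decode("ascii")
    raise ValueError(f"unknown format: {fmt}")

msg = b'{"product": "widget", "price": 9.9}'
print(encode_payload(msg, "Json"))
print(encode_payload(msg, "Text"))
print(encode_payload(msg, "Binary"))
```

Note that a message body that is not valid JSON would fail under the Json option but still pass through the Text or Binary options, which is one reason to choose Text or Binary for non-JSON payloads.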

Step 3: Test the Elasticsearch sink connector

  1. On the Tasks page, find the Elasticsearch sink connector that you created and click the source topic in the Event Source column.

  2. On the Topic Details page, click Send Message.
  3. In the Start to Send and Consume Message panel, configure the message parameters and click OK.

  4. Log on to the Elasticsearch console and use Kibana to access the instance. For more information, see Getting started.

  5. In the Kibana console of the Elasticsearch cluster, run the following command to view the data insertion result.

    GET /{Index name}/_search

    The response shows the data insertion result.
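The `_search` request returns a JSON response whose documents are nested under `hits.hits[]._source`. The following Python sketch uses a trimmed, hypothetical response body to show how the inserted documents can be read out of such a result:

```python
import json

# A trimmed example of a GET /{Index name}/_search response.
# The index name and document contents are hypothetical; your hits will differ.
sample_response = json.loads("""
{
  "hits": {
    "total": {"value": 1, "relation": "eq"},
    "hits": [
      {
        "_index": "product_info",
        "_id": "x1",
        "_source": {"product": "widget", "price": 9.9}
      }
    ]
  }
}
""")

def extract_sources(resp):
    """Collect the original documents from an Elasticsearch search response."""
    return [hit["_source"] for hit in resp["hits"]["hits"]]

print(extract_sources(sample_response))
```

If the connector delivered the test message successfully, the documents you sent from the Kafka topic appear in this `_source` list.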