
Use Logstash to synchronize data from ApsaraDB RDS for MySQL to Elasticsearch

Last Updated: Dec 11, 2024

You can use the logstash-input-jdbc plug-in and the pipeline configuration feature of Alibaba Cloud Logstash to synchronize full or incremental data from an ApsaraDB RDS for MySQL database to an Alibaba Cloud Elasticsearch cluster. logstash-input-jdbc is a built-in plug-in of Alibaba Cloud Logstash and cannot be removed. This topic describes the procedure in detail.

Background information

Alibaba Cloud Logstash is a powerful data processing tool that you can use to collect, transform, optimize, and generate data. Alibaba Cloud Logstash provides the logstash-input-jdbc plug-in, which is installed on Logstash clusters by default and cannot be removed. The plug-in can query multiple data records in ApsaraDB RDS for MySQL at a time and synchronize them to Alibaba Cloud Elasticsearch. The plug-in polls ApsaraDB RDS for MySQL on a regular basis, identifies the data records that were inserted or updated since the previous poll, and synchronizes these records to Alibaba Cloud Elasticsearch. For more information, see Use Logstash and JDBC to synchronize data from a relational database to an Elasticsearch cluster. Alibaba Cloud Logstash is suitable if you want to synchronize full data and can accept a latency of a few seconds, or if you want to query and synchronize specific data at a time.
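
Conceptually, each scheduled run of the plug-in executes a query similar to the following sketch. The :sql_last_value placeholder is replaced with the insert or update time that was recorded during the previous run. The table and column names are taken from the example that is used later in this topic:

    SELECT * FROM food WHERE update_time >= :sql_last_value;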

Prerequisites

Create an ApsaraDB RDS for MySQL instance, an Alibaba Cloud Elasticsearch cluster, and an Alibaba Cloud Logstash cluster. We recommend that you create the instance and clusters in the same virtual private cloud (VPC).

Note

You can also use Logstash to synchronize data from an ApsaraDB RDS for MySQL instance that is accessed over the Internet. Before you perform the operation, you must configure a Source Network Address Translation (SNAT) entry for the Logstash cluster, enable the public endpoint of the ApsaraDB RDS for MySQL instance, and add the related IP addresses to a whitelist of the instance. For more information about how to configure an SNAT entry, see Configure a NAT gateway for data transmission over the Internet. For more information about how to configure a whitelist, see Configure an IP address whitelist for an ApsaraDB RDS for MySQL instance.

Limits

  • The ApsaraDB RDS for MySQL instance, Elasticsearch cluster, and Logstash cluster must use the same time zone. Otherwise, time-related data may have a time zone offset after the synchronization.

  • The values of the _id field in the Elasticsearch cluster must be the same as those of the id field in the ApsaraDB RDS for MySQL database.

    This condition ensures that the data synchronization task can establish a mapping between data records in the ApsaraDB RDS for MySQL database and documents in the Elasticsearch cluster. If you update a data record in the ApsaraDB RDS for MySQL database, the data synchronization task uses the updated data record to overwrite the document that has the same ID in the Elasticsearch cluster.

    Note

    In essence, an update operation in Elasticsearch deletes the original document and indexes the new document. Therefore, the overwrite operation is as efficient as an update operation performed by the data synchronization task.

  • If you insert a data record into or update a data record in the ApsaraDB RDS for MySQL database, the data record must contain a field that indicates the time when the data record is inserted or updated.

    Each time the logstash-input-jdbc plug-in polls the ApsaraDB RDS for MySQL database, the plug-in records the time when the last data record in that poll was inserted or updated. Logstash then synchronizes only the data records whose insert or update time is later than the time recorded during the previous poll.

    Important

    If you delete data records in the ApsaraDB RDS for MySQL database, the logstash-input-jdbc plug-in cannot delete the documents that have the same IDs from the Elasticsearch cluster. To delete the documents from the Elasticsearch cluster, you must run the related command on the Elasticsearch cluster.
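
    For example, the following Kibana console command deletes the document whose ID is 1 from the index that is used in this topic. Replace the index name and document ID with your own values:

      DELETE rds_es_dxhtest_datetime/_doc/1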

Procedure

Step 1: Make preparations

  1. Enable the Auto Indexing feature for the Elasticsearch cluster. For more information, see Access and configure an Elasticsearch cluster.

  2. Upload a JDBC driver for MySQL whose version is compatible with the version of the ApsaraDB RDS for MySQL instance to the Logstash cluster. In this example, the mysql-connector-java-5.1.48.jar driver is used. For more information, see Configure third-party libraries.

  3. Prepare test data. In this example, the following statement is used to create a table:

    CREATE TABLE food (
      id INT PRIMARY KEY AUTO_INCREMENT,
      name VARCHAR(32),
      insert_time DATETIME,
      update_time DATETIME
    );

    The following statements are used to insert data into the table. A query that you can use to verify the inserted data is provided after this procedure:

    INSERT INTO food VALUES(null, 'Chocolates', now(), now());
    INSERT INTO food VALUES(null, 'Yogurt', now(), now());
    INSERT INTO food VALUES(null, 'Ham sausage', now(), now());
  4. Add the IP addresses of the nodes in the Logstash cluster to a whitelist of the ApsaraDB RDS for MySQL instance. You can obtain the IP addresses on the Basic Information page of the Logstash cluster.
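
Before you proceed, you can run a simple query to confirm that the test data is in place. The following statement queries the food table that is created in step 3 of the preceding preparations:

    SELECT id, name, insert_time, update_time FROM food;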

Step 2: Configure a Logstash pipeline

  1. Go to the Logstash Clusters page of the Alibaba Cloud Elasticsearch console.

  2. Navigate to the desired cluster.

    1. In the top navigation bar, select the region where the cluster resides.

    2. On the Logstash Clusters page, find the cluster and click its ID.

  3. In the left-side navigation pane of the page that appears, click Pipelines.

  4. On the Pipelines page, click Create Pipeline.

  5. On the Create page, enter a pipeline ID in the Pipeline ID field and enter the required configurations in the Config Settings field.

    In this example, the following configurations are entered in the Config Settings field:

    input {
      jdbc {
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_driver_library => "/ssd/1/share/<Logstash cluster ID>/logstash/current/config/custom/mysql-connector-java-5.1.48.jar"
        jdbc_connection_string => "jdbc:mysql://rm-bp1xxxxx.mysql.rds.aliyuncs.com:3306/<Database name>?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
        jdbc_user => "xxxxx"
        jdbc_password => "xxxx"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        statement => "select * from food where update_time >= :sql_last_value"
        schedule => "* * * * *"
        record_last_run => true
        last_run_metadata_path => "/ssd/1/<Logstash cluster ID>/logstash/data/last_run_metadata_update_time.txt"
        clean_run => false
        tracking_column_type => "timestamp"
        use_column_value => true
        tracking_column => "update_time"
      }
    }
    filter {
    }
    output {
      elasticsearch {
        hosts => "http://es-cn-0h****dd0hcbnl.elasticsearch.aliyuncs.com:9200"
        index => "rds_es_dxhtest_datetime"
        user => "elastic"
        password => "xxxxxxx"
        document_id => "%{id}"
      }
    }
    Note

    You must replace <Logstash cluster ID> in the preceding code with the ID of the Logstash cluster that you use. For more information about how to obtain the ID, see View the basic information of a cluster. You must also replace the database name, account information, endpoints, and index name with your own values.

    Table 1. Description of the configurations in the Config Settings field

    • input: Specifies the input data source. For more information about the supported data source types, see Input plugins. In this example, an input data source that is connected by using JDBC is used. For more information about the related parameters, see Parameters in the input part.

    • filter: Specifies the plug-ins that are used to filter input data. For more information about the supported plug-ins, see Filter plugins. In this example, the filter part is empty. For a minimal filter sketch, see the example that follows this procedure.

    • output: Specifies the output data source. For more information about the supported data source types, see Output plugins. In this example, data in an ApsaraDB RDS for MySQL database is synchronized to an Elasticsearch cluster. Therefore, the information of the Elasticsearch cluster is configured in the output part. For more information about the related parameters, see Step 3: Create and run a Logstash pipeline.

    Important

    If the file_extend parameter is specified in the output configuration of a pipeline, you must make sure that the logstash-output-file_extend plug-in is installed for the Logstash cluster. For more information, see Install or remove a Logstash plug-in.

    Table 2. Parameters in the input part

    • jdbc_driver_class: The class of the JDBC driver.

    • jdbc_driver_library: The driver file that is used for the JDBC-based connection to the ApsaraDB RDS for MySQL database. Configure this parameter in the /ssd/1/share/<Logstash cluster ID>/logstash/current/config/custom/<Name of the driver file> format. You must upload the driver file in the Elasticsearch console in advance. For more information about the driver files that are supported by Logstash and how to upload a driver file, see Configure third-party libraries.

    • jdbc_connection_string: The connection string that is used to connect to the ApsaraDB RDS for MySQL database. The connection string contains the endpoint and port number of the ApsaraDB RDS for MySQL instance and the name of the database. Configure this parameter in the following format: jdbc:mysql://<Endpoint of the ApsaraDB RDS for MySQL instance>:<Port number of the ApsaraDB RDS for MySQL instance>/<Name of the ApsaraDB RDS for MySQL database>?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false.

      • <Endpoint of the ApsaraDB RDS for MySQL instance>: Specify the internal endpoint of the instance.

        Note

        If you want to use the public endpoint of the instance, you must configure a Network Address Translation (NAT) gateway for the Logstash cluster to enable the cluster to access the Internet, and configure the parameter in the jdbc:mysql://<Public endpoint of the ApsaraDB RDS for MySQL instance>:<Port number of the ApsaraDB RDS for MySQL instance> format. For more information, see Configure a NAT gateway for data transmission over the Internet.

      • <Port number of the ApsaraDB RDS for MySQL instance>: The port number must be the same as the port number that is configured for the ApsaraDB RDS for MySQL instance. In most cases, the port number is 3306.

    • jdbc_user: The username that is used to access the ApsaraDB RDS for MySQL database.

    • jdbc_password: The password that is used to access the ApsaraDB RDS for MySQL database.

    • jdbc_paging_enabled: Specifies whether to enable paging. Default value: false.

    • jdbc_page_size: The number of entries per page.

    • statement: The SQL statement that is used to query data from the ApsaraDB RDS for MySQL database. If you want to query data from multiple tables, you can use a JOIN statement.

      Note

      The value of :sql_last_value is used to determine the rows to query. The default value is Thursday, 1 January 1970. For more information, see Jdbc input plugin.

    • schedule: The interval at which the SQL statement is executed. The value * * * * * indicates that the SQL statement is executed every minute. Set this parameter to a cron expression that is supported by Rufus.

    • record_last_run: Specifies whether to record the last execution result. If this parameter is set to true, the latest value of the column that is specified by the tracking_column parameter is stored in the file in the path that is specified by the last_run_metadata_path parameter.

    • last_run_metadata_path: The path of the file that records the time when the SQL statement was last executed. A file path is provided at the backend. The path is in the /ssd/1/<Logstash cluster ID>/logstash/data/ format. After you specify a path, Logstash automatically generates a file in the path, but you cannot view the data in the file.

      Note

      We recommend that you configure this parameter in the /ssd/1/<Logstash cluster ID>/logstash/data/ format when you configure a Logstash pipeline. If you configure this parameter in another format, the synchronization checkpoint cannot be stored in the specified file because of insufficient permissions.

    • clean_run: Specifies whether to clear the path that is specified by the last_run_metadata_path parameter. Default value: false. If this parameter is set to true, each query starts from the first entry in the database.

    • use_column_value: Specifies whether to record the values of a specific column. If this parameter is set to true, the system records the latest value of the column that is specified by the tracking_column parameter and uses this value to determine the records that need to be synchronized the next time the SQL statement is executed.

    • tracking_column_type: The type of the column whose values you want to track. Default value: numeric.

    • tracking_column: The column whose values you want to track. The values must be sorted in ascending order. In most cases, this column is the primary key.

    Important
    • The preceding configurations are based on test data. You can configure the pipeline based on your business requirements. For more information about other parameters supported by the input plug-in, see Logstash Jdbc input plugin.

    • If your configurations contain a parameter similar to last_run_metadata_path, the file path must be provided by Logstash. A path in the /ssd/1/<Logstash cluster ID>/logstash/data/ format is provided at the backend and is available for tests. The system does not delete the data in this path. Make sure that your disk has sufficient storage space when you use this path. After you specify a path, Logstash automatically generates a file in the path, but you cannot view the data in the file.

    • For security purposes, if you specify a JDBC driver when you configure a pipeline, you must add allowLoadLocalInfile=false&autoDeserialize=false at the end of the jdbc_connection_string parameter, such as jdbc_connection_string => "jdbc:mysql://xxx.drds.aliyuncs.com:3306/<Database name>?allowLoadLocalInfile=false&autoDeserialize=false". Otherwise, when you add a configuration file for the Logstash pipeline, the system displays an error message that indicates a check failure.

    For more information about how to configure parameters in the Config Settings field, see Logstash configuration files.

  6. Click Next to configure pipeline parameters.


    • Pipeline Workers: The number of worker threads that run the filter and output plug-ins of the pipeline in parallel. If a backlog of events exists or some CPU resources are not used, we recommend that you increase the number of threads to maximize CPU utilization. The default value of this parameter is the number of vCPUs.

    • Pipeline Batch Size: The maximum number of events that a single worker thread can collect from input plug-ins before it attempts to run the filter and output plug-ins. A larger value enables a single worker thread to collect more events but consumes more memory. If you want to make sure that the worker thread has sufficient memory to collect more events, specify the LS_HEAP_SIZE variable to increase the Java virtual machine (JVM) heap size. Default value: 125.

    • Pipeline Batch Delay: The maximum amount of time that Logstash waits for new events before it dispatches an undersized batch to a pipeline worker thread. Default value: 50. Unit: milliseconds.

    • Queue Type: The internal queue model for buffering events. Valid values:

      • MEMORY: traditional memory-based queue. This is the default value.

      • PERSISTED: disk-based ACKed queue, which is a persistent queue.

    • Queue Max Bytes: The maximum storage space that the queue can occupy on the disk. The value must be less than the total capacity of your disk. Default value: 1024. Unit: MB.

    • Queue Checkpoint Writes: The maximum number of events that are written before a checkpoint is enforced when persistent queues are enabled. The value 0 indicates no limit. Default value: 1024.

    Warning

    After you configure the parameters, you must save the settings and deploy the pipeline. This triggers a restart of the Logstash cluster. Before you can proceed, make sure that the restart does not affect your business.

  7. Click Save or Save and Deploy.

    • Save: After you click this button, the system stores the pipeline settings and triggers a cluster change. However, the settings do not take effect. After you click Save, the Pipelines page appears. On the Pipelines page, find the created pipeline and click Deploy Now in the Actions column. Then, the system restarts the Logstash cluster to make the settings take effect.

    • Save and Deploy: After you click this button, the system restarts the Logstash cluster to make the settings take effect.
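
The filter part in the preceding pipeline configuration is empty. If you need to transform events before they are written to Elasticsearch, you can add filter plug-ins. The following sketch, which is not part of the example in this topic, uses the mutate filter to remove a metadata field:

    filter {
      mutate {
        # Remove the @version metadata field that Logstash adds to each event.
        remove_field => ["@version"]
      }
    }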

Step 3: Verify the result

  1. Log on to the Kibana console of the Elasticsearch cluster.

    For more information, see Log on to the Kibana console.

  2. In the upper-left corner, click the menu icon and choose Management > Dev Tools.

  3. On the Console tab of the page that appears, run the following command to query the number of documents in the index to which the data is synchronized:

    GET rds_es_dxhtest_datetime/_count
    {
      "query": {"match_all": {}}
    }

    If the command is successfully run, the following result is returned:

    {
      "count" : 3,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      }
    }
  4. Update the data in the MySQL table and insert data into the table.

    UPDATE food SET name='Chocolates', update_time=now() WHERE id=1;
    INSERT INTO food VALUES(null, 'Egg', now(), now());
  5. View the updated and inserted data.

    • Query the data record in which the value of name is Chocolates.

      GET rds_es_dxhtest_datetime/_search
      {
        "query": {
          "match": {
            "name": "Chocolates"
          }
        }
      }

      If the command is successfully run, the document in which the value of name is Chocolates is returned.

    • Query all data.

      GET rds_es_dxhtest_datetime/_search
      {
        "query": {
          "match_all": {}
        }
      }

      If the command is successfully run, all synchronized documents, including the updated Chocolates document and the newly inserted Egg document, are returned. To confirm the total document count, see the example after this procedure.
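
As an additional check, you can rerun the count request. Assuming that the pipeline has completed at least one scheduled run after the update and insert operations, the returned count increases from 3 to 4:

    GET rds_es_dxhtest_datetime/_count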

FAQ

  • Q: What do I do if my data synchronization task fails? For example, the pipeline is stuck in the initializing state, the data before and after the synchronization is inconsistent, or the connection to the database fails.

    A: Check whether the cluster logs of your Logstash cluster contain error information and identify the cause based on the error information. For more information, see Query logs. The following table describes common causes of errors and solutions to the errors.

    Note

    If an update operation is being performed on your Logstash cluster when you perform the operations described in the following solutions, pause the update operation by referring to View the progress of a cluster task. After the operations described in the solutions are complete, the system restarts the Logstash cluster and resumes the update operation.

    • Cause: The IP addresses of the nodes in the Logstash cluster are not added to a whitelist of the ApsaraDB RDS for MySQL instance.

      Solution: Add the IP addresses of the nodes in the Logstash cluster to a whitelist of the ApsaraDB RDS for MySQL instance. For more information, see Use a database client or the CLI to connect to an ApsaraDB RDS for MySQL instance.

      Note

      For more information about how to obtain the IP addresses of the nodes in a Logstash cluster, see View the basic information of a cluster.

    • Cause: You use the Logstash cluster to synchronize data from a self-managed MySQL database that is hosted on an ECS instance, but the private IP addresses and internal ports of the nodes in the cluster are not added to a security group of the ECS instance.

      Solution: Add the private IP addresses and internal ports of the nodes in the cluster to a security group of the ECS instance. For more information, see Add a security group rule.

      Note

      For more information about how to obtain the IP address and port of a node in a Logstash cluster, see View the basic information of a cluster.

    • Cause: The Elasticsearch cluster does not reside in the same VPC as the Logstash cluster.

      Solution: Use one of the following solutions:

      • Recreate the Logstash cluster in the VPC in which the Elasticsearch cluster resides.

      • Configure a NAT gateway for the Logstash cluster and connect to the Elasticsearch cluster over the Internet. For more information, see Configure a NAT gateway for data transmission over the Internet.

    • Cause: The endpoint of the ApsaraDB RDS for MySQL instance is incorrect, or the port number of the instance is not 3306.

      Solution: Obtain the correct endpoint and port number. For more information, see View and manage instance endpoints and ports. Then, replace the endpoint and port number in the value of the jdbc_connection_string parameter with the endpoint and port number that you obtained.

      Important

      You must specify the internal endpoint of the ApsaraDB RDS for MySQL instance. If you want to use the public endpoint of the instance, you must configure a NAT gateway for the Logstash cluster to enable the cluster to access the Internet. For more information, see Configure a NAT gateway for data transmission over the Internet.

    • Cause: The Auto Indexing feature is disabled for the Elasticsearch cluster.

      Solution: Enable the Auto Indexing feature for the Elasticsearch cluster. For more information, see Configure the YML file.

    • Cause: The load of the Elasticsearch or Logstash cluster is excessively high.

      Solution: Upgrade the configuration of the Elasticsearch or Logstash cluster. For more information, see Upgrade the configuration of a cluster.

      Note

      If the load of the Elasticsearch cluster is excessively high, you can view the monitoring data that is collected based on metrics in the Elasticsearch console to obtain the load information of the cluster. For more information, see Metrics and exception handling suggestions. If the load of the Logstash cluster is excessively high, you can enable the X-Pack Monitoring feature for the Logstash cluster, use the feature to monitor the cluster, and then view the monitoring data. For more information, see Enable the X-Pack Monitoring feature.

    • Cause: No driver file that is used for the JDBC-based connection to the ApsaraDB RDS for MySQL database is uploaded.

      Solution: Upload a driver file. For more information, see Configure third-party libraries.

    • Cause: The file_extend parameter is specified in the configuration of the pipeline, but the logstash-output-file_extend plug-in is not installed.

      Solution: Use one of the following solutions:

      • Install the logstash-output-file_extend plug-in. For more information, see Install and remove a plug-in.

      • Remove the file_extend parameter from the configuration of the pipeline.

    For more information about the causes of and solutions to this issue, see FAQ about data transfer by using Logstash.

  • Q: How do I configure multiple JDBC connections to the source database in the input configuration of a pipeline?

    A: You can define multiple JDBC data sources in the input configuration of the pipeline and specify a query statement for each table in the statement parameter. The following code provides an example:

    input {
        jdbc {
          jdbc_driver_class => "com.mysql.jdbc.Driver"
          jdbc_driver_library => "/ssd/1/share/<Logstash cluster ID>/logstash/current/config/custom/mysql-connector-java-5.1.48.jar"
          jdbc_connection_string => "jdbc:mysql://rm-bp1xxxxx.mysql.rds.aliyuncs.com:3306/<Database name>?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
          jdbc_user => "xxxxx"
          jdbc_password => "xxxx"
          jdbc_paging_enabled => "true"
          jdbc_page_size => "50000"
          statement => "select * from tableA where update_time >= :sql_last_value"
          schedule => "* * * * *"
          record_last_run => true
          last_run_metadata_path => "/ssd/1/<Logstash cluster ID>/logstash/data/last_run_metadata_update_time_tableA.txt"
          clean_run => false
          tracking_column_type => "timestamp"
          use_column_value => true
          tracking_column => "update_time"
          type => "A"
        }
        jdbc {
          jdbc_driver_class => "com.mysql.jdbc.Driver"
          jdbc_driver_library => "/ssd/1/share/<Logstash cluster ID>/logstash/current/config/custom/mysql-connector-java-5.1.48.jar"
          jdbc_connection_string => "jdbc:mysql://rm-bp1xxxxx.mysql.rds.aliyuncs.com:3306/<Database name>?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
          jdbc_user => "xxxxx"
          jdbc_password => "xxxx"
          jdbc_paging_enabled => "true"
          jdbc_page_size => "50000"
          statement => "select * from tableB where update_time >= :sql_last_value"
          schedule => "* * * * *"
          record_last_run => true
          last_run_metadata_path => "/ssd/1/<Logstash cluster ID>/logstash/data/last_run_metadata_update_time_tableB.txt"
          clean_run => false
          tracking_column_type => "timestamp"
          use_column_value => true
          tracking_column => "update_time"
          type => "B"
        }
    }
    output {
        if[type] == "A" {
            elasticsearch {
                hosts => "http://es-cn-0h****dd0hcbnl.elasticsearch.aliyuncs.com:9200"
                index => "rds_es_dxhtest_datetime_A"
                user => "elastic"
                password => "xxxxxxx"
                document_id => "%{id}"
            }
        }
        if[type] == "B" {
            elasticsearch {            
                hosts => "http://es-cn-0h****dd0hcbnl.elasticsearch.aliyuncs.com:9200"
                index => "rds_es_dxhtest_datetime_B"
                user => "elastic"
                password => "xxxxxxx"
                document_id => "%{id}"
            }
        }
    }

    In the preceding example, the type parameter is added to each jdbc configuration. This parameter helps you define conditions in the output configuration and synchronize data from different tables to different indexes. In addition, each jdbc input uses its own last_run_metadata_path file. If multiple jdbc inputs share the same file, the inputs overwrite each other's tracking values.
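
    To verify the result of the multi-table synchronization, you can count the documents in each destination index. The following Kibana console requests use the index names from the preceding example:

      GET rds_es_dxhtest_datetime_A/_count
      GET rds_es_dxhtest_datetime_B/_count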