全部產品
Search
文件中心

Elasticsearch:通過DTS將MySQL資料即時同步到阿里雲ES

更新時間:Nov 20, 2024

當您需要將企業線上的RDS MySQL中的生產資料即時同步到Elasticsearch(ES)中進行搜尋分析時,可通過Data Transmission Service快速建立RDS MySQL到阿里雲ES的即時同步作業,適用於對即時同步要求較高的同步情境。本文介紹如何配置RDS MySQL到阿里雲ES執行個體的即時同步作業,並驗證全量和增量資料同步的結果。

背景資訊

  • Data Transmission Service是一種集資料移轉、資料訂閱及資料即時同步於一體的Data Transmission Service,詳細資料請參見Data Transmission Service。DTS支援同步的SQL操作包括Insert、Delete和Update,支援同步的資料來源版本要求請參見同步方案概覽

  • 通過在DTS中配置從源(MySQL)同步到目標(ES),可實現全量、增量資料同步。適用於對即時同步要求較高的關係型資料庫中資料的同步情境或需要將關係型資料庫中的全量或增量資料同步到阿里雲ES情境。

注意事項

  • DTS不支援同步DDL操作,如果源庫中待同步的表在同步的過程中已經執行了DDL操作,您需要先移除同步對象,然後在ES執行個體中移除該表對應的索引,最後新增同步對象。詳情請參見移除同步對象新增同步對象

  • 如果源庫中待同步的表需要執行增加列的操作,您只需先在ES執行個體中修改對應表的mapping,然後在源庫中執行相應的DDL操作,最後暫停並啟動DTS增量資料同步任務。

  • DTS在執行全量資料初始化時將佔用源庫和目標庫一定的讀寫資源,可能會導致資料庫的負載上升,在資料庫效能較差、規格較低或業務量較大的情況下(例如源庫有大量慢SQL、存在無主鍵表或目標庫存在死結等),可能會加重資料庫壓力,甚至導致資料庫服務不可用。因此您需要在執行資料同步前評估源庫和目標庫的效能,同時建議您在業務低峰期執行資料同步(例如源庫和目標庫的CPU負載在30%以下)。

    • 在業務高峰期全量同步資料,可能造成全量資料同步失敗,重啟全量同步任務即可。

    • 在業務高峰期增量同步處理資料,可能出現資料同步延遲的情況。

  • 由於MySQL和ES執行個體支援的資料類型不同,資料類型無法一一對應。所以DTS在進行結構初始化時,會根據目標庫支援的資料類型進行類型映射,詳情請參見結構初始化涉及的資料類型映射關係

操作流程

  1. 準備環境:先在源庫RDS MySQL中添加待同步資料,然後建立目標庫ES執行個體,並為ES執行個體開啟自動建立索引功能。

  2. 建立資料同步任務:在DTS控制台配置併購買資料同步任務。

  3. 驗證資料同步結果:先在ES的Kibana控制台驗證全量同步結果,然後在源庫RDS MySQL中添加資料,最後在ES的Kibana控制台驗證增量同步處理結果。

操作步驟

步驟一:環境準備

文本以將RDS MySQL 8.0版本執行個體中的資料同步到阿里雲ES 7.10版本執行個體中為例。

準備源庫待同步資料

  1. 建立RDS MySQL 8.0版本執行個體。具體操作,請參見建立RDS MySQL執行個體

  2. 建立帳號和資料庫test_mysql。具體操作,請參見(廢棄,重新導向至“第一步”)建立資料庫和帳號

  3. 在資料庫test_mysql中,建立表es_test並插入資料。使用的建表語句及資料如下:

    -- create table
    CREATE TABLE `es_test` (
        `id` bigint(32) NOT NULL,
        `name` varchar(32) NULL,
        `age` bigint(32) NULL,
        `hobby` varchar(32) NULL,
        PRIMARY KEY (`id`)
    ) ENGINE=InnoDB
    DEFAULT CHARACTER SET=utf8;
    
    -- insert data
    INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (1,'user1',22,'music');
    INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (2,'user2',23,'sport');
    INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (3,'user3',43,'game');
    INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (4,'user4',24,'run');
    INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (5,'user5',42,'basketball');

準備目標庫ES執行個體

  1. 建立阿里雲ES 7.10版本執行個體。具體操作,請參見建立Elasticsearch執行個體

  2. ES執行個體開啟自動建立索引功能。具體操作,請參見配置YML參數

    image

步驟二:建立資料同步任務

  1. 登入新版DTS同步工作清單頁面

  2. 單擊建立任務

  3. 按照頁面提示配置資料同步任務。

    說明

    以下步驟中涉及的參數的說明,請參見RDS MySQL同步至Elasticsearch

    1. 配置源庫及目標庫,在頁面下方單擊測試連接以進行下一步

      image

    2. 配置任務對象。

      image

    3. 配置進階設定,本文進階配置保持預設。

    4. 庫表列配置頁面,單擊全部設定為非_routing策略,將全部表設定為非_routing策略。

      說明

      目標庫ES執行個體為7.x版本時,全部表必須設定為非_routing策略。

  4. 配置完成後,根據頁面提示儲存並預檢查任務、購買並啟動任務。

    購買並啟動任務成功後,同步任務正式開始。您可在資料同步介面查看具體任務進度,待全量同步完成後,您即可在ES執行個體中查看同步成功的資料。

    image

步驟三(可選):驗證資料同步結果

  1. 登入目標ES執行個體的Kibana控制台。

    登入Kibana控制台,請參見登入Kibana控制台

  2. 在Kibana頁面的左上方,選擇菜單.png > Management > 開發工具(Dev Tools),在控制台(Console)中執行以下命令。

  3. 驗證全量資料同步結果。

    執行如下命令,查看全量資料同步結果。

    GET /es_test/_search

    預期結果如下:

    {
      "took" : 10,
      "timed_out" : false,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 5,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "3",
            "_score" : 1.0,
            "_source" : {
              "id" : 3,
              "name" : "user3",
              "age" : 43,
              "hobby" : "game"
            }
          },
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "5",
            "_score" : 1.0,
            "_source" : {
              "id" : 5,
              "name" : "user5",
              "age" : 42,
              "hobby" : "basketball"
            }
          },
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "4",
            "_score" : 1.0,
            "_source" : {
              "id" : 4,
              "name" : "user4",
              "age" : 24,
              "hobby" : "run"
            }
          },
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "2",
            "_score" : 1.0,
            "_source" : {
              "id" : 2,
              "name" : "user2",
              "age" : 23,
              "hobby" : "sport"
            }
          },
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "1",
            "_score" : 1.0,
            "_source" : {
              "id" : 1,
              "name" : "user1",
              "age" : 22,
              "hobby" : "music"
            }
          }
        ]
      }
    }
  4. 驗證增量資料同步結果。

    1. 通過以下SQL語句在RDS MySQL中插入一條資料。

      INSERT INTO `test_mysql`.`es_test` (`id`,`name`,`age`,`hobby`) VALUES (6,'user6',30,'dance');
    2. 等待增量同步處理完成後,再次執行命令GET /es_test/_search,查看增量資料同步結果。

      預期結果如下:

      {
        "took" : 541,
        "timed_out" : false,
        "_shards" : {
          "total" : 5,
          "successful" : 5,
          "skipped" : 0,
          "failed" : 0
        },
        "hits" : {
          "total" : {
            "value" : 6,
            "relation" : "eq"
          },
          "max_score" : 1.0,
          "hits" : [
            {
              "_index" : "es_test",
              "_type" : "es_test",
              "_id" : "3",
              "_score" : 1.0,
              "_source" : {
                "id" : 3,
                "name" : "user3",
                "age" : 43,
                "hobby" : "game"
              }
            },
            {
              "_index" : "es_test",
              "_type" : "es_test",
              "_id" : "5",
              "_score" : 1.0,
              "_source" : {
                "id" : 5,
                "name" : "user5",
                "age" : 42,
                "hobby" : "basketball"
              }
            },
            {
              "_index" : "es_test",
              "_type" : "es_test",
              "_id" : "4",
              "_score" : 1.0,
              "_source" : {
                "id" : 4,
                "name" : "user4",
                "age" : 24,
                "hobby" : "run"
              }
            },
            {
              "_index" : "es_test",
              "_type" : "es_test",
              "_id" : "2",
              "_score" : 1.0,
              "_source" : {
                "id" : 2,
                "name" : "user2",
                "age" : 23,
                "hobby" : "sport"
              }
            },
            {
              "_index" : "es_test",
              "_type" : "es_test",
              "_id" : "6",
              "_score" : 1.0,
              "_source" : {
                "name" : "user6",
                "id" : 6,
                "age" : 30,
                "hobby" : "dance"
              }
            },
            {
              "_index" : "es_test",
              "_type" : "es_test",
              "_id" : "1",
              "_score" : 1.0,
              "_source" : {
                "id" : 1,
                "name" : "user1",
                "age" : 22,
                "hobby" : "music"
              }
            }
          ]
        }
      }