全部產品
Search
文件中心

E-MapReduce:使用E-MapReduce進行MySQL Binlog日誌准即時傳輸

更新時間:Jul 01, 2024

本文介紹如何利用阿里雲SLS外掛程式功能和E-MapReduce叢集進行MySQL Binlog的准即時傳輸。

前提條件

  • 已在E-MapReduce上建立Hadoop叢集,詳情請參見建立叢集

  • 已建立MySQL類型的資料庫(例如RDS或DRDS)。MySQL必須開啟Binlog,且Binlog必須為ROW模式。

    本文以RDS為例介紹,詳情請參見建立RDS MySQL執行個體

    說明

    RDS預設已開啟Binlog功能。

操作步驟

  1. 串連MySQL執行個體並添加使用者權限。

    1. 使用命令方式串連MySQL執行個體,詳情請參見通過用戶端、命令列串連RDS

    2. 執行以下命令添加使用者權限。

      CREATE USER canal IDENTIFIED BY 'canal';
      GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
      FLUSH PRIVILEGES;
  2. 為SLS服務添加對應的設定檔,詳情請參見採集MySQL Binlog

    說明

    本文建立的Project名稱為canaltest,Logstore名稱為canal。

    在SLS控制台查看日誌資料是否上傳成功,如果未上傳成功請根據SLS的採集日誌排查。

  3. 編譯JAR包並上傳至OSS。

    1. 在本地開啟Git複製範例程式碼。

      git clone https://github.com/aliyun/aliyun-emapreduce-demo.git
    2. 修改範例程式碼。

      範例程式碼中已經有LoghubSample類,該類主要用於從SLS採集資料並列印。以下樣本為修改後的代碼。

      package com.aliyun.emr.example
      import org.apache.spark.SparkConf
      import org.apache.spark.storage.StorageLevel
      import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
      import org.apache.spark.streaming.{Milliseconds, StreamingContext}
      object LoghubSample {
      def main(args: Array[String]): Unit = {
      if (args.length < 7) {
       System.err.println(
         """Usage: bin/spark-submit --class LoghubSample examples-1.0-SNAPSHOT-shaded.jar
           |            
           |           
         """.stripMargin)
       System.exit(1)
      }
      val loghubProject = args(0)
      val logStore = args(1)
      val loghubGroupName = args(2)
      val endpoint = args(3)
      val accessKeyId = args(4)
      val accessKeySecret = args(5)
      val batchInterval = Milliseconds(args(6).toInt * 1000)
      val conf = new SparkConf().setAppName("Mysql Sync")
      //    conf.setMaster("local[4]");
      val ssc = new StreamingContext(conf, batchInterval)
      val loghubStream = LoghubUtils.createStream(
       ssc,
       loghubProject,
       logStore,
       loghubGroupName,
       endpoint,
       1,
       accessKeyId,
       accessKeySecret,
       StorageLevel.MEMORY_AND_DISK)
      loghubStream.foreachRDD(rdd =>
         rdd.saveAsTextFile("/mysqlbinlog")
      )
      ssc.start()
      ssc.awaitTermination()
      }
      }

      範例程式碼主要修改loghubStream.foreachRDD(rdd => rdd.saveAsObjectFile("/mysqlbinlog") )loghubStream.foreachRDD(rdd => rdd.saveAsTextFile("/mysqlbinlog")),以便於在E-MapReduce中運行時,儲存Spark Streaming中流出來的資料至EMR的HDFS。

    3. 您可以在本地完成代碼調試後,通過如下命令打包。

      mvn clean install
    4. 上傳JAR包至OSS。

      在OSS上建立儲存空間和上傳檔案,詳情請參見控制台建立儲存空間控制台上傳檔案

      說明

      本樣本在OSS上建立的Bucket為EMR-test,上傳examples-1.1-shaded.jarEMR-test/jar目錄。

  4. 提交並運行Spark作業。

    spark-submit --master yarn --deploy-mode client \
      --driver-memory 4g --executor-memory 2g --executor-cores 2 \
      --class com.aliyun.EMR.example.LoghubSample \
      oss://EMR-test/jar/examples-1.1-shaded.jar \
      canaltest canal sparkstreaming cn-hangzhou-intranet.log.aliyuncs.com \
      $ALIBABA_CLOUD_ACCESS_KEY_ID $ALIBABA_CLOUD_ACCESS_KEY_SECRET 1
    說明

    運行程式碼範例前必須先配置環境變數。關於如何配置環境變數,請參見配置環境變數

  5. 執行如下命令查看mysqlbinlog目錄下的檔案。

    hadoop fs -ls /mysqlbinlog

    您還可以通過執行命令hadoop fs -cat /mysqlbinlog/part-00000查看檔案內容。