本文介紹如何利用阿里雲SLS外掛程式功能和E-MapReduce叢集進行MySQL Binlog的准即時傳輸。
前提條件
已在E-MapReduce上建立Hadoop叢集,詳情請參見建立叢集。
已建立MySQL類型的資料庫(例如RDS或DRDS)。MySQL必須開啟Binlog,且Binlog必須為ROW模式。
本文以RDS為例介紹,詳情請參見建立RDS MySQL執行個體。
說明RDS預設已開啟Binlog功能。
操作步驟
串連MySQL執行個體並添加使用者權限。
使用命令方式串連MySQL執行個體,詳情請參見通過用戶端、命令列串連RDS。
執行以下命令添加使用者權限。
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
為SLS服務添加對應的設定檔,詳情請參見採集MySQL Binlog。
說明本文建立的Project名稱為canaltest,Logstore名稱為canal。
在SLS控制台查看日誌資料是否上傳成功,如果未上傳成功請根據SLS的採集日誌排查。
編譯JAR包並上傳至OSS。
在本地開啟Git複製範例程式碼。
git clone https://github.com/aliyun/aliyun-emapreduce-demo.git修改範例程式碼。
範例程式碼中已經有LoghubSample類,該類主要用於從SLS採集資料並列印。以下樣本為修改後的代碼。
package com.aliyun.emr.example import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.aliyun.logservice.LoghubUtils import org.apache.spark.streaming.{Milliseconds, StreamingContext} object LoghubSample { def main(args: Array[String]): Unit = { if (args.length < 7) { System.err.println( """Usage: bin/spark-submit --class LoghubSample examples-1.0-SNAPSHOT-shaded.jar | | """.stripMargin) System.exit(1) } val loghubProject = args(0) val logStore = args(1) val loghubGroupName = args(2) val endpoint = args(3) val accessKeyId = args(4) val accessKeySecret = args(5) val batchInterval = Milliseconds(args(6).toInt * 1000) val conf = new SparkConf().setAppName("Mysql Sync") // conf.setMaster("local[4]"); val ssc = new StreamingContext(conf, batchInterval) val loghubStream = LoghubUtils.createStream( ssc, loghubProject, logStore, loghubGroupName, endpoint, 1, accessKeyId, accessKeySecret, StorageLevel.MEMORY_AND_DISK) loghubStream.foreachRDD(rdd => rdd.saveAsTextFile("/mysqlbinlog") ) ssc.start() ssc.awaitTermination() } }範例程式碼主要修改
loghubStream.foreachRDD(rdd => rdd.saveAsObjectFile("/mysqlbinlog") )為loghubStream.foreachRDD(rdd => rdd.saveAsTextFile("/mysqlbinlog")),以便於在E-MapReduce中運行時,儲存Spark Streaming中流出來的資料至EMR的HDFS。您可以在本地完成代碼調試後,通過如下命令打包。
mvn clean install上傳JAR包至OSS。
提交並運行Spark作業。
spark-submit --master yarn --deploy-mode client \ --driver-memory 4g --executor-memory 2g --executor-cores 2 \ --class com.aliyun.EMR.example.LoghubSample \ oss://EMR-test/jar/examples-1.1-shaded.jar \ canaltest canal sparkstreaming cn-hangzhou-intranet.log.aliyuncs.com \ $ALIBABA_CLOUD_ACCESS_KEY_ID $ALIBABA_CLOUD_ACCESS_KEY_SECRET 1說明運行程式碼範例前必須先配置環境變數。關於如何配置環境變數,請參見配置環境變數。
執行如下命令查看mysqlbinlog目錄下的檔案。
hadoop fs -ls /mysqlbinlog您還可以通過執行命令
hadoop fs -cat /mysqlbinlog/part-00000查看檔案內容。