全部產品
Search
文件中心

Simple Log Service:Spark Streaming消費

更新時間:Aug 31, 2024

Log Service採集到日誌資料後,您可以通過運行Spark Streaming任務消費日誌資料。

Log Service提供的Spark SDK實現了Receiver模式和Direct模式兩種消費模式。Maven依賴如下:

<dependency>
  <groupId>com.aliyun.emr</groupId>
  <artifactId>emr-logservice_2.11</artifactId>
  <version>1.7.2</version>
</dependency>

Receiver模式

Receiver模式通過消費組消費日誌資料並暫存在Spark Executor,Spark Streaming任務啟動之後從Executor讀取並處理資料。每條資料以JSON字串的形式返回。消費組自動定時儲存Checkpoint到服務端,無需手動更新Checkpoint。更多資訊,請參見通過消費組消費日誌資料

  • 參數說明

    參數

    類型

    說明

    project

    String

    Log ServiceProject名稱。

    logstore

    String

    Log ServiceLogstore名稱。

    consumerGroup

    String

    消費組名稱。

    endpoint

    String

    Log ServiceProject所在地區的服務入口。更多資訊,請參見服務存取點

    accessKeyId

    String

    訪問Log Service的AccessKey ID。

    accessKeySecret

    String

    訪問Log Service的AccessKey Secret。

  • 樣本

    說明

    預設配置下,Receiver模式在異常情況下可能導致資料丟失。為了避免此類情況發生,建議開啟Write-Ahead Logs開關(Spark 1.2以上版本支援)。更多關於Write-Ahead Logs的細節請參見Spark

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.SparkConf
    
    object TestLoghub {
      def main(args: Array[String]): Unit = {
        if (args.length < 7) {
          System.err.println(
            """Usage: TestLoghub <project> <logstore> <loghub group name> <endpoint>
              |         <access key id> <access key secret> <batch interval seconds>
            """.stripMargin)
          System.exit(1)
        }
    
        val project = args(0)
        val logstore = args(1)
        val consumerGroup = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Loghub")
          val ssc = new StreamingContext(conf, batchInterval)
          val loghubStream = LoghubUtils.createStream(
            ssc,
            project,
            logstore,
            consumerGroup,
            endpoint,
            accessKeyId,
            accessKeySecret,
            StorageLevel.MEMORY_AND_DISK)
    
          loghubStream.checkpoint(batchInterval * 2).foreachRDD(rdd =>
            rdd.map(bytes => new String(bytes)).top(10).foreach(println)
          )
          ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
    
        ssc.start()
        ssc.awaitTermination()
      }
    }

Direct模式

Direct模式不需要消費組,使用API在任務運行時直接從服務端請求資料。Direct模式具有如下優勢:

  • 簡化並行:Spark partition個數與Logstore Shard總數一致。只需分裂Shard即可提高任務的並行度。

  • 高效:不需要開啟Write-Ahead Logs來保證資料不丟失。

  • Exactly-once語義:直接從服務端按需擷取資料,任務成功之後再提交Checkpoint。

    由於Spark異常退出或者其他原因導致任務未正常結束,可能會導致部分資料被重複消費。

Direct模式需要依賴ZooKeeper環境,用於臨時儲存中間狀態。同時,必須設定Checkpoint目錄。中間狀態資料儲存在ZooKeeper內對應的Checkpoint目錄內。如果任務重啟之後希望重新消費,需要在ZooKeeper內刪除該目錄,並更改消費組名稱。

  • 參數說明

    參數

    類型

    說明

    project

    String

    Log ServiceProject名稱。

    logstore

    String

    Log ServiceLogstore名稱。

    consumerGroup

    String

    消費組名稱,僅用於儲存消費位置。

    endpoint

    String

    Log ServiceProject所在地區的服務入口。更多資訊,請參見服務存取點

    accessKeyId

    String

    訪問Log Service的Access Key ID。

    accessKeySecret

    String

    訪問Log Service的Access Key Secret。

    zkAddress

    String

    ZooKeeper的串連地址。

  • 限流配置

    Spark Streaming流式消費是將資料分成微小的批次進行處理,因此Log Service開始消費時,需預知每個批次的邊界,即每個批次需要擷取的資料條數。

    Log Service底層的儲存模型以LogGroup為單位,正常情況下每個LogGroup對應一次寫入請求,例如一次寫入請求可能包含數千條日誌,這些日誌作為一個LogGroup進行儲存和消費。而通過WebTracking方式寫入日誌時,每次寫入請求僅包含一條日誌,即一個LogGroup只有一條日誌。為了能夠應對不同寫入情境的消費需求,SDK提供如下兩個參數進行限流配置。

    參數

    說明

    預設值

    spark.loghub.batchGet.step

    限制單次消費請求擷取的最大LogGroup個數。

    100

    spark.streaming.loghub.maxRatePerShard

    限制單批次內每個Shard消費的日誌條數。

    10000

    通過spark.streaming.loghub.maxRatePerShard可指定每個批次每個Shard期望消費的最大日誌條數。Spark SDK的實現原理是每次從服務端擷取spark.loghub.batchGet.step中的LogGroup個數並累計其中的日誌條數,直到達到或超過spark.streaming.loghub.maxRatePerShard。因此spark.streaming.loghub.maxRatePerShard並非一個精確控制單批次消費日誌條數的參數,實際上每個批次消費的日誌條數與spark.loghub.batchGet.step以及每個LogGroup中包含的日誌條數相關。

  • 樣本

    import com.aliyun.openservices.loghub.client.config.LogHubCursorPosition
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.aliyun.logservice.{CanCommitOffsets, LoghubUtils}
    
    object TestDirectLoghub {
      def main(args: Array[String]): Unit = {
        if (args.length < 7) {
          System.err.println(
            """Usage: TestDirectLoghub <project> <logstore> <loghub group name> <endpoint>
              |         <access key id> <access key secret> <batch interval seconds> <zookeeper host:port=localhost:2181>
            """.stripMargin)
          System.exit(1)
        }
    
        val project = args(0)
        val logstore = args(1)
        val consumerGroup = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
        val zkAddress = if (args.length >= 8) args(7) else "localhost:2181"
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Direct Loghub")
          val ssc = new StreamingContext(conf, batchInterval)
          val zkParas = Map("zookeeper.connect" -> zkAddress,
            "enable.auto.commit" -> "false")
          val loghubStream = LoghubUtils.createDirectStream(
            ssc,
            project,
            logstore,
            consumerGroup,
            accessKeyId,
            accessKeySecret,
            endpoint,
            zkParas,
            LogHubCursorPosition.END_CURSOR)
    
          loghubStream.checkpoint(batchInterval).foreachRDD(rdd => {
            println(s"count by key: ${rdd.map(s => {
              s.sorted
              (s.length, s)
            }).countByKey().size}")
            loghubStream.asInstanceOf[CanCommitOffsets].commitAsync()
          })
          ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
        ssc.start()
        ssc.awaitTermination()
      }
    }

更多資訊,請參見GitHub