すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:Spark Streamingを使用したログデータの消費

最終更新日:Sep 10, 2024

ログデータがSimple log Serviceに収集された後、Spark Streamingを使用してデータを消費できます。

Alibaba Cloudが提供するSpark SDKを使用すると、Simple log ServiceのログデータをReceiverモードまたはDirectモードで使用できます。 次のMaven依存関係を追加する必要があります。

<dependency>
  <groupId>com.aliyun.emr</groupId>
  <artifactId>emr-logservice_2.11</artifactId>
  <version>1.7.2</version>
</dependency>

Receiverモードでログデータを使用する

Receiverモードでは、コンシューマーグループはSimple Log Serviceからデータを消費し、そのデータをSparkエグゼキュータに一時的に保存します。 Spark Streamingジョブが開始されると、コンシューマグループはSparkエグゼキュータからデータを読み取り、処理します。 各ログエントリはJSON文字列として返されます。 コンシューマーグループは、チェックポイントをSimple Log Serviceに定期的に保存します。 チェックポイントを更新する必要はありません。 詳細については、「コンシューマーグループを使用したログデータの消費」をご参照ください。

  • パラメーター

    パラメーター

    データ型

    説明

    project

    String

    Simple Log Serviceのプロジェクトの名前。

    logstore

    String

    Simple Log ServiceのLogstoreの名前。

    consumerGroup

    String

    消費者グループの名前です。

    エンドポイント

    String

    Simple Log Serviceプロジェクトが存在するリージョンのエンドポイント。 詳細については、「エンドポイント」をご参照ください。

    accessKeyId

    String

    Simple Log Serviceへのアクセスに使用されるAccessKey ID。

    accessKeySecret

    String

    Simple Log Serviceへのアクセスに使用されるAccessKeyシークレット。

  • 例:

    説明

    Receiverモードでは、デフォルト設定が使用されている場合、データ損失が発生する可能性があります。 データの損失を回避するには、先への書き込みログ機能を有効にします。 この機能はSpark 1.2以降で使用できます。 ログの書き込み機能の詳細については、「Spark」をご参照ください。

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
    import org.apache.spark.streaming.{ Milliseconds, StreamingContext}
    import org.apache.spark.SparkConf
    
    object TestLoghub {
      def main(args: Array[String]): Unit = {
        if (args.length < 7) {
          System.err.println(
            """Usage: TestLoghub <project> <logstore> <loghub group name> <endpoint>
              |         <access key id> <access key secret> <batch interval seconds>
            """.stripMargin)
          System.exit(1)
        }
    
        val project = args(0)
        val logstore = args(1)
        val consumerGroup = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Loghub")
          val ssc = new StreamingContext(conf, batchInterval)
          val loghubStream = LoghubUtils.createStream(
            ssc,
            project,
            logstore,
            consumerGroup,
            endpoint,
            accessKeyId,
            accessKeySecret,
            StorageLevel.MEMORY_AND_DISK)
    
          loghubStream.checkpoint(batchInterval * 2).foreachRDD(rdd =>
            rdd.map(bytes => new String(bytes)).top(10).foreach(println)
          )
          ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
    
        ssc.start()
        ssc.awaitTermination()
      }
    }

ダイレクトモードでログデータを使用する

Directモードでは、コンシューマグループは必要ありません。 API操作を呼び出して、Simple Log Serviceからデータを要求できます。 ダイレクトモードでログデータを使用すると、次の利点があります。

  • 単純化された同時実行。 Sparkパーティションの数は、Logstore内のシャードの数と同じです。 シャードを分割して、タスクの同時実行性を向上させることができます。

  • 効率の向上。 データの損失を防ぐために、先への書き込みログ機能を有効にする必要がなくなりました。

  • 正確に-一度のセマンティクス。 データはSimple Log Serviceから直接読み取られます。 チェックポイントは、タスクが成功した後に送信されます。

    場合によっては、予期しないSparkの終了によりタスクが終了すると、データが繰り返し消費されることがあります。

Directモードでデータを使用する場合は、ZooKeeperサービスを設定する必要があります。 このサービスは、中間状態でデータを保存するために使用されます。 中間データを格納するには、ZooKeeperサービスにチェックポイントディレクトリを設定する必要があります。 タスクの再起動後にデータを再消費するには、ZooKeeperから対応するディレクトリを削除し、消費者グループの名前を変更する必要があります。

  • パラメーター

    パラメーター

    データ型

    説明

    project

    String

    Simple Log Serviceのプロジェクトの名前。

    logstore

    String

    Simple Log ServiceのLogstoreの名前。

    consumerGroup

    String

    消費者グループの名前です。 この名前は、消費チェックポイントを保存するためにのみ使用されます。

    エンドポイント

    String

    Simple Log Serviceプロジェクトが存在するリージョンのエンドポイント。 詳細については、「エンドポイント」をご参照ください。

    accessKeyId

    String

    Simple Log Serviceへのアクセスに使用されるAccessKey ID。

    accessKeySecret

    String

    Simple Log Serviceへのアクセスに使用されるAccessKeyシークレット。

    zkAddress

    String

    ZooKeeperサービスの接続URL。

  • 消費制限

    Spark Streamingは、各シャードのデータを1つのバッチで消費します。 各バッチで消費されるログエントリの数を指定する必要があります。

    Simple Log Serviceでは、ロググループが各書き込み要求の基本単位として機能します。 例えば、書き込み要求は、複数のログエントリを含み得る。 これらのログエントリは、ロググループとして保存および使用されます。 webトラッキングを使用してログを書き込む場合、各書き込みリクエストには1つのログエントリのみが含まれます。 この場合、リクエストに対応するロググループに含まれるログエントリは1つだけです。 パラメーターを指定して、単一バッチ内のログデータの量を制限できます。 次の表に、2つのパラメーターを示します。

    パラメーター

    説明

    デフォルト

    spark.loghub.batchGet.step

    1つの消費リクエストに対して返されるロググループの最大数。

    100

    spark.streaming.loghub.maxRatePerShard

    1つのバッチで各シャードから消費されるログエントリの最大数。

    10000

    spark.streaming.loghub.maxRatePerShardパラメーターを指定することで、各バッチの各シャードから消費されるログエントリの最大数を設定できます。 Spark SDKは、spark.loghub.batchGet.stepパラメーターからロググループの数を取得し、これらのロググループ内のログエントリの数を累積することにより、Simple log Serviceからログデータを消費します。 累積数がspark.streaming.loghub.maxRatePerShardパラメーターで指定された数以上になると、Spark SDKはログデータの消費を停止します。 spark.streaming.loghub.maxRatePerShardパラメーターは、各バッチで消費されるログエントリの数を正確に制御しません。 各バッチで消費されたログエントリの数は、spark.loghub.batchGet.stepパラメーターと各ロググループのログエントリの数に基づいています。

  • 例:

    import com.aliyun.openservices.loghub.client.config.LogHubCursorPosition
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{ Milliseconds, StreamingContext}
    import org.apache.spark.streaming.aliyun.logservice.{ CanCommitOffsets, LoghubUtils}
    
    object TestDirectLoghub {
      def main(args: Array[String]): Unit = {
        if (args.length < 7) {
          System.err.println(
            """Usage: TestDirectLoghub <project> <logstore> <loghub group name> <endpoint>
              |         <access key id> <access key secret> <batch interval seconds> <zookeeper host:port=localhost:2181>
            """.stripMargin)
          System.exit(1)
        }
    
        val project = args(0)
        val logstore = args(1)
        val consumerGroup = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
        val zkAddress = if (args.length >= 8) args(7) else "localhost:2181"
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Direct Loghub")
          val ssc = new StreamingContext(conf, batchInterval)
          val zkParas = Map("zookeeper.connect" -> zkAddress,
            "enable.auto.commit" -> "false")
          val loghubStream = LoghubUtils.createDirectStream(
            ssc,
            project,
            logStore,
            consumerGroup,
            accessKeyId,
            accessKeySecret,
            endpoint,
            zkParas,
            LogHubCursorPosition.END_CURSOR)
    
          loghubStream.checkpoint(batchInterval).foreachRDD(rdd => {
            println(s"count by key: ${rdd.map(s => {
              s.sorted
              (s.length, s)
            }).countByKey().size}")
            loghubStream.asInstanceOf[CanCommitOffsets].commitAsync()
          })
          ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
        ssc.start()
        ssc.awaitTermination()
      }
    }

詳細については、『GitHub』をご参照ください。