すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:Flinkを使用したログデータの消費

最終更新日:Sep 10, 2024

Simple Log Serviceは、Flinkに接続するためのFlink Log Connectorを提供します。 オープンソースFlinkとRealtime Compute for Apache Flinkがサポートされています。 このトピックでは、FlinkをSimple Log Serviceに接続してログデータを消費する方法について説明します。

前提条件

  • プロジェクトと Logstore が作成済みである必要があります。 詳細については、「プロジェクトの作成」および「Logstore の作成」をご参照ください。

  • RAM (Resource Access Management) ユーザーが作成され、必要な権限がRAMユーザーに付与されます。 詳細については、「RAMユーザーの作成とRAMユーザーへの権限付与」をご参照ください。

  • ALIBABA_CLOUD_ACCESS_KEY_IDおよびALIBABA_CLOUD_ACCESS_KEY_SECRET環境変数が設定されています。 詳細については、「環境変数の設定」をご参照ください。

    重要
    • Alibaba CloudアカウントのAccessKeyペアには、すべてのAPI操作に対する権限があります。 RAMユーザーのAccessKeyペアを使用して、API操作を呼び出したり、ルーチンのO&Mを実行したりすることを推奨します。

    • プロジェクトコードにAccessKey IDまたはAccessKey secretを保存しないことを推奨します。 そうしないと、AccessKeyペアが漏洩し、アカウント内のすべてのリソースのセキュリティが侵害される可能性があります。

背景情報

Flink Log Connectorは、Flink Log ConsumerとFlink Log Producerで構成されています。 Flink Log ConsumerとFlink Log Producerの違いを次に示します。

  • Flink Log Consumerは、Simple Log Serviceからデータを読み取ります。 Flink Log Consumerは、シャード間の1回限りのセマンティクスと負荷分散をサポートします。

  • Flink Log Producerは、Simple Log Serviceにデータを書き込みます。

Flink Log Connectorを使用する前に、プロジェクトにMaven依存関係を追加する必要があります。 サンプルコード:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>flink-log-connector</artifactId>
    <version>0.1.38</version>
</dependency>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.5.0</version>
</dependency>

他のプログラミング言語でコードを記述するには、GitHubのソースコードを参照してください。 詳細については、aliyun-log-flink-connectorをご参照ください。

Flinkログコンシューマ

Flink Log Consumerは、Logstoreからログデータを消費できます。 ログの消費中に、1回だけのセマンティクスが適用されます。 Flink Log Consumerは、Logstore内のシャード数の変化を検出します。

各Flinkサブタスクは、Logstore内の一部のシャードからデータを消費します。 Logstore内のシャードが分割またはマージされると、サブタスクで消費されるシャードも変更されます。

Flink Log Consumerを使用してSimple Log Serviceからデータを消費する場合、次のAPI操作を呼び出すことができます。

  • GetCursorOrData

    この操作では、シャードからログデータをプルします。 この操作を頻繁に呼び出すと、データトラフィックがシャードの機能を超える可能性があります。 ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLISパラメーターを使用して、API呼び出しの間隔を制御できます。 ConfigConstants.LOG_MAX_NUMBER_PER_FETCHパラメーターを使用して、各API呼び出しによってプルされるログの数を制御できます。 シャード機能の詳細については、「シャード」をご参照ください。

    例:

    configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS, "100");
    configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "100");
  • ListShards

    この操作を呼び出して、Logstore内のすべてのシャードと各シャードのステータスを表示できます。 シャードが頻繁に分割およびマージされる場合は、呼び出し間隔を調整して、シャードの数の変化をタイムリーに検出できます。 例:

    // Call the ListShards operation once every 30 seconds. 
    configProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS, "30000");
  • CreateConsumerGroup

    この操作では、チェックポイントを同期するためのコンシューマグループを作成します。

  • UpdateCheckPoint

    この操作では、Flinkスナップショットをコンシューマーグループに同期します。

  1. 起動パラメーターを設定します。

    次のコードは、データの使用方法の例を示しています。 java.util.Propertiesクラスは設定ツールとして使用され、Flink Log Consumerの設定はConfigConstantsクラスに含まれます。

    Properties configProps = new Properties();
    // Specify the Simple Log Service endpoint. 
    configProps.put(ConfigConstants.LOG_ENDPOINT, "cn-hangzhou.log.aliyuncs.com");
    // In this example, the AccessKey ID and AccessKey secret are obtained from environment variables. 
    String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    configProps.put(ConfigConstants.LOG_ACCESSKEYID,accessKeyId);
    configProps.put(ConfigConstants.LOG_ACCESSKEY,accessKeySecret);
    // Specify the project. 
    String project = "your-project";
    // Specify the Logstore. 
    String logstore = "your-logstore";
    // Specify the start position of log consumption. 
    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
    // Specify the method that you want to use for data deserialization. 
    FastLogGroupDeserializer deserializer = new FastLogGroupDeserializer();
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<FastLogGroupList> dataStream = env.addSource(
            new FlinkLogConsumer<FastLogGroupList>(project, logstore, deserializer, configProps)
    );
    dataStream.addSink(new SinkFunction<FastLogGroupList>() {
        @Override
        public void invoke(FastLogGroupList logGroupList, Context context) throws Exception {
            for (FastLogGroup logGroup : logGroupList.getLogGroups()) {
                int logsCount = logGroup.getLogsCount();
                String topic = logGroup.getTopic();
                String source = logGroup.getSource();
                for (int i = 0; i < logsCount; ++i) {
                    FastLog row = logGroup.getLogs(i);
                    for (int j = 0; j < row.getContentsCount(); ++j) {
                        FastLogContent column = row.getContents(j);
                        // Process logs.
                        System.out.println(column.getKey());
                        System.out.println(column.getValue());
                    }
                }
            }
        }
    });
    // You can also use RawLogGroupListDeserializer.
    RawLogGroupListDeserializer rawLogGroupListDeserializer = new RawLogGroupListDeserializer();
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<RawLogGroupList> rawLogGroupListDataStream = env.addSource(
            new FlinkLogConsumer<RawLogGroupList>(project, logstore, rawLogGroupListDeserializer, configProps)
    );
    rawLogGroupListDataStream.addSink(new SinkFunction<RawLogGroupList>() {
        @Override
        public void invoke(RawLogGroupList logGroupList, Context context) throws Exception {
            for (RawLogGroup logGroup : logGroupList.getRawLogGroups()) {
                String topic = logGroup.getTopic();
                String source = logGroup.getSource();
                for (RawLog row : logGroup.getLogs()) {
                    // Process logs.
                }
            }
        }
    });
    説明

    Flinkサブタスクの数は、Logstore内のシャードの数とは無関係です。 シャードの数がサブタスクの数より大きい場合、各サブタスクは1つ以上のシャードのデータを消費します。 シャードの数がサブタスクの数より少ない場合、一部のサブタスクは新しいシャードが生成されるまでアイドル状態になります。 各シャードのデータは、1つのサブタスクのみで使用できます。

  2. ログ消費の開始位置を指定します。

    Flink Log Consumerを使用してLogstoreからデータを消費する場合、ConfigConstants.LOG_CONSUMER_BEGIN_POSITIONパラメーターを使用して、ログ消費の開始位置を指定できます。 最も早いログ、最新のログ、または特定の時点からデータの消費を開始できます。 さらに、Flink Log Consumerを使用すると、特定のコンシューマーグループからの消費を再開できます。 パラメーターを次のいずれかの値に設定できます。

    • Consts.LOG_BEGIN_CURSOR: 最も早いログからデータの消費を開始します。

    • Consts.LOG_END_CURSOR: 最新のログからデータの消費を開始します。

    • Consts.LOG_FROM_CHECKPOINT: 指定されたコンシューマーグループに格納されているチェックポイントからデータの消費を開始します。 ConfigConstants.LOG_CONSUMERGROUPパラメーターを使用して、コンシューマーグループを指定できます。

    • UnixTimestamp: ある時点からデータの消費を開始します。 この値は、1970年1月1日00:00:00 UTCから経過した秒数を表すUNIXタイムスタンプです。 INTEGERデータ型の文字列を指定する必要があります。

    例:

    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_BEGIN_CURSOR);
    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, "1512439000");
    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT);
    説明

    Flinkタスクの開始時にFlinkのステートバックエンドからの消費再開を設定した場合、Flink Log Connectorはステートバックエンドに格納されたチェックポイントを使用します。

  3. オプション: 消費進捗モニタリングを設定します。

    Flink Log Consumerを使用すると、消費の進行状況を監視できます。 各シャードのリアルタイムの消費位置を取得できます。 消費位置は、タイムスタンプによって示される。 詳細については、「手順3: コンシューマーグループのステータスの表示」をご参照ください。.

    例:

    configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your consumer group name");
    説明

    このステップは省略可能です。 消費進捗モニタリングを設定した後に消費者グループが存在しない場合、Flink Log consumerは消費者グループを作成します。 コンシューマーグループが存在する場合、操作を実行する必要はなく、Flink Log consumerのスナップショットは自動的にコンシューマーグループに同期されます。 Flink Log Consumerの消費の進行状況は、Simple Log Serviceコンソールで確認できます。

  4. 消費の再開と1回だけのセマンティクスを設定します。

    Flinkのチェックポイント機能が有効になっている場合、Flink Log Consumerは各シャードの消費進行状況を定期的に保存します。 サブタスクが失敗した場合、Flinkはサブタスクを復元し、最新のチェックポイントからデータの消費を開始します。

    チェックポイントが保存される間隔は、サブタスクが失敗した場合に消費される最大データ量を定義します。 次のコードを使用して、間隔を設定できます。

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Configure the exactly-once semantics. 
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // Save checkpoints every 5 seconds. 
    env.enableCheckpointing(5000);

    Flinkチェックポイントの詳細については、「チェックポイント」をご参照ください。

Flinkログプロデューサー

Flink Log Producerは、Simple Log Serviceにデータを書き込みます。

説明

Flink Log Producerは、Flink at-least-onceセマンティクスのみをサポートします。 タスクが失敗した場合、Simple Log Serviceに書き込まれた一部のデータが重複する可能性があります。 ただし、データが失われることはありません。

Flink Log Producerを使用してSimple Log Serviceにデータを書き込む場合、次のAPI操作を呼び出すことができます。

  • PutLogs

  • ListShards

  1. Flinkログプロデューサーを初期化します。

    1. Propertiesパラメーターを初期化します。

      Flink Log ProducerはFlink Log Consumerと同じ方法で初期化されます。 Flink Log Producerの初期化パラメーターを設定する例を次に示します。 パラメーターのデフォルト値を使用するか、ビジネス要件を満たすカスタム値を指定できます。 例:

      // The number of I/O threads that are used to send data. The default value is the number of cores. 
      ConfigConstants.IO_THREAD_NUM
      // The maximum time during which logs can be cached before the logs are sent. Default value: 2000. Unit: milliseconds. 
      ConfigConstants.FLUSH_INTERVAL_MS
      // The total memory size that can be used by the task. Default value: 100. Unit: MB. 
      ConfigConstants.TOTAL_SIZE_IN_BYTES
      // The maximum blocking time for sending logs when the memory usage reaches the upper limit. Default value: 60000. Unit: milliseconds. 
      ConfigConstants.MAX_BLOCK_TIME_MS
      // The maximum number of retries. Default value: 10. 
      ConfigConstants.MAX_RETRIES
    2. LogSerializationSchemaをリロードし、データを生のロググループにシリアル化するために使用されるメソッドを定義します。

      生ロググループはログのコレクションです。 ログフィールドの詳細については、「ログ」をご参照ください。

      特定のシャードにデータを書き込む場合は、LogPartitionerパラメーターを使用してログデータのハッシュキーを生成できます。 このパラメーターはオプションです。 このパラメーターを設定しない場合、データはランダムシャードに書き込まれます。

      例:

      FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);
      logProducer.setCustomPartitioner(new LogPartitioner<String>() {
            // Generate a 32-bit hash key. 
            public String getHashKey(String element) {
                try {
                    MessageDigest md = MessageDigest.getInstance("MD5");
                    md.update(element.getBytes());
                    String hash = new BigInteger(1, md.digest()).toString(16);
                    while(hash.length() < 32) hash = "0" + hash;
                    return hash;
                } catch (NoSuchAlgorithmException e) {
                }
                return  "0000000000000000000000000000000000000000000000000000000000000000";
            }
        });
  2. シミュレーション結果を文字列形式でSimple Log Serviceに書き込みます。 例:

    // Serialize data into raw log groups. 
    class SimpleLogSerializer implements LogSerializationSchema<String> {
        public RawLogGroup serialize(String element) {
            RawLogGroup rlg = new RawLogGroup();
            RawLog rl = new RawLog();
            rl.setTime((int)(System.currentTimeMillis() / 1000));
            rl.addContent("message", element);
            rlg.addLog(rl);
            return rlg;
        }
    }
    public class ProducerSample {
        public static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
        // In this example, the AccessKey ID and AccessKey secret are obtained from environment variables. 
        public static String sAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        public static String sAccessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        public static String sProject = "ali-cn-hangzhou-sls-admin";
        public static String sLogstore = "test-flink-producer";
        private static final Logger LOG = LoggerFactory.getLogger(ConsumerSample.class);
        public static void main(String[] args) throws Exception {
            final ParameterTool params = ParameterTool.fromArgs(args);
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().setGlobalJobParameters(params);
            env.setParallelism(3);
            DataStream<String> simpleStringStream = env.addSource(new EventsGenerator());
            Properties configProps = new Properties();
            // Specify the Simple Log Service endpoint. 
            configProps.put(ConfigConstants.LOG_ENDPOINT, sEndpoint);
            // Specify the AccessKey ID and AccessKey secret of your account. 
            configProps.put(ConfigConstants.LOG_ACCESSKEYID, sAccessKeyId);
            configProps.put(ConfigConstants.LOG_ACCESSKEY, sAccessKey);
            // Specify the project to which you want to write logs. 
            configProps.put(ConfigConstants.LOG_PROJECT, sProject);
            // Specify the Logstore to which you want to write logs. 
            configProps.put(ConfigConstants.LOG_LOGSTORE, sLogstore);
            FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);
            simpleStringStream.addSink(logProducer);
            env.execute("flink log producer");
        }
        // Simulate log generation. 
        public static class EventsGenerator implements SourceFunction<String> {
            private boolean running = true;
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                long seq = 0;
                while (running) {
                    Thread.sleep(10);
                    ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
                }
            }
            @Override
            public void cancel() {
                running = false;
            }
        }
    }

消費の例

この例では、Flink Log Consumerは、FastLogGroupList形式でデータストリームに読み込まれるデータを格納します。 次に、Flink Log ConsumerはflatMap関数を使用して、データをFastLogGroupList形式からJSON文字列形式に変換し、CLIに出力を表示するか、出力をテキストファイルに書き込みます。

package com.aliyun.openservices.log.flink.sample;

import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.flink.ConfigConstants;
import com.aliyun.openservices.log.flink.FlinkLogConsumer;
import com.aliyun.openservices.log.flink.data.FastLogGroupDeserializer;
import com.aliyun.openservices.log.flink.data.FastLogGroupList;
import com.aliyun.openservices.log.flink.model.CheckpointMode;
import com.aliyun.openservices.log.flink.util.Consts;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class FlinkConsumerSample {
    private static final String SLS_ENDPOINT = "your-endpoint";
    // In this example, the AccessKey ID and AccessKey secret are obtained from environment variables. 
    private static final String ACCESS_KEY_ID = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static final String ACCESS_KEY_SECRET = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    private static final String SLS_PROJECT = "your-project";
    private static final String SLS_LOGSTORE = "your-logstore";

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);

        Configuration conf = new Configuration();
        // Checkpoint dir like "file:///tmp/flink"
        conf.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "your-checkpoint-dir");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, conf);
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(1);
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setStateBackend(new FsStateBackend("file:///tmp/flinkstate"));
        Properties configProps = new Properties();
        configProps.put(ConfigConstants.LOG_ENDPOINT, SLS_ENDPOINT);
        configProps.put(ConfigConstants.LOG_ACCESSKEYID, ACCESS_KEY_ID);
        configProps.put(ConfigConstants.LOG_ACCESSKEY, ACCESS_KEY_SECRET);
        configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "10");
        configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT);
        configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your-consumer-group");
        configProps.put(ConfigConstants.LOG_CHECKPOINT_MODE, CheckpointMode.ON_CHECKPOINTS.name());
        configProps.put(ConfigConstants.LOG_COMMIT_INTERVAL_MILLIS, "10000");

        FastLogGroupDeserializer deserializer = new FastLogGroupDeserializer();
        DataStream<FastLogGroupList> stream = env.addSource(
                new FlinkLogConsumer<>(SLS_PROJECT, SLS_LOGSTORE, deserializer, configProps));

        stream.flatMap((FlatMapFunction<FastLogGroupList, String>) (value, out) -> {
            for (FastLogGroup logGroup : value.getLogGroups()) {
                int logCount = logGroup.getLogsCount();
                for (int i = 0; i < logCount; i++) {
                    FastLog log = logGroup.getLogs(i);
                    JSONObject jsonObject = new JSONObject();
                    jsonObject.put("topic", logGroup.getTopic());
                    jsonObject.put("source", logGroup.getSource());
                    for (int j = 0; j < log.getContentsCount(); j++) {
                        jsonObject.put(log.getContents(j).getKey(), log.getContents(j).getValue());
                    }
                    out.collect(jsonObject.toJSONString());
                }
            }
        }).returns(String.class);

        stream.writeAsText("log-" + System.nanoTime());
        env.execute("Flink consumer");
    }
}