Simple Log Serviceは、Flinkに接続するためのFlink Log Connectorを提供します。 オープンソースFlinkとRealtime Compute for Apache Flinkがサポートされています。 このトピックでは、FlinkをSimple Log Serviceに接続してログデータを消費する方法について説明します。
前提条件
プロジェクトと Logstore が作成済みである必要があります。 詳細については、「プロジェクトの作成」および「Logstore の作成」をご参照ください。
RAM (Resource Access Management) ユーザーが作成され、必要な権限がRAMユーザーに付与されます。 詳細については、「RAMユーザーの作成とRAMユーザーへの権限付与」をご参照ください。
ALIBABA_CLOUD_ACCESS_KEY_IDおよびALIBABA_CLOUD_ACCESS_KEY_SECRET環境変数が設定されています。 詳細については、「環境変数の設定」をご参照ください。
重要Alibaba CloudアカウントのAccessKeyペアには、すべてのAPI操作に対する権限があります。 RAMユーザーのAccessKeyペアを使用して、API操作を呼び出したり、ルーチンのO&Mを実行したりすることを推奨します。
プロジェクトコードにAccessKey IDまたはAccessKey secretを保存しないことを推奨します。 そうしないと、AccessKeyペアが漏洩し、アカウント内のすべてのリソースのセキュリティが侵害される可能性があります。
背景情報
Flink Log Connectorは、Flink Log ConsumerとFlink Log Producerで構成されています。 Flink Log ConsumerとFlink Log Producerの違いを次に示します。
Flink Log Consumerは、Simple Log Serviceからデータを読み取ります。 Flink Log Consumerは、シャード間の1回限りのセマンティクスと負荷分散をサポートします。
Flink Log Producerは、Simple Log Serviceにデータを書き込みます。
Flink Log Connectorを使用する前に、プロジェクトにMaven依存関係を追加する必要があります。 サンプルコード:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>flink-log-connector</artifactId>
<version>0.1.38</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
他のプログラミング言語でコードを記述するには、GitHubのソースコードを参照してください。 詳細については、aliyun-log-flink-connectorをご参照ください。
Flinkログコンシューマ
Flink Log Consumerは、Logstoreからログデータを消費できます。 ログの消費中に、1回だけのセマンティクスが適用されます。 Flink Log Consumerは、Logstore内のシャード数の変化を検出します。
各Flinkサブタスクは、Logstore内の一部のシャードからデータを消費します。 Logstore内のシャードが分割またはマージされると、サブタスクで消費されるシャードも変更されます。
Flink Log Consumerを使用してSimple Log Serviceからデータを消費する場合、次のAPI操作を呼び出すことができます。
GetCursorOrData
この操作では、シャードからログデータをプルします。 この操作を頻繁に呼び出すと、データトラフィックがシャードの機能を超える可能性があります。 ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLISパラメーターを使用して、API呼び出しの間隔を制御できます。 ConfigConstants.LOG_MAX_NUMBER_PER_FETCHパラメーターを使用して、各API呼び出しによってプルされるログの数を制御できます。 シャード機能の詳細については、「シャード」をご参照ください。
例:
configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS, "100"); configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "100");
ListShards
この操作を呼び出して、Logstore内のすべてのシャードと各シャードのステータスを表示できます。 シャードが頻繁に分割およびマージされる場合は、呼び出し間隔を調整して、シャードの数の変化をタイムリーに検出できます。 例:
// Call the ListShards operation once every 30 seconds. configProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS, "30000");
CreateConsumerGroup
この操作では、チェックポイントを同期するためのコンシューマグループを作成します。
UpdateCheckPoint
この操作では、Flinkスナップショットをコンシューマーグループに同期します。
起動パラメーターを設定します。
次のコードは、データの使用方法の例を示しています。 java.util.Propertiesクラスは設定ツールとして使用され、Flink Log Consumerの設定はConfigConstantsクラスに含まれます。
Properties configProps = new Properties(); // Specify the Simple Log Service endpoint. configProps.put(ConfigConstants.LOG_ENDPOINT, "cn-hangzhou.log.aliyuncs.com"); // In this example, the AccessKey ID and AccessKey secret are obtained from environment variables. String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); configProps.put(ConfigConstants.LOG_ACCESSKEYID,accessKeyId); configProps.put(ConfigConstants.LOG_ACCESSKEY,accessKeySecret); // Specify the project. String project = "your-project"; // Specify the Logstore. String logstore = "your-logstore"; // Specify the start position of log consumption. configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR); // Specify the method that you want to use for data deserialization. FastLogGroupDeserializer deserializer = new FastLogGroupDeserializer(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<FastLogGroupList> dataStream = env.addSource( new FlinkLogConsumer<FastLogGroupList>(project, logstore, deserializer, configProps) ); dataStream.addSink(new SinkFunction<FastLogGroupList>() { @Override public void invoke(FastLogGroupList logGroupList, Context context) throws Exception { for (FastLogGroup logGroup : logGroupList.getLogGroups()) { int logsCount = logGroup.getLogsCount(); String topic = logGroup.getTopic(); String source = logGroup.getSource(); for (int i = 0; i < logsCount; ++i) { FastLog row = logGroup.getLogs(i); for (int j = 0; j < row.getContentsCount(); ++j) { FastLogContent column = row.getContents(j); // Process logs. System.out.println(column.getKey()); System.out.println(column.getValue()); } } } } }); // You can also use RawLogGroupListDeserializer. RawLogGroupListDeserializer rawLogGroupListDeserializer = new RawLogGroupListDeserializer(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<RawLogGroupList> rawLogGroupListDataStream = env.addSource( new FlinkLogConsumer<RawLogGroupList>(project, logstore, rawLogGroupListDeserializer, configProps) ); rawLogGroupListDataStream.addSink(new SinkFunction<RawLogGroupList>() { @Override public void invoke(RawLogGroupList logGroupList, Context context) throws Exception { for (RawLogGroup logGroup : logGroupList.getRawLogGroups()) { String topic = logGroup.getTopic(); String source = logGroup.getSource(); for (RawLog row : logGroup.getLogs()) { // Process logs. } } } });
説明Flinkサブタスクの数は、Logstore内のシャードの数とは無関係です。 シャードの数がサブタスクの数より大きい場合、各サブタスクは1つ以上のシャードのデータを消費します。 シャードの数がサブタスクの数より少ない場合、一部のサブタスクは新しいシャードが生成されるまでアイドル状態になります。 各シャードのデータは、1つのサブタスクのみで使用できます。
ログ消費の開始位置を指定します。
Flink Log Consumerを使用してLogstoreからデータを消費する場合、ConfigConstants.LOG_CONSUMER_BEGIN_POSITIONパラメーターを使用して、ログ消費の開始位置を指定できます。 最も早いログ、最新のログ、または特定の時点からデータの消費を開始できます。 さらに、Flink Log Consumerを使用すると、特定のコンシューマーグループからの消費を再開できます。 パラメーターを次のいずれかの値に設定できます。
Consts.LOG_BEGIN_CURSOR: 最も早いログからデータの消費を開始します。
Consts.LOG_END_CURSOR: 最新のログからデータの消費を開始します。
Consts.LOG_FROM_CHECKPOINT: 指定されたコンシューマーグループに格納されているチェックポイントからデータの消費を開始します。 ConfigConstants.LOG_CONSUMERGROUPパラメーターを使用して、コンシューマーグループを指定できます。
UnixTimestamp: ある時点からデータの消費を開始します。 この値は、1970年1月1日00:00:00 UTCから経過した秒数を表すUNIXタイムスタンプです。 INTEGERデータ型の文字列を指定する必要があります。
例:
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_BEGIN_CURSOR); configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR); configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, "1512439000"); configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT);
説明Flinkタスクの開始時にFlinkのステートバックエンドからの消費再開を設定した場合、Flink Log Connectorはステートバックエンドに格納されたチェックポイントを使用します。
オプション: 消費進捗モニタリングを設定します。
Flink Log Consumerを使用すると、消費の進行状況を監視できます。 各シャードのリアルタイムの消費位置を取得できます。 消費位置は、タイムスタンプによって示される。 詳細については、「手順3: コンシューマーグループのステータスの表示」をご参照ください。.
例:
configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your consumer group name");
説明このステップは省略可能です。 消費進捗モニタリングを設定した後に消費者グループが存在しない場合、Flink Log consumerは消費者グループを作成します。 コンシューマーグループが存在する場合、操作を実行する必要はなく、Flink Log consumerのスナップショットは自動的にコンシューマーグループに同期されます。 Flink Log Consumerの消費の進行状況は、Simple Log Serviceコンソールで確認できます。
消費の再開と1回だけのセマンティクスを設定します。
Flinkのチェックポイント機能が有効になっている場合、Flink Log Consumerは各シャードの消費進行状況を定期的に保存します。 サブタスクが失敗した場合、Flinkはサブタスクを復元し、最新のチェックポイントからデータの消費を開始します。
チェックポイントが保存される間隔は、サブタスクが失敗した場合に消費される最大データ量を定義します。 次のコードを使用して、間隔を設定できます。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Configure the exactly-once semantics. env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // Save checkpoints every 5 seconds. env.enableCheckpointing(5000);
Flinkチェックポイントの詳細については、「チェックポイント」をご参照ください。
Flinkログプロデューサー
Flink Log Producerは、Simple Log Serviceにデータを書き込みます。
Flink Log Producerは、Flink at-least-onceセマンティクスのみをサポートします。 タスクが失敗した場合、Simple Log Serviceに書き込まれた一部のデータが重複する可能性があります。 ただし、データが失われることはありません。
Flink Log Producerを使用してSimple Log Serviceにデータを書き込む場合、次のAPI操作を呼び出すことができます。
PutLogs
ListShards
Flinkログプロデューサーを初期化します。
Propertiesパラメーターを初期化します。
Flink Log ProducerはFlink Log Consumerと同じ方法で初期化されます。 Flink Log Producerの初期化パラメーターを設定する例を次に示します。 パラメーターのデフォルト値を使用するか、ビジネス要件を満たすカスタム値を指定できます。 例:
// The number of I/O threads that are used to send data. The default value is the number of cores. ConfigConstants.IO_THREAD_NUM // The maximum time during which logs can be cached before the logs are sent. Default value: 2000. Unit: milliseconds. ConfigConstants.FLUSH_INTERVAL_MS // The total memory size that can be used by the task. Default value: 100. Unit: MB. ConfigConstants.TOTAL_SIZE_IN_BYTES // The maximum blocking time for sending logs when the memory usage reaches the upper limit. Default value: 60000. Unit: milliseconds. ConfigConstants.MAX_BLOCK_TIME_MS // The maximum number of retries. Default value: 10. ConfigConstants.MAX_RETRIES
LogSerializationSchemaをリロードし、データを生のロググループにシリアル化するために使用されるメソッドを定義します。
生ロググループはログのコレクションです。 ログフィールドの詳細については、「ログ」をご参照ください。
特定のシャードにデータを書き込む場合は、LogPartitionerパラメーターを使用してログデータのハッシュキーを生成できます。 このパラメーターはオプションです。 このパラメーターを設定しない場合、データはランダムシャードに書き込まれます。
例:
FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps); logProducer.setCustomPartitioner(new LogPartitioner<String>() { // Generate a 32-bit hash key. public String getHashKey(String element) { try { MessageDigest md = MessageDigest.getInstance("MD5"); md.update(element.getBytes()); String hash = new BigInteger(1, md.digest()).toString(16); while(hash.length() < 32) hash = "0" + hash; return hash; } catch (NoSuchAlgorithmException e) { } return "0000000000000000000000000000000000000000000000000000000000000000"; } });
シミュレーション結果を文字列形式でSimple Log Serviceに書き込みます。 例:
// Serialize data into raw log groups. class SimpleLogSerializer implements LogSerializationSchema<String> { public RawLogGroup serialize(String element) { RawLogGroup rlg = new RawLogGroup(); RawLog rl = new RawLog(); rl.setTime((int)(System.currentTimeMillis() / 1000)); rl.addContent("message", element); rlg.addLog(rl); return rlg; } } public class ProducerSample { public static String sEndpoint = "cn-hangzhou.log.aliyuncs.com"; // In this example, the AccessKey ID and AccessKey secret are obtained from environment variables. public static String sAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); public static String sAccessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); public static String sProject = "ali-cn-hangzhou-sls-admin"; public static String sLogstore = "test-flink-producer"; private static final Logger LOG = LoggerFactory.getLogger(ConsumerSample.class); public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromArgs(args); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(params); env.setParallelism(3); DataStream<String> simpleStringStream = env.addSource(new EventsGenerator()); Properties configProps = new Properties(); // Specify the Simple Log Service endpoint. configProps.put(ConfigConstants.LOG_ENDPOINT, sEndpoint); // Specify the AccessKey ID and AccessKey secret of your account. configProps.put(ConfigConstants.LOG_ACCESSKEYID, sAccessKeyId); configProps.put(ConfigConstants.LOG_ACCESSKEY, sAccessKey); // Specify the project to which you want to write logs. configProps.put(ConfigConstants.LOG_PROJECT, sProject); // Specify the Logstore to which you want to write logs. configProps.put(ConfigConstants.LOG_LOGSTORE, sLogstore); FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps); simpleStringStream.addSink(logProducer); env.execute("flink log producer"); } // Simulate log generation. public static class EventsGenerator implements SourceFunction<String> { private boolean running = true; @Override public void run(SourceContext<String> ctx) throws Exception { long seq = 0; while (running) { Thread.sleep(10); ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12)); } } @Override public void cancel() { running = false; } } }
消費の例
この例では、Flink Log Consumerは、FastLogGroupList形式でデータストリームに読み込まれるデータを格納します。 次に、Flink Log ConsumerはflatMap関数を使用して、データをFastLogGroupList形式からJSON文字列形式に変換し、CLIに出力を表示するか、出力をテキストファイルに書き込みます。
package com.aliyun.openservices.log.flink.sample;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.flink.ConfigConstants;
import com.aliyun.openservices.log.flink.FlinkLogConsumer;
import com.aliyun.openservices.log.flink.data.FastLogGroupDeserializer;
import com.aliyun.openservices.log.flink.data.FastLogGroupList;
import com.aliyun.openservices.log.flink.model.CheckpointMode;
import com.aliyun.openservices.log.flink.util.Consts;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
public class FlinkConsumerSample {
private static final String SLS_ENDPOINT = "your-endpoint";
// In this example, the AccessKey ID and AccessKey secret are obtained from environment variables.
private static final String ACCESS_KEY_ID = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static final String ACCESS_KEY_SECRET = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
private static final String SLS_PROJECT = "your-project";
private static final String SLS_LOGSTORE = "your-logstore";
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
Configuration conf = new Configuration();
// Checkpoint dir like "file:///tmp/flink"
conf.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "your-checkpoint-dir");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, conf);
env.getConfig().setGlobalJobParameters(params);
env.setParallelism(1);
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStateBackend(new FsStateBackend("file:///tmp/flinkstate"));
Properties configProps = new Properties();
configProps.put(ConfigConstants.LOG_ENDPOINT, SLS_ENDPOINT);
configProps.put(ConfigConstants.LOG_ACCESSKEYID, ACCESS_KEY_ID);
configProps.put(ConfigConstants.LOG_ACCESSKEY, ACCESS_KEY_SECRET);
configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "10");
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT);
configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your-consumer-group");
configProps.put(ConfigConstants.LOG_CHECKPOINT_MODE, CheckpointMode.ON_CHECKPOINTS.name());
configProps.put(ConfigConstants.LOG_COMMIT_INTERVAL_MILLIS, "10000");
FastLogGroupDeserializer deserializer = new FastLogGroupDeserializer();
DataStream<FastLogGroupList> stream = env.addSource(
new FlinkLogConsumer<>(SLS_PROJECT, SLS_LOGSTORE, deserializer, configProps));
stream.flatMap((FlatMapFunction<FastLogGroupList, String>) (value, out) -> {
for (FastLogGroup logGroup : value.getLogGroups()) {
int logCount = logGroup.getLogsCount();
for (int i = 0; i < logCount; i++) {
FastLog log = logGroup.getLogs(i);
JSONObject jsonObject = new JSONObject();
jsonObject.put("topic", logGroup.getTopic());
jsonObject.put("source", logGroup.getSource());
for (int j = 0; j < log.getContentsCount(); j++) {
jsonObject.put(log.getContents(j).getKey(), log.getContents(j).getValue());
}
out.collect(jsonObject.toJSONString());
}
}
}).returns(String.class);
stream.writeAsText("log-" + System.nanoTime());
env.execute("Flink consumer");
}
}