Flink Log Connector是Log Service提供的用於對接Flink的工具,支援對接開源Flink和Realtime ComputeFlink版。本文介紹如何對接Flink消費日誌資料。
前提條件
已開通Log Service。更多資訊,請參見開通Log Service。
已建立RAM使用者並完成授權。具體操作,請參見建立RAM使用者並完成授權。
已配置環境變數ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具體操作,請參見在Linux、macOS和Windows系統配置環境變數。
重要阿里雲帳號的AccessKey擁有所有API的存取權限,建議您使用RAM使用者的AccessKey進行API訪問或日常營運。
強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。
已建立Project和Logstore。具體操作,請參見建立專案Project和建立Logstore。
背景資訊
Flink Log Connector包括兩部分,消費者(Flink Log Consumer)和生產者(Flink Log Producer),兩者用途區別如下:
消費者用於從Log Service中讀取資料,支援exactly once語義,支援Shard負載平衡。
生產者用於將資料寫入Log Service。
使用Flink Log Connector時,需要在專案中添加Maven依賴,樣本如下:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>flink-log-connector</artifactId>
<version>0.1.38</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
除此之外的代碼編寫,請您參考GitHub上源碼進行編寫。更多資訊,請參見aliyun-log-flink-connector。
Flink Log Consumer
在Flink Log Connector中,Flink Log Consumer提供了訂閱Log Service中某一個Logstore的能力,實現了exactly once語義,在使用時您無需關心Logstore中Shard數量的變化,Flink Log Consumer會自動感知。
Flink中每一個子任務負責消費Logstore中的部分Shard,如果Logstore中Shard發生分裂或合并,子任務消費的Shard也會隨之改變。
Flink Log Consumer用到的Log ServiceAPI介面如下:
GetCursorOrData
用於從Shard中擷取資料,注意頻繁的調用該介面可能會導致資料超過Log Service的Shard限額,可以通過ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS和ConfigConstants.LOG_MAX_NUMBER_PER_FETCH控制介面調用的時間間隔和每次調用擷取的日誌數量。Shard的限額請參見分區(Shard)。
樣本如下:
configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS, "100"); configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "100");
ListShards
用於擷取Logstore中所有的Shard列表及Shard狀態等。如果您的Shard經常發生分裂合并,可以通過調整介面的調用周期來及時發現Shard的變化。樣本如下:
// 設定每30s調用一次ListShards介面。 configProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS, "30000");
CreateConsumerGroup
當設定消費進度監控時調用該介面建立ConsumerGroup,用於同步Checkpoint。
UpdateCheckPoint
該介面將Flink的snapshot同步到Log Service的ConsumerGroup中。
配置啟動參數。
以下是一個簡單的消費樣本,使用java.util.Properties作為組態工具,所有Flink Log Consumer的配置均在ConfigConstants中。
Properties configProps = new Properties(); // 設定訪問Log Service的網域名稱。 configProps.put(ConfigConstants.LOG_ENDPOINT, "cn-hangzhou.log.aliyuncs.com"); // 本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。 String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); configProps.put(ConfigConstants.LOG_ACCESSKEYID,accessKeyId); configProps.put(ConfigConstants.LOG_ACCESSKEY,accessKeySecret); // 設定Log Service的project。 String project = "your-project"; // 設定Log Service的Logstore。 String logstore = "your-logstore"; // 設定消費Log Service起始位置。 configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR); // 設定Log Service的訊息還原序列化方法。 FastLogGroupDeserializer deserializer = new FastLogGroupDeserializer(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<FastLogGroupList> dataStream = env.addSource( new FlinkLogConsumer<FastLogGroupList>(project, logstore, deserializer, configProps) ); dataStream.addSink(new SinkFunction<FastLogGroupList>() { @Override public void invoke(FastLogGroupList logGroupList, Context context) throws Exception { for (FastLogGroup logGroup : logGroupList.getLogGroups()) { int logsCount = logGroup.getLogsCount(); String topic = logGroup.getTopic(); String source = logGroup.getSource(); for (int i = 0; i < logsCount; ++i) { FastLog row = logGroup.getLogs(i); for (int j = 0; j < row.getContentsCount(); ++j) { FastLogContent column = row.getContents(j); // 處理日誌 System.out.println(column.getKey()); System.out.println(column.getValue()); } } } } }); // 或者使用RawLogGroupListDeserializer RawLogGroupListDeserializer rawLogGroupListDeserializer = new RawLogGroupListDeserializer(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<RawLogGroupList> rawLogGroupListDataStream = env.addSource( new FlinkLogConsumer<RawLogGroupList>(project, logstore, rawLogGroupListDeserializer, configProps) ); rawLogGroupListDataStream.addSink(new SinkFunction<RawLogGroupList>() { @Override public void invoke(RawLogGroupList logGroupList, Context context) throws Exception { for (RawLogGroup logGroup : logGroupList.getRawLogGroups()) { String topic = logGroup.getTopic(); String source = logGroup.getSource(); for (RawLog row : logGroup.getLogs()) { // 處理日誌 } } } });
說明Flink的子任務數量和Log ServiceLogstore中的Shard數量是獨立的,如果Shard數量多於子任務數量,每個子任務不重複的消費Shard,如果少於子任務數量,那麼部分子任務就會空閑,直到新的Shard產生。
設定消費起始位置。
Flink Log Consumer支援設定Shard的消費起始位置,通過設定屬性ConfigConstants.LOG_CONSUMER_BEGIN_POSITION,就可以定製從Shard的頭、尾或者某個特定時間開始消費。另外,Flink Log Connector也支援從某個具體的消費組中恢複消費。屬性的具體取值如下:
Consts.LOG_BEGIN_CURSOR:表示從Shard的頭開始消費,也就是從Shard中最舊的資料開始消費。
Consts.LOG_END_CURSOR:表示從Shard的尾部開始,也就是從Shard中最新的資料開始消費。
Consts.LOG_FROM_CHECKPOINT:表示從某個特定的消費組中儲存的Checkpoint開始消費,通過ConfigConstants.LOG_CONSUMERGROUP指定具體的消費組。
UnixTimestamp:一個整型數值的字串,用1970-01-01到現在的秒數表示,含義是消費Shard中這個時間點之後的資料。
樣本如下:
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_BEGIN_CURSOR); configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR); configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, "1512439000"); configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT);
說明如果在啟動Flink任務時,設定了從Flink自身的StateBackend中恢複,那麼Flink Log Connector會忽略上面的配置,使用StateBackend中儲存的Checkpoint。
可選:設定消費進度監控。
Flink Log Consumer支援設定消費進度監控,擷取每一個Shard的即時消費位置,使用時間戳表示。更多資訊,請參見步驟二:查看消費組狀態。
樣本如下:
configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your consumer group name");
說明該項為可選配置項,設定後Flink Log Consumer會首先建立消費組,如果消費組已經存在,則不執行任何操作,Flink Log Consumer中的snapshot會自動同步到Log Service的消費組中,您可以通過Log Service的控制台查看Flink Log Consumer的消費進度。
設定容災和exactly once語義支援。
當開啟Flink的Checkpointing功能時,Flink Log Consumer會周期性地將每個Shard的消費進度儲存,當任務失敗時,Flink會恢複消費任務,並從儲存的最新的Checkpoint開始消費。
Checkpoint的周期定義了當任務失敗時,最多多少的資料會被回溯,即重新消費,使用代碼如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 開啟Flink exactly once語義。 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 每5s儲存一次Checkpoint。 env.enableCheckpointing(5000);
更多Flink Checkpoint的資訊請參見Checkpoints。
Flink Log Producer
Flink Log Producer用於將資料寫入Log Service。
Flink Log Producer只支援Flink at least once語義,在任務失敗時,寫入Log Service中的資料有可能會重複,但不會丟失。
Flink Log Producer用到的Log ServiceAPI介面如下:
PutLogs
ListShards
初始化Flink Log Producer。
初始化配置參數Properties。
Flink Log Producer初始化步驟與Flink Log Consumer類似。Flink Log Producer初始化配置包含以下參數,一般情況下使用預設值即可,如有需要可以自訂配置。樣本如下:
// 用於發送資料的I/O線程的數量,預設為核心數。 ConfigConstants.IO_THREAD_NUM // 日誌發送前被緩衝的最大允許時間,預設為2000毫秒。 ConfigConstants.FLUSH_INTERVAL_MS // 任務可以使用的記憶體總的大小,預設為100 MB。 ConfigConstants.TOTAL_SIZE_IN_BYTES // 記憶體達到上限時,發送日誌的最大阻塞時間,單位為毫秒,預設為 60s。 ConfigConstants.MAX_BLOCK_TIME_MS // 最大重試次數,預設為 10 次。 ConfigConstants.MAX_RETRIES
重載LogSerializationSchema,定義將資料序列化成RawLogGroup的方法。
RawLogGroup是日誌的集合,各欄位含義請參見日誌(Log)。
如果您需要指定資料寫到某一個Shard中,可以使用LogPartitioner產生資料的HashKey,LogPartitioner為可選項,如果您沒有配置,資料會隨機寫入某一個Shard。
樣本如下:
FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps); logProducer.setCustomPartitioner(new LogPartitioner<String>() { // 產生32位Hash值。 public String getHashKey(String element) { try { MessageDigest md = MessageDigest.getInstance("MD5"); md.update(element.getBytes()); String hash = new BigInteger(1, md.digest()).toString(16); while(hash.length() < 32) hash = "0" + hash; return hash; } catch (NoSuchAlgorithmException e) { } return "0000000000000000000000000000000000000000000000000000000000000000"; } });
將類比產生的字串寫入Log Service,樣本如下:
// 將資料序列化成Log Service的資料格式。 class SimpleLogSerializer implements LogSerializationSchema<String> { public RawLogGroup serialize(String element) { RawLogGroup rlg = new RawLogGroup(); RawLog rl = new RawLog(); rl.setTime((int)(System.currentTimeMillis() / 1000)); rl.addContent("message", element); rlg.addLog(rl); return rlg; } } public class ProducerSample { public static String sEndpoint = "cn-hangzhou.log.aliyuncs.com"; //本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。 public static String sAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); public static String sAccessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); public static String sProject = "ali-cn-hangzhou-sls-admin"; public static String sLogstore = "test-flink-producer"; private static final Logger LOG = LoggerFactory.getLogger(ConsumerSample.class); public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromArgs(args); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(params); env.setParallelism(3); DataStream<String> simpleStringStream = env.addSource(new EventsGenerator()); Properties configProps = new Properties(); // 設定訪問Log Service的網域名稱。 configProps.put(ConfigConstants.LOG_ENDPOINT, sEndpoint); // 設定使用者AK。 configProps.put(ConfigConstants.LOG_ACCESSKEYID, sAccessKeyId); configProps.put(ConfigConstants.LOG_ACCESSKEY, sAccessKey); // 設定日誌寫入的Log Serviceproject。 configProps.put(ConfigConstants.LOG_PROJECT, sProject); // 設定日誌寫入的Log ServiceLogstore。 configProps.put(ConfigConstants.LOG_LOGSTORE, sLogstore); FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps); simpleStringStream.addSink(logProducer); env.execute("flink log producer"); } // 類比產生日誌。 public static class EventsGenerator implements SourceFunction<String> { private boolean running = true; @Override public void run(SourceContext<String> ctx) throws Exception { long seq = 0; while (running) { Thread.sleep(10); ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12)); } } @Override public void cancel() { running = false; } } }
消費樣本
本樣本中,Flink Log Consumer將讀取到的資料以FastLogGroupList形式儲存到資料流中,接著使用flatMap函數將FastLogGroupList轉換為JSON字串並輸出到命令列或寫入文字檔。
package com.aliyun.openservices.log.flink.sample;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.flink.ConfigConstants;
import com.aliyun.openservices.log.flink.FlinkLogConsumer;
import com.aliyun.openservices.log.flink.data.FastLogGroupDeserializer;
import com.aliyun.openservices.log.flink.data.FastLogGroupList;
import com.aliyun.openservices.log.flink.model.CheckpointMode;
import com.aliyun.openservices.log.flink.util.Consts;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
public class FlinkConsumerSample {
private static final String SLS_ENDPOINT = "your-endpoint";
//本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。
private static final String ACCESS_KEY_ID = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static final String ACCESS_KEY_SECRET = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
private static final String SLS_PROJECT = "your-project";
private static final String SLS_LOGSTORE = "your-logstore";
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
Configuration conf = new Configuration();
// Checkpoint dir like "file:///tmp/flink"
conf.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "your-checkpoint-dir");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, conf);
env.getConfig().setGlobalJobParameters(params);
env.setParallelism(1);
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStateBackend(new FsStateBackend("file:///tmp/flinkstate"));
Properties configProps = new Properties();
configProps.put(ConfigConstants.LOG_ENDPOINT, SLS_ENDPOINT);
configProps.put(ConfigConstants.LOG_ACCESSKEYID, ACCESS_KEY_ID);
configProps.put(ConfigConstants.LOG_ACCESSKEY, ACCESS_KEY_SECRET);
configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "10");
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT);
configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your-consumer-group");
configProps.put(ConfigConstants.LOG_CHECKPOINT_MODE, CheckpointMode.ON_CHECKPOINTS.name());
configProps.put(ConfigConstants.LOG_COMMIT_INTERVAL_MILLIS, "10000");
FastLogGroupDeserializer deserializer = new FastLogGroupDeserializer();
DataStream<FastLogGroupList> stream = env.addSource(
new FlinkLogConsumer<>(SLS_PROJECT, SLS_LOGSTORE, deserializer, configProps));
stream.flatMap((FlatMapFunction<FastLogGroupList, String>) (value, out) -> {
for (FastLogGroup logGroup : value.getLogGroups()) {
int logCount = logGroup.getLogsCount();
for (int i = 0; i < logCount; i++) {
FastLog log = logGroup.getLogs(i);
JSONObject jsonObject = new JSONObject();
jsonObject.put("topic", logGroup.getTopic());
jsonObject.put("source", logGroup.getSource());
for (int j = 0; j < log.getContentsCount(); j++) {
jsonObject.put(log.getContents(j).getKey(), log.getContents(j).getValue());
}
out.collect(jsonObject.toJSONString());
}
}
}).returns(String.class);
stream.writeAsText("log-" + System.nanoTime());
env.execute("Flink consumer");
}
}