本文為您介紹如何使用Log ServiceSLS連接器。
背景資訊
Log Service是針對日誌類資料的一站式服務。Log Service可以協助您快捷地完成資料擷取、消費、投遞以及查詢分析,提升營運和營運效率,建立海量Tlog能力。
SLS連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 源表和結果表 |
運行模式 | 僅支援流模式 |
特有監控指標 | 暫不適用 |
資料格式 | 暫無 |
API種類 | SQL |
是否支援更新或刪除結果表資料 | 不支援更新和刪除結果表資料,只支援插入資料。 |
特色功能
SLS連接器源表支援直接讀取訊息的屬性欄位,支援的屬性欄位如下。
欄位名 | 欄位類型 | 欄位說明 |
__source__ | STRING METADATA VIRTUAL | 訊息源。 |
__topic__ | STRING METADATA VIRTUAL | 訊息主題。 |
__timestamp__ | BIGINT METADATA VIRTUAL | 日誌時間。 |
__tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | 訊息TAG。 對於屬性 |
前提條件
已建立Log ServiceProject和Logstore,詳情請參見建立Project和Logstore。
使用限制
僅Realtime Compute引擎VVR 2.0.0及以上版本支援Log ServiceSLS連接器。
SLS連接器僅保證At-Least-Once語義。
僅Realtime Compute引擎VVR 4.0.13及以上版本支援Shard數目變化觸發自動Failover功能。
強烈建議不要設定Source並發度大於Shard個數,不僅會造成資源浪費,且在8.0.5及更低版本中,如果後續Shard數目發生變化,自動Failover功能可能會失效,造成部分Shard不被消費。
文法結構
CREATE TABLE sls_table(
a INT,
b INT,
c VARCHAR
) WITH (
'connector' = 'sls',
'endPoint' = '<yourEndPoint>',
'project' = '<yourProjectName>',
'logStore' = '<yourLogStoreName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}'
);
WITH參數
通用
參數
說明
資料類型
是否必填
預設值
備忘
connector
表類型。
String
是
無
固定值sls。
endPoint
EndPoint地址。
String
是
無
請填寫SLS的私網服務地址,詳情請參見服務存取點。
project
SLS專案名稱。
String
是
無
無。
logStore
SLS LogStore或metricstore名稱。
String
是
無
logStore和metricstore是相同的消費方式。
accessId
阿里雲帳號的AccessKey ID。
String
是
無
詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?
重要為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變數管理。
accessKey
阿里雲帳號的AccessKey Secret。
String
是
無
詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?
重要為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變數管理。
源表專屬
參數
說明
資料類型
是否必填
預設值
備忘
enableNewSource
是否啟用實現了FLIP-27介面的新資料來源。
Boolean
否
false
新資料來源可以自動適應shard變化,同時儘可能保證shard在所有的source並發上分布均勻。
說明僅Realtime Compute引擎VVR 8.0.9及以上版本支援該參數。
重要作業在該配置項發生變化後無法從狀態恢複。
可通過先設定配置項consumerGroup啟動作業,將消費進度記錄到SLS消費組中,再將配置項consumeFromCheckpoint設為true後無狀態啟動作業,從而實現從歷史進度繼續消費。
shardDiscoveryIntervalMs
動態檢測shard變化時間間隔,單位為毫秒。
Long
否
60000
設定為負值時可以關閉動態檢測。
說明僅當配置項enableNewSource為true時生效。
僅Realtime Compute引擎VVR 8.0.9及以上版本支援該參數。
startupMode
源表啟動模式。
String
否
timestamp
參數取值如下:
timestamp(預設):從指定的起始時間開始消費日誌。
latest:從最新位點開始消費日誌。
earliest:從最早位點開始消費日誌。
說明若將配置項consumeFromCheckpoint設為true,則會從指定的消費組中儲存的Checkpoint開始消費日誌,此處的啟動模式將不會生效。
startTime
消費日誌的開始時間。
String
否
目前時間
格式為yyyy-MM-dd hh:mm:ss。
僅當startupMode設為timestamp時生效。
說明startTime和stopTime基於SLS中的__receive_time__屬性,而非__timestamp__屬性。
stopTime
消費日誌的結束時間。
String
否
無
格式為yyyy-MM-dd hh:mm:ss。
說明如期望日誌消費到結尾時退出Flink程式,需要同時設定exitAfterFinish=true.
consumerGroup
消費組名稱。
String
否
無
消費組用於記錄消費進度。您可以自訂消費組名,無固定格式。
說明不支援通過相同的消費組進行多作業的協同消費。不同的Flink作業應該設定不同的消費組。如果不同的Flink作業使用相同的消費組,它們將會消費全部資料。這是因為在Flink消費SLS的資料時,並不會經過SLS消費組進行分區分配,因此導致各個消費者獨立消費各自的訊息,即使消費組是相同的。
consumeFromCheckpoint
是否從指定的消費組中儲存的Checkpoint開始消費日誌。
String
否
false
參數取值如下:
true:必須同時指定消費組,Flink程式會從消費組中儲存的Checkpoint開始消費日誌,如果該消費組沒有對應的Checkpoint,則從startTime配置值開始消費。
false(預設值):不從指定的消費組中儲存的Checkpoint開始消費日誌。
說明僅Realtime Compute引擎VVR 6.0.5及以上版本支援該參數。
maxRetries
讀取SLS失敗後重試次數。
String
否
3
無。
batchGetSize
單次請求讀取logGroup的個數。
String
否
100
batchGetSize設定不能超過1000,否則會報錯。
exitAfterFinish
在資料消費完成後,Flink程式是否退出。
String
否
false
參數取值如下:
true:資料消費完後,Flink程式退出。
false(預設):資料消費完後,Flink程式不退出。
說明僅Realtime Compute引擎VVR 4.0.13及以上版本支援該參數。
query
SLS消費預先處理語句。
String
否
無
通過使用query參數,您可以在消費SLS資料之前對其進行過濾,以避免將所有資料都消費到Flink中,從而實現節約成本和提高處理速度的目的。
例如
'query' = '*| where request_method = ''GET'''
表示在Flink讀取SLS資料前,先匹配出request_method欄位值等於get的資料。說明query需使用Log ServiceSPL語言,請參見SPL概述。
結果表專屬
參數
說明
資料類型
是否必填
預設值
備忘
topicField
指定欄位名,該欄位的值會覆蓋__topic__屬性欄位的值,表示日誌的主題。
String
否
無
該參數值是表中已存在的欄位之一。
timeField
指定欄位名,該欄位的值會覆蓋__timestamp__屬性欄位的值,表示日誌寫入時間。
String
否
目前時間
該參數值是表中已存在的欄位之一,且欄位類型必須為INT。如果未指定,則預設填充目前時間。
sourceField
指定欄位名,該欄位的值會覆蓋__source__屬性欄位的值,表示日誌的來源地,例如產生該日誌機器的IP地址。
String
否
無
該參數值是表中已存在的欄位之一。
partitionField
指定欄位名,資料寫入時會根據該列值計算Hash值,Hash值相同的資料會寫入同一個shard。
String
否
無
如果未指定,則每條資料會隨機寫入當前可用的Shard中。
buckets
當指定partitionField時,根據Hash值重新分組的個數。
String
否
64
該參數的取值範圍是[1, 256],且必須是2的整數次冪。同時,buckets個數應當大於等於Shard個數,否則會出現部分Shard沒有資料寫入的情況。
說明僅Realtime Compute引擎VVR 6.0.5及以上版本支援該參數。
flushIntervalMs
觸發資料寫入的時間周期。
String
否
2000
單位為毫秒。
writeNullProperties
是否將null值作為空白字串寫入SLS。
Boolean
否
true
參數取值如下:
true(預設值):將null值作為空白字串寫入日誌。
false:計算結果為null的欄位不會寫入到日誌中。
說明僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。
類型映射
Flink欄位類型 | SLS欄位類型 |
BOOLEAN | STRING |
VARBINARY | |
VARCHAR | |
TINYINT | |
INTEGER | |
BIGINT | |
FLOAT | |
DOUBLE | |
DECIMAL |
程式碼範例
CREATE TEMPORARY TABLE sls_input(
`time` BIGINT,
url STRING,
dt STRING,
float_field FLOAT,
double_field DOUBLE,
boolean_field BOOLEAN,
`__topic__` STRING METADATA VIRTUAL,
`__source__` STRING METADATA VIRTUAL,
`__timestamp__` STRING METADATA VIRTUAL,
__tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
proctime as PROCTIME()
) WITH (
'connector' = 'sls',
'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'starttime' = '2023-08-30 00:00:00',
'project' ='sls-test',
'logstore' ='sls-input'
);
CREATE TEMPORARY TABLE sls_sink(
`time` BIGINT,
url STRING,
dt STRING,
float_field FLOAT,
double_field DOUBLE,
boolean_field BOOLEAN,
`__topic__` STRING,
`__source__` STRING,
`__timestamp__` BIGINT ,
receive_time BIGINT
) WITH (
'connector' = 'sls',
'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
'accessId' = '${ak_id}',
'accessKey' = '${ak_secret}',
'project' ='sls-test',
'logstore' ='sls-output'
);
INSERT INTO sls_sink
SELECT
`time`,
url,
dt,
float_field,
double_field,
boolean_field,
`__topic__` ,
`__source__` ,
`__timestamp__` ,
cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input;
DataStream API
通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink,DataStream連接器設定方法請參見DataStream連接器使用方法。
讀取SLS
Realtime Compute引擎VVR提供SourceFunction的實作類別SlsSourceFunction,用於讀取SLS,讀取SLS的樣本如下。
public class SlsDataStreamSource {
public static void main(String[] args) throws Exception {
// Sets up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Creates and adds SLS source and sink.
env.addSource(createSlsSource())
.map(SlsDataStreamSource::convertMessages)
.print();
env.execute("SLS Stream Source");
}
private static SlsSourceFunction createSlsSource() {
SLSAccessInfo accessInfo = new SLSAccessInfo();
accessInfo.setEndpoint("yourEndpoint");
accessInfo.setProjectName("yourProject");
accessInfo.setLogstore("yourLogStore");
accessInfo.setAccessId("yourAccessId");
accessInfo.setAccessKey("yourAccessKey");
// The batch get size must be given.
accessInfo.setBatchGetSize(10);
// Optional parameters
accessInfo.setConsumerGroup("yourConsumerGroup");
accessInfo.setMaxRetries(3);
// time to start consuming, set to current time.
int startInSec = (int) (new Date().getTime() / 1000);
// time to stop consuming, -1 means never stop.
int stopInSec = -1;
return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
}
private static List<String> convertMessages(SourceRecord input) {
List<String> res = new ArrayList<>();
for (FastLogGroup logGroup : input.getLogGroups()) {
int logsCount = logGroup.getLogsCount();
for (int i = 0; i < logsCount; i++) {
FastLog log = logGroup.getLogs(i);
int fieldCount = log.getContentsCount();
for (int idx = 0; idx < fieldCount; idx++) {
FastLogContent f = log.getContents(idx);
res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
}
}
}
return res;
}
}
寫入SLS
提供OutputFormat的實作類別SLSOutputFormat,用於寫入SLS。寫入SLS的樣本如下。
public class SlsDataStreamSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSequence(0, 100)
.map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
.addSink(createSlsSink())
.name(SlsDataStreamSink.class.getSimpleName());
env.execute("SLS Stream Sink");
}
private static OutputFormatSinkFunction createSlsSink() {
Configuration conf = new Configuration();
conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
conf.setString(SLSOptions.PROJECT, "yourProject");
conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
return new OutputFormatSinkFunction<>(outputFormat);
}
private static SinkRecord getSinkRecord(Long seed) {
SinkRecord record = new SinkRecord();
LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
logItem.PushBack("level", "info");
logItem.PushBack("name", String.valueOf(seed));
logItem.PushBack("message", "it's a test message for " + seed.toString());
record.setContent(logItem);
return record;
}
}
XML
Maven中央庫中已經放置了SLS DataStream連接器。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-sls</artifactId>
<version>${vvr-version}</version>
</dependency>
常見問題
恢複失敗的Flink程式時,TaskManager發生OOM,源表報錯java.lang.OutOfMemoryError: Java heap space