本文为您介绍如何使用日志服务SLS连接器。
背景信息
日志服务是针对日志类数据的一站式服务。日志服务可以帮助您快捷地完成数据采集、消费、投递以及查询分析,提升运维和运营效率,建立海量日志处理能力。
SLS连接器支持的信息如下。
类别 | 详情 |
支持类型 | 源表和结果表 |
运行模式 | 仅支持流模式 |
特有监控指标 | 暂不适用 |
数据格式 | 暂无 |
API种类 | SQL |
是否支持更新或删除结果表数据 | 不支持更新和删除结果表数据,只支持插入数据。 |
特色功能
SLS连接器源表支持直接读取消息的属性字段,支持的属性字段如下。
字段名 | 字段类型 | 字段说明 |
__source__ | STRING METADATA VIRTUAL | 消息源。 |
__topic__ | STRING METADATA VIRTUAL | 消息主题。 |
__timestamp__ | BIGINT METADATA VIRTUAL | 日志时间。 |
__tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | 消息TAG。 对于属性 |
前提条件
已创建日志服务Project和Logstore,详情请参见创建Project和Logstore。
使用限制
仅实时计算引擎VVR 2.0.0及以上版本支持日志服务SLS连接器。
SLS连接器仅保证At-Least-Once语义。
仅实时计算引擎VVR 4.0.13及以上版本支持Shard数目变化触发自动Failover功能。
强烈建议不要设置Source并发度大于Shard个数,不仅会造成资源浪费,且在8.0.5及更低版本中,如果后续Shard数目发生变化,自动Failover功能可能会失效,造成部分Shard不被消费。
语法结构
CREATE TABLE sls_table(
a INT,
b INT,
c VARCHAR
) WITH (
'connector' = 'sls',
'endPoint' = '<yourEndPoint>',
'project' = '<yourProjectName>',
'logStore' = '<yourLogStoreName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}'
);
WITH参数
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
表类型。
String
是
无
固定值sls。
endPoint
EndPoint地址。
String
是
无
请填写SLS的私网服务地址,详情请参见服务接入点。
project
SLS项目名称。
String
是
无
无。
logStore
SLS LogStore或metricstore名称。
String
是
无
logStore和metricstore是相同的消费方式。
accessId
阿里云账号的AccessKey ID。
String
是
无
详情请参见如何查看AccessKey ID和AccessKey Secret信息?
重要为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见变量管理。
accessKey
阿里云账号的AccessKey Secret。
String
是
无
详情请参见如何查看AccessKey ID和AccessKey Secret信息?
重要为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey Secret取值,详情请参见变量管理。
源表独有
参数
说明
数据类型
是否必填
默认值
备注
enableNewSource
是否启用实现了FLIP-27接口的新数据源。
Boolean
否
false
新数据源可以自动适应shard变化,同时尽可能保证shard在所有的source并发上分布均匀。
说明仅实时计算引擎VVR 8.0.9及以上版本支持该参数。
重要作业在该配置项发生变化后无法从状态恢复。
可通过先设置配置项consumerGroup启动作业,将消费进度记录到SLS消费组中,再将配置项consumeFromCheckpoint设为true后无状态启动作业,从而实现从历史进度继续消费。
shardDiscoveryIntervalMs
动态检测shard变化时间间隔,单位为毫秒。
Long
否
60000
设置为负值时可以关闭动态检测。
说明仅当配置项enableNewSource为true时生效。
仅实时计算引擎VVR 8.0.9及以上版本支持该参数。
startupMode
源表启动模式。
String
否
timestamp
参数取值如下:
timestamp(默认):从指定的起始时间开始消费日志。
latest:从最新位点开始消费日志。
earliest:从最早位点开始消费日志。
说明若将配置项consumeFromCheckpoint设为true,则会从指定的消费组中保存的Checkpoint开始消费日志,此处的启动模式将不会生效。
startTime
消费日志的开始时间。
String
否
当前时间
格式为yyyy-MM-dd hh:mm:ss。
仅当startupMode设为timestamp时生效。
说明startTime和stopTime基于SLS中的__receive_time__属性,而非__timestamp__属性。
stopTime
消费日志的结束时间。
String
否
无
格式为yyyy-MM-dd hh:mm:ss。
说明如期望日志消费到结尾时退出Flink程序,需要同时设置exitAfterFinish=true.
consumerGroup
消费组名称。
String
否
无
消费组用于记录消费进度。您可以自定义消费组名,无固定格式。
说明不支持通过相同的消费组进行多作业的协同消费。不同的Flink作业应该设置不同的消费组。如果不同的Flink作业使用相同的消费组,它们将会消费全部数据。这是因为在Flink消费SLS的数据时,并不会经过SLS消费组进行分区分配,因此导致各个消费者独立消费各自的消息,即使消费组是相同的。
consumeFromCheckpoint
是否从指定的消费组中保存的Checkpoint开始消费日志。
String
否
false
参数取值如下:
true:必须同时指定消费组,Flink程序会从消费组中保存的Checkpoint开始消费日志,如果该消费组没有对应的Checkpoint,则从startTime配置值开始消费。
false(默认值):不从指定的消费组中保存的Checkpoint开始消费日志。
说明仅实时计算引擎VVR 6.0.5及以上版本支持该参数。
maxRetries
读取SLS失败后重试次数。
String
否
3
无。
batchGetSize
单次请求读取logGroup的个数。
String
否
100
batchGetSize设置不能超过1000,否则会报错。
exitAfterFinish
在数据消费完成后,Flink程序是否退出。
String
否
false
参数取值如下:
true:数据消费完后,Flink程序退出。
false(默认):数据消费完后,Flink程序不退出。
说明仅实时计算引擎VVR 4.0.13及以上版本支持该参数。
query
SLS消费预处理语句。
String
否
无
通过使用query参数,您可以在消费SLS数据之前对其进行过滤,以避免将所有数据都消费到Flink中,从而实现节约成本和提高处理速度的目的。
例如
'query' = '*| where request_method = ''GET'''
表示在Flink读取SLS数据前,先匹配出request_method字段值等于get的数据。说明query需使用日志服务SPL语言,请参见SPL概述。
结果表独有
参数
说明
数据类型
是否必填
默认值
备注
topicField
指定字段名,该字段的值会覆盖__topic__属性字段的值,表示日志的主题。
String
否
无
该参数值是表中已存在的字段之一。
timeField
指定字段名,该字段的值会覆盖__timestamp__属性字段的值,表示日志写入时间。
String
否
当前时间
该参数值是表中已存在的字段之一,且字段类型必须为INT。如果未指定,则默认填充当前时间。
sourceField
指定字段名,该字段的值会覆盖__source__属性字段的值,表示日志的来源地,例如产生该日志机器的IP地址。
String
否
无
该参数值是表中已存在的字段之一。
partitionField
指定字段名,数据写入时会根据该列值计算Hash值,Hash值相同的数据会写入同一个shard。
String
否
无
如果未指定,则每条数据会随机写入当前可用的Shard中。
buckets
当指定partitionField时,根据Hash值重新分组的个数。
String
否
64
该参数的取值范围是[1, 256],且必须是2的整数次幂。同时,buckets个数应当大于等于Shard个数,否则会出现部分Shard没有数据写入的情况。
说明仅实时计算引擎VVR 6.0.5及以上版本支持该参数。
flushIntervalMs
触发数据写入的时间周期。
String
否
2000
单位为毫秒。
writeNullProperties
是否将null值作为空字符串写入SLS。
Boolean
否
true
参数取值如下:
true(默认值):将null值作为空字符串写入日志。
false:计算结果为null的字段不会写入到日志中。
说明仅实时计算引擎VVR 8.0.6及以上版本支持该参数。
类型映射
Flink字段类型 | SLS字段类型 |
BOOLEAN | STRING |
VARBINARY | |
VARCHAR | |
TINYINT | |
INTEGER | |
BIGINT | |
FLOAT | |
DOUBLE | |
DECIMAL |
代码示例
CREATE TEMPORARY TABLE sls_input(
`time` BIGINT,
url STRING,
dt STRING,
float_field FLOAT,
double_field DOUBLE,
boolean_field BOOLEAN,
`__topic__` STRING METADATA VIRTUAL,
`__source__` STRING METADATA VIRTUAL,
`__timestamp__` STRING METADATA VIRTUAL,
__tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
proctime as PROCTIME()
) WITH (
'connector' = 'sls',
'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'starttime' = '2023-08-30 00:00:00',
'project' ='sls-test',
'logstore' ='sls-input'
);
CREATE TEMPORARY TABLE sls_sink(
`time` BIGINT,
url STRING,
dt STRING,
float_field FLOAT,
double_field DOUBLE,
boolean_field BOOLEAN,
`__topic__` STRING,
`__source__` STRING,
`__timestamp__` BIGINT ,
receive_time BIGINT
) WITH (
'connector' = 'sls',
'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
'accessId' = '${ak_id}',
'accessKey' = '${ak_secret}',
'project' ='sls-test',
'logstore' ='sls-output'
);
INSERT INTO sls_sink
SELECT
`time`,
url,
dt,
float_field,
double_field,
boolean_field,
`__topic__` ,
`__source__` ,
`__timestamp__` ,
cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input;
DataStream API
通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink,DataStream连接器设置方法请参见DataStream连接器使用方法。
读取SLS
实时计算引擎VVR提供SourceFunction的实现类SlsSourceFunction,用于读取SLS,读取SLS的示例如下。
public class SlsDataStreamSource {
public static void main(String[] args) throws Exception {
// Sets up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Creates and adds SLS source and sink.
env.addSource(createSlsSource())
.map(SlsDataStreamSource::convertMessages)
.print();
env.execute("SLS Stream Source");
}
private static SlsSourceFunction createSlsSource() {
SLSAccessInfo accessInfo = new SLSAccessInfo();
accessInfo.setEndpoint("yourEndpoint");
accessInfo.setProjectName("yourProject");
accessInfo.setLogstore("yourLogStore");
accessInfo.setAccessId("yourAccessId");
accessInfo.setAccessKey("yourAccessKey");
// The batch get size must be given.
accessInfo.setBatchGetSize(10);
// Optional parameters
accessInfo.setConsumerGroup("yourConsumerGroup");
accessInfo.setMaxRetries(3);
// time to start consuming, set to current time.
int startInSec = (int) (new Date().getTime() / 1000);
// time to stop consuming, -1 means never stop.
int stopInSec = -1;
return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
}
private static List<String> convertMessages(SourceRecord input) {
List<String> res = new ArrayList<>();
for (FastLogGroup logGroup : input.getLogGroups()) {
int logsCount = logGroup.getLogsCount();
for (int i = 0; i < logsCount; i++) {
FastLog log = logGroup.getLogs(i);
int fieldCount = log.getContentsCount();
for (int idx = 0; idx < fieldCount; idx++) {
FastLogContent f = log.getContents(idx);
res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
}
}
}
return res;
}
}
写入SLS
提供OutputFormat的实现类SLSOutputFormat,用于写入SLS。写入SLS的示例如下。
public class SlsDataStreamSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSequence(0, 100)
.map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
.addSink(createSlsSink())
.name(SlsDataStreamSink.class.getSimpleName());
env.execute("SLS Stream Sink");
}
private static OutputFormatSinkFunction createSlsSink() {
Configuration conf = new Configuration();
conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
conf.setString(SLSOptions.PROJECT, "yourProject");
conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
return new OutputFormatSinkFunction<>(outputFormat);
}
private static SinkRecord getSinkRecord(Long seed) {
SinkRecord record = new SinkRecord();
LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
logItem.PushBack("level", "info");
logItem.PushBack("name", String.valueOf(seed));
logItem.PushBack("message", "it's a test message for " + seed.toString());
record.setContent(logItem);
return record;
}
}
XML
Maven中央库中已经放置了SLS DataStream连接器。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-sls</artifactId>
<version>${vvr-version}</version>
</dependency>
常见问题
恢复失败的Flink程序时,TaskManager发生OOM,源表报错java.lang.OutOfMemoryError: Java heap space