This topic describes how to use the Simple Log Service connector.
Background information
Simple Log Service is an end-to-end data logging service that is developed by Alibaba Cloud. Simple Log Service allows you to collect, consume, ship, query, and analyze log data in an efficient manner. It improves the O&M efficiency and provides the capability to process large amounts of log data.
The following table describes the capabilities supported by the Simple Log Service connector.
Category | Description |
Table type | Source table and result table |
Running mode | Streaming mode |
Metric | N/A |
Data format | N/A |
API type | SQL API |
Data update or deletion in a result table | Data in a result table cannot be updated or deleted. Data can only be inserted into a result table. |
Features
The Simple Log Service source connector can be used to read the attribute fields of messages. The following table describes the attribute fields supported by the Simple Log Service source connector.
Field | Type | Description |
__source__ | STRING METADATA VIRTUAL | The message source. |
__topic__ | STRING METADATA VIRTUAL | The message topic. |
__timestamp__ | BIGINT METADATA VIRTUAL | The time when the log is generated. |
__tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | The message tag. |
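As a hedged illustration of how these attribute fields can be used, the following sketch declares the attribute fields as metadata columns and derives an event-time attribute from __timestamp__. It assumes that __timestamp__ holds a UNIX timestamp in seconds and that a 5-second out-of-orderness bound suits the workload. The table name sls_events, the url column, the event_time column, and the watermark delay are hypothetical.

```sql
CREATE TEMPORARY TABLE sls_events (
  url STRING,
  `__source__` STRING METADATA VIRTUAL,
  `__topic__` STRING METADATA VIRTUAL,
  `__timestamp__` BIGINT METADATA VIRTUAL,
  `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  -- Assumption: __timestamp__ is in seconds, so a precision of 0 is used.
  event_time AS TO_TIMESTAMP_LTZ(`__timestamp__`, 0),
  -- Hypothetical watermark delay; tune it to the lateness of your logs.
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);
```

Because the attribute fields are declared as METADATA VIRTUAL, they are read-only and are excluded when the same schema is used for writing.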
Prerequisites
A project and a Logstore are created. For more information, see the "Create a project and a Logstore" section of the Getting Started topic.
Limits
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the Simple Log Service connector.
The Simple Log Service connector supports only the at-least-once semantics.
Only Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports automatic failovers of deployments due to the change of shard numbers.
We recommend that you do not set the deployment parallelism for a source node to a value greater than the number of shards. If the parallelism is greater than the number of shards, the extra subtasks have no shards to consume and resources are wasted. In Realtime Compute for Apache Flink that uses VVR 8.0.5 or earlier, if the number of shards changes, automatic failovers of deployments may not take effect and specific shards may not be consumed.
Syntax
CREATE TABLE sls_table(
a INT,
b INT,
c VARCHAR
) WITH (
'connector' = 'sls',
'endPoint' = '<yourEndPoint>',
'project' = '<yourProjectName>',
'logStore' = '<yourLogStoreName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}'
);
Parameters in the WITH clause
Common parameters
Parameter | Description | Data type | Required | Default value | Remarks |
connector | The type of the table. | STRING | Yes | None | Set the value to sls. |
endPoint | The endpoint of Simple Log Service. | STRING | Yes | None | Enter the internal endpoint of Simple Log Service. For more information, see Endpoints. Note: By default, Realtime Compute for Apache Flink cannot access the Internet. Alibaba Cloud provides NAT gateways to enable the communication between virtual private clouds (VPCs) and the Internet. For more information, see Console operations. We recommend that you do not access Simple Log Service over the Internet. If you want to access Simple Log Service over the Internet, use HTTPS and enable the global acceleration feature. For more information, see Use the transfer acceleration feature. |
project | The name of the Simple Log Service project. | STRING | Yes | None | N/A |
logStore | The name of a Logstore or Metricstore of Simple Log Service. | STRING | Yes | None | Data in a Logstore is consumed in the same way as in a Metricstore. |
accessId | The AccessKey ID of your Alibaba Cloud account. | STRING | Yes | None | For more information, see the "How do I view information about the AccessKey ID and AccessKey secret of the account?" section of the Reference topic. Important: To protect your AccessKey pair, we recommend that you configure the AccessKey ID by using the key management method. For more information, see Manage variables and keys. |
accessKey | The AccessKey secret of your Alibaba Cloud account. | STRING | Yes | None | For more information, see the "How do I view information about the AccessKey ID and AccessKey secret of the account?" section of the Reference topic. Important: To protect your AccessKey pair, we recommend that you configure the AccessKey secret by using the key management method. For more information, see Manage variables and keys. |
Parameters only for source tables
Parameter | Description | Data type | Required | Default value | Remarks |
enableNewSource | Specifies whether to use the FLIP-27 refactor source interface. | BOOLEAN | No | false | The FLIP-27 refactor source interface automatically adapts to shard changes. This ensures that shards are evenly distributed across all the data sources. Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.9 or later supports this parameter. Important: The deployment cannot be restored from a specific state after the value of this parameter changes. To keep the consumption progress, first configure the consumerGroup parameter and start the deployment. While the deployment consumes data of Simple Log Service, Realtime Compute for Apache Flink records the current consumer offset in the Simple Log Service consumer group. Then, set the consumeFromCheckpoint parameter to true and start the deployment without states. This way, the deployment resumes consumption from the recorded consumer offset. |
shardDiscoveryIntervalMs | The interval at which shard changes are dynamically detected. Unit: milliseconds. | LONG | No | 60000 | If you set this parameter to a negative value, dynamic detection is disabled. Note: This parameter takes effect only if the enableNewSource parameter is set to true. Only Realtime Compute for Apache Flink that uses VVR 8.0.9 or later supports this parameter. |
startupMode | The startup mode of the source table. | STRING | No | timestamp | Valid values: timestamp (default; logs are consumed from the specified start time), latest (logs are consumed from the latest offset), and earliest (logs are consumed from the earliest offset). Note: If you set the consumeFromCheckpoint parameter to true, logs are consumed from a checkpoint stored in the specified consumer group. The startup mode specified by this parameter does not take effect. |
startTime | The time at which logs start to be consumed. | STRING | No | Current time | Format: yyyy-MM-dd hh:mm:ss. This parameter takes effect only if the startupMode parameter is set to timestamp. Note: The startTime and stopTime parameters are evaluated against the __receive_time__ attribute field in a Simple Log Service source table, not against the __timestamp__ attribute field. |
stopTime | The time at which log consumption is stopped. | STRING | No | None | Format: yyyy-MM-dd hh:mm:ss. Note: If you want Realtime Compute for Apache Flink to exit after log consumption is complete, you must configure the exitAfterFinish parameter together with this parameter and set the exitAfterFinish parameter to true. |
consumerGroup | The name of the consumer group. | STRING | No | None | A consumer group records the consumption progress. You can specify a custom consumer group name. The format of the name is not fixed. Note: A consumer group cannot be shared by multiple deployments for collaborative consumption. We recommend that you specify a different consumer group for each Realtime Compute for Apache Flink deployment. When Realtime Compute for Apache Flink consumes data of Simple Log Service, shards are not assigned to consumers through the Simple Log Service consumer group. Therefore, if multiple deployments share the same consumer group, each deployment still consumes all the data. |
consumeFromCheckpoint | Specifies whether to consume logs from the checkpoint that is stored in the specified consumer group. | STRING | No | false | Valid values: true: You must also specify a consumer group. Realtime Compute for Apache Flink consumes logs from the checkpoint that is stored in the consumer group. If no checkpoint exists in the consumer group, Realtime Compute for Apache Flink consumes logs from the time specified by the startTime parameter. false (default): Realtime Compute for Apache Flink does not consume logs from the checkpoint that is stored in the specified consumer group. Note: Only Realtime Compute for Apache Flink that uses VVR 6.0.5 or later supports this parameter. |
maxRetries | The number of retries that are allowed when data fails to be read from Simple Log Service. | STRING | No | 3 | N/A |
batchGetSize | The number of log groups from which data is read in a request. | STRING | No | 100 | The value of the batchGetSize parameter cannot exceed 1000. Otherwise, an error is returned. |
exitAfterFinish | Specifies whether Realtime Compute for Apache Flink exits after data consumption is complete. | STRING | No | false | Valid values: true: Realtime Compute for Apache Flink exits after data consumption is complete. false (default): Realtime Compute for Apache Flink does not exit after data consumption is complete. Note: Only Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports this parameter. |
query | The query statement that is used to preprocess data before data consumption in Simple Log Service. | STRING | No | None | If you configure the query parameter, data is filtered in Simple Log Service before Realtime Compute for Apache Flink consumes it. This way, Realtime Compute for Apache Flink does not need to consume all data in Simple Log Service, which reduces costs and improves data processing efficiency. Note: The query statement must be written in Simple Log Service Processing Language (SPL). For more information, see SPL overview. Important: Only Realtime Compute for Apache Flink that uses VVR 8.0.1 or later supports this parameter. For more information about the regions in which Simple Log Service supports this feature, see Consume logs based on rules. This feature is free of charge during the public preview phase. You may be charged for Simple Log Service in the future. For more information, see the "Billing" section of the Consume logs based on rules topic. |
Example of the query parameter: if you specify
'query' = '*| where request_method = ''GET'''
only the data whose request_method field is GET is read from the Logstore of Simple Log Service.
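The interaction between the enableNewSource, consumerGroup, and consumeFromCheckpoint parameters can be sketched as follows. This is a minimal example under stated assumptions, not a definitive configuration: the table name, the columns, and the consumer group name flink-sls-demo are hypothetical, and the sketch assumes VVR 8.0.9 or later and that an earlier run of the deployment already recorded its consumer offset in that consumer group.

```sql
CREATE TEMPORARY TABLE sls_resumable_source (
  request_method STRING,
  url STRING
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- Use the FLIP-27 source and check for shard changes every 60 seconds.
  'enableNewSource' = 'true',
  'shardDiscoveryIntervalMs' = '60000',
  -- Hypothetical group name; use a different group for each deployment.
  'consumerGroup' = 'flink-sls-demo',
  -- Resume from the offset stored in the consumer group when the
  -- deployment is started without states.
  'consumeFromCheckpoint' = 'true'
);
```

If the consumer group holds no checkpoint, consumption starts from the time specified by the startTime parameter, as described for consumeFromCheckpoint.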
Parameters only for result tables
Parameter | Description | Data type | Required | Default value | Remarks |
topicField | Specifies a field name. The value of the specified field overwrites the value of the __topic__ attribute field to indicate the topic of the log. | STRING | No | None | The value of this parameter must be an existing field in the table. |
timeField | Specifies a field name. The value of the specified field overwrites the value of the __timestamp__ attribute field to indicate the log write time. | STRING | No | Current time | The value of this parameter must be an existing field of the INT type in the table. If no field is specified, the current time is used by default. |
sourceField | Specifies a field name. The value of the specified field overwrites the value of the __source__ attribute field to indicate the origin of the log. For example, the value is the IP address of the machine that generates the log. | STRING | No | None | The value of this parameter must be an existing field in the table. |
partitionField | Specifies a field name. A hash value is calculated based on the value of the specified field when data is written to Simple Log Service. Data with the same hash value is written to the same shard. | STRING | No | None | If you do not specify this parameter, each data entry is randomly written to an available shard. |
buckets | The number of buckets that are regrouped based on the hash value when the partitionField parameter is specified. | STRING | No | 64 | Valid values: [1,256]. The value of this parameter must be an integer power of 2. The number of buckets must be greater than or equal to the number of shards. Otherwise, no data is written to specific shards. Note: Only Realtime Compute for Apache Flink that uses VVR 6.0.5 or later supports this parameter. |
flushIntervalMs | The interval at which data writing is triggered. | STRING | No | 2000 | Unit: milliseconds. |
writeNullProperties | Specifies whether to write null values as empty strings to Simple Log Service. | BOOLEAN | No | true | Valid values: true (default): Null values are written as empty strings to Simple Log Service. false: Null values are not written to Simple Log Service. Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.6 or later supports this parameter. |
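The result table parameters above can be combined as in the following sketch. It is an illustration under stated assumptions rather than a recommended configuration: the table name and all column names (user_id, log_topic, log_time, host_ip, message) are hypothetical, and the sketch assumes VVR 8.0.6 or later so that both the buckets and writeNullProperties parameters are supported.

```sql
CREATE TEMPORARY TABLE sls_routed_sink (
  user_id STRING,
  log_topic STRING,
  -- timeField must reference a field of the INT type.
  log_time INT,
  host_ip STRING,
  message STRING
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- Take __topic__, __timestamp__, and __source__ from table fields.
  'topicField' = 'log_topic',
  'timeField' = 'log_time',
  'sourceField' = 'host_ip',
  -- Rows with the same user_id hash are written to the same shard.
  'partitionField' = 'user_id',
  -- A power of 2 in [1,256] that is not less than the number of shards.
  'buckets' = '64',
  'flushIntervalMs' = '2000',
  -- Skip null values instead of writing them as empty strings.
  'writeNullProperties' = 'false'
);
```

With 64 buckets, this sketch only suits a Logstore that has at most 64 shards. For a Logstore with more shards, a larger power of 2 would be needed.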
Data type mappings
Data type of Realtime Compute for Apache Flink | Data type of Simple Log Service |
BOOLEAN | STRING |
VARBINARY | STRING |
VARCHAR | STRING |
TINYINT | STRING |
INTEGER | STRING |
BIGINT | STRING |
FLOAT | STRING |
DOUBLE | STRING |
DECIMAL | STRING |
Sample code
CREATE TEMPORARY TABLE sls_input(
`time` BIGINT,
url STRING,
dt STRING,
float_field FLOAT,
double_field DOUBLE,
boolean_field BOOLEAN,
`__topic__` STRING METADATA VIRTUAL,
`__source__` STRING METADATA VIRTUAL,
`__timestamp__` BIGINT METADATA VIRTUAL,
__tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
proctime as PROCTIME()
) WITH (
'connector' = 'sls',
'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'starttime' = '2023-08-30 00:00:00',
'project' ='sls-test',
'logstore' ='sls-input'
);
CREATE TEMPORARY TABLE sls_sink(
`time` BIGINT,
url STRING,
dt STRING,
float_field FLOAT,
double_field DOUBLE,
boolean_field BOOLEAN,
`__topic__` STRING,
`__source__` STRING,
`__timestamp__` BIGINT ,
receive_time BIGINT
) WITH (
'connector' = 'sls',
'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'project' ='sls-test',
'logstore' ='sls-output'
);
INSERT INTO sls_sink
SELECT
`time`,
url,
dt,
float_field,
double_field,
boolean_field,
`__topic__` ,
`__source__` ,
`__timestamp__` ,
cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input;
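A variation of the source table above, shown here as a hedged sketch, reads a bounded time range and lets Simple Log Service pre-filter the data. It combines the startTime, stopTime, exitAfterFinish, and query parameters described earlier. The table name, the columns, and the stop time are hypothetical, and the sketch assumes VVR 8.0.1 or later and a region in which the query feature is available.

```sql
CREATE TEMPORARY TABLE sls_backfill_input (
  request_method STRING,
  url STRING
) WITH (
  'connector' = 'sls',
  'endPoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'project' = 'sls-test',
  'logStore' = 'sls-input',
  -- Consume only logs received within this time range.
  'startupMode' = 'timestamp',
  'startTime' = '2023-08-30 00:00:00',
  'stopTime' = '2023-08-31 00:00:00',
  -- Exit after the range is fully consumed.
  'exitAfterFinish' = 'true',
  -- SPL statement: keep only GET requests before data reaches Flink.
  'query' = '*| where request_method = ''GET'''
);
```

Without exitAfterFinish set to true, the deployment would keep running after consumption reaches the stop time.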
DataStream API
If you want to call a DataStream API to read or write data, you must use a DataStream connector of the related type to connect to Realtime Compute for Apache Flink. For more information about how to configure a DataStream connector, see Usage of DataStream connectors.
Read data from Simple Log Service
VVR of Realtime Compute for Apache Flink provides SlsSourceFunction, an implementation of SourceFunction, to read data from Simple Log Service. Sample code:
// JDK and Flink imports. Also import the connector and Simple Log Service SDK classes
// used below (SlsSourceFunction, SLSAccessInfo, SourceRecord, FastLogGroup, FastLog,
// and FastLogContent) from the packages provided by your connector dependency.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class SlsDataStreamSource {

    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Create the SLS source, convert each record to strings, and print the result.
        env.addSource(createSlsSource())
                .map(SlsDataStreamSource::convertMessages)
                .print();
        env.execute("SLS Stream Source");
    }

    private static SlsSourceFunction createSlsSource() {
        SLSAccessInfo accessInfo = new SLSAccessInfo();
        accessInfo.setEndpoint("yourEndpoint");
        accessInfo.setProjectName("yourProject");
        accessInfo.setLogstore("yourLogStore");
        accessInfo.setAccessId("yourAccessId");
        accessInfo.setAccessKey("yourAccessKey");
        // The batch get size must be given.
        accessInfo.setBatchGetSize(10);
        // Optional parameters.
        accessInfo.setConsumerGroup("yourConsumerGroup");
        accessInfo.setMaxRetries(3);
        // Time to start consuming, in seconds. Set to the current time.
        int startInSec = (int) (new Date().getTime() / 1000);
        // Time to stop consuming, in seconds. -1 means never stop.
        int stopInSec = -1;
        return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
    }

    // Flattens every field of every log in the record into "key: ..., value: ..." strings.
    private static List<String> convertMessages(SourceRecord input) {
        List<String> res = new ArrayList<>();
        for (FastLogGroup logGroup : input.getLogGroups()) {
            int logsCount = logGroup.getLogsCount();
            for (int i = 0; i < logsCount; i++) {
                FastLog log = logGroup.getLogs(i);
                int fieldCount = log.getContentsCount();
                for (int idx = 0; idx < fieldCount; idx++) {
                    FastLogContent f = log.getContents(idx);
                    res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                }
            }
        }
        return res;
    }
}
Write data to Simple Log Service
VVR of Realtime Compute for Apache Flink provides SLSOutputFormat, an implementation of OutputFormat, to write data to Simple Log Service. Sample code:
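// Flink imports. Also import the connector and Simple Log Service SDK classes used
// below (SLSOutputFormat, SLSOptions, SinkRecord, and LogItem) from the packages
// provided by your connector dependency.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;

public class SlsDataStreamSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Generate the numbers 0 to 100, wrap each one in a log record, and write it to SLS.
        env.fromSequence(0, 100)
                .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                .addSink(createSlsSink())
                .name(SlsDataStreamSink.class.getSimpleName());
        env.execute("SLS Stream Sink");
    }

    private static OutputFormatSinkFunction createSlsSink() {
        Configuration conf = new Configuration();
        conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
        conf.setString(SLSOptions.PROJECT, "yourProject");
        conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
        conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
        conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
        SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
        return new OutputFormatSinkFunction<>(outputFormat);
    }

    // Builds a log record whose log time is the current time in seconds.
    private static SinkRecord getSinkRecord(Long seed) {
        SinkRecord record = new SinkRecord();
        LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", String.valueOf(seed));
        logItem.PushBack("message", "it's a test message for " + seed.toString());
        record.setContent(logItem);
        return record;
    }
}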
XML
The Simple Log Service DataStream connectors of different versions are stored in the Maven central repository.
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-sls</artifactId>
<version>${vvr-version}</version>
</dependency>