This topic describes how to use the Simple Log Service (SLS) connector.
Background information
Simple Log Service is an end-to-end service for log data. It helps you collect, consume, ship, query, and analyze log data quickly. It improves operations and maintenance efficiency and enables large-scale log processing.
The SLS connector supports the following types of information.
Category | Description |
Supported types | Source table and sink table |
Running mode | Streaming mode only |
Monitoring metrics | N/A |
Data format | N/A |
API type | SQL, DataStream API, and data ingestion YAML |
Update or delete data in the sink table | You cannot update or delete data in a sink table. You can only insert data into a sink table. |
Features
The SLS source connector can read SLS message attribute fields as metadata columns. The following table lists the supported attribute fields.
Field name | Type | Description |
__source__ | STRING METADATA VIRTUAL | The message source. |
__topic__ | STRING METADATA VIRTUAL | The message topic. |
__timestamp__ | BIGINT METADATA VIRTUAL | The time when the log was generated. |
__tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | The message tag. For example, the __receive_time__ attribute, which records when the log arrived at SLS, can be read as __tag__['__receive_time__']. |
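These attribute fields are declared as METADATA VIRTUAL columns in the source table DDL. The following is a minimal sketch; the table name, the physical columns, and the connection values are placeholders.
CREATE TEMPORARY TABLE sls_source_with_metadata (
  -- Physical columns parsed from the log content (placeholder names).
  request_method STRING,
  request_uri STRING,
  -- SLS message attribute fields exposed as metadata columns.
  `__source__` STRING METADATA VIRTUAL,
  `__topic__` STRING METADATA VIRTUAL,
  `__timestamp__` BIGINT METADATA VIRTUAL,
  `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);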
Prerequisites
You have created an SLS project and a Logstore. For more information, see Create a project and a Logstore.
Limits
Only Ververica Runtime (VVR) 11.1 or later supports using SLS as a data ingestion source.
The SLS connector supports only at-least-once semantics.
Avoid setting the source parallelism higher than the number of shards. Doing so wastes resources. In VVR 8.0.5 or earlier, if the shard count changes after you set a high parallelism, automatic failover may fail. This can leave some shards unconsumed.
SQL
Syntax
CREATE TABLE sls_table(
a INT,
b INT,
c VARCHAR
) WITH (
'connector' = 'sls',
'endPoint' = '<yourEndPoint>',
'project' = '<yourProjectName>',
'logStore' = '<yourLogStoreName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}'
);
WITH parameters
General
Parameter | Description | Data type | Required? | Default value | Remarks |
connector | Table type. | String | Yes | None | Set it to sls. |
endPoint | The endpoint address. | String | Yes | None | Enter the VPC endpoint of SLS. For more information, see Endpoints. Note: By default, Realtime Compute for Apache Flink cannot access the Internet. However, Alibaba Cloud provides NAT gateways to enable communication between VPCs and the Internet. For more information, see How do I access the Internet?. Avoid accessing SLS over the Internet. If you must do so, use HTTPS and enable transfer acceleration for SLS. |
project | The name of the SLS project. | String | Yes | None | None. |
logStore | The name of an SLS Logstore or Metricstore. | String | Yes | None | Data in a Logstore is consumed in the same way as in a Metricstore. |
accessId | The AccessKey ID of your Alibaba Cloud account. | String | Yes | No default value | For more information, see How do I view the AccessKey ID and AccessKey secret?. Important: To protect your AccessKey pair, use variables to configure your AccessKey. |
accessKey | The AccessKey secret of your Alibaba Cloud account. | String | Yes | No default value | |
Source-specific
Parameter | Description | Data type | Required? | Default value | Remarks |
enableNewSource | Specifies whether to use the new source interface that implements FLIP-27. | Boolean | No | false | The new source adapts automatically to shard changes and distributes shards evenly across all source subtasks. Important: This option is supported only in VVR 8.0.9 or later. Starting from VVR 11.1, this option defaults to true. If you change this option, your job cannot resume from a saved state. To work around this, first start your job with the consumerGroup option to record the current consumer offset. Then, set consumeFromCheckpoint to true and restart your job without states. If SLS contains read-only shards, some Flink subtasks may finish reading from those shards and then request other unread shards. This can cause uneven shard distribution across subtasks, reducing overall consumption efficiency and system performance. To reduce this imbalance, adjust the source parallelism, optimize task scheduling, or merge small shards. |
shardDiscoveryIntervalMs | The interval at which shard changes are detected dynamically. Unit: milliseconds. | Long | No | 60000 | Set this option to a negative value to disable dynamic detection. Note: This option must be at least 1 minute (60,000 milliseconds). This option takes effect only if enableNewSource is set to true. This option is supported only in VVR 8.0.9 or later. |
startupMode | The startup mode of the source table. | String | No | timestamp | timestamp (default): Consume logs starting from the specified time. latest: Consume logs starting from the latest offset. earliest: Consume logs starting from the earliest offset. consumer_group: Consume logs starting from the offset recorded in the consumer group. If no offset is recorded for a shard, consume logs starting from the earliest offset. Important: In VVR versions earlier than 11.1, the consumer_group value is not supported. To consume logs from the offset recorded by the specified consumer group, set consumeFromCheckpoint to true. In this case, this startup mode does not take effect. |
startTime | The time to start consuming logs. | String | No | Current time | Format: yyyy-MM-dd hh:mm:ss. This option takes effect only if startupMode is set to timestamp. Note: The startTime and stopTime options are based on the __receive_time__ field in SLS, not the __timestamp__ field. |
stopTime | The time to stop consuming logs. | String | No | None | Format: yyyy-MM-dd hh:mm:ss. Note: Use this option only to consume historical logs. Set it to a past time point. If you set it to a future time, consumption may stop unexpectedly if no new logs are written. This appears as a broken data stream with no error messages. To exit the Flink program after log consumption finishes, also set exitAfterFinish to true. |
consumerGroup | The name of the consumer group. | String | No | None | A consumer group records consumption progress. You can specify any custom name. Note: You cannot share a consumer group across multiple jobs for collaborative consumption. Use different consumer groups for different jobs. When Flink consumes data from SLS, it does not assign shards through the SLS consumer group, so each job independently consumes all messages even if the jobs share the same consumer group. |
consumeFromCheckpoint | Specifies whether to consume logs from the checkpoint saved in the specified consumer group. | String | No | false | true: You must also specify a consumer group. Flink consumes logs from the checkpoint stored in that consumer group. If no checkpoint exists in the consumer group, Flink consumes logs from the time specified by the startTime option. false (default): Flink does not consume logs from the checkpoint saved in the specified consumer group. Important: This option is not supported in VVR 11.1 or later. For VVR 11.1 or later, set startupMode to consumer_group. |
maxRetries | The number of retries after reading from SLS fails. | String | No | 3 | None. |
batchGetSize | The number of log groups to read per request. | String | No | 100 | Set batchGetSize to a value less than 1000. Otherwise, an error occurs. |
exitAfterFinish | Specifies whether the Flink program exits after data consumption finishes. | String | No | false | true: The Flink program exits after data consumption finishes. false (default): The Flink program does not exit after data consumption finishes. |
query | The query statement used to preprocess data before consuming SLS data. Important: This option was deprecated in VVR 11.3 but remains compatible in later versions. | String | No | No default value | Use the query option to filter SLS data before consumption. This avoids loading all data into Flink, reducing costs and improving processing speed. For example, 'query' = '*\| where request_method = ''GET''' filters logs where the request_method field equals GET before Flink reads them. Note: Write queries using SPL syntax. Important: This option is supported only in VVR 8.0.1 or later. This feature incurs SLS fees. For details, see Billing. |
processor | The SLS consumer processor. If both query and processor are set, query takes precedence. | String | No | None | Use the processor option to filter SLS data before consumption. This avoids loading all data into Flink, reducing costs and improving processing speed. We recommend using processor instead of query. For example, 'processor' = 'test-filter-processor' applies the SLS consumer processor to filter data before Flink reads it. Note: Write processors using SPL syntax. For details about creating and updating SLS consumer processors, see Manage consumer processors. Important: This option is supported only in VVR 11.3 or later. This feature incurs SLS fees. For details, see Billing. |
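The following sketch combines several of the source-specific options above in one source table. The table name, field names, start time, consumer group name, and SPL query are illustrative placeholders; the query option requires VVR 8.0.1 or later.
CREATE TEMPORARY TABLE sls_filtered_source (
  request_method STRING,
  request_uri STRING,
  status STRING
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- Start from a fixed point in time and record progress in a consumer group.
  'startupMode' = 'timestamp',
  'startTime' = '2024-01-01 00:00:00',
  'consumerGroup' = 'flink-job-a',
  -- Filter logs on the SLS side with SPL before Flink reads them.
  'query' = '*| where request_method = ''GET'''
);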
Sink-specific
Parameter | Description | Data type | Required? | Default value | Remarks |
topicField | The name of a field whose value overrides the __topic__ field. This indicates the log topic. | String | No | None | This parameter specifies an existing field in the table. |
timeField | The name of a field whose value overrides the __timestamp__ field. This indicates the log write time. | String | No | Current time | This field must exist in the table and its type must be INT. If not specified, the current time is used. |
sourceField | The name of a field whose value overrides the __source__ field. This indicates the log source, such as the IP address of the machine that generated the log. | String | No | None | This field must exist in the table. |
partitionField | The name of a field. A hash value is calculated from this field's value when writing data. Data with the same hash value is written to the same shard. | String | No | No default value | If not specified, each data entry is written randomly to an available shard. |
buckets | The number of buckets to regroup by hash value when partitionField is specified. | String | No | 64 | Valid values: [1, 256]. The value must be a power of 2. The number of buckets must be greater than or equal to the number of shards. Otherwise, some shards receive no data. |
flushIntervalMs | The interval at which data writes are triggered. | String | No | 2000 | Unit: milliseconds. |
writeNullProperties | Specifies whether to write null values as empty strings to SLS. | Boolean | No | true | true (default): Write null values as empty strings. false: Do not write fields whose computed value is null. Note: This option is supported only in VVR 8.0.6 or later. |
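The following sketch shows a sink table that uses several of the sink-specific options above. The table name, field names, and option values are placeholders and only illustrate how the options fit together.
CREATE TEMPORARY TABLE sls_routed_sink (
  log_topic STRING,
  host_ip STRING,
  user_id STRING,
  message STRING
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- Take the __topic__ and __source__ values from columns of this table.
  'topicField' = 'log_topic',
  'sourceField' = 'host_ip',
  -- Hash rows by user_id so that rows with the same user_id go to the same shard.
  'partitionField' = 'user_id',
  'buckets' = '64',
  -- Flush buffered records every second.
  'flushIntervalMs' = '1000'
);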
Data type mappings
Flink data type | SLS data type |
BOOLEAN | STRING |
VARBINARY | STRING |
VARCHAR | STRING |
TINYINT | STRING |
INTEGER | STRING |
BIGINT | STRING |
FLOAT | STRING |
DOUBLE | STRING |
DECIMAL | STRING |
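Because every Flink type is written to SLS as a string, a sink table can declare numeric or Boolean columns directly; their values are serialized to strings when written to the Logstore. A minimal sketch with placeholder names:
CREATE TEMPORARY TABLE sls_typed_sink (
  event_id BIGINT,   -- written to SLS as its string representation
  score DOUBLE,      -- written to SLS as its string representation
  is_valid BOOLEAN   -- written to SLS as its string representation
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);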
Data ingestion (public preview)
Limits
Supported only in Ververica Runtime (VVR) 11.1 or later.
Syntax
source:
  type: sls
  name: SLS Source
  endpoint: <endpoint>
  project: <project>
  logstore: <logstore>
  accessId: <accessId>
  accessKey: <accessKey>
Configuration options
Parameter | Description | Data type | Required? | Default value | Remarks |
type | The type of the data source. | String | Yes | None | Set it to sls. |
endpoint | The endpoint address. | String | Yes | No default value | Enter the VPC endpoint of SLS. For more information, see Endpoints. Note: By default, Realtime Compute for Apache Flink cannot access the Internet. However, Alibaba Cloud provides NAT gateways to enable communication between VPCs and the Internet. For more information, see How do I access the Internet?. Avoid accessing SLS over the Internet. If you must do so, use HTTPS and enable transfer acceleration for SLS. |
accessId | The AccessKey ID of your Alibaba Cloud account. | String | Yes | No default value | For more information, see How do I view the AccessKey ID and AccessKey secret?. Important To protect your AccessKey pair, use variables to configure your AccessKey. |
accessKey | The AccessKey secret of your Alibaba Cloud account. | String | Yes | None | |
project | The name of the SLS project. | String | Yes | None | None. |
logStore | The name of an SLS Logstore or Metricstore. | String | Yes | None | Data in a Logstore is consumed in the same way as in a Metricstore. |
schema.inference.strategy | The strategy for schema inference. | String | No | continuous | continuous (default): After initialization, the connector keeps inferring the schema of each log entry and generates schema change events when new fields appear. static: The connector always parses log entries using the initial schema and does not generate schema change events. For more information, see Schema inference and evolution. |
maxPreFetchLogGroups | The maximum number of log groups to read and parse per shard during initial schema inference. | Integer | No | 50 | Before reading and processing data, the connector attempts to pre-consume a specified number of log groups from each shard to initialize the schema. |
shardDiscoveryIntervalMs | The interval at which shard changes are detected dynamically. Unit: milliseconds. | Long | No | 60000 | Set this option to a negative value to disable dynamic detection. Note This option must be at least 1 minute (60,000 milliseconds). |
startupMode | The startup mode. | String | No | No default value | timestamp: Consume logs starting from the specified time. latest: Consume logs starting from the latest offset. earliest: Consume logs starting from the earliest offset. consumer_group: Consume logs starting from the offset recorded in the consumer group. If no offset is recorded for a shard, consume logs starting from the earliest offset. |
startTime | The time to start consuming logs. | String | No | Current time | Format: yyyy-MM-dd hh:mm:ss. This option takes effect only if startupMode is set to timestamp. Note The startTime and stopTime options are based on the __receive_time__ field in SLS, not the __timestamp__ field. |
stopTime | The end time for consuming logs. | String | No | No default value | Format: yyyy-MM-dd hh:mm:ss. Note To exit the Flink program after log consumption finishes, also set exitAfterFinish to true. |
consumerGroup | The name of the consumer group. | String | No | None | A consumer group records consumption progress. You can specify any custom name. |
batchGetSize | The number of log groups to read per request. | Integer | No | 100 | batchGetSize must be less than 1000. Otherwise, an error occurs. |
maxRetries | The number of retries after reading from SLS fails. | Integer | No | 3 | None |
exitAfterFinish | Specifies whether the Flink program exits after data consumption finishes. | Boolean | No | false | true: The Flink program exits after data consumption finishes. false (default): The Flink program does not exit after data consumption finishes. |
query | The query statement used to preprocess data before consuming SLS data. | String | No | No default value | Use the query option to filter SLS data before consumption. This avoids loading all data into Flink, reducing costs and improving processing speed. For example, the SPL statement *\| where request_method = 'GET' filters logs where the request_method field equals GET before Flink reads them. Note: Write queries using SPL syntax. Important: This feature incurs SLS fees. For details, see Billing. |
compressType | The compression type used by SLS. | String | No | No default value | Supported compression types include the following: |
timeZone | The time zone for startTime and stopTime. | String | No | No default value | No offset is added by default. |
regionId | The region where SLS is deployed. | String | No | No default value | For configuration details, see Regions. |
signVersion | The signature version used for SLS requests. | String | No | No default value | For configuration details, see Request signatures. |
shardModDivisor | The divisor used when reading from SLS Logstore shards. | Int | No | -1 | For configuration details, see Shard. |
shardModRemainder | The remainder used when reading from SLS Logstore shards. | Int | No | -1 | For configuration details, see Shard. |
metadata.list | The metadata columns passed to the downstream system. | String | No | None | Available metadata fields include __source__, __topic__, __timestamp__, and __tag__. |
Data type mappings
The data type mappings for data ingestion are as follows:
SLS data type | CDC data type |
STRING | STRING |
Schema inference and evolution
Pre-consumption and schema initialization
The SLS connector maintains the schema of the current Logstore. Before reading data from the Logstore, the connector attempts to pre-consume up to maxPreFetchLogGroups log groups from each shard. It parses the schema of each log and merges them to initialize the table schema. Later, before actual consumption begins, it generates a table creation event based on the initialized schema.
Note: For each shard, the connector attempts to consume data starting from one hour before the current time to parse the schema.
Primary key
SLS logs do not contain primary keys. Add primary keys manually using the transform rule:
transform:
  - source-table: <project>.<logstore>
    projection: \*
    primary-keys: key1, key2
Schema inference and evolution
After schema initialization, if schema.inference.strategy is set to static, the connector parses each log entry using the initial schema and does not generate schema change events. If schema.inference.strategy is set to continuous, the connector parses each log entry, infers physical columns, and compares them with the current schema. If the inferred schema differs from the current schema, the connector merges them using these rules:
If the inferred schema includes fields that are not in the current schema, the connector adds those fields to the current schema and generates nullable column addition events.
If the inferred schema is missing fields that are present in the current schema, the connector retains those fields and sets their values to NULL. It does not generate column deletion events.
The SLS connector infers all fields as strings. Currently, only column additions are supported. New columns are appended to the end of the current schema and marked as nullable.
Sample code
SQL source table and sink table
CREATE TEMPORARY TABLE sls_input(
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` STRING METADATA VIRTUAL,
  __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  proctime as PROCTIME()
) WITH (
  'connector' = 'sls',
  'endpoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'starttime' = '2023-08-30 00:00:00',
  'project' = 'sls-test',
  'logstore' = 'sls-input'
);

CREATE TEMPORARY TABLE sls_sink(
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING,
  `__source__` STRING,
  `__timestamp__` BIGINT,
  receive_time BIGINT
) WITH (
  'connector' = 'sls',
  'endpoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${ak_id}',
  'accessKey' = '${ak_secret}',
  'project' = 'sls-test',
  'logstore' = 'sls-output'
);

INSERT INTO sls_sink
SELECT
  `time`,
  url,
  dt,
  float_field,
  double_field,
  boolean_field,
  `__topic__`,
  `__source__`,
  `__timestamp__`,
  cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input;
Data ingestion source
You can use SLS as a data ingestion source to write SLS data in real time to supported downstream systems. For example, you can configure a data ingestion job to write data from a Logstore to a DLF data lake in Paimon format. The job automatically infers data types and the sink table schema and supports dynamic schema evolution during runtime.
source:
  type: sls
  name: SLS Source
  endpoint: ${endpoint}
  project: test_project
  logstore: test_log
  accessId: ${accessId}
  accessKey: ${accessKey}
# Add primary key information to the table
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
# Write all data from test_log to the test_database.inventory table
route:
  - source-table: test_project.test_log
    sink-table: test_database.inventory
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Enable deletion vectors to improve read performance
  table.properties.deletion-vectors.enabled: true
DataStream API
To read or write data using the DataStream API, use the corresponding DataStream connector to connect to Realtime Compute for Apache Flink. For more information, see Usage of DataStream connectors.
If you use VVR versions earlier than 8.0.10, your job may fail to start because dependency JAR packages are missing. To resolve this, add the corresponding uber JAR package as an additional dependency.
Read data from SLS
Realtime Compute for Apache Flink provides the SlsSourceFunction implementation of SourceFunction to read data from SLS. Sample code:
public class SlsDataStreamSource {
public static void main(String[] args) throws Exception {
// Sets up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Creates and adds SLS source and sink.
env.addSource(createSlsSource())
.map(SlsDataStreamSource::convertMessages)
.print();
env.execute("SLS Stream Source");
}
private static SlsSourceFunction createSlsSource() {
SLSAccessInfo accessInfo = new SLSAccessInfo();
accessInfo.setEndpoint("yourEndpoint");
accessInfo.setProjectName("yourProject");
accessInfo.setLogstore("yourLogStore");
accessInfo.setAccessId("yourAccessId");
accessInfo.setAccessKey("yourAccessKey");
// The batch get size must be given.
accessInfo.setBatchGetSize(10);
// Optional parameters
accessInfo.setConsumerGroup("yourConsumerGroup");
accessInfo.setMaxRetries(3);
// time to start consuming, set to current time.
int startInSec = (int) (new Date().getTime() / 1000);
// time to stop consuming, -1 means never stop.
int stopInSec = -1;
return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
}
private static List<String> convertMessages(SourceRecord input) {
List<String> res = new ArrayList<>();
for (FastLogGroup logGroup : input.getLogGroups()) {
int logsCount = logGroup.getLogsCount();
for (int i = 0; i < logsCount; i++) {
FastLog log = logGroup.getLogs(i);
int fieldCount = log.getContentsCount();
for (int idx = 0; idx < fieldCount; idx++) {
FastLogContent f = log.getContents(idx);
res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
}
}
}
return res;
}
}
Write data to SLS
Realtime Compute for Apache Flink provides the SLSOutputFormat implementation of OutputFormat to write data to SLS. Sample code:
public class SlsDataStreamSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSequence(0, 100)
.map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
.addSink(createSlsSink())
.name(SlsDataStreamSink.class.getSimpleName());
env.execute("SLS Stream Sink");
}
private static OutputFormatSinkFunction createSlsSink() {
Configuration conf = new Configuration();
conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
conf.setString(SLSOptions.PROJECT, "yourProject");
conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
return new OutputFormatSinkFunction<>(outputFormat);
}
private static SinkRecord getSinkRecord(Long seed) {
SinkRecord record = new SinkRecord();
LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
logItem.PushBack("level", "info");
logItem.PushBack("name", String.valueOf(seed));
logItem.PushBack("message", "it's a test message for " + seed.toString());
record.setContent(logItem);
return record;
}
}
XML
The SLS DataStream connector is available in the Maven Central Repository.
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-sls</artifactId>
<version>${vvr-version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-format-common</artifactId>
</exclusion>
</exclusions>
</dependency>