This topic describes how to use the Simple Log Service (SLS) connector.
Background information
Simple Log Service (SLS) is an end-to-end logging service. It helps you efficiently collect, consume, deliver, query, and analyze log data, which improves O&M and operational efficiency and builds the capability to process massive amounts of log data.
The following table describes the capabilities of the SLS connector.
Category | Details |
Supported types | Source table and sink table |
Running mode | Streaming mode only |
Specific monitoring metrics | Not applicable |
Data format | None |
API types | SQL, DataStream, and data ingestion YAML |
Data update or deletion in a sink table | Not supported. You can only insert data into a sink table. |
Features
The SLS connector for source tables supports reading attribute fields of messages directly. The following table describes the supported attribute fields.
Field name | Field type | Field description |
__source__ | STRING METADATA VIRTUAL | The message source. |
__topic__ | STRING METADATA VIRTUAL | The message topic. |
__timestamp__ | BIGINT METADATA VIRTUAL | The log time. |
__tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | The message tags. Attributes in the "__tag__:xxx" format are stored in this map with the "__tag__:" prefix removed. For example, the receive time can be accessed as __tag__['__receive_time__']. |
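For example, the following DDL sketch declares these metadata columns in a source table. The project, Logstore, and credential values are placeholders.

CREATE TEMPORARY TABLE sls_source_with_metadata (
  message_body VARCHAR,
  `__source__` STRING METADATA VIRTUAL,
  `__topic__` STRING METADATA VIRTUAL,
  `__timestamp__` BIGINT METADATA VIRTUAL,
  `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);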
Prerequisites
You have created a Simple Log Service project and Logstore. For more information, see Create a project and a Logstore.
Limits
Only Realtime Compute for Apache Flink VVR 11.1 or later supports using Simple Log Service (SLS) as a synchronization data source for data ingestion in YAML.
The SLS connector guarantees only at-least-once semantics.
We strongly recommend that you do not set the source concurrency to a value greater than the number of shards. Doing so wastes resources and, in VVR 8.0.5 or earlier, may cause automatic failover to fail when the number of shards changes, which can leave some shards unconsumed.
SQL
Syntax
CREATE TABLE sls_table(
a INT,
b INT,
c VARCHAR
) WITH (
'connector' = 'sls',
'endPoint' = '<yourEndPoint>',
'project' = '<yourProjectName>',
'logStore' = '<yourLogStoreName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}'
);

WITH parameters
General

Parameter | Description | Data type | Required | Default value | Remarks |
connector | The table type. | String | Yes | None | Set this parameter to sls. |
endPoint | The endpoint. | String | Yes | None | Enter the private network endpoint of SLS. For more information, see Endpoints. Note By default, Realtime Compute for Apache Flink cannot access the public network. However, Alibaba Cloud provides NAT Gateway to enable communication between a VPC and the public network. For more information, see How do I access the public network?. We do not recommend that you access SLS over the public network. If you must access SLS over the public network, use the HTTPS protocol and enable Global Accelerator (GA) for SLS. For more information, see Manage transfer acceleration. |
project | The name of the SLS project. | String | Yes | None | None. |
logStore | The name of the SLS Logstore or metricstore. | String | Yes | None | Data in a Logstore is consumed in the same way as in a metricstore. |
accessId | The AccessKey ID of your Alibaba Cloud account. | String | Yes | None | For more information, see How do I view the AccessKey ID and AccessKey secret?. Important To prevent your AccessKey information from being leaked, we recommend that you use variables to specify the AccessKey pair. For more information, see Manage variables. |
accessKey | The AccessKey secret of your Alibaba Cloud account. | String | Yes | None | |
Source table-specific

Parameter | Description | Data type | Required | Default value | Remarks |
enableNewSource | Specifies whether to enable the new data source that implements the FLIP-27 interface. | Boolean | No | false | The new data source can automatically adapt to shard changes and distribute shards as evenly as possible across all source concurrencies. Important This parameter is supported only in Realtime Compute for Apache Flink VVR 8.0.9 or later. From VVR 11.1, this parameter is set to true by default. A job cannot be restored from a state after this configuration item is changed. To work around this, first set the consumerGroup configuration item, start the job, and record the consumption progress in the SLS consumer group. Then set consumeFromCheckpoint to true and start the job in a stateless manner so that the job resumes consumption from the recorded progress. If read-only shards exist in SLS, some parallel Flink tasks continue to request data from other unfinished shards after they finish consuming the read-only shards. This can cause some parallel tasks to be assigned multiple shards and lead to an uneven shard distribution, which affects overall consumption efficiency and system performance. To mitigate this issue, adjust the concurrency, optimize the task scheduling policy, or merge small shards to reduce the number of shards and the complexity of task assignment. |
shardDiscoveryIntervalMs | The interval at which shard changes are dynamically detected. Unit: milliseconds. | Long | No | 60000 | Set this parameter to a negative value to disable dynamic detection. Note The value of this parameter cannot be less than 1 minute (60,000 milliseconds). This parameter takes effect only when enableNewSource is set to true. This parameter is supported only in Realtime Compute for Apache Flink VVR 8.0.9 or later. |
startupMode | The startup mode of the source table. | String | No | timestamp | timestamp (default): Logs are consumed from the specified start time. latest: Logs are consumed from the latest offset. earliest: Logs are consumed from the earliest offset. consumer_group: Logs are consumed from the offset recorded in the consumer group. If the consumer group does not record a consumer offset for a shard, that shard is consumed from the earliest offset. Important In VVR versions earlier than 11.1, the consumer_group value is not supported. Instead, set consumeFromCheckpoint to true. In this case, logs are consumed from the offset recorded in the specified consumer group, and the startup mode specified here does not take effect. |
startTime | The time when log consumption starts. | String | No | Current time | The format is yyyy-MM-dd hh:mm:ss. This parameter takes effect only when startupMode is set to timestamp. Note The startTime and stopTime parameters are based on the __receive_time__ attribute in SLS, not the __timestamp__ attribute. |
stopTime | The time when log consumption ends. | String | No | None | The format is yyyy-MM-dd hh:mm:ss. Note This parameter is intended for consuming historical logs and should be set to a past point in time. If you set it to a future time, consumption may stop prematurely when no new logs are written, which appears as a data stream interruption with no error message. If you want the Flink program to exit after log consumption is complete, you must also set exitAfterFinish=true. |
consumerGroup | The name of the consumer group. | String | No | None | A consumer group is used to record consumption progress. You can specify a custom name for the consumer group. The format is not fixed. Note Multiple jobs cannot use the same consumer group for collaborative consumption. If different Flink jobs use the same consumer group, each job still consumes all the data. This is because Flink does not perform partition allocation through the SLS consumer group when it consumes data from SLS, so each consumer consumes its own messages independently even if the consumer group is the same. Different Flink jobs should therefore use different consumer groups. |
consumeFromCheckpoint | Specifies whether to consume logs from the checkpoint that is saved in the specified consumer group. | String | No | false | true: You must also specify a consumer group. The Flink program consumes logs from the checkpoint saved in that consumer group. If the consumer group does not have a corresponding checkpoint, logs are consumed from the time specified by the startTime parameter. false (default): Logs are not consumed from the checkpoint saved in the specified consumer group. Important This parameter is no longer supported from VVR 11.1. For VVR 11.1 and later, set startupMode to consumer_group instead. |
maxRetries | The number of retries after a failed attempt to read data from SLS. | String | No | 3 | None. |
batchGetSize | The number of log groups to read in a single request. | String | No | 100 | The value of batchGetSize cannot exceed 1,000. Otherwise, an error is reported. |
exitAfterFinish | Specifies whether the Flink program exits after data consumption is complete. | String | No | false | true: The Flink program exits after data consumption is complete. false (default): The Flink program does not exit after data consumption is complete. |
query | The pre-processing statement for SLS data consumption. | String | No | None | Using the query parameter lets you filter data before it is consumed from SLS. This prevents all data from being consumed into Flink, which saves costs and improves processing speed. For example, 'query' = '*| where request_method = ''GET''' means that before Flink reads data from SLS, only data whose request_method field equals GET is matched. Note The query parameter requires the Structured Process Language (SPL) of Simple Log Service. For more information, see SPL syntax. Important This parameter is supported only in Realtime Compute for Apache Flink VVR 8.0.1 or later. It is deprecated in VVR 11.3 and later but remains compatible in subsequent versions. This feature incurs fees from Simple Log Service. For more information, see Billing. |
processor | The SLS consumption processor. If both this parameter and query are configured, query takes precedence and processor does not take effect. | String | No | None | Using the processor parameter lets you filter data before it is consumed by Flink, which helps save costs and improve processing speed. We recommend that you use the processor parameter instead of the query parameter. For example, 'processor' = 'test-filter-processor' means that data is filtered by the specified SLS consumption processor before it is read by Flink. Note The processor must use the Structured Process Language (SPL) of Simple Log Service. For more information, see SPL syntax. To create and update consumption processors, see Manage consumption processors. Important This parameter is supported only in Realtime Compute for Apache Flink VVR 11.3 or later. This feature incurs fees from Simple Log Service. For more information, see Billing. |
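The following sketch combines several of the source options described above. The start time, consumer group name, and SPL filter are illustrative values only.

CREATE TEMPORARY TABLE sls_filtered_source (
  request_method STRING,
  request_uri STRING,
  status INT
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- Consume from a fixed start time. startTime and stopTime are matched against __receive_time__.
  'startupMode' = 'timestamp',
  'startTime' = '2024-01-01 00:00:00',
  -- Record consumption progress in an SLS consumer group.
  'consumerGroup' = 'flink-demo-consumer-group',
  -- Filter on the SLS side with SPL before the data reaches Flink.
  'query' = '*| where request_method = ''GET'''
);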
For sink tables only

Parameter | Description | Data type | Required | Default value | Remarks |
topicField | Specifies a field name. The value of this field overwrites the value of the __topic__ attribute field and indicates the topic of the log. | String | No | None | The value of this parameter must be one of the existing fields in the table. |
timeField | Specifies a field name. The value of this field overwrites the value of the __timestamp__ attribute field and indicates the time when the log is written. | String | No | Current time | The value of this parameter must be one of the existing fields in the table, and the field type must be INT. If this parameter is not specified, the current time is used by default. |
sourceField | Specifies a field name. The value of this field overwrites the value of the __source__ attribute field and indicates the source of the log, such as the IP address of the machine that generates the log. | String | No | None | The value of this parameter must be one of the existing fields in the table. |
partitionField | Specifies a field name. When data is written, a hash value is calculated based on the value of this field, and data with the same hash value is written to the same shard. | String | No | None | If this parameter is not specified, each data record is randomly written to a currently available shard. |
buckets | The number of buckets into which data is grouped based on the hash value when partitionField is specified. | String | No | 64 | The value of this parameter must be in the range of [1, 256] and must be an integer power of 2. The number of buckets must be greater than or equal to the number of shards. Otherwise, no data is written to some shards. |
flushIntervalMs | The interval at which data writes are triggered. | String | No | 2000 | Unit: milliseconds. |
writeNullProperties | Specifies whether to write null values as empty strings to SLS. | Boolean | No | true | true (default): Null values are written to logs as empty strings. false: Fields with null values are not written to logs. Note This parameter is supported only in Realtime Compute for Apache Flink VVR 8.0.6 or later. |
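The following sketch shows a sink table that uses several of these options. The field names and option values are illustrative placeholders.

CREATE TEMPORARY TABLE sls_partitioned_sink (
  log_topic STRING,
  host_ip STRING,
  user_id STRING,
  message STRING
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- Use table fields to populate the __topic__ and __source__ attributes.
  'topicField' = 'log_topic',
  'sourceField' = 'host_ip',
  -- Hash records by user_id so that records with the same user_id land in the same shard.
  'partitionField' = 'user_id',
  'buckets' = '64',
  -- Flush buffered records every 2 seconds.
  'flushIntervalMs' = '2000'
);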
Type mapping
Flink field type | SLS field type |
BOOLEAN | STRING |
VARBINARY | STRING |
VARCHAR | STRING |
TINYINT | STRING |
INTEGER | STRING |
BIGINT | STRING |
FLOAT | STRING |
DOUBLE | STRING |
DECIMAL | STRING |
Data ingestion (public preview)
Limits
This feature is supported only in Realtime Compute for Apache Flink VVR 11.1 or later.
Syntax
source:
type: sls
name: SLS Source
endpoint: <endpoint>
project: <project>
logstore: <logstore>
accessId: <accessId>
  accessKey: <accessKey>

Configuration items
Parameter | Description | Data type | Required | Default value | Remarks |
type | The data source type. | String | Yes | None | Set this parameter to sls. |
endpoint | The endpoint. | String | Yes | None | Enter the private network endpoint of SLS. For more information, see Endpoints. Note We do not recommend that you access SLS over the public network. If you must access SLS over the public network, use the HTTPS protocol and enable Global Accelerator (GA) for SLS. For more information, see Manage transfer acceleration. |
accessId | The AccessKey ID of your Alibaba Cloud account. | String | Yes | None | For more information, see How do I view the AccessKey ID and AccessKey secret?. Important To prevent your AccessKey information from being leaked, we recommend that you use variables to specify the AccessKey pair. For more information, see Manage variables. |
accessKey | The AccessKey secret of your Alibaba Cloud account. | String | Yes | None | |
project | The name of the SLS project. | String | Yes | None | None. |
logStore | The name of the SLS Logstore or metricstore. | String | Yes | None | Data in a Logstore is consumed in the same way as in a metricstore. |
schema.inference.strategy | The schema inference strategy. | String | No | continuous | Valid values: continuous (default): The connector continuously infers the schema of each log and generates schema change events such as adding columns. static: The connector parses logs based on the initial table schema and does not generate schema change events. For more information, see Table schema inference and change synchronization. |
maxPreFetchLogGroups | The maximum number of log groups to try to read and parse for each shard during initial schema inference. | Integer | No | 50 | Before the job actually reads and processes data, it attempts to pre-consume a specified number of log groups for each shard to initialize the schema information. |
shardDiscoveryIntervalMs | The interval at which shard changes are dynamically detected. Unit: millisecond. | Long | No | 60000 | Set this parameter to a negative value to disable dynamic detection. Note The value of this parameter cannot be less than 1 minute (60,000 milliseconds). |
startupMode | The startup mode. | String | No | None | For the supported values, see the description of the startupMode parameter for SQL source tables. |
startTime | The time when log consumption starts. | String | No | Current time | The format is yyyy-MM-dd hh:mm:ss. This parameter takes effect only when startupMode is set to timestamp. Note The startTime and stopTime parameters are based on the __receive_time__ attribute in SLS, not the __timestamp__ attribute. |
stopTime | The time when log consumption ends. | String | No | None | The format is yyyy-MM-dd hh:mm:ss. Note If you want the Flink program to exit after log consumption is complete, you must also set exitAfterFinish=true. |
consumerGroup | The name of the consumer group. | String | No | None | A consumer group is used to record consumption progress. You can specify a custom name for the consumer group. The format is not fixed. |
batchGetSize | The number of log groups to read in a single request. | Integer | No | 100 | The value of batchGetSize cannot exceed 1,000. Otherwise, an error is reported. |
maxRetries | The number of retries after a failed attempt to read data from SLS. | Integer | No | 3 | None. |
exitAfterFinish | Specifies whether the Flink program exits after data consumption is complete. | Boolean | No | false | true: The Flink program exits after data consumption is complete. false (default): The Flink program does not exit after data consumption is complete. |
query | The pre-processing statement for SLS data consumption. | String | No | None | Using the query parameter lets you filter data before it is consumed from SLS. This prevents all data from being consumed into Flink, which saves costs and improves processing speed. For example, 'query' = '*| where request_method = ''GET''' means that only data whose request_method field equals GET is read into Flink. Note The query parameter requires the Structured Process Language (SPL) of Simple Log Service. For more information, see SPL syntax. Important This feature incurs fees from Simple Log Service. For more information, see Billing. |
compressType | The compression type for SLS. | String | No | None | Set this parameter to a compression type that is supported by SLS. |
timeZone | The time zone for startTime and stopTime. | String | No | None | By default, no offset is added. |
regionId | The region where the SLS service is activated. | String | No | None | See Supported regions for configuration. |
signVersion | The version of the SLS request signature. | String | No | None | For more information about configuration, see Request Signature. |
shardModDivisor | The divisor for reading shards of an SLS Logstore. | Integer | No | -1 | For more information, see Shards. |
shardModRemainder | The remainder for reading shards of an SLS Logstore. | Integer | No | -1 | For more information, see Shards. |
metadata.list | The metadata columns to pass to the downstream. | String | No | None | Available metadata fields include __source__, __topic__, __timestamp__, and __tag__. |
Type mapping
The following table shows the type mapping for data ingestion.
SLS field type | CDC field type |
STRING | STRING |
Table schema inference and change synchronization
Shard data pre-consumption and table schema initialization
The SLS connector maintains the schema of the Logstore that is being read. Before it reads data from the Logstore, the SLS connector attempts to pre-consume a maximum of maxPreFetchLogGroups log groups from each shard. The connector then parses the schema of each log in these log groups and merges the schemas to initialize the table schema. Before the actual data consumption starts, a corresponding table creation event is generated based on the initialized schema.
Note For each shard, the SLS connector attempts to consume data and parse the log schema starting from one hour before the current time.
Primary key information
SLS logs do not contain primary key information. You can manually add primary keys to a table using transform rules:
transform:
  - source-table: <project>.<logstore>
    projection: \*
    primary-keys: key1, key2

Schema inference and schema evolution
After the table schema is initialized, if schema.inference.strategy is set to static, the SLS connector parses each log based on the initial table schema and does not generate schema change events. If schema.inference.strategy is set to continuous, the SLS connector parses the data of each log, infers the physical columns, and compares them with the currently recorded schema. If the inferred schema is inconsistent with the current schema, the schemas are merged based on the following rules:
If the inferred physical columns contain fields that are not in the current schema, these fields are added to the schema, and an event for adding nullable columns is generated.
If the inferred physical columns do not contain fields that are already in the current schema, these fields are retained, their data is filled with NULL, and no event for deleting columns is generated.
The SLS connector infers the type of all fields in each log as String. Currently, only adding columns is supported. New columns are added to the end of the current schema and are set as nullable columns.
Code examples
SQL source table and sink table
CREATE TEMPORARY TABLE sls_input (
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` BIGINT METADATA VIRTUAL,
  __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'sls',
  'endPoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'startTime' = '2023-08-30 00:00:00',
  'project' = 'sls-test',
  'logStore' = 'sls-input'
);

CREATE TEMPORARY TABLE sls_sink (
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING,
  `__source__` STRING,
  `__timestamp__` BIGINT,
  receive_time BIGINT
) WITH (
  'connector' = 'sls',
  'endPoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${ak_id}',
  'accessKey' = '${ak_secret}',
  'project' = 'sls-test',
  'logStore' = 'sls-output'
);

INSERT INTO sls_sink
SELECT
  `time`,
  url,
  dt,
  float_field,
  double_field,
  boolean_field,
  `__topic__`,
  `__source__`,
  `__timestamp__`,
  CAST(__tag__['__receive_time__'] AS BIGINT) AS receive_time
FROM sls_input;

Data ingestion data source
You can use SLS as a data source for data ingestion jobs to write SLS data in real time to supported downstream systems. For example, you can configure a data ingestion job as follows to write data from a Logstore to a DLF data lake in the paimon format. The job automatically infers the data types of fields and the schema of the downstream table. The job also supports dynamic schema evolution at runtime.
source:
type: sls
name: SLS Source
endpoint: ${endpoint}
project: test_project
logstore: test_log
accessId: ${accessId}
accessKey: ${accessKey}
# Add primary key information to the table.
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: id
# Write all data from test_log to the test_database.inventory table.
route:
- source-table: test_project.test_log
sink-table: test_database.inventory
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

DataStream API
When you read or write data by using the DataStream API, you must use the corresponding DataStream connector. For more information, see Use DataStream connectors.
If you use a VVR version earlier than 8.0.10, dependency JAR packages may be missing when you start a job. To resolve this issue, add the corresponding uber JAR package to the additional dependencies.
Read data from SLS
Realtime Compute for Apache Flink provides the SlsSourceFunction class, which is an implementation of SourceFunction, to read data from SLS. The following code provides an example.
public class SlsDataStreamSource {
public static void main(String[] args) throws Exception {
// Sets up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Creates and adds SLS source and sink.
env.addSource(createSlsSource())
.map(SlsDataStreamSource::convertMessages)
.print();
env.execute("SLS Stream Source");
}
private static SlsSourceFunction createSlsSource() {
SLSAccessInfo accessInfo = new SLSAccessInfo();
accessInfo.setEndpoint("yourEndpoint");
accessInfo.setProjectName("yourProject");
accessInfo.setLogstore("yourLogStore");
accessInfo.setAccessId("yourAccessId");
accessInfo.setAccessKey("yourAccessKey");
// The batch get size must be given.
accessInfo.setBatchGetSize(10);
// Optional parameters
accessInfo.setConsumerGroup("yourConsumerGroup");
accessInfo.setMaxRetries(3);
// time to start consuming, set to current time.
int startInSec = (int) (new Date().getTime() / 1000);
// time to stop consuming, -1 means never stop.
int stopInSec = -1;
return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
}
private static List<String> convertMessages(SourceRecord input) {
List<String> res = new ArrayList<>();
for (FastLogGroup logGroup : input.getLogGroups()) {
int logsCount = logGroup.getLogsCount();
for (int i = 0; i < logsCount; i++) {
FastLog log = logGroup.getLogs(i);
int fieldCount = log.getContentsCount();
for (int idx = 0; idx < fieldCount; idx++) {
FastLogContent f = log.getContents(idx);
res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
}
}
}
return res;
}
}

Write data to SLS
The SLSOutputFormat class, which is an implementation of OutputFormat, is provided to write data to SLS. The following code provides an example.
public class SlsDataStreamSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSequence(0, 100)
.map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
.addSink(createSlsSink())
.name(SlsDataStreamSink.class.getSimpleName());
env.execute("SLS Stream Sink");
}
private static OutputFormatSinkFunction createSlsSink() {
Configuration conf = new Configuration();
conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
conf.setString(SLSOptions.PROJECT, "yourProject");
conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
return new OutputFormatSinkFunction<>(outputFormat);
}
private static SinkRecord getSinkRecord(Long seed) {
SinkRecord record = new SinkRecord();
LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
logItem.PushBack("level", "info");
logItem.PushBack("name", String.valueOf(seed));
logItem.PushBack("message", "it's a test message for " + seed.toString());
record.setContent(logItem);
return record;
}
}

XML
The SLS DataStream connector is available in the Maven central repository.
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-sls</artifactId>
<version>${vvr-version}</version>
</dependency>