Realtime Compute for Apache Flink: Simple Log Service (SLS)

Last Updated: Feb 12, 2026

This topic describes how to use the Simple Log Service (SLS) connector.

Background information

Simple Log Service is an end-to-end service for log data. It helps you collect, consume, ship, query, and analyze log data quickly. It improves operations and maintenance efficiency and enables large-scale log processing.

The SLS connector provides the following capabilities:

  • Supported types: source table and sink table.

  • Running mode: streaming mode only.

  • Monitoring metrics: N/A.

  • Data format: N/A.

  • API type: SQL, DataStream API, and data ingestion YAML.

  • Update or delete data in the sink table: not supported. You can only insert data into a sink table.

Features

The SLS source connector can read message attribute fields directly. The following attribute fields are supported:

  • __source__ (STRING METADATA VIRTUAL): the message source.

  • __topic__ (STRING METADATA VIRTUAL): the message topic.

  • __timestamp__ (BIGINT METADATA VIRTUAL): the time when the log was generated.

  • __tag__ (MAP<VARCHAR, VARCHAR> METADATA VIRTUAL): the message tags. For example, the attribute "__tag__:__receive_time__":"1616742274" is stored in the map as the key '__receive_time__' with the value '1616742274'. In SQL, access it as __tag__['__receive_time__'].
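
For illustration, the following snippet declares these metadata columns in a source table. It is a minimal sketch: the endpoint, project, and Logstore values are placeholders, and the computed column is only an example of reading the tag map.

CREATE TABLE sls_source_with_meta (
  content STRING,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` BIGINT METADATA VIRTUAL,
  `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  -- Read the SLS receive time out of the tag map as a computed column.
  receive_time AS CAST(`__tag__`['__receive_time__'] AS BIGINT)
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);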

Prerequisites

You have created an SLS project and a Logstore. For more information, see Create a project and a Logstore.

Limits

  • Only Ververica Runtime (VVR) 11.1 or later supports using SLS as a data ingestion source.

  • The SLS connector supports only at-least-once semantics.

  • Avoid setting the source parallelism higher than the number of shards. Doing so wastes resources. In VVR 8.0.5 or earlier, if the shard count changes after you set a high parallelism, automatic failover may fail. This can leave some shards unconsumed.

SQL

Syntax

CREATE TABLE sls_table(
  a INT,
  b INT,
  c VARCHAR
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

WITH parameters

  • General

    • connector
      The table type. Type: String. Required: yes. Default value: none.
      Set it to sls.

    • endPoint
      The endpoint address. Type: String. Required: yes. Default value: none.
      Enter the VPC endpoint of SLS. For more information, see Endpoints.
      Note:
      • By default, Realtime Compute for Apache Flink cannot access the Internet. However, Alibaba Cloud provides NAT gateways to enable communication between VPCs and the Internet. For more information, see How do I access the Internet?.
      • Avoid accessing SLS over the Internet. If you must do so, use HTTPS and enable transfer acceleration for SLS.

    • project
      The name of the SLS project. Type: String. Required: yes. Default value: none.

    • logStore
      The name of an SLS Logstore or Metricstore. Type: String. Required: yes. Default value: none.
      Data in a Logstore is consumed in the same way as in a Metricstore.

    • accessId
      The AccessKey ID of your Alibaba Cloud account. Type: String. Required: yes. Default value: none.
      For more information, see How do I view the AccessKey ID and AccessKey secret?.
      Important: To protect your AccessKey pair, use variables to configure your AccessKey.

    • accessKey
      The AccessKey secret of your Alibaba Cloud account. Type: String. Required: yes. Default value: none.

  • Source-specific

    • enableNewSource
      Specifies whether to use the new source interface that implements FLIP-27. Type: Boolean. Required: no. Default value: false.
      The new source adapts automatically to shard changes and distributes shards evenly across all source subtasks.
      Important:
      • This option is supported only in VVR 8.0.9 or later. Starting from VVR 11.1, this option defaults to true.
      • If you change this option, your job cannot resume from a saved state. To work around this, first start your job with the consumerGroup option to record the current consumer offset. Then, set consumeFromCheckpoint to true and restart your job without states.
      • If SLS contains read-only shards, some Flink subtasks may finish reading from those shards and then request other unread shards. This can cause uneven shard distribution across subtasks, reducing overall consumption efficiency and system performance. To reduce this imbalance, adjust the source parallelism, optimize task scheduling, or merge small shards.

    • shardDiscoveryIntervalMs
      The interval at which shard changes are detected dynamically, in milliseconds. Type: Long. Required: no. Default value: 60000.
      Set this option to a negative value to disable dynamic detection.
      Note:
      • This option must be at least 1 minute (60,000 milliseconds).
      • This option takes effect only if enableNewSource is set to true.
      • This option is supported only in VVR 8.0.9 or later.

    • startupMode
      The startup mode of the source table. Type: String. Required: no. Default value: timestamp.
      • timestamp (default): consume logs starting from the specified time.
      • latest: consume logs starting from the latest offset.
      • earliest: consume logs starting from the earliest offset.
      • consumer_group: consume logs starting from the offset recorded in the consumer group. If no offset is recorded for a shard, consume logs starting from the earliest offset.
      Important: In VVR versions earlier than 11.1, the consumer_group value is not supported. In those versions, to consume logs from the offset recorded by the specified consumer group, set consumeFromCheckpoint to true; the startupMode setting then does not take effect.

    • startTime
      The time to start consuming logs. Type: String. Required: no. Default value: the current time.
      Format: yyyy-MM-dd hh:mm:ss. This option takes effect only if startupMode is set to timestamp.
      Note: The startTime and stopTime options are based on the __receive_time__ field in SLS, not the __timestamp__ field.

    • stopTime
      The time to stop consuming logs. Type: String. Required: no. Default value: none.
      Format: yyyy-MM-dd hh:mm:ss.
      Note:
      • Use this option only to consume historical logs, and set it to a past point in time. If you set it to a future time and no new logs are written, consumption may stop unexpectedly. This appears as a broken data stream with no error messages.
      • To exit the Flink program after log consumption finishes, also set exitAfterFinish to true.

    • consumerGroup
      The name of the consumer group. Type: String. Required: no. Default value: none.
      A consumer group records consumption progress. You can specify any custom name.
      Note: You cannot share a consumer group across multiple jobs for collaborative consumption. When Flink consumes data from SLS, it does not assign shards through the SLS consumer group, so each job independently consumes all messages even if jobs share the same consumer group. Use different consumer groups for different jobs.

    • consumeFromCheckpoint
      Specifies whether to consume logs from the checkpoint saved in the specified consumer group. Type: String. Required: no. Default value: false.
      • true: you must also specify a consumer group. Flink consumes logs from the checkpoint stored in that consumer group. If no checkpoint exists in the consumer group, Flink consumes logs from the time specified by the startTime option.
      • false (default): Flink does not consume logs from the checkpoint saved in the specified consumer group.
      Important: This option is not supported in VVR 11.1 or later. For VVR 11.1 or later, set startupMode to consumer_group instead.

    • maxRetries
      The number of retries after reading from SLS fails. Type: String. Required: no. Default value: 3.

    • batchGetSize
      The number of log groups to read per request. Type: String. Required: no. Default value: 100.
      Set batchGetSize to a value less than 1000. Otherwise, an error occurs.

    • exitAfterFinish
      Specifies whether the Flink program exits after data consumption finishes. Type: String. Required: no. Default value: false.
      • true: the Flink program exits after data consumption finishes.
      • false (default): the Flink program does not exit after data consumption finishes.

    • query
      The query statement used to preprocess data before consuming SLS data. Type: String. Required: no. Default value: none.
      Use the query option to filter SLS data before consumption. This avoids loading all data into Flink, which reduces costs and improves processing speed. For example, 'query' = '*| where request_method = ''GET''' filters logs whose request_method field equals GET before Flink reads them.
      Note: Write queries using SPL syntax.
      Important:
      • This option is supported only in VVR 8.0.1 or later. It was deprecated in VVR 11.3 but remains compatible in later versions.
      • This feature incurs SLS fees. For details, see Billing.

    • processor
      The SLS consumer processor. Type: String. Required: no. Default value: none.
      Use the processor option to filter SLS data before consumption. This avoids loading all data into Flink, which reduces costs and improves processing speed. If both query and processor are set, query takes precedence. We recommend using processor instead of query. For example, 'processor' = 'test-filter-processor' applies that SLS consumer processor to filter data before Flink reads it.
      Note: Write processors using SPL syntax. For details about creating and updating SLS consumer processors, see Manage consumer processors.
      Important:
      • This option is supported only in VVR 11.3 or later.
      • This feature incurs SLS fees. For details, see Billing.

    For a source table that combines several of these options, see the sketches after the Sink-specific list.

  • Sink-specific

    • topicField
      The name of a field whose value overrides the __topic__ field, which indicates the log topic. Type: String. Required: no. Default value: none.
      The field must already exist in the table.

    • timeField
      The name of a field whose value overrides the __timestamp__ field, which indicates the log write time. Type: String. Required: no. Default value: the current time.
      The field must exist in the table, and its type must be INT. If not specified, the current time is used.

    • sourceField
      The name of a field whose value overrides the __source__ field, which indicates the log source, such as the IP address of the machine that generated the log. Type: String. Required: no. Default value: none.
      The field must exist in the table.

    • partitionField
      The name of a field. A hash value is calculated from this field's value when data is written, and data with the same hash value is written to the same shard. Type: String. Required: no. Default value: none.
      If not specified, each data entry is written to a randomly selected available shard.

    • buckets
      The number of buckets to regroup by hash value when partitionField is specified. Type: String. Required: no. Default value: 64.
      Valid values: [1, 256]. The value must be a power of 2, and the number of buckets must be greater than or equal to the number of shards. Otherwise, some shards receive no data.

    • flushIntervalMs
      The interval at which data writes are triggered, in milliseconds. Type: String. Required: no. Default value: 2000.

    • writeNullProperties
      Specifies whether to write null values as empty strings to SLS. Type: Boolean. Required: no. Default value: true.
      • true (default): write null values as empty strings.
      • false: do not write fields whose value is null.
      Note: This option is supported only in VVR 8.0.6 or later.

    For sink usage of these options, see the sketches after this list.
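
The following sketches illustrate how several of these options combine. They are illustrative only: the endpoint, project, Logstore, consumer group, and field names are placeholders, and the option values are assumptions chosen for the example.

CREATE TABLE sls_source_demo (
  request_method STRING,
  request_uri STRING
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- Start consumption at a fixed time and track progress in a consumer group.
  'startupMode' = 'timestamp',
  'startTime' = '2024-01-01 00:00:00',
  'consumerGroup' = 'flink-demo-group',
  -- Filter on the SLS side with SPL before the data reaches Flink.
  'query' = '*| where request_method = ''GET'''
);

A sink table can use topicField and partitionField to control how entries are routed:

CREATE TABLE sls_sink_demo (
  log_topic STRING,
  user_id STRING,
  message STRING
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- Use the log_topic column value as the SLS __topic__ field.
  'topicField' = 'log_topic',
  -- Hash user_id so that entries for the same user go to the same shard.
  'partitionField' = 'user_id',
  'flushIntervalMs' = '1000'
);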

Data type mappings

SLS stores all log field values as strings, so the following Flink data types all map to the SLS STRING type:

  • BOOLEAN

  • VARBINARY

  • VARCHAR

  • TINYINT

  • INTEGER

  • BIGINT

  • FLOAT

  • DOUBLE

  • DECIMAL

Data ingestion (public preview)

Limits

Only Ververica Runtime (VVR) 11.1 or later supports SLS as a data ingestion source.

Syntax

source:
  type: sls
  name: SLS Source
  endpoint: <endpoint>
  project: <project>
  logstore: <logstore>
  accessId: <accessId>
  accessKey: <accessKey>

Configuration options

  • type
    The type of the data source. Type: String. Required: yes. Default value: none.
    Set it to sls.

  • endpoint
    The endpoint address. Type: String. Required: yes. Default value: none.
    Enter the VPC endpoint of SLS. For more information, see Endpoints.
    Note:
    • By default, Realtime Compute for Apache Flink cannot access the Internet. However, Alibaba Cloud provides NAT gateways to enable communication between VPCs and the Internet. For more information, see How do I access the Internet?.
    • Avoid accessing SLS over the Internet. If you must do so, use HTTPS and enable transfer acceleration for SLS.

  • accessId
    The AccessKey ID of your Alibaba Cloud account. Type: String. Required: yes. Default value: none.
    For more information, see How do I view the AccessKey ID and AccessKey secret?.
    Important: To protect your AccessKey pair, use variables to configure your AccessKey.

  • accessKey
    The AccessKey secret of your Alibaba Cloud account. Type: String. Required: yes. Default value: none.

  • project
    The name of the SLS project. Type: String. Required: yes. Default value: none.

  • logstore
    The name of an SLS Logstore or Metricstore. Type: String. Required: yes. Default value: none.
    Data in a Logstore is consumed in the same way as in a Metricstore.

  • schema.inference.strategy
    The strategy for schema inference. Type: String. Required: no. Default value: continuous.
    • continuous: perform schema inference for each data entry. If schemas are incompatible, infer a wider schema and generate schema change events.
    • static: perform schema inference once at job startup and parse subsequent data using the initial schema. No schema change events are generated.

  • maxPreFetchLogGroups
    The maximum number of log groups to read and parse per shard during initial schema inference. Type: Integer. Required: no. Default value: 50.
    Before reading and processing data, the connector attempts to pre-consume the specified number of log groups from each shard to initialize the schema.

  • shardDiscoveryIntervalMs
    The interval at which shard changes are detected dynamically, in milliseconds. Type: Long. Required: no. Default value: 60000.
    Set this option to a negative value to disable dynamic detection.
    Note: This option must be at least 1 minute (60,000 milliseconds).

  • startupMode
    The startup mode. Type: String. Required: no. Default value: timestamp.
    • timestamp (default): consume logs starting from the specified time.
    • latest: consume logs starting from the latest offset.
    • earliest: consume logs starting from the earliest offset.
    • consumer_group: consume logs starting from the offset recorded in the consumer group. If no offset is recorded for a shard, consume logs starting from the earliest offset.

  • startTime
    The time to start consuming logs. Type: String. Required: no. Default value: the current time.
    Format: yyyy-MM-dd hh:mm:ss. This option takes effect only if startupMode is set to timestamp.
    Note: The startTime and stopTime options are based on the __receive_time__ field in SLS, not the __timestamp__ field.

  • stopTime
    The time to stop consuming logs. Type: String. Required: no. Default value: none.
    Format: yyyy-MM-dd hh:mm:ss.
    Note: To exit the Flink program after log consumption finishes, also set exitAfterFinish to true.

  • consumerGroup
    The name of the consumer group. Type: String. Required: no. Default value: none.
    A consumer group records consumption progress. You can specify any custom name.

  • batchGetSize
    The number of log groups to read per request. Type: Integer. Required: no. Default value: 100.
    Set batchGetSize to a value less than 1000. Otherwise, an error occurs.

  • maxRetries
    The number of retries after reading from SLS fails. Type: Integer. Required: no. Default value: 3.

  • exitAfterFinish
    Specifies whether the Flink program exits after data consumption finishes. Type: Boolean. Required: no. Default value: false.
    • true: the Flink program exits after data consumption finishes.
    • false (default): the Flink program does not exit after data consumption finishes.

  • query
    The query statement used to preprocess data before consuming SLS data. Type: String. Required: no. Default value: none.
    Use the query option to filter SLS data before consumption. This avoids loading all data into Flink, which reduces costs and improves processing speed. For example, 'query' = '*| where request_method = ''GET''' filters data whose request_method field equals GET before Flink reads SLS data.
    Note: Write queries using SPL syntax.
    Important:
    • For regions that support this feature, see Consume logs based on rules.
    • This feature is free during public preview. You may be billed in the future. For details, see Billing.

  • compressType
    The compression type used by SLS. Type: String. Required: no. Default value: none.
    Supported compression types: lz4, deflate, and zstd.

  • timeZone
    The time zone for startTime and stopTime. Type: String. Required: no. Default value: none.
    No offset is applied by default.

  • regionId
    The region where SLS is deployed. Type: String. Required: no. Default value: none.
    For configuration details, see Regions.

  • signVersion
    The signature version used for SLS requests. Type: String. Required: no. Default value: none.
    For configuration details, see Request signatures.

  • shardModDivisor
    The divisor used when reading from SLS Logstore shards. Type: Integer. Required: no. Default value: -1.
    For configuration details, see Shard.

  • shardModRemainder
    The remainder used when reading from SLS Logstore shards. Type: Integer. Required: no. Default value: -1.
    For configuration details, see Shard.

  • metadata.list
    The metadata columns to pass to the downstream sink. Type: String. Required: no. Default value: none.
    Available metadata fields: __source__, __topic__, __timestamp__, and __tag__. Separate multiple fields with commas.
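
For illustration, a source block that combines several of these options might look as follows. This is a sketch: the endpoint, project, Logstore, and time values are placeholders.

source:
  type: sls
  name: SLS Source
  endpoint: cn-hangzhou-intranet.log.aliyuncs.com
  project: test_project
  logstore: test_log
  accessId: ${accessId}
  accessKey: ${accessKey}
  # Start from a fixed point in time rather than the job start time.
  startupMode: timestamp
  startTime: 2024-01-01 00:00:00
  # Forward SLS metadata columns to the downstream tables.
  metadata.list: __topic__,__source__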

Data type mappings

The data type mappings for data ingestion are as follows:

SLS data type

CDC Field Types

STRING

STRING

Schema inference and evolution

  • Pre-consumption and schema initialization

    The SLS connector maintains the schema of the current Logstore. Before reading data from the Logstore, the connector attempts to pre-consume up to maxPreFetchLogGroups log groups from each shard. It parses the schema of each log and merges them to initialize the table schema. Later, before actual consumption begins, it generates a table creation event based on the initialized schema.

    Note

For each shard, the connector attempts to consume data starting from one hour before the current time to parse the schema.

  • Primary key

    SLS logs do not contain primary keys. Add primary keys manually using the transform rule:

    transform:
      - source-table: <project>.<logstore>
        projection: \*
        primary-keys: key1, key2
  • Schema inference and evolution

    After schema initialization, if schema.inference.strategy is set to static, the connector parses each log entry using the initial schema and does not generate schema change events. If schema.inference.strategy is set to continuous, the connector parses each log entry, infers physical columns, and compares them with the current schema. If the two schemas differ, the connector merges them using the following rules:

    • If the inferred schema contains fields that are not in the current schema, those fields are added to the current schema, and nullable column addition events are generated.

    • If the inferred schema lacks fields that are present in the current schema, those fields are retained in the schema and their values are set to NULL. No column deletion events are generated.

    The SLS connector infers all fields as strings. Currently, only column additions are supported. New columns are appended to the end of the current schema and marked as nullable.
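
    As a hypothetical illustration with the continuous strategy (field names invented for the example):

    Log 1: {"host": "10.0.0.1", "status": "200"}
      -> initial schema: host STRING, status STRING
    Log 2: {"host": "10.0.0.2", "latency": "15"}
      -> latency is new: a nullable column latency STRING is appended to the schema
      -> status is missing: the column is retained, and its value is NULL for this entry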

Sample code

  • SQL source table and sink table

    CREATE TEMPORARY TABLE sls_input(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING METADATA VIRTUAL,
      `__source__` STRING METADATA VIRTUAL,
      `__timestamp__` BIGINT METADATA VIRTUAL,
      `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
      proctime AS PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endPoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'startTime' = '2023-08-30 00:00:00',
      'project' = 'sls-test',
      'logStore' = 'sls-input'
    );

    CREATE TEMPORARY TABLE sls_sink(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING,
      `__source__` STRING,
      `__timestamp__` BIGINT,
      receive_time BIGINT
    ) WITH (
      'connector' = 'sls',
      'endPoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'project' = 'sls-test',
      'logStore' = 'sls-output'
    );

    INSERT INTO sls_sink
    SELECT
      `time`,
      url,
      dt,
      float_field,
      double_field,
      boolean_field,
      `__topic__`,
      `__source__`,
      `__timestamp__`,
      CAST(`__tag__`['__receive_time__'] AS BIGINT) AS receive_time
    FROM sls_input;
  • Data ingestion source

    You can use SLS as a data ingestion source to write SLS data in real time to supported downstream systems. For example, you can configure a data ingestion job to write data from a Logstore to a DLF data lake in Paimon format. The job automatically infers data types and the sink table schema and supports dynamic schema evolution during runtime.

source:
  type: sls
  name: SLS Source
  endpoint: ${endpoint}
  project: test_project
  logstore: test_log
  accessId: ${accessId}
  accessKey: ${accessKey}
   
# Add primary key information to the table 
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
    
# Write all data from test_log to the test_database.inventory table
route:
  - source-table: test_project.test_log
    sink-table: test_database.inventory

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Enable deletion vectors to improve read performance
  table.properties.deletion-vectors.enabled: true

DataStream API

Important

To read or write data using the DataStream API, use the corresponding DataStream connector to connect to Realtime Compute for Apache Flink. For more information, see Usage of DataStream connectors.

If you use VVR versions earlier than 8.0.10, your job may fail to start because dependency JAR packages are missing. To resolve this, add the corresponding uber JAR package as an additional dependency.

Read data from SLS

Realtime Compute for Apache Flink provides the SlsSourceFunction implementation of SourceFunction to read data from SLS. Sample code:

public class SlsDataStreamSource {

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Creates and adds SLS source and sink.
        env.addSource(createSlsSource())
                .map(SlsDataStreamSource::convertMessages)
                .print();
        env.execute("SLS Stream Source");
    }

    private static SlsSourceFunction createSlsSource() {
        SLSAccessInfo accessInfo = new SLSAccessInfo();
        accessInfo.setEndpoint("yourEndpoint");
        accessInfo.setProjectName("yourProject");
        accessInfo.setLogstore("yourLogStore");
        accessInfo.setAccessId("yourAccessId");
        accessInfo.setAccessKey("yourAccessKey");

        // The batch get size must be given.
        accessInfo.setBatchGetSize(10);

        // Optional parameters
        accessInfo.setConsumerGroup("yourConsumerGroup");
        accessInfo.setMaxRetries(3);

        // time to start consuming, set to current time.
        int startInSec = (int) (new Date().getTime() / 1000);

        // time to stop consuming, -1 means never stop.
        int stopInSec = -1;

        return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
    }

    private static List<String> convertMessages(SourceRecord input) {
        List<String> res = new ArrayList<>();
        for (FastLogGroup logGroup : input.getLogGroups()) {
            int logsCount = logGroup.getLogsCount();
            for (int i = 0; i < logsCount; i++) {
                FastLog log = logGroup.getLogs(i);
                int fieldCount = log.getContentsCount();
                for (int idx = 0; idx < fieldCount; idx++) {
                    FastLogContent f = log.getContents(idx);
                    res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                }
            }
        }
        return res;
    }
}

Write data to SLS

Realtime Compute for Apache Flink provides the SLSOutputFormat implementation of OutputFormat to write data to SLS. Sample code:

public class SlsDataStreamSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(0, 100)
                .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                .addSink(createSlsSink())
                .name(SlsDataStreamSink.class.getSimpleName());
        env.execute("SLS Stream Sink");
    }

    private static OutputFormatSinkFunction<SinkRecord> createSlsSink() {
        Configuration conf = new Configuration();
        conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
        conf.setString(SLSOptions.PROJECT, "yourProject");
        conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
        conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
        conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
        SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
        return new OutputFormatSinkFunction<>(outputFormat);
    }

    private static SinkRecord getSinkRecord(Long seed) {
        SinkRecord record = new SinkRecord();
        LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", String.valueOf(seed));
        logItem.PushBack("message", "it's a test message for " + seed.toString());
        record.setContent(logItem);
        return record;
    }

}

XML

The SLS DataStream connector is available in the Maven Central Repository.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-sls</artifactId>
    <version>${vvr-version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-format-common</artifactId>
        </exclusion>
    </exclusions>
</dependency>

FAQ

What do I do if an OOM error occurs in TaskManagers and the error message "java.lang.OutOfMemoryError: Java heap space" appears for the source table when I restore a failed Flink program?