Realtime Compute for Apache Flink: Simple Log Service (SLS)

Last Updated: Feb 05, 2026

This topic describes how to use the Simple Log Service (SLS) connector.

Background information

Simple Log Service is an end-to-end data logging service. It helps you efficiently collect, consume, deliver, query, and analyze log data. This improves O&M and operational efficiency and helps you build the capacity to process massive amounts of log data.

The following table summarizes the capabilities of the SLS connector.

| Category | Details |
| --- | --- |
| Supported type | Source and sink tables |
| Running mode | Streaming mode only |
| Specific monitoring metrics | Not applicable |
| Data format | None |
| API type | SQL, DataStream, and data ingestion YAML |
| Can you update or delete data in a sink table? | You cannot update or delete data in a sink table. You can only insert data. |

Features

The SLS connector for source tables supports reading attribute fields of messages directly. The following table describes the supported attribute fields.

| Field name | Field type | Field description |
| --- | --- | --- |
| __source__ | STRING METADATA VIRTUAL | The message source. |
| __topic__ | STRING METADATA VIRTUAL | The message topic. |
| __timestamp__ | BIGINT METADATA VIRTUAL | The log time. |
| __tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | The message tag. |

For the attribute "__tag__:__receive_time__":"1616742274", the key __receive_time__ and the value 1616742274 are recorded as a key-value pair in the map. In SQL, you can access the value using __tag__['__receive_time__'].

Prerequisites

You have created a Simple Log Service project and Logstore. For more information, see Create a project and a Logstore.

Limits

  • Only Realtime Compute for Apache Flink VVR 11.1 or later supports using Simple Log Service (SLS) as a synchronization data source for data ingestion in YAML.

  • The SLS connector guarantees only at-least-once semantics.

  • We strongly recommend that you do not set the source concurrency to a value greater than the number of shards. This practice wastes resources and may cause the automatic failover feature to fail in VVR 8.0.5 or earlier if the number of shards changes. This can result in some shards not being consumed.
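The resource waste described in the last limit can be seen in a small sketch. The Java snippet below is illustrative only: it assumes shards are handed to source subtasks round-robin by shard index, which is a simplification and not the connector's documented assignment algorithm. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: models shard-to-subtask assignment as simple round-robin. */
class ShardAssignmentSketch {

    /** Returns, for each subtask, the list of shard indexes it would read. */
    static List<List<Integer>> assign(int shardCount, int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            bySubtask.add(new ArrayList<>());
        }
        // Assumption: shard i goes to subtask (i mod parallelism).
        for (int shard = 0; shard < shardCount; shard++) {
            bySubtask.get(shard % parallelism).add(shard);
        }
        return bySubtask;
    }

    /** Counts subtasks that receive no shard and therefore only hold resources. */
    static long idleSubtasks(int shardCount, int parallelism) {
        return assign(shardCount, parallelism).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        // 4 shards read by 6 subtasks: two subtasks have nothing to consume.
        System.out.println(assign(4, 6));
        System.out.println("idle subtasks: " + idleSubtasks(4, 6));
    }
}
```

Under this assumption, any parallelism above the shard count leaves the extra subtasks without a shard, which is why matching the parallelism to the number of shards is the safer setting.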

SQL

Syntax

CREATE TABLE sls_table(
  a INT,
  b INT,
  c VARCHAR
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

WITH parameters

  • General

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    connector

    The table type.

    String

    Yes

    None

    Set this parameter to sls.

    endPoint

    The endpoint.

    String

    Yes

    None

    Enter the private network endpoint of SLS. For more information, see Endpoints.

    Note
    • By default, Realtime Compute for Apache Flink cannot access the public network. However, Alibaba Cloud provides NAT Gateway to enable communication between a VPC and the public network. For more information, see How do I access the public network?.

    • We do not recommend that you access SLS over the public network. If you must access SLS over the public network, use the HTTPS protocol and enable Global Accelerator (GA) for SLS. For more information, see Manage transfer acceleration.

    project

    The name of the SLS project.

    String

    Yes

    None

    None.

    logStore

    The name of the SLS Logstore or metricstore.

    String

    Yes

    None

    Data in a Logstore is consumed in the same way as in a metricstore.

    accessId

    The AccessKey ID of your Alibaba Cloud account.

    String

    Yes

    None

    For more information, see How do I view the AccessKey ID and AccessKey secret?.

    Important

    To prevent your AccessKey information from being leaked, we recommend that you use variables to specify the AccessKey pair. For more information, see Manage variables.

    accessKey

    The AccessKey secret of your Alibaba Cloud account.

    String

    Yes

    None

  • Source table-specific

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    enableNewSource

    Specifies whether to enable the new data source that implements the FLIP-27 interface.

    Boolean

    No

    false

    The new data source can automatically adapt to shard changes and ensure that shards are distributed as evenly as possible across all source concurrencies.

    Important
    • This parameter is supported only in Realtime Compute for Apache Flink VVR 8.0.9 or later. This parameter is set to true by default from VVR 11.1.

    • A job cannot be restored from a state after this configuration item is changed. To resolve this issue, you can first set the consumerGroup configuration item to start the job and record the consumption progress in the SLS consumer group. Then, set the consumeFromCheckpoint configuration item to true and start the job in a stateless manner. This way, the job can resume consumption from the historical progress.

    • If read-only shards exist in SLS, some concurrent Flink tasks continue to request data from other unfinished shards after they finish consuming data from the read-only shards. This can lead to some concurrent tasks being assigned to multiple shards, causing an imbalance in shard distribution among them. This imbalance affects overall consumption efficiency and system performance. To mitigate this issue, adjust the concurrency, optimize the task scheduling policy, or merge small shards to reduce the number of shards and the complexity of task assignment.

    shardDiscoveryIntervalMs

    The interval at which shard changes are dynamically detected. Unit: millisecond.

    Long

    No

    60000

    Set this parameter to a negative value to disable dynamic detection.

    Note
    • If dynamic detection is enabled, the value of this parameter cannot be less than 1 minute (60,000 milliseconds).

    • This parameter takes effect only when the enableNewSource parameter is set to true.

    • This parameter is supported only in Realtime Compute for Apache Flink VVR 8.0.9 or later.

    startupMode

    The startup mode of the source table.

    String

    No

    timestamp

    • timestamp (default): Logs are consumed from the specified start time.

    • latest: Logs are consumed from the latest offset.

    • earliest: Logs are consumed from the earliest offset.

    • consumer_group: Logs are consumed from the offset recorded in the consumer group. If the consumer group does not record the consumer offset of a shard, logs are consumed from the earliest offset.

    Important
    • In VVR versions earlier than 11.1, the consumer_group value is not supported. To consume logs from the offset recorded in a consumer group in those versions, set consumeFromCheckpoint to true. In this case, the startup mode specified here does not take effect.

    startTime

    The time when log consumption starts.

    String

    No

    Current time

    The format is yyyy-MM-dd hh:mm:ss.

    This parameter takes effect only when startupMode is set to timestamp.

    Note

    The startTime and stopTime parameters are based on the __receive_time__ attribute in SLS, not the __timestamp__ attribute.

    stopTime

    The time when log consumption ends.

    String

    No

    None

    The format is yyyy-MM-dd hh:mm:ss.

    Note
    • This parameter is used only to consume historical logs and should be set to a past point in time. If you set it to a future time, consumption may stop prematurely if no new logs are written. This appears as a data stream interruption with no error message.

    • If you want the Flink program to exit after log consumption is complete, you must also set exitAfterFinish=true.

    consumerGroup

    The name of the consumer group.

    String

    No

    None

    A consumer group is used to record consumption progress. You can specify a custom name for the consumer group. The format is not fixed.

    Note

    Multiple jobs cannot share a consumer group for collaborative consumption, so use a different consumer group for each Flink job. If different Flink jobs use the same consumer group, each job still consumes all the data. This is because Flink does not allocate shards through the SLS consumer group when it consumes data from SLS. As a result, each consumer reads the data independently, even if the consumer group is the same.

    consumeFromCheckpoint

    Specifies whether to consume logs from the checkpoint that is saved in the specified consumer group.

    String

    No

    false

    • true: You must also specify a consumer group. The Flink program consumes logs from the checkpoint that is saved in the consumer group. If the consumer group does not have a corresponding checkpoint, logs are consumed from the time specified by the startTime parameter.

    • false (default): Logs are not consumed from the checkpoint that is saved in the specified consumer group.

    Important

    This parameter is no longer supported from VVR 11.1. For VVR 11.1 and later, you must set startupMode to consumer_group.

    maxRetries

    The number of retries after a failed attempt to read data from SLS.

    String

    No

    3

    None.

    batchGetSize

    The number of log groups to read in a single request.

    String

    No

    100

    The value of batchGetSize cannot exceed 1,000. Otherwise, an error is reported.

    exitAfterFinish

    Specifies whether the Flink program exits after data consumption is complete.

    String

    No

    false

    • true: The Flink program exits after data consumption is complete.

    • false (default): The Flink program does not exit after data consumption is complete.

    query

    Important

    This parameter is deprecated in VVR 11.3 and later, but remains compatible with subsequent versions.

    The pre-processing statement for SLS data consumption.

    String

    No

    None

    Using the query parameter lets you filter data before it is consumed from SLS. This prevents all data from being consumed into Flink, which saves costs and improves processing speed.

    For example, 'query' = '*| where request_method = ''GET''' indicates that before Flink reads data from SLS, the data is filtered to keep only logs where the value of the request_method field is 'GET'.

    Note

    The query parameter requires the use of Structured Process Language (SPL) for Simple Log Service. For more information, see SPL syntax.

    Important
    • This parameter is supported only in Realtime Compute for Apache Flink VVR 8.0.1 or later.

    • This feature incurs fees from Simple Log Service. For more information, see Billing.

    processor

    The SLS consume processor. If both this parameter and query are configured, query takes precedence and processor does not take effect.

    String

    No

    None

    Using the processor parameter lets you filter data before it is consumed by Flink. This helps save costs and improve processing speed. We recommend that you use the processor parameter instead of the query parameter.

    For example, 'processor' = 'test-filter-processor' indicates that data is filtered by the SLS consume processor named test-filter-processor before it is read by Flink.

    Note

    The processor must use the Structured Process Language (SPL) of Simple Log Service (SLS). For more information, see SPL syntax. To create and update SLS consume processors, see Manage consume processors.

    Important

    This parameter is supported only in Realtime Compute for Apache Flink VVR 11.3 or later.

    This feature incurs fees from Simple Log Service. For more information, see Billing.

  • Sink table-specific

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    topicField

    Specifies a field name. The value of this field overwrites the value of the __topic__ attribute field and indicates the topic of the log.

    String

    No

    None

    The value of this parameter must be one of the existing fields in the table.

    timeField

    Specifies a field name. The value of this field overwrites the value of the __timestamp__ attribute field and indicates the time when the log is written.

    String

    No

    Current time

    The value of this parameter must be one of the existing fields in the table, and the field type must be INT. If this parameter is not specified, the current time is used by default.

    sourceField

    Specifies a field name. The value of this field overwrites the value of the __source__ attribute field and indicates the source of the log, such as the IP address of the machine that generates the log.

    String

    No

    None

    The value of this parameter must be one of the existing fields in the table.

    partitionField

    Specifies a field name. When data is written, a hash value is calculated based on the value of this column. Data with the same hash value is written to the same shard.

    String

    No

    None

    If this parameter is not specified, each piece of data is randomly written to a currently available shard.

    buckets

    The number of buckets into which data is regrouped by hash value when partitionField is specified.

    String

    No

    64

    The value of this parameter must be in the range of [1, 256] and must be an integer power of 2. The number of buckets must be greater than or equal to the number of shards. Otherwise, no data is written to some shards.

    flushIntervalMs

    The interval at which data writes are triggered.

    String

    No

    2000

    Unit: millisecond.

    writeNullProperties

    Specifies whether to write null values as empty strings to SLS.

    Boolean

    No

    true

    • true (default): Null values are written as empty strings to logs.

    • false: Fields with null values are not written to logs.

    Note

    This parameter is supported only in Realtime Compute for Apache Flink VVR 8.0.6 or later.
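The constraints on buckets in the sink table parameters can be checked before a job is submitted. The Java sketch below is a hypothetical helper, not connector code. The bucketFor method is a further assumption: it uses String.hashCode only to show that equal partitionField values land in the same bucket, whereas the hash function that the connector actually uses is not described in this topic.

```java
/** Hypothetical helper that checks the documented constraints on the buckets parameter. */
class SlsBucketsSketch {

    /** True if buckets is in [1, 256] and is an integer power of 2 (exactly one bit set). */
    static boolean isValidBuckets(int buckets) {
        return buckets >= 1 && buckets <= 256 && Integer.bitCount(buckets) == 1;
    }

    /** True if the bucket count is large enough for every shard to receive data. */
    static boolean coversAllShards(int buckets, int shardCount) {
        return buckets >= shardCount;
    }

    /** Assumed mapping from a partition value to a bucket index in [0, buckets). */
    static int bucketFor(String partitionValue, int buckets) {
        if (!isValidBuckets(buckets)) {
            throw new IllegalArgumentException(
                    "buckets must be a power of 2 in [1, 256]: " + buckets);
        }
        // Math.floorMod keeps the index non-negative even for negative hash codes.
        return Math.floorMod(partitionValue.hashCode(), buckets);
    }

    public static void main(String[] args) {
        System.out.println(isValidBuckets(64));        // default value from the table
        System.out.println(isValidBuckets(48));        // not a power of 2
        System.out.println(coversAllShards(64, 128));  // fewer buckets than shards
        System.out.println(bucketFor("user-42", 64) == bucketFor("user-42", 64));
    }
}
```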

Type mapping

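| Flink field type | SLS field type |
| --- | --- |
| BOOLEAN | STRING |
| VARBINARY | STRING |
| VARCHAR | STRING |
| TINYINT | STRING |
| INTEGER | STRING |
| BIGINT | STRING |
| FLOAT | STRING |
| DOUBLE | STRING |
| DECIMAL | STRING |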

Data ingestion (public preview)

Limits

This feature is supported only in Realtime Compute for Apache Flink VVR 11.1 or later.

Syntax

source:
   type: sls
   name: SLS Source
   endpoint: <endpoint>
   project: <project>
   logstore: <logstore>
   accessId: <accessId>
   accessKey: <accessKey>

Configuration items

Parameter

Description

Data type

Required

Default value

Remarks

type

The data source type.

String

Yes

None

Set this parameter to sls.

endpoint

The endpoint.

String

Yes

None

Enter the private network endpoint of SLS. For more information, see Endpoints.

Note
  • By default, Realtime Compute for Apache Flink cannot access the public network. However, Alibaba Cloud provides NAT Gateway to enable communication between a VPC and the public network. For more information, see How do I access the public network?.

  • We do not recommend that you access SLS over the public network. If you must access SLS over the public network, use the HTTPS protocol and enable Global Accelerator (GA) for SLS. For more information, see Manage transfer acceleration.

accessId

The AccessKey ID of your Alibaba Cloud account.

String

Yes

None

For more information, see How do I view the AccessKey ID and AccessKey secret?.

Important

To prevent your AccessKey information from being leaked, we recommend that you use variables to specify the AccessKey pair. For more information, see Manage variables.

accessKey

The AccessKey secret of your Alibaba Cloud account.

String

Yes

None

project

The name of the SLS project.

String

Yes

None

None.

logstore

The name of the SLS Logstore or metricstore.

String

Yes

None

Data in a Logstore is consumed in the same way as in a metricstore.

schema.inference.strategy

The schema inference strategy.

String

No

continuous

  • continuous: Schema inference is performed for each piece of data. If the inferred schema is incompatible with the current schema, a wider schema is inferred and a schema change event is generated.

  • static: Schema inference is performed only once when the job starts. Subsequent data is parsed based on the initial schema, and no schema change events are generated.

maxPreFetchLogGroups

The maximum number of log groups to try to read and parse for each shard during initial schema inference.

Integer

No

50

Before the job actually reads and processes data, it attempts to pre-consume a specified number of log groups for each shard to initialize the schema information.

shardDiscoveryIntervalMs

The interval at which shard changes are dynamically detected. Unit: millisecond.

Long

No

60000

Set this parameter to a negative value to disable dynamic detection.

Note

If dynamic detection is enabled, the value of this parameter cannot be less than 1 minute (60,000 milliseconds).

startupMode

The startup mode.

String

No

timestamp

  • timestamp (default): Logs are consumed from the specified start time.

  • latest: Logs are consumed from the latest offset.

  • earliest: Logs are consumed from the earliest offset.

  • consumer_group: Logs are consumed from the offset recorded in the consumer group. If the consumer group does not record the consumer offset of a shard, logs are consumed from the earliest offset.

startTime

The time when log consumption starts.

String

No

Current time

The format is yyyy-MM-dd hh:mm:ss.

This parameter takes effect only when startupMode is set to timestamp.

Note

The startTime and stopTime parameters are based on the __receive_time__ attribute in SLS, not the __timestamp__ attribute.

stopTime

The time when log consumption ends.

String

No

None

The format is yyyy-MM-dd hh:mm:ss.

Note

If you want the Flink program to exit after log consumption is complete, you must also set exitAfterFinish=true.

consumerGroup

The name of the consumer group.

String

No

None

A consumer group is used to record consumption progress. You can specify a custom name for the consumer group. The format is not fixed.

batchGetSize

The number of log groups to read in a single request.

Integer

No

100

The value of batchGetSize cannot exceed 1,000. Otherwise, an error is reported.

maxRetries

The number of retries after a failed attempt to read data from SLS.

Integer

No

3

None.

exitAfterFinish

Specifies whether the Flink program exits after data consumption is complete.

Boolean

No

false

  • true: The Flink program exits after data consumption is complete.

  • false (default): The Flink program does not exit after data consumption is complete.

query

The pre-processing statement for SLS data consumption.

String

No

None

Using the query parameter lets you filter data before it is consumed from SLS. This prevents all data from being consumed into Flink, which saves costs and improves processing speed.

For example, 'query' = '*| where request_method = ''GET''' indicates that before Flink reads data from SLS, the data is filtered to keep only logs where the value of the request_method field is 'GET'.

Note

The query parameter requires the use of Structured Process Language (SPL) for Simple Log Service. For more information, see SPL syntax.

Important
  • For information about the regions that support this feature, see Consume logs based on rules.

  • This feature is in public preview and is free of charge. You may be charged for this feature later. For more information, see Billing.

compressType

The compression type for SLS.

String

No

None

Supported compression types include the following:

  • lz4

  • deflate

  • zstd

timeZone

The time zone for startTime and stopTime.

String

No

None

By default, no offset is added.

regionId

The region where the SLS service is activated.

String

No

None

See Supported regions for configuration.

signVersion

The version of the SLS request signature.

String

No

None

For more information about configuration, see Request Signature.

shardModDivisor

The divisor for reading partitions of an SLS Logstore.

Integer

No

-1

For more information, see Shard.

shardModRemainder

The remainder for reading partitions of an SLS Logstore.

Integer

No

-1

For more information, see Shard.

metadata.list

The metadata columns to pass to the downstream.

String

No

None

Available metadata fields include __source__, __topic__, __timestamp__, and __tag__. Separate them with commas.
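Two constraints in the configuration items above can be checked before a job is submitted: batchGetSize cannot exceed 1,000, and shardDiscoveryIntervalMs is read here as either negative (dynamic detection disabled) or at least 60,000. The Java sketch below is a hypothetical pre-flight check, not part of the connector. The class and method names are invented for illustration, and the lower bound of 1 for batchGetSize is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical pre-flight check for two SLS source options described above. */
class SlsSourceOptionCheck {

    static final int MAX_BATCH_GET_SIZE = 1000;
    static final long MIN_DISCOVERY_INTERVAL_MS = 60_000L;

    /** Returns a list of problems; an empty list means both values look valid. */
    static List<String> validate(int batchGetSize, long shardDiscoveryIntervalMs) {
        List<String> problems = new ArrayList<>();
        // Assumption: only the upper bound is documented; 1 is used as a sensible minimum.
        if (batchGetSize < 1 || batchGetSize > MAX_BATCH_GET_SIZE) {
            problems.add("batchGetSize must be between 1 and " + MAX_BATCH_GET_SIZE);
        }
        // A negative interval disables dynamic shard detection, so it is accepted.
        boolean detectionEnabled = shardDiscoveryIntervalMs >= 0;
        if (detectionEnabled && shardDiscoveryIntervalMs < MIN_DISCOVERY_INTERVAL_MS) {
            problems.add("shardDiscoveryIntervalMs must be negative (disabled) or at least "
                    + MIN_DISCOVERY_INTERVAL_MS);
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println(validate(100, 60_000L));  // default values from the table
        System.out.println(validate(2000, 5_000L));  // both values out of range
        System.out.println(validate(1000, -1L));     // detection disabled
    }
}
```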

Type mapping

The following table shows the type mapping for data ingestion.

| SLS field type | CDC field type |
| --- | --- |
| STRING | STRING |

Table schema inference and change synchronization

  • Shard data pre-consumption and table schema initialization

    The SLS connector maintains the schema of the Logstore that is being read. Before it reads data from the Logstore, the SLS connector attempts to pre-consume a maximum of maxPreFetchLogGroups log groups from each shard. The connector then parses the schema of each log in these log groups and merges the schemas to initialize the table schema. Before the actual data consumption starts, a corresponding table creation event is generated based on the initialized schema.

    Note

    For each shard, the SLS connector attempts to consume data and parse the log schema starting from one hour before the current time.

  • Primary key information

    SLS logs do not contain primary key information. You can manually add primary keys to a table using transform rules:

    transform:
      - source-table: <project>.<logstore>
        projection: \*
        primary-keys: key1, key2
  • Schema inference and schema evolution

    After the table schema is initialized, if schema.inference.strategy is set to static, the SLS connector parses each log based on the initial table schema and does not generate schema change events. If schema.inference.strategy is set to continuous, the SLS connector parses the data of each log, infers the physical columns, and compares them with the currently recorded schema. If the inferred schema is inconsistent with the current schema, the schemas are merged based on the following rules:

    • If the inferred physical columns contain fields that are not in the current schema, these fields are added to the schema, and an event for adding nullable columns is generated.

    • If the inferred physical columns do not contain fields that are already in the current schema, these fields are retained, their data is filled with NULL, and no event for deleting columns is generated.

    The SLS connector infers the type of all fields in each log as String. Currently, only adding columns is supported. New columns are added to the end of the current schema and are set as nullable columns.
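The merge rules for the continuous strategy can be sketched in plain Java. This is an illustration of the behavior described above, not the connector's implementation. The class and method names are hypothetical, and every field is treated as a nullable String column, as stated above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative model of add-only schema evolution over String columns. */
class SchemaMergeSketch {

    /** Column names in schema order; all columns are nullable String columns. */
    private final List<String> columns = new ArrayList<>();

    SchemaMergeSketch(List<String> initialColumns) {
        columns.addAll(initialColumns);
    }

    /**
     * Applies one log to the schema. Fields that are not yet in the schema are
     * appended to the end and returned as the "add column" change. Existing
     * columns are never dropped.
     */
    List<String> evolve(Map<String, String> log) {
        List<String> added = new ArrayList<>();
        for (String field : log.keySet()) {
            if (!columns.contains(field)) {
                columns.add(field);
                added.add(field);
            }
        }
        return added;
    }

    /** Projects a log onto the current schema; fields missing from the log become null. */
    List<String> toRow(Map<String, String> log) {
        List<String> row = new ArrayList<>();
        for (String column : columns) {
            row.add(log.get(column));
        }
        return row;
    }

    /** Returns a copy of the current column list. */
    List<String> columns() {
        return new ArrayList<>(columns);
    }

    public static void main(String[] args) {
        SchemaMergeSketch schema = new SchemaMergeSketch(Arrays.asList("url", "status"));
        Map<String, String> log = new LinkedHashMap<>();
        log.put("url", "/index");
        log.put("latency", "12");
        System.out.println("added: " + schema.evolve(log));
        System.out.println("columns: " + schema.columns());
        System.out.println("row: " + schema.toRow(log));
    }
}
```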

Code examples

  • SQL source table and sink table

    CREATE TEMPORARY TABLE sls_input(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING METADATA VIRTUAL,
      `__source__` STRING METADATA VIRTUAL,
      `__timestamp__` BIGINT METADATA VIRTUAL,
      __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'starttime' = '2023-08-30 00:00:00',
      'project' ='sls-test',
      'logstore' ='sls-input'
    );
    
    CREATE TEMPORARY TABLE sls_sink(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING,
      `__source__` STRING,
      `__timestamp__` BIGINT ,
      receive_time BIGINT
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'project' ='sls-test',
      'logstore' ='sls-output'
    );
    
    INSERT INTO sls_sink
    SELECT 
     `time`,
      url,
      dt,
      float_field,
      double_field,
      boolean_field,
      `__topic__` ,
      `__source__` ,
      `__timestamp__` ,
      cast(__tag__['__receive_time__'] as bigint) as receive_time
    FROM sls_input; 
  • Data ingestion data source

    You can use SLS as a data source for data ingestion jobs to write SLS data in real time to supported downstream systems. For example, you can configure a data ingestion job as follows to write data from a Logstore to a DLF data lake in the Paimon format. The job automatically infers the data types of fields and the schema of the downstream table. The job also supports dynamic schema evolution at runtime.

source:
  type: sls
  name: SLS Source
  endpoint: ${endpoint}
  project: test_project
  logstore: test_log
  accessId: ${accessId}
  accessKey: ${accessKey}
   
# Add primary key information to the table. 
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
    
# Write all data from test_log to the test_database.inventory table.
route:
  - source-table: test_project.test_log
    sink-table: test_database.inventory

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

DataStream API

Important

When you read or write data using the DataStream API, you must use the corresponding DataStream connector to connect to Flink. For more information, see Use DataStream connectors.

If you use a VVR version earlier than 8.0.10, dependency JAR packages may be missing when you start a job. To resolve this issue, you can add the corresponding uber package to the additional dependencies.

Read data from SLS

Realtime Compute for Apache Flink provides the SlsSourceFunction class, which is an implementation of SourceFunction, to read data from SLS. The following code provides an example.

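import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// SlsSourceFunction, SLSAccessInfo, SourceRecord, FastLogGroup, FastLog, and
// FastLogContent come from the SLS DataStream connector and its dependencies.
// Import them from the packages provided by the connector version that you use.

public class SlsDataStreamSource {

    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create the SLS source, convert each record to strings, and print the result.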
        env.addSource(createSlsSource())
                .map(SlsDataStreamSource::convertMessages)
                .print();
        env.execute("SLS Stream Source");
    }

    private static SlsSourceFunction createSlsSource() {
        SLSAccessInfo accessInfo = new SLSAccessInfo();
        accessInfo.setEndpoint("yourEndpoint");
        accessInfo.setProjectName("yourProject");
        accessInfo.setLogstore("yourLogStore");
        accessInfo.setAccessId("yourAccessId");
        accessInfo.setAccessKey("yourAccessKey");

        // The batch get size must be given.
        accessInfo.setBatchGetSize(10);

        // Optional parameters
        accessInfo.setConsumerGroup("yourConsumerGroup");
        accessInfo.setMaxRetries(3);

        // time to start consuming, set to current time.
        int startInSec = (int) (new Date().getTime() / 1000);

        // time to stop consuming, -1 means never stop.
        int stopInSec = -1;

        return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
    }

    private static List<String> convertMessages(SourceRecord input) {
        List<String> res = new ArrayList<>();
        for (FastLogGroup logGroup : input.getLogGroups()) {
            int logsCount = logGroup.getLogsCount();
            for (int i = 0; i < logsCount; i++) {
                FastLog log = logGroup.getLogs(i);
                int fieldCount = log.getContentsCount();
                for (int idx = 0; idx < fieldCount; idx++) {
                    FastLogContent f = log.getContents(idx);
                    res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                }
            }
        }
        return res;
    }
}
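The example above starts consuming at the current time. To start from a fixed point instead, the start time must be passed as epoch seconds. The helper below is a hypothetical sketch that converts a string such as 2023-08-30 00:00:00 into that value. It assumes a 24-hour pattern (HH) and requires the caller to choose the time zone explicitly, because the zone that applies to a given job is not specified in this topic.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: converts a date-time string to epoch seconds. */
class SlsStartTimeSketch {

    // Assumption: 24-hour clock, so that values such as 00:00:00 parse correctly.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Interprets dateTime in the given zone and returns epoch seconds as an int. */
    static int toEpochSeconds(String dateTime, ZoneId zone) {
        long seconds = LocalDateTime.parse(dateTime, FORMAT).atZone(zone).toEpochSecond();
        // The source example takes int seconds, so fail loudly instead of overflowing.
        return Math.toIntExact(seconds);
    }

    public static void main(String[] args) {
        int startInSec = toEpochSeconds("2023-08-30 00:00:00", ZoneOffset.UTC);
        System.out.println(startInSec);
    }
}
```

The returned value can be used in place of startInSec in createSlsSource().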

Write data to SLS

The SLSOutputFormat class, which is an implementation of OutputFormat, is provided to write data to SLS. The following code provides an example.

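import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;

// SLSOutputFormat, SLSOptions, SinkRecord, and LogItem come from the SLS
// DataStream connector and its dependencies. Import them from the packages
// provided by the connector version that you use.

public class SlsDataStreamSink {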

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(0, 100)
                .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                .addSink(createSlsSink())
                .name(SlsDataStreamSink.class.getSimpleName());
        env.execute("SLS Stream Sink");
    }

    private static OutputFormatSinkFunction createSlsSink() {
        Configuration conf = new Configuration();
        conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
        conf.setString(SLSOptions.PROJECT, "yourProject");
        conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
        conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
        conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
        SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
        return new OutputFormatSinkFunction<>(outputFormat);
    }

    private static SinkRecord getSinkRecord(Long seed) {
        SinkRecord record = new SinkRecord();
        LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", String.valueOf(seed));
        logItem.PushBack("message", "it's a test message for " + seed.toString());
        record.setContent(logItem);
        return record;
    }

}

XML

The SLS DataStream connector is available in the Maven central repository.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-sls</artifactId>
    <version>${vvr-version}</version>
</dependency>

FAQ

What do I do if a TaskManager runs out of memory and reports a "java.lang.OutOfMemoryError: Java heap space" error for a source table when I restore a failed Flink program?