全部產品
Search
文件中心

Realtime Compute for Apache Flink:Log ServiceSLS

更新時間:Feb 06, 2026

本文為您介紹如何使用Log ServiceSLS連接器。

背景資訊

Log Service是針對日誌類資料的一站式服務。Log Service可以協助您快捷地完成資料擷取、消費、投遞以及查詢分析,提升營運和營運效率,建立海量Tlog能力。

SLS連接器支援的資訊如下。

類別

詳情

支援類型

源表和結果表

運行模式

僅支援流模式

特有監控指標

暫不適用

資料格式

暫無

API種類

SQL,Datastream和資料攝入YAML

是否支援更新或刪除結果表資料

不支援更新和刪除結果表資料,只支援插入資料。

特色功能

SLS連接器源表支援直接讀取訊息的屬性欄位,支援的屬性欄位如下。

欄位名

欄位類型

欄位說明

__source__

STRING METADATA VIRTUAL

訊息源。

__topic__

STRING METADATA VIRTUAL

訊息主題。

__timestamp__

BIGINT METADATA VIRTUAL

日誌時間。

__tag__

MAP<VARCHAR, VARCHAR> METADATA VIRTUAL

訊息TAG。

對於屬性"__tag__:__receive_time__":"1616742274"'__receive_time__'和'1616742274'會被作為KV對,記錄在Map中,在SQL中通過__tag__['__receive_time__']的方式訪問。

前提條件

已建立Log ServiceProject和Logstore,詳情請參見建立Project和Logstore

使用限制

  • 僅Realtime Compute引擎VVR 11.1及以上版本支援Log ServiceSLS作為資料攝入YAML的同步資料來源。

  • SLS連接器僅保證At-Least-Once語義。

  • 強烈建議不要設定Source並發度大於Shard個數,不僅會造成資源浪費,且在8.0.5及更低版本中,如果後續Shard數目發生變化,自動Failover功能可能會失效,造成部分Shard不被消費。

SQL

文法結構

CREATE TABLE sls_table(
  a INT,
  b INT,
  c VARCHAR
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

WITH參數

  • 通用

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    connector

    表類型。

    String

    固定值sls。

    endPoint

    EndPoint地址。

    String

    請填寫SLS的私網服務地址,詳情請參見服務存取點

    說明
    • Realtime ComputeFlink版預設不具備訪問公網的能力,但阿里雲提供的NAT Gateway可以實現VPC網路與公網網路互連,詳情請參見如何訪問公網?

    • 不建議跨公網訪問SLS。如確有需要,請使用HTTPS網路傳輸協議並且開啟SLSGlobal Acceleration服務,詳情請參見管理傳輸加速

    project

    SLS專案名稱。

    String

    無。

    logStore

    SLS LogStore或metricstore名稱。

    String

    logStore和metricstore是相同的消費方式。

    accessId

    阿里雲帳號的AccessKey ID。

    String

    詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?

    重要

    為了避免您的AK資訊泄露,建議您使用變數的方式填寫AccessKey取值,詳情請參見專案變數

    accessKey

    阿里雲帳號的AccessKey Secret。

    String

  • 源表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    enableNewSource

    是否啟用實現了FLIP-27介面的新資料來源。

    Boolean

    false

    新資料來源可以自動適應Shard變化,同時儘可能保證Shard在所有的source並發上分布均勻。

    重要
    • 僅Realtime Compute引擎VVR 8.0.9及以上版本支援該參數。從Realtime Compute引擎VVR 11.1版本開始該參數預設為true。

    • 作業在該配置項發生變化後無法從狀態恢複。可通過先設定配置項consumerGroup啟動作業,將消費進度記錄到SLS消費組中,再將配置項consumeFromCheckpoint設為true後無狀態啟動作業,從而實現從歷史進度繼續消費。

    • 如果SLS中存在唯讀Shard,Flink的某些並發任務在完成對唯讀Shard的消費後會繼續請求讀取其他未完成的Shard。這可能導致部分並發任務被分配到多個Shard,從而造成不同並發任務之間的Shard分配不均衡。這種不均衡會影響整體的消費效率和系統效能。為緩解這一問題,您可以通過調整並發度、最佳化任務調度策略、合并小Shard等方法,以減少Shard數量和任務分配複雜度。

    shardDiscoveryIntervalMs

    動態檢測shard變化時間間隔,單位為毫秒。

    Long

    60000

    設定為負值時可以關閉動態檢測。

    說明
    • 該參數值不能少於1分鐘(60000毫秒)。

    • 僅當配置項enableNewSource為true時生效。

    • 僅Realtime Compute引擎VVR 8.0.9及以上版本支援該參數。

    startupMode

    源表啟動模式。

    String

    timestamp

    • timestamp(預設):從指定的起始時間開始消費日誌。

    • latest:從最新位點開始消費日誌。

    • earliest:從最早位點開始消費日誌。

    • consumer_group:從消費組記錄位點開始消費日誌。若消費組未記錄某shard消費位點,則會從最早位點開始消費日誌。

    重要
    • Realtime Compute引擎VVR 11.1以下版本,不支援取值為consumer_group,需要將consumeFromCheckpoint設為true,此時會從指定消費組記錄的位點開始消費日誌,此處的啟動模式將不會生效。

    startTime

    消費日誌的開始時間。

    String

    目前時間

    格式為yyyy-MM-dd hh:mm:ss

    僅當startupMode設為timestamp時生效。

    說明

    startTime和stopTime基於SLS中的__receive_time__屬性,而非__timestamp__屬性。

    stopTime

    消費日誌的結束時間。

    String

    格式為yyyy-MM-dd hh:mm:ss

    說明
    • 僅用於消費歷史日誌,應設定為過去時間點。若配置為未來時間,可能因暫無新日誌寫入而導致消費提前終止,表現為資料流中斷且無異常提示。

    • 如期望日誌消費到結尾時退出Flink程式,需要同時設定exitAfterFinish=true.

    consumerGroup

    消費組名稱。

    String

    消費組用於記錄消費進度。您可以自訂消費組名,無固定格式。

    說明

    不支援通過相同的消費組進行多作業的協同消費。不同的Flink作業應該設定不同的消費組。如果不同的Flink作業使用相同的消費組,它們將會消費全部資料。這是因為在Flink消費SLS的資料時,並不會經過SLS消費組進行分區分配,因此導致各個消費者獨立消費各自的訊息,即使消費組是相同的。

    consumeFromCheckpoint

    是否從指定的消費組中儲存的Checkpoint開始消費日誌。

    String

    false

    • true:必須同時指定消費組,Flink程式會從消費組中儲存的Checkpoint開始消費日誌,如果該消費組沒有對應的Checkpoint,則從startTime配置值開始消費。

    • false(預設值):不從指定的消費組中儲存的Checkpoint開始消費日誌。

    重要

    Realtime Compute引擎VVR 11.1版本開始不再支援配置該參數。對於VVR 11.1及更高版本,需要將startupMode配置為consumer_group

    maxRetries

    讀取SLS失敗後重試次數。

    String

    3

    無。

    batchGetSize

    單次請求讀取logGroup的個數。

    String

    100

    batchGetSize設定不能超過1000,否則會報錯。

    exitAfterFinish

    在資料消費完成後,Flink程式是否退出。

    String

    false

    • true:資料消費完後,Flink程式退出。

    • false(預設):資料消費完後,Flink程式不退出。

    query

    重要

    自VVR 11.3起廢棄,後續版本仍相容。

    SLS消費預先處理語句。

    String

    通過使用query參數,您可以在消費SLS資料之前對其進行過濾,以避免將所有資料都消費到Flink中,從而實現節約成本和提高處理速度的目的。

    例如 'query' = '*| where request_method = ''GET'''表示在Flink讀取SLS資料前,先匹配出request_method欄位值等於get的資料。

    說明

    query需使用Log ServiceSPL語言,請參見SPL文法

    重要
    • 僅Realtime Compute引擎VVR 8.0.1及以上版本支援該參數。

    • 該功能會產生Log ServiceSLS費用,詳情請參見費用說明

    processor

    SLS 消費處理器,與query欄位同時存在時,query生效,processor不生效。

    String

    通過使用processor參數,您可以在消費SLS資料之前對其進行過濾,以避免將所有資料都消費到Flink中,從而實現節約成本和提高處理速度的目的。推薦使用processor參數而不是query參數。

    例如 'processor' = 'test-filter-processor'表示在Flink讀取SLS資料前,先經過SLS消費處理器的過濾。

    說明

    processor需使用Log ServiceSPL語言,請參見SPL文法。SLS 消費處理器的建立、更新,請參見管理消費處理器

    重要

    僅Realtime Compute引擎VVR 11.3及以上版本支援該參數。

    該功能會產生Log ServiceSLS費用,詳情請參見費用說明

  • 結果表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    topicField

    指定欄位名,該欄位的值會覆蓋__topic__屬性欄位的值,表示日誌的主題。

    String

    該參數值是表中已存在的欄位之一。

    timeField

    指定欄位名,該欄位的值會覆蓋__timestamp__屬性欄位的值,表示日誌寫入時間。

    String

    目前時間

    該參數值是表中已存在的欄位之一,且欄位類型必須為INT。如果未指定,則預設填充目前時間。

    sourceField

    指定欄位名,該欄位的值會覆蓋__source__屬性欄位的值,表示日誌的來源地,例如產生該日誌機器的IP地址。

    String

    該參數值是表中已存在的欄位之一。

    partitionField

    指定欄位名,資料寫入時會根據該列值計算Hash值,Hash值相同的資料會寫入同一個shard。

    String

    如果未指定,則每條資料會隨機寫入當前可用的Shard中。

    buckets

    當指定partitionField時,根據Hash值重新分組的個數。

    String

    64

    該參數的取值範圍是[1, 256],且必須是2的整數次冪。同時,buckets個數應當大於等於Shard個數,否則會出現部分Shard沒有資料寫入的情況。

    flushIntervalMs

    觸發資料寫入的時間周期。

    String

    2000

    單位為毫秒。

    writeNullProperties

    是否將null值作為空白字串寫入SLS。

    Boolean

    true

    • true(預設值):將null值作為空白字串寫入日誌。

    • false:計算結果為null的欄位不會寫入到日誌中。

    說明

    僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。

類型映射

Flink欄位類型

SLS欄位類型

BOOLEAN

STRING

VARBINARY

VARCHAR

TINYINT

INTEGER

BIGINT

FLOAT

DOUBLE

DECIMAL

資料攝入(公測中)

使用限制

僅Realtime Compute引擎VVR 11.1及以上版本支援。

文法結構

source:
   type: sls
   name: SLS Source
   endpoint: <endpoint>
   project: <project>
   logstore: <logstore>
   accessId: <accessId>
   accessKey: <accessKey>

配置項

參數

說明

資料類型

是否必填

預設值

備忘

type

資料來源類型。

String

固定值sls。

endpoint

EndPoint地址。

String

請填寫SLS的私網服務地址,詳情請參見服務存取點

說明
  • Realtime ComputeFlink版預設不具備訪問公網的能力,但阿里雲提供的NAT Gateway可以實現VPC網路與公網網路互連,詳情請參見如何訪問公網?

  • 不建議跨公網訪問SLS。如確有需要,請使用HTTPS網路傳輸協議並且開啟SLSGlobal Acceleration服務,詳情請參見管理傳輸加速

accessId

阿里雲帳號的AccessKey ID。

String

詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?

重要

為了避免您的AK資訊泄露,建議您使用變數的方式填寫AccessKey取值,詳情請參見專案變數

accessKey

阿里雲帳號的AccessKey Secret。

String

project

SLS專案名稱。

String

無。

logStore

SLS LogStore或metricstore名稱。

String

logStore和metricstore是相同的消費方式。

schema.inference.strategy

Schema推導策略。

String

continuous

  • continuous:對每條資料均進行 Schema 推導。在前後 Schema 不相容時,推匯出更寬的 Schema 併產生 Schema 變更事件。

  • static:僅在作業啟動時進行一次Schema推導,後續根據初始Schema解析資料,不會產生Schema變更事件。

maxPreFetchLogGroups

Schema初始推導時,對每個shard最多嘗試讀取解析的logGroup個數。

Integer

50

在作業實際讀取並處理資料前,對每個shard嘗試提前消費指定數量的logGroup,用於初始化schema資訊。

shardDiscoveryIntervalMs

動態檢測shard變化時間間隔,單位為毫秒。

Long

60000

設為負值時可以關閉動態檢測。

說明

該參數值不能少於1分鐘(60000毫秒)。

startupMode

啟動模式。

String

  • timestamp(預設):從指定的起始時間開始消費日誌。

  • latest:從最新位點開始消費日誌。

  • earliest:從最早位點開始消費日誌。

  • consumer_group:從消費組記錄位點開始消費日誌。若消費組未記錄某shard消費位點,則會從最早位點開始消費日誌。

startTime

消費日誌的開始時間。

String

目前時間

格式為yyyy-MM-dd hh:mm:ss。

僅當startupMode設為timestamp時生效。

說明

startTime和stopTime基於SLS中的__receive_time__屬性,而非__timestamp__屬性。

stopTime

消費日誌的結束時間。

String

格式為yyyy-MM-dd hh:mm:ss。

說明

如期望日誌消費到結尾時退出Flink程式,需要同時設定exitAfterFinish=true。

consumerGroup

消費組名稱。

String

消費組用於記錄消費進度。您可以自訂消費組名,無固定格式。

batchGetSize

單次請求讀取logGroup的個數。

Integer

100

batchGetSize設定不能超過1000,否則會報錯。

maxRetries

讀取SLS失敗後重試次數。

Integer

3

無。

exitAfterFinish

在資料消費完成後,Flink程式是否退出。

Boolean

false

  • true:資料消費完後,Flink程式退出。

  • false(預設):資料消費完後,Flink程式不退出。

query

SLS消費預先處理語句。

String

通過使用query參數,您可以在消費SLS資料之前對其進行過濾,以避免將所有資料都消費到Flink中,從而實現節約成本和提高處理速度的目的。

例如'query' = '*| where request_method = ''GET'''表示在Flink讀取SLS資料前,先匹配出request_method欄位值等於get的資料。

說明

query需使用Log ServiceSPL語言,請參見SPL文法

重要

compressType

SLS壓縮類型。

String

支援的壓縮類型包括:

  • lz4

  • deflate

  • zstd

timeZone

startTime 和 stopTime 對應的時區。

String

預設情況下不添加位移量。

regionId

SLS開服地區。

String

參閱開服地區文檔配置。

signVersion

SLS請求籤名版本。

String

參閱請求籤名文檔配置。

shardModDivisor

在讀取SLS LogStore分區時的除數。

Int

-1

參閱分區(Shard)文檔配置。

shardModRemainder

在讀取SLS LogStore分區時的餘數。

Int

-1

參閱分區(Shard)文檔配置。

metadata.list

需要傳遞給下遊的中繼資料列。

String

可用的中繼資料欄位包括__source____topic____timestamp____tag__,您可以用英文逗號分隔。

類型映射

資料攝入類型映射如下表所示:

SLS欄位類型

CDC欄位類型

STRING

STRING

表結構推導和變更同步

  • Shard資料預消費和表結構初始化

    SLS連接器會維護當前讀取logstore的Schema。在讀取logstore中的資料前,SLS連接器會預先在每個shard中嘗試消費最多maxPreFetchLogGroups個logGroup的資料,解析其中每條日誌的Schema,再將這些Schema合并,用於初始化表結構資訊。後續在實際消費資料前會根據初始化的Schema產生對應的建表事件。

    說明

    對於每個shard,SLS連接器會嘗試從目前時間一小時之前的時間開始消費資料並解析日誌Schema。

  • 主鍵資訊

    SLS日誌中不包含主鍵資訊,可以通過transform規則手動為表添加主鍵:

    transform:
      - source-table: <project>.<logstore>
        projection: \*
        primary-keys: key1, key2
  • Schema推導和Schema變更

    在表結構初始化完成後,若schema.inference.strategy配置為static,SLS連接器會根據初始的表結構解析每條日誌資料,不會產生Schema變更事件。若schema.inference.strategy配置為continuous,SLS連接器會解析每條日誌的資料,推匯出物理列,並與目前記錄的Schema比對,若推匯出的Schema與當前Schema不一致時,會將Schema合并,合并規則如下:

    • 如果推匯出的物理列中包含當前Schema中沒有的欄位,則會將這些欄位加入到Schema中,同時產生新增可空列事件。

    • 如果推匯出的物理列中不包含當前Schema中已有的欄位,該欄位仍會保留,該列的資料會填充為NULL,不產生刪除列事件。

    SLS連接器會將每條日誌中的欄位類型都推導為String類型,目前僅支援新增列,即會在當前Schema末尾添加新列,新增的列會設定為可空列。

程式碼範例

  • SQL源表和結果表

    CREATE TEMPORARY TABLE sls_input(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING METADATA VIRTUAL,
      `__source__` STRING METADATA VIRTUAL,
      `__timestamp__` STRING METADATA VIRTUAL,
       __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'starttime' = '2023-08-30 00:00:00',
      'project' ='sls-test',
      'logstore' ='sls-input'
    );
    
    CREATE TEMPORARY TABLE sls_sink(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING,
      `__source__` STRING,
      `__timestamp__` BIGINT ,
      receive_time BIGINT
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${ak_id}',
      'accessKey' = '${ak_secret}',
      'project' ='sls-test',
      'logstore' ='sls-output'
    );
    
    INSERT INTO sls_sink
    SELECT 
     `time`,
      url,
      dt,
      float_field,
      double_field,
      boolean_field,
      `__topic__` ,
      `__source__` ,
      `__timestamp__` ,
      cast(__tag__['__receive_time__'] as bigint) as receive_time
    FROM sls_input; 
  • 資料攝入資料來源

    SLS 可以作為資料攝入作業的資料來源使用,將SLS資料即時寫入支援的下遊系統。比如,您可以配置如下資料攝入作業,將logstore中的資料以paimon格式寫入到DLF資料湖,作業會自動推導各個欄位的資料類型和下遊表結構,並在運行過程中支援表結構動態演化(Schema Evolution)。

source:
  type: sls
  name: SLS Source
  endpoint: ${endpoint}
  project: test_project
  logstore: test_log
  accessId: ${accessId}
  accessKey: ${accessKey}
   
# 為表添加主鍵資訊 
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
    
# 將test_log中所有的資料都寫入到 test_database.inventory 表中
route:
  - source-table: test_project.test_log
    sink-table: test_database.inventory

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  #(可選)開啟刪除向量,提升讀取效能
  table.properties.deletion-vectors.enabled: true

DataStream API

重要

通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink,DataStream連接器設定方法請參見DataStream連接器使用方法

如果您使用的Realtime Compute引擎VVR版本低於8.0.10,啟動作業可能會存在缺少依賴JAR包的問題,可以在附加依賴檔案額外引入對應的-uber包解決。

讀取SLS

Realtime Compute引擎VVR提供SourceFunction的實作類別SlsSourceFunction,用於讀取SLS,讀取SLS的樣本如下。

public class SlsDataStreamSource {

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Creates and adds SLS source and sink.
        env.addSource(createSlsSource())
                .map(SlsDataStreamSource::convertMessages)
                .print();
        env.execute("SLS Stream Source");
    }

    private static SlsSourceFunction createSlsSource() {
        SLSAccessInfo accessInfo = new SLSAccessInfo();
        accessInfo.setEndpoint("yourEndpoint");
        accessInfo.setProjectName("yourProject");
        accessInfo.setLogstore("yourLogStore");
        accessInfo.setAccessId("yourAccessId");
        accessInfo.setAccessKey("yourAccessKey");

        // The batch get size must be given.
        accessInfo.setBatchGetSize(10);

        // Optional parameters
        accessInfo.setConsumerGroup("yourConsumerGroup");
        accessInfo.setMaxRetries(3);

        // time to start consuming, set to current time.
        int startInSec = (int) (new Date().getTime() / 1000);

        // time to stop consuming, -1 means never stop.
        int stopInSec = -1;

        return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
    }

    private static List<String> convertMessages(SourceRecord input) {
        List<String> res = new ArrayList<>();
        for (FastLogGroup logGroup : input.getLogGroups()) {
            int logsCount = logGroup.getLogsCount();
            for (int i = 0; i < logsCount; i++) {
                FastLog log = logGroup.getLogs(i);
                int fieldCount = log.getContentsCount();
                for (int idx = 0; idx < fieldCount; idx++) {
                    FastLogContent f = log.getContents(idx);
                    res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                }
            }
        }
        return res;
    }
}

寫入SLS

提供OutputFormat的實作類別SLSOutputFormat,用於寫入SLS。寫入SLS的樣本如下。

public class SlsDataStreamSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(0, 100)
                .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                .addSink(createSlsSink())
                .name(SlsDataStreamSink.class.getSimpleName());
        env.execute("SLS Stream Sink");
    }

    private static OutputFormatSinkFunction createSlsSink() {
        Configuration conf = new Configuration();
        conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
        conf.setString(SLSOptions.PROJECT, "yourProject");
        conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
        conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
        conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
        SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
        return new OutputFormatSinkFunction<>(outputFormat);
    }

    private static SinkRecord getSinkRecord(Long seed) {
        SinkRecord record = new SinkRecord();
        LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", String.valueOf(seed));
        logItem.PushBack("message", "it's a test message for " + seed.toString());
        record.setContent(logItem);
        return record;
    }

}

XML

Maven中央庫中已經放置了SLS DataStream連接器

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-sls</artifactId>
    <version>${vvr-version}</version>
</dependency>

常見問題

恢複失敗的Flink程式時,TaskManager發生OOM,源表報錯java.lang.OutOfMemoryError: Java heap space