全部產品
Search
文件中心

Realtime Compute for Apache Flink:Log ServiceSLS

更新時間:Oct 25, 2024

本文為您介紹如何使用Log ServiceSLS連接器。

背景資訊

Log Service是針對日誌類資料的一站式服務。Log Service可以協助您快捷地完成資料擷取、消費、投遞以及查詢分析,提升營運和營運效率,建立海量Tlog能力。

SLS連接器支援的資訊如下。

類別

詳情

支援類型

源表和結果表

運行模式

僅支援流模式

特有監控指標

暫不適用

資料格式

暫無

API種類

SQL

是否支援更新或刪除結果表資料

不支援更新和刪除結果表資料,只支援插入資料。

特色功能

SLS連接器源表支援直接讀取訊息的屬性欄位,支援的屬性欄位如下。

欄位名

欄位類型

欄位說明

__source__

STRING METADATA VIRTUAL

訊息源。

__topic__

STRING METADATA VIRTUAL

訊息主題。

__timestamp__

BIGINT METADATA VIRTUAL

日誌時間。

__tag__

MAP<VARCHAR, VARCHAR> METADATA VIRTUAL

訊息TAG。

對於屬性"__tag__:__receive_time__":"1616742274"'__receive_time__'和'1616742274'會被作為KV對,記錄在Map中,在SQL中通過__tag__['__receive_time__']的方式訪問。

前提條件

已建立Log ServiceProject和Logstore,詳情請參見建立Project和Logstore

使用限制

  • 僅Realtime Compute引擎VVR 2.0.0及以上版本支援Log ServiceSLS連接器。

  • SLS連接器僅保證At-Least-Once語義。

  • 僅Realtime Compute引擎VVR 4.0.13及以上版本支援Shard數目變化觸發自動Failover功能。

  • 強烈建議不要設定Source並發度大於Shard個數,不僅會造成資源浪費,且在8.0.5及更低版本中,如果後續Shard數目發生變化,自動Failover功能可能會失效,造成部分Shard不被消費。

文法結構

CREATE TABLE sls_table(
  a INT,
  b INT,
  c VARCHAR
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

WITH參數

  • 通用

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    connector

    表類型。

    String

    固定值sls。

    endPoint

    EndPoint地址。

    String

    請填寫SLS的私網服務地址,詳情請參見服務存取點

    說明
    • Realtime ComputeFlink版預設不具備訪問公網的能力,但阿里雲提供的NAT Gateway可以實現VPC網路與公網網路互連,詳情請參見控制台操作

    • 不建議跨公網訪問SLS。如確有需要,請使用HTTPS網路傳輸協議並且開啟SLSGlobal Acceleration服務,詳情請參見管理傳輸加速

    project

    SLS專案名稱。

    String

    無。

    logStore

    SLS LogStore或metricstore名稱。

    String

    logStore和metricstore是相同的消費方式。

    accessId

    阿里雲帳號的AccessKey ID。

    String

    詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?

    重要

    為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變數管理

    accessKey

    阿里雲帳號的AccessKey Secret。

    String

    詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?

    重要

    為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變數管理

  • 源表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    enableNewSource

    是否啟用實現了FLIP-27介面的新資料來源。

    Boolean

    false

    新資料來源可以自動適應shard變化,同時儘可能保證shard在所有的source並發上分布均勻。

    說明

    僅Realtime Compute引擎VVR 8.0.9及以上版本支援該參數。

    重要

    作業在該配置項發生變化後無法從狀態恢複。

    可通過先設定配置項consumerGroup啟動作業,將消費進度記錄到SLS消費組中,再將配置項consumeFromCheckpoint設為true後無狀態啟動作業,從而實現從歷史進度繼續消費。

    shardDiscoveryIntervalMs

    動態檢測shard變化時間間隔,單位為毫秒。

    Long

    60000

    設定為負值時可以關閉動態檢測。

    說明
    • 僅當配置項enableNewSource為true時生效。

    • 僅Realtime Compute引擎VVR 8.0.9及以上版本支援該參數。

    startupMode

    源表啟動模式。

    String

    timestamp

    參數取值如下:

    • timestamp(預設):從指定的起始時間開始消費日誌。

    • latest:從最新位點開始消費日誌。

    • earliest:從最早位點開始消費日誌。

    說明

    若將配置項consumeFromCheckpoint設為true,則會從指定的消費組中儲存的Checkpoint開始消費日誌,此處的啟動模式將不會生效。

    startTime

    消費日誌的開始時間。

    String

    目前時間

    格式為yyyy-MM-dd hh:mm:ss。

    僅當startupMode設為timestamp時生效。

    說明

    startTime和stopTime基於SLS中的__receive_time__屬性,而非__timestamp__屬性。

    stopTime

    消費日誌的結束時間。

    String

    格式為yyyy-MM-dd hh:mm:ss。

    說明

    如期望日誌消費到結尾時退出Flink程式,需要同時設定exitAfterFinish=true.

    consumerGroup

    消費組名稱。

    String

    消費組用於記錄消費進度。您可以自訂消費組名,無固定格式。

    說明

    不支援通過相同的消費組進行多作業的協同消費。不同的Flink作業應該設定不同的消費組。如果不同的Flink作業使用相同的消費組,它們將會消費全部資料。這是因為在Flink消費SLS的資料時,並不會經過SLS消費組進行分區分配,因此導致各個消費者獨立消費各自的訊息,即使消費組是相同的。

    consumeFromCheckpoint

    是否從指定的消費組中儲存的Checkpoint開始消費日誌。

    String

    false

    參數取值如下:

    • true:必須同時指定消費組,Flink程式會從消費組中儲存的Checkpoint開始消費日誌,如果該消費組沒有對應的Checkpoint,則從startTime配置值開始消費。

    • false(預設值):不從指定的消費組中儲存的Checkpoint開始消費日誌。

    說明

    僅Realtime Compute引擎VVR 6.0.5及以上版本支援該參數。

    maxRetries

    讀取SLS失敗後重試次數。

    String

    3

    無。

    batchGetSize

    單次請求讀取logGroup的個數。

    String

    100

    batchGetSize設定不能超過1000,否則會報錯。

    exitAfterFinish

    在資料消費完成後,Flink程式是否退出。

    String

    false

    參數取值如下:

    • true:資料消費完後,Flink程式退出。

    • false(預設):資料消費完後,Flink程式不退出。

    說明

    僅Realtime Compute引擎VVR 4.0.13及以上版本支援該參數。

    query

    SLS消費預先處理語句。

    String

    通過使用query參數,您可以在消費SLS資料之前對其進行過濾,以避免將所有資料都消費到Flink中,從而實現節約成本和提高處理速度的目的。

    例如 'query' = '*| where request_method = ''GET'''表示在Flink讀取SLS資料前,先匹配出request_method欄位值等於get的資料。

    說明

    query需使用Log ServiceSPL語言,請參見SPL概述

    重要
    • 僅Realtime Compute引擎VVR 8.0.1及以上版本支援該參數。

    • Log ServiceSLS支援該功能的地區請參見基於規則消費日誌

    • 公測階段免費,後續可能會在產生Log ServiceSLS費用,詳情請參見費用說明

  • 結果表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    topicField

    指定欄位名,該欄位的值會覆蓋__topic__屬性欄位的值,表示日誌的主題。

    String

    該參數值是表中已存在的欄位之一。

    timeField

    指定欄位名,該欄位的值會覆蓋__timestamp__屬性欄位的值,表示日誌寫入時間。

    String

    目前時間

    該參數值是表中已存在的欄位之一,且欄位類型必須為INT。如果未指定,則預設填充目前時間。

    sourceField

    指定欄位名,該欄位的值會覆蓋__source__屬性欄位的值,表示日誌的來源地,例如產生該日誌機器的IP地址。

    String

    該參數值是表中已存在的欄位之一。

    partitionField

    指定欄位名,資料寫入時會根據該列值計算Hash值,Hash值相同的資料會寫入同一個shard。

    String

    如果未指定,則每條資料會隨機寫入當前可用的Shard中。

    buckets

    當指定partitionField時,根據Hash值重新分組的個數。

    String

    64

    該參數的取值範圍是[1, 256],且必須是2的整數次冪。同時,buckets個數應當大於等於Shard個數,否則會出現部分Shard沒有資料寫入的情況。

    說明

    僅Realtime Compute引擎VVR 6.0.5及以上版本支援該參數。

    flushIntervalMs

    觸發資料寫入的時間周期。

    String

    2000

    單位為毫秒。

    writeNullProperties

    是否將null值作為空白字串寫入SLS。

    Boolean

    true

    參數取值如下:

    • true(預設值):將null值作為空白字串寫入日誌。

    • false:計算結果為null的欄位不會寫入到日誌中。

    說明

    僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。

類型映射

Flink欄位類型

SLS欄位類型

BOOLEAN

STRING

VARBINARY

VARCHAR

TINYINT

INTEGER

BIGINT

FLOAT

DOUBLE

DECIMAL

程式碼範例

CREATE TEMPORARY TABLE sls_input(
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` STRING METADATA VIRTUAL,
   __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  proctime as PROCTIME()
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'starttime' = '2023-08-30 00:00:00',
  'project' ='sls-test',
  'logstore' ='sls-input'
);

CREATE TEMPORARY TABLE sls_sink(
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING,
  `__source__` STRING,
  `__timestamp__` BIGINT ,
  receive_time BIGINT
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${ak_id}',
  'accessKey' = '${ak_secret}',
  'project' ='sls-test',
  'logstore' ='sls-output'
);

INSERT INTO sls_sink
SELECT 
 `time`,
  url,
  dt,
  float_field,
  double_field,
  boolean_field,
  `__topic__` ,
  `__source__` ,
  `__timestamp__` ,
  cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input; 

DataStream API

重要

通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink,DataStream連接器設定方法請參見DataStream連接器使用方法

讀取SLS

Realtime Compute引擎VVR提供SourceFunction的實作類別SlsSourceFunction,用於讀取SLS,讀取SLS的樣本如下。

public class SlsDataStreamSource {

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Creates and adds SLS source and sink.
        env.addSource(createSlsSource())
                .map(SlsDataStreamSource::convertMessages)
                .print();
        env.execute("SLS Stream Source");
    }

    private static SlsSourceFunction createSlsSource() {
        SLSAccessInfo accessInfo = new SLSAccessInfo();
        accessInfo.setEndpoint("yourEndpoint");
        accessInfo.setProjectName("yourProject");
        accessInfo.setLogstore("yourLogStore");
        accessInfo.setAccessId("yourAccessId");
        accessInfo.setAccessKey("yourAccessKey");

        // The batch get size must be given.
        accessInfo.setBatchGetSize(10);

        // Optional parameters
        accessInfo.setConsumerGroup("yourConsumerGroup");
        accessInfo.setMaxRetries(3);

        // time to start consuming, set to current time.
        int startInSec = (int) (new Date().getTime() / 1000);

        // time to stop consuming, -1 means never stop.
        int stopInSec = -1;

        return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
    }

    private static List<String> convertMessages(SourceRecord input) {
        List<String> res = new ArrayList<>();
        for (FastLogGroup logGroup : input.getLogGroups()) {
            int logsCount = logGroup.getLogsCount();
            for (int i = 0; i < logsCount; i++) {
                FastLog log = logGroup.getLogs(i);
                int fieldCount = log.getContentsCount();
                for (int idx = 0; idx < fieldCount; idx++) {
                    FastLogContent f = log.getContents(idx);
                    res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                }
            }
        }
        return res;
    }
}

寫入SLS

提供OutputFormat的實作類別SLSOutputFormat,用於寫入SLS。寫入SLS的樣本如下。

public class SlsDataStreamSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(0, 100)
                .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                .addSink(createSlsSink())
                .name(SlsDataStreamSink.class.getSimpleName());
        env.execute("SLS Stream Sink");
    }

    private static OutputFormatSinkFunction createSlsSink() {
        Configuration conf = new Configuration();
        conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
        conf.setString(SLSOptions.PROJECT, "yourProject");
        conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
        conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
        conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
        SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
        return new OutputFormatSinkFunction<>(outputFormat);
    }

    private static SinkRecord getSinkRecord(Long seed) {
        SinkRecord record = new SinkRecord();
        LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", String.valueOf(seed));
        logItem.PushBack("message", "it's a test message for " + seed.toString());
        record.setContent(logItem);
        return record;
    }

}

XML

Maven中央庫中已經放置了SLS DataStream連接器

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-sls</artifactId>
    <version>${vvr-version}</version>
</dependency>

常見問題

恢複失敗的Flink程式時,TaskManager發生OOM,源表報錯java.lang.OutOfMemoryError: Java heap space