全部產品
Search
文件中心

Simple Log Service:Realtime Compute(Flink)消費

更新時間:Oct 25, 2024

阿里雲Realtime Compute(Flink)通過建立Log Service源表的方式,可以直接消費Log Service的資料。本文介紹了如何為Realtime Compute建立Log Service源表以及建立過程涉及到的屬性欄位提取。

背景資訊

Flink消費日誌支援的資訊如下:

類別

詳情

支援類型

源表和結果表。

運行模式

僅支援流模式。

特有監控指標

暫不適用。

資料格式

暫無。

API種類

SQL。

是否支援更新或刪除結果表資料

不支援更新和刪除結果表資料,只支援插入資料。

使用Flink消費日誌的入口,請參見Flink SQL作業快速入門

前提條件

使用限制

  • 僅Realtime Compute引擎VVR 2.0.0及以上版本支援Log ServiceSLS連接器。

  • SLS連接器僅保證At-Least-Once語義。

  • 僅Realtime Compute引擎VVR 4.0.13及以上版本支援Shard數目變化觸發自動Failover功能。

  • 強烈建議不要設定Source並發度大於Shard個數,不僅會造成資源浪費,且在8.0.5及更低版本中,如果後續Shard數目發生變化,自動Failover功能可能會失效,造成部分Shard不被消費。

建立Log Service源表和結果表

重要

使用Flink消費Log Service資料,必須有一個完整的SQL作業。完整的SQL作業包含:源表、結果表,在經過商務邏輯處理後將源表資料插入到結果表(INSERT INTO語句)。

FlinkSQL作業開發請參見SQL作業開發

Log Service是即時資料儲存,Realtime Compute能將其作為流式資料輸入。假設有如下日誌內容:

__source__:  11.85.*.199
__tag__:__receive_time__:  1562125591
__topic__:  test-topic
request_method:  GET
status:  200

程式碼範例

Flink消費Log Service資料的SQL開發作業代碼如下:

重要

SQL中的表名、列名和保留字衝突時,需要使用反引號'`'括起來。

CREATE TEMPORARY TABLE sls_input(
  request_method STRING,
  status BIGINT,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` BIGINT METADATA VIRTUAL,
   __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  proctime as PROCTIME()
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'starttime' = '2023-08-30 00:00:00',
  'project' ='sls-test',
  'logstore' ='sls-input'
);

CREATE TEMPORARY TABLE sls_sink(
  request_method STRING,
  status BIGINT,
  `__topic__` STRING,
  `__source__` STRING,
  `__timestamp__` BIGINT ,
  receive_time BIGINT
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${ak_id}',
  'accessKey' = '${ak_secret}',
  'project' ='sls-test',
  'logstore' ='sls-output'
);

INSERT INTO sls_sink
SELECT 
  request_method,
  status,
  `__topic__` ,
  `__source__` ,
  `__timestamp__` ,
  cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input; 

WITH參數

  • 通用

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    connector

    表類型。

    String

    固定值sls。

    endPoint

    EndPoint地址。

    String

    請填寫SLS的私網服務地址,詳情請參見服務存取點

    說明
    • Realtime ComputeFlink版預設不具備訪問公網的能力,但阿里雲提供的NAT Gateway可以實現VPC網路與公網網路互連,詳情請參見空間管理與操作

    • 不建議跨公網訪問SLS。如確有需要,請使用HTTPS網路傳輸協議並且開啟SLSGlobal Acceleration服務,詳情請參見管理傳輸加速

    project

    SLS專案名稱。

    String

    無。

    logStore

    SLS LogStore或metricstore名稱。

    String

    logStore和metricstore是相同的消費方式。

    accessId

    阿里雲帳號的AccessKey ID。

    String

    詳情請參見空間管理與操作

    重要

    為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變數管理

    accessKey

    阿里雲帳號的AccessKey Secret。

    String

    詳情請參見空間管理與操作

    重要

    為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變數管理

  • 源表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    enableNewSource

    是否啟用實現了FLIP-27介面的新資料來源。

    Boolean

    false

    新資料來源可以自動適應shard變化,同時儘可能保證shard在所有的source並發上分布均勻。

    說明

    僅Realtime Compute引擎VVR 8.0.9及以上版本支援該參數。

    重要

    作業在該配置項發生變化後無法從狀態恢複。

    可通過先設定配置項consumerGroup啟動作業,將消費進度記錄到SLS消費組中,再將配置項consumeFromCheckpoint設為true後無狀態啟動作業,從而實現從歷史進度繼續消費。

    shardDiscoveryIntervalMs

    動態檢測shard變化時間間隔,單位為毫秒。

    Long

    60000

    設定為負值時可以關閉動態檢測。

    說明
    • 僅當配置項enableNewSource為true時生效。

    • 僅Realtime Compute引擎VVR 8.0.9及以上版本支援該參數。

    startupMode

    源表啟動模式。

    String

    timestamp

    參數取值如下:

    • timestamp(預設):從指定的起始時間開始消費日誌。

    • latest:從最新位點開始消費日誌。

    • earliest:從最早位點開始消費日誌。

    說明

    若將配置項consumeFromCheckpoint設為true,則會從指定的消費組中儲存的Checkpoint開始消費日誌,此處的啟動模式將不會生效。

    startTime

    消費日誌的開始時間。

    String

    目前時間

    格式為yyyy-MM-dd hh:mm:ss。

    僅當startupMode設為timestamp時生效。

    說明

    startTime和stopTime基於SLS中的__receive_time__屬性,而非__timestamp__屬性。

    stopTime

    消費日誌的結束時間。

    String

    格式為yyyy-MM-dd hh:mm:ss。

    說明

    如期望日誌消費到結尾時退出Flink程式,需要同時設定exitAfterFinish=true.

    consumerGroup

    消費組名稱。

    String

    消費組用於記錄消費進度。您可以自訂消費組名,無固定格式。

    說明

    不支援通過相同的消費組進行多作業的協同消費。不同的Flink作業應該設定不同的消費組。如果不同的Flink作業使用相同的消費組,它們將會消費全部資料。這是因為在Flink消費SLS的資料時,並不會經過SLS消費組進行分區分配,因此導致各個消費者獨立消費各自的訊息,即使消費組是相同的。

    consumeFromCheckpoint

    是否從指定的消費組中儲存的Checkpoint開始消費日誌。

    String

    false

    參數取值如下:

    • true:必須同時指定消費組,Flink程式會從消費組中儲存的Checkpoint開始消費日誌,如果該消費組沒有對應的Checkpoint,則從startTime配置值開始消費。

    • false(預設值):不從指定的消費組中儲存的Checkpoint開始消費日誌。

    說明

    僅Realtime Compute引擎VVR 6.0.5及以上版本支援該參數。

    maxRetries

    讀取SLS失敗後重試次數。

    String

    3

    無。

    batchGetSize

    單次請求讀取logGroup的個數。

    String

    100

    batchGetSize設定不能超過1000,否則會報錯。

    exitAfterFinish

    在資料消費完成後,Flink程式是否退出。

    String

    false

    參數取值如下:

    • true:資料消費完後,Flink程式退出。

    • false(預設):資料消費完後,Flink程式不退出。

    說明

    僅Realtime Compute引擎VVR 4.0.13及以上版本支援該參數。

    query

    SLS消費預先處理語句。

    String

    通過使用query參數,您可以在消費SLS資料之前對其進行過濾,以避免將所有資料都消費到Flink中,從而實現節約成本和提高處理速度的目的。

    例如 'query' = '*| where request_method = ''GET'''表示在Flink讀取SLS資料前,先匹配出request_method欄位值等於get的資料。

    說明

    query需使用Log ServiceSPL語言,請參見SPL概述

    重要
    • 僅Realtime Compute引擎VVR 8.0.1及以上版本支援該參數。

    • Log ServiceSLS支援該功能的地區請參見基於規則消費日誌

    • 公測階段免費,後續可能會在產生Log ServiceSLS費用,詳情請參見費用說明

  • 結果表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    topicField

    指定欄位名,該欄位的值會覆蓋__topic__屬性欄位的值,表示日誌的主題。

    String

    該參數值是表中已存在的欄位之一。

    timeField

    指定欄位名,該欄位的值會覆蓋__timestamp__屬性欄位的值,表示日誌寫入時間。

    String

    目前時間

    該參數值是表中已存在的欄位之一,且欄位類型必須為INT。如果未指定,則預設填充目前時間。

    sourceField

    指定欄位名,該欄位的值會覆蓋__source__屬性欄位的值,表示日誌的來源地,例如產生該日誌機器的IP地址。

    String

    該參數值是表中已存在的欄位之一。

    partitionField

    指定欄位名,資料寫入時會根據該列值計算Hash值,Hash值相同的資料會寫入同一個shard。

    String

    如果未指定,則每條資料會隨機寫入當前可用的Shard中。

    buckets

    當指定partitionField時,根據Hash值重新分組的個數。

    String

    64

    該參數的取值範圍是[1, 256],且必須是2的整數次冪。同時,buckets個數應當大於等於Shard個數,否則會出現部分Shard沒有資料寫入的情況。

    說明

    僅Realtime Compute引擎VVR 6.0.5及以上版本支援該參數。

    flushIntervalMs

    觸發資料寫入的時間周期。

    String

    2000

    單位為毫秒。

    writeNullProperties

    是否將null值作為空白字串寫入SLS。

    Boolean

    true

    參數取值如下:

    • true(預設值):將null值作為空白字串寫入日誌。

    • false:計算結果為null的欄位不會寫入到日誌中。

    說明

    僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。

屬性欄位提取

除日誌欄位外,支援提取如下四個屬性欄位,也支援提取其它自訂欄位。

欄位名

欄位類型

欄位說明

__source__

STRING METADATA VIRTUAL

訊息源。

__topic__

STRING METADATA VIRTUAL

訊息主題。

__timestamp__

BIGINT METADATA VIRTUAL

日誌時間。

__tag__

MAP<VARCHAR, VARCHAR> METADATA VIRTUAL

訊息TAG。

對於屬性"__tag__:__receive_time__":"1616742274"'__receive_time__'和'1616742274'會被作為KV對,記錄在Map中,在SQL中通過__tag__['__receive_time__']的方式訪問。

屬性欄位的提取需要添加HEADER聲明,樣本如下:

create table sls_stream(
  __timestamp__ bigint HEADER,
  __receive_time__ bigint HEADER
  b int,
  c varchar
) with (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'starttime' = '2023-08-30 00:00:00',
  'project' ='sls-test',
  'logstore' ='sls-input'
);

相關文檔

使用Flink DataStream API消費資料,請參見DataStream API