全部產品
Search
文件中心

Simple Log Service:Realtime Compute(Flink)消費

更新時間:Jul 23, 2024

阿里雲Realtime Compute(Flink)通過建立Log Service源表的方式,可以直接消費Log Service的資料。本文介紹了如何為Realtime Compute建立Log Service源表以及建立過程涉及到的屬性欄位提取。

背景資訊

Flink消費日誌支援的資訊如下:

類別

詳情

支援類型

源表和結果表。

運行模式

僅支援流模式。

特有監控指標

暫不適用。

資料格式

暫無。

API種類

SQL。

是否支援更新或刪除結果表資料

不支援更新和刪除結果表資料,只支援插入資料。

使用Flink消費日誌的入口,請參見Flink SQL作業快速入門

前提條件

使用限制

  • 僅Realtime Compute引擎VVR 2.0.0及以上版本支援Log ServiceSLS連接器。

  • SLS連接器僅保證At-Least-Once語義。

  • 僅Realtime Compute引擎VVR 4.0.13及以上版本支援Shard數目變化觸發自動Failover功能。

  • 強烈建議不要設定Source並發度大於Shard個數,不僅會造成資源浪費,且在8.0.5及更低版本中,如果後續Shard數目發生變化,自動Failover功能可能會失效,造成部分Shard不被消費。

建立Log Service源表和結果表

重要

使用Flink消費Log Service資料,必須有一個完整的SQL作業。完整的SQL作業包含:源表、結果表,在經過商務邏輯處理後將源表資料插入到結果表(INSERT INTO語句)。

FlinkSQL作業開發請參見SQL作業開發

Log Service是即時資料儲存,Realtime Compute能將其作為流式資料輸入。假設有如下日誌內容:

__source__:  11.85.*.199
__tag__:__receive_time__:  1562125591
__topic__:  test-topic
request_method:  GET
status:  200

程式碼範例

Flink消費Log Service資料的SQL開發作業代碼如下:

重要

SQL中的表名、列名和保留字衝突時,需要使用反引號'`'括起來。

CREATE TEMPORARY TABLE sls_input(
  request_method STRING,
  status BIGINT,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` BIGINT METADATA VIRTUAL,
   __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  proctime as PROCTIME()
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'starttime' = '2023-08-30 00:00:00',
  'project' ='sls-test',
  'logstore' ='sls-input'
);

CREATE TEMPORARY TABLE sls_sink(
  request_method STRING,
  status BIGINT,
  `__topic__` STRING,
  `__source__` STRING,
  `__timestamp__` BIGINT ,
  receive_time BIGINT
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou.log.aliyuncs.com',
  'accessId' = '${ak_id}',
  'accessKey' = '${ak_secret}',
  'project' ='sls-test',
  'logstore' ='sls-output'
);

INSERT INTO sls_sink
SELECT 
  request_method,
  status,
  `__topic__` ,
  `__source__` ,
  `__timestamp__` ,
  cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input; 

WITH參數

  • 通用

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    connector

    表類型。

    String

    固定值sls。

    endPoint

    EndPoint地址。

    String

    請填寫SLS的私網服務地址,詳情請參見私網服務入口

    說明

    Realtime ComputeFlink版預設不具備訪問公網的能力,但阿里雲提供的NAT Gateway可以實現VPC網路與公網網路互連,詳情請參見Realtime ComputeFlink版如何訪問公網?

    project

    SLS專案名稱。

    String

    無。

    logStore

    SLS LogStore或metricstore名稱。

    String

    logStore和metricstore是相同的消費方式。

    accessId

    阿里雲帳號的AccessKey ID。

    String

    詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?

    重要

    為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變數和密鑰管理

    accessKey

    阿里雲帳號的AccessKey Secret。

    String

    詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?

    重要

    為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變數和密鑰管理

  • 源表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    startupMode

    源表啟動模式。

    String

    timestamp

    參數取值如下:

    • timestamp(預設):從指定的起始時間開始消費日誌。

    • latest:從最新位點開始消費日誌。

    • earliest:從最早位點開始消費日誌。

    說明

    若將配置項consumeFromCheckpoint設為true,則會從指定的消費組中儲存的Checkpoint開始消費日誌,此處的啟動模式將不會生效。

    startTime

    消費日誌的開始時間。

    String

    目前時間

    格式為yyyy-MM-dd hh:mm:ss。

    僅當startupMode設為timestamp時生效。

    說明

    startTime和stopTime基於SLS中的__receive_time__屬性,而非__timestamp__屬性。

    stopTime

    消費日誌的結束時間。

    String

    格式為yyyy-MM-dd hh:mm:ss。

    說明

    如期望日誌消費到結尾時退出Flink程式,需要同時設定exitAfterFinish=true.

    consumerGroup

    消費組名稱。

    String

    消費組用於記錄消費進度。您可以自訂消費組名,無固定格式。

    說明

    不支援通過相同的消費組進行多作業的協同消費。不同的Flink作業應該設定不同的消費組。如果不同的Flink作業使用相同的消費組,它們將會消費全部資料。這是因為在Flink消費SLS的資料時,並不會經過SLS消費組進行分區分配,因此導致各個消費者獨立消費各自的訊息,即使消費組是相同的。

    consumeFromCheckpoint

    是否從指定的消費組中儲存的Checkpoint開始消費日誌。

    String

    false

    參數取值如下:

    • true:必須同時指定消費組,Flink程式會從消費組中儲存的Checkpoint開始消費日誌,如果該消費組沒有對應的Checkpoint,則從startTime配置值開始消費。

    • false(預設值):不從指定的消費組中儲存的Checkpoint開始消費日誌。

    說明

    僅Realtime Compute引擎VVR 6.0.5及以上版本支援該參數。

    directMode

    是否使用SLS的直連模式。

    String

    false

    參數取值如下:

    • true:使用該功能。

    • false(預設):禁用該功能。

    maxRetries

    讀取SLS失敗後重試次數。

    String

    3

    無。

    batchGetSize

    單次請求讀取logGroup的個數。

    String

    100

    batchGetSize設定不能超過1000,否則會報錯。

    exitAfterFinish

    在資料消費完成後,Flink程式是否退出。

    String

    false

    參數取值如下:

    • true:資料消費完後,Flink程式退出。

    • false(預設):資料消費完後,Flink程式不退出。

    說明

    僅Realtime Compute引擎VVR 4.0.13及以上版本支援該參數。

    query

    SLS消費預先處理語句。

    String

    通過使用query參數,您可以在消費SLS資料之前對其進行過濾,以避免將所有資料都消費到Flink中,從而實現節約成本和提高處理速度的目的。

    例如 'query' = '*| where request_method = ''GET'''表示在Flink讀取SLS資料前,先匹配出request_method欄位值等於get的資料。

    說明

    query需使用Log ServiceSPL語言,請參見SPL概述

    重要
    • 僅Realtime Compute引擎VVR 8.0.1及以上版本支援該參數。

    • Log ServiceSLS支援該功能的地區請參見基於規則消費日誌

    • 公測階段免費,後續可能會在產生Log ServiceSLS費用,詳情請參見費用說明

  • 結果表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    topicField

    指定欄位名,該欄位的值會覆蓋__topic__屬性欄位的值,表示日誌的主題。

    String

    該參數值是表中已存在的欄位之一。

    timeField

    指定欄位名,該欄位的值會覆蓋__timestamp__屬性欄位的值,表示日誌寫入時間。

    String

    目前時間

    該參數值是表中已存在的欄位之一,且欄位類型必須為INT。如果未指定,則預設填充目前時間。

    sourceField

    指定欄位名,該欄位的值會覆蓋__source__屬性欄位的值,表示日誌的來源地,例如產生該日誌機器的IP地址。

    String

    該參數值是表中已存在的欄位之一。

    partitionField

    指定欄位名,資料寫入時會根據該列值計算Hash值,Hash值相同的資料會寫入同一個shard。

    String

    如果未指定,則每條資料會隨機寫入當前可用的Shard中。

    buckets

    當指定partitionField時,根據Hash值重新分組的個數。

    String

    64

    該參數的取值範圍是[1, 256],且必須是2的整數次冪。同時,buckets個數應當大於等於Shard個數,否則會出現部分Shard沒有資料寫入的情況。

    說明

    僅Realtime Compute引擎VVR 6.0.5及以上版本支援該參數。

    flushIntervalMs

    觸發資料寫入的時間周期。

    String

    2000

    單位為毫秒。

    writeNullProperties

    是否將null值作為空白字串寫入SLS。

    Boolean

    true

    參數取值如下:

    • true(預設值):將null值作為空白字串寫入日誌。

    • false:計算結果為null的欄位不會寫入到日誌中。

    說明

    僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。

屬性欄位提取

除日誌欄位外,支援提取如下四個屬性欄位,也支援提取其它自訂欄位。

欄位名

欄位類型

欄位說明

__source__

STRING METADATA VIRTUAL

訊息源。

__topic__

STRING METADATA VIRTUAL

訊息主題。

__timestamp__

BIGINT METADATA VIRTUAL

日誌時間。

__tag__

MAP<VARCHAR, VARCHAR> METADATA VIRTUAL

訊息TAG。

對於屬性"__tag__:__receive_time__":"1616742274"'__receive_time__'和'1616742274'會被作為KV對,記錄在Map中,在SQL中通過__tag__['__receive_time__']的方式訪問。

屬性欄位的提取需要添加HEADER聲明,樣本如下:

create table sls_stream(
  __timestamp__ bigint HEADER,
  __receive_time__ bigint HEADER
  b int,
  c varchar
) with (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'starttime' = '2023-08-30 00:00:00',
  'project' ='sls-test',
  'logstore' ='sls-input'
);

相關文檔

使用Flink DataStream API消費資料,請參見DataStream API