存在多種方式可以將Log ServiceSLS的資料寫入至Hologres,本文以Flink、DataWorksData Integration為例,為您介紹如何將SLS資料即時寫入至Hologres。
前提條件
開通SLS服務並建立Project和Logstore,詳情請參見快速入門。
開通Hologres服務並串連至開發工具,詳情請參見即時數倉Hologres使用流程。
如果選擇通過Flink將SLS資料寫入Hologres,請開通Realtime ComputeFlink版服務並建立專案空間,詳情請參見開通Flink全託管和建立與管理專案空間。
如果選擇通過DataWorksData Integration將SLS資料寫入Hologre,請開通DataWorks服務並建立工作空間,詳情請參見開通DataWorks服務和建立工作空間。
背景資訊
Log ServiceSLS是雲原生觀測與分析平台,為Log、Metric、Trace等資料提供大規模、低成本、即時的平台化服務。Log Service一站式提供資料擷取、加工、查詢與分析、可視化、警示、消費與投遞等功能,全面提升您在研發、營運、營運、安全等情境的數字化能力。
Hologres致力於高效能、高可靠、低成本、可擴充的Realtime Compute引擎研發,為使用者提供海量資料的即時資料倉庫解決方案和亞秒級互動式查詢服務,廣泛應用在即時資料中台建設、精細化分析、自助式分析、營銷畫像、人群圈選、即時風控等情境。您可以將SLS的資料快速寫入Hologres中,進行即時分析即時查詢,提升業務對資料的探索能力。
通過Flink將SLS資料寫入Hologres
準備SLS資料。
本次準備的SLS資料來源於SLS日誌平台的類比資料,類比遊戲登入和消費日誌的資料。如果您有業務資料,請直接使用業務資料。
在接入資料地區,單擊類比接入。
在類比接入頁簽,單擊遊戲動作記錄下的類比。
在選擇日誌空間頁面,選擇專案Project和日誌庫Logstore後單擊下一步。
在類比接入頁面,設定範圍頻率後單擊開始匯入。
查詢類比產生的欄位和資料如下,詳情請參見查詢和分析日誌。
其中
content
列為JSON類型。
建立Hologres表。
在Hologres中建立用於接收資料的表。您可以根據業務查詢需求,為對應的欄位建立索引,以提升查詢效率。索引介紹請參見建表概述。本次樣本建表DDL如下。
CREATE TABLE sls_flink_holo ( content JSONB , operation TEXT, uid TEXT, topic TEXT , source TEXT , c__timestamp TIMESTAMPTZ, receive_time BIGINT, PRIMARY KEY (uid) );
通過Flink寫入資料。
在Flink中將SLS資料寫入Hologres可以參考如下文檔:
Flink讀取SLS資料:Log ServiceSLS源表。
Flink寫入Hologres:即時數倉Hologres結果表。
本次SLS資料通過Flink寫入Hologres的SQL作業樣本如下,其中JSON類型欄位直接寫入Hologres JSON類型中,因為Flink中沒有JSON類型,使用VARCHAR類型代替。
CREATE TEMPORARY TABLE sls_input ( content STRING, operation STRING, uid STRING, `__topic__` STRING METADATA VIRTUAL, `__source__` STRING METADATA VIRTUAL, `__timestamp__` BIGINT METADATA VIRTUAL, `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL ) WITH ( 'connector' = 'sls', 'endpoint' = 'sls私域endpoint',--sls私域endpoint 'accessid' = 'access id',--帳號access id 'accesskey' = 'access key',--帳號access key 'starttime' = '2024-08-30 00:00:00',--消費日誌的開始時間 'project' = 'project name',--sls的project名 'logstore' = 'LogStore名稱'--LogStore名稱。 ); CREATE TEMPORARY TABLE hologres_sink ( content VARCHAR, operation VARCHAR, uid VARCHAR, topic STRING , source STRING , c__timestamp TIMESTAMP , receive_time BIGINT ) WITH ( 'connector' = 'hologres', 'dbname' = 'holo db nema', --Hologres的資料庫名稱。 'tablename' = 'holo tablene', --Hologres用於接收資料的表名稱。 'username' = 'access id', --當前阿里雲帳號的AccessKey ID。 'password' = 'access key', --當前阿里雲帳號的AccessKey Secret。 'endpoint' = 'holo vpc endpoint' --當前Hologres執行個體VPC網路的Endpoint。 ); INSERT INTO hologres_sink SELECT content, operation, uid, `__topic__` , `__source__` , CAST ( FROM_UNIXTIME (`__timestamp__`) AS TIMESTAMP ), CAST (__tag__['__receive_time__'] AS BIGINT) AS receive_time FROM sls_input;
查詢資料。
在Hologres中查詢通過Flink寫入Hologres中的SLS資料,後續您可以根據業務需求進行資料開發。
通過DataWorksData Integration將SLS資料寫入Hologres
準備SLS資料。
本次準備的SLS資料來源於SLS日誌平台的類比資料,類比遊戲登入和消費日誌的資料。如果您有業務資料,請直接使用業務資料。
在接入資料地區,單擊類比接入。
在類比接入頁簽,單擊遊戲動作記錄下的類比。
在選擇日誌空間頁面,選擇專案Project和日誌庫Logstore後單擊下一步。
在類比接入頁面,設定範圍頻率後單擊開始匯入。
查詢類比產生的欄位和資料如下,詳情請參見查詢和分析日誌。
其中
content
列為JSON類型。
建立Hologres表。
在Hologres中建立用於接收資料的表。並根據業務查詢需求,為對應的欄位建立索引,以提升查詢效率。索引介紹請參見建表概述。本次樣本建表DDL如下。
說明本樣本中設定
uid
為主鍵,保證資料的唯一性,實際設定可以根據業務需求選擇。設定
uid
為Distribution Key,資料寫入時相同的uid
可以寫入至同一個Shard,提高查詢效能。
BEGIN; CREATE TABLE sls_dw_holo ( content JSONB , operation TEXT, uid TEXT, C_Topic TEXT , C_Source TEXT , timestamp BIGINT, PRIMARY KEY (uid) ); CALL set_table_property('sls_dw_holo', 'distribution_key', 'uid'); CALL set_table_property('sls_dw_holo', 'event_time_column', 'timestamp'); COMMIT;
配置資料來源。
在進行資料同步之前,使用Data Integration功能需要為DataWorks工作空間添加資料來源。
SLS資料來源使用Loghub資料來源,詳情請參見配置LogHub(SLS)資料來源。
配置Hologres資料來源,詳情請參見配置Hologres資料來源。
即時同步資料。
在Data Integration中建立和運行即時同步任務,詳情請參見配置單表增量資料即時同步和即時同步任務營運。
本樣本建立的即時同步任務,配置輸入為Loghub資料來源,輸出為Hologres資料來源,並配置如下欄位同步映射關係。
查詢資料。
即時同步任務運行開始後,可以在Hologres中查詢通過DataWorksData Integration寫入Hologres中的SLS資料。