本文為您介紹如何基於GitHub即時事件數目據通過MaxCompute構建離線數倉、通過Flink和Hologres構建即時數倉,然後通過Hologres和MaxCompute分別進行即時與離線資料分析,從而實現即時離線一體化解決方案。
背景資訊
隨著社會數字化發展,企業對資料時效性的需求越來越強烈。除傳統的面向海量資料加工情境設計的離線情境外,大量業務需要解決面向即時加工、即時儲存、即時分析的即時情境問題,為了應對這樣的情形,提出了離線即時一體化的概念。
即時離線一體化是指將即時資料和離線資料在同一平台上管理和處理的技術。它能夠實現即時資料處理和離線資料分析的無縫銜接,從而提高資料分析效率和精度。其優勢在於:
提高資料處理效率:將即時資料和離線資料整合在同一平台上,大大提高了資料處理效率,降低資料轉送和轉換成本。
提高資料分析精度:將即時資料和離線資料進行混合分析,從而提高資料分析精度和準確性。
降低系統複雜度:減少資料管理和處理的複雜度,使資料管理和處理更加簡單和高效。
提高資料應用價值:更加充分地發揮資料的應用價值,為企業提供更好的決策支援。
阿里雲在此方向上進行了諸多方案設計,推出了化繁為簡的即時離線一體化數倉,通過MaxCompute和即時數倉Hologres分別對應上述的離線與即時情境,同時匹配Flink的即時加工能力,共同構成阿里雲一體化數倉的核心引擎組件。
方案架構
使用MaxCompute和Hologres對GitHub公開事件數目據集進行即時離線一體化實踐的完整鏈路圖如下所示。
其中ECS將GitHub即時與離線事件數目據收集匯總後作為資料來源,分別進入即時鏈路與離線鏈路,最後兩條鏈路資料匯總到Hologres,統一對外提供服務。
即時鏈路:通過Flink對Log Service中的資料即時加工並寫入Hologres。Flink是強大的流式計算引擎,Hologres支援資料即時寫入與更新、寫入即可查,二者原生整合,支援高吞吐、低延時、有模型、高品質的即時數倉開發,最終滿足業務洞察即時性需求,如最新事件提取、熱時間點事件分析等情境。
離線鏈路:通過MaxCompute對海量離線資料進行處理並歸檔。阿里雲OSS(Object Storage Service)是阿里雲提供的雲端儲存體服務,可以用於儲存各類資料,本次實踐引用的未經處理資料是JSON格式,OSS可以提供方便、安全、低成本、可靠的儲存能力。MaxCompute是適用於資料分析情境的企業級SaaS(Software as a Service)模式雲資料倉儲,可以直接通過外表的方式讀取並解析OSS中的半結構化資料,將高價值可用Data Integration至MaxCompute內部儲存,然後結合DataWorks進行資料開發,產生離線資料倉儲。
Hologres與MaxCompute底層無縫打通,因此可以通過Hologres對MaxCompute海量歷史資料進行加速查詢分析,滿足業務對歷史資料的低頻高效能查詢需求。還可以輕鬆實現通過離線鏈路對即時資料的修正,解決即時鏈路中可能出現的資料遺漏等問題。
該方案優勢如下:
離線鏈路穩定高效:支援資料小時級寫入更新,可以批量處理大規模資料,進行複雜的計算和分析,降低計算成本,提高資料處理效率。
即時鏈路成熟:支援即時寫入、即時事件計算、即時分析,即時鏈路簡化,資料秒級響應。
統一儲存與服務:均由Hologres對外提供服務,資料集中儲存,對外介面一致(OLAP、KeyValue統一為SQL介面)。
即時離線融合:資料冗餘少、移動少,資料可修正。
通過一站式開發,最終實現資料秒級響應,全鏈路狀態可見,架構組件少、依賴少,營運成本、人工成本均有效降低。
業務與資料認知
大量開發人員在GitHub上進行開源專案的開發工作,並在專案的開發過程中產生海量事件。GitHub會記錄每次事件的類型及詳情、開發人員、代碼倉庫等資訊,並開放其中的公開事件,包括加星標、提交代碼等,具體事件類型請參見Webhook events and payloads。
GitHub通過OpenAPI公布其公開事件,API僅開放5分鐘前的即時事件,詳情請參見Events。該API可用於擷取即時資料。
GH Archive專案則是將GitHub公開事件按小時進行匯總,並允許開發人員訪問,專案具體資訊請參見GH Archive。該專案可用於擷取離線資料。
GitHub業務認知
Github的業務核心為管理代碼與互動交流,主要涉及三個一級實體物件:開發人員(Developer)、代碼倉庫(Repository)和組織(Organization)。
在本次Github公開事件數目據分析中,事件
作為一個實體物件被儲存和記錄下來。
原始公開事件數目據認知
某原始事件JSON編碼資料樣本如下:
{
"id": "19541192931",
"type": "WatchEvent",
"actor":
{
"id": 23286640,
"login": "herekeo",
"display_login": "herekeo",
"gravatar_id": "",
"url": "https://api.github.com/users/herekeo",
"avatar_url": "https://avatars.githubusercontent.com/u/23286640?"
},
"repo":
{
"id": 52760178,
"name": "crazyguitar/pysheeet",
"url": "https://api.github.com/repos/crazyguitar/pysheeet"
},
"payload":
{
"action": "started"
},
"public": true,
"created_at": "2022-01-01T00:03:04Z"
}
本分析實踐涉及15類公開事件(不包含未出現及不再記錄的事件),詳細的事件類型及描述請參見Github公開事件類型。
前提條件
已建立Elastic Compute Service執行個體並綁定Elastic IP Address,用於提取GitHub API中的即時事件數目據,詳情請參見建立方式導航和綁定和解除綁定Elastic IP Address。
已開通Object Storage Service並在ECS中安裝ossutil工具,用於儲存GH Archive提供的JSON資料檔案,詳情請參見開通OSS服務和安裝ossutil。
已開通MaxCompute並建立Project,詳情請參見建立MaxCompute專案。
已開通巨量資料開發治理平台DataWorks並建立工作空間,用於建立離線調度任務,詳情請參見建立工作空間。
已開通Log ServiceSLS並建立Project和Logstore,用於將ECS提取到的資料作為日誌進行收集,詳情請參見快速入門。
已開通Realtime ComputeFlink執行個體,用於將SLS收集的日誌資料即時寫入Hologres,詳情請參見開通Realtime ComputeFlink版。
已開通即時數倉Hologres,詳情請參見購買Hologres。
離線資料倉儲建設(小時級更新)
通過ECS下載未經處理資料檔案並上傳至OSS
ECS用例用於下載GH Archive提供的JSON資料檔案。
對於歷史資料可通過
wget
命令下載,例如wget https://data.gharchive.org/{2012..2022}-{01..12}-{01..31}-{0..23}.json.gz
下載2012年到2022年每個小時的資料。對於未來每小時產生的新資料,可以通過如下步驟設定小時級定時任務下載。
說明請確保已在ECS中安裝ossutil工具,詳情請參見安裝ossutil。建議您直接下載ossutil安裝包上傳ECS執行個體,通過
yum install unzip
安裝unzip解壓軟體,直接解壓ossutil,移動至/usr/bin/
目錄下即可。請確保已在ECS相同地區下建立OSS Bucket,您可以使用自訂OSS Bucket名稱。本樣本對應的OSS Bucket名稱為
githubevents
。本樣本ECS檔案下載目錄為
/opt/hourlydata/gh_data
,您可以自訂其他目錄。
使用如下命令在
/opt/hourlydata
目錄下,建立名稱為download_code.sh
的檔案。cd /opt/hourlydata vim download_code.sh
在檔案內輸入
i
後進入編輯模式,添加如下樣本指令碼命令。d=$(TZ=UTC date --date='1 hour ago' '+%Y-%m-%d-%-H') h=$(TZ=UTC date --date='1 hour ago' '+%Y-%m-%d-%H') url=https://data.gharchive.org/${d}.json.gz echo ${url} #將資料下載至./gh_data/目錄下,您可以自訂其他目錄。 wget ${url} -P ./gh_data/ #切換至gh_data目錄下 cd gh_data #解壓下載資料為json檔案 gzip -d ${d}.json echo ${d}.json #切換根目錄。 cd /root #使用ossutil工具上傳資料至OSS #在名稱為githubevents的OSS Bucket下建立hr=${h}的目錄 ossutil mkdir oss://githubevents/hr=${h} #將/hourlydata/gh_data(檔案儲存體目錄可以自訂其他地方)目錄下的資料上傳至OSS。 ossutil cp -r /opt/hourlydata/gh_data oss://githubevents/hr=${h} -u echo oss uploaded successfully! rm -rf /opt/hourlydata/gh_data/${d}.json echo ecs deleted!
按鍵盤Esc鍵,輸入
:wq
並斷行符號以儲存並關閉檔案。使用如下命令設定每小時的第10分鐘執行
download_code.sh
指令檔。#1 執行以下指令,按鍵盤I鍵進入編輯狀態。 crontab -e #2 添加以下指令,完成後按鍵盤Esc鍵,輸入:wq退出。 10 * * * * cd /opt/hourlydata && sh download_code.sh > download.log
執行後每個小時的第10分鐘會下載前一個小時的JSON檔案,在ECS解壓後上傳至OSS中(路徑為
oss://githubevents
)。為了之後每次唯讀取前一個小時的檔案,在上傳檔案時對每個檔案建立一個名稱為‘hr=%Y-%M-%D-%H’
的目錄作為分區,之後每次寫入資料唯讀取最新分區下的檔案。
通過外部表格將OSS資料匯入MaxCompute
請在MaxCompute用戶端或DataWorks中的ODPS SQL節點執行如下命令,詳情請參見使用本地用戶端(odpscmd)串連或開發ODPS SQL任務。
建立用於轉換OSS中儲存的JSON檔案的外部表格
githubevents
:CREATE EXTERNAL TABLE IF NOT EXISTS githubevents ( col STRING ) PARTITIONED BY ( hr STRING ) STORED AS textfile LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/githubevents/' ;
MaxCompute中建立OSS外部表格詳情請參見建立OSS外部表格。
建立用於儲存資料的事實表
dwd_github_events_odps
,其DDL如下:CREATE TABLE IF NOT EXISTS dwd_github_events_odps ( id BIGINT COMMENT '事件ID' ,actor_id BIGINT COMMENT '事件發起人ID' ,actor_login STRING COMMENT '事件發起人登入名稱' ,repo_id BIGINT COMMENT 'repoID' ,repo_name STRING COMMENT 'repo全名:owner/Repository_name' ,org_id BIGINT COMMENT 'repo所屬組織ID' ,org_login STRING COMMENT 'repo所屬組織名稱' ,`type` STRING COMMENT '事件類型' ,created_at DATETIME COMMENT '事件發生時間' ,action STRING COMMENT '事件行為' ,iss_or_pr_id BIGINT COMMENT 'issue/pull_request ID' ,number BIGINT COMMENT 'issue/pull_request 序號' ,comment_id BIGINT COMMENT 'comment(評論) ID' ,commit_id STRING COMMENT 'commit(提交記錄) ID' ,member_id BIGINT COMMENT '成員ID' ,rev_or_push_or_rel_id BIGINT COMMENT 'review/push/release ID' ,ref STRING COMMENT '建立/刪除的資源名稱' ,ref_type STRING COMMENT '建立/刪除的資源類型' ,state STRING COMMENT 'issue/pull_request/pull_request_review的狀態' ,author_association STRING COMMENT 'actor與repo之間的關係' ,language STRING COMMENT '請求合并代碼的語言' ,merged BOOLEAN COMMENT '是否接受合并' ,merged_at DATETIME COMMENT '代碼合并時間' ,additions BIGINT COMMENT '代碼增加行數' ,deletions BIGINT COMMENT '代碼減少行數' ,changed_files BIGINT COMMENT 'pull request 改變檔案數量' ,push_size BIGINT COMMENT '提交數量' ,push_distinct_size BIGINT COMMENT '不同的提交數量' ,hr STRING COMMENT '事件發生所在小時,如00點23分,hr=00' ,`month` STRING COMMENT '事件發生所在月,如2015年10月,month=2015-10' ,`year` STRING COMMENT '事件發生所在年,如2015年,year=2015' ) PARTITIONED BY ( ds STRING COMMENT '事件發生所在日,ds=yyyy-mm-dd' );
將JSON資料解析寫入事實表。
使用如下命令引入分區並進行JSON解析寫入
dwd_github_events_odps
表中:msck repair table githubevents add partitions; set odps.sql.hive.compatible = true; set odps.sql.split.hive.bridge = true; INSERT into TABLE dwd_github_events_odps PARTITION(ds) SELECT CAST(GET_JSON_OBJECT(col,'$.id') AS BIGINT ) AS id ,CAST(GET_JSON_OBJECT(col,'$.actor.id')AS BIGINT) AS actor_id ,GET_JSON_OBJECT(col,'$.actor.login') AS actor_login ,CAST(GET_JSON_OBJECT(col,'$.repo.id')AS BIGINT) AS repo_id ,GET_JSON_OBJECT(col,'$.repo.name') AS repo_name ,CAST(GET_JSON_OBJECT(col,'$.org.id')AS BIGINT) AS org_id ,GET_JSON_OBJECT(col,'$.org.login') AS org_login ,GET_JSON_OBJECT(col,'$.type') as type ,to_date(GET_JSON_OBJECT(col,'$.created_at'), 'yyyy-mm-ddThh:mi:ssZ') AS created_at ,GET_JSON_OBJECT(col,'$.payload.action') AS action ,case WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.id')AS BIGINT) WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.issue.id')AS BIGINT) END AS iss_or_pr_id ,case WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.number')AS BIGINT) WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.issue.number')AS BIGINT) ELSE CAST(GET_JSON_OBJECT(col,'$.payload.number')AS BIGINT) END AS number ,CAST(GET_JSON_OBJECT(col,'$.payload.comment.id')AS BIGINT) AS comment_id ,GET_JSON_OBJECT(col,'$.payload.comment.commit_id') AS commit_id ,CAST(GET_JSON_OBJECT(col,'$.payload.member.id')AS BIGINT) AS member_id ,case WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.review.id')AS BIGINT) WHEN GET_JSON_OBJECT(col,'$.type')="PushEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.push_id')AS BIGINT) WHEN GET_JSON_OBJECT(col,'$.type')="ReleaseEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.release.id')AS BIGINT) END AS rev_or_push_or_rel_id ,GET_JSON_OBJECT(col,'$.payload.ref') AS ref ,GET_JSON_OBJECT(col,'$.payload.ref_type') AS ref_type ,case WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN GET_JSON_OBJECT(col,'$.payload.pull_request.state') WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN GET_JSON_OBJECT(col,'$.payload.issue.state') WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN GET_JSON_OBJECT(col,'$.payload.review.state') END AS state ,case WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN GET_JSON_OBJECT(col,'$.payload.pull_request.author_association') WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN GET_JSON_OBJECT(col,'$.payload.issue.author_association') WHEN GET_JSON_OBJECT(col,'$.type')="IssueCommentEvent" THEN GET_JSON_OBJECT(col,'$.payload.comment.author_association') WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN GET_JSON_OBJECT(col,'$.payload.review.author_association') END AS author_association ,GET_JSON_OBJECT(col,'$.payload.pull_request.base.repo.language') AS language ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.merged') AS BOOLEAN) AS merged ,to_date(GET_JSON_OBJECT(col,'$.payload.pull_request.merged_at'), 'yyyy-mm-ddThh:mi:ssZ') AS merged_at ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.additions')AS BIGINT) AS additions ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.deletions')AS BIGINT) AS deletions ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.changed_files')AS BIGINT) AS changed_files ,CAST(GET_JSON_OBJECT(col,'$.payload.size')AS BIGINT) AS push_size ,CAST(GET_JSON_OBJECT(col,'$.payload.distinct_size')AS BIGINT) AS push_distinct_size ,SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),12,2) as hr ,REPLACE(SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,7),'/','-') as month ,SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,4) as year ,REPLACE(SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,10),'/','-') as ds from githubevents where hr = cast(to_char(dateadd(getdate(),-9,'hh'), 'yyyy-mm-dd-hh') as string);
查詢資料。
使用如下命令查詢
dwd_github_events_odps
表資料:SET odps.sql.allow.fullscan=true; SELECT * FROM dwd_github_events_odps where ds = '2023-03-31' limit 10;
樣本返回結果如下:
即時資料倉庫建設
通過ECS擷取即時資料
ECS執行個體用於從GitHub API中提取即時事件數目據。本文僅以如下指令碼為例,展示一種通過GitHub API採集即時資料的方法。
該指令碼每次運行會執行1分鐘,採集這段時間內API提供的即時事件數目據,並以JSON格式儲存每個事件數目據。
該指令碼不保證採集到全部的即時事件數目據。
持續從GitHub API中採集資料需要提供Accept和Authorization。其中Accept為固定值,Authorization需要填寫從GitHub中申請的存取權杖。存取權杖的建立方法請參見此處。
使用如下命令在
/opt/realtime
目錄下建立名稱為download_realtime_data.py
的檔案。cd /opt/realtime vim download_realtime_data.py
在檔案內輸入
i
後進入編輯模式,添加如下樣本內容。#!python import requests import json import sys import time # 擷取API URL def get_next_link(resp): resp_link = resp.headers['link'] link = '' for l in resp_link.split(', '): link = l.split('; ')[0][1:-1] rel = l.split('; ')[1] if rel == 'rel="next"': return link return None # 採集API中一頁的資料 def download(link, fname): # 定義GitHub API的Accept和Authorization headers = {"Accept": "application/vnd.github+json","Authorization": "<Bearer> <github_api_token>"} resp = requests.get(link, headers=headers) if int(resp.status_code) != 200: return None with open(fname, 'a') as f: for j in resp.json(): f.write(json.dumps(j)) f.write('\n') print('downloaded {} events to {}'.format(len(resp.json()), fname)) return resp # 採集API中多頁的資料 def download_all_data(fname): link = 'https://api.github.com/events?per_page=100&page=1' while True: resp = download(link, fname) if resp is None: break link = get_next_link(resp) if link is None: break # 定義目前時間 def get_current_ms(): return round(time.time()*1000) # 定義指令碼每次執行時間長度1分鐘 def main(fname): current_ms = get_current_ms() while get_current_ms() - current_ms < 60*1000: download_all_data(fname) time.sleep(0.1) # 執行指令碼 if __name__ == '__main__': if len(sys.argv) < 2: print('usage: python {} <log_file>'.format(sys.argv[0])) exit(0) main(sys.argv[1])
按Esc鍵,輸入
:wq
並斷行符號以儲存並關閉檔案。建立
run_py.sh
檔案用於執行download_realtime_data.py
並將每次執行採集到的資料分別儲存,內容如下。python /opt/realtime/download_realtime_data.py /opt/realtime/gh_realtime_data/$(date '+%Y-%m-%d-%H:%M:%S').json
建立
delete_log.sh
檔案用於刪除歷史資料,內容如下。d=$(TZ=UTC date --date='2 day ago' '+%Y-%m-%d') rm -f /opt/realtime/gh_realtime_data/*${d}*.json
使用如下命令每分鐘採集一次GitHub資料,每天刪除一次歷史資料。
#1 執行以下指令,按鍵盤I鍵進入編輯狀態。 crontab -e #2 添加以下指令,完成後按鍵盤Esc鍵,輸入:wq退出。 * * * * * bash /opt/realtime/run_py.sh 1 1 * * * bash /opt/realtime/delete_log.sh
通過SLS採集ECS資料
SLS用於將ECS中提取到的即時事件數目據作為日誌進行收集。
SLS支援通過Logtail採集ECS上的日誌。由於本文涉及的資料為JSON格式,因此可以使用Logtail的JSON模式快速採集ECS中的增量JSON日誌,採集方法請參見使用JSON模式採集日誌。其中本文定義SLS對未經處理資料的頂層索引值對進行解析。
Logtail配置的日誌路徑參數本樣本設定為/opt/realtime/gh_realtime_data/**/*.json
。
配置完成後,SLS即可持續完成對ECS中增量事件數目據的採集。採集到的資料情況樣本如下圖。
通過Flink即時寫入SLS資料至Hologres
Flink用於將SLS採集的日誌資料即時寫入Hologres。通過在Flink中使用SLS源表、Hologres結果表,即可實現資料從SLS到Hologres的即時寫入,詳情請參見從SLSLog Service匯入。
建立Hologres內部表。
本文建立的內部表中只保留了原始JSON資料的部分索引值,並將事件
id
、日期ds
設為主鍵,將事件id
設為Distribution Key,將日期ds
設為分區鍵,將事件發生時間created_at
設為event_time_column。您可以根據實際查詢需求,為其他欄位建立索引,以提升查詢效率。索引介紹請參見CREATE TABLE。本次樣本建表DDL如下。DROP TABLE IF EXISTS gh_realtime_data; BEGIN; CREATE TABLE gh_realtime_data ( id bigint, actor_id bigint, actor_login text, repo_id bigint, repo_name text, org_id bigint, org_login text, type text, created_at timestamp with time zone NOT NULL, action text, iss_or_pr_id bigint, number bigint, comment_id bigint, commit_id text, member_id bigint, rev_or_push_or_rel_id bigint, ref text, ref_type text, state text, author_association text, language text, merged boolean, merged_at timestamp with time zone, additions bigint, deletions bigint, changed_files bigint, push_size bigint, push_distinct_size bigint, hr text, month text, year text, ds text, PRIMARY KEY (id,ds) ) PARTITION BY LIST (ds); CALL set_table_property('public.gh_realtime_data', 'distribution_key', 'id'); CALL set_table_property('public.gh_realtime_data', 'event_time_column', 'created_at'); CALL set_table_property('public.gh_realtime_data', 'clustering_key', 'created_at'); COMMENT ON COLUMN public.gh_realtime_data.id IS '事件ID'; COMMENT ON COLUMN public.gh_realtime_data.actor_id IS '事件發起人ID'; COMMENT ON COLUMN public.gh_realtime_data.actor_login IS '事件發起人登入名稱'; COMMENT ON COLUMN public.gh_realtime_data.repo_id IS 'repoID'; COMMENT ON COLUMN public.gh_realtime_data.repo_name IS 'repo名稱'; COMMENT ON COLUMN public.gh_realtime_data.org_id IS 'repo所屬組織ID'; COMMENT ON COLUMN public.gh_realtime_data.org_login IS 'repo所屬組織名稱'; COMMENT ON COLUMN public.gh_realtime_data.type IS '事件類型'; COMMENT ON COLUMN public.gh_realtime_data.created_at IS '事件發生時間'; COMMENT ON COLUMN public.gh_realtime_data.action IS '事件行為'; COMMENT ON COLUMN public.gh_realtime_data.iss_or_pr_id IS 'issue/pull_request ID'; COMMENT ON COLUMN public.gh_realtime_data.number IS 'issue/pull_request 序號'; COMMENT ON COLUMN public.gh_realtime_data.comment_id IS 'comment(評論)ID'; COMMENT ON COLUMN public.gh_realtime_data.commit_id IS '提交記錄ID'; COMMENT ON COLUMN public.gh_realtime_data.member_id IS '成員ID'; COMMENT ON COLUMN public.gh_realtime_data.rev_or_push_or_rel_id IS 'review/push/release ID'; COMMENT ON COLUMN public.gh_realtime_data.ref IS '建立/刪除的資源名稱'; COMMENT ON COLUMN public.gh_realtime_data.ref_type IS '建立/刪除的資源類型'; COMMENT ON COLUMN public.gh_realtime_data.state IS 'issue/pull_request/pull_request_review的狀態'; COMMENT ON COLUMN public.gh_realtime_data.author_association IS 'actor與repo之間的關係'; COMMENT ON COLUMN public.gh_realtime_data.language IS '程式設計語言'; COMMENT ON COLUMN public.gh_realtime_data.merged IS '是否接受合并'; COMMENT ON COLUMN public.gh_realtime_data.merged_at IS '代碼合并時間'; COMMENT ON COLUMN public.gh_realtime_data.additions IS '代碼增加行數'; COMMENT ON COLUMN public.gh_realtime_data.deletions IS '代碼減少行數'; COMMENT ON COLUMN public.gh_realtime_data.changed_files IS 'pull request 改變檔案數量'; COMMENT ON COLUMN public.gh_realtime_data.push_size IS '提交數量'; COMMENT ON COLUMN public.gh_realtime_data.push_distinct_size IS '不同的提交數量'; COMMENT ON COLUMN public.gh_realtime_data.hr IS '事件發生所在小時,如00點23分,hr=00'; COMMENT ON COLUMN public.gh_realtime_data.month IS '事件發生所在月,如2015年10月,month=2015-10'; COMMENT ON COLUMN public.gh_realtime_data.year IS '事件發生所在年,如2015年,year=2015'; COMMENT ON COLUMN public.gh_realtime_data.ds IS '事件發生所在日,ds=yyyy-mm-dd'; COMMIT;
通過Flink即時寫入資料。
通過Flink對SLS的資料進一步解析並即時寫入到Hologres中。在Flink中使用如下語句對寫入的資料進行過濾,丟棄事件ID、事件發生時間(
created_at
)為空白的髒資料,並且只保留近期發生的事件數目據。CREATE TEMPORARY TABLE sls_input ( actor varchar, created_at varchar, id bigint, org varchar, payload varchar, public varchar, repo varchar, type varchar ) WITH ( 'connector' = 'sls', 'endpoint' = '<endpoint>',--sls私域endpoint 'accessid' = '<accesskey id>',--帳號access id 'accesskey' = '<accesskey secret>',--帳號access key 'project' = '<project name>',--sls的project名 'logstore' = '<logstore name>'--sls的LogStore名稱 'starttime' = '2023-04-06 00:00:00',--sls資料擷取開始時間 ); CREATE TEMPORARY TABLE hologres_sink ( id bigint, actor_id bigint, actor_login string, repo_id bigint, repo_name string, org_id bigint, org_login string, type string, created_at timestamp, action string, iss_or_pr_id bigint, number bigint, comment_id bigint, commit_id string, member_id bigint, rev_or_push_or_rel_id bigint, `ref` string, ref_type string, state string, author_association string, `language` string, merged boolean, merged_at timestamp, additions bigint, deletions bigint, changed_files bigint, push_size bigint, push_distinct_size bigint, hr string, `month` string, `year` string, ds string ) WITH ( 'connector' = 'hologres', 'dbname' = '<hologres dbname>', --Hologres的資料庫名稱 'tablename' = '<hologres tablename>', --Hologres用於接收資料的表名稱 'username' = '<accesskey id>', --當前阿里雲帳號的AccessKey ID 'password' = '<accesskey secret>', --當前阿里雲帳號的AccessKey Secret 'endpoint' = '<endpoint>', --當前Hologres執行個體VPC網路的Endpoint 'jdbcretrycount' = '1', --串連故障時的重試次數 'partitionrouter' = 'true', --是否寫入分區表 'createparttable' = 'true', --是否自動建立分區 'mutatetype' = 'insertorignore' --資料寫入模式 ); INSERT INTO hologres_sink SELECT id ,CAST(JSON_VALUE(actor, '$.id') AS bigint) AS actor_id ,JSON_VALUE(actor, '$.login') AS actor_login ,CAST(JSON_VALUE(repo, '$.id') AS bigint) AS repo_id ,JSON_VALUE(repo, '$.name') AS repo_name ,CAST(JSON_VALUE(org, '$.id') AS bigint) AS org_id ,JSON_VALUE(org, '$.login') AS org_login ,type ,TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS created_at ,JSON_VALUE(payload, '$.action') AS action ,CASE WHEN type='PullRequestEvent' THEN CAST(JSON_VALUE(payload, '$.pull_request.id') AS bigint) WHEN type='IssuesEvent' THEN CAST(JSON_VALUE(payload, '$.issue.id') AS bigint) END AS iss_or_pr_id ,CASE WHEN type='PullRequestEvent' THEN CAST(JSON_VALUE(payload, '$.pull_request.number') AS bigint) WHEN type='IssuesEvent' THEN CAST(JSON_VALUE(payload, '$.issue.number') AS bigint) ELSE CAST(JSON_VALUE(payload, '$.number') AS bigint) END AS number ,CAST(JSON_VALUE(payload, '$.comment.id') AS bigint) AS comment_id ,JSON_VALUE(payload, '$.comment.commit_id') AS commit_id ,CAST(JSON_VALUE(payload, '$.member.id') AS bigint) AS member_id ,CASE WHEN type='PullRequestReviewEvent' THEN CAST(JSON_VALUE(payload, '$.review.id') AS bigint) WHEN type='PushEvent' THEN CAST(JSON_VALUE(payload, '$.push_id') AS bigint) WHEN type='ReleaseEvent' THEN CAST(JSON_VALUE(payload, '$.release.id') AS bigint) END AS rev_or_push_or_rel_id ,JSON_VALUE(payload, '$.ref') AS `ref` ,JSON_VALUE(payload, '$.ref_type') AS ref_type ,CASE WHEN type='PullRequestEvent' THEN JSON_VALUE(payload, '$.pull_request.state') WHEN type='IssuesEvent' THEN JSON_VALUE(payload, '$.issue.state') WHEN type='PullRequestReviewEvent' THEN JSON_VALUE(payload, '$.review.state') END AS state ,CASE WHEN type='PullRequestEvent' THEN JSON_VALUE(payload, '$.pull_request.author_association') WHEN type='IssuesEvent' THEN JSON_VALUE(payload, '$.issue.author_association') WHEN type='IssueCommentEvent' THEN JSON_VALUE(payload, '$.comment.author_association') WHEN type='PullRequestReviewEvent' THEN JSON_VALUE(payload, '$.review.author_association') END AS author_association ,JSON_VALUE(payload, '$.pull_request.base.repo.language') AS `language` ,CAST(JSON_VALUE(payload, '$.pull_request.merged') AS boolean) AS merged ,TO_TIMESTAMP_TZ(replace(JSON_VALUE(payload, '$.pull_request.merged_at'),'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS merged_at ,CAST(JSON_VALUE(payload, '$.pull_request.additions') AS bigint) AS additions ,CAST(JSON_VALUE(payload, '$.pull_request.deletions') AS bigint) AS deletions ,CAST(JSON_VALUE(payload, '$.pull_request.changed_files') AS bigint) AS changed_files ,CAST(JSON_VALUE(payload, '$.size') AS bigint) AS push_size ,CAST(JSON_VALUE(payload, '$.distinct_size') AS bigint) AS push_distinct_size ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),12,2) as hr ,REPLACE(SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,7),'/','-') as `month` ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,4) as `year` ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,10) as ds FROM sls_input WHERE id IS NOT NULL AND created_at IS NOT NULL AND to_date(replace(created_at,'T',' ')) >= date_add(CURRENT_DATE, -1);
參數說明請參見Log ServiceSLS源表和即時數倉Hologres結果表。
說明由於GitHub原始事件數目據採用的時區為UTC、未經處理資料不帶有時區屬性,Hologres的預設時區為東八區,因此需要在Flink即時寫入Hologres過程中對資料時區進行調整:需要在Flink SQL中對源表資料賦予UTC時區屬性,並在啟動作業時在作業啟動配置頁面的Flink配置地區添加
table.local-time-zone:Asia/Shanghai
語句將Flink系統時區定義為Asia/Shanghai
。查詢資料。
在Hologres中查詢通過Flink寫入Hologres中的SLS資料,後續您可以根據業務需求進行資料開發。
SELECT * FROM public.gh_realtime_data limit 10;
結果樣本如下:
使用離線資料修正即時資料
在本文的情境中,即時資料存在遺漏的可能,因此可以使用離線資料對即時資料進行修正。通過如下步驟可以完成對前一日即時資料的修正,您可以根據自身業務需要,調整資料修正的周期。
在Hologres中建立外部表格,擷取MaxCompute離線資料。
IMPORT FOREIGN SCHEMA <maxcompute_project_name> LIMIT to ( <foreign_table_name> ) FROM SERVER odps_server INTO public OPTIONS(if_table_exist 'update',if_unsupported_type 'error');
參數說明請參見IMPORT FOREIGN SCHEMA。
通過建立暫存資料表實現離線資料修正前一日即時資料。
說明Hologres從V2.1.17版本起支援Serverless Computing能力,針對巨量資料量離線匯入、大型ETL作業、外表巨量資料量查詢等情境,使用Serverless Computing執行該類任務可以直接使用額外的Serverless資源,避免使用執行個體自身資源,無需為執行個體預留額外的計算資源,顯著提升執行個體穩定性、減少OOM機率,且僅需為任務單獨付費。Serverless Computing詳情請參見Serverless Computing概述,Serverless Computing使用方法請參見Serverless Computing使用指南。
-- 清理潛在的暫存資料表 DROP TABLE IF EXISTS gh_realtime_data_tmp; -- 建立暫存資料表 SET hg_experimental_enable_create_table_like_properties = ON; CALL HG_CREATE_TABLE_LIKE ('gh_realtime_data_tmp', 'select * from gh_realtime_data'); -- (可選)推薦使用Serverless Computing執行巨量資料量離線匯入和ETL作業 SET hg_computing_resource = 'serverless'; -- 向暫存資料表插入資料並更新統計資料 INSERT INTO gh_realtime_data_tmp SELECT * FROM <foreign_table_name> WHERE ds = current_date - interval '1 day' ON CONFLICT (id, ds) DO NOTHING; ANALYZE gh_realtime_data_tmp; -- 重設配置,保證非必要的SQL不會使用serverless資源。 RESET hg_computing_resource; -- 已有臨時子表替換原子表 BEGIN; DROP TABLE IF EXISTS "gh_realtime_data_<yesterday_date>"; ALTER TABLE gh_realtime_data_tmp RENAME TO "gh_realtime_data_<yesterday_date>"; ALTER TABLE gh_realtime_data ATTACH PARTITION "gh_realtime_data_<yesterday_date>" FOR VALUES IN ('<yesterday_date>'); COMMIT;
資料分析
針對已擷取到的海量資料,可以進行豐富的資料分析。您可以結合自身業務需要分析的時間範圍,對資料倉儲進行進一步分層設計,以滿足即時資料分析、離線資料分析、即時離線一體化分析等多方面訴求。
如下樣本針對上文擷取到的即時資料進行分析,您也可以針對具體代碼倉庫或開發人員進行資料分析。
查詢今日公開事件總數。
SELECT count(*) FROM gh_realtime_data WHERE created_at >= date_trunc('day', now());
返回結果樣本如下:
count ------ 1006
查詢過去1天最活躍(事件數目最多)的幾個專案。
SELECT repo_name, COUNT(*) AS events FROM gh_realtime_data WHERE created_at >= now() - interval '1 day' GROUP BY repo_name ORDER BY events DESC LIMIT 5;
返回結果樣本如下:
repo_name events ----------------------------------------+------ leo424y/heysiri.ml 29 arm-on/plan 10 Christoffel-T/fiverr-pat-20230331 9 mate-academy/react_dynamic-list-of-goods 9 openvinotoolkit/openvino 7
查詢過去1天最活躍(事件數目最多)的幾位開發人員。
SELECT actor_login, COUNT(*) AS events FROM gh_realtime_data WHERE created_at >= now() - interval '1 day' AND actor_login NOT LIKE '%[bot]' GROUP BY actor_login ORDER BY events DESC LIMIT 5;
返回結果樣本如下:
actor_login events ------------------+------ direwolf-github 13 arm-on 10 sergii-nosachenko 9 Christoffel-T 9 yangwang201911 7
查詢過去1小時最火程式設計語言排行。
SELECT language, count(*) total FROM gh_realtime_data WHERE created_at > now() - interval '1 hour' AND language IS NOT NULL GROUP BY language ORDER BY total DESC LIMIT 10;
返回結果樣本如下:
language total -----------+---- JavaScript 25 C++ 15 Python 14 TypeScript 13 Java 8 PHP 8
查詢過去1天專案加星數排行。
說明本樣本並未考慮使用者取消星標等情況。
SELECT repo_id, repo_name, COUNT(actor_login) total FROM gh_realtime_data WHERE type = 'WatchEvent' AND created_at > now() - interval '1 day' GROUP BY repo_id, repo_name ORDER BY total DESC LIMIT 10;
返回結果樣本如下:
repo_id repo_name total ---------+----------------------------------+----- 618058471 facebookresearch/segment-anything 4 619959033 nomic-ai/gpt4all 1 97249406 denysdovhan/wtfjs 1 9791525 digininja/DVWA 1 168118422 aylei/interview 1 343520006 joehillen/sysz 1 162279822 agalwood/Motrix 1 577723410 huggingface/swift-coreml-diffusers 1 609539715 e2b-dev/e2b 1 254839429 maniackk/KKCallStack 1
查詢今日使用者和專案日活。
SELECT uniq (actor_id) actor_num, uniq (repo_id) repo_num FROM gh_realtime_data WHERE created_at > date_trunc('day', now());
返回結果樣本如下:
actor_num repo_num ---------+-------- 743 816