This topic describes how to build an offline data warehouse by using MaxCompute and a real-time data warehouse by using Realtime Compute for Apache Flink and Hologres based on real-time event data of GitHub. This topic also describes how to perform real-time and offline data analytics by using Hologres and MaxCompute. This process is called the integration of offline and real-time data processing.
Background information
With the development of digitalization, enterprises have an increasingly strong demand for data timeliness. In addition to traditional business scenarios in which massive amounts of data are processed offline, more and more business scenarios require real-time processing, real-time storage, and real-time analytics. To meet these requirements, Alibaba Cloud introduces the integration of offline and real-time data processing.
The integration of offline and real-time data processing allows real-time data and offline data to be managed and processed on the same platform. This seamless integration helps improve the efficiency and precision of data analytics. The integration of offline and real-time data processing provides the following advantages:
High data processing efficiency: Both real-time data and offline data are managed and processed on the same platform. This significantly improves data processing efficiency and reduces data transmission and conversion costs.
High data analytics precision: The hybrid analytics of real-time data and offline data delivers higher precision and accuracy.
Low system complexity: Data management and data processing are simpler and more efficient.
High data value: Data value is fully explored to facilitate decision-making of enterprises.
In the integrated data warehousing solution of Alibaba Cloud, MaxCompute is used for offline data analytics, Hologres is used for real-time data analytics, and Realtime Compute for Apache Flink is used for real-time data processing.
Architecture
The following figure shows the architecture of the integrated data warehousing solution in which MaxCompute and Hologres are used together to perform real-time and offline processing of data in GitHub public event datasets.
An Elastic Compute Service (ECS) instance collects real-time and offline event data from GitHub as data sources. The real-time data and offline data are then separately processed and written to Hologres. Hologres provides data services to external applications.
Real-time processing: Realtime Compute for Apache Flink processes data from Simple Log Service in real time and writes the processed data to Hologres. Realtime Compute for Apache Flink is a powerful stream computing engine, and Hologres supports real-time data writing and updating. Data in Hologres can be queried immediately after it is written. The native integration between Realtime Compute for Apache Flink and Hologres supports real-time data warehouses that deliver high throughput and low latency, support data modeling, and provide high performance. The real-time data warehouses meet the real-time requirements of business insights in scenarios such as latest event extraction and trending event analysis.
Offline processing: MaxCompute processes and archives large amounts of offline data. Object Storage Service (OSS) is a cloud storage service provided by Alibaba Cloud. OSS can be used to store various types of data. The raw data referenced in this practice is in JSON format. OSS provides convenient, secure, low-cost, and reliable storage capabilities. MaxCompute is an enterprise-level software as a service (SaaS) data warehouse that is suitable for data analytics. MaxCompute can directly read and parse semi-structured data in OSS by using external tables, store high-value data, and then use DataWorks to develop data and build offline data warehouses.
Hologres seamlessly integrates with MaxCompute at the underlying layer. You can use Hologres to accelerate queries and analytics of large amounts of historical data in MaxCompute to meet your business requirements for low-frequency and high-performance queries of historical data. You can also easily correct real-time data based on offline processing results to resolve issues such as data loss that may occur in real-time links.
The integrated data warehousing solution provides the following advantages:
Stable and efficient offline processing: Data can be written and updated on an hourly basis. A large amount of data can be processed in batches for complex computing and analytics. This reduces computing costs and improves data processing efficiency.
Mature real-time processing: Real-time writing, real-time event computing, and real-time analytics are supported. Real-time data is queried in seconds.
Unified storage and service: Hologres is used to provide services. Hologres stores data in a centralized manner and provides a unified SQL interface for online analytical processing (OLAP) queries and point queries of key-value pairs.
Integration of real-time data and offline data: Data redundancy is reduced, data migration is minimized, and real-time data can be corrected by using offline data.
One-stop development: Data queries are answered within seconds, the data processing status is visualized, and fewer components and dependencies are required. This significantly decreases O&M costs and labor costs.
Introduction to business and data
A large number of developers develop open source projects on GitHub and generate a large number of events during the development of these projects. GitHub records the type and details of each event, the developer, and the code repository involved. GitHub also exposes public events, such as the events that are generated when a developer stars a repository or pushes code. For more information about event types, see Webhook events and payloads.
GitHub publishes public events through an API. The API exposes events with a delay of 5 minutes. For more information, see Events. You can use the API to obtain real-time data.
The GH Archive project summarizes GitHub public events on an hourly basis and allows access from developers. For more information about the GH Archive project, see GH Archive. You can obtain offline data from the GH Archive project.
Introduction to GitHub
GitHub is used for code management and also functions as a platform for communications among developers. GitHub involves three level-1 entity objects: developer, repository, and organization.
In this practice, events are stored and recorded as entity objects.
Introduction to data of original public events
JSON-formatted sample data of an original event:
{
"id": "19541192931",
"type": "WatchEvent",
"actor":
{
"id": 23286640,
"login": "herekeo",
"display_login": "herekeo",
"gravatar_id": "",
"url": "https://api.github.com/users/herekeo",
"avatar_url": "https://avatars.githubusercontent.com/u/23286640?"
},
"repo":
{
"id": 52760178,
"name": "crazyguitar/pysheeet",
"url": "https://api.github.com/repos/crazyguitar/pysheeet"
},
"payload":
{
"action": "started"
},
"public": true,
"created_at": "2022-01-01T00:03:04Z"
}
In this example, 15 types of public events are involved. Event types that do not appear in the data or that are no longer recorded by GitHub are excluded. For more information about event types and descriptions, see GitHub event types.
Prerequisites
An ECS instance is created and is associated with an elastic IP address (EIP). You can use the ECS instance to obtain real-time event data from GitHub APIs. For more information, see Creation method overview and Associate or disassociate an EIP.
OSS is activated, and the ossutil tool is installed on the ECS instance. You can use the ossutil tool to store JSON-formatted data from GH Archive to OSS. For more information, see Activate OSS and Install ossutil.
MaxCompute is activated, and a MaxCompute project is created. For more information, see Create a MaxCompute project.
DataWorks is activated, and a workspace is created for you to create offline scheduling tasks. For more information, see Create a workspace.
Simple Log Service is activated. A project and a Logstore are created to collect data that is extracted by ECS instances and store the data as logs. For more information, see Getting Started.
Realtime Compute for Apache Flink is activated to write log data collected by Simple Log Service to Hologres in real time. For more information, see Activate Realtime Compute for Apache Flink.
A Hologres instance is purchased. For more information, see Purchase a Hologres instance.
Build an offline data warehouse (updated on an hourly basis)
Download raw data files by using an ECS instance and upload the files to OSS
You can use the ECS instance to download JSON-formatted data files from GH Archive. If you want to download historical data, you can run the wget command. For example, you can run the following command to download the data that was archived on an hourly basis from 2012 to 2022:
wget https://data.gharchive.org/{2012..2022}-{01..12}-{01..31}-{0..23}.json.gz
If you want to download real-time data that is archived on an hourly basis, perform the following steps to configure a scheduled task that runs at hourly intervals.
Run the following command to create a file named download_code.sh:
vim download_code.sh
Enter i to enter the edit mode, and then add the following script.
Note: Before you run the script, make sure that the ossutil tool is installed on the ECS instance. For more information, see Install ossutil. In this example, the name of the OSS bucket is githubevents.
d=$(TZ=UTC date --date='1 hour ago' '+%Y-%m-%d-%-H')
h=$(TZ=UTC date --date='1 hour ago' '+%Y-%m-%d-%H')
url=https://data.gharchive.org/${d}.json.gz
echo ${url}
wget ${url} -P ./gh_data/
cd gh_data
gzip -d ${d}.json
echo ${d}.json
# Use the ossutil tool to upload data to OSS.
cd /root
./ossutil64 mkdir oss://githubevents/hr=${h}
./ossutil64 cp -r /hourlydata/gh_data oss://githubevents/hr=${h} -u
echo oss uploaded successfully!
rm -rf /hourlydata/gh_data/${d}.json
echo ecs deleted!
Press the Esc key to exit the edit mode. Then, enter :wq and press the Enter key to save and close the script.
Run the following command to configure a scheduled task that executes the download_code.sh script at the 10th minute of each hour:
crontab -e
10 * * * * cd /hourlydata && sh download_code.sh > download.log
At the 10th minute of each hour, the JSON file that was archived in the previous hour is downloaded, decompressed, and uploaded to the oss://githubevents path in OSS. To ensure that MaxCompute reads only the file that was archived in the previous hour, we recommend that you create a partition whose name is in the hr=%Y-%m-%d-%H format for each file. This ensures that MaxCompute reads data from the latest partition.
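The scheduled MaxCompute job that is described in the next section reads only the partition of the previous UTC hour. The following statement is a minimal sketch of how that partition name is derived, assuming that the MaxCompute project runs in the UTC+8 time zone, so the previous UTC hour equals the current local time minus 9 hours:
-- Returns a value such as 2023-04-01-10, which matches the hr=%Y-%m-%d-%H naming of the OSS directories.
-- Assumption: GETDATE() returns the local UTC+8 time of the MaxCompute project.
select cast(to_char(dateadd(getdate(), -9, 'hh'), 'yyyy-mm-dd-hh') as string);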
Import data from OSS to MaxCompute by using an external table
Perform the following steps on the MaxCompute client or an ODPS SQL node in the DataWorks console. For more information, see MaxCompute client (odpscmd) or Create an ODPS SQL node.
Create an external table named githubevents for reading the JSON files that are stored in OSS.
CREATE EXTERNAL TABLE IF NOT EXISTS githubevents
(
    col STRING
)
PARTITIONED BY
(
    hr STRING
)
STORED AS textfile
LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/githubevents/'
;
For more information about how to create an OSS external table in MaxCompute, see Create an OSS external table.
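Before you build the fact table, you can run a quick sanity check to confirm that the external table can read the uploaded files. The following statements are a minimal sketch that assumes a partition named hr=2023-04-01-10 (a hypothetical value) has already been uploaded to OSS:
-- Load the uploaded OSS directories as partitions of the external table.
msck repair table githubevents add partitions;
-- Read a few raw JSON rows from one hourly partition. Replace the hr value with an actual hour.
select col from githubevents where hr = '2023-04-01-10' limit 10;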
Create a fact table named dwd_github_events_odps for storing data. Sample data definition language (DDL) statement:
CREATE TABLE IF NOT EXISTS dwd_github_events_odps
(
    id                     BIGINT COMMENT 'The event ID'
    ,actor_id              BIGINT COMMENT 'The ID of the event actor'
    ,actor_login           STRING COMMENT 'The username of the event actor'
    ,repo_id               BIGINT COMMENT 'repoID'
    ,repo_name             STRING COMMENT 'The full repository name in the owner/Repository_name format'
    ,org_id                BIGINT COMMENT 'The ID of the organization to which the repository belongs'
    ,org_login             STRING COMMENT 'The name of the organization to which the repository belongs'
    ,`type`                STRING COMMENT 'The event type'
    ,created_at            DATETIME COMMENT 'The time when the event occurred'
    ,action                STRING COMMENT 'The event action'
    ,iss_or_pr_id          BIGINT COMMENT 'issue/pull_request ID'
    ,number                BIGINT COMMENT 'The sequence number of the issue or pull_request'
    ,comment_id            BIGINT COMMENT 'The comment ID'
    ,commit_id             STRING COMMENT 'The commit ID'
    ,member_id             BIGINT COMMENT 'The member ID'
    ,rev_or_push_or_rel_id BIGINT COMMENT 'review/push/release ID'
    ,ref                   STRING COMMENT 'The name of the resource that is created or deleted'
    ,ref_type              STRING COMMENT 'The type of the resource that is created or deleted'
    ,state                 STRING COMMENT 'The status of the issue, pull_request, or pull_request_review request'
    ,author_association    STRING COMMENT 'The relationship between the actor and the repository'
    ,language              STRING COMMENT 'The programming language'
    ,merged                BOOLEAN COMMENT 'Specifies whether merge is allowed'
    ,merged_at             DATETIME COMMENT 'The time when code is merged'
    ,additions             BIGINT COMMENT 'The number of rows added to the code'
    ,deletions             BIGINT COMMENT 'The number of rows deleted from the code'
    ,changed_files         BIGINT COMMENT 'The number of files changed by the pull request'
    ,push_size             BIGINT COMMENT 'The push size'
    ,push_distinct_size    BIGINT COMMENT 'The different push sizes'
    ,hr                    STRING COMMENT 'The hour in which the event occurred. For example, if the event occurred at 00:23, the value of this parameter is 00.'
    ,`month`               STRING COMMENT 'The month in which the event occurred. For example, if the event occurred in October 2015, the value of this parameter is 2015-10.'
    ,`year`                STRING COMMENT 'The year in which the event occurred. For example, if the event occurred in year 2015, the value of this parameter is 2015.'
)
PARTITIONED BY
(
    ds STRING COMMENT 'The day on which the event occurred. The value of this parameter is in the yyyy-mm-dd format.'
)
;
Parse the JSON-formatted data and write it to the fact table.
Run the following commands to add partitions, parse the JSON-formatted data, and write the parsed data to the dwd_github_events_odps table:
msck repair table githubevents add partitions;

set odps.sql.hive.compatible = true;
set odps.sql.split.hive.bridge = true;

INSERT into TABLE dwd_github_events_odps PARTITION(ds)
SELECT  CAST(GET_JSON_OBJECT(col,'$.id') AS BIGINT ) AS id
        ,CAST(GET_JSON_OBJECT(col,'$.actor.id')AS BIGINT) AS actor_id
        ,GET_JSON_OBJECT(col,'$.actor.login') AS actor_login
        ,CAST(GET_JSON_OBJECT(col,'$.repo.id')AS BIGINT) AS repo_id
        ,GET_JSON_OBJECT(col,'$.repo.name') AS repo_name
        ,CAST(GET_JSON_OBJECT(col,'$.org.id')AS BIGINT) AS org_id
        ,GET_JSON_OBJECT(col,'$.org.login') AS org_login
        ,GET_JSON_OBJECT(col,'$.type') as type
        ,to_date(GET_JSON_OBJECT(col,'$.created_at'), 'yyyy-mm-ddThh:mi:ssZ') AS created_at
        ,GET_JSON_OBJECT(col,'$.payload.action') AS action
        ,case   WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.id')AS BIGINT)
                WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.issue.id')AS BIGINT)
         END AS iss_or_pr_id
        ,case   WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.number')AS BIGINT)
                WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.issue.number')AS BIGINT)
                ELSE CAST(GET_JSON_OBJECT(col,'$.payload.number')AS BIGINT)
         END AS number
        ,CAST(GET_JSON_OBJECT(col,'$.payload.comment.id')AS BIGINT) AS comment_id
        ,GET_JSON_OBJECT(col,'$.payload.comment.commit_id') AS commit_id
        ,CAST(GET_JSON_OBJECT(col,'$.payload.member.id')AS BIGINT) AS member_id
        ,case   WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.review.id')AS BIGINT)
                WHEN GET_JSON_OBJECT(col,'$.type')="PushEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.push_id')AS BIGINT)
                WHEN GET_JSON_OBJECT(col,'$.type')="ReleaseEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.release.id')AS BIGINT)
         END AS rev_or_push_or_rel_id
        ,GET_JSON_OBJECT(col,'$.payload.ref') AS ref
        ,GET_JSON_OBJECT(col,'$.payload.ref_type') AS ref_type
        ,case   WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN GET_JSON_OBJECT(col,'$.payload.pull_request.state')
                WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN GET_JSON_OBJECT(col,'$.payload.issue.state')
                WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN GET_JSON_OBJECT(col,'$.payload.review.state')
         END AS state
        ,case   WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN GET_JSON_OBJECT(col,'$.payload.pull_request.author_association')
                WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN GET_JSON_OBJECT(col,'$.payload.issue.author_association')
                WHEN GET_JSON_OBJECT(col,'$.type')="IssueCommentEvent" THEN GET_JSON_OBJECT(col,'$.payload.comment.author_association')
                WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN GET_JSON_OBJECT(col,'$.payload.review.author_association')
         END AS author_association
        ,GET_JSON_OBJECT(col,'$.payload.pull_request.base.repo.language') AS language
        ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.merged') AS BOOLEAN) AS merged
        ,to_date(GET_JSON_OBJECT(col,'$.payload.pull_request.merged_at'), 'yyyy-mm-ddThh:mi:ssZ') AS merged_at
        ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.additions')AS BIGINT) AS additions
        ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.deletions')AS BIGINT) AS deletions
        ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.changed_files')AS BIGINT) AS changed_files
        ,CAST(GET_JSON_OBJECT(col,'$.payload.size')AS BIGINT) AS push_size
        ,CAST(GET_JSON_OBJECT(col,'$.payload.distinct_size')AS BIGINT) AS push_distinct_size
        ,SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),12,2) as hr
        ,REPLACE(SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,7),'/','-') as month
        ,SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,4) as year
        ,REPLACE(SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,10),'/','-') as ds
from    githubevents
where   hr = cast(to_char(dateadd(getdate(),-9,'hh'), 'yyyy-mm-dd-hh') as string);
Query data.
Run the following commands to query data from the dwd_github_events_odps table:
set odps.sql.allow.fullscan=true;
select * from dwd_github_events_odps limit 10;
The following figure shows the returned result.
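To confirm that each hourly scheduling cycle writes data to the expected partition, you can also check the row counts by hour. The following query is a minimal sketch that assumes a hypothetical daily partition ds='2023-04-01'; replace the value with an actual date:
select ds, hr, count(*) as events
from dwd_github_events_odps
where ds = '2023-04-01'  -- Hypothetical partition value.
group by ds, hr
order by hr;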
Build a real-time data warehouse
Use an ECS instance to collect real-time data
You can use an ECS instance to extract real-time event data from the GitHub API. This section describes how to collect real-time data from the GitHub API.
In this example, real-time event data that was generated within 1 minute is collected from the GitHub API and stored in the JSON format.
The real-time event data that was collected by executing the script may not be complete.
If you want to continuously collect data from the GitHub API, you must specify the Accept and Authorization request headers. The value of Accept is fixed. The value of Authorization contains the personal access token that you obtain from GitHub. For more information about how to create an access token, see Creating a personal access token.
Run the following command to create a file named download_realtime_data.py:
vim download_realtime_data.py
Enter i to enter the edit mode. Then, add the following sample code.
#!python
import requests
import json
import sys
import time

# Obtain the URL of the next page from the GitHub API response.
def get_next_link(resp):
    resp_link = resp.headers['link']
    link = ''
    for l in resp_link.split(', '):
        link = l.split('; ')[0][1:-1]
        rel = l.split('; ')[1]
        if rel == 'rel="next"':
            return link
    return None

# Collect one page of data from the GitHub API.
def download(link, fname):
    # Define the Accept and Authorization headers for the GitHub API.
    # Replace <github_api_token> with your personal access token.
    headers = {"Accept": "application/vnd.github+json", "Authorization": "Bearer <github_api_token>"}
    resp = requests.get(link, headers=headers)
    if int(resp.status_code) != 200:
        return None
    with open(fname, 'a') as f:
        for j in resp.json():
            f.write(json.dumps(j))
            f.write('\n')
    print('downloaded {} events to {}'.format(len(resp.json()), fname))
    return resp

# Collect multiple pages of data from the GitHub API.
def download_all_data(fname):
    link = 'https://api.github.com/events?per_page=100&page=1'
    while True:
        resp = download(link, fname)
        if resp is None:
            break
        link = get_next_link(resp)
        if link is None:
            break

# Define the current time in milliseconds.
def get_current_ms():
    return round(time.time()*1000)

# Set the duration of each script execution to 1 minute.
def main(fname):
    current_ms = get_current_ms()
    while get_current_ms() - current_ms < 60*1000:
        download_all_data(fname)
        time.sleep(0.1)

# Execute the script.
if __name__ == '__main__':
    if len(sys.argv) < 2:
        print('usage: python {} <log_file>'.format(sys.argv[0]))
        exit(0)
    main(sys.argv[1])
Press the Esc key to exit the edit mode. Then, enter :wq and press the Enter key to save and close the script.
Create a file named run_py.sh that executes the script in the download_realtime_data.py file and stores the collected data in a separate file named after the execution time. Sample code:
python /root/download_realtime_data.py /root/gh_realtime_data/$(date '+%Y-%m-%d-%H:%M:%S').json
Create a file named delete_log.sh that is used to delete historical data. Sample code:
d=$(TZ=UTC date --date='2 day ago' '+%Y-%m-%d')
rm -f /root/gh_realtime_data/*${d}*.json
Run the following commands to collect GitHub data every minute and delete historical data every day:
crontab -e
* * * * * bash /root/run_py.sh
1 1 * * * bash /root/delete_log.sh
Use Simple Log Service to collect data from the ECS instance
You can use Simple Log Service to collect real-time event data that is extracted by the ECS instance and store the data as logs.
Simple Log Service provides Logtail for you to collect log data from the ECS instance. The sample data in this topic is in the JSON format. You can use the JSON mode of Logtail to quickly collect incremental JSON logs from the ECS instance. For more information, see Collect logs in JSON mode. In this topic, Simple Log Service is used to parse raw data key-value pairs that belong to the top level.
In this example, Logtail collects data from the *.json files in the /root/gh_realtime_data/** directory.
After the configuration is complete, Simple Log Service continuously collects incremental event data from the ECS instance. The following figure shows an example of the collected data.
Use Realtime Compute for Apache Flink to write data from Simple Log Service to Hologres
You can use Realtime Compute for Apache Flink to write log data collected by Simple Log Service to Hologres in real time. In this process, the Simple Log Service source table and Hologres result table are used in Realtime Compute for Apache Flink. For more information, see Import data from Simple Log Service.
Create a Hologres internal table.
In this example, only some keys of the JSON-formatted raw data are retained in the Hologres internal table. The event ID specified by id is configured as the distribution key, the date specified by ds is configured as the partition key, and the event occurrence time specified by created_at is configured as the event_time_column. The id and ds columns form the primary key. You can create indexes on other fields in the Hologres internal table based on your query requirements to improve query efficiency. For more information about indexes, see CREATE TABLE. Sample DDL statements:
DROP TABLE IF EXISTS gh_realtime_data;

BEGIN;

CREATE TABLE gh_realtime_data (
    id bigint,
    actor_id bigint,
    actor_login text,
    repo_id bigint,
    repo_name text,
    org_id bigint,
    org_login text,
    type text,
    created_at timestamp with time zone NOT NULL,
    action text,
    iss_or_pr_id bigint,
    number bigint,
    comment_id bigint,
    commit_id text,
    member_id bigint,
    rev_or_push_or_rel_id bigint,
    ref text,
    ref_type text,
    state text,
    author_association text,
    language text,
    merged boolean,
    merged_at timestamp with time zone,
    additions bigint,
    deletions bigint,
    changed_files bigint,
    push_size bigint,
    push_distinct_size bigint,
    hr text,
    month text,
    year text,
    ds text,
    PRIMARY KEY (id,ds)
)
PARTITION BY LIST (ds);

CALL set_table_property('public.gh_realtime_data', 'distribution_key', 'id');
CALL set_table_property('public.gh_realtime_data', 'event_time_column', 'created_at');
CALL set_table_property('public.gh_realtime_data', 'clustering_key', 'created_at');

COMMENT ON COLUMN public.gh_realtime_data.id IS 'The event ID';
COMMENT ON COLUMN public.gh_realtime_data.actor_id IS 'The actor ID';
COMMENT ON COLUMN public.gh_realtime_data.actor_login IS 'The username of the event actor';
COMMENT ON COLUMN public.gh_realtime_data.repo_id IS 'repoID';
COMMENT ON COLUMN public.gh_realtime_data.repo_name IS 'The repository name';
COMMENT ON COLUMN public.gh_realtime_data.org_id IS 'The ID of the organization to which the repository belongs';
COMMENT ON COLUMN public.gh_realtime_data.org_login IS 'The name of the organization to which the repository belongs';
COMMENT ON COLUMN public.gh_realtime_data.type IS 'The event type';
COMMENT ON COLUMN public.gh_realtime_data.created_at IS 'The time when the event occurred.';
COMMENT ON COLUMN public.gh_realtime_data.action IS 'The event action';
COMMENT ON COLUMN public.gh_realtime_data.iss_or_pr_id IS 'issue/pull_request ID';
COMMENT ON COLUMN public.gh_realtime_data.number IS 'The sequence number of issue or pull_request';
COMMENT ON COLUMN public.gh_realtime_data.comment_id IS 'The comment ID';
COMMENT ON COLUMN public.gh_realtime_data.commit_id IS 'The commit ID';
COMMENT ON COLUMN public.gh_realtime_data.member_id IS 'The member ID';
COMMENT ON COLUMN public.gh_realtime_data.rev_or_push_or_rel_id IS 'review/push/release ID';
COMMENT ON COLUMN public.gh_realtime_data.ref IS 'The name of the resource that is created or deleted';
COMMENT ON COLUMN public.gh_realtime_data.ref_type IS 'The type of the resource that is created or deleted';
COMMENT ON COLUMN public.gh_realtime_data.state IS 'The status of the issue, pull_request, or pull_request_review request';
COMMENT ON COLUMN public.gh_realtime_data.author_association IS 'The relationship between the actor and the repository';
COMMENT ON COLUMN public.gh_realtime_data.language IS 'The programming language';
COMMENT ON COLUMN public.gh_realtime_data.merged IS 'Specifies whether merge is allowed';
COMMENT ON COLUMN public.gh_realtime_data.merged_at IS 'The time when code is merged';
COMMENT ON COLUMN public.gh_realtime_data.additions IS 'The number of rows added to the code';
COMMENT ON COLUMN public.gh_realtime_data.deletions IS 'The number of rows deleted from the code';
COMMENT ON COLUMN public.gh_realtime_data.changed_files IS 'The number of files changed by the pull request';
COMMENT ON COLUMN public.gh_realtime_data.push_size IS 'The push size';
COMMENT ON COLUMN public.gh_realtime_data.push_distinct_size IS 'The different push sizes';
COMMENT ON COLUMN public.gh_realtime_data.hr IS 'The hour in which the event occurred. For example, if the event occurred at 00:23, the value of this parameter is 00.';
COMMENT ON COLUMN public.gh_realtime_data.month IS 'The month in which the event occurred. For example, if the event occurred in October 2015, the value of this parameter is 2015-10.';
COMMENT ON COLUMN public.gh_realtime_data.year IS 'The year in which the event occurred. For example, if the event occurred in year 2015, the value of this parameter is 2015.';
COMMENT ON COLUMN public.gh_realtime_data.ds IS 'The day on which the event occurred. The value of this parameter is in the yyyy-mm-dd format.';

COMMIT;
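Each value of ds is stored in a child table of the gh_realtime_data partitioned table, and the result table configuration in the next step allows Realtime Compute for Apache Flink to create these child tables automatically. The following query is a minimal sketch for listing the existing child tables; it assumes that the child table names start with the parent table name, which is the convention used later in this topic:
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
  AND table_name LIKE 'gh_realtime_data%'  -- Assumed child table naming pattern.
ORDER BY table_name;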
Write real-time data by using Realtime Compute for Apache Flink.
You can use Realtime Compute for Apache Flink to further parse data from Simple Log Service and write the parsed data to Hologres in real time. Execute the following statements in Realtime Compute for Apache Flink to filter the data that you want to write to Hologres. During filtering, dirty data, such as records that do not contain an event ID or an event occurrence time (created_at), is discarded, and only events that occurred recently are retained.
CREATE TEMPORARY TABLE sls_input (
    actor varchar,
    created_at varchar,
    id bigint,
    org varchar,
    payload varchar,
    public varchar,
    repo varchar,
    type varchar
)
WITH (
    'connector' = 'sls',
    'endpoint' = '<endpoint>', -- The internal endpoint that is used to access Simple Log Service.
    'accessid' = '<accesskey id>', -- The AccessKey ID of your account.
    'accesskey' = '<accesskey secret>', -- The AccessKey secret of your account.
    'project' = '<project name>', -- The name of the project of Simple Log Service.
    'logstore' = '<logstore name>' -- The name of the Logstore in Simple Log Service.
);

CREATE TEMPORARY TABLE hologres_sink (
    id bigint,
    actor_id bigint,
    actor_login string,
    repo_id bigint,
    repo_name string,
    org_id bigint,
    org_login string,
    type string,
    created_at timestamp,
    action string,
    iss_or_pr_id bigint,
    number bigint,
    comment_id bigint,
    commit_id string,
    member_id bigint,
    rev_or_push_or_rel_id bigint,
    `ref` string,
    ref_type string,
    state string,
    author_association string,
    `language` string,
    merged boolean,
    merged_at timestamp,
    additions bigint,
    deletions bigint,
    changed_files bigint,
    push_size bigint,
    push_distinct_size bigint,
    hr string,
    `month` string,
    `year` string,
    ds string
)
WITH (
    'connector' = 'hologres',
    'dbname' = '<hologres dbname>', -- The name of the Hologres database.
    'tablename' = '<hologres tablename>', -- The name of the Hologres table to which you want to write data.
    'username' = '<accesskey id>', -- The AccessKey ID of your Alibaba Cloud account.
    'password' = '<accesskey secret>', -- The AccessKey secret of your Alibaba Cloud account.
    'endpoint' = '<endpoint>', -- The virtual private cloud (VPC) endpoint of your Hologres instance.
    'jdbcretrycount' = '1', -- The maximum number of retries allowed if a connection fails.
    'partitionrouter' = 'true', -- Specifies whether to write data to a partitioned table.
    'createparttable' = 'true', -- Specifies whether to automatically create partitions.
    'mutatetype' = 'insertorignore' -- The data writing mode.
);

INSERT INTO hologres_sink
SELECT  id
        ,CAST(JSON_VALUE(actor, '$.id') AS bigint) AS actor_id
        ,JSON_VALUE(actor, '$.login') AS actor_login
        ,CAST(JSON_VALUE(repo, '$.id') AS bigint) AS repo_id
        ,JSON_VALUE(repo, '$.name') AS repo_name
        ,CAST(JSON_VALUE(org, '$.id') AS bigint) AS org_id
        ,JSON_VALUE(org, '$.login') AS org_login
        ,type
        ,TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS created_at
        ,JSON_VALUE(payload, '$.action') AS action
        ,CASE   WHEN type='PullRequestEvent' THEN CAST(JSON_VALUE(payload, '$.pull_request.id') AS bigint)
                WHEN type='IssuesEvent' THEN CAST(JSON_VALUE(payload, '$.issue.id') AS bigint)
         END AS iss_or_pr_id
        ,CASE   WHEN type='PullRequestEvent' THEN CAST(JSON_VALUE(payload, '$.pull_request.number') AS bigint)
                WHEN type='IssuesEvent' THEN CAST(JSON_VALUE(payload, '$.issue.number') AS bigint)
                ELSE CAST(JSON_VALUE(payload, '$.number') AS bigint)
         END AS number
        ,CAST(JSON_VALUE(payload, '$.comment.id') AS bigint) AS comment_id
        ,JSON_VALUE(payload, '$.comment.commit_id') AS commit_id
        ,CAST(JSON_VALUE(payload, '$.member.id') AS bigint) AS member_id
        ,CASE   WHEN type='PullRequestReviewEvent' THEN CAST(JSON_VALUE(payload, '$.review.id') AS bigint)
                WHEN type='PushEvent' THEN CAST(JSON_VALUE(payload, '$.push_id') AS bigint)
                WHEN type='ReleaseEvent' THEN CAST(JSON_VALUE(payload, '$.release.id') AS bigint)
         END AS rev_or_push_or_rel_id
        ,JSON_VALUE(payload, '$.ref') AS `ref`
        ,JSON_VALUE(payload, '$.ref_type') AS ref_type
        ,CASE   WHEN type='PullRequestEvent' THEN JSON_VALUE(payload, '$.pull_request.state')
                WHEN type='IssuesEvent' THEN JSON_VALUE(payload, '$.issue.state')
                WHEN type='PullRequestReviewEvent' THEN JSON_VALUE(payload, '$.review.state')
         END AS state
        ,CASE   WHEN type='PullRequestEvent' THEN JSON_VALUE(payload, '$.pull_request.author_association')
                WHEN type='IssuesEvent' THEN JSON_VALUE(payload, '$.issue.author_association')
                WHEN type='IssueCommentEvent' THEN JSON_VALUE(payload, '$.comment.author_association')
                WHEN type='PullRequestReviewEvent' THEN JSON_VALUE(payload, '$.review.author_association')
         END AS author_association
        ,JSON_VALUE(payload, '$.pull_request.base.repo.language') AS `language`
        ,CAST(JSON_VALUE(payload, '$.pull_request.merged') AS boolean) AS merged
        ,TO_TIMESTAMP_TZ(replace(JSON_VALUE(payload, '$.pull_request.merged_at'),'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS merged_at
        ,CAST(JSON_VALUE(payload, '$.pull_request.additions') AS bigint) AS additions
        ,CAST(JSON_VALUE(payload, '$.pull_request.deletions') AS bigint) AS deletions
        ,CAST(JSON_VALUE(payload, '$.pull_request.changed_files') AS bigint) AS changed_files
        ,CAST(JSON_VALUE(payload, '$.size') AS bigint) AS push_size
        ,CAST(JSON_VALUE(payload, '$.distinct_size') AS bigint) AS push_distinct_size
        ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),12,2) as hr
        ,REPLACE(SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,7),'/','-') as `month`
        ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,4) as `year`
        ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,10) as ds
FROM    sls_input
WHERE   id IS NOT NULL
    AND created_at IS NOT NULL
    AND to_date(replace(created_at,'T',' ')) >= date_add(CURRENT_DATE, -1)
;
For more information about the parameters, see Create a Log Service source table and Create a Hologres result table.
Note: GitHub records raw event data in the UTC time zone, and the data does not include a time zone property. However, Hologres uses the UTC+8 time zone by default. When you use Realtime Compute for Apache Flink to write real-time data to Hologres, you must convert the time zone of the data by performing the following operations: add the UTC time zone property to the data of the source table in Flink SQL, and add the table.local-time-zone: Asia/Shanghai configuration in the Flink Configuration section of the Deployment Starting Configuration page to set the time zone of Realtime Compute for Apache Flink to Asia/Shanghai.
Query data.
You can query the data that is written from Simple Log Service to Hologres by using Realtime Compute for Apache Flink. You can also develop the data based on your business requirements.
SELECT * FROM public.gh_realtime_data limit 10;
The following figure shows the returned result.
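Because data in Hologres can be queried immediately after it is written, you can also use a simple query to observe the end-to-end latency of the real-time link. The following query is a minimal sketch that compares the latest event time with the current time:
SELECT max(created_at) AS latest_event_time,
       now() - max(created_at) AS end_to_end_lag
FROM public.gh_realtime_data;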
Use offline data to correct real-time data
In scenarios described in this topic, real-time data may not be complete. You can use offline data to correct real-time data. The following section describes how to correct real-time data that was generated on the previous day. You can modify the data correction period based on your business requirements.
Create a foreign table in Hologres to obtain offline data from MaxCompute.
IMPORT FOREIGN SCHEMA <maxcompute_project_name> LIMIT to ( <foreign_table_name> ) FROM SERVER odps_server INTO public OPTIONS(if_table_exist 'update',if_unsupported_type 'error');
For more information about the parameters, see IMPORT FOREIGN SCHEMA.
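For example, if the MaxCompute project is named github_events_project (a hypothetical name) and the offline fact table is dwd_github_events_odps, the following statements map the table to a foreign table in Hologres and run a quick sanity check; adjust the names to match your environment:
IMPORT FOREIGN SCHEMA github_events_project LIMIT to
(
    dwd_github_events_odps
)
FROM SERVER odps_server INTO public OPTIONS(if_table_exist 'update', if_unsupported_type 'error');

-- Check that the foreign table returns data for the previous day.
SELECT count(*)
FROM public.dwd_github_events_odps
WHERE ds = to_char(current_date - interval '1 day', 'YYYY-MM-DD');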
Create a temporary table and use offline data to correct real-time data that was generated on the previous day.
Note: Hologres V2.1.17 and later support the Serverless Computing feature. The Serverless Computing feature is suitable for scenarios in which you want to import a large amount of data offline, run large-scale extract, transform, and load (ETL) jobs, or query a large amount of data from foreign tables. You can use the Serverless Computing feature to perform the preceding operations based on additional serverless computing resources. This eliminates the need to reserve additional computing resources for your instance, improves instance stability, and reduces the occurrence of out of memory (OOM) errors. You are charged only for the serverless computing resources that are used by your tasks. For more information about the Serverless Computing feature, see Overview of Serverless Computing. For more information about how to use the Serverless Computing feature, see User guide on Serverless Computing.
-- Drop the temporary table if it exists.
DROP TABLE IF EXISTS gh_realtime_data_tmp;

-- Create a temporary table.
SET hg_experimental_enable_create_table_like_properties = ON;
CALL HG_CREATE_TABLE_LIKE ('gh_realtime_data_tmp', 'select * from gh_realtime_data');

-- Optional. We recommend that you use the Serverless Computing feature to import a large amount of data offline and run extract, transform, and load (ETL) jobs.
SET hg_computing_resource = 'serverless';

-- Insert data into the temporary table and update statistics.
INSERT INTO gh_realtime_data_tmp
SELECT  *
FROM    <foreign_table_name>
WHERE   ds = current_date - interval '1 day'
ON CONFLICT (id, ds) DO NOTHING;
ANALYZE gh_realtime_data_tmp;

-- Reset the configuration. This ensures that serverless computing resources are not used for subsequent SQL statements.
RESET hg_computing_resource;

-- Replace the original child table with the temporary child table.
BEGIN;
DROP TABLE IF EXISTS "gh_realtime_data_<yesterday_date>";
ALTER TABLE gh_realtime_data_tmp RENAME TO "gh_realtime_data_<yesterday_date>";
ALTER TABLE gh_realtime_data ATTACH PARTITION "gh_realtime_data_<yesterday_date>" FOR VALUES IN ('<yesterday_date>');
COMMIT;
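After the child table is replaced, you can compare the corrected partition with the offline data to confirm that the two sides are consistent. The following query is a minimal sketch; replace <yesterday_date> and <foreign_table_name> with the same values that are used in the preceding statements:
SELECT 'hologres' AS source, count(*) AS events FROM gh_realtime_data WHERE ds = '<yesterday_date>'
UNION ALL
SELECT 'maxcompute' AS source, count(*) AS events FROM <foreign_table_name> WHERE ds = '<yesterday_date>';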
Data analytics
The collected data can be used for various types of data analytics. You can define data layers based on the time range required by your business when you design a data warehouse. The data warehouse can meet your requirements for real-time data analytics, offline data analytics, and integrated offline and real-time data analytics.
This section describes how to analyze real-time data that is obtained in the preceding sections. You can also perform analytics on data in a specified code repository or perform data analytics as a developer.
Query the total number of public events that occurred on the current day.
SELECT count(*) FROM gh_realtime_data WHERE created_at >= date_trunc('day', now());
The following result is returned:
count
------
1006
Query the top repositories in which the largest number of events occurred in the past day.
SELECT repo_name, COUNT(*) AS events FROM gh_realtime_data WHERE created_at >= now() - interval '1 day' GROUP BY repo_name ORDER BY events DESC LIMIT 5;
The following result is returned:
repo_name                                  | events
-------------------------------------------+-------
leo424y/heysiri.ml                         | 29
arm-on/plan                                | 10
Christoffel-T/fiverr-pat-20230331          | 9
mate-academy/react_dynamic-list-of-goods   | 9
openvinotoolkit/openvino                   | 7
Query the top developers who initiated the largest number of events in the past day.
SELECT actor_login, COUNT(*) AS events FROM gh_realtime_data WHERE created_at >= now() - interval '1 day' AND actor_login NOT LIKE '%[bot]' GROUP BY actor_login ORDER BY events DESC LIMIT 5;
The following result is returned:
actor_login        | events
-------------------+-------
direwolf-github    | 13
arm-on             | 10
sergii-nosachenko  | 9
Christoffel-T      | 9
yangwang201911     | 7
Query the most popular programming languages in the past hour.
SELECT language, count(*) total FROM gh_realtime_data WHERE created_at > now() - interval '1 hour' AND language IS NOT NULL GROUP BY language ORDER BY total DESC LIMIT 10;
The following result is returned:
language    | total
------------+------
JavaScript  | 25
C++         | 15
Python      | 14
TypeScript  | 13
Java        | 8
PHP         | 8
Rank repositories in descending order by the number of times they were starred in the past day.
Note: In this example, the number of times a repository was unstarred is not considered.
SELECT repo_id, repo_name, COUNT(actor_login) total FROM gh_realtime_data WHERE type = 'WatchEvent' AND created_at > now() - interval '1 day' GROUP BY repo_id, repo_name ORDER BY total DESC LIMIT 10;
The following result is returned:
repo_id    | repo_name                          | total
-----------+------------------------------------+------
618058471  | facebookresearch/segment-anything  | 4
619959033  | nomic-ai/gpt4all                   | 1
97249406   | denysdovhan/wtfjs                  | 1
9791525    | digininja/DVWA                     | 1
168118422  | aylei/interview                    | 1
343520006  | joehillen/sysz                     | 1
162279822  | agalwood/Motrix                    | 1
577723410  | huggingface/swift-coreml-diffusers | 1
609539715  | e2b-dev/e2b                        | 1
254839429  | maniackk/KKCallStack               | 1
Query the number of active actors and the number of active repositories involved on the current day.
SELECT uniq (actor_id) actor_num, uniq (repo_id) repo_num FROM gh_realtime_data WHERE created_at > date_trunc('day', now());
The following result is returned:
actor_num | repo_num
----------+---------
743       | 816
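Because Hologres can query both the real-time internal table and the MaxCompute foreign table, you can also combine the two for analytics that span a longer time range. The following query is a minimal sketch that unions the offline fact table, accessed through the foreign table assumed to be named dwd_github_events_odps as in the earlier example, with the real-time table to count events per day over the past week; adjust the table names and the date boundary to match your environment:
SELECT ds, count(*) AS events
FROM (
    -- Historical data from the offline data warehouse in MaxCompute (assumed foreign table name).
    SELECT ds FROM public.dwd_github_events_odps
    WHERE ds >= to_char(current_date - interval '6 day', 'YYYY-MM-DD')
      AND ds < to_char(current_date, 'YYYY-MM-DD')
    UNION ALL
    -- Data of the current day from the real-time table in Hologres.
    SELECT ds FROM public.gh_realtime_data
    WHERE ds = to_char(current_date, 'YYYY-MM-DD')
) t
GROUP BY ds
ORDER BY ds;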