Hologres連接器支援即時消費Hologres,即即時消費Hologres的Binlog資料。本文為您介紹Realtime ComputeFlink版消費Hologres的詳情。
使用限制
Hologres 0.10及以下版本,已存在的表無法修改表屬性開啟Binlog,需要重建立表。Hologres V1.1及以上版本,可以根據業務需要選擇開啟或關閉Binlog能力,同時支援配置TTL滿足不同業務情境對Binlog保留時間的訴求,詳情請參見訂閱Hologres Binlog。
Realtime Compute引擎VVR 8.0.11及以上版本支援消費分區表父表的Binlog。
暫不支援即時消費TIMESTAMP類型的資料,因此建立Hologres表時,請使用TIMESTAMPTZ類型。
預設的Binlog源表不支援數群組類型,僅支援INTEGER、BIGINT、TEXT、REAL、DOUBLE PRECISION、BOOLEAN、NUMERIC(38,8)和TIMESTAMPTZ資料類型。
對不支援的資料類型(例如SMALLINT),即使不消費此欄位,仍然可能導致作業無法上線。
Realtime Compute引擎VVR 6.0.3及以上版本新增JDBC模式Binlog源表,VVR 6.0.7版本開始預設通過JDBC模式消費Hologres Binlog。相比原有Holohub模式,支援更完善的資料類型,如SMALLINT,數群組類型等,同時也支援了自訂使用者(非RAM使用者)。詳見下方JDBC模式Binlog源表。
Hologres 2.0及以上版本下線了Holohub模式,全面轉為JDBC模式。如果您的Flink版本小於6.0.7,需要顯式指定sdkMode參數為jdbc,或升級您的Flink版本。
Hologres 1.3.41版本開始,JDBC模式Binlog源表新支援讀取JSONB類型,但需要資料庫層級開啟GUC,開啟GUC的命令如下。
--db層級開啟GUC,僅superuser可以執行,每個db只需要設定一次。 alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;
Realtime Compute引擎VVR 8.0.4起,連接器如果發現使用者使用的Hologres執行個體大於2.0版本,會強制使用JDBC模式消費Binlog。推薦Hologres執行個體升級至2.1版本,可以從Holohub模式無縫切換。如果Hologres執行個體是2.0版本,且使用者不是Superuser,使用JDBC模式消費Binlog需要特別進行許可權的配置,否則作業上線時可能拋出“permission denied for database”的異常。需要的許可權包括Database的CREATE許可權,以及執行個體的Replication Role許可權,授權SQL如下。
-- 專家許可權模型下為使用者授予CREATE許可權,以及賦予使用者執行個體的Replication Role許可權 GRANT CREATE ON DATABASE database_name TO <user_name>; alter role <user_name> replication; -- 如果Database開啟了簡單許可權模型(SLMP),無法執行GRANT語句,使用spm_grant為使用者授予DB的Admin許可權,也可以在Holoweb中直接賦權 call spm_grant('{dbname}_admin', '雲帳號id/雲郵箱/RAM帳號'); alter role <user_name> replication;
注意事項
Hologres Binlog以行存的形式記錄了資料的變更前後的整行資料,因此列存表產生Binlog時的反查開銷要大於行存表。對於資料更新頻繁的情境,建議使用行存表來開啟Binlog,否則Binlog產生會成為表寫入時的效能瓶頸,如果這張表同時還用於OLAP等分析查詢,建議使用行列共存的儲存格式。
UPDATE操作會產生兩條Binlog記錄,分別為更新操作前和操作後的資料記錄,因此您會消費到兩條資料。但是,Hologres Binlog功能會保證這兩條記錄是連續的且更新前的Binlog記錄在前,更新後的Binlog記錄在後。
建議Flink作業並發數和Hologres Table的Shard個數保持一致。
您可以在Hologres控制台上,使用以下語句查看Table的Shard數,其中tablename為您的業務表名稱。
select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';
如果作業從檢查點恢複過程中,發生
table id parsed from checkpoint is different from the current table id
異常,可以升級到VVR-8.0.9版本啟動作業。這是由於Realtime Compute引擎VVR 8.0.5~VVR 8.0.8版本,Hologres Binlog源表從checkpoint恢複時,會強制檢查hologres表的table id,如果當前表的table id和checkpoint中儲存的不一致,會無法從checkpoint恢複。此異常表示作業運行期間,使用者對源表進行了TRUNCATE或其他重建表操作。考慮到使用者使用情境的複雜性,在VVR 8.0.9取消了對table id的強制檢查,但仍然不推薦對Binlog源表做重建表操作。重建表時原有錶的歷史Binlog會全部清除,Flink使用舊錶的消費位點去消費新表的資料,可能導致資料不一致等不符合預期的情況。
開啟Binlog
即時消費功能預設關閉,因此在Hologres控制台上建立表的DDL時,需要設定binlog.level和binlog.ttl參數,樣本如下。
begin;
CREATE TABLE test_message_src(
id int primary key,
title text not null,
body text
);
call set_table_property('test_message_src', 'orientation', 'row');
call set_table_property('test_message_src', 'clustering_key', 'id');
call set_table_property('test_message_src', 'binlog.level', 'replica'); --自Hologres 1.1版本起,可以在建表後開啟Binlog。
call set_table_property('test_message_src', 'binlog.ttl', '86400');
commit;
其中,binlog.level設定為replica
即代表開啟Binlog,binlog.ttl為Binlog的TTL,單位為秒。
消費模式
該模式下Source消費的Binlog資料是作為普通的Flink資料傳遞給下遊節點的,即所有資料都是INSERT類型的資料,您可以根據業務情況選擇如何處理特定hg_binlog_event_type
類型的資料。源表DDL程式碼範例如下。
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);
該模式下Source消費的Binlog資料,將根據hg_binlog_event_type
自動為每行資料設定準確的Flink RowKind類型,例如,INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER類型,這樣就能完成表的資料的鏡像同步,類似MySQL或Postgres的CDC功能。源表DDL程式碼範例如下。
CREATE TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);
全增量一體源表消費
在源表Join維表時,由於Binlog的TTL等原因,會導致無法使用源表的所有資料。原解決方案是為Binlog表設定一個很大的TTL,但這樣會有以下問題:
歷史Binlog資料會被長時間儲存,導致佔用較多的儲存資源。
因為Binlog包含資料更新記錄,使用Binlog進行全量消費會消費一些不必要的資料,導致佔用較多的計算資源,且無法讓使用者只關注最新的資料。
從VVR 4.0.13及以上版本,Hologres 0.10及以上版本,Hologres Binlog CDC源表支援全增量一體的消費,這種方式會先讀取資料庫的歷史全量資料,並平滑切換到Binlog讀取增量資料。採用這種方式,可以解決上述問題。
適用情境
適用於歷史資料不包含Binlog,但又希望消費所有資料的情境。
僅適用於目標表有主鍵的情境,推薦在CDC模式下使用的全增量Hologres源表。
Hologres1.1版本之後,支援按需開啟Binlog,可以將已有歷史資料的表開啟Binlog。
程式碼範例
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'binlogStartUpMode' = 'initial', --先讀取歷史全量資料,再增量消費Binlog。
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);
JDBC模式Binlog源表
Realtime Compute引擎VVR 6.0.7版本開始,binlog源表新增JDBC模式(不同於CDC等消費模式,此處的JDBC模式是指底層擷取binlog的SDK基於JDBC)。相比原有Holohub模式,JDBC模式的Binlog源表:
支援更多的資料類型。包括:SMALLINT、INTEGER、BIGINT、TEXT、REAL、DOUBLE PRECISION、BOOLEAN、NUMERIC、DATE、TIME、TIMETZ、TIMESTAMP、TIMESTAMPTZ、BYTEA、JSON、int4[]、int8[]、float4[]、float8[]、boolean[]、text[]、JSONB(需要Hologres版本大於1.3.41且開啟相應GUC,詳見本文使用限制)。
支援Hologres的自訂使用者(非RAM使用者)。
使用方式與普通的binlog源表類似,但需要設定sdkMode為jdbc,樣本如下。
create TEMPORARY table test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) with (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'sdkMode'='jdbc', --使用jdbc模式的binlog源表
'jdbcBinlogSlotName'='replication_slot_name' --可選,不設定會自動建立
);
jdbcBinlogSlotName是jdbc模式消費binlog的一個選擇性參數,如果不設定,Hologres連接器會建立預設的slot並使用,預設建立的publication名稱類似publication_for_table_<table_name>_used_by_flink
,預設建立的slot名稱類似slot_for_table_<table_name>_used_by_flink
,在使用中如果發生異常,可以嘗試刪除並重試。預設建立slot需要一定的前提條件,要求使用者為執行個體的Superuser,或者同時擁有Database的CREATE許可權和執行個體的Replication Role許可權。如果沒有許可權導致作業上線失敗,可以嘗試如下操作,或者參考通過JDBC消費Hologres Binlog文檔進行處理。Hologres2.1版本起,JDBC模式消費Binlog不再需要配置slot,因此Hologres連接器從VVR 8.0.5開始,判斷Hologres執行個體為2.1及以上版本,也不再自動建立預設的slot。
-- 專家許可權模型下為使用者授予CREATE許可權,以及賦予使用者執行個體的Replication Role許可權
GRANT CREATE ON DATABASE database_name TO <user_name>;
alter role <user_name> replication;
-- 如果Database開啟了簡單許可權模型(SLMP),無法執行GRANT語句,使用spm_grant為使用者授予DB的Admin許可權,也可以在Holoweb中直接賦權
call spm_grant('{dbname}_admin', '雲帳號id/雲郵箱/RAM帳號');
alter role <user_name> replication;
目前刪除表並重建同名表可能導致作業出現"no table is defined in publication"或者"The table xxx has no slot named xxx"異常,原因是表被刪除時,和表綁定的publication沒有被刪除。當發生此異常時,可以在hologres中執行select * from pg_publication where pubname not in (select pubname from pg_publication_tables);
語句,查詢刪表時未一起被清理的publication,並執行drop publication xx;
語句刪除殘留的publication,之後重新啟動作業即可。或者選擇VVR 8.0.5版本,連接器會自動執行清理操作。
消費分區表Binlog(公測)
分區表是數倉使用過程中一種常用的建表方式,有利於對資料進行歸檔和整理,提高查詢的效率。在即時數倉分層等情境,消費Hologres Binlog作為資料來源表,可以提升資料複用能力,縮短端到端資料加工耗時。Hologres Connector支援JDBC模式消費分區表Binlog,可以通過一個作業持續監聽分區表的資料更改,而不需要啟動多個作業。結合Hologres的動態分區能力,還可以動態地監聽新增的分區,得到與消費非分區表一致的體驗。
注意事項
僅Realtime Compute引擎VVR 8.0.11及以上版本,Binlog源表JDBC模式支援消費分區表。
分區名稱必須嚴格由父表名+底線+分區值組成,即
{parent_table}_{partition_value}
,非此格式的分區可能無法消費到。對於DYNAMIC模式,分區值格式與動態分區的時間單位有關,目前不支援帶-
分隔字元的分區欄位,詳情請參見表名建置規則。如分區時間單位為DAY,則表的時間尾碼格式必須採用YYYYMMDD,例如20241225。Flink中聲明Hologres源表時,必須包含Hologres分區表的分區欄位。
對於DYNAMIC模式,要求分區表必須開啟動態分區管理。並且分區預建立參數
auto_partitioning.num_precreate
必須大於1,否則,在嘗試消費最新分區時,作業將會拋出異常。JDBC模式消費Binlog存在串連數的限制,消費分區表需要使用jdbc_fixed模式,要求Hologres執行個體版本大於等於2.1.27。
WITH參數
消費分區表WITH參數,其他參數請參考即時數倉Hologres。
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
partition-binlog.mode | 消費分區表Binlog模式。 | Enum | 否 | DISABLE | 參數取值如下:
|
partition-binlog-lateness-timeout-minutes | 在DYNAMIC模式下消費分區表,允許延遲的最大逾時時間。 | Boolean | 否 | 60 |
|
partition-values-to-read | 在STATIC模式下消費分區表,指定所需消費的分區,分區值之間使用','進行分隔。 | String | 否 | 無 |
|
消費分區表時,啟動模式binlogStartupMode和從Checkpoint恢複會有如下表現:
消費分區表Binglog模式為DYNAMIC。
啟動模式
說明
啟動模式
說明
binlogStartupMode=earliestOffset(預設值)
從目前存在的最早分區的最早Binlog開始消費。
binlogStartupMode=timestamp
根據設定的startTime找到對應的分區,從這張分區的startTime開始消費。如指定起始時間為2024-09-10 10:00:00,則分區為20240910,此分區從2024-09-10 10:00:00開始消費,之後正常消費20240911等分區。
binlogStartupMode=initial
先全量消費資料,記錄每個分區每個Shard消費的最大Binlog序號,增量消費會選擇最新的兩個分區,嘗試從這兩個分區每個Shard對應的LSN開始增量消費。
從Checkpoint恢複
儲存Checkpoint時,會記錄每個Shard最新兩個分區的最大Binlog序號,恢複時從屬記錄的最大Binlog序號開始增量Binlog消費。
消費分區表Binglog模式為STATIC。
啟動模式
說明
啟動模式
說明
binlogStartupMode=earliestOffset(預設值)
所有分區(或指定分區)從最早的Binlog開始消費。
binlogStartupMode=timestamp
所有分區(或指定分區)從設定的startTime開始消費Binlog。
binlogStartupMode=initial
先全量消費資料,記錄每個分區每個Shard消費的最大Binlog序號,再從此最大Binlog序號開始增量Binlog消費。
從checkpoint恢複
儲存checkpoint時,會記錄每個分區每個shard消費的最大Binlog序號,恢複時從此最大Binlog序號開始增量Binlog消費。
使用樣本
假設Hologres存在如下的DDL分區表,並且已啟用Binlog以及動態分區。
CREATE TABLE "test_message_src1" (
id int,
title text,
body text,
dt text,
PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
binlog_level = 'replica',
auto_partitioning_enable = 'true', -- 開啟動態分區
auto_partitioning_time_unit = 'DAY', -- 以天為時間單元,自動建立的分區名樣本:test_message_src1_20241027,test_message_src1_20241028
auto_partitioning_num_precreate = '2' -- 會提前建立兩個分區
);
-- 已經存在的分區表,也可以通過ALTER TABLE方式開啟動態分區
在Flink中,使用以下SQL聲明對分區表test_message_src1
進行DYNAMIC模式消費。當新的單位時間到達時,將自動啟動對新分區的讀取。
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
dt VARCHAR -- hologres分區表的分區欄位
)
with (
'connector' = 'hologres'
,'dbname' = '<yourDatabase>'
,'tablename' = 'test_message_src1' -- 開啟了動態分區的父表
,'username' = '<yourUserName>'
,'password' = '<yourPassword>'
,'endpoint' = '<yourEndpoint>'
,'binlog' = 'true'
,'partition-binlog.mode' = 'DYNAMIC' -- 動態監聽最新的分區
,'binlogstartUpMode' = 'initial' -- 全增量
,'sdkMode' = 'jdbc_fixed' -- 使用此模式,避免串連數限制
);
假設Hologres存在如下的DDL分區表,並且已啟用Binlog。
CREATE TABLE test_message_src2 (
id int,
title text,
body text,
color text,
PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');
在Flink中,使用以下SQL聲明對分區表test_message_src2
進行STATIC模式消費。
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
color VARCHAR -- hologres分區表的分區欄位
)
with (
'connector' = 'hologres'
,'dbname' = '<yourDatabase>'
,'tablename' = 'test_message_src2' -- 分區表
,'username' = '<yourUserName>'
,'password' = '<yourPassword>'
,'endpoint' = '<yourEndpoint>'
,'binlog' = 'true'
,'partition-binlog.mode' = 'STATIC' -- 消費固定的分區
,'partition-values-to-read' = 'red,blue,green' -- 僅消費配置的3個分區,不會消費'black'分區;以後新增分區也不會消費
,'binlogstartUpMode' = 'initial' -- 全增量
,'sdkMode' = 'jdbc_fixed' -- 使用此模式,避免串連數限制
);
Hologres Binlog欄位組成
一條Binlog的欄位由Binlog系統欄位和使用者Table欄位組成,欄位定義如下:
欄位名 | 欄位類型 | 說明 |
欄位名 | 欄位類型 | 說明 |
hg_binlog_lsn | BIGINT | Binlog系統欄位,表示Binlog序號,Shard內部單調遞增但不連續,不同Shard之間不保證唯一和有序。 |
hg_binlog_event_type | BIGINT | Binlog系統欄位,表示目前記錄所表示的修改類型,參數取值如下:
|
hg_binlog_timestamp_us | BIGINT | Binlog系統欄位,系統時間戳,單位為微秒。 |
user_table_column_1 | 使用者定義 | 使用者的表欄位。 |
... | ... | 使用者的表欄位。 |
user_table_column_n | 使用者定義 | 使用者的表欄位。 |
中繼資料列
Realtime Compute引擎VVR 8.0.11及以上版本的Binlog源表支援中繼資料列。從此版本起,建議以中繼資料列的方式聲明hg_binlog_event_type等Binlog欄位。中繼資料列是SQL標準的擴充,通過中繼資料列可以訪問源表的庫名和表名,以及資料的變更類型,產生時間等特定資訊,您可以基於這些資訊自訂處理邏輯,例如過濾變更類型為DELETE的資料等。
欄位名 | 欄位類型 | 說明 |
欄位名 | 欄位類型 | 說明 |
db_name | STRING NOT NULL | 包含該行記錄的庫名。 |
table_name | STRING NOT NULL | 包含該行記錄的表名。 |
hg_binlog_lsn | BIGINT NOT NULL | 該行記錄的Binlog序號,詳情請參見Hologres Binlog欄位組成。 |
hg_binlog_timestamp_us | BIGINT NOT NULL | 該行記錄在資料庫中的變更時間戳記,單位微秒(us)。 |
hg_binlog_event_type | BIGINT NOT NULL | 該行記錄的變更類型。參數取值如下:
|
hg_shard_id | INT NOT NULL | 資料所在資料分區Shard。 Shard基本概念詳情請參見Table Group和Shard。 |
在DDL中,採用<meta_column_name> <datatype> METADATA VIRTUAL
聲明中繼資料列。樣本如下:
CREATE TABLE test_message_src_binlog_table(
-- hg_binlog_lsn BIGINT,
-- hg_binlog_event_type BIGINT,
-- hg_binlog_timestamp_us BIGINT,
hg_binlog_lsn bigint METADATA VIRTUAL
hg_binlog_event_type bigint METADATA VIRTUAL
hg_binlog_timestamp_us bigint METADATA VIRTUAL
hg_shard_id int METADATA VIRTUAL
db_name string METADATA VIRTUAL
table_name string METADATA VIRTUAL
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'binlogStartUpMode' = 'initial', --先讀取歷史全量資料,再增量消費Binlog。
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);