Hologres連接器支援即時消費Hologres,即即時消費Hologres的Binlog資料。本文為您介紹Realtime ComputeFlink版消費Hologres的詳情。
使用限制
Hologres 0.10及以下版本,已存在的表無法修改表屬性開啟Binlog,需要重建立表。Hologres V1.1及以上版本,可以根據業務需要選擇開啟或關閉Binlog能力,同時支援配置TTL滿足不同業務情境對Binlog保留時間的訴求,詳情請參見訂閱Hologres Binlog。
不支援開啟分區表父表的Binlog,請使用非分區表。
暫不支援即時消費TIMESTAMP類型的資料,因此建立Hologres表時,請使用TIMESTAMPTZ類型。
預設的Binlog源表不支援數群組類型,僅支援INTEGER、BIGINT、TEXT、REAL、DOUBLE PRECISION、BOOLEAN、NUMERIC(38,8)和TIMESTAMPTZ資料類型。
說明對不支援的資料類型(例如SMALLINT),即使不消費此欄位,仍然可能導致作業無法上線。
Realtime Compute引擎VVR 6.0.3及以上版本新增JDBC模式Binlog源表,VVR 6.0.7版本開始預設通過JDBC模式消費Hologres Binlog。相比原有Holohub模式,支援更完善的資料類型,如SMALLINT,數群組類型等,同時也支援了自訂使用者(非RAM使用者)。詳見下方JDBC模式Binlog源表。
Hologres 2.0及以上版本下線了Holohub模式,全面轉為JDBC模式。如果您的Flink版本小於6.0.7,需要顯式指定sdkMode參數為jdbc,或升級您的Flink版本。
Hologres 1.3.41版本開始,JDBC模式Binlog源表新支援讀取JSONB類型,但需要資料庫層級開啟GUC,開啟GUC的命令如下。
--db層級開啟GUC,僅superuser可以執行,每個db只需要設定一次。 alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;
Realtime Compute引擎VVR 8.0.4起,連接器如果發現使用者使用的Hologres執行個體大於2.0版本,會強制使用JDBC模式消費Binlog。推薦Hologres執行個體升級至2.1版本,可以從Holohub模式無縫切換。如果Hologres執行個體是2.0版本,且使用者不是Superuser,使用JDBC模式消費Binlog需要特別進行許可權的配置,否則作業上線時可能拋出“permission denied for database”的異常。需要的許可權包括Database的CREATE許可權,以及執行個體的Replication Role許可權,授權SQL如下。
-- 專家許可權模型下為使用者授予CREATE許可權,以及賦予使用者執行個體的Replication Role許可權 GRANT CREATE ON DATABASE database_name TO <user_name>; alter role <user_name> replication; -- 如果Database開啟了簡單許可權模型(SLMP),無法執行GRANT語句,使用spm_grant為使用者授予DB的Admin許可權,也可以在Holoweb中直接賦權 call spm_grant('{dbname}_admin', '雲帳號id/雲郵箱/RAM帳號'); alter role <user_name> replication;
注意事項
Hologres Binlog以行存的形式記錄了資料的變更前後的整行資料,因此列存表產生Binlog時的反查開銷要大於行存表。對於資料更新頻繁的情境,建議使用行存表來開啟Binlog,否則Binlog產生會成為表寫入時的效能瓶頸,如果這張表同時還用於OLAP等分析查詢,建議使用行列共存的儲存格式。
UPDATE操作會產生兩條Binlog記錄,分別為更新操作前和操作後的資料記錄,因此您會消費到兩條資料。但是,Hologres Binlog功能會保證這兩條記錄是連續的且更新前的Binlog記錄在前,更新後的Binlog記錄在後。
建議Flink作業並發數和Hologres Table的Shard個數保持一致。
您可以在Hologres控制台上,使用以下語句查看Table的Shard數,其中tablename為您的業務表名稱。
select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';
如果作業從檢查點恢複過程中,發生
table id parsed from checkpoint is different from the current table id
異常,可以升級到VVR-8.0.9版本啟動作業。這是由於Realtime Compute引擎VVR 8.0.5~VVR 8.0.8版本,Hologres Binlog源表從checkpoint恢複時,會強制檢查hologres表的table id,如果當前表的table id和checkpoint中儲存的不一致,會無法從checkpoint恢複。此異常表示作業運行期間,使用者對源表進行了TRUNCATE或其他重建表操作。考慮到使用者使用情境的複雜性,在VVR 8.0.9取消了對table id的強制檢查,但仍然不推薦對Binlog源表做重建表操作。重建表時原有錶的歷史Binlog會全部清除,Flink使用舊錶的消費位點去消費新表的資料,可能導致資料不一致等不符合預期的情況。
開啟Binlog
即時消費功能預設關閉,因此在Hologres控制台上建立表的DDL時,需要設定binlog.level和binlog.ttl參數,樣本如下。
begin;
CREATE TABLE test_message_src(
id int primary key,
title text not null,
body text
);
call set_table_property('test_message_src', 'orientation', 'row');
call set_table_property('test_message_src', 'clustering_key', 'id');
call set_table_property('test_message_src', 'binlog.level', 'replica'); --自Hologres 1.1版本起,可以在建表後開啟Binlog。
call set_table_property('test_message_src', 'binlog.ttl', '86400');
commit;
其中,binlog.level設定為replica
即代表開啟Binlog,binlog.ttl為Binlog的TTL,單位為秒。
消費模式
非CDC模式
該模式下Source消費的Binlog資料是作為普通的Flink資料傳遞給下遊節點的,即所有資料都是INSERT類型的資料,您可以根據業務情況選擇如何處理特定hg_binlog_event_type
類型的資料。源表DDL程式碼範例如下。
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);
CDC模式
該模式下Source消費的Binlog資料,將根據hg_binlog_event_type
自動為每行資料設定準確的Flink RowKind類型,例如,INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER類型,這樣就能完成表的資料的鏡像同步,類似MySQL或Postgres的CDC功能。源表DDL程式碼範例如下。
CREATE TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);
全增量一體源表消費
在源表Join維表時,由於Binlog的TTL等原因,會導致無法使用源表的所有資料。原解決方案是為Binlog表設定一個很大的TTL,但這樣會有以下問題:
歷史Binlog資料會被長時間儲存,導致佔用較多的儲存資源。
因為Binlog包含資料更新記錄,使用Binlog進行全量消費會消費一些不必要的資料,導致佔用較多的計算資源,且無法讓使用者只關注最新的資料。
從VVR 4.0.13及以上版本,Hologres 0.10及以上版本,Hologres Binlog CDC源表支援全增量一體的消費,這種方式會先讀取資料庫的歷史全量資料,並平滑切換到Binlog讀取增量資料。採用這種方式,可以解決上述問題。
適用情境
適用於歷史資料不包含Binlog,但又希望消費所有資料的情境。
僅適用於目標表有主鍵的情境,推薦在CDC模式下使用的全增量Hologres源表。
Hologres1.1版本之後,支援按需開啟Binlog,可以將已有歷史資料的表開啟Binlog。
程式碼範例
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'binlogStartUpMode' = 'initial', --先讀取歷史全量資料,再增量消費Binlog。
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);
JDBC模式Binlog源表
Realtime Compute引擎VVR 6.0.7版本開始,binlog源表新增JDBC模式(不同於CDC等消費模式,此處的JDBC模式是指底層擷取binlog的SDK基於JDBC)。相比原有Holohub模式,JDBC模式的Binlog源表:
支援更多的資料類型。包括:SMALLINT、INTEGER、BIGINT、TEXT、REAL、DOUBLE PRECISION、BOOLEAN、NUMERIC、DATE、TIME、TIMETZ、TIMESTAMP、TIMESTAMPTZ、BYTEA、JSON、int4[]、int8[]、float4[]、float8[]、boolean[]、text[]、JSONB(需要Hologres版本大於1.3.41且開啟相應GUC,詳見本文使用限制)。
支援Hologres的自訂使用者(非RAM使用者)。
使用方式與普通的binlog源表類似,但需要設定sdkMode為jdbc,樣本如下。
create TEMPORARY table test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) with (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'sdkMode'='jdbc', --使用jdbc模式的binlog源表
'jdbcBinlogSlotName'='replication_slot_name' --可選,不設定會自動建立
);
jdbcBinlogSlotName是jdbc模式消費binlog的一個選擇性參數,如果不設定,Hologres連接器會建立預設的slot並使用,預設建立的publication名稱類似publication_for_table_<table_name>_used_by_flink
,預設建立的slot名稱類似slot_for_table_<table_name>_used_by_flink
,在使用中如果發生異常,可以嘗試刪除並重試。預設建立slot需要一定的前提條件,要求使用者為執行個體的Superuser,或者同時擁有Database的CREATE許可權和執行個體的Replication Role許可權。如果沒有許可權導致作業上線失敗,可以嘗試如下操作,或者參考通過JDBC消費Hologres Binlog文檔進行處理。Hologres2.1版本起,JDBC模式消費Binlog不再需要配置slot,因此Hologres連接器從VVR 8.0.5開始,判斷Hologres執行個體為2.1及以上版本,也不再自動建立預設的slot。
-- 專家許可權模型下為使用者授予CREATE許可權,以及賦予使用者執行個體的Replication Role許可權
GRANT CREATE ON DATABASE database_name TO <user_name>;
alter role <user_name> replication;
-- 如果Database開啟了簡單許可權模型(SLMP),無法執行GRANT語句,使用spm_grant為使用者授予DB的Admin許可權,也可以在Holoweb中直接賦權
call spm_grant('{dbname}_admin', '雲帳號id/雲郵箱/RAM帳號');
alter role <user_name> replication;
目前刪除表並重建同名表可能導致作業出現"no table is defined in publication"或者"The table xxx has no slot named xxx"異常,原因是表被刪除時,和表綁定的publication沒有被刪除。當發生此異常時,可以在hologres中執行select * from pg_publication where pubname not in (select pubname from pg_publication_tables);
語句,查詢刪表時未一起被清理的publication,並執行drop publication xx;
語句刪除殘留的publication,之後重新啟動作業即可。或者選擇VVR 8.0.5版本,連接器會自動執行清理操作。
Hologres Binlog實現原理
一條Binlog的欄位由Binlog系統欄位和使用者Table欄位組成,欄位定義如下:
欄位名 | 欄位類型 | 說明 |
hg_binlog_lsn | BIGINT | Binlog系統欄位,表示Binlog序號,Shard內部單調遞增不保證連續,不同Shard之間不保證唯一和有序。 |
hg_binlog_event_type | BIGINT | Binlog系統欄位,表示目前記錄所表示的修改類型,參數取值如下:
|
hg_binlog_timestamp_us | BIGINT | Binlog系統欄位,系統時間戳,單位為微秒。 |
user_table_column_1 | 使用者定義 | 使用者的表欄位。 |
... | ... | 使用者的表欄位。 |
user_table_column_n | 使用者定義 | 使用者的表欄位。 |