全部產品
Search
文件中心

Realtime Compute for Apache Flink:Realtime ComputeFlink版即時消費Hologres

更新時間:Sep 14, 2024

Hologres連接器支援即時消費Hologres,即即時消費Hologres的Binlog資料。本文為您介紹Realtime ComputeFlink版消費Hologres的詳情。

使用限制

  • Hologres 0.10及以下版本,已存在的表無法修改表屬性開啟Binlog,需要重建立表。Hologres V1.1及以上版本,可以根據業務需要選擇開啟或關閉Binlog能力,同時支援配置TTL滿足不同業務情境對Binlog保留時間的訴求,詳情請參見訂閱Hologres Binlog

  • 不支援開啟分區表父表的Binlog,請使用非分區表。

  • 暫不支援即時消費TIMESTAMP類型的資料,因此建立Hologres表時,請使用TIMESTAMPTZ類型。

  • 預設的Binlog源表不支援數群組類型,僅支援INTEGER、BIGINT、TEXT、REAL、DOUBLE PRECISION、BOOLEAN、NUMERIC(38,8)和TIMESTAMPTZ資料類型。

    說明

    對不支援的資料類型(例如SMALLINT),即使不消費此欄位,仍然可能導致作業無法上線。

  • Realtime Compute引擎VVR 6.0.3及以上版本新增JDBC模式Binlog源表,VVR 6.0.7版本開始預設通過JDBC模式消費Hologres Binlog。相比原有Holohub模式,支援更完善的資料類型,如SMALLINT,數群組類型等,同時也支援了自訂使用者(非RAM使用者)。詳見下方JDBC模式Binlog源表

    Hologres 2.0及以上版本下線了Holohub模式,全面轉為JDBC模式。如果您的Flink版本小於6.0.7,需要顯式指定sdkMode參數為jdbc,或升級您的Flink版本。

  • Hologres 1.3.41版本開始,JDBC模式Binlog源表新支援讀取JSONB類型,但需要資料庫層級開啟GUC,開啟GUC的命令如下。

    --db層級開啟GUC,僅superuser可以執行,每個db只需要設定一次。
    alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;
  • Realtime Compute引擎VVR 8.0.4起,連接器如果發現使用者使用的Hologres執行個體大於2.0版本,會強制使用JDBC模式消費Binlog。推薦Hologres執行個體升級至2.1版本,可以從Holohub模式無縫切換。如果Hologres執行個體是2.0版本,且使用者不是Superuser,使用JDBC模式消費Binlog需要特別進行許可權的配置,否則作業上線時可能拋出“permission denied for database”的異常。需要的許可權包括Database的CREATE許可權,以及執行個體的Replication Role許可權,授權SQL如下。

    -- 專家許可權模型下為使用者授予CREATE許可權,以及賦予使用者執行個體的Replication Role許可權
    GRANT CREATE ON DATABASE database_name TO <user_name>;
    alter role <user_name> replication;
    
    -- 如果Database開啟了簡單許可權模型(SLMP),無法執行GRANT語句,使用spm_grant為使用者授予DB的Admin許可權,也可以在Holoweb中直接賦權
    call spm_grant('{dbname}_admin', '雲帳號id/雲郵箱/RAM帳號');
    alter role <user_name> replication;

注意事項

  • Hologres Binlog以行存的形式記錄了資料的變更前後的整行資料,因此列存表產生Binlog時的反查開銷要大於行存表。對於資料更新頻繁的情境,建議使用行存表來開啟Binlog,否則Binlog產生會成為表寫入時的效能瓶頸,如果這張表同時還用於OLAP等分析查詢,建議使用行列共存的儲存格式。

  • UPDATE操作會產生兩條Binlog記錄,分別為更新操作前和操作後的資料記錄,因此您會消費到兩條資料。但是,Hologres Binlog功能會保證這兩條記錄是連續的且更新前的Binlog記錄在前,更新後的Binlog記錄在後。

  • 建議Flink作業並發數和Hologres Table的Shard個數保持一致。

    您可以在Hologres控制台上,使用以下語句查看Table的Shard數,其中tablename為您的業務表名稱。

    select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';
  • 如果作業從檢查點恢複過程中,發生table id parsed from checkpoint is different from the current table id異常,可以升級到VVR-8.0.9版本啟動作業。這是由於Realtime Compute引擎VVR 8.0.5~VVR 8.0.8版本,Hologres Binlog源表從checkpoint恢複時,會強制檢查hologres表的table id,如果當前表的table id和checkpoint中儲存的不一致,會無法從checkpoint恢複。此異常表示作業運行期間,使用者對源表進行了TRUNCATE或其他重建表操作。考慮到使用者使用情境的複雜性,在VVR 8.0.9取消了對table id的強制檢查,但仍然不推薦對Binlog源表做重建表操作。重建表時原有錶的歷史Binlog會全部清除,Flink使用舊錶的消費位點去消費新表的資料,可能導致資料不一致等不符合預期的情況。

開啟Binlog

即時消費功能預設關閉,因此在Hologres控制台上建立表的DDL時,需要設定binlog.levelbinlog.ttl參數,樣本如下。

begin;
CREATE TABLE test_message_src(
 id int primary key, 
 title text not null, 
 body text
);
call set_table_property('test_message_src', 'orientation', 'row');
call set_table_property('test_message_src', 'clustering_key', 'id');
call set_table_property('test_message_src', 'binlog.level', 'replica'); --自Hologres 1.1版本起,可以在建表後開啟Binlog。
call set_table_property('test_message_src', 'binlog.ttl', '86400'); 
commit;

其中,binlog.level設定為replica即代表開啟Binlog,binlog.ttl為Binlog的TTL,單位為秒。

消費模式

非CDC模式

該模式下Source消費的Binlog資料是作為普通的Flink資料傳遞給下遊節點的,即所有資料都是INSERT類型的資料,您可以根據業務情況選擇如何處理特定hg_binlog_event_type類型的資料。源表DDL程式碼範例如下。

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
);

CDC模式

該模式下Source消費的Binlog資料,將根據hg_binlog_event_type自動為每行資料設定準確的Flink RowKind類型,例如,INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER類型,這樣就能完成表的資料的鏡像同步,類似MySQL或Postgres的CDC功能。源表DDL程式碼範例如下。

CREATE TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
);

全增量一體源表消費

在源表Join維表時,由於Binlog的TTL等原因,會導致無法使用源表的所有資料。原解決方案是為Binlog表設定一個很大的TTL,但這樣會有以下問題:

  • 歷史Binlog資料會被長時間儲存,導致佔用較多的儲存資源。

  • 因為Binlog包含資料更新記錄,使用Binlog進行全量消費會消費一些不必要的資料,導致佔用較多的計算資源,且無法讓使用者只關注最新的資料。

從VVR 4.0.13及以上版本,Hologres 0.10及以上版本,Hologres Binlog CDC源表支援全增量一體的消費,這種方式會先讀取資料庫的歷史全量資料,並平滑切換到Binlog讀取增量資料。採用這種方式,可以解決上述問題。

適用情境

  • 適用於歷史資料不包含Binlog,但又希望消費所有資料的情境。

  • 僅適用於目標表有主鍵的情境,推薦在CDC模式下使用的全增量Hologres源表。

  • Hologres1.1版本之後,支援按需開啟Binlog,可以將已有歷史資料的表開啟Binlog。

程式碼範例

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogStartUpMode' = 'initial', --先讀取歷史全量資料,再增量消費Binlog。
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
  );

JDBC模式Binlog源表

Realtime Compute引擎VVR 6.0.7版本開始,binlog源表新增JDBC模式(不同於CDC等消費模式,此處的JDBC模式是指底層擷取binlog的SDK基於JDBC)。相比原有Holohub模式,JDBC模式的Binlog源表:

  • 支援更多的資料類型。包括:SMALLINT、INTEGER、BIGINT、TEXT、REAL、DOUBLE PRECISION、BOOLEAN、NUMERIC、DATE、TIME、TIMETZ、TIMESTAMP、TIMESTAMPTZ、BYTEA、JSON、int4[]、int8[]、float4[]、float8[]、boolean[]、text[]、JSONB(需要Hologres版本大於1.3.41且開啟相應GUC,詳見本文使用限制)。

  • 支援Hologres的自訂使用者(非RAM使用者)。

使用方式與普通的binlog源表類似,但需要設定sdkMode為jdbc,樣本如下。

create TEMPORARY table test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'sdkMode'='jdbc', --使用jdbc模式的binlog源表
  'jdbcBinlogSlotName'='replication_slot_name' --可選,不設定會自動建立
);

jdbcBinlogSlotName是jdbc模式消費binlog的一個選擇性參數,如果不設定,Hologres連接器會建立預設的slot並使用,預設建立的publication名稱類似publication_for_table_<table_name>_used_by_flink,預設建立的slot名稱類似slot_for_table_<table_name>_used_by_flink,在使用中如果發生異常,可以嘗試刪除並重試。預設建立slot需要一定的前提條件,要求使用者為執行個體的Superuser,或者同時擁有Database的CREATE許可權和執行個體的Replication Role許可權。如果沒有許可權導致作業上線失敗,可以嘗試如下操作,或者參考通過JDBC消費Hologres Binlog文檔進行處理。Hologres2.1版本起,JDBC模式消費Binlog不再需要配置slot,因此Hologres連接器從VVR 8.0.5開始,判斷Hologres執行個體為2.1及以上版本,也不再自動建立預設的slot。

-- 專家許可權模型下為使用者授予CREATE許可權,以及賦予使用者執行個體的Replication Role許可權
GRANT CREATE ON DATABASE database_name TO <user_name>;
alter role <user_name> replication;

-- 如果Database開啟了簡單許可權模型(SLMP),無法執行GRANT語句,使用spm_grant為使用者授予DB的Admin許可權,也可以在Holoweb中直接賦權
call spm_grant('{dbname}_admin', '雲帳號id/雲郵箱/RAM帳號');
alter role <user_name> replication;

說明

目前刪除表並重建同名表可能導致作業出現"no table is defined in publication"或者"The table xxx has no slot named xxx"異常,原因是表被刪除時,和表綁定的publication沒有被刪除。當發生此異常時,可以在hologres中執行select * from pg_publication where pubname not in (select pubname from pg_publication_tables);語句,查詢刪表時未一起被清理的publication,並執行drop publication xx;語句刪除殘留的publication,之後重新啟動作業即可。或者選擇VVR 8.0.5版本,連接器會自動執行清理操作。

Hologres Binlog實現原理

一條Binlog的欄位由Binlog系統欄位和使用者Table欄位組成,欄位定義如下:

欄位名

欄位類型

說明

hg_binlog_lsn

BIGINT

Binlog系統欄位,表示Binlog序號,Shard內部單調遞增不保證連續,不同Shard之間不保證唯一和有序。

hg_binlog_event_type

BIGINT

Binlog系統欄位,表示目前記錄所表示的修改類型,參數取值如下:

  • INSERT=5:表示當前Binlog為插入一條新的記錄。

  • DELETE=2:表示當前Binlog為刪除一條已有的記錄。

  • BEFORE_UPDATE=3:表示當前Binlog為更新操作前的記錄。

  • AFTER_UPDATE=7:表示當前Binlog為更新操作後的記錄。

hg_binlog_timestamp_us

BIGINT

Binlog系統欄位,系統時間戳,單位為微秒。

user_table_column_1

使用者定義

使用者的表欄位。

...

...

使用者的表欄位。

user_table_column_n

使用者定義

使用者的表欄位。