全部產品
Search
文件中心

Realtime Compute for Apache Flink:即時數倉Hologres

更新時間:Jul 13, 2024

本文為您介紹如何使用即時數倉Hologres連接器。

背景資訊

即時數倉Hologres是一站式即時資料倉庫引擎,支援海量資料即時寫入、即時更新、即時分析,支援標準SQL(相容PostgreSQL協議),支援PB級資料多維分析(OLAP)與即席分析(Ad Hoc),支援高並發低延遲的線上資料服務(Serving),與MaxCompute、Flink、DataWorks深度融合,提供離線上一體化全棧數倉解決方案。Hologres連接器支援的資訊如下。

類別

詳情

支援類型

源表、維表和結果表

運行模式

流模式和批模式

資料格式

暫不支援

特有監控指標

  • 源表:

    • numRecordsIn

    • numRecordsInPerSecond

  • 結果表:

    • numRecordsOut

    • numRecordsOutPerSecond

    • currentSendTime

    說明

    指標含義詳情,請參見監控指標說明

API種類

Datastream和SQL

是否支援更新或刪除結果表資料

特色功能

源表

功能

詳情

即時消費Hologres

  • 支援作為非Binlog Source讀取Hologres全量資料。

  • 支援即時消費Hologres的Binlog資料。

    • 支援非CDC模式消費。

    • 支援CDC模式消費。

    • 支援全增量一體源表消費。

擷取更多資訊,詳情請參見Flink/Blink即時消費Hologres Binlog

結果表

功能

詳情

流式語義

支援寫入Changelog訊息。

寬表Merge和局部更新功能

只更新修改部分的資料,而非整行更新。

作為CTAS和CDAS的目標端

支援即時同步單表、整庫的資料以及相應的表結構變更到Hologres表中。

插入部分列

說明

僅Realtime Compute引擎VVR 6.0.7及以上版本支援。

支援將Flink INSERT DML中指定的列名下推給連接器,從而僅更新指定的列。

前提條件

已建立Hologres表,詳情請參見建立Hologres表

使用限制

  • 通用:

  • 源表專屬:

  • 結果表專屬:無。

  • 維表專屬:

    • 維表建議使用主鍵作為Join條件,對於此類主鍵點查的維表,建立Hologres表時建議選擇行存模式,列存模式對於點查情境效能開銷較大。選擇行存模式建立維表時必須設定主鍵,並且將主鍵設定為Clustering Key才可以工作。詳情請參見CREATE TABLE

    • 如果業務需要,無法使用主鍵作為Join條件,對於此類非主鍵點查的維表(即一對多的查詢),建立Hologres表時建議選擇列存模式,併合理設定分布鍵Distribution Key以及Event Time Column(Segment Key)以最佳化查詢效能,詳情請參見表格儲存體格式:列存、行存、行列共存

    • VVR 4.0以下版本僅支援對維表主鍵點查的維表Join,VVR 4.0及以上版本,jdbc模式支援維表的非主鍵點查。

注意事項

  • 使用了rpc模式時,VVR版本升級注意事項:

    Hologres 2.0版本下線了rpc(用於維表與結果表)模式,全面轉為jdbc相關模式(目前包括jdbc、jdbc_fixed和jdbc_copy等),rpc模式不會對同一批次中相同主鍵的資料做去重,如果業務情境需要保留完整的資料,切換為jdbc模式後,可以通過設定'jdbcWriteBatchSize'='1'防止去重,或者升級到VVR 8.0.5版本配置deduplication.enabled參數為false。

    如果您作業中存在使用了rpc模式讀寫Hologres的情況,此時如果您需要將VVR 4.x升級到VVR 6.x或VVR 8.x,請按照以下情況進行處理:

    • 升級到VVR 6.0.4~6.0.6版本,可能會拋出異常。推薦維表和結果表使用jdbc_fixed或jdbc模式。

    • 升級到VVR 6.0.7及以上版本,無需您做任何處理,Flink系統會自動替換rpc為jdbc相關模式。

  • 使用binlog源表且未指定jdbc模式時,VVR版本升級注意事項:

    Hologres 2.0版本開始有限支援holohub(用於Binlog源表)模式,Hologres 2.1版本徹底下線了holohub模式,全面轉為jdbc模式。

    如果您作業中存在消費binlog源表的情況,而且binlog源表未通過sdkmode='jdbc'指定jdbc模式,預設會使用holohub模式。此時如果您需要將VVR 4.x升級到VVR 6.x或VVR 8.x,請按照以下情況進行處理:

    • 如果Hologres版本是2.0。

      • 升級到VVR 6.0.7~VVR 8.0.3版本,仍然可以繼續使用holohub模式。

      • 升級到VVR 8.0.4及以上版本,可能拋出許可權不足的異常,需要特別配置使用者權限,詳情見Realtime ComputeFlink版即時消費Hologres

    • 如果Hologres版本是2.1。

      • 升級到VVR 6.0.7~VVR 8.0.4版本,可能無法正常消費Binlog,建議升級到VVR 8.0.5。

      • 升級到VVR 8.0.5及以上版本,無需您做任何處理,Flink系統會自動替換holohub模式為jdbc模式。

文法結構

CREATE TABLE hologres_table (
  name VARCHAR,
  age BIGINT,
  birthday BIGINT,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDBName>',
  'tablename' = '<yourTableName>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>',
  'sdkmode' = 'jdbc'
);

WITH參數

說明

僅FlinkRealtime Compute引擎VVR 4.0.11及以上版本支援所有jdbc開頭的參數。

  • 通用

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    connector

    表類型。

    String

    固定值為hologres

    dbname

    資料庫名稱。

    String

    Hologres V2.0版本推出了全新的彈性高可用執行個體形態,將計算資源分解為不同的計算群組(Virtual Warehouse),更好的服務於高可用部署。不同的計算群組使用相同的Endpoint,您可以通過在dbname參數後添加特定的尾碼來指定串連某個計算群組。例如某張維表希望串連特定的計算群組read_warehouse,可以通過'dbname' = 'db_test@read_warehouse' 方式指定。

    說明

    僅JDBC相關模式支援使用計算群組,詳見源表、維表和結果表WITH參數中的sdkMode參數。

    tablename

    表名稱。

    String

    如果Schema不為Public時,則tablename需要填寫為schema.tableName

    username

    使用者名稱,請填寫阿里雲帳號的AccessKey ID。

    String

    詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?

    重要

    為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變數和密鑰管理

    password

    密碼,請填寫阿里雲帳號的AccessKey Secret。

    String

    詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?

    重要

    為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變數和密鑰管理

    endpoint

    Hologres服務地址。

    String

    詳情請參見訪問網域名稱

    connection.ssl.mode

    是否啟用SSL(Secure Sockets Layer)傳輸加密,以及啟用採用何種模式。

    String

    disable

    參數取值如下:

    • disable(預設值):不啟用傳輸加密。

    • require:啟用SSL,只對資料鏈路加密。

    • verify-ca:啟用SSL,加密資料鏈路,同時使用CA認證驗證Hologres服務端的真實性。

    • verify-full:啟用SSL,加密資料鏈路,使用CA認證驗證Hologres服務端的真實性,同時比對認證內的CN或DNS與串連時配置的Hologres串連地址是否一致。

    說明
    • VVR 8.0.5及以上版本開始支援此參數。

    • Hologres自1.1版本起支援SSL傳輸加密的require模式,2.1版本起新增支援verify-ca和verify-full模式。詳見傳輸加密

    • 當配置為verify-ca或者verify-full時,需要同時配置connection.ssl.root-cert.location參數。

    connection.ssl.root-cert.location

    當傳輸加密模式需要認證時,配置認證的路徑。

    String

    當connection.ssl.mode配置為verify-ca或者verify-full時,需要同時配置CA認證的路徑。認證可以使用Realtime Compute控制台的資源上傳功能上傳至平台,上傳後檔案存放在/flink/usrlib目錄下。例如,需要使用的CA認證檔案名稱為certificate.crt,則上傳後參數取值應該為 '/flink/usrlib/certificate.crt'

    說明

    jdbcRetryCount

    當串連故障時,寫入和查詢的重試次數。

    Integer

    10

    無。

    jdbcRetrySleepInitMs

    每次重試的固定等待時間。

    Long

    1000

    實際重試的等待時間的計算公式為jdbcRetrySleepInitMs+retry*jdbcRetrySleepStepMs。單位為毫秒。

    jdbcRetrySleepStepMs

    每次重試的累加等待時間。

    Long

    5000

    實際重試的等待時間的計算公式為jdbcRetrySleepInitMs+retry*jdbcRetrySleepStepMs。單位為毫秒。

    jdbcConnectionMaxIdleMs

    JDBC串連的空閑時間。

    Long

    60000

    超過這個空閑時間,串連就會斷開釋放掉。單位為毫秒。

    jdbcMetaCacheTTL

    本機快取TableSchema資訊的到期時間。

    Long

    60000

    單位為毫秒。

    jdbcMetaAutoRefreshFactor

    如果緩衝的剩餘時間小於觸發時間,則系統會自動重新整理緩衝。

    Integer

    4

    緩衝的剩餘時間計算方法:緩衝的剩餘時間=緩衝的到期時間 - 緩衝已經存活的時間。緩衝自動重新整理後,則從0開始重新計算緩衝的存活時間。

    觸發時間計算方法:jdbcMetaCacheTTL/jdbcMetaAutoRefreshFactor兩個參數的比值。

    type-mapping.timestamp-converting.legacy

    Flink和Hologres之間是否進行時間類型的相互轉換。

    Boolean

    true

    參數取值如下:

    • true:不進行相互轉換。時區轉換將採用運行環境中的JVM時區。

    • false(推薦):進行相互轉換。時區轉換將使用Flink所配置的時區。

    說明
    • 僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。

    • Flink和Hologres的時區詳情,請參見Flink與Hologres時區說明

    • property-version=0時,預設值為true;property-version=1時,預設值為false。

    property-version

    Connector參數版本。

    Integer

    0

    可填的值為0和1,預設值為0。

    說明
    • 僅VVR 8.0.6及以上版本支援配置該參數。

    • 在不同參數版本裡,可用的參數集合和參數的預設值可能不同。如果存在區別,區別詳情會在參數的說明部分描述。

    • 推薦使用參數版本1。

  • 源表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    field_delimiter

    匯出資料時,不同行之間使用的分隔字元。

    String

    "\u0002"

    無。

    binlog

    是否消費Binlog資料。

    Boolean

    false

    參數取值如下:

    • true:消費Binlog資料。

    • false(預設值):不消費Binlog資料。

    說明

    Realtime Compute引擎VVR 8.0.6及以上版本支援多版本預設值。

    • property-version=0時,預設值為false。

    • property-version=1時,預設值為true。

    sdkMode

    SDK模式。

    String

    holohub

    參數取值如下:

    • holohub(預設值):使用holohub模式的binlog源表。

    • jdbc:使用JDBC模式的binlog源表。

    詳情請參見Binlog Source表

    說明
    • VVR 6.0.3及以下版本:不支援配置該參數。

    • VVR 6.0.4~6.0.6版本:推薦使用預設配置,即holohub。

    • VVR 6.0.7及以上版本:推薦配置為jdbc。

      • Hologres執行個體為2.0以下版本,Flink系統採用您配置的參數值。

      • Hologres執行個體為2.0及以上版本,由於Hologres 2.0版本下線了holohub服務,此時如果您配置了holohub,Flink系統自動切換為jdbc。但是如果您配置為jdbc,Flink系統採用jdbc。

    • VVR 8.0.4及以上版本:

      • Hologres執行個體為2.0版本,Flink系統自動切換為jdbc。可能存在許可權不足等問題,參考Realtime ComputeFlink版即時消費Hologres文檔進行處理。

      • Hologres執行個體為2.1及以上版本,Flink系統自動切換為jdbc。

    jdbcBinlogSlotName

    JDBC模式的binlog源表的Slot名稱。建立方法請參見JDBC模式Binlog源表

    String

    僅在sdkMode配置為jdbc時生效,如果使用者未配置,連接器會預設建立一個Slot來使用。詳見JDBC模式Binlog源表

    說明

    Hologres執行個體2.1版本起,不再需要配置此參數,連接器也不會嘗試自動建立。

    binlogMaxRetryTimes

    讀取Binlog資料出錯後的重試次數。

    Integer

    60

    無。

    binlogRetryIntervalMs

    讀取Binlog資料出錯後的重試時間間隔。

    Long

    2000

    單位為毫秒。

    binlogBatchReadSize

    批量讀取Binlog的資料行數。

    Integer

    100

    無。

    cdcMode

    是否採用CDC模式讀取Binlog資料。

    Boolean

    false

    參數取值如下:

    • true:CDC模式讀取Binlog資料。

    • false(預設值):非CDC模式讀取Binlog資料。

    說明

    Realtime Compute引擎VVR 8.0.6及以上版本支援多版本預設值。

    • property-version=0時,預設值為false。

    • property-version=1時,預設值為true。

    upsertSource

    源表是否使用upsert類型的Changelog。

    Boolean

    false

    僅在CDC模式下生效。參數取值如下:

    • true:僅支援Upsert類型,包括INSERT、DELETE、和UPDATE_AFTER。

    • false(預設值):支援所有類型,包括INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER。

    說明

    如果下遊包含回撤運算元(例如使用ROW_NUMBER OVER WINDOW去重),則需要設定upsertSource為true,此時源表會以Upsert方式從Hologres中讀取資料。

    binlogStartupMode

    Binlog資料消費模式。

    String

    earliestOffset

    參數取值如下:

    • initial:先全量消費資料,再讀取Binlog開始增量消費。

    • earliestOffset(預設值):從最早的Binlog開始消費。

    • timestamp:從設定的startTime開始消費Binlog。

    說明

    如果設定了startTime或者在啟動介面選擇了啟動時間,則binlogStartupMode強制使用timestamp模式,其他消費模式不生效,即startTime參數優先順序更高。

    說明
    • 僅Realtime Compute引擎VVR 4.0.13及以上版本,Hologres 0.10及以上版本支援該參數。

    • Realtime Compute引擎VVR 8.0.6及以上版本支援多版本預設值。

    • property-version=0時,預設值為false。

    • property-version=1時,預設值為true。

    startTime

    啟動位點的時間。

    String

    格式為yyyy-MM-dd hh:mm:ss。如果沒有設定該參數,且作業沒有從State恢複,則從最早的Binlog開始消費Hologres資料。

    jdbcScanFetchSize

    掃描時攢批大小。

    Integer

    256

    無。

    jdbcScanTimeoutSeconds

    掃描操作逾時時間。

    Integer

    60

    單位為秒。

    jdbcScanTransactionSessionTimeoutSeconds

    掃描操作所在事務的逾時時間。

    Integer

    600秒(0表示不逾時)

    對應Hologres的GUC參數idle_in_transaction_session_timeout,詳情請參見GUC參數

    說明

    僅Realtime Compute引擎Flink1.13-vvr-4.0.15及以上版本支援該參數。

    enable_filter_push_down

    全量讀取階段是否進行filter下推。

    Boolean

    false

    參數取值如下:

    • false(預設值):不進行filter下推。

    • true:讀取全量資料時,將支援的過濾條件下推到Hologres執行。包括非Binlog Source全量讀取以及Binlog Source使用全增量一體化消費模式時的全量階段。

      重要

      Realtime Compute引擎Flink1.15-vvr-6.0.3到Flink1.15-vvr-6.0.5預設會進行filter下推,但如果作業使用了hologres維表,且寫入的DML中包含有對維表非主鍵欄位的過濾條件時,維表的filter會被錯誤的下推,可能導致維表join出現錯誤結果。因此推薦使用Realtime Compute引擎Flink1.15-vvr-6.0.6及以上版本,並在源表增加此參數來開啟過濾條件下推功能。

  • 結果表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    sdkMode

    SDK模式。

    String

    jdbc

    參數取值如下:

    • jdbc:預設值,表示使用jdbc模式進行寫入。

    • jdbc_copy:是否使用fixed copy方式寫入。

      fixed copy是hologres1.3新增的能力,相比通過insert方法(jdbc模式)進行寫入,fixed copy方式可以更高的吞吐(因為是流模式),更低的資料延時,更低的用戶端記憶體消耗(因為不攢批)。但此模式暫不支援delete資料,也不支援寫入分區父表,不支援ignoreNullWhenUpdate參數。

    • rpc:表示使用rpc模式進行寫入,與useRpcMode參數一致,與jdbc模式的區別在於不佔用串連數,不支援寫入Hologres的Jsonb,RoarinBitmap類型。

    • jdbc_fixed(beta功能):表示使用fixed jdbc方式進行寫入,

      需要Hologres引擎版本大於等於1.3,與jdbc模式的區別在於不佔用串連數,不支援寫入Hologres的Jsonb,RoarinBitmap類型。

    說明
    • VVR 6.0.3以下版本:不支援配置該參數。

    • VVR 6.0.4~VVR 6.0.6版本:推薦配置為jdbc。

    • VVR 6.0.7~VVR 8.0.1版本:推薦配置為jdbc。

      • 如果Hologres執行個體為2.0以下版本,Flink系統採用您配置的參數值。

      • 如果Hologres執行個體為2.0及以上版本,由於Hologres 2.0及以上版本下線了rpc服務,此時如果您將該參數配置為rpc,Flink系統將自動切換該參數值為jdbc_fixed,但是如果您配置為其他值,Flink系統將採用您配置的參數值。

      • rpc模式不會對同一批次中相同主鍵的資料做去重,如果業務情境需要保留完整的資料,切換為jdbc模式後,可以通過設定'jdbcWriteBatchSize'='1'防止去重。

    • VVR 8.0.3及以上版本:推薦配置為jdbc。

      自此版本開始,無論Hologres執行個體版本,都不再支援rpc模式,如果選擇rpc模式,將自動切換該參數值為jdbc_fixed且設定'jdbcWriteBatchSize'='1'防止去重。

    • VVR 8.0.5及以上版本:推薦配置為jdbc。

      如果選擇rpc模式,將自動切換該參數值為jdbc_fixed且設定deduplication.enabled參數為false防止去重。

    bulkload

    是否採用bulkload寫入。

    Boolean

    false

    僅在sdkMode設定為jdbc_copy時生效。bulkload寫入目前僅適用於無主鍵表或者主鍵保證不重複的有主鍵表(主鍵重複會拋出異常),相比預設的jdbc_copy,寫入使用更少的Hologres資源。

    說明

    VVR 8.0.5及以上版本開始支援此參數,同時需要Hologres執行個體2.1及以上版本。

    useRpcMode

    是否通過RPC方式使用Hologres連接器。

    Boolean

    false

    參數取值如下:

    • true:使用RPC方式使用Hologres連接器。

      與sdkMode參數設定為rpc效果相同,通過RPC方式會降低SQL串連數。

    • false(預設值):使用JDBC方式使用Hologres連接器。

      通過JDBC方式會佔用SQL串連,導致JDBC連結數增加。

    說明
    • Hologres 2.0版本下線了rpc模式,推薦使用sdkMode參數來選擇jdbc或者jdbc_fixed模式。

    • Realtime Compute引擎VVR 6.0.7及VVR 8.0.1版本在檢測到Hologres執行個體是2.0及以上版本時,會自動切換rpc模式為jdbc_fixed模式。

    • Realtime Compute引擎VVR 8.0.3及以上版本會自動切換rpc模式為jdbc_fixed模式。

    • rpc模式不會對同一批次中相同主鍵的資料做去重,如果業務情境需要保留完整的資料變化記錄,推薦使用Realtime Compute引擎VVR 8.0.5及以上版本,jdbc模式可以配置deduplication.enabled參數為false不進行去重。

    • property-version=1時,該參數下線。

    mutatetype

    資料寫入模式。

    String

    insertorignore

    詳情請參見流式語義

    說明

    Realtime Compute引擎VVR 8.0.6及以上版本支援多版本預設值。

    • property-version=0時,預設值為insertorignore。

    • property-version=1時,預設值為insertorupdate。

    partitionrouter

    是否寫入分區表。

    Boolean

    false

    無。

    createparttable

    當寫入分區表時,是否根據分區值自動建立不存在的分區表。

    Boolean

    false

    rpc模式下,如果分區值中存在短劃線(-),暫不支援自動建立分區表。

    說明
    • Hologres執行個體1.3.22及以上版本開始支援使用Date類型做分區鍵。Realtime Compute引擎VVR 8.0.3及以上版本,支援使用Date類型做分區鍵時自動建立分區表。

    • 請確保分區值不會出現髒資料,否則會建立錯誤的分區表導致Failover,建議慎用該參數。

    • 當sdk_mode設定為jdbc_copy時,不支援寫入分區父表。

    ignoredelete

    是否忽略撤回訊息。

    Boolean

    true

    說明

    僅在使用流式語義時生效。

    Realtime Compute引擎VVR 8.0.6及以上版本支援多版本預設值。

    • property-version=0時,預設值為true。

    • property-version=1時,預設值為false。

    connectionSize

    單個Flink結果表任務所建立的JDBC串連池大小。

    Integer

    3

    如果作業效能不足,建議您增加串連池大小。串連池大小和資料吞吐成正比。

    jdbcWriteBatchSize

    JDBC模式,Hologres Sink節點資料攢批條數(不是來一條資料處理一條,而是攢一批再處理)的最大值。

    Integer

    256

    單位為資料行數。

    說明

    jdbcWriteBatchSizejdbcWriteBatchByteSizejdbcWriteFlushInterval三者之間為或的關係。如果同時設定了這三個參數,則滿足其中一個,就進行寫入結果資料。

    jdbcWriteBatchByteSize

    JDBC模式,Hologres Sink節點資料攢批位元組數(不是來一條資料處理一條,而是攢一批再處理)的最大值。

    Long

    2097152(2*1024*1024)位元組,即2 MB

    說明

    jdbcWriteBatchSizejdbcWriteBatchByteSizejdbcWriteFlushInterval三者之間為或的關係。如果同時設定了這三個參數,則滿足其中一個,就進行寫入結果資料。

    jdbcWriteFlushInterval

    JDBC模式,Hologres Sink節點資料攢批寫入Hologres的最長等待時間。

    Long

    10000

    單位為毫秒。

    說明

    jdbcWriteBatchSizejdbcWriteBatchByteSizejdbcWriteFlushInterval三者之間為或的關係。如果同時設定了這三個參數,則滿足其中一個,就進行寫入結果資料。

    ignoreNullWhenUpdate

    mutatetype='insertOrUpdate'時,是否忽略更新寫入資料中的Null值。

    Boolean

    false

    取值說明如下:

    • false(預設值):將Null值寫到Hologres結果表裡。

    • true:忽略更新寫入資料中的Null值。

    說明
    • 僅Flink計算引擎VVR 4.0及以上版本支援該參數。

    • 當sdk_mode設定為jdbc_copy時,不支援此參數。

    connectionPoolName

    串連池名稱。同一個TaskManager中,配置相同名稱的串連池的表可以共用串連池。

    String

    取值為非'default'的任一字元串。如果多個表設定相同的串連池,則這些使用相同串連池的表的connectionSize參數也需要相同。

    說明
    • VVR 4.0.12以下版本:不支援配置該參數。

    • VVR 4.0.12~VVR 8.0.3版本:預設不共用,每個表使用自己的串連池。

    • VVR 8.0.4以上版本:同一個作業中endpoint相同的表會預設共用串連池。作業中表數量較多時串連數可能相對不足影響效能,這種情況下推薦為不同的表設定不同的connectionPoolName。

    • 此參數可以按需配置,比如作業中有維表A,B以及結果表C,D,E五張hologres表,可以A表和B表使用'pool1',C表和D表使用'pool2',E表流量較大,單獨使用'pool3'。

    jdbcEnableDefaultForNotNullColumn

    如果將Null值寫入Hologres表中Not Null且無預設值的欄位,是否允許連接器協助填充一個預設值。

    Boolean

    true

    參數取值如下:

    • true(預設值):允許連接器填充預設值並寫入,規則如下。

      • 如果欄位是String類型,則預設寫為空白("")。

      • 如果欄位是Number類型,則預設寫為0。

      • 如果是Date、timestamp或timestamptz時間類型欄位,則預設寫為1970-01-01 00:00:00

    • false:不填充預設值,寫Null到Not Null欄位時,會拋出異常。

    remove-u0000-in-text.enabled

    如果寫入時字串類型包含\u0000非法字元,是否允許連接器協助去除。

    Boolean

    false

    參數取值如下:

    • false(預設值):連接器不對資料進行操作,但碰到髒資料時寫入可能拋出如下異常,ERROR: invalid byte sequence for encoding "UTF8": 0x00

      此時需要在源表提前處理髒資料,或者在SQL中定義髒資料處理邏輯。

    • true:連接器會協助去除字串類型中的\u0000,防止寫入拋出異常。

    partial-insert.enabled

    是否只插入INSERT語句中定義的欄位。

    Boolean

    false

    參數取值如下:

    • false(預設值):無論INSERT語句中聲明了哪些欄位,都會更新結果表DDL中定義的所有欄位,對於未在INSERT語句中聲明的欄位,會被更新為null。

    • true:將INSERT語句中定義的欄位下推給連接器,從而可以只對聲明的欄位進行更新或插入。

    說明
    • 僅Realtime Compute引擎VVR 6.0.7及以上版本支援該參數。

    • 此參數僅在mutatetype參數配置為InsertOrUpdate時生效。

    deduplication.enabled

    jdbc及jdbc_fixed模式寫入攢批過程中,是否進行去重。

    Boolean

    true

    參數取值如下:

    • true(預設值):如果一批資料中有主鍵相同的資料,預設進行去重,只保留最後一條到達的資料。以兩個欄位,其中第一個欄位為主鍵的資料舉例:

      • INSERT (1,'a')INSERT (1,'b')兩條記錄先後到達,去重之後只保留後到達的(1,'b')寫入Hologres結果表中。

      • Hologres結果表中已經存在記錄(1,'a'),此時DELETE (1,'a')INSERT (1,'b')兩條記錄先後到達,只保留後到達的(1,'b')寫入hologres中,表現為直接更新,而不是先刪除再插入。

    • false:在攢批過程中不進行去重,如果發現新到的資料和目前攢批的資料中存在主鍵相同的情況,先將攢批資料寫入,寫入完成之後再繼續寫入新到的資料。

    說明
    • 僅Realtime Compute引擎VVR 8.0.5及以上版本支援該參數。

    • 不允許攢批去重時,極端情況下(例如所有資料的主鍵都相同)寫入會退化為不攢批的單條寫入,對效能有一定影響。

  • 維表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    sdkMode

    SDK模式。

    String

    jdbc

    參數取值如下:

    • jdbc(預設值):表示使用jdbc模式進行查詢,支援主鍵點查和非主鍵的查詢,但是非主鍵的查詢對效能影響較大,查詢較慢。

    • rpc:表示使用rpc模式進行點查,與useRpcMode參數一致,僅支援主鍵點查,即維表的主鍵欄位必須與Flink Join On的欄位完全符合,與jdbc模式的區別在於不佔用串連數,不支援讀取Hologres的Jsonb,RoarinBitmap類型。

    • jdbc_fixed:(beta功能,需要hologres引擎版本大於等於1.3)表示使用fixed jdbc方式進行點查,與jdbc模式的區別在於不佔用串連數,且不支援讀取Hologres的Jsonb,RoarinBitmap類型。僅支援主鍵點查,即維表的主鍵欄位必須與Flink Join On的欄位完全符合。

    說明
    • VVR 6.0.3以下版本:不支援配置該參數。

    • VVR 6.0.4~VVR 6.0.6版本:推薦配置為jdbc。

      在VVR 6.0.6版本,SDK模式選擇為jdbc時,如果維表字串類型的查詢結果中包含null值,可能拋出null 指標異常,此時推薦您使用rpc模式繞過。

    • VVR 6.0.7及VVR 8.0.1:推薦配置為jdbc。

      • 如果Hologres執行個體為2.0以下版本,Flink系統將採用您配置的參數值。

      • 如果Hologres執行個體為2.0及以上版本,由於Hologres 2.0及以上版本下線了rpc服務,此時如果您將該參數配置為了rpc,Flink系統自動將參數值切換為jdbc_fixed。但是如果您配置為其他值,Flink系統將採用您配置的參數值。

    • VVR 8.0.3及以上版本:推薦配置為jdbc。

      • 自此版本開始,無論Hologres執行個體版本,都不再支援rpc模式,如果選擇rpc模式,將自動切換該參數值為jdbc_fixed。

    useRpcMode

    是否通過RPC方式使用Hologres連接器。

    Boolean

    false

    參數取值如下:

    • true:使用RPC方式使用Hologres連接器。與sdkMode參數設定為rpc效果相同。通過RPC方式會降低SQL串連數。

    • false(預設值):使用JDBC方式使用Hologres連接器。

      通過JDBC方式會佔用SQL串連,導致JDBC連結數增加。

    說明
    • Hologres 2.0版本下線了rpc了服務,推薦使用sdkMode參數來選擇jdbc或者jdbc_fixed模式。

    • Realtime Compute引擎VVR 6.0.7及VVR 8.0.1版本在檢測到Hologres執行個體是2.0及以上版本時,會自動切換rpc模式為jdbc_fixed模式。

    • Realtime Compute引擎VVR 8.0.3及以上版本會自動切換rpc模式為jdbc_fixed模式。

    connectionSize

    單個Flink維表任務所建立的JDBC串連池大小。

    Integer

    3

    如果作業效能不足,建議您增加串連池大小。串連池大小和資料吞吐成正比。

    connectionPoolName

    串連池名稱。同一個TaskManager中,配置相同名稱的串連池的表可以共用串連池。

    String

    取值為非'default'的任一字元串。如果多個表設定相同的串連池,則這些使用相同串連池的表的connectionSize參數也需要相同。

    您可以按需配置此參數,例如作業中有維表A,B以及結果表C,D,E五張hologres表,可以A表和B表使用pool1,C表和D表使用pool2,E表流量較大,單獨使用pool3。

    說明
    • VVR 4.0.12以下版本:不支援配置該參數。

    • VVR 4.0.12~VVR 8.0.3版本:預設不共用,每個表使用自己的串連池。

    • VVR 8.0.4以上版本:同一個作業中Endpoint相同的表會預設共用串連池。作業中表數量較多時串連數可能相對不足影響效能,這種情況下推薦為不同的表設定不同的connectionPoolName。

    jdbcReadBatchSize

    點查Hologres維表時,攢批處理的最大條數。

    Integer

    128

    無。

    jdbcReadBatchQueueSize

    維表點查請求緩衝隊列大小。

    Integer

    256

    無。

    jdbcReadTimeoutMs

    維表點查的逾時時間。

    Long

    預設值為0,表示不會逾時

    僅vvr 4.0.15-flink 1.13及以上版本、vvr 6.0.2-flink 1.15及以上版本支援該參數。

    jdbcReadRetryCount

    維表點查逾時時的重試次數。

    Interger

    • VVR 8.0.5以下版本:1

    • VVR 8.0.5及以上版本:10

    • 僅VVR 6.0.3以上版本支援該參數。

    • 本參數與jdbcRetryCount不同,後者是指串連發生異常時的重試次數。

    jdbcScanFetchSize

    在一對多join(即沒有使用完整主鍵)時使用scan介面,scan攢批處理資料的條數。

    Integer

    256

    無。

    jdbcScanTimeoutSeconds

    scan操作的逾時時間。

    Integer

    60

    單位為秒。

    cache

    緩衝策略。

    String

    見備忘列。

    Hologres僅支援None和LRU兩種緩衝策略,取值詳情請參見背景資訊

    說明

    Cache預設值和VVR版本有關:

    • VVR 4.x版本及以上版本,預設值為None。

    • VVR 4.x版本以下版本,預設值為LRU。

    cacheSize

    緩衝大小。

    Integer

    10000

    選擇LRU緩衝策略後,可以設定緩衝大小。單位為條。

    cacheTTLMs

    緩衝更新時間間隔。

    Long

    見備忘列。

    單位為毫秒。cacheTTLMs預設值和cache的配置有關:

    • 如果cache配置為LRU,則cacheTTLMs為緩衝逾時時間。預設不到期。

    • 如果cache配置為None,則cacheTTLMs可以不配置,表示緩衝不逾時。

    cacheEmpty

    是否緩衝join結果為空白的資料。

    Boolean

    true

    • true(預設值):表示緩衝join結果為空白的資料。

    • false:表示不緩衝join結果為空白的資料。

    async

    是否非同步返回資料。

    Boolean

    false

    • true:表示非同步返回資料。

    • false(預設值):表示不進行非同步返回資料。

    說明

    非同步返回資料是無序的。

類型映射

Flink與Hologres的資料類型映射請參見Blink/Flink與Hologres的資料類型映射

使用樣本

源表示例

非Binlog Source表

CREATE TEMPORARY TABLE hologres_source (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'field_delimiter'='|' --該參數可選。
  'sdkmode' = 'jdbc'
);

CREATE TEMPORARY TABLE blackhole_sink(
  name varchar,
  age BIGINT,
  birthday BIGINT 
) WITH (
  'connector'='blackhole'
);

INSERT INTO blackhole_sink 
SELECT name, age, birthday
from hologres_source;

Binlog Source表

Hologres連接器支援即時消費Hologres,即即時消費Hologres的Binlog資料。Flink即時消費Hologres詳情請參見Realtime ComputeFlink版即時消費Hologres

結果表示例

CREATE TEMPORARY TABLE datagen_source(
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='datagen'
);

CREATE TEMPORARY TABLE hologres_sink (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>'
);

INSERT INTO hologres_sink SELECT * from datagen_source;

維表示例

CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE hologres_dim (
  a INT, 
  b VARCHAR, 
  c VARCHAR
) WITH (
  'connector' = 'hologres',
   ...
);

CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

特色功能詳解

流式語義

流處理,也稱為流資料或流事件處理,即對一系列無界資料或事件連續處理。執行流資料或流事件處理的系統通常允許您指定一種可靠性模式或處理語義,保證整個系統處理資料的準確性,因為網路或裝置故障等可能會導致資料丟失。

根據Hologres Sink的配置和Hologres表的屬性,流式語義分為以下兩種:

  • Exactly-once(僅一次):即使在發生各種故障的情況下,系統只處理一次資料或事件。

  • At-least-once(至少一次):如果在系統完全處理之前丟失了資料或事件,則從源頭重新傳輸,因此可以多次處理資料或事件。如果第一次重試成功,則不必進行後續重試。

在Hologres結果表中使用流式語義,需要注意以下幾點:

  • 如果Hologres物理表未設定主鍵,則Hologres Sink使用At-least-once語義。

  • 如果Hologres物理表已設定主鍵,則Hologres Sink通過主鍵確保Exactly-once語義。當同主鍵資料出現多次時,您需要設定mutatetype參數確定更新結果表的方式,mutatetype取值如下:

    • insertorignore(預設值):保留首次出現的資料,忽略後續所有資料。

    • insertorreplace:後出現的資料整行替換已有資料。

    • insertorupdate:更新已有資料的部分列。例如一張表有a、b、c和d四個欄位,a是PK(Primary Key),寫入Hologres時唯寫入a和b兩個欄位,在PK重複的情況下,系統只會更新b欄位,c和d保持不變。

    說明
    • mutatetype設定為insertorreplaceinsertorupdate時,系統根據主鍵更新資料。

    • Flink定義的結果表中的資料列數不一定要和Hologres物理表的列數一致,您需要保證缺失的列沒有非空約束,即列值可以為Null,否則會報錯。

  • 預設情況下,Hologres Sink只能向一張表匯入資料。如果匯入資料至分區表的父表,即使匯入成功,也會查詢資料失敗。您可以設定參數partitionRouter為true,開啟自動將資料路由到對應分區表的功能。注意事項如下:

    • tablename參數需要填寫為父表的表名。

    • 如果沒有提前建立分區表,需要在WITH參數中啟用createparttable=true,從而支援自動建立分區表,否則會匯入失敗。

寬表Merge和局部更新功能

在把多個流的資料寫到一張Hologres寬表的情境中,會涉及到寬表Merge和資料的局部更新。下面通過樣本來介紹實現寬表merge的兩種方式。

說明

本情境的兩種實現方式均具有如下限制:

  • 寬表必須有主鍵。

  • 每個資料流的資料都必須包含完整的主鍵欄位。

  • 列存模式的寬表Merge情境在高RPS的情況下,CPU使用率會偏高,建議關閉表中欄位的Dictionary Encoding功能。

方式一(推薦)

說明

僅Realtime Compute引擎VVR 6.0.7及以上版本支援使用此方式。

假設有兩個Flink資料流,一個資料流中包含a、b和c欄位,另一個資料流中包含a、d和e欄位,Hologres寬表WIDE_TABLE包含a、b、c、d和e欄位,其中a欄位為主鍵。具體操作如下:

  1. 使用Flink SQL建立一張Hologres結果表,並聲明a、b、c、d、e五個欄位,映射至寬表WIDE_TABLE。

  2. 結果表的屬性設定:

    • mutatetype設定為insertorupdate,可以根據主鍵更新資料。

    • ignoredelete設定為true,防止回撤訊息產生Delete請求。

  3. 將兩個Flink資料流的資料分別INSERT至對應的結果表中。

    // 已經定義的source1和source2
    CREATE TEMPORARY TABLE hologres_sink ( -- 聲明a,b,c,d,e五個欄位
      a BIGINT, 
      b STRING,
      c STRING,
      d STRING,
      e STRING,
      primary key(a) not enforced
    ) WITH (
      'connector'='hologres',           
      'dbname'='<yourDbname>',
      'tablename'='<yourWideTablename>',  -- hologres寬表,包含a,b,c,d,e五個欄位
      'username' = '${secret_values.ak_id}',
      'password' = '${secret_values.ak_secret}',
      'endpoint'='<yourEndpoint>',
      'mutatetype'='insertorupdate',    -- 根據主鍵更新資料
      'ignoredelete'='true',            -- 忽略回撤訊息產生的Delete請求
      'partial-insert.enabled'='true'   -- 開啟部分列更新參數,支援僅更新INSERT語句中生命的欄位
    );
    
    BEGIN STATEMENT SET;
    INSERT INTO hologres_sink(a,b,c) select * from source1;  -- 聲明只插入a,b,c三個欄位
    INSERT INTO hologres_sink(a,d,e) select * from source2;  -- 聲明只插入a,d,e三個欄位
    END;

方式二

說明

僅Realtime Compute引擎VVR 6.0.6及以下版本支援使用此方式。

假設有兩個Flink資料流,一個資料流中包含a、b和c欄位,另一個資料流中包含a、d和e欄位,Hologres寬表WIDE_TABLE包含a、b、c、d和e欄位,其中a欄位為主鍵。具體操作如下:

  1. 使用Flink SQL建立兩張Hologres結果表,其中一張表只聲明a、b和c欄位,另一張表只聲明a、d和e欄位。這兩張表都映射至寬表WIDE_TABLE。

  2. 兩張結果表的屬性設定:

    • mutatetype設定為insertorupdate,可以根據主鍵更新資料。

    • ignoredelete設定為true,防止回撤訊息產生Delete請求。

  3. 將兩個Flink資料流的資料分別INSERT至對應的結果表中。

    // 已經定義的source1和source2
    
    CREATE TEMPORARY TABLE hologres_sink_1 ( -- 只聲明a,b,c三個欄位
      a BIGINT, 
      b STRING,
      c STRING,
      primary key(a) not enforced
    ) WITH (
      'connector'='hologres',           
      'dbname'='<yourDbname>',
      'tablename'='<yourWideTablename>',  -- hologres寬表,包含a,b,c,d,e五個欄位
      'username' = '${secret_values.ak_id}',
      'password' = '${secret_values.ak_secret}',
      'endpoint'='<yourEndpoint>',
      'mutatetype' = 'insertorupdate',    -- 根據主鍵更新資料
      'ignoredelete' = 'true'             -- 忽略回撤訊息產生的Delete請求
    );
    
    CREATE TEMPORARY TABLE hologres_sink_2 ( -- 只聲明a,d,e三個欄位
      a BIGINT, 
      d STRING,
      e STRING,
      primary key(a) not enforced
    ) WITH (
      'connector'='hologres',           
      'dbname'='<yourDbname>',
      'tablename'='<yourWideTablename>',  -- hologres寬表,包含a,b,c,d,e五個欄位
      'username' = '${secret_values.ak_id}',
      'password' = '${secret_values.ak_secret}',
      'endpoint'='<yourEndpoint>',
      'mutatetype'='insertorupdate',    -- 根據主鍵更新資料
      'ignoredelete'='true'             -- 忽略回撤訊息產生的Delete請求
    );
    
    BEGIN STATEMENT SET;
    INSERT INTO hologres_sink_1 select * from source1;
    INSERT INTO hologres_sink_2 select * from source2;
    END;

作為CTAS和CDAS的目標端

Hologres支援即時同步單表或整庫層級的資料,在同步過程之中如果上遊的表結構發生了變更也會即時同步到Hologres表中。新資料流到Hologres表時,Flink會先觸發Hologres修改相應的表結構,然後再將資料寫入到相應的表中。以上過程全部由Flink自動完成,您無需關心實現細節,詳情請參見CREATE TABLE AS(CTAS)語句資料庫即時入倉快速入門

DataStream API

重要

通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink全託管,DataStream連接器設定方法請參見DataStream連接器使用方法。Maven中央庫中已經放置了Hologres DataStream連接器。VVR 6.0.7請使用其中的1.15-vvr-6.0.7-1版本的依賴。

Hologres源表

VVR提供了RichInputFormat的實作類別HologresBulkreadInputFormat來讀取Hologres表資料。以下為構建Hologres Source讀取表資料的樣本。

VVR 4.0.15

// 初始化讀取的表的Schema,需要和Hologres表Schema的欄位匹配,可以只定義部分欄位。
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .build();
// Hologres的相關參數。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// 構建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
String query = JDBCUtils.getSimpleSelectFromStatement(
  jdbcOptions.getTable(), schema.getFieldNames());

// 構建HologresBulkreadInputFormat讀取表資料。
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(jdbcOptions, schema, query);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo)
  .print();
env.execute();

VVR 6.0.7

// set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化讀取的表的Schema,需要和Hologres表Schema的欄位匹配,可以只定義部分欄位。
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .build();

// Hologres的相關參數。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");

// 構建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);

HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(new HologresConnectionParam(config), jdbcOptions,  schema, "", -1);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo)
  .print();
env.execute();

Hologres Binlog源表

VVR提供了Source的實作類別HologresBinlogSource來讀取Hologres Binlog資料。以下為構建Hologres Binlog Source的樣本。

VVR 4.0.15

// 初始化讀取的表的Schema,需要和Hologres表Schema的欄位匹配,可以只定義部分欄位。
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .build();

// Hologres的相關參數。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);

// 構建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
jdbcOptions.setHolohubEndpoint(JDBCUtils.getHolohubEndpoint(jdbcOptions));
RowDataRecordConverter recordConverter = buildRecordConverter(schema, config, jdbcOptions);

// 構建Hologres Binlog Source。
long startTimeMs = 0;
HologresBinlogSource<RowData> source = new HologresBinlogSource<>(
  schema,
  config,
  jdbcOptions,
  recordConverter,
  startTimeMs);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();

VVR 6.0.7

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化讀取的表的Schema,需要和Hologres表Schema的欄位匹配,可以只定義部分欄位。
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .build();

// Hologres的相關參數。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// 構建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// 設定或建立預設slotname
config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));

boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE)
  && config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
// 構建Binlog Record Converter。
JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
  jdbcOptions.getTable(),
  schema,
  new HologresConnectionParam(config),
  cdcMode,
  Collections.emptySet());

// 構建Hologres Binlog Source。
long startTimeMs = 0;
HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
  new HologresConnectionParam(config),
  schema,
  config,
  jdbcOptions,
  startTimeMs,
  StartupMode.TIMESTAMP,
  recordConverter,
  "",
  -1);

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
說明
  • 方法buildRecordConverter不在VVR Connector依賴中,是範例程式碼中提供的方法。

  • Hologres Binlog注意事項和實現原理等詳情,請參見Binlog Source表

Hologres結果表

VVR提供了OutputFormatSinkFunction的實作類別HologresSinkFunction來寫入資料。以下為構建Hologres Sink的樣本。

VVR 4.0.15

// 初始化讀取的表的Schema。
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .field("b", DataTypes.STRING())
  .build();

// Hologres的相關參數。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setBoolean(HologresConfigs.USE_RPC_MODE, true);
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);

// 構建Hologres Writer,以RowData的方式寫入資料。
AbstractHologresWriter<RowData> hologresWriter =
  buildHologresWriter(schema, config, hologresConnectionParam);

// 構建Hologres Sink。
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
int offset = (int) (System.currentTimeMillis() % Integer.MAX_VALUE);
env.fromElements((RowData)GenericRowData.of(2 + offset, StringData.fromString("2")), GenericRowData.of(3 + offset, StringData.fromString("3"))).returns(typeInfo)
  .addSink(sinkFunction);
env.execute();

VVR 6.0.7&VVR 8.0.1

// set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化讀取的表的Schema,需要和Hologres表Schema的欄位匹配,可以只定義部分欄位。
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .field("b", DataTypes.STRING())
  .build();

// Hologres的相關參數。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");

HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// 構建Hologres Writer,以RowData的方式寫入資料。
AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
  hologresConnectionParam, schema, HologresTableSchema.get(hologresConnectionParam), new Integer[0]);
// 構建Hologres Sink。
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
int offset = (int) (System.currentTimeMillis() % Integer.MAX_VALUE);
env.fromElements((RowData)GenericRowData.of(2 + offset, StringData.fromString("2")), GenericRowData.of(3 + offset, StringData.fromString("3"))).returns(typeInfo)
  .addSink(sinkFunction);

env.execute();
說明

方法buildHologresWriter不在VVR Connector依賴中,是範例程式碼中提供的方法。

Flink與Hologres時區說明

時間類型

產品

類型

說明

Flink

Flink TIMESTAMP

表示沒有時區資訊的日期和時間,描述年、 月、日、小時、分鐘、秒和小數秒對應的時間戳記。可以通過一個字串來指定,例如1970-01-01 00:00:04.001

Flink TIMESTAMP_LTZ

用於描述時間軸上的絕對時間點,使用long儲存從epoch至今的毫秒數,使用int儲存毫秒中的納秒數。epoch時間是從Java的標準epoch時間開始計算。在計算和可視化時, 每個TIMESTAMP_LTZ類型的資料都使用Session (會話)中配置的時區。可以用於跨時區的計算,因為它是一個基於epoch的絕對時間點(比如上例中的毫秒)代表的就是不同時區的同一個絕對時間點。

相同的TIMESTAMP_LTZ值,在不同的時區可能會反映出不同的本地TIMESTAMP,例如:如果一個TIMESTAMP_LTZ值為2024-03-19T04:00:00Z,在上海時區(UTC+8)的本地時間戳記會表示為2024-03-19T12:00:00,而在格林威治時區(UTC+0)則表示為2024-03-19T04:00:00

Hologres

TIMESTAMP

類似於Flink的TIMESTAMP類型,表示沒有時區資訊的日期和時間。儲存資料時不會改變,即使用戶端的時區發生變化,儲存的值也保持不變。例如:2022-01-01 01:01:01.123456

TIMESTAMP WITH TIME ZONE (TIMESTAMPTZ)

類似於Flink的TIMESTAMP_LTZ類型,它帶有時區資訊。Hologres儲存TIMESTAMPTZ資料時,會將其轉換為UTC時區的值。當查詢資料時,Hologres會根據用戶端的時區配置參數將UTC時區的值轉換為用戶端時區的值。

例如北京(UTC+8)時區的時間戳記2022-02-01 10:33:20.125+08。在Hologres中儲存為TIMESTAMPTZ類型時,其值會是2022-02-01 10:33:20.125+08

Flink讀寫Hologres時間

  • Realtime Compute引擎VVR 8.0.6及以上版本且type-mapping.timestamp-converting.legacy=false時,支援所有時間類型間的相互轉換。

    Flink

    Hologres

    詳情

    TIMESTAMP

    TIMESTAMP

    之間相互轉換是直接的,不涉及時區轉換。因此推薦採用該資料對應。

    TIMESTAMP LTZ

    TIMESTAMP TZ

    TIMESTAMP

    TIMESTAMP TZ

    之間的轉換涉及時區轉換。為了在轉換中保持準確性,需要通過配置項參數table.local-time-zone設定Flink時區,配置項參數設定方法請參見如何配置作業運行參數?

    例如當設定'table.local-time-zone': 'Asia/Shanghai'時,表示Flink時區為上海(+8時區)時,Flink TIMESTAMP類型的資料為2022-01-01 01:01:01.123456,寫入Hologres TIMESTAMP TZ的數值為2022-01-01 01:01:01.123456+8。

    TIMESTAMP LTZ

    TIMESTAMP

  • Realtime Compute引擎VVR 8.0.5及以下版本或VVR 8.0.6及以上版本設定type-mapping.timestamp-converting.legacy=false時,除TIMESTAMP間轉化,其他類型相互轉化可能會出現資料偏差問題。

    Flink

    Hologres

    備忘

    TIMESTAMP

    TIMESTAMP

    之間相互轉換是直接的,不涉及時區轉換。因此推薦採用該資料對應。

    TIMESTAMP LTZ

    TIMESTAMP TZ

    讀寫Hologres資料時都當作無時區時間進行處理,可能會存在資料偏差

    例如,Flink TIMESTAMP_LTZ類型的數值為2024-03-19T04:00:00Z,在上海(+8時區)對應的實際無時區時間為2024-03-19T12:00:00,但是寫入時將2024-03-19T04:00:00當作無時區時間,寫入Hologres TIMESTAMP TZ的數值為2024-03-19T04:00:00+08,數值偏差8小時。

    TIMESTAMP

    TIMESTAMP TZ

    時區轉換預設採用的是運行環境的JVM時區,而不是Flink時區,這與Flink內部計算的時區轉換格式不同。當Flink時區與機器的JVM時區不一致時,會導致資料存在偏差,建議採用Flink時區進行Hologres資料的讀寫。

    TIMESTAMP LTZ

    TIMESTAMP

相關文檔