本文為您介紹如何使用即時數倉Hologres連接器。
背景資訊
即時數倉Hologres是一站式即時資料倉庫引擎,支援海量資料即時寫入、即時更新、即時分析,支援標準SQL(相容PostgreSQL協議),支援PB級資料多維分析(OLAP)與即席分析(Ad Hoc),支援高並發低延遲的線上資料服務(Serving),與MaxCompute、Flink、DataWorks深度融合,提供離線上一體化全棧數倉解決方案。Hologres連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 源表、維表和結果表 |
運行模式 | 流模式和批模式 |
資料格式 | 暫不支援 |
特有監控指標 |
|
API種類 | Datastream和SQL |
是否支援更新或刪除結果表資料 | 是 |
特色功能
源表
功能 | 詳情 |
即時消費Hologres |
擷取更多資訊,詳情請參見Flink/Blink即時消費Hologres Binlog。 |
結果表
功能 | 詳情 |
支援寫入Changelog訊息。 | |
只更新修改部分的資料,而非整行更新。 | |
支援即時同步單表、整庫的資料以及相應的表結構變更到Hologres表中。 | |
插入部分列 說明 僅Realtime Compute引擎VVR 6.0.7及以上版本支援。 | 支援將Flink INSERT DML中指定的列名下推給連接器,從而僅更新指定的列。 |
前提條件
已建立Hologres表,詳情請參見建立Hologres表。
使用限制
通用:
僅Flink計算引擎VVR 2.0.0及以上版本支援Hologres連接器。
Hologres連接器不支援訪問Hologres外部表格。關於Hologres外部表格詳情請參見建立Hologres外部表格(映射到MaxCompute)。
連接器目前的已知缺陷以及版本功能發布記錄詳見Hologres Connector Release Note。
源表專屬:
Flink預設以批模式讀取Hologres源表資料,即只掃描一次Hologres全表,掃描結束,消費結束,新到Hologres源表的資料不會被讀取。從VVR 3.0.0版本開始,支援即時消費Hologres資料,詳情請參見Realtime ComputeFlink版即時消費Hologres。從VVR 6.0.3版本開始,Hologres源表在批模式讀取時支援filter下推,詳見源表專屬參數
enable_filter_push_down
。Hologres CDC模式暫不支援定義Watermark。如果您需要進行視窗彙總,您可以採用非視窗彙總的方式,詳情請參見MySQL/Hologres CDC源表不支援視窗函數,如何?類似每分鐘彙總統計的需求?。
結果表專屬:無。
維表專屬:
維表建議使用主鍵作為Join條件,對於此類主鍵點查的維表,建立Hologres表時建議選擇行存模式,列存模式對於點查情境效能開銷較大。選擇行存模式建立維表時必須設定主鍵,並且將主鍵設定為Clustering Key才可以工作。詳情請參見CREATE TABLE。
如果業務需要,無法使用主鍵作為Join條件,對於此類非主鍵點查的維表(即一對多的查詢),建立Hologres表時建議選擇列存模式,併合理設定分布鍵Distribution Key以及Event Time Column(Segment Key)以最佳化查詢效能,詳情請參見表格儲存體格式:列存、行存、行列共存。
VVR 4.0以下版本僅支援對維表主鍵點查的維表Join,VVR 4.0及以上版本,jdbc模式支援維表的非主鍵點查。
注意事項
使用了rpc模式時,VVR版本升級注意事項:
Hologres 2.0版本下線了rpc(用於維表與結果表)模式,全面轉為jdbc相關模式(目前包括jdbc、jdbc_fixed和jdbc_copy等),rpc模式不會對同一批次中相同主鍵的資料做去重,如果業務情境需要保留完整的資料,切換為jdbc模式後,可以通過設定'jdbcWriteBatchSize'='1'防止去重,或者升級到VVR 8.0.5版本配置deduplication.enabled參數為false。
如果您作業中存在使用了rpc模式讀寫Hologres的情況,此時如果您需要將VVR 4.x升級到VVR 6.x或VVR 8.x,請按照以下情況進行處理:
升級到VVR 6.0.4~6.0.6版本,可能會拋出異常。推薦維表和結果表使用jdbc_fixed或jdbc模式。
升級到VVR 6.0.7及以上版本,無需您做任何處理,Flink系統會自動替換rpc為jdbc相關模式。
使用binlog源表且未指定jdbc模式時,VVR版本升級注意事項:
Hologres 2.0版本開始有限支援holohub(用於Binlog源表)模式,Hologres 2.1版本徹底下線了holohub模式,全面轉為jdbc模式。
如果您作業中存在消費binlog源表的情況,而且binlog源表未通過sdkmode='jdbc'指定jdbc模式,預設會使用holohub模式。此時如果您需要將VVR 4.x升級到VVR 6.x或VVR 8.x,請按照以下情況進行處理:
如果Hologres版本是2.0。
升級到VVR 6.0.7~VVR 8.0.3版本,仍然可以繼續使用holohub模式。
升級到VVR 8.0.4及以上版本,可能拋出許可權不足的異常,需要特別配置使用者權限,詳情見Realtime ComputeFlink版即時消費Hologres。
如果Hologres版本是2.1。
升級到VVR 6.0.7~VVR 8.0.4版本,可能無法正常消費Binlog,建議升級到VVR 8.0.5。
升級到VVR 8.0.5及以上版本,無需您做任何處理,Flink系統會自動替換holohub模式為jdbc模式。
文法結構
CREATE TABLE hologres_table (
name VARCHAR,
age BIGINT,
birthday BIGINT,
PRIMARY KEY (name) NOT ENFORCED
) WITH (
'connector' = 'hologres',
'dbname' = '<yourDBName>',
'tablename' = '<yourTableName>',
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint' = '<yourEndpoint>',
'sdkmode' = 'jdbc'
);
WITH參數
僅FlinkRealtime Compute引擎VVR 4.0.11及以上版本支援所有jdbc開頭的參數。
通用
參數
說明
資料類型
是否必填
預設值
備忘
connector
表類型。
String
是
無
固定值為
hologres
。dbname
資料庫名稱。
String
是
無
Hologres V2.0版本推出了全新的彈性高可用執行個體形態,將計算資源分解為不同的計算群組(Virtual Warehouse),更好的服務於高可用部署。不同的計算群組使用相同的Endpoint,您可以通過在dbname參數後添加特定的尾碼來指定串連某個計算群組。例如某張維表希望串連特定的計算群組read_warehouse,可以通過
'dbname' = 'db_test@read_warehouse'
方式指定。說明僅JDBC相關模式支援使用計算群組,詳見源表、維表和結果表WITH參數中的sdkMode參數。
tablename
表名稱。
String
是
無
如果Schema不為Public時,則tablename需要填寫為
schema.tableName
。username
使用者名稱,請填寫阿里雲帳號的AccessKey ID。
String
是
無
詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?
重要為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變數和密鑰管理。
password
密碼,請填寫阿里雲帳號的AccessKey Secret。
String
是
無
詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?
重要為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變數和密鑰管理。
endpoint
Hologres服務地址。
String
是
無
詳情請參見訪問網域名稱。
connection.ssl.mode
是否啟用SSL(Secure Sockets Layer)傳輸加密,以及啟用採用何種模式。
String
否
disable
參數取值如下:
disable(預設值):不啟用傳輸加密。
require:啟用SSL,只對資料鏈路加密。
verify-ca:啟用SSL,加密資料鏈路,同時使用CA認證驗證Hologres服務端的真實性。
verify-full:啟用SSL,加密資料鏈路,使用CA認證驗證Hologres服務端的真實性,同時比對認證內的CN或DNS與串連時配置的Hologres串連地址是否一致。
說明VVR 8.0.5及以上版本開始支援此參數。
Hologres自1.1版本起支援SSL傳輸加密的require模式,2.1版本起新增支援verify-ca和verify-full模式。詳見傳輸加密。
當配置為verify-ca或者verify-full時,需要同時配置connection.ssl.root-cert.location參數。
connection.ssl.root-cert.location
當傳輸加密模式需要認證時,配置認證的路徑。
String
否
無
當connection.ssl.mode配置為verify-ca或者verify-full時,需要同時配置CA認證的路徑。認證可以使用Realtime Compute控制台的資源上傳功能上傳至平台,上傳後檔案存放在/flink/usrlib目錄下。例如,需要使用的CA認證檔案名稱為certificate.crt,則上傳後參數取值應該為
'/flink/usrlib/certificate.crt'
。說明VVR 8.0.5及以上版本開始支援此參數。
CA認證擷取方式見傳輸加密-下載CA認證。
jdbcRetryCount
當串連故障時,寫入和查詢的重試次數。
Integer
否
10
無。
jdbcRetrySleepInitMs
每次重試的固定等待時間。
Long
否
1000
實際重試的等待時間的計算公式為
jdbcRetrySleepInitMs+retry*jdbcRetrySleepStepMs
。單位為毫秒。jdbcRetrySleepStepMs
每次重試的累加等待時間。
Long
否
5000
實際重試的等待時間的計算公式為
jdbcRetrySleepInitMs+retry*jdbcRetrySleepStepMs
。單位為毫秒。jdbcConnectionMaxIdleMs
JDBC串連的空閑時間。
Long
否
60000
超過這個空閑時間,串連就會斷開釋放掉。單位為毫秒。
jdbcMetaCacheTTL
本機快取TableSchema資訊的到期時間。
Long
否
60000
單位為毫秒。
jdbcMetaAutoRefreshFactor
如果緩衝的剩餘時間小於觸發時間,則系統會自動重新整理緩衝。
Integer
否
4
緩衝的剩餘時間計算方法:緩衝的剩餘時間=緩衝的到期時間 - 緩衝已經存活的時間。緩衝自動重新整理後,則從0開始重新計算緩衝的存活時間。
觸發時間計算方法:jdbcMetaCacheTTL/jdbcMetaAutoRefreshFactor兩個參數的比值。
type-mapping.timestamp-converting.legacy
Flink和Hologres之間是否進行時間類型的相互轉換。
Boolean
否
true
參數取值如下:
true:不進行相互轉換。時區轉換將採用運行環境中的JVM時區。
false(推薦):進行相互轉換。時區轉換將使用Flink所配置的時區。
說明僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。
Flink和Hologres的時區詳情,請參見Flink與Hologres時區說明。
property-version=0時,預設值為true;property-version=1時,預設值為false。
property-version
Connector參數版本。
Integer
否
0
可填的值為0和1,預設值為0。
說明僅VVR 8.0.6及以上版本支援配置該參數。
在不同參數版本裡,可用的參數集合和參數的預設值可能不同。如果存在區別,區別詳情會在參數的說明部分描述。
推薦使用參數版本1。
源表專屬
參數
說明
資料類型
是否必填
預設值
備忘
field_delimiter
匯出資料時,不同行之間使用的分隔字元。
String
否
"\u0002"
無。
binlog
是否消費Binlog資料。
Boolean
否
false
參數取值如下:
true:消費Binlog資料。
false(預設值):不消費Binlog資料。
說明Realtime Compute引擎VVR 8.0.6及以上版本支援多版本預設值。
property-version=0時,預設值為false。
property-version=1時,預設值為true。
sdkMode
SDK模式。
String
否
holohub
參數取值如下:
holohub(預設值):使用holohub模式的binlog源表。
jdbc:使用JDBC模式的binlog源表。
詳情請參見Binlog Source表。
說明VVR 6.0.3及以下版本:不支援配置該參數。
VVR 6.0.4~6.0.6版本:推薦使用預設配置,即holohub。
VVR 6.0.7及以上版本:推薦配置為jdbc。
Hologres執行個體為2.0以下版本,Flink系統採用您配置的參數值。
Hologres執行個體為2.0及以上版本,由於Hologres 2.0版本下線了holohub服務,此時如果您配置了holohub,Flink系統自動切換為jdbc。但是如果您配置為jdbc,Flink系統採用jdbc。
VVR 8.0.4及以上版本:
Hologres執行個體為2.0版本,Flink系統自動切換為jdbc。可能存在許可權不足等問題,參考Realtime ComputeFlink版即時消費Hologres文檔進行處理。
Hologres執行個體為2.1及以上版本,Flink系統自動切換為jdbc。
jdbcBinlogSlotName
JDBC模式的binlog源表的Slot名稱。建立方法請參見JDBC模式Binlog源表。
String
否
無
僅在sdkMode配置為jdbc時生效,如果使用者未配置,連接器會預設建立一個Slot來使用。詳見JDBC模式Binlog源表。
說明Hologres執行個體2.1版本起,不再需要配置此參數,連接器也不會嘗試自動建立。
binlogMaxRetryTimes
讀取Binlog資料出錯後的重試次數。
Integer
否
60
無。
binlogRetryIntervalMs
讀取Binlog資料出錯後的重試時間間隔。
Long
否
2000
單位為毫秒。
binlogBatchReadSize
批量讀取Binlog的資料行數。
Integer
否
100
無。
cdcMode
是否採用CDC模式讀取Binlog資料。
Boolean
否
false
參數取值如下:
true:CDC模式讀取Binlog資料。
false(預設值):非CDC模式讀取Binlog資料。
說明Realtime Compute引擎VVR 8.0.6及以上版本支援多版本預設值。
property-version=0時,預設值為false。
property-version=1時,預設值為true。
upsertSource
源表是否使用upsert類型的Changelog。
Boolean
否
false
僅在CDC模式下生效。參數取值如下:
true:僅支援Upsert類型,包括INSERT、DELETE、和UPDATE_AFTER。
false(預設值):支援所有類型,包括INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER。
說明如果下遊包含回撤運算元(例如使用ROW_NUMBER OVER WINDOW去重),則需要設定upsertSource為true,此時源表會以Upsert方式從Hologres中讀取資料。
binlogStartupMode
Binlog資料消費模式。
String
否
earliestOffset
參數取值如下:
initial:先全量消費資料,再讀取Binlog開始增量消費。
earliestOffset(預設值):從最早的Binlog開始消費。
timestamp:從設定的startTime開始消費Binlog。
說明如果設定了startTime或者在啟動介面選擇了啟動時間,則binlogStartupMode強制使用timestamp模式,其他消費模式不生效,即startTime參數優先順序更高。
說明僅Realtime Compute引擎VVR 4.0.13及以上版本,Hologres 0.10及以上版本支援該參數。
Realtime Compute引擎VVR 8.0.6及以上版本支援多版本預設值。
property-version=0時,預設值為false。
property-version=1時,預設值為true。
startTime
啟動位點的時間。
String
否
無
格式為yyyy-MM-dd hh:mm:ss。如果沒有設定該參數,且作業沒有從State恢複,則從最早的Binlog開始消費Hologres資料。
jdbcScanFetchSize
掃描時攢批大小。
Integer
否
256
無。
jdbcScanTimeoutSeconds
掃描操作逾時時間。
Integer
否
60
單位為秒。
jdbcScanTransactionSessionTimeoutSeconds
掃描操作所在事務的逾時時間。
Integer
否
600秒(0表示不逾時)
對應Hologres的GUC參數idle_in_transaction_session_timeout,詳情請參見GUC參數。
說明僅Realtime Compute引擎Flink1.13-vvr-4.0.15及以上版本支援該參數。
enable_filter_push_down
全量讀取階段是否進行filter下推。
Boolean
否
false
參數取值如下:
false(預設值):不進行filter下推。
true:讀取全量資料時,將支援的過濾條件下推到Hologres執行。包括非Binlog Source全量讀取以及Binlog Source使用全增量一體化消費模式時的全量階段。
重要Realtime Compute引擎Flink1.15-vvr-6.0.3到Flink1.15-vvr-6.0.5預設會進行filter下推,但如果作業使用了hologres維表,且寫入的DML中包含有對維表非主鍵欄位的過濾條件時,維表的filter會被錯誤的下推,可能導致維表join出現錯誤結果。因此推薦使用Realtime Compute引擎Flink1.15-vvr-6.0.6及以上版本,並在源表增加此參數來開啟過濾條件下推功能。
結果表專屬
參數
說明
資料類型
是否必填
預設值
備忘
sdkMode
SDK模式。
String
否
jdbc
參數取值如下:
jdbc:預設值,表示使用jdbc模式進行寫入。
jdbc_copy:是否使用fixed copy方式寫入。
fixed copy是hologres1.3新增的能力,相比通過insert方法(jdbc模式)進行寫入,fixed copy方式可以更高的吞吐(因為是流模式),更低的資料延時,更低的用戶端記憶體消耗(因為不攢批)。但此模式暫不支援delete資料,也不支援寫入分區父表,不支援ignoreNullWhenUpdate參數。
rpc:表示使用rpc模式進行寫入,與useRpcMode參數一致,與jdbc模式的區別在於不佔用串連數,不支援寫入Hologres的Jsonb,RoarinBitmap類型。
jdbc_fixed(beta功能):表示使用fixed jdbc方式進行寫入,
需要Hologres引擎版本大於等於1.3,與jdbc模式的區別在於不佔用串連數,不支援寫入Hologres的Jsonb,RoarinBitmap類型。
說明VVR 6.0.3以下版本:不支援配置該參數。
VVR 6.0.4~VVR 6.0.6版本:推薦配置為jdbc。
VVR 6.0.7~VVR 8.0.1版本:推薦配置為jdbc。
如果Hologres執行個體為2.0以下版本,Flink系統採用您配置的參數值。
如果Hologres執行個體為2.0及以上版本,由於Hologres 2.0及以上版本下線了rpc服務,此時如果您將該參數配置為rpc,Flink系統將自動切換該參數值為jdbc_fixed,但是如果您配置為其他值,Flink系統將採用您配置的參數值。
rpc模式不會對同一批次中相同主鍵的資料做去重,如果業務情境需要保留完整的資料,切換為jdbc模式後,可以通過設定'jdbcWriteBatchSize'='1'防止去重。
VVR 8.0.3及以上版本:推薦配置為jdbc。
自此版本開始,無論Hologres執行個體版本,都不再支援rpc模式,如果選擇rpc模式,將自動切換該參數值為jdbc_fixed且設定'jdbcWriteBatchSize'='1'防止去重。
VVR 8.0.5及以上版本:推薦配置為jdbc。
如果選擇rpc模式,將自動切換該參數值為jdbc_fixed且設定deduplication.enabled參數為false防止去重。
bulkload
是否採用bulkload寫入。
Boolean
否
false
僅在sdkMode設定為jdbc_copy時生效。bulkload寫入目前僅適用於無主鍵表或者主鍵保證不重複的有主鍵表(主鍵重複會拋出異常),相比預設的jdbc_copy,寫入使用更少的Hologres資源。
說明VVR 8.0.5及以上版本開始支援此參數,同時需要Hologres執行個體2.1及以上版本。
useRpcMode
是否通過RPC方式使用Hologres連接器。
Boolean
否
false
參數取值如下:
true:使用RPC方式使用Hologres連接器。
與sdkMode參數設定為rpc效果相同,通過RPC方式會降低SQL串連數。
false(預設值):使用JDBC方式使用Hologres連接器。
通過JDBC方式會佔用SQL串連,導致JDBC連結數增加。
說明Hologres 2.0版本下線了rpc模式,推薦使用sdkMode參數來選擇jdbc或者jdbc_fixed模式。
Realtime Compute引擎VVR 6.0.7及VVR 8.0.1版本在檢測到Hologres執行個體是2.0及以上版本時,會自動切換rpc模式為jdbc_fixed模式。
Realtime Compute引擎VVR 8.0.3及以上版本會自動切換rpc模式為jdbc_fixed模式。
rpc模式不會對同一批次中相同主鍵的資料做去重,如果業務情境需要保留完整的資料變化記錄,推薦使用Realtime Compute引擎VVR 8.0.5及以上版本,jdbc模式可以配置deduplication.enabled參數為false不進行去重。
property-version=1時,該參數下線。
mutatetype
資料寫入模式。
String
否
insertorignore
詳情請參見流式語義。
說明Realtime Compute引擎VVR 8.0.6及以上版本支援多版本預設值。
property-version=0時,預設值為insertorignore。
property-version=1時,預設值為insertorupdate。
partitionrouter
是否寫入分區表。
Boolean
否
false
無。
createparttable
當寫入分區表時,是否根據分區值自動建立不存在的分區表。
Boolean
否
false
rpc模式下,如果分區值中存在短劃線(-),暫不支援自動建立分區表。
說明Hologres執行個體1.3.22及以上版本開始支援使用Date類型做分區鍵。Realtime Compute引擎VVR 8.0.3及以上版本,支援使用Date類型做分區鍵時自動建立分區表。
請確保分區值不會出現髒資料,否則會建立錯誤的分區表導致Failover,建議慎用該參數。
當sdk_mode設定為jdbc_copy時,不支援寫入分區父表。
ignoredelete
是否忽略撤回訊息。
Boolean
否
true
說明僅在使用流式語義時生效。
Realtime Compute引擎VVR 8.0.6及以上版本支援多版本預設值。
property-version=0時,預設值為true。
property-version=1時,預設值為false。
connectionSize
單個Flink結果表任務所建立的JDBC串連池大小。
Integer
否
3
如果作業效能不足,建議您增加串連池大小。串連池大小和資料吞吐成正比。
jdbcWriteBatchSize
JDBC模式,Hologres Sink節點資料攢批條數(不是來一條資料處理一條,而是攢一批再處理)的最大值。
Integer
否
256
單位為資料行數。
說明jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之間為或的關係。如果同時設定了這三個參數,則滿足其中一個,就進行寫入結果資料。
jdbcWriteBatchByteSize
JDBC模式,Hologres Sink節點資料攢批位元組數(不是來一條資料處理一條,而是攢一批再處理)的最大值。
Long
否
2097152(2*1024*1024)位元組,即2 MB
說明jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之間為或的關係。如果同時設定了這三個參數,則滿足其中一個,就進行寫入結果資料。
jdbcWriteFlushInterval
JDBC模式,Hologres Sink節點資料攢批寫入Hologres的最長等待時間。
Long
否
10000
單位為毫秒。
說明jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之間為或的關係。如果同時設定了這三個參數,則滿足其中一個,就進行寫入結果資料。
ignoreNullWhenUpdate
當mutatetype='insertOrUpdate'時,是否忽略更新寫入資料中的Null值。
Boolean
否
false
取值說明如下:
false(預設值):將Null值寫到Hologres結果表裡。
true:忽略更新寫入資料中的Null值。
說明僅Flink計算引擎VVR 4.0及以上版本支援該參數。
當sdk_mode設定為jdbc_copy時,不支援此參數。
connectionPoolName
串連池名稱。同一個TaskManager中,配置相同名稱的串連池的表可以共用串連池。
String
否
無
取值為非
'default'
的任一字元串。如果多個表設定相同的串連池,則這些使用相同串連池的表的connectionSize參數也需要相同。說明VVR 4.0.12以下版本:不支援配置該參數。
VVR 4.0.12~VVR 8.0.3版本:預設不共用,每個表使用自己的串連池。
VVR 8.0.4以上版本:同一個作業中endpoint相同的表會預設共用串連池。作業中表數量較多時串連數可能相對不足影響效能,這種情況下推薦為不同的表設定不同的connectionPoolName。
此參數可以按需配置,比如作業中有維表A,B以及結果表C,D,E五張hologres表,可以A表和B表使用'pool1',C表和D表使用'pool2',E表流量較大,單獨使用'pool3'。
jdbcEnableDefaultForNotNullColumn
如果將Null值寫入Hologres表中Not Null且無預設值的欄位,是否允許連接器協助填充一個預設值。
Boolean
否
true
參數取值如下:
true(預設值):允許連接器填充預設值並寫入,規則如下。
如果欄位是String類型,則預設寫為空白("")。
如果欄位是Number類型,則預設寫為0。
如果是Date、timestamp或timestamptz時間類型欄位,則預設寫為1970-01-01 00:00:00。
false:不填充預設值,寫Null到Not Null欄位時,會拋出異常。
remove-u0000-in-text.enabled
如果寫入時字串類型包含\u0000非法字元,是否允許連接器協助去除。
Boolean
否
false
參數取值如下:
false(預設值):連接器不對資料進行操作,但碰到髒資料時寫入可能拋出如下異常,
ERROR: invalid byte sequence for encoding "UTF8": 0x00
此時需要在源表提前處理髒資料,或者在SQL中定義髒資料處理邏輯。
true:連接器會協助去除字串類型中的\u0000,防止寫入拋出異常。
partial-insert.enabled
是否只插入INSERT語句中定義的欄位。
Boolean
否
false
參數取值如下:
false(預設值):無論INSERT語句中聲明了哪些欄位,都會更新結果表DDL中定義的所有欄位,對於未在INSERT語句中聲明的欄位,會被更新為null。
true:將INSERT語句中定義的欄位下推給連接器,從而可以只對聲明的欄位進行更新或插入。
說明僅Realtime Compute引擎VVR 6.0.7及以上版本支援該參數。
此參數僅在mutatetype參數配置為InsertOrUpdate時生效。
deduplication.enabled
jdbc及jdbc_fixed模式寫入攢批過程中,是否進行去重。
Boolean
否
true
參數取值如下:
true(預設值):如果一批資料中有主鍵相同的資料,預設進行去重,只保留最後一條到達的資料。以兩個欄位,其中第一個欄位為主鍵的資料舉例:
INSERT (1,'a')
和INSERT (1,'b')
兩條記錄先後到達,去重之後只保留後到達的(1,'b')
寫入Hologres結果表中。Hologres結果表中已經存在記錄
(1,'a')
,此時DELETE (1,'a')
和INSERT (1,'b')
兩條記錄先後到達,只保留後到達的(1,'b')
寫入hologres中,表現為直接更新,而不是先刪除再插入。
false:在攢批過程中不進行去重,如果發現新到的資料和目前攢批的資料中存在主鍵相同的情況,先將攢批資料寫入,寫入完成之後再繼續寫入新到的資料。
說明僅Realtime Compute引擎VVR 8.0.5及以上版本支援該參數。
不允許攢批去重時,極端情況下(例如所有資料的主鍵都相同)寫入會退化為不攢批的單條寫入,對效能有一定影響。
維表專屬
參數
說明
資料類型
是否必填
預設值
備忘
sdkMode
SDK模式。
String
否
jdbc
參數取值如下:
jdbc(預設值):表示使用jdbc模式進行查詢,支援主鍵點查和非主鍵的查詢,但是非主鍵的查詢對效能影響較大,查詢較慢。
rpc:表示使用rpc模式進行點查,與useRpcMode參數一致,僅支援主鍵點查,即維表的主鍵欄位必須與Flink Join On的欄位完全符合,與jdbc模式的區別在於不佔用串連數,不支援讀取Hologres的Jsonb,RoarinBitmap類型。
jdbc_fixed:(beta功能,需要hologres引擎版本大於等於1.3)表示使用fixed jdbc方式進行點查,與jdbc模式的區別在於不佔用串連數,且不支援讀取Hologres的Jsonb,RoarinBitmap類型。僅支援主鍵點查,即維表的主鍵欄位必須與Flink Join On的欄位完全符合。
說明VVR 6.0.3以下版本:不支援配置該參數。
VVR 6.0.4~VVR 6.0.6版本:推薦配置為jdbc。
在VVR 6.0.6版本,SDK模式選擇為jdbc時,如果維表字串類型的查詢結果中包含null值,可能拋出null 指標異常,此時推薦您使用rpc模式繞過。
VVR 6.0.7及VVR 8.0.1:推薦配置為jdbc。
如果Hologres執行個體為2.0以下版本,Flink系統將採用您配置的參數值。
如果Hologres執行個體為2.0及以上版本,由於Hologres 2.0及以上版本下線了rpc服務,此時如果您將該參數配置為了rpc,Flink系統自動將參數值切換為jdbc_fixed。但是如果您配置為其他值,Flink系統將採用您配置的參數值。
VVR 8.0.3及以上版本:推薦配置為jdbc。
自此版本開始,無論Hologres執行個體版本,都不再支援rpc模式,如果選擇rpc模式,將自動切換該參數值為jdbc_fixed。
useRpcMode
是否通過RPC方式使用Hologres連接器。
Boolean
否
false
參數取值如下:
true:使用RPC方式使用Hologres連接器。與sdkMode參數設定為rpc效果相同。通過RPC方式會降低SQL串連數。
false(預設值):使用JDBC方式使用Hologres連接器。
通過JDBC方式會佔用SQL串連,導致JDBC連結數增加。
說明Hologres 2.0版本下線了rpc了服務,推薦使用sdkMode參數來選擇jdbc或者jdbc_fixed模式。
Realtime Compute引擎VVR 6.0.7及VVR 8.0.1版本在檢測到Hologres執行個體是2.0及以上版本時,會自動切換rpc模式為jdbc_fixed模式。
Realtime Compute引擎VVR 8.0.3及以上版本會自動切換rpc模式為jdbc_fixed模式。
connectionSize
單個Flink維表任務所建立的JDBC串連池大小。
Integer
否
3
如果作業效能不足,建議您增加串連池大小。串連池大小和資料吞吐成正比。
connectionPoolName
串連池名稱。同一個TaskManager中,配置相同名稱的串連池的表可以共用串連池。
String
否
無
取值為非
'default'
的任一字元串。如果多個表設定相同的串連池,則這些使用相同串連池的表的connectionSize參數也需要相同。您可以按需配置此參數,例如作業中有維表A,B以及結果表C,D,E五張hologres表,可以A表和B表使用pool1,C表和D表使用pool2,E表流量較大,單獨使用pool3。
說明VVR 4.0.12以下版本:不支援配置該參數。
VVR 4.0.12~VVR 8.0.3版本:預設不共用,每個表使用自己的串連池。
VVR 8.0.4以上版本:同一個作業中Endpoint相同的表會預設共用串連池。作業中表數量較多時串連數可能相對不足影響效能,這種情況下推薦為不同的表設定不同的connectionPoolName。
jdbcReadBatchSize
點查Hologres維表時,攢批處理的最大條數。
Integer
否
128
無。
jdbcReadBatchQueueSize
維表點查請求緩衝隊列大小。
Integer
否
256
無。
jdbcReadTimeoutMs
維表點查的逾時時間。
Long
否
預設值為0,表示不會逾時
僅vvr 4.0.15-flink 1.13及以上版本、vvr 6.0.2-flink 1.15及以上版本支援該參數。
jdbcReadRetryCount
維表點查逾時時的重試次數。
Interger
否
VVR 8.0.5以下版本:1
VVR 8.0.5及以上版本:10
僅VVR 6.0.3以上版本支援該參數。
本參數與jdbcRetryCount不同,後者是指串連發生異常時的重試次數。
jdbcScanFetchSize
在一對多join(即沒有使用完整主鍵)時使用scan介面,scan攢批處理資料的條數。
Integer
否
256
無。
jdbcScanTimeoutSeconds
scan操作的逾時時間。
Integer
否
60
單位為秒。
cache
緩衝策略。
String
否
見備忘列。
Hologres僅支援None和LRU兩種緩衝策略,取值詳情請參見背景資訊。
說明Cache預設值和VVR版本有關:
VVR 4.x版本及以上版本,預設值為None。
VVR 4.x版本以下版本,預設值為LRU。
cacheSize
緩衝大小。
Integer
否
10000
選擇LRU緩衝策略後,可以設定緩衝大小。單位為條。
cacheTTLMs
緩衝更新時間間隔。
Long
否
見備忘列。
單位為毫秒。cacheTTLMs預設值和cache的配置有關:
如果cache配置為LRU,則cacheTTLMs為緩衝逾時時間。預設不到期。
如果cache配置為None,則cacheTTLMs可以不配置,表示緩衝不逾時。
cacheEmpty
是否緩衝join結果為空白的資料。
Boolean
否
true
true(預設值):表示緩衝join結果為空白的資料。
false:表示不緩衝join結果為空白的資料。
async
是否非同步返回資料。
Boolean
否
false
true:表示非同步返回資料。
false(預設值):表示不進行非同步返回資料。
說明非同步返回資料是無序的。
類型映射
Flink與Hologres的資料類型映射請參見Blink/Flink與Hologres的資料類型映射。
使用樣本
源表示例
非Binlog Source表
CREATE TEMPORARY TABLE hologres_source (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'field_delimiter'='|' --該參數可選。
'sdkmode' = 'jdbc'
);
CREATE TEMPORARY TABLE blackhole_sink(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='blackhole'
);
INSERT INTO blackhole_sink
SELECT name, age, birthday
from hologres_source;
Binlog Source表
Hologres連接器支援即時消費Hologres,即即時消費Hologres的Binlog資料。Flink即時消費Hologres詳情請參見Realtime ComputeFlink版即時消費Hologres。
結果表示例
CREATE TEMPORARY TABLE datagen_source(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='datagen'
);
CREATE TEMPORARY TABLE hologres_sink (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>'
);
INSERT INTO hologres_sink SELECT * from datagen_source;
維表示例
CREATE TEMPORARY TABLE datagen_source (
a INT,
b BIGINT,
c STRING,
proctime AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hologres_dim (
a INT,
b VARCHAR,
c VARCHAR
) WITH (
'connector' = 'hologres',
...
);
CREATE TEMPORARY TABLE blackhole_sink (
a INT,
b STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;
特色功能詳解
流式語義
寬表Merge和局部更新功能
作為CTAS和CDAS的目標端
DataStream API
通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink全託管,DataStream連接器設定方法請參見DataStream連接器使用方法。Maven中央庫中已經放置了Hologres DataStream連接器。VVR 6.0.7請使用其中的1.15-vvr-6.0.7-1版本的依賴。
Hologres源表
VVR提供了RichInputFormat的實作類別HologresBulkreadInputFormat來讀取Hologres表資料。以下為構建Hologres Source讀取表資料的樣本。
VVR 4.0.15
// 初始化讀取的表的Schema,需要和Hologres表Schema的欄位匹配,可以只定義部分欄位。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres的相關參數。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// 構建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
String query = JDBCUtils.getSimpleSelectFromStatement(
jdbcOptions.getTable(), schema.getFieldNames());
// 構建HologresBulkreadInputFormat讀取表資料。
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(jdbcOptions, schema, query);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo)
.print();
env.execute();
VVR 6.0.7
// set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化讀取的表的Schema,需要和Hologres表Schema的欄位匹配,可以只定義部分欄位。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres的相關參數。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// 構建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(new HologresConnectionParam(config), jdbcOptions, schema, "", -1);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo)
.print();
env.execute();
Hologres Binlog源表
VVR提供了Source的實作類別HologresBinlogSource來讀取Hologres Binlog資料。以下為構建Hologres Binlog Source的樣本。
VVR 4.0.15
// 初始化讀取的表的Schema,需要和Hologres表Schema的欄位匹配,可以只定義部分欄位。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres的相關參數。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// 構建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
jdbcOptions.setHolohubEndpoint(JDBCUtils.getHolohubEndpoint(jdbcOptions));
RowDataRecordConverter recordConverter = buildRecordConverter(schema, config, jdbcOptions);
// 構建Hologres Binlog Source。
long startTimeMs = 0;
HologresBinlogSource<RowData> source = new HologresBinlogSource<>(
schema,
config,
jdbcOptions,
recordConverter,
startTimeMs);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
VVR 6.0.7
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化讀取的表的Schema,需要和Hologres表Schema的欄位匹配,可以只定義部分欄位。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres的相關參數。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// 構建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// 設定或建立預設slotname
config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));
boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE)
&& config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
// 構建Binlog Record Converter。
JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
jdbcOptions.getTable(),
schema,
new HologresConnectionParam(config),
cdcMode,
Collections.emptySet());
// 構建Hologres Binlog Source。
long startTimeMs = 0;
HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.TIMESTAMP,
recordConverter,
"",
-1);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
方法buildRecordConverter不在VVR Connector依賴中,是範例程式碼中提供的方法。
Hologres Binlog注意事項和實現原理等詳情,請參見Binlog Source表。
Hologres結果表
VVR提供了OutputFormatSinkFunction的實作類別HologresSinkFunction來寫入資料。以下為構建Hologres Sink的樣本。
VVR 4.0.15
// 初始化讀取的表的Schema。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.build();
// Hologres的相關參數。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setBoolean(HologresConfigs.USE_RPC_MODE, true);
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// 構建Hologres Writer,以RowData的方式寫入資料。
AbstractHologresWriter<RowData> hologresWriter =
buildHologresWriter(schema, config, hologresConnectionParam);
// 構建Hologres Sink。
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
int offset = (int) (System.currentTimeMillis() % Integer.MAX_VALUE);
env.fromElements((RowData)GenericRowData.of(2 + offset, StringData.fromString("2")), GenericRowData.of(3 + offset, StringData.fromString("3"))).returns(typeInfo)
.addSink(sinkFunction);
env.execute();
VVR 6.0.7&VVR 8.0.1
// set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化讀取的表的Schema,需要和Hologres表Schema的欄位匹配,可以只定義部分欄位。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.build();
// Hologres的相關參數。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// 構建Hologres Writer,以RowData的方式寫入資料。
AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
hologresConnectionParam, schema, HologresTableSchema.get(hologresConnectionParam), new Integer[0]);
// 構建Hologres Sink。
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
int offset = (int) (System.currentTimeMillis() % Integer.MAX_VALUE);
env.fromElements((RowData)GenericRowData.of(2 + offset, StringData.fromString("2")), GenericRowData.of(3 + offset, StringData.fromString("3"))).returns(typeInfo)
.addSink(sinkFunction);
env.execute();
方法buildHologresWriter不在VVR Connector依賴中,是範例程式碼中提供的方法。
Flink與Hologres時區說明
時間類型
產品 | 類型 | 說明 |
Flink | 表示沒有時區資訊的日期和時間,描述年、 月、日、小時、分鐘、秒和小數秒對應的時間戳記。可以通過一個字串來指定,例如 | |
用於描述時間軸上的絕對時間點,使用long儲存從epoch至今的毫秒數,使用int儲存毫秒中的納秒數。epoch時間是從Java的標準epoch時間開始計算。在計算和可視化時, 每個TIMESTAMP_LTZ類型的資料都使用Session (會話)中配置的時區。可以用於跨時區的計算,因為它是一個基於epoch的絕對時間點(比如上例中的毫秒)代表的就是不同時區的同一個絕對時間點。 相同的TIMESTAMP_LTZ值,在不同的時區可能會反映出不同的本地TIMESTAMP,例如:如果一個TIMESTAMP_LTZ值為 | ||
Hologres | TIMESTAMP | 類似於Flink的 |
TIMESTAMP WITH TIME ZONE (TIMESTAMPTZ) | 類似於Flink的 例如北京(UTC+8)時區的時間戳記 |
Flink讀寫Hologres時間
Realtime Compute引擎VVR 8.0.6及以上版本且
type-mapping.timestamp-converting.legacy=false
時,支援所有時間類型間的相互轉換。Flink
Hologres
詳情
TIMESTAMP
TIMESTAMP
之間相互轉換是直接的,不涉及時區轉換。因此推薦採用該資料對應。
TIMESTAMP LTZ
TIMESTAMP TZ
TIMESTAMP
TIMESTAMP TZ
之間的轉換涉及時區轉換。為了在轉換中保持準確性,需要通過配置項參數
table.local-time-zone
設定Flink時區,配置項參數設定方法請參見如何配置作業運行參數?。例如當設定
'table.local-time-zone': 'Asia/Shanghai'
時,表示Flink時區為上海(+8時區)時,Flink TIMESTAMP類型的資料為2022-01-01 01:01:01.123456,寫入Hologres TIMESTAMP TZ的數值為2022-01-01 01:01:01.123456+8。TIMESTAMP LTZ
TIMESTAMP
Realtime Compute引擎VVR 8.0.5及以下版本或VVR 8.0.6及以上版本設定
type-mapping.timestamp-converting.legacy=false
時,除TIMESTAMP間轉化,其他類型相互轉化可能會出現資料偏差問題。Flink
Hologres
備忘
TIMESTAMP
TIMESTAMP
之間相互轉換是直接的,不涉及時區轉換。因此推薦採用該資料對應。
TIMESTAMP LTZ
TIMESTAMP TZ
讀寫Hologres資料時都當作無時區時間進行處理,可能會存在資料偏差。
例如,Flink TIMESTAMP_LTZ類型的數值為2024-03-19T04:00:00Z,在上海(+8時區)對應的實際無時區時間為2024-03-19T12:00:00,但是寫入時將2024-03-19T04:00:00當作無時區時間,寫入Hologres TIMESTAMP TZ的數值為2024-03-19T04:00:00+08,數值偏差8小時。
TIMESTAMP
TIMESTAMP TZ
時區轉換預設採用的是運行環境的JVM時區,而不是Flink時區,這與Flink內部計算的時區轉換格式不同。當Flink時區與機器的JVM時區不一致時,會導致資料存在偏差,建議採用Flink時區進行Hologres資料的讀寫。
TIMESTAMP LTZ
TIMESTAMP