本文為您介紹如何使用ApsaraDB for HBase連接器。
背景資訊
ApsaraDB for HBase是低成本、高擴充、雲智能的巨量資料NoSQL,相容標準HBase訪問協議,提供低成本儲存、高擴充吞吐、智能資料處理等核心優勢,是為淘寶推薦、花唄風控、廣告投放、監控大屏、菜鳥物流軌跡、支付寶賬單、手淘訊息等眾多阿里巴巴核心服務提供支撐的資料庫,具備PB規模、高並發、秒級伸縮、毫秒響應、跨機房高可用、全託管、全球分布等企業能力。
HBase連接器支援的資訊如下:
類別 | 詳情 |
支援類型 | 維表和結果表 |
運行模式 | 流模式 |
資料格式 | 暫不支援 |
特有監控指標 |
|
API種類 | SQL |
是否支援更新或刪除結果表資料 | 是 |
前提條件
注意事項
使用前,請確認已建立資料庫執行個體類型,並選擇正確的連接器,使用不當的連接器可能會導致不可預期的問題:
ApsaraDB for HBase執行個體,使用本文的HBase連接器。
Lindorm執行個體相容HBase模式,使用Lindorm連接器,詳情請參見雲原生多模資料庫Lindorm。
如果串連開源HBase,則無法保證資料的正確性。
文法結構
CREATE TABLE hbase_table(
rowkey INT,
family1 ROW<q1 INT>,
family2 ROW<q2 STRING, q3 BIGINT>,
family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>
) WITH (
'connector'='cloudhbase',
'table-name'='<yourTableName>',
'zookeeper.quorum'='<yourZookeeperQuorum>'
);
HBase的列族(Column Family)必須聲明為ROW類型,列族名即該ROW的欄位名。例如,DDL定義中聲明了family1、family2和family3三個列族。
HBase列族中的列(Column)與對應ROW中嵌套的每個欄位對應,列名即欄位名。例如,DDL定義中列族family2聲明了q2和q3兩列。
除了類型為ROW的欄位外,只能有一個原始類型(Atomic Type)的欄位(例如STRING或BIGINT),該欄位將被視作HBase的行鍵(Row Key),例如DDL定義中的Rowkey。
必須將HBase的行鍵定義為結果表的主鍵(Primary Key),如果沒有顯示定義主鍵,預設使用行鍵作為主鍵。
結果表中不需要將HBase表的所有列族和列都進行聲明,只聲明需要的即可。
WITH參數
通用
參數
說明
資料類型
是否必填
預設值
備忘
connector
表類型。
String
是
無
固定值為
cloudhbase
。table-name
HBase表名。
String
是
無
無。
zookeeper.znode.quorum
HBase的zookeeper住址。
String
是
無
無。
zookeeper.znode.parent
HBase在zookeeper中的根目錄。
String
否
/hbase
僅在HBase標準版中生效。
userName
使用者名稱。
String
否
無
僅在HBase增強版中生效。
password
密碼。
String
否
無
僅在HBase增強版中生效。
haclient.cluster.id
HBase高可用執行個體ID。
String
否
無
只有訪問同城主備執行個體時才需要配置僅在HBase增強版中生效。
retires.number
HBase用戶端的重試次數。
Integer
否
31
無。
null-string-literal
HBase欄位類型為字串時,如果Flink欄位資料為null,則將該欄位賦值為
null-string-literal
,並寫入HBase。String
否
null
無。
結果表專屬
參數
說明
資料類型
是否必填
預設值
備忘
sink.buffer-flush.max-size
寫入HBase前,記憶體中緩衝的資料量(位元組)大小。調大該值有利於提高HBase寫入效能,但會增加寫入延遲和記憶體使用量。
String
否
2MB
支援位元組單位B、KB、MB和GB,不區分大小寫。設定為0表示不進行緩衝。
sink.buffer-flush.max-rows
寫入HBase前,記憶體中緩衝的資料條數。調大該值有利於提高HBase寫入效能,但會增加寫入延遲和記憶體使用量。
Integer
否
1000
設定為0表示不進行緩衝。
sink.buffer-flush.interval
將快取資料周期性寫入到HBase的間隔,可以控制寫入HBase的延遲。
Duration
否
1s
支援時間單位ms、s、min、h和d。設定為0表示關閉定期寫入。
dynamic.table
是否使用支援動態列的HBase表。
Boolean
否
false
參數取值如下:
true:使用支援動態列的HBase表。
false:不使用支援動態列的HBase表。
sink.ignore-delete
是否忽略撤回訊息。
Boolean
否
false
參數取值如下:
true:忽略撤回訊息。
false:不忽略撤回訊息。
說明僅Realtime Compute引擎VVR 4.0.10及以上版本支援該參數。
sink.sync-write
是否同步寫入HBase。
Boolean
否
true
參數取值如下:
true:同步寫,保證順序,會犧牲一定效能。
false:非同步寫,不保證順序,效能更好。
說明僅Realtime Compute引擎VVR 4.0.13及以上版本支援該參數。
sink.buffer-flush.batch-rows
同步寫入HBase時記憶體中緩衝的資料條數,調大該值有利於提高HBase寫入效能,但會增加寫入延遲和記憶體使用量。
Integer
否
100
僅當sink.sync-write為true時生效。
說明僅Realtime Compute引擎VVR 4.0.13及以上版本支援該參數。
sink.ignore-null
是否忽略寫入null值。
Boolean
否
false
說明設定成true時,參數
null-string-literal
將不再生效。僅Realtime Compute引擎VVR 8.0.9及以上版本支援該參數。
維表專屬(比如Cache參數)
參數
說明
資料類型
是否必填
預設值
備忘
cache
緩衝策略。
String
否
ALL
目前ApsaraDB for HBase版維表支援以下三種緩衝策略:
None:無緩衝。
LRU:緩衝維表裡的部分資料。源表的每條資料都會觸發系統先在Cache中尋找資料,如果沒有找到,則去物理維表中尋找。
說明需要配置相關參數:緩衝大小(cacheSize)和緩衝更新時間間隔(cacheTTLMs)。
ALL(預設值):緩衝維表裡的所有資料。在Job運行前,系統會將維表中所有資料載入到Cache中,之後所有的維表尋找資料都會通過Cache進行。如果在Cache中無法找到資料,則KEY不存在,並在Cache到期後重新載入一遍全量Cache。
說明適用於遠端資料表資料量小且MISS KEY(源表資料和維表JOIN時,ON條件無法關聯)特別多的情境。需要配置相關參數:緩衝更新時間間隔cacheTTLMs,更新時間黑名單cacheReloadTimeBlackList。
維表中所有資料載入到緩衝中,可能會導致作業啟動變慢,您可以根據業務需求靈活配置緩衝策略。
因為系統會非同步載入維表資料,所以在使用CACHE ALL時,需要增加維表JOIN節點的記憶體,增加的記憶體大小為遠端資料表資料量的兩倍。
cacheSize
緩衝大小。
Long
否
10000
當緩衝策略選擇LRU時,可以設定緩衝大小。
cacheTTLMs
緩衝失效時間,單位為毫秒。
Long
否
無
cacheTTLMs配置和cache有關:
如果cache配置為None,則cacheTTLMs可以不配置,表示緩衝不逾時。
如果cache配置為LRU,則cacheTTLMs為緩衝逾時時間。預設不到期。
如果cache配置為ALL,則cacheTTLMs為緩衝載入時間。預設不重新載入。
cacheEmpty
是否緩衝空結果。
Boolean
否
true
無。
cacheReloadTimeBlackList
更新時間黑名單。在緩衝策略選擇為ALL時,啟用更新時間黑名單,防止在此時間內做Cache更新(例如雙11情境)。
String
否
無
格式為2017-10-24 14:00 -> 2017-10-24 15:00,2017-11-10 23:30 -> 2017-11-11 08:00。分隔字元的使用方式如下所示:
用英文逗號(,)來分隔多個黑名單。
用箭頭(->)來分割黑名單的起始結束時間。
cacheScanLimit
讀取全量HBase資料,RPC(Remote Procedure Call Protocol)服務端一次返回給用戶端的行數。
Integer
否
100
緩衝策略選擇ALL時啟用。
類型映射
Flink中的資料類型在HBase中通過org.apache.hadoop.hbase.util.Bytes
轉換成位元組數組,解碼過程有以下兩種情況:
對於Flink的非字串類型,如果HBase中的值為空白位元組數組,則解碼為null。
對於Flink的字串類型,如果HBase中的值為
null-string-literal
位元組數組,則解碼為null。
Flink SQL類型 | 寫入Bytes時CloudHBase轉換函式 | 從CloudHBase讀取Bytes的轉換函式 |
CHAR | byte[] toBytes(String s) | String toString(byte[] b) |
VARCHAR | ||
STRING | ||
BOOLEAN | byte[] toBytes(boolean b) | boolean toBoolean(byte[] b) |
BINARY | byte[] | byte[] |
VARBINARY | ||
DECIMAL | byte[] toBytes(BigDecimal v) | BigDecimal toBigDecimal(byte[] b) |
TINYINT | new byte[] { val } | bytes[0] |
SMALLINT | byte[] toBytes(short val) | short toShort(byte[] bytes) |
INT | byte[] toBytes(int val) | int toInt(byte[] bytes) |
BIGINT | byte[] toBytes(long val) | long toLong(byte[] bytes) |
FLOAT | byte[] toBytes(float val) | float toFloat(byte[] bytes) |
DOUBLE | byte[] toBytes(double val) | double toDouble(byte[] bytes) |
DATE | 將日期轉換成自1970.01.01以來的天數,用int表示,並通過 | HBase位元組數組通過 |
TIME | 將時間轉換成自00:00:00以來的毫秒數,用int表示,並通過 | HBase位元組數組通過 |
TIMESTAMP | 將時間戳記轉換成自1970-01-01 00:00:00以來的毫秒數,用long表示,並通過 | HBase位元組數組通過 |
程式碼範例
維表示例。
CREATE TEMPORARY TABLE datagen_source ( a INT, b BIGINT, c STRING, `proc_time` AS PROCTIME() ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE hbase_dim ( rowkey INT, family1 ROW<col1 INT>, family2 ROW<col1 STRING, col2 BIGINT>, family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING> ) WITH ( 'connector' = 'cloudhbase', 'table-name' = '<yourTableName>', 'zookeeper.quorum' = '<yourZookeeperQuorum>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, f1c1 INT, f3c3 STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT a, family1.col1 as f1c1, family3.col3 as f3c3 FROM datagen_source JOIN hbase_dim FOR SYSTEM_TIME AS OF datagen_source.`proc_time` as h ON datagen_source.a = h.rowkey;
結果表示例。
CREATE TEMPORARY TABLE datagen_source ( rowkey INT, f1q1 INT, f2q1 STRING, f2q2 BIGINT, f3q1 DOUBLE, f3q2 BOOLEAN, f3q3 STRING ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE hbase_sink ( rowkey INT, family1 ROW<q1 INT>, family2 ROW<q1 STRING, q2 BIGINT>, family3 ROW<q1 DOUBLE, q2 BOOLEAN, q3 STRING>, PRIMARY KEY (rowkey) NOT ENFORCED ) WITH ( 'connector'='cloudhbase', 'table-name'='<yourTableName>', 'zookeeper.quorum'='<yourZookeeperQuorum>' ); INSERT INTO hbase_sink SELECT rowkey, ROW(f1q1), ROW(f2q1, f2q2), ROW(f3q1, f3q2, f3q3) FROM datagen_source;
結果動態表示例。
CREATE TEMPORARY TABLE datagen_source ( id INT, f1hour STRING, f1deal BIGINT, f2day STRING, f2deal BIGINT ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE hbase_sink ( rowkey INT, f1 ROW<`hour` STRING, deal BIGINT>, f2 ROW<`day` STRING, deal BIGINT> ) WITH ( 'connector'='cloudhbase', 'table-name'='<yourTableName>', 'zookeeper.quorum'='<yourZookeeperQuorum>', 'dynamic.table'='true' ); INSERT INTO hbase_sink SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal) FROM datagen_source;
當dynamic.table參數值為true時,表示使用支援動態列的HBase表。
每個列族對應的ROW中必須聲明兩個欄位:第1個欄位的值表示動態列,第2個欄位的值表示動態列的值。
如果datagen_source表存在一條資料,代表ID為1的商品,在10:00-11:00點之間的成交額是100,在2020年7月26日當天的成交額是10000,則HBase中將插入行鍵為1的行,其中f1:10為100,f2:2020-7-26為10000。