Hologres與Realtime ComputeBlink獨享模式(原產品線)深度融合,支援使用Connector的方式寫入資料至Hologres結果表,您可以立即查詢寫入的資料。本文為您介紹Realtime ComputeBlink獨享模式(原產品線)如何寫入資料至Hologres結果表。
使用限制
不同Blink獨享模式的版本開發語義不同,在使用之前,請先確定Blink獨享模式的版本,並根據版本樣本使用。
請確保開通的Realtime Compute與Hologres地區一致,以免串連失敗。
Blink獨享模式3.6之前的版本未內建Hologres Connector,即時寫入資料至Hologres需要引用JAR檔案,請您使用自助升級或加入HologresDingTalk交流群反饋,詳情請參見如何擷取更多的線上支援?。
說明建議您升級至3.6及以上的版本進行作業。
Blink獨享模式3.7版本支援自動建立Hologres分區表,但是您需要在作業中配置
createparttable='true'
。同時,使用分區表的注意事項如下:Hologres當前僅支援List分區。
建立分區表時,需要顯示指定的分區列。目前僅支援text和int4類型的分區列,並且分區的值不能包含短劃線(-),例如
2020-09-12
。如果分區表配置了主鍵(pk),則分區列必須是pk的一部分。
建立分區子表時,子表分區列的值必須為固定值。
寫入分區子表的資料對應的分區列值,必須與子表建立時定義的值嚴格匹配,否則會報錯。
當前不支援DEFAULT資料分割函數。
當匯入資料的Hologres目標表設定了主鍵,即時寫入的預設語義不會按照主鍵進行更新,後續匯入的主鍵資料如果重複,則會被丟棄。
Hologres為非同步寫入資料,您在進行作業時需要添加
blink.checkpoint.fail_on_checkpoint_error=true
配置,當作業發生異常時才會觸發Failover。Blink3.7.6及以上版本不需要添加該參數。
DDL語義
建立Hologres結果表的語句如下。
create table Hologres_sink(
name varchar,
age BIGINT,
birthday BIGINT
) with (
type='hologres',
dbname='<yourDbname>', --Hologres的資料庫名稱。
tablename='<yourTablename>', --Hologres用於接收資料的表名稱。
username='<yourUsername>', --當前阿里雲帳號的AccessKey ID。
password='<yourPassword>', --當前阿里雲帳號的AccessKey Secret。
endpoint='<yourEndpoint>'); --當前Hologres執行個體VPC網路的Endpoint。
WITH參數
參數 | 描述 | 樣本 |
type | 結果表的類型。 固定值為hologres。 | hologres |
endpoint | Hologres執行個體的VPC網路地址。 進入Hologres管理主控台,在目標執行個體詳情頁的網路資訊地區擷取Endpoint。Endpoint需包含連接埠號碼,格式為ip:port。 | demo-cn-hangzhou-vpc.hologres.aliyuncs.com:80 |
username | AccessKey ID 您可以單擊AccessKey 管理,擷取AccessKey ID。 | xxxxm3FMWaxxxx |
password | AccessKey Secret 您可以單擊AccessKey 管理,擷取AccessKey Secret。 | xxxxm355fffaxxxx |
dbname | 當前Hologres的資料庫名稱。 | Holodb |
tablename | 當前Hologres資料庫的表名稱。 | blink_test |
arraydelimiter | Hologres Sink支援將一個STRING欄位按照field_delimiter切分為數組匯入Hologres。 預設值為\u0002。 | \u0002 |
mutatetype | 資料寫入模式,詳情請參見即時數倉Hologres結果表。 預設值為insertorignore。 | insertorignore |
ignoredelete | 是否忽略回撤訊息。
說明 該參數僅在使用流式語義時生效。 預設為false。 通常Flink的Groupby會產生回撤訊息,回撤訊息傳輸到Hologres Connector時會產生Delete請求。 | false |
partitionrouter | 是否寫入分區表。
預設為false。 | false |
createparttable | 當寫入分區表時,是否根據分區值自動建立分區表。Blink獨享 3.7及以上版本支援該功能。 預設值為false。 重要 請您謹慎使用該功能,確保分區值不會出現髒資料,從而導致建立了錯誤的分區表。 | false |
arraydelimiter、mutatetype、ignoredelete、partitionrouter及createparttable參數未在DDL樣本語句中展示,如果您在實際應用中需要使用相應參數,可參考上述表格中的參數描述。
即時寫入資料至Hologres普通結果表
Hologres建立表。
在Hologres中建立一張用於接收資料的表。樣本建表SQL語句如下。
create table blink_test (a int, b text, c text, d float8, e bigint);
建立Realtime Compute作業。
建立Realtime Compute作業。
Realtime ComputeBlink 3.6及以上版本已支援Hologres資料來源,您可以直接調用,樣本SQL語句如下。
create table randomSource (a int, b VARCHAR, c VARCHAR, d DOUBLE, e BIGINT) with (type = 'random'); create table test ( a int, b VARCHAR, c VARCHAR, PRIMARY KEY (a) ) with ( type = 'hologres', `endpoint` = '$ip:$port', --當前Hologres的VPC網路地址以及連接埠號碼。 `username` = '當前阿里雲帳號的AccessKey ID', `password` = '當前阿里雲帳號的AccessKey Secret', `dbname` = '當前Hologres的資料庫名稱', `tablename` = 'blink_test'--當前Hologres接收資料的表名稱。 ); insert into test select a,b,c from randomSource;
上線作業。
完成新增作業後,單擊編輯框的語法檢查,如果顯示成功,則表明文法正確。
單擊儲存儲存作業。
單擊上線,提交作業至生產環境。根據業務情況填寫作業的上線配置。
啟動作業。
提交作業至生產環境後,您需要手動啟動作業。
在阿里Realtime Compute開發平台頁面頂部功能表列右側,單擊營運,跳轉至營運介面,選擇需要啟動的作業,單擊右上方的啟動。
Hologres即時查詢資料。
查詢Hologres中用於接收資料的表,就可以即時擷取到已寫入的資料。樣本查詢SQL語句如下。
select * from blink_test;
如何使用寬表Merge/局部更新功能
對於常見的多個流的資料寫入至一張Hologres寬表的情境,具體使用方法如下:
假設Hologres有一張寬表WIDE_TABLE,有A、B、C、D、E幾列,其中A欄位是主鍵,Flink一個流包含資料A、B、C,另一個流包含資料A、D、E。
使用Flink SQL聲明兩張Hologres結果表,其中一張表只聲明欄位A、B、C,另一張表只聲明欄位A、D、E,這兩張表都映射至《WIDE_TABLE》。
兩張結果表的mutatetype屬性都設定成insertorupdate。
兩張結果表的ignoredelete屬性都設定成true,防止回撤訊息產生Delete請求。
將兩個流的資料分別Insert至兩張結果表中。
該情境的具體使用限制如下:
寬表必須有主鍵。
每個流的資料都必須包含完整主鍵欄位。
列存表的寬表Merge情境在高RPS的情況下,CPU使用率會偏高,建議關閉表中欄位的Dictionary encoding。
即時寫入資料至Hologres的分區結果表
Hologres支援通過調用即時資料API介面,直接將資料寫入分區父表中,對應的分區資料將會自動路由至分區子表。您可以直接寫入資料至分區表。即時資料API的描述,詳情請參見即時資料API。
使用限制如下:
Hologres目前的版本僅支援List分區。
建立分區表時,需要顯示指定的分區列,分區列的類型僅支援text和 int4。
如果設定了主鍵,分區列必須為主鍵的一部分。
建立分區子表時,子表分區列的值必須為固定值。
寫入分區子表的資料對應的分區列值,必須嚴格與建立子表時定義的值匹配,否則會報錯。
Hologres當前不支援預設分區。
Hologres建立分區表。
在Hologres中建立一張用於接收資料的分區表,並建立對應的分區子表。樣本建表SQL語句如下。
--建立分區父表test_message和對應的分區子表。 drop table if exists test_message; begin; create table test_message ( "bizdate" text NOT NULL, "tag" text NOT NULL, "id" int4 NOT NULL, "title" text NOT NULL, "body" text, PRIMARY KEY (bizdate,tag,id) ) PARTITION BY LIST (bizdate); commit;
說明執行命令時,
${bizdate}
參數需要替換為實際值。Blink獨享模式3.7版本才支援自動建立分區,如果您使用的是Blink獨享模式3.7以下的版本,需要在Hologres中提前建立分區子表,否則會匯入資料失敗。
Blink獨享建立作業。
在Blink獨享模式中建立作業的樣本語句如下。
說明以下樣本適用於獨享在Blink獨享模式3.7及以上版本。如果您使用的是在Blink獨享模式3.7以下版本,請升級至3.7及以上版本,或者刪除自動建立分區子表的配置
`createparttable` = 'true'
。create table test_message_src( tag VARCHAR, id INTEGER, title VARCHAR, body VARCHAR ) with ( type = 'random', `interval` = '10', `count` = '100' ); create table test_message_sink ( bizdate VARCHAR, tag VARCHAR, id INTEGER, title VARCHAR, body VARCHAR ) with ( type = 'hologres', `endpoint` = '$ip:$port', --Hologres執行個體的VPC網路地址。 `username` ='<AccessID>', --當前阿里雲帳號的AccessKey ID。 `password` = '<AccessKey>', --當前阿里雲帳號的AccessKey Secret。 `dbname` = '<DBname>', --當前Hologres的資料庫名稱。 `tablename` = '<Tablename>', --當前Hologres資料庫的表名稱。 `partitionrouter` = 'true', --寫入資料至Hologres的分區表。 `createparttable` = 'true', --自動建立Hologres的分區子表。 ); insert into test_message_sink select "20200327",* from test_message_src; insert into test_message_sink select "20200328",* from test_message_src;
上線並啟動作業。
請參考即時寫入資料至Hologres結果表章節中的上線作業和啟動作業步驟。
Hologres即時查詢資料。
查詢Hologres中用於接收資料的表,就可以即時擷取到已寫入的資料。樣本查詢SQL語句如下。
select * from test_message; select * from test_message where bizdate = '20200327';
資料類型映射
Blink獨享與Hologres的資料類型映射,請參見資料類型匯總。