本文為您介紹資料匯流排DataHub連接器文法結構、WITH參數和使用樣本等。
背景資訊
阿里雲流資料處理平台DataHub是流式資料(Streaming Data)的處理平台,提供對流式資料的發布(Publish)、訂閱(Subscribe)和分發功能,讓您可以輕鬆構建基於流式資料的分析和應用,詳情請參見產品概述。
DataHub相容Kafka協議,因此您可以使用Kafka連接器(不包括Upsert Kafka)來訪問DataHub,詳情請參見相容Kafka。
DataHub連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 結果表和源表 |
運行模式 | 流模式和批模式 |
資料格式 | 暫不適用 |
特有監控指標 | 暫無 |
API種類 | Datastream和SQL |
是否支援更新或刪除目標Topic資料 | 不支援更新和刪除目標Topic資料,只支援插入資料。 |
文法結構
CREATE TEMPORARY TABLE datahub_input (
`time` BIGINT,
`sequence` STRING METADATA VIRTUAL,
`shard-id` BIGINT METADATA VIRTUAL,
`system-time` TIMESTAMP METADATA VIRTUAL
) WITH (
'connector' = 'datahub',
'subId' = '<yourSubId>',
'endPoint' = '<yourEndPoint>',
'project' = '<yourProjectName>',
'topic' = '<yourTopicName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}'
);WITH參數
通用
參數
說明
資料類型
是否必填
預設值
備忘
connector
表類型。
String
是
無
固定值為datahub。
endPoint
消費端點資訊。
String
是
無
不同地區DataHub有不同的EndPoint,詳情請參見DataHub網域名稱列表。
project
專案。
String
是
無
建立project詳情請參見快速入門(同步樣本)。
topic
主題。
String
是
無
建立topic詳情請參見快速入門(同步樣本)。
說明如果您填寫的topic是blob類型(一種無類型的非結構化資料的儲存方式),則在Flink消費時,表定義中必須有且只有一個VARBINARY類型的欄位。
accessId
阿里雲帳號的AccessKey ID。
String
是
無
詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?
重要為了避免您的AK資訊泄露,建議您使用變數的方式填寫AccessKey取值,詳情請參見專案變數。
accessKey
阿里雲帳號的AccessKey Secret。
String
是
無
retryTimeout
最大持續重試時間。
Integer
否
1800000
單位毫秒,通常不作修改。
retryInterval
稍候再試。
Integer
否
1000
單位毫秒,通常不作修改。
CompressType
讀寫的壓縮策略。
String
否
lz4
lz4 (預設值):使用lz4壓縮。
deflate:使用deflate壓縮。
""(Null 字元串):表示關閉資料壓縮。
說明僅VVR 6.0.5及以上版本支援指定CompressType參數。
源表專屬
參數
說明
資料類型
是否必填
預設值
備忘
subId
訂閱ID。
String
是
無
如何建立DataHub訂閱,詳情請參見建立訂閱。
maxFetchSize
單次讀取條數。
Integer
否
50
影響讀效能的參數,調大可以增加吞吐。
maxBufferSize
非同步讀取的最大快取資料條數。
Integer
否
50
影響讀效能的參數,調大可以增加吞吐。
fetchLatestDelay
資料來源沒有資料時,sleep的時間。
Integer
否
500
單位毫秒。在資料來源頻繁沒有資料的情況下,影響吞吐,建議調小。
lengthCheck
單列欄位條數檢查策略。
String
否
NONE
NONE(預設值):
解析出的欄位數大於定義欄位數時,按從左至右的順序,取定義欄位數量的資料。
解析出的欄位數小於定義欄位數時,跳過該行資料。
SKIP:解析出的欄位數和定義欄位數不同時跳過該行資料。
EXCEPTION:解析出的欄位數和定義欄位數不同時提示異常。
PAD:按從左至右順序填充。
解析出的欄位數大於定義欄位數時,按從左至右的順序,取定義欄位數量的資料。
解析出的欄位數小於定義欄位數時,按從左至右的順序,在行尾用Null填充缺少的欄位。
columnErrorDebug
是否開啟調試開關。
Boolean
否
false
false(預設值):關閉調試功能。
true:開啟調試開關,列印解析異常的日誌。
startTime
消費日誌的開始時間。
String
否
無
格式為yyyy-MM-dd hh:mm:ss。
endTime
消費日誌的結束時間。
String
否
無
格式為yyyy-MM-dd hh:mm:ss。
startTimeMs
消費日誌的開始時間。
Long
否
-1
單位毫秒。該配置優先於startTime。預設值-1代表使用訂閱中儲存的資料點位開始消費;如果訂閱中還沒有儲存過點位,那麼會用最早的資料點位進行消費。
重要當使用預設值-1啟動作業時,如果作業在做出第一個checkpoint前發生了failover,此時DataHub訂閱中的最新資料點位可能已被更新,導致重啟的作業使用了更新的點位進行消費,從而跳過部分資料。如需避免這種情況,建議指定startTimeMs為固定值。
結果表專屬
參數
說明
資料類型
是否必填
預設值
備忘
batchCount
每次批量寫入資料的數量。
Integer
否
500
影響寫效能,調大可以增加吞吐,但是會增大延遲。
batchSize
每次批量寫入資料的大小。
Integer
否
512000
單位Byte,影響寫效能,調大可以增加吞吐,但是會增大延遲。
flushInterval
攢批寫入資料的時間。
Integer
否
5000
單位毫秒,影響寫效能,調大可以增加吞吐,但是增大延遲。
hashFields
指定列名後,相同列的值會寫入到同一個Shard。
String
否
null
預設隨機寫。可以指定多個列值,用逗號(,)分割,例如
hashFields=a,b。timeZone
資料的時區。
String
否
無
影響TimeStamp等帶時區資料的轉換。
schemaVersion
向註冊的Schema裡寫入的version。
Integer
否
-1
無。
類型映射
Flink欄位類型 | DataHub欄位類型 |
TINYINT | TINYINT |
BOOLEAN | BOOLEAN |
INTEGER | INTEGER |
BIGINT | BIGINT |
BIGINT | TIMESTAMP |
TIMESTAMP | |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL | DECIMAL |
VARCHAR | STRING |
SMALLINT | SMALLINT |
VARBINARY | BLOB |
屬性欄位
欄位名 | 欄位類型 | 說明 |
shard-id | BIGINT METADATA VIRTUAL | Shard的ID。 |
sequence | STRING METADATA VIRTUAL | 資料順序。 |
system-time | TIMESTAMP METADATA VIRTUAL | 系統時間。 |
僅在VVR 3.0.1及以上版本支援擷取以上DataHub屬性欄位。
使用樣本
源表
CREATE TEMPORARY TABLE datahub_input ( `time` BIGINT, `sequence` STRING METADATA VIRTUAL, `shard-id` BIGINT METADATA VIRTUAL, `system-time` TIMESTAMP METADATA VIRTUAL ) WITH ( 'connector' = 'datahub', 'subId' = '<yourSubId>', 'endPoint' = '<yourEndPoint>', 'project' = '<yourProjectName>', 'topic' = '<yourTopicName>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}' ); CREATE TEMPORARY TABLE test_out ( `time` BIGINT, `sequence` STRING, `shard-id` BIGINT, `system-time` TIMESTAMP ) WITH ( 'connector' = 'print', 'logger' = 'true' ); INSERT INTO test_out SELECT `time`, `sequence` , `shard-id`, `system-time` FROM datahub_input;結果表
CREATE TEMPORARY table datahub_source( name VARCHAR ) WITH ( 'connector'='datahub', 'endPoint'='<endPoint>', 'project'='<yourProjectName>', 'topic'='<yourTopicName>', 'subId'='<yourSubId>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'startTime'='2018-06-01 00:00:00' ); CREATE TEMPORARY table datahub_sink( name varchar ) WITH ( 'connector'='datahub', 'endPoint'='<endPoint>', 'project'='<yourProjectName>', 'topic'='<yourTopicName>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'batchSize'='512000', 'batchCount'='500' ); INSERT INTO datahub_sink SELECT LOWER(name) from datahub_source;
Datastream API
通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink全託管,DataStream連接器設定方法請參見DataStream連接器使用方法。
DataHub源表
VVR提供了SourceFunction的實作類別DatahubSourceFunction來讀取DataHub表資料。以下為讀取DataHub表資料的樣本。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//DataHub串連配置。
DatahubSourceFunction datahubSource =
new DatahubSourceFunction(
<yourEndPoint>,
<yourProjectName>,
<yourTopicName>,
<yourSubId>,
<yourAccessId>,
<yourAccessKey>,
"public",
<yourStartTime>,
<yourEndTime>
);
datahubSource.setRequestTimeout(30 * 1000);
datahubSource.enableExitAfterReadFinished();
env.addSource(datahubSource)
.map((MapFunction<RecordEntry, Tuple2<String, Long>>) this::getStringLongTuple2)
.print();
env.execute();
private Tuple2<String, Long> getStringLongTuple2(RecordEntry recordEntry) {
Tuple2<String, Long> tuple2 = new Tuple2<>();
TupleRecordData recordData = (TupleRecordData) (recordEntry.getRecordData());
tuple2.f0 = (String) recordData.getField(0);
tuple2.f1 = (Long) recordData.getField(1);
return tuple2;
}DataHub結果表
VVR提供了OutputFormatSinkFunction的實作類別DatahubSinkFunction將資料寫入DataHub。以下為將資料寫入DataHub的樣本。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//DataHub串連配置。
env.generateSequence(0, 100)
.map((MapFunction<Long, RecordEntry>) aLong -> getRecordEntry(aLong, "default:"))
.addSink(
new DatahubSinkFunction<>(
<yourEndPoint>,
<yourProjectName>,
<yourTopicName>,
<yourSubId>,
<yourAccessId>,
<yourAccessKey>,
"public",
<schemaVersion> // 如果開啟了schemaRegistry,寫入的時候需要指定schemaVersion,其他情況填0即可。
);
env.execute();
private RecordEntry getRecordEntry(Long message, String s) {
RecordSchema recordSchema = new RecordSchema();
recordSchema.addField(new Field("f1", FieldType.STRING));
recordSchema.addField(new Field("f2", FieldType.BIGINT));
recordSchema.addField(new Field("f3", FieldType.DOUBLE));
recordSchema.addField(new Field("f4", FieldType.BOOLEAN));
recordSchema.addField(new Field("f5", FieldType.TIMESTAMP));
recordSchema.addField(new Field("f6", FieldType.DECIMAL));
RecordEntry recordEntry = new RecordEntry();
TupleRecordData recordData = new TupleRecordData(recordSchema);
recordData.setField(0, s + message);
recordData.setField(1, message);
recordEntry.setRecordData(recordData);
return recordEntry;
}XML
Maven中央庫中已經放置了DataHub DataStream連接器。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-datahub</artifactId>
<version>${vvr-version}</version>
</dependency>