全部產品
Search
文件中心

Realtime Compute for Apache Flink:資料匯流排DataHub

更新時間:Oct 31, 2025

本文為您介紹資料匯流排DataHub連接器文法結構、WITH參數和使用樣本等。

背景資訊

阿里雲流資料處理平台DataHub是流式資料(Streaming Data)的處理平台,提供對流式資料的發布(Publish)、訂閱(Subscribe)和分發功能,讓您可以輕鬆構建基於流式資料的分析和應用,詳情請參見產品概述

說明

DataHub相容Kafka協議,因此您可以使用Kafka連接器(不包括Upsert Kafka)來訪問DataHub,詳情請參見相容Kafka

DataHub連接器支援的資訊如下。

類別

詳情

支援類型

結果表和源表

運行模式

流模式和批模式

資料格式

暫不適用

特有監控指標

暫無

API種類

Datastream和SQL

是否支援更新或刪除目標Topic資料

不支援更新和刪除目標Topic資料,只支援插入資料。

文法結構

CREATE TEMPORARY TABLE datahub_input (
  `time` BIGINT,
  `sequence`  STRING METADATA VIRTUAL,
  `shard-id` BIGINT METADATA VIRTUAL,
  `system-time` TIMESTAMP METADATA VIRTUAL
) WITH (
  'connector' = 'datahub',
  'subId' = '<yourSubId>',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'topic' = '<yourTopicName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

WITH參數

  • 通用

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    connector

    表類型。

    String

    固定值為datahub。

    endPoint

    消費端點資訊。

    String

    不同地區DataHub有不同的EndPoint,詳情請參見DataHub網域名稱列表

    project

    專案。

    String

    建立project詳情請參見快速入門(同步樣本)

    topic

    主題。

    String

    建立topic詳情請參見快速入門(同步樣本)

    說明

    如果您填寫的topic是blob類型(一種無類型的非結構化資料的儲存方式),則在Flink消費時,表定義中必須有且只有一個VARBINARY類型的欄位。

    accessId

    阿里雲帳號的AccessKey ID。

    String

    詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?

    重要

    為了避免您的AK資訊泄露,建議您使用變數的方式填寫AccessKey取值,詳情請參見專案變數

    accessKey

    阿里雲帳號的AccessKey Secret。

    String

    retryTimeout

    最大持續重試時間。

    Integer

    1800000

    單位毫秒,通常不作修改。

    retryInterval

    稍候再試。

    Integer

    1000

    單位毫秒,通常不作修改。

    CompressType

    讀寫的壓縮策略。

    String

    lz4

    • lz4 (預設值):使用lz4壓縮。

    • deflate:使用deflate壓縮。

    • ""(Null 字元串):表示關閉資料壓縮。

    說明

    僅VVR 6.0.5及以上版本支援指定CompressType參數。

  • 源表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    subId

    訂閱ID。

    String

    如何建立DataHub訂閱,詳情請參見建立訂閱

    maxFetchSize

    單次讀取條數。

    Integer

    50

    影響讀效能的參數,調大可以增加吞吐。

    maxBufferSize

    非同步讀取的最大快取資料條數。

    Integer

    50

    影響讀效能的參數,調大可以增加吞吐。

    fetchLatestDelay

    資料來源沒有資料時,sleep的時間。

    Integer

    500

    單位毫秒。在資料來源頻繁沒有資料的情況下,影響吞吐,建議調小。

    lengthCheck

    單列欄位條數檢查策略。

    String

    NONE

    • NONE(預設值):

      • 解析出的欄位數大於定義欄位數時,按從左至右的順序,取定義欄位數量的資料。

      • 解析出的欄位數小於定義欄位數時,跳過該行資料。

    • SKIP:解析出的欄位數和定義欄位數不同時跳過該行資料。

    • EXCEPTION:解析出的欄位數和定義欄位數不同時提示異常。

    • PAD:按從左至右順序填充。

      • 解析出的欄位數大於定義欄位數時,按從左至右的順序,取定義欄位數量的資料。

      • 解析出的欄位數小於定義欄位數時,按從左至右的順序,在行尾用Null填充缺少的欄位。

    columnErrorDebug

    是否開啟調試開關。

    Boolean

    false

    • false(預設值):關閉調試功能。

    • true:開啟調試開關,列印解析異常的日誌。

    startTime

    消費日誌的開始時間。

    String

    格式為yyyy-MM-dd hh:mm:ss。

    endTime

    消費日誌的結束時間。

    String

    格式為yyyy-MM-dd hh:mm:ss。

    startTimeMs

    消費日誌的開始時間。

    Long

    -1

    單位毫秒。該配置優先於startTime。預設值-1代表使用訂閱中儲存的資料點位開始消費;如果訂閱中還沒有儲存過點位,那麼會用最早的資料點位進行消費。

    重要

    當使用預設值-1啟動作業時,如果作業在做出第一個checkpoint前發生了failover,此時DataHub訂閱中的最新資料點位可能已被更新,導致重啟的作業使用了更新的點位進行消費,從而跳過部分資料。如需避免這種情況,建議指定startTimeMs為固定值。

  • 結果表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    batchCount

    每次批量寫入資料的數量。

    Integer

    500

    影響寫效能,調大可以增加吞吐,但是會增大延遲。

    batchSize

    每次批量寫入資料的大小。

    Integer

    512000

    單位Byte,影響寫效能,調大可以增加吞吐,但是會增大延遲。

    flushInterval

    攢批寫入資料的時間。

    Integer

    5000

    單位毫秒,影響寫效能,調大可以增加吞吐,但是增大延遲。

    hashFields

    指定列名後,相同列的值會寫入到同一個Shard。

    String

    null

    預設隨機寫。可以指定多個列值,用逗號(,)分割,例如hashFields=a,b

    timeZone

    資料的時區。

    String

    影響TimeStamp等帶時區資料的轉換。

    schemaVersion

    向註冊的Schema裡寫入的version。

    Integer

    -1

    無。

類型映射

Flink欄位類型

DataHub欄位類型

TINYINT

TINYINT

BOOLEAN

BOOLEAN

INTEGER

INTEGER

BIGINT

BIGINT

BIGINT

TIMESTAMP

TIMESTAMP

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL

DECIMAL

VARCHAR

STRING

SMALLINT

SMALLINT

VARBINARY

BLOB

屬性欄位

欄位名

欄位類型

說明

shard-id

BIGINT METADATA VIRTUAL

Shard的ID。

sequence

STRING METADATA VIRTUAL

資料順序。

system-time

TIMESTAMP METADATA VIRTUAL

系統時間。

說明

僅在VVR 3.0.1及以上版本支援擷取以上DataHub屬性欄位。

使用樣本

  • 源表

    CREATE TEMPORARY TABLE datahub_input (
      `time` BIGINT,
      `sequence`  STRING METADATA VIRTUAL,
      `shard-id` BIGINT METADATA VIRTUAL,
      `system-time` TIMESTAMP METADATA VIRTUAL
    ) WITH (
      'connector' = 'datahub',
      'subId' = '<yourSubId>',
      'endPoint' = '<yourEndPoint>',
      'project' = '<yourProjectName>',
      'topic' = '<yourTopicName>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}'
    );
    
    CREATE TEMPORARY TABLE test_out (
      `time` BIGINT,
      `sequence`  STRING,
      `shard-id` BIGINT,
      `system-time` TIMESTAMP
    ) WITH (
      'connector' = 'print',
      'logger' = 'true'
    );
    
    INSERT INTO test_out
    SELECT
      `time`,
      `sequence` ,
      `shard-id`,
      `system-time`
    FROM datahub_input;
  • 結果表

    CREATE TEMPORARY table datahub_source(
      name VARCHAR
    ) WITH (
      'connector'='datahub',
      'endPoint'='<endPoint>',
      'project'='<yourProjectName>',
      'topic'='<yourTopicName>',
      'subId'='<yourSubId>',
      'accessId'='${secret_values.ak_id}',
      'accessKey'='${secret_values.ak_secret}',
      'startTime'='2018-06-01 00:00:00'
    );
    
    CREATE TEMPORARY table datahub_sink(
      name varchar
    ) WITH (
      'connector'='datahub',
      'endPoint'='<endPoint>',
      'project'='<yourProjectName>',
      'topic'='<yourTopicName>',
      'accessId'='${secret_values.ak_id}',
      'accessKey'='${secret_values.ak_secret}',
      'batchSize'='512000',
      'batchCount'='500'
    );
    
    INSERT INTO datahub_sink
    SELECT
      LOWER(name)
    from datahub_source;

Datastream API

重要

通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink全託管,DataStream連接器設定方法請參見DataStream連接器使用方法

DataHub源表

VVR提供了SourceFunction的實作類別DatahubSourceFunction來讀取DataHub表資料。以下為讀取DataHub表資料的樣本。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//DataHub串連配置。
DatahubSourceFunction datahubSource =
    new DatahubSourceFunction(
    <yourEndPoint>,
    <yourProjectName>,
    <yourTopicName>,
    <yourSubId>,
    <yourAccessId>,
    <yourAccessKey>,
    "public",
    <yourStartTime>,
    <yourEndTime>
    );
datahubSource.setRequestTimeout(30 * 1000);
datahubSource.enableExitAfterReadFinished();
env.addSource(datahubSource)
    .map((MapFunction<RecordEntry, Tuple2<String, Long>>) this::getStringLongTuple2)
    .print();
env.execute();
private Tuple2<String, Long> getStringLongTuple2(RecordEntry recordEntry) {
    Tuple2<String, Long> tuple2 = new Tuple2<>();
    TupleRecordData recordData = (TupleRecordData) (recordEntry.getRecordData());
    tuple2.f0 = (String) recordData.getField(0);
    tuple2.f1 = (Long) recordData.getField(1);
    return tuple2;
}

DataHub結果表

VVR提供了OutputFormatSinkFunction的實作類別DatahubSinkFunction將資料寫入DataHub。以下為將資料寫入DataHub的樣本。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//DataHub串連配置。
env.generateSequence(0, 100)
    .map((MapFunction<Long, RecordEntry>) aLong -> getRecordEntry(aLong, "default:"))
    .addSink(
    new DatahubSinkFunction<>(
       <yourEndPoint>,
       <yourProjectName>,
       <yourTopicName>,
       <yourSubId>,
       <yourAccessId>,
       <yourAccessKey>,
       "public",
       <schemaVersion> // 如果開啟了schemaRegistry,寫入的時候需要指定schemaVersion,其他情況填0即可。
       );
env.execute();
private RecordEntry getRecordEntry(Long message, String s) {
    RecordSchema recordSchema = new RecordSchema();
    recordSchema.addField(new Field("f1", FieldType.STRING));
    recordSchema.addField(new Field("f2", FieldType.BIGINT));
    recordSchema.addField(new Field("f3", FieldType.DOUBLE));
    recordSchema.addField(new Field("f4", FieldType.BOOLEAN));
    recordSchema.addField(new Field("f5", FieldType.TIMESTAMP));
    recordSchema.addField(new Field("f6", FieldType.DECIMAL));
    RecordEntry recordEntry = new RecordEntry();
    TupleRecordData recordData = new TupleRecordData(recordSchema);
    recordData.setField(0, s + message);
    recordData.setField(1, message);
    recordEntry.setRecordData(recordData);
    return recordEntry;
}

XML

Maven中央庫中已經放置了DataHub DataStream連接器

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-datahub</artifactId>
    <version>${vvr-version}</version>
</dependency>

常見問題