全部產品
Search
文件中心

Realtime Compute for Apache Flink:ClickHouse

更新時間:Jul 13, 2024

本文為您介紹如何使用ClickHouse連接器。

背景資訊

ClickHouse是一個用於聯機分析(OLAP)的列式資料庫管理系統,詳情請參見What Is ClickHouse?

ClickHouse連接器支援的資訊如下.

類別

詳情

支援類型

僅支援結果表

運行模式

批模式和流模式

資料格式

暫不適用

特有監控指標

  • numRecordsOut

  • numRecordsOutPerSecond

  • currentSendTime

說明

指標含義詳情,請參見監控指標說明

API種類

SQL

是否支援更新或刪除結果表資料

當Flink結果表的DDL上指定了Primary Key,且參數 ignoreDelete設定為false時,則支援更新或刪除結果表資料,但效能會顯著下降。

特色功能

  • 對於ClickHouse的分布式表,支援直接寫對應的本地表。

  • 對於EMR的ClickHouse,提供Exactly Once的語義。

前提條件

  • 已建立ClickHouse表,詳情請參見建立表

  • 已配置白名單。

    • 如果您使用的是阿里雲資料庫ClickHouse,配置白名單詳情請參見設定白名單

    • 如果您使用的是阿里雲E-MapReduce的ClickHouse,配置白名單詳情請參見管理安全性群組

    • 如果您使用的是阿里雲ECS上自建的ClickHouse,配置白名單詳情請參見安全性群組概述

    • 如果為其他情況,請您自行配置ClickHouse所在機器的白名單讓其可被Flink所在機器訪問即可。

    說明

    如何查看Flink虛擬交換器的網段,請參見如何設定白名單?

使用限制

  • 暫不支援配置sink.parallelism參數。

  • ClickHouse結果表保證At-Least-Once語義。

  • 僅Flink計算引擎VVR 3.0.2及以上版本支援ClickHouse連接器。

  • 僅Flink計算引擎VVR 3.0.3,VVR 4.0.7及以上版本支援ignoreDelete選項。

  • 僅Flink計算引擎VVR 4.0.10及以上版本支援ClickHouse的Nested類型。

  • 僅Flink計算引擎VVR 4.0.11及以上版本支援直接將資料寫入到ClickHouse分布式表對應的本地表。

  • 僅Flink計算引擎VVR 4.0.11及以上版本提供寫EMR的ClickHouse的Exactly Once語義。但對EMR-3.45.1和EMR-5.11.1之後版本的ClickHouse,由於EMR ClickHouse產品能力變更,也不再提供Exactly Once語義。

  • 僅Flink計算引擎VVR 8.0.7及以上版本支援使用balance的策略來均勻地將資料寫入ClickHouse的本地表。

  • 僅ClickHouse社區相容版支援寫ClickHouse本地表。

文法結構

CREATE TABLE clickhouse_sink (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'maxRetryTimes' = '3',
  'batchSize' = '8000',
  'flushIntervalMs' = '1000'
  'ignoreDelete' = 'true',
  'shardWrite' = 'false',
  'writeMode' = 'partition',
  'shardingKey' = 'id'
);

WITH參數

參數

說明

資料類型

是否必填

預設值

備忘

connector

結果表類型。

String

固定值為clickhouse

url

ClickHouse的JDBC串連地址。

String

URL格式為jdbc:clickhouse://<yourNetworAddress>:<PortId>/<yourDatabaseName>直接寫本地表時,節點IP可以在ClickHouse執行 select * from system.clusters擷取。如果不寫資料庫名稱,則使用預設的default資料庫。

說明

如果您要將資料寫入ClickHouse分布式表,則URL為該分布式表所在節點的JDBC URL。

userName

ClickHouse的使用者名稱。

String

無。

password

ClickHouse的密碼。

String

無。

tableName

ClickHouse的表名稱。

String

無。

maxRetryTimes

向結果表插入資料失敗後的最大嘗試次數。

Int

3

無。

batchSize

一次批量寫入的資料條數。

Int

100

如果緩衝中的資料條數達到了batchSize參數值,或者等待時間超過flushIntervalMs後,系統將會自動將緩衝中的資料寫入ClickHouse表中。

flushIntervalMs

清空緩衝的時間間隔。

Long

1000

單位為毫秒。

ignoreDelete

是否忽略Delete訊息。

Boolean

true

參數取值如下:

  • true(預設值):忽略。

  • false:不忽略。

    如果為false,並且在DDL中聲明了Primary Key,則會使用ClickHouse的ALTER語句來刪除資料。

說明

如果設定ignoreDelete=false,則無法支援以partition的方式寫ClickHouse分布表的本地表,所以就不能再設定writeMode為partition。

shardWrite

對於ClickHouse分布式表,是否直接寫ClickHouse的本地表。

Boolean

false

參數取值如下:

  • false(預設值):先寫ClickHouse的分布式表,再由分布式表寫入對應的本地表。此時tableName應為分布式表的名稱。

  • true:跳過分布式表,直接將資料寫到該ClickHouse分布式表對應的本地表。

    如果需要提高寫ClickHouse分布式表的輸送量,則建議將該值設定為true。

    • 如果您需要在URL中手動指定要將資料寫到哪些節點的本地表中。此時tableName應該為本地表的名字。程式碼範例如下:

      'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002/default'
      'tableName' = 'local_table'
    • 如果您不需要手動指定本地表的節點,可以通過同時設定inferLocalTable參數來讓Flink自動推測本地表的節點。此時,tableName應該為分布式表的名字,且url為該分布式表所在節點的JDBC URL。程式碼範例如下:

      'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default' // 分布式表所在節點的JDBC URL
      'tableName' = 'distribute_table'

inferLocalTable

對於寫ClickHouse分布式表,是否嘗試推測分布式表的本地表資訊,然後直接寫入本地表中。

Boolean

false

參數取值如下:

  • false(預設值):如果是寫ClickHouse分布式表,並且在參數url中只指定了一個節點,則不會嘗試推測分布式表對應的本地表資訊,而是依然會直接寫入分布式表,再由分布式表寫入對應的本地表。

  • true:Flink將嘗試推測分布式表的本地表資訊,並直接寫入對應的本地表。此時需要 shardWrite參數也同步設定為 true,tableName設定為分布式表的名字,並且url設定為該分布式表所在節點的JDBC URL。

說明

對於寫ClickHouse非分布式表,可直接忽略該參數。

writeMode

對於ClickHouse分布式表,採用何種策略寫ClickHouse的本地表。

Enum

default

參數取值如下:

  • default(預設值):表示總是寫入到第一個節點的本地表。

  • partition:表示將資料按key寫到同一個節點的本地表。

  • random:表示隨機寫到某個節點的本地表。

  • balance:表示採用round-robin的方式,均勻地將資料寫入到本地表節點中。

說明

如果設定了writeMode=partition,請確保配置項ignoreDelete為true。

shardingKey

按何種key將資料寫到同一個節點的本地表。

default

writeMode取值為partition時,shardingKey值必填,可包含多個欄位,多個欄位以英文逗號(,)分隔。

exactlyOnce

是否開啟exactlyOnce語義。

Boolean

false

參數取值如下:

  • true:開啟。

  • false(預設值):不開啟。

說明
  • 目前僅支援寫EMR的ClickHouse的Exactly Once語義。所以只有當您寫EMR的ClickHouse時,才能將exactlyOnce設定為true。

  • 不支援以partition策略寫ClickHouse的本地表的Exactly Once語義。所以如果exactlyOnce設定為true,則writeMode不能設定為partition。

類型映射

Flink欄位類型

ClickHouse欄位類型

BOOLEAN

UInt8 / Boolean

說明

ClickHouse v21.12及以上版本支援Boolean類型。如果您使用的ClickHouse是v21.12以下版本,Flink的Boolean類型則對應ClickHouse的UInt8類型。

TINYINT

Int8

SMALLINT

Int16

INTEGER

Int32

BIGINT

Int64

BIGINT

UInt32

FLOAT

Float32

DOUBLE

Float64

CHAR

FixedString

VARCHAR

String

BINARY

FixedString

VARBINARY

String

DATE

Date

TIMESTAMP(0)

DateTime

TIMESTAMP(x)

Datetime64(x)

DECIMAL

DECIMAL

ARRAY

ARRAY

Nested

說明

ClickHouse暫不支援Flink的TIME、MAP、MULTISET和ROW類型。

對於ClickHouse的Nested類型,需要將其映射成Flink的ARRAY類型,例如:

-- ClickHouse
CREATE TABLE visits (
  StartDate Date,
  Goals Nested
  (
    ID UInt32,
    OrderID String
  )
  ...
);

需要映射為:

-- Flink
CREATE TABLE visits (
  StartDate DATE,
  `Goals.ID` ARRAY<LONG>,
  `Goals.OrderID` ARRAY<STRING>
);
說明

ClickHouse的DateTime類型可以精確到秒,Datetime64可以精確到納秒。對於VVR-6.0.6之前的版本,因為ClickHouse官方提供的JDBC寫Datetime64資料類型會出現精度丟失,只能精確到秒的問題,所以通過Flink只能寫入秒層級的TIMESTAMP,即TIMESTAMP(0)。VVR-6.0.6及之後的版本修複了這個精度丟失問題,通過Flink可以正常寫Datetime64類型的資料。

使用樣本

  • 樣本1:寫ClickHouse單節點表。

    CREATE TEMPORARY TABLE clickhouse_source (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '50'
    );
    
    CREATE TEMPORARY TABLE clickhouse_output (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'clickhouse',
      'url' = '<yourUrl>',
      'userName' = '<yourUsername>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTablename>'
    );
    
    INSERT INTO clickhouse_output
    SELECT
      id,
      name,
      age,
      rate
    FROM clickhouse_source;
  • 樣本2:寫ClickHouse分布式表。

    假設您已經有三個本地表,表名為local_table_test,分別在192.XX.XX.1、192.XX.XX.2和192.XX.XX.3節點上。然後基於這三個本地表,建立了一個分布式表distributed_table_test。

    • 此時,如果您希望Flink可以直接寫本地表,並且可以按照某個key將相同key的資料寫到同一個節點的本地表中,則DDL程式碼範例如下。

      CREATE TEMPORARY TABLE clickhouse_source (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '50'
      );
      
      CREATE TEMPORARY TABLE clickhouse_output (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'clickhouse',
        'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002,192.XX.XX.3:3002/default',
        'userName' = '<yourUsername>',
        'password' = '<yourPassword>',
        'tableName' = 'local_table_test',
        'shardWrite' = 'true',
        'writeMode' = 'partition',
        'shardingKey' = 'name'
      );
      
      INSERT INTO clickhouse_output
      SELECT
        id,
        name,
        age,
        rate
      FROM clickhouse_source;
    • 此時,如果您不想手動指定本地表的節點,可以讓Flink來自動推測本地表節點,DDL程式碼範例如下:

      CREATE TEMPORARY TABLE clickhouse_source (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '50'
      );
      
      CREATE TEMPORARY TABLE clickhouse_output (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'clickhouse',
        'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default', -- 分布式表所在節點對應的JDBC URL。
        'userName' = '<yourUsername>',
        'password' = '<yourPassword>',
        'tableName' = 'distributed_table_test', --為分布式表的名字。
        'shardWrite' = 'true',
        'inferLocalTable' = 'true', --需設定inferLocalTable為true。
        'writeMode' = 'partition',
        'shardingKey' = 'name'
      );
      
      INSERT INTO clickhouse_output
      SELECT
        id,
        name,
        age,
        rate
      FROM clickhouse_source;

常見問題