全部产品
Search
文档中心

实时计算Flink版:云原生数据仓库AnalyticDB MySQL版(ADB)3.0

更新时间:Dec 24, 2024

本文为您介绍如何使用云原生数据仓库AnalyticDB MySQL版3.0连接器。

背景信息

云原生数据仓库AnalyticDB MySQL版3.0是融合数据库、大数据技术于一体的云原生企业级数据仓库服务。AnalyticDB MySQL版支持高吞吐的数据实时增删改、低延时的实时分析和复杂ETL,兼容上下游生态工具,可用于构建企业级报表系统、数据仓库和数据服务引擎。

ADB MySQL 3.0连接器支持的信息如下。

类别

详情

支持类型

源表(公测中)、维表和结果表

说明

仅Flink计算引擎VVR 8.0.4及以上版本支持源表(公测中),源表的参数和配置详情请参见Flink订阅Binlog,维表和结果表参数详情请参见WITH参数

运行模式

流模式和批模式

数据格式

暂不适用

特有监控指标

暂无

API种类

SQL

是否支持更新或删除结果表数据

前提条件

语法结构

CREATE TEMPORARY TABLE adb_table (
  `id` INT,
  `num` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb3.0',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>'
);
重要

Flink DDL中定义的主键必须和AnalyticDB MySQL数据库物理表中的主键保持一致,主键一致包括是否存在主键和主键名称一致。如果不一致,会影响数据正确性。

WITH参数

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    结果表类型。

    String

    固定值为adb3.0。

    url

    JDBC连接地址。

    String

    云原生数据仓库AnalyticDB MySQL版数据库的JDBC链接地址。固定格式为jdbc:mysql://<endpoint>:<port>/<databaseName>,其中:

    • endpoint和port:您可以登录AnalyticDB 控制台,单击对应的集群名称,进入集群信息页面,在网络信息中获取。

    • databaseName:云原生数据仓库AnalyticDB MySQL版数据库名称。

    userName

    用户名。

    String

    无。

    password

    密码。

    String

    无。

    tableName

    表名。

    String

    无。

    maxRetryTimes

    写入或读取数据失败后,重试的最大次数。

    Integer

    10

    无。

  • 结果表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    batchSize

    一次批量写入的条数。

    Integer

    1000

    需指定主键后,该参数才生效。

    bufferSize

    内存中缓存的数据条数。batchSizebufferSize任一到达阈值都会触发写入。

    Integer

    1000

    需指定主键后,该参数才生效。

    flushIntervalMs

    清空缓存的时间间隔。表示如果缓存中的数据在等待指定时间后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。

    Integer

    3000

    单位为毫秒。

    ignoreDelete

    是否忽略Delete操作。

    Boolean

    false

    参数取值如下:

    • true:忽略Delete操作。

    • false:接受Delete操作。

    replaceMode

    DDL中定义了主键的情况下,是否采用replace into语法插入数据。

    Boolean

    true

    该参数取值如下:

    • true:采用replace into语法插入数据。

    • false:采用insert into on duplicate key update语法插入数据。

    说明
    • 仅AnalyticDB MySQL 3.1.3.5及以上版本支持该参数。

    • 此参数仅在DDL中定义了主键时才生效,插入数据时采用的语法详情如下:

      • DDL中定义了主键且replaceMode=true,采用replace into语法插入数据。

      • DDL中定义了主键且replaceMode=false,采用insert into on duplicate key update语法插入数据。

      • DDL中没有定义主键,采用insert into语法插入数据。

    excludeUpdateColumns

    表示更新主键值相同的数据时,忽略指定字段的更新。

    String

    空字符串

    如果忽略指定的字段为多个时,则需要使用英文逗号(,)分割。例如excludeUpdateColumns=column1,column2

    说明
    • 仅在replaceMode=false时,该参数才生效。在replaceMode=true时,对应字段会被更新为null。

    • 要忽略的多个字段需要写在一行中,不能换行。

    connectionMaxActive

    线程池大小。

    Integer

    40

    无。

  • 维表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    cache

    缓存策略。

    String

    ALL

    云原生数据仓库AnalyticDB MySQL版3.0维表支持以下三种缓存策略:

    • None:无缓存。

    • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。

    • ALL(默认值):缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。

    适用于远程表数据量小且MISS KEY在源表数据和维表JOIN时,ON条件无法关联特别多的场景。

    说明
    • 如果使用CACHE ALL时,请注意节点内存大小,防止出现OOM。

    • 因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。

    cacheSize

    缓存大小,即缓存多少行数据。

    Integer

    100000

    cacheSize配置和cache为LRU有关。当cache配置为LRU时,必须配置cacheSize参数。

    cacheTTLMs

    缓存超时时间,单位为毫秒。

    Integer

    Long.MAX_VALUE

    cacheTTLMs配置和cache配置为LRU或ALL有关:

    • 如果cache配置为LRU,则cacheTTLMs为缓存失效的超时时间。默认值为Long.MAX_VALUE,即代表缓存不过期。

    • 如果cache配置为ALL,则cacheTTLMs为物理表数据被重新加载的间隔时间。默认值为Long.MAX_VALUE,即代表不重新加载物理表数据。

    说明

    如果cache配置为None,则cacheTTLMs不用配置。因为cache配置为None,表示没有缓存,因此不用配置缓存超时时间。

    maxJoinRows

    主表中每一条数据查询维表时,匹配后最多返回的结果数。

    Integer

    1024

    如果您可以预估一条数据对应的维表数据最多为n条,则可以设置maxJoinRows='n',以确保实时计算匹配处理效率。

    说明

    进行Join时,主表输入一条数据,对应维表匹配后返回的结果总数受该参数限制。

类型映射

云原生数据仓库AnalyticDB MySQL版3.0字段类型

Flink字段类型

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(p, s) 或NUMERIC(p, s)

DECIMAL(p, s)

VARCHAR

STRING

BINARY

BYTES

DATE

DATE

TIME

TIME

DATETIME

TIMESTAMP

TIMESTAMP

TIMESTAMP

POINT

STRING

使用示例

  • 结果表

    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE adb_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'adb3.0',
      'url' = '<yourUrl>',
      'userName' = '<yourUsername>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTablename>'
    );
    
    INSERT INTO adb_sink
    SELECT * FROM datagen_source;
  • 维表

    CREATE TEMPORARY TABLE datagen_source(
      `a` INT,
      `b` VARCHAR,
      `c` STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE adb_dim (
      `a` INT,
      `b` VARCHAR,
      `c` VARCHAR
    ) WITH (
      'connector' = 'adb3.0',
      'url' = '<yourUrl>',
      'userName' = '<yourUsername>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTablename>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      `a` INT,
      `b` VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT T.a,H.b
    FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

相关文档

报错:multi-statement be found