全部产品
Search
文档中心

实时计算Flink版:云数据库HBase

更新时间:Oct 16, 2024

本文为您介绍如何使用云数据库HBase连接器。

背景信息

云数据库HBase是低成本、高扩展、云智能的大数据NoSQL,兼容标准HBase访问协议,提供低成本存储、高扩展吞吐、智能数据处理等核心优势,是为淘宝推荐、花呗风控、广告投放、监控大屏、菜鸟物流轨迹、支付宝账单、手淘消息等众多阿里巴巴核心服务提供支撑的数据库,具备PB规模、高并发、秒级伸缩、毫秒响应、跨机房高可用、全托管、全球分布等企业能力。

HBase连接器支持的信息如下:

类别

详情

支持类型

维表和结果表

运行模式

流模式

数据格式

暂不支持

特有监控指标

  • 源表

    支持的监控指标:无。

  • 维表

    支持的监控指标:无。

  • 结果表

    支持的监控指标:numBytesOut、numBytesOutPerSecond、numRecordsOut、numRecordsOutPerSecond、currentSendTime。

    说明

    指标含义详情,请参见监控指标说明

API种类

SQL

是否支持更新或删除结果表数据

前提条件

注意事项

使用前,请确认已创建数据库实例类型,并选择正确的连接器,使用不当的连接器可能会导致不可预期的问题:

  • 云数据库HBase实例,使用本文的HBase连接器。

  • Lindorm实例兼容HBase模式,使用Lindorm连接器,详情请参见云原生多模数据库Lindorm

  • 如果连接开源HBase,则无法保证数据的正确性。

语法结构

CREATE TABLE hbase_table(
  rowkey INT,
  family1 ROW<q1 INT>,
  family2 ROW<q2 STRING, q3 BIGINT>,
  family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>
) WITH (
  'connector'='cloudhbase',
  'table-name'='<yourTableName>',
  'zookeeper.quorum'='<yourZookeeperQuorum>'
);
  • HBase的列族(Column Family)必须声明为ROW类型,列族名即该ROW的字段名。例如,DDL定义中声明了family1、family2和family3三个列族。

  • HBase列族中的列(Column)与对应ROW中嵌套的每个字段对应,列名即字段名。例如,DDL定义中列族family2声明了q2和q3两列。

  • 除了类型为ROW的字段外,只能有一个原始类型(Atomic Type)的字段(例如STRING或BIGINT),该字段将被视作HBase的行键(Row Key),例如DDL定义中的Rowkey。

  • 必须将HBase的行键定义为结果表的主键(Primary Key),如果没有显示定义主键,默认使用行键作为主键。

  • 结果表中不需要将HBase表的所有列族和列都进行声明,只声明需要的即可。

WITH参数

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    表类型。

    String

    固定值为cloudhbase

    table-name

    HBase表名。

    String

    无。

    zookeeper.znode.quorum

    HBase的zookeeper住址。

    String

    无。

    zookeeper.znode.parent

    HBase在zookeeper中的根目录。

    String

    /hbase

    仅在HBase标准版中生效。

    userName

    用户名。

    String

    仅在HBase增强版中生效。

    password

    密码。

    String

    仅在HBase增强版中生效。

    haclient.cluster.id

    HBase高可用实例ID。

    String

    只有访问同城主备实例时才需要配置仅在HBase增强版中生效。

    retires.number

    HBase客户端的重试次数。

    Integer

    31

    无。

    null-string-literal

    HBase字段类型为字符串时,如果Flink字段数据为null,则将该字段赋值为null-string-literal,并写入HBase。

    String

    null

    无。

  • 结果表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    sink.buffer-flush.max-size

    写入HBase前,内存中缓存的数据量(字节)大小。调大该值有利于提高HBase写入性能,但会增加写入延迟和内存使用。

    String

    2MB

    支持字节单位B、KB、MB和GB,不区分大小写。设置为0表示不进行缓存。

    sink.buffer-flush.max-rows

    写入HBase前,内存中缓存的数据条数。调大该值有利于提高HBase写入性能,但会增加写入延迟和内存使用。

    Integer

    1000

    设置为0表示不进行缓存。

    sink.buffer-flush.interval

    将缓存数据周期性写入到HBase的间隔,可以控制写入HBase的延迟。

    Duration

    1s

    支持时间单位ms、s、min、h和d。设置为0表示关闭定期写入。

    dynamic.table

    是否使用支持动态列的HBase表。

    Boolean

    false

    参数取值如下:

    • true:使用支持动态列的HBase表。

    • false:不使用支持动态列的HBase表。

    sink.ignore-delete

    是否忽略撤回消息。

    Boolean

    false

    参数取值如下:

    • true:忽略撤回消息。

    • false:不忽略撤回消息。

    说明

    仅实时计算引擎VVR 4.0.10及以上版本支持该参数。

    sink.sync-write

    是否同步写入HBase。

    Boolean

    true

    参数取值如下:

    • true:同步写,保证顺序,会牺牲一定性能。

    • false:异步写,不保证顺序,性能更好。

    说明

    仅实时计算引擎VVR 4.0.13及以上版本支持该参数。

    sink.buffer-flush.batch-rows

    同步写入HBase时内存中缓存的数据条数,调大该值有利于提高HBase写入性能,但会增加写入延迟和内存使用。

    Integer

    100

    仅当sink.sync-write为true时生效。

    说明

    仅实时计算引擎VVR 4.0.13及以上版本支持该参数。

    sink.ignore-null

    是否忽略写入null值。

    Boolean

    false

    说明
    • 设置成true时,参数null-string-literal将不再生效。

    • 仅实时计算引擎VVR 8.0.9及以上版本支持该参数。

  • 维表独有(比如Cache参数)

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    cache

    缓存策略。

    String

    ALL

    目前云数据库HBase版维表支持以下三种缓存策略:

    • None:无缓存。

    • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。

      说明

      需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。

    • ALL(默认值):缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。

      说明
      • 适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。需要配置相关参数:缓存更新时间间隔cacheTTLMs,更新时间黑名单cacheReloadTimeBlackList

      • 维表中所有数据加载到缓存中,可能会导致作业启动变慢,您可以根据业务需求灵活配置缓存策略。

    因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。

    cacheSize

    缓存大小。

    Long

    10000

    当缓存策略选择LRU时,可以设置缓存大小。

    cacheTTLMs

    缓存失效时间,单位为毫秒。

    Long

    cacheTTLMs配置和cache有关:

    • 如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。

    • 如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。

    • 如果cache配置为ALL,则cacheTTLMs为缓存加载时间。默认不重新加载。

    cacheEmpty

    是否缓存空结果。

    Boolean

    true

    无。

    cacheReloadTimeBlackList

    更新时间黑名单。在缓存策略选择为ALL时,启用更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。

    String

    格式为2017-10-24 14:00 -> 2017-10-24 15:00,2017-11-10 23:30 -> 2017-11-11 08:00。分隔符的使用情况如下所示:

    • 用英文逗号(,)来分隔多个黑名单。

    • 用箭头(->)来分割黑名单的起始结束时间。

    cacheScanLimit

    读取全量HBase数据,RPC(Remote Procedure Call Protocol)服务端一次返回给客户端的行数。

    Integer

    100

    缓存策略选择ALL时启用。

类型映射

Flink中的数据类型在HBase中通过org.apache.hadoop.hbase.util.Bytes转换成字节数组,解码过程有以下两种情况:

  • 对于Flink的非字符串类型,如果HBase中的值为空字节数组,则解码为null。

  • 对于Flink的字符串类型,如果HBase中的值为null-string-literal字节数组,则解码为null。

Flink SQL类型

写入Bytes时CloudHBase转换函数

从CloudHBase读取Bytes的转换函数

CHAR

byte[] toBytes(String s)

String toString(byte[] b)

VARCHAR

STRING

BOOLEAN

byte[] toBytes(boolean b)

boolean toBoolean(byte[] b)

BINARY

byte[]

byte[]

VARBINARY

DECIMAL

byte[] toBytes(BigDecimal v)

BigDecimal toBigDecimal(byte[] b)

TINYINT

new byte[] { val }

bytes[0]

SMALLINT

byte[] toBytes(short val)

short toShort(byte[] bytes)

INT

byte[] toBytes(int val)

int toInt(byte[] bytes)

BIGINT

byte[] toBytes(long val)

long toLong(byte[] bytes)

FLOAT

byte[] toBytes(float val)

float toFloat(byte[] bytes)

DOUBLE

byte[] toBytes(double val)

double toDouble(byte[] bytes)

DATE

将日期转换成自1970.01.01以来的天数,用int表示,并通过byte[] toBytes(int val) 转换成字节数组。

HBase字节数组通过int toInt(byte[] bytes) 转换成int,表示自1970.01.01以来的天数。

TIME

将时间转换成自00:00:00以来的毫秒数,用int表示,并通过byte[] toBytes(int val) 转换成字节数组。

HBase字节数组通过int toInt(byte[] bytes) 转换成int,表示自00:00:00以来的毫秒数。

TIMESTAMP

将时间戳转换成自1970-01-01 00:00:00以来的毫秒数,用long表示,并通过byte[] toBytes(long val) 转换成字节数组。

HBase字节数组通过long toLong(byte[] bytes) ,表示自1970-01-01 00:00:00以来的毫秒数。

代码示例

  • 维表示例。

    CREATE TEMPORARY TABLE datagen_source (
      a INT,
      b BIGINT,
      c STRING,
      `proc_time` AS PROCTIME()
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE hbase_dim (
      rowkey INT,
      family1 ROW<col1 INT>,
      family2 ROW<col1 STRING, col2 BIGINT>,
      family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>
    ) WITH (
      'connector' = 'cloudhbase',
      'table-name' = '<yourTableName>',
      'zookeeper.quorum' = '<yourZookeeperQuorum>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      f1c1 INT,
      f3c3 STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
         SELECT a, family1.col1 as f1c1,  family3.col3 as f3c3 FROM datagen_source
    JOIN hbase_dim FOR SYSTEM_TIME AS OF datagen_source.`proc_time` as h ON datagen_source.a = h.rowkey;
  • 结果表示例。

    CREATE TEMPORARY TABLE datagen_source (
      rowkey INT,
      f1q1 INT,
      f2q1 STRING,
      f2q2 BIGINT,
      f3q1 DOUBLE,
      f3q2 BOOLEAN,
      f3q3 STRING
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE hbase_sink (
      rowkey INT,
      family1 ROW<q1 INT>,
      family2 ROW<q1 STRING, q2 BIGINT>,
      family3 ROW<q1 DOUBLE, q2 BOOLEAN, q3 STRING>,
      PRIMARY KEY (rowkey) NOT ENFORCED
    ) WITH (
      'connector'='cloudhbase',
      'table-name'='<yourTableName>',
      'zookeeper.quorum'='<yourZookeeperQuorum>'
    );
     
    INSERT INTO hbase_sink
    SELECT rowkey, ROW(f1q1), ROW(f2q1, f2q2), ROW(f3q1, f3q2, f3q3) FROM datagen_source;
  • 结果动态表示例。

    CREATE TEMPORARY TABLE datagen_source (
      id INT,
      f1hour STRING,
      f1deal BIGINT,
      f2day STRING,
      f2deal BIGINT
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE hbase_sink (
      rowkey INT,
      f1 ROW<`hour` STRING, deal BIGINT>,
      f2 ROW<`day` STRING, deal BIGINT>
    ) WITH (
      'connector'='cloudhbase',
      'table-name'='<yourTableName>',
      'zookeeper.quorum'='<yourZookeeperQuorum>',
      'dynamic.table'='true'
    );
    
    INSERT INTO hbase_sink
    SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal) FROM datagen_source;
    • dynamic.table参数值为true时,表示使用支持动态列的HBase表。

    • 每个列族对应的ROW中必须声明两个字段:第1个字段的值表示动态列,第2个字段的值表示动态列的值。

    • 如果datagen_source表存在一条数据,代表ID为1的商品,在10:00-11:00点之间的成交额是100,在2020年7月26日当天的成交额是10000,则HBase中将插入行键为1的行,其中f1:10为100,f2:2020-7-26为10000。