全部产品
Search
文档中心

大数据开发治理平台 DataWorks:PolarDB数据源

更新时间:Oct 23, 2024

PolarDB数据源为您提供读取和写入PolarDB双向通道的功能,您可以通过向导模式和脚本模式配置同步任务。

使用限制

离线读写

支持读取视图表。

实时读

来源数据源为阿里云PolarDB MySQL时,您需要开启Binlog。阿里云PolarDB MySQL是一款完全兼容MySQL的云原生数据库,默认使用了更高级别的物理日志代替Binlog,但为了更好地与MySQL生态融合,PolarDB支持开启Binlog的功能。

支持的字段类型

离线读

PolarDB Reader针对PolarDB类型的转换列表,如下所示。

类型分类

PolarDB数据类型

整数类

INT、TINYINT、SMALLINT、MEDIUMINT和BIGINT

浮点类

FLOAT、DOUBLE和DECIMAL

字符串类

VARCHAR、CHAR、TINYTEXT、TEXT、MEDIUMTEXT和LONGTEXT

日期时间类

DATE、DATETIME、TIMESTAMP、TIME和YEAR

布尔型

BIT和BOOL

二进制类

TINYBLOB、MEDIUMBLOB、BLOB、LONGBLOB和VARBINARY

说明
  • 除上述罗列字段类型外,其它类型均不支持。

  • PolarDB Reader插件将tinyint(1)视作整型。

离线写

类似于PolarDB Reader ,目前PolarDB Writer支持大部分PolarDB类型,但也存在部分类型没有支持的情况,请注意检查您的数据类型。

PolarDB Writer针对PolarDB类型的转换列表,如下所示。

类型分类

PolarDB数据类型

整数类

INT、TINYINT、SMALLINT、MEDIUMINT、BIGINT和YEAR

浮点类

FLOAT、DOUBLE和DECIMAL

字符串类

VARCHAR、CHAR、TINYTEXT、TEXT、MEDIUMTEXT和LONGTEXT

日期时间类

DATE、DATETIME、TIMESTAMP和TIME

布尔型

BOOL

二进制类

TINYBLOB、MEDIUMBLOB、BLOB、LONGBLOB和VARBINARY

数据同步前准备

准备工作1:配置白名单

Serverless资源组或独享数据集成资源组所在的VPC网段添加至OceanBase的白名单中,详情请参见添加白名单。

准备工作2:创建账号并配置账号权限

创建账号并配置账号权限。

您需要规划一个数据库的登录账户用于后续执行操作,此账户需拥有数据库的 SELECT, REPLICATION SLAVE, REPLICATION CLIENT权限。

  1. 创建账号。

    操作详情可参见创建和管理数据库账号

  2. 配置权限。

    您可参考以下命令为账号添加此权限,或直接给账号赋予SUPER权限。

    -- CREATE USER '同步账号'@'%' IDENTIFIED BY '同步账号';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '同步账号'@'%';

准备工作3:开启PolarDB的开启Binlog

操作详情可参见开启Binlog

创建数据源

在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源详细的配置参数解释可在配置界面查看对应参数的文案提示

数据同步任务开发:PolarDB同步流程引导

数据同步任务的配置入口和通用配置流程可参见下文的配置指导。

单表离线同步任务配置指导

单表、整库实时同步任务配置指导

操作流程请参见DataStudio侧实时同步任务配置

整库离线读、单表/整库全增量实时读同步任务配置指导

操作流程请参见数据集成侧同步任务配置

常见问题

实时同步Oracle、PolarDB、MySQL任务重复报错

附录:脚本Demo与参数说明

离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。

Reader脚本Demo

单库单表的脚本示例如下,详情请参见上述参数说明。

{
    "type": "job",
    "steps": [
        {
            "parameter": {
                "datasource": "test_005",//数据源名。
                "column": [//源端列名。
                    "id",
                    "name",
                    "age",
                    "sex",
                    "salary",
                    "interest"
                ],
                "where": "id=1001",//过滤条件。
                "splitPk": "id",//切分键。
                "table": "PolarDB_person",//源端表名。
              	"useReadonly": "false"//是否从备库读取数据。
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "parameter": {}
    ],
    "version": "2.0",//版本号。
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {//错误记录数。
            "record": ""
        },
        "speed": {
            "concurrent": 6,//并发数。
            "throttle": true//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
      "mbps":"12",//限流,此处1mbps = 1MB/s。
        }
    }
}

Reader脚本参数

参数

描述

是否必选

默认值

datasource

数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须要与添加的数据源名称保持一致。

table

选取的需要同步的表名称。

useReadonly

如果您希望读写分离,从PolarDB集群的备库读取数据,则此参数配置为true。不配置时,默认为false,表示从主库读取数据。

false

column

所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息 。默认使用所有列配置,例如[*]。

  • 支持列裁剪,即列可以挑选部分列进行导出。

  • 支持列换序,即列可以不按照表Schema信息顺序进行导出。

  • 支持常量配置,您需要按照SQL语法格式,例如["id", "table","1","'mingya.wmy'","'null'", "to_char(a+1)","2.3","true"]

    • id为普通列名。

    • table为包含保留字的列名。

    • 1为整型数字常量。

    • ‘mingya.wmy’为字符串常量(注意需要加上一对单引号)。

    • 'null'为字符串常量。

    • to_char(a+1)为计算字符串长度函数。

    • 2.3为浮点数。

    • true为布尔值。

  • column必须显示指定同步的列集合,不允许为空。

splitPk

PolarDB Reader进行数据抽取时,如果指定splitPk,表示您希望使用splitPk代表的字段进行数据分片,数据同步因此会启动并发任务进行数据同步,从而提高数据同步的效能。

  • 推荐splitPk用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片不容易出现数据热点。

  • 目前splitPk仅支持整型数据切分,不支持字符串、浮点、日期等其他类型 。如果您指定其他非支持类型,忽略plitPk功能,使用单通道进行同步。

  • 如果splitPk不填写,包括不提供splitPk或者splitPk值为空,数据同步视作使用单通道同步该表数据 。

splitFactor

切分因子,可以配置同步数据的切分份数,如果配置了多并发,会按照并发数 * splitFactor份来切分。例如,并发数=5,splitFactor=5,则会按照5*5=25份来切分,在5个并发线程上执行。

说明

建议取值范围:1~100,过大会导致内存溢出。

5

where

筛选条件,在实际业务场景中,往往会选择当天的数据进行同步,将where条件指定为gmt_create>$bizdate

  • where条件可以有效地进行业务增量同步。如果不填写where语句,包括不提供where的key或value,数据同步均视作同步全量数据。

  • 将where条件指定为limit 10不符合WHERE子句约束,不建议使用。

querySql(高级模式,向导模式不提供)

在部分业务场景中,where配置项不足以描述所筛选的条件,您可以通过该配置型来自定义筛选SQL。当配置该项后,数据同步系统就会忽略columntablewhere配置项,直接使用该项配置的内容对数据进行筛选。例如需要进行多表 join 后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id。当您配置querySql时,PolarDB Reader直接忽略columntablewhere条件的配置,querySql优先级大于table、column、where、splitPk选项。datasource会使用它解析出用户名和密码等信息。

Writer脚本Demo

脚本配置样例如下,详情请参见上述参数说明。

{
    "type": "job",
    "steps": [
        {
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "parameter": {
                "postSql": [],//导入后完成语句。
                "datasource": "test_005",//数据源名称。
                "column": [//目标列名。
                    "id",
                    "name",
                    "age",
                    "sex",
                    "salary",
                    "interest"
                ],
                "writeMode": "insert",//写入模式。
                "batchSize": 256,//一次性批量提交的记录数大小。
                "table": "PolarDB_person_copy",//目标表名。
                "preSql": []//导入前准备语句。
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",//版本号。
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {//错误记录数。
            "record": ""
        },
        "speed": {
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":6, //作业并发数。
            "mbps":"12"//限流,此处1mbps = 1MB/s。
        }
    }
}

Writer脚本参数

  • 全量参数说明

    参数

    描述

    必选

    默认值

    datasource

    数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须要与添加的数据源名称保持一致。

    table

    选取的需要同步的表名称。

    writeMode

    选择导入模式,可以支持:

    • insert(即向导模式的insert into)

    • update(即向导模式的on duplicate key update)

    • replace(即向导模式的replace into)

    不同方式的详细介绍与场景示例请参见下文的writeMode(主键冲突)参数详解

    insert

    column

    目标表需要写入数据的字段,字段之间用英文所逗号分隔。例如"column": ["id", "name", "age"]。如果要依次写入全部列,使用(*)表示。 例如"column": [" *"]

    preSql

    执行数据同步任务之前率先执行的SQL语句。目前向导模式仅允许执行一条SQL语句,脚本模式可以支持多条SQL语句,例如清除旧数据。

    postSql

    执行数据同步任务之后执行的SQL语句,目前向导模式仅允许执行一条SQL语句,脚本模式可以支持多条SQL语句,例如加上某一个时间戳。

    batchSize

    一次性批量提交的记录数大小,该值可以极大减少数据同步系统与PolarDB的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成数据同步运行进程OOM情况。

    1,024

    updateColumn

    writeMode配置成update时,发生遇到主键/唯一性索引冲突时所更新的字段。字段之间用英文逗号所分隔,例如"updateColumn": ["name", "age"]

    说明

    目前仅支持PolarDB for MySQL。

  • writeMode(主键冲突)参数详解

    对比介绍

    insert(即向导模式的insert into)

    update(即向导模式的on duplicate key update)

    replace(即向导模式的replace into)

    处理策略

    当主键或唯一性索引冲突时,冲突行不写入目标表,以脏数据的形式体现。

    没有遇到主键或唯一性索引冲突时,与insert into行为一致。冲突时会用新行替换已经指定的字段的语句,写入数据至目标表。

    没有遇到主键或唯一性索引冲突时,与insert into行为一致。冲突时会先删除原有行,再插入新行。即新行会替换原有行的所有字段。

    数据示例

    • 源表数据

      +----+---------+-----+
      | id | name    | age |
      +----+---------+-----+
      | 1  | zhangsan| 1   |
      | 2  | lisi    |     |
      +----+---------+-----+
    • 目标表原始数据

      +----+---------+-----+
      | id | name    | age |
      +----+---------+-----+
      | 2  | wangwu  |     |
      +----+---------+-----+
    • 运行任务后,成功写入目标表1条数据,脏数据1条。

      +----+---------+-----+
      | id | name    | age |
      +----+---------+-----+
      | 1  | zhangsan| 1   |
      | 2  | wangwu  |     |
      +----+---------+-----+
    • 场景1:任务配置部分字段:"column": ["id","name"]

      • 源表数据

        +----+---------+-----+
        | id | name    | age |
        +----+---------+-----+
        | 1  | zhangsan| 1   |
        | 2  | lisi    |     |
        +----+---------+-----+
      • 目标表原始数据

        +----+---------+-----+
        | id | name    | age |
        +----+---------+-----+
        | 2  | wangwu  |  3  |
        +----+---------+-----+
      • 运行任务后,成功写入目标表2条数据,脏数据0条。

        +----+---------+-----+
        | id | name    | age |
        +----+---------+-----+
        | 1  | zhangsan| 1   |
        | 2  | lisi    | 3   |
        +----+---------+-----+
    • 场景2:任务配置所有字段,"column": ["id","name","age"]

      • 源表数据

        +----+---------+-----+
        | id | name    | age |
        +----+---------+-----+
        | 1  | zhangsan| 1   |
        | 2  | lisi    |     |
        +----+---------+-----+
      • 目标表原始数据

        +----+---------+-----+
        | id | name    | age |
        +----+---------+-----+
        | 2  | wangwu  |  3  |
        +----+---------+-----+
      • 运行任务后,成功写入目标表2条数据,脏数据0条。

        +----+---------+-----+
        | id | name    | age |
        +----+---------+-----+
        | 1  | zhangsan| 1   |
        | 2  | lisi    |     |
        +----+---------+-----+
    • 源表数据

      +----+---------+-----+
      | id | name    | age |
      +----+---------+-----+
      | 1  | zhangsan| 1   |
      | 2  | lisi    |     |
      +----+---------+-----+
    • 目标表原始数据

      +----+---------+-----+
      | id | name    | age |
      +----+---------+-----+
      | 2  | wangwu  |  3  |
      +----+---------+-----+
    • 运行任务后,成功写入目标表2条数据,脏数据0条。

      +----+---------+-----+
      | id | name    | age |
      +----+---------+-----+
      | 1  | zhangsan| 1   |
      | 2  | lisi    |     |
      +----+---------+-----+