全部产品
Search
文档中心

大数据开发治理平台 DataWorks:AnalyticDB for MySQL 2.0数据源

更新时间:Oct 23, 2024

AnalyticDB for MySQL 2.0数据源为您提供读取和写入AnalyticDB for MySQL 2.0的双向通道,本文为您介绍DataWorks的AnalyticDB for MySQL 2.0数据同步的能力支持情况。

使用限制

  • 离线同步支持读取视图(VIEW)表。

  • AnalyticDB for MySQL 2.0 Reader不支持multivalue,否则同步任务会直接异常退出。

支持的字段类型

离线读

AnalyticDB for MySQL 2.0 Reader针对AnalyticDB for MySQL 2.0类型的转换列表,如下所示。

AnalyticDB for MySQL 2.0类型

DataX类型

MaxCompute类型

BIGINT

LONG

BIGINT

TINYINT

LONG

INT

TIMESTAMP

DATE

DATETIME

VARCHAR

STRING

STRING

SMALLINT

LONG

INT

INT

LONG

INT

FLOAT

STRING

DOUBLE

DOUBLE

STRING

DOUBLE

DATE

DATE

DATETIME

TIME

DATE

DATETIME

离线写

AnalyticDB for MySQL 2.0 Writer针对AnalyticDB for MySQL 2.0类型的转换列表,如下所示。

类型

AnalyticDB for MySQL 2.0数据类型

整数类

INT、TINYINT、SMALLINT、BIGINT

浮点类

FLOAT和DOUBLE

字符串类

VARCHAR

日期时间类

DATE和TIMESTAMP

布尔类

BOOLEAN

创建数据源

在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源详细的配置参数解释可在配置界面查看对应参数的文案提示

数据同步任务开发

数据同步任务的配置入口和通用配置流程可参见下文的配置指导。

单表离线同步任务配置指导

整库离线读同步配置指导

操作流程请参见数据集成侧同步任务配置

附录:脚本Demo与参数说明

离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。

Reader脚本Demo

{
    "type": "job",
  "steps": [
    {
            "stepType": "ads",
      "parameter": {
        "datasource": "ads_demo",
        "table": "th_test",
        "column": [
          "id",
          "testtinyint",
          "testbigint",
          "testdate",
          "testtime",
          "testtimestamp",
          "testvarchar",
          "testdouble",
          "testfloat"
        ],
        "odps": {
          "accessId": "<yourAccessKeyId>",
          "accessKey": "<yourAccessKeySecret>",
          "account": "*********@aliyun.com",
          "odpsServer": " http://service.cn-shanghai-vpc.maxcompute.aliyun-inc.com/api",
          "tunnelServer": "http://dt.cn-shanghai-vpc.maxcompute.aliyun-inc.com",
          "accountType": "aliyun",
          "project": "odps_test"
        },
        "mode": "ODPS"
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "stream",
      "parameter": {},
      "name": "Writer",
      "category": "writer"
    }
  ],
  "version": "2.0",
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  },
  "setting": {
    "errorLimit": {
      "record": ""
    },
    "speed": {
      "concurrent": 2,
      "throttle": true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
                  "mbps":"12"//限流,此处1mbps = 1MB/s。
    }
  }
}

Reader脚本参数

参数

描述

是否必选

默认值

table

需要导出的表的名称。

column

列名,如果没有,则为全部。

*

limit

限制导出的记录数。

where

where条件,方便添加筛选条件,此处的String会被直接作为SQL条件添加到查询语句中,例如where id < 100

mode

目前支持Select和ODPS2种导入类型。

  • Select:使用limit分页。

  • ODPS:使用ODPS DUMP来导出数据,需要有ODPS的访问权限。

Select

odps.accessKey

当mode=ODPS时必填,AnalyticDB for MySQL 2.0访问ODPS使用的云账号AccessKey,需要有Describe、Create、Select、Alter、Update和Drop权限。

odps.accessId

当mode=ODPS时必填,AnalyticDB for MySQL 2.0访问ODPS使用的云账号AccessID,需要有Describe、Create、Select、Alter、Update和Drop权限。

odps.odpsServer

当mode=ODPS时必填,ODPS API地址。

odps.tunnelServer

当mode=ODPS时必填,ODPS Tunnel地址。

odps.project

当mode=ODPS时必填,ODPS Project名称。

odps.accountType

当mode=ODPS时生效,ODPS访问账号类型。

aliyun

Writer脚本Demo

{
    "type":"job",
    "version":"2.0",
    "steps":[ 
        {
            "stepType":"stream",
            "parameter":{
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"ads",//插件名。
            "parameter":{
                "partition":"",//目标表的分区名称。
                "datasource":"",//数据源。
                "column":[//字段。
                     "id"
                ],
                "writeMode":"insert",//写入模式。
                "batchSize":"256",//一次性批量提交的记录数大小。
                "table":"",//表名。
                "overWrite":"true"//AnalyticDB for MySQL 2.0写入是否覆盖当前写入的表,true为覆盖写入,false为不覆盖。(追加)写入。当 writeMode 为 Load 时,该值才会生效。
                "options.ignoreEmptySource":true//忽略源头数据空文件报错信息,任务不报错,默认true,不配置默认是true,设置为false读不到源头数据任务会报错
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//错误记录数。
        },
        "speed":{
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1, //作业并发数。
            "mbps":"12"//限流,此处1mbps = 1MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

Writer脚本参数

参数

描述

必选

默认值

连接url

AnalyticDB for MySQL 2.0连接信息,格式为Address:Port

数据库

AnalyticDB for MySQL 2.0的数据库名称。

Access Id

AnalyticDB for MySQL 2.0对应的AccessKey Id

Access Key

AnalyticDB for MySQL 2.0对应的AccessKey Secret

datasource

数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须与添加的数据源名称保持一致。

table

目标表的表名称。

partition

目标表的分区名称,当目标表为普通表,需要指定该字段。

writeMode

AnalyticDB for MySQL 2.0 Writer实现了两种模式向AnalyticDB for MySQL 2.0导入数据。

  • Insert模式,在主键冲突情况下新的记录会覆盖旧的记录。

  • Load模式,数据通过第三方系统中转落地导入方式。

column

目的表字段列表,可以为["*"],或者具体的字段列表,例如["a", "b", "c"]

suffix

AnalyticDB for MySQL 2.0 url配置项的格式为ip:port,此部分为您定制的连接串,是可选参数。实际在AnalyticDB for MySQL 2.0数据库访问时,会变成JDBC数据库连接串。例如配置suffix为autoReconnect=true&failOverReadOnly=false&maxReconnects=10

batchSize

AnalyticDB for MySQL 2.0提交数据写的批量条数,当writeMode为insert时,该值才会生效。

writeMode为insert时,为必选。

bufferSize

DataX数据收集缓冲区大小,缓冲区的目的是积累一个较大的Buffer,源头的数据首先进入到此Buffer中进行排序,排序完成后再提交至AnalyticDB for MySQL 2.0。排序是根据AnalyticDB for MySQL 2.0的分区列模式进行的,排序的目的是数据顺序对AnalyticDB for MySQL 2.0服务端更友好(出于性能考虑)。

BufferSize缓冲区中的数据会经过batchSize批量提交至AnalyticDB for MySQL 2.0,通常需要设置bufferSize为batchSize数量的多倍。当writeMode为insert时,该值才会生效。

writeMode为insert时,为必选。

默认不配置不开启此功能。