全部产品
Search
文档中心

大数据开发治理平台 DataWorks:AnalyticDB for PostgreSQL数据源

更新时间:Oct 23, 2024

AnalyticDB for PostgreSQL数据源提供读取和写入AnalyticDB for PostgreSQL的双向功能,本文为您介绍DataWorks的AnalyticDB for PostgreSQL数据同步的能力支持情况。

使用限制

离线同步支持读取视图表。

支持的版本

支持版本最高至7.0(含)。

支持的字段类型

离线读

AnalyticDB for PostgreSQL Reader支持大部分AnalyticDB for PostgreSQL类型,但也存在部分类型没有支持的情况,请注意检查您的数据类型。

AnalyticDB for PostgreSQL Reader针对AnalyticDB for PostgreSQL的类型转换列表,如下所示。

类型分类

AnalyticDB for PostgreSQL数据类型

整数类

BIGINT、BIGSERIAL、INTEGER、SMALLINT、SERIAL和GEOMETRY

浮点类

DOUBLE、PRECISION、MONEY、NUMERIC和REAL

字符串类

VARCHAR、CHAR、TEXT、BIT和INET

日期时间类

DATE、TIME和TIMESTAMP

布尔型

BOOL

二进制类

BYTEA

离线写

AnalyticDB for PostgreSQL Writer支持大部分AnalyticDB for PostgreSQL类型,但也存在部分类型没有支持的情况,请注意检查您的类型。

AnalyticDB for PostgreSQL Writer针对AnalyticDB for PostgreSQL的类型转换列表,如下所示。

类型分类

AnalyticDB for PostgreSQL数据类型

LONG

BIGINT、BIGSERIAL、INTEGER、SMALLINT和SERIAL

DOUBLE

DOUBLE、PRECISION、MONEY、NUMERIC和REAL

STRING

VARCHAR、CHAR、TEXT、BIT、INET和GEOMETRY

DATE

DATE、TIME和TIMESTAMP

BOOLEAN

BOOL

BYTES

BYTEA

说明

MONEY、INET和BIT需要您使用a_inet::varchar类似的语法进行转换。

创建数据源

在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源详细的配置参数解释可在配置界面查看对应参数的文案提示

数据同步任务开发

数据同步任务的配置入口和通用配置流程可参见下文的配置指导。

单表离线同步任务配置指导

整库离线读同步配置指导

操作流程请参见数据集成侧同步任务配置

附录:脚本Demo与参数说明

离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。

Reader脚本Demo

{
    "type": "job",
    "steps": [
        {
            "parameter": {
                "datasource": "test_004",//数据源名称。
                "column": [//源端表的列名。
                    "id",
                    "name",
                    "sex",
                    "salary",
                    "age"
                ],
                "where": "id=1001",//过滤条件。
                "splitPk": "id",//切分键。
                "table": "public.person"//源端表名。
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "parameter": {},
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",//版本号
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {//错误记录数。
            "record": ""
        },
        "speed": {
            "concurrent": 6,//并发数。
            "throttle": true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
           "mbps":"12"//限流,此处1mbps = 1MB/s。
        }
    }
}

Reader脚本参数

参数

描述

是否必选

默认值

datasource

数据源名称,脚本模式支持添加数据源,该配置项输入的内容必须和添加的数据源名称保持一致。

table

选取的需要同步的表名称。

column

所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息 。默认使用所有列配置,例如[*]

  • 支持列裁剪,即列可以挑选部分列进行导出。

  • 支持列换序,即列可以不按照表Schema信息顺序进行导出。

  • 支持常量配置,您需要按照SQL语法格式。例如,["id", "table","1","'mingya.wmy'","'null'", "to_char(a+1)","2.3","true"]

    • id为普通列名。

    • table为包含保留字的列名。

    • 1为整型数字常量。

    • ‘mingya.wmy’为字符串常量(注意需要加上一对单引号)。

    • 'null'为字符串常量。

    • to_char(a+1)为计算字符串长度函数。

    • 2.3为浮点数。

    • true为布尔值。

  • column必须显示指定同步的列集合,不允许为空。

splitPk

AnalyticDB for PostgreSQL Reader进行数据抽取时,如果指定splitPk,表示您希望使用splitPk代表的字段进行数据分片。数据同步会启动并发任务进行数据同步,以提高数据同步的效能。

  • 通常表主键较为均匀,切分出的分片不易出现数据热点,所以推荐splitPk用户使用表主键。

  • 目前splitPk仅支持整型数据切分,不支持字符串、浮点、日期等其他类型 。如果您指定其他非支持类型,忽略splitPk功能,使用单通道进行同步。

  • 如果不填写splitPk,包括不提供splitPk或者splitPk值为空,数据同步视作使用单通道同步该表数据 。

where

筛选条件,AnalyticDB for PostgreSQLReader根据指定的columntablewhere条件拼接SQL,并根据该SQL进行数据抽取。例如测试时,可以将where条件指定实际业务场景,往往会选择当天的数据进行同步,将where条件指定为id>2 and sex=1

  • where条件可以有效地进行业务增量同步。

  • where条件不配置或者为空,视作全表同步数据。

querySql(高级模式,向导模式不提供)

在部分业务场景中,where配置项不足以描述所筛选的条件,您可以通过该配置型来自定义筛选SQL。当配置此项后,数据同步系统就会忽略columntable等配置项,直接使用该项配置的内容对数据进行筛选。例如需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id

当您配置querySql时,AnalyticDB for PostgreSQL Reader直接忽略columntablewhere条件的配置。

fetchSize

该配置项定义了插件和数据库服务器端每次批量数据获取条数,该值决定了数据集成和服务器端的网络交互次数,能够提升数据抽取性能。

说明

fetchSize值过大(>2048)可能造成数据同步进程OOM。

512

Writer脚本Demo

{
    "type": "job",
    "steps": [
        {
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "parameter": {
                "postSql": [],//导入后的完成语句。
                "datasource": "test_004",//数据源名。
                "column": [//目标表的列名。
                    "id",
                    "name",
                    "sex",
                    "salary",
                    "age"
                ],
                "table": "public.person",//目标表的表名。
                "preSql": []//导入前的准备语句。
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",//版本号。
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {//错误记录数。
            "record": ""
        },
        "speed": {
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":6, //作业并发数。
            "mbps":"12"//限流
        }
    }
}

Writer脚本参数

参数

描述

是否必选

默认值

datasource

数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须要与添加的数据源名称保持一致。

table

选取的需要同步的表名称。

writeMode

选择导入模式,支持insert、copy和upsert方式。

  • insert:执行PostgreSQL的insert into...values... 语句,将数据写入至PostgreSQL中。建议优先选择该模式。

  • copy:PostgreSQL提供copy命令,用于表与文件(标准输出,标准输入)之间的相互复制。数据集成支持使用copy from,将数据加载到表中。建议您在遇到性能问题时再尝试使用该模式。

  • upsert:写入数据发生冲突后,按照conflictMode参数的设置进行新旧数据的处理。

insert

conflictMode

writeModeupsert,写入数据至PostgreSQL出现主键或唯一性索引冲突时,可选择如下冲突处理策略:

  • replace:写入数据发生冲突后,使用待同步的新数据覆盖原有的旧数据。

  • ignore:写入数据发生冲突后,忽略新数据,并保留原有旧数据。

说明

当前仅支持通过脚本模式配置冲突处理策略。

replace

column

目标表需要写入数据的字段,字段之间用英文逗号分隔。例如"column":["id","name","age"]。如果要依次写入全部列,使用*表示,例如"column":["*"]。

preSql

执行数据同步任务之前率先执行的SQL语句。目前向导模式仅允许执行一条SQL语句,脚本模式可以支持多条SQL语句,例如清除旧数据。

postSql

执行数据同步任务之后执行的SQL语句。目前向导模式仅允许执行一条SQL语句,脚本模式可以支持多条SQL语句,例如加上某一个时间戳。

batchSize

一次性批量提交的记录数大小,该值可以极大减少数据集成与AnalyticDB for PostgreSQL的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成数据集成运行进程OOM情况。

1,024