全部产品
Search
文档中心

大数据开发治理平台 DataWorks:Salesforce数据源

更新时间:Oct 23, 2024

Salesforce提供按需定制的软件服务,这些服务涉及客户关系管理的各个方面,例如从普通的联系人管理、产品目录到订单管理、机会管理、销售管理等。DataWorks数据集成支持读取Salesforce类型的数据源,本文为您介绍Salesforce的使用详情。

支持的字段类型

字段类型

脚本模式数据类型

address

STRING

anyType

STRING

base64

BYTES

boolean

BOOL

combobox

STRING

complexvalue

STRING

currency

DOUBLE

date

DATE

datetime

DATE

double

DOUBLE

email

STRING

encryptedstring

STRING

id

STRING

int

LONG

json

STRING

long

LONG

multipicklist

STRING

percent

DOUBLE

phone

STRING

picklist

STRING

reference

STRING

string

STRING

textarea

STRING

time

DATE

url

STRING

geolocation

STRING

创建数据源

在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源详细的配置参数解释可在配置界面查看对应参数的文案提示

Salesforce支持如下方式创建数据源:

  • Salesforce官方:通过登录Salesforce官网自动获取Salesforce访问地址信息,创建数据源。

  • 自定义:通过指定您自己部署的Connected App的Consumer KeyConsumer Secret,自动获取Salesforce访问地址,创建数据源。

    自定义模式数据源配置方式

    创建Connected App

    1. 进入创建页面。

      1. 前往Salesforce官网并登录。

      2. 在顶部单击image,然后在左侧导航栏单击App Manager

      3. Lightning Experience App Manager页面,单击New Connected App

        image

    2. 填写Connected App相关配置。

      image

      关键参数配置:

      序号

      参数说明

      根据业务需求,创建符合Salesforce规范的应用名(Connected App Name)、API名称(API Name)及联系人邮箱(Contact Email)。

      开启Enable OAuth Settings。回调URL(Callback URL)填写为https://bff-cn-shanghai.data.aliyun.com/di/oauth/callback/index.html

      Selected OAuth Scopes中添加以下Scope:

      • Access Connect REST API resources (chatter api)

      • Access the identity URL service (id, profile, email, address, phone)

      • Access unique user identifiers (openid)

      • Manage user data via APIs (api)

      • Perform requests at any time (refresh token, offline_access)

      • 关闭Require Proof Key for Code Exchange (PKCE) Extension for Supported Authorization Flows

      • 开启Require Secret for Web Server Flow

      • 开启Require Secret for Refresh Token Flow

    3. 查看Connected App的Consumer Key and Secret

      1. 在App Manager列表中找到已创建的App,点击App右侧的image,然后单击View

      2. 在App详情页面,查看Consumer Key and Secret

        image

      3. 复制Consumer KeyConsumer Secret值用于后续创建Saleforce数据源时使用。

        image

    填写数据源参数

    1. 进入数据集成页面。

      登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与治理 > 数据集成,在下拉框中选择对应工作空间后单击进入数据集成

    2. 单击左侧导航栏的数据源,进入数据源管理页面。

    3. 单击新增数据源,搜索Salesforce并打开,在新增Salesforce数据源页面,配置数据源类型自定义

      image

      关键参数配置:

      参数

      说明

      登录地址

      填写为https://<Salesforce域名>/services/oauth2/authorize

      认证地址

      填写为https://<Salesforce域名>/services/oauth2/token

      Consumer KeyConsumer Secret

      填写为已获取Connected App详情中的Consumer KeyConsumer Secret

      单击登录Salesforce,在授权页面填写用户名、密码后,单击允许授权(Allow),完成数据源配置。

      image

数据同步任务开发

数据同步任务的配置入口和通用配置流程可参见下文的配置指导。

单表离线同步任务配置指导

附录:脚本Demo与参数说明

离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。

Reader脚本Demo

案例1:查询Salesforce Object

{
  "type":"job",
  "version":"2.0",
  "steps":[
    {
      "stepType":"salesforce",
      "parameter":{
        "datasource":"",
        "serviceType": "sobject",
        "table": "Account",
        "beginDateTime": "20230817184200",
        "endDateTime": "20231017184200",
        "where": "",
        "column": [
          {
            "type": "STRING",
            "name": "Id"
          },
          {
            "type": "STRING",
            "name": "Name"
          },
          {
            "type": "BOOL",
            "name": "IsDeleted"
          },
          {
            "type": "DATE",
            "name": "CreatedDate"
          }
        ]
      },
      "name":"Reader",
      "category":"reader"
    },
    {
      "stepType":"stream",
      "parameter":{},
      "name":"Writer",
      "category":"writer"
    }
  ],
  "setting":{
    "errorLimit":{
      "record":"0"
    },
    "speed":{
      "throttle":true,
      "concurrent":1,
      "mbps":"12"
    }
  },
  "order":{
    "hops":[
      {
        "from":"Reader",
        "to":"Writer"
      }
    ]
  }
}

案例2:使用BULK API1.0查询Salesforce Object

{
  "type":"job",
  "version":"2.0",
  "steps":[
    {
      "stepType":"salesforce",
      "parameter":{
        "datasource":"",
        "serviceType": "bulk1",
        "table": "Account",
        "beginDateTime": "20230817184200",
        "endDateTime": "20231017184200",
        "where": "",
        "blockCompoundColumn":true,
        "bulkQueryJobTimeoutSeconds":86400,
        "column": [
          {
            "type": "STRING",
            "name": "Id"
          },
          {
            "type": "STRING",
            "name": "Name"
          },
          {
            "type": "BOOL",
            "name": "IsDeleted"
          },
          {
            "type": "DATE",
            "name": "CreatedDate"
          }
        ]
      },
      "name":"Reader",
      "category":"reader"
    },
    {
      "stepType":"stream",
      "parameter":{
        "print": true
      },
      "name":"Writer",
      "category":"writer"
    }
  ],
  "setting":{
    "errorLimit":{
      "record":"0"
    },
    "speed":{
      "concurrent":1
    }
  },
  "order":{
    "hops":[
      {
        "from":"Reader",
        "to":"Writer"
      }
    ]
  }
}

案例3:使用BULK API2.0查询Salesforce Object

{
  "type":"job",
  "version":"2.0",
  "steps":[
    {
      "stepType":"salesforce",
      "parameter":{
        "datasource":"",
        "serviceType": "bulk2",
        "table": "Account",
        "beginDateTime": "20230817184200",
        "endDateTime": "20231017184200",
        "where": "",
        "blockCompoundColumn":true,
        "bulkQueryJobTimeoutSeconds":86400,
        "column": [
          {
            "type": "STRING",
            "name": "Id"
          },
          {
            "type": "STRING",
            "name": "Name"
          },
          {
            "type": "BOOL",
            "name": "IsDeleted"
          },
          {
            "type": "DATE",
            "name": "CreatedDate"
          }
        ]
      },
      "name":"Reader",
      "category":"reader"
    },
    {
      "stepType":"stream",
      "parameter":{},
      "name":"Writer",
      "category":"writer"
    }
  ],
  "setting":{
    "errorLimit":{
      "record":"0"
    },
    "speed":{
      "throttle":true,
      "concurrent":1,
      "mbps":"12"
    }
  },
  "order":{
    "hops":[
      {
        "from":"Reader",
        "to":"Writer"
      }
    ]
  }
}

案例4:使用SOQL查询语句

{
  "type":"job",
  "version":"2.0",
  "steps":[
    {
      "stepType":"salesforce",
      "parameter":{
        "datasource":"",
        "serviceType": "query",
        "query": "select Id, Name, IsDeleted, CreatedDate from Account where Name!='Aliyun' ",
        "column": [
          {
            "type": "STRING",
            "name": "Id"
          },
          {
            "type": "STRING",
            "name": "Name"
          },
          {
            "type": "BOOL",
            "name": "IsDeleted"
          },
          {
            "type": "DATE",
            "name": "CreatedDate"
          }
        ]
      },
      "name":"Reader",
      "category":"reader"
    },
    {
      "stepType":"stream",
      "parameter":{},
      "name":"Writer",
      "category":"writer"
    }
  ],
  "setting":{
    "errorLimit":{
      "record":"0"
    },
    "speed":{
      "throttle":true,
      "concurrent":1,
      "mbps":"12"
    }
  },
  "order":{
    "hops":[
      {
        "from":"Reader",
        "to":"Writer"
      }
    ]
  }
}

Reader脚本参数

参数

是否必选

描述

默认值

datasource

数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须与添加的数据源名称保持一致。

serviceType

同步方式,支持以下配置项:

  • sobject:查询Salesforce Object。

  • query:通过SOQL查询语句查询数据。

  • bulk1:使用Salesforce Bulk API 1.0查询Salesforce Object。

  • bulk2:使用Salesforce Bulk API 2.0查询Salesforce Object。

    重要
    • bulk1、bulk2模式不支持address、geolocation等组合字段类型。

    • bulk2模式不支持分布式任务。

    • 在某些情况下,bulk1的性能表现会比bulk2更好,您可以根据自己的Salesforce Object测试性能并选择。

sobject

table

Salesforce Object,和表名类似,例如Account、Case、Group。当serviceType配置为sobject、bulk1或bulk2时必填。

beginDateTime

  • 数据消费的开始、结束时间位点。当serviceType配置为sobject、bulk1或bulk2时使用。

  • 使用Salesforce Object的最后修改时间过滤数据,最后修改时间字段会按照以下优先级自动寻找:SystemModstamp>LastModifiedDate>CreatedDate。

  • 遵循左闭右开原则。

  • yyyymmddhhmmss格式的时间字符串,您可以配合调度参数,实现增量抽取数据的效果。

endDateTime

splitPk

  • 切分键。当serviceType配置为sobject时使用。

  • Salesforce Reader进行数据抽取时,如果指定splitPk,表示您希望使用splitPk代表的字段进行数据分片,数据同步因此会启动并发任务进行数据同步,提高数据同步的效能。

  • splitPk支持datetime、int、long字段,不符合这3个数据类型时,任务报错。

blockCompoundColumn

组合数据类型行为。当serviceType配置为bulk1或bulk2时使用。取值:

  • true:表示存在组合数据类型时,任务失败,您需要去掉组合数据类型的列映射后重新运行。

  • false:表示将组合数据类型作为NULL值读取。

true

bulkQueryJobTimeoutSeconds

  • 批量数据准备超时,单位:秒。当serviceType配置为bulk1或bulk2时使用。

  • 在开始读取数据之前,Salesforce服务端需要先运行准备批量数据的任务,如果该任务运行时间超过本配置,则视为准备数据超时,任务失败。

86400

batchSize

  • 单批下载条数。当serviceType配置为bulk1或bulk2时使用。

  • 本配置只需要比Salesforce批量数据准备任务自动分片的单批条数稍大即可获得最高下载性能。

  • 数据下载是流式的,增大此配置项不会占用更多内存。

  • 高级模式,向导模式不支持此参数的配置。

300000

where

  • 筛选条件。当serviceType配置为sobject、bulk1或bulk2时使用。

  • 在实际业务场景中,可以进行数据筛选,例如Name != 'Aliyun'。

  • 如果不填写where语句,数据同步均视作同步全量数据。

  • 不可以将where条件指定为limit 10,这不符合SOQL WHERE子句约束。

query

  • 查询语句。当serviceType配置为query时使用。

  • 在部分业务场景中,where配置项不足以描述所筛选的条件,您可以通过该配置项来自定义筛选SQL。配置该项后,数据同步系统会忽略table、column、beginDateTime、endDateTime、where和splitPk配置项,直接使用该项配置的内容对数据进行筛选。例如:select Id, Name, IsDeleted from Account where Name!='Aliyun'

  • 高级模式,向导模式不支持此参数的配置。

queryAll

  • 查询语句,包含已删除的数据。当serviceType配置为sobject或query时使用。

  • 配置为true时查询包含已删除的记录,IsDeleted字段可以用于区分记录是否已删除。

false

column

所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。

  • 支持列裁剪,可以挑选部分列进行导出。

  • 支持列换序,可以不按照表schema信息顺序进行导出。

  • 支持常量配置,如下:

    [
      {
        "name": "Id",
        "type": "STRING"
      },
      {
        "name": "Name",
        "type": "STRING"
      },
      {
        "name": "'123'",
        "type": "LONG"
      },
      {
        "name": "'abc'",
        "type": "STRING"
      }
    ]
    说明
    • Id、Name为普通列名。

    • '123'为整型数字常量(注意需要加上一对单引号)。

    • 'abc'为字符串常量(注意需要加上一对单引号)。

  • column必须显示指定同步的列集合,不允许为空。

connectTimeoutSeconds

  • 建立HTTP连接的超时时间,单位:秒,超过本配置项时,任务失败。

  • 高级模式,向导模式不支持此参数的配置。

30

socketTimeoutSeconds

  • HTTP连接失去响应,单位:秒,前后报文传输间隔大于此配置项时,任务失败。

  • 高级模式,向导模式不支持此参数的配置。

600

retryIntervalSeconds

  • 重试间隔,单位:秒。

  • 高级模式,向导模式不支持此参数的配置。

60

retryTimes

  • 重试次数。

  • 高级模式,向导模式不支持此参数的配置。

3