全部產品
Search
文件中心

DataWorks:OSS資料來源

更新時間:Jun 19, 2024

OSS資料來源為您提供讀取和寫入OSS的雙向通道,本文為您介紹DataWorks的OSS資料同步的能力支援情況。

支援的欄位類型與使用限制

離線讀

OSS Reader實現了從OSS讀取資料並轉為Data Integration協議的功能,OSS本身是無結構化資料存放區。對於Data Integration而言,OSS Reader支援的功能如下。

支援

不支援

  • 支援且僅支援讀取TXT格式的檔案,且要求TXT中schema為一張二維表。

  • 支援類CSV格式檔案,自訂分隔字元。

  • 支援ORC、PARQUET格式.

  • 支援多種類型資料讀取(使用String表示),支援列裁剪、列常量。

  • 支援遞迴讀取、支援檔案名稱過濾。

  • 支援文本壓縮,現有壓縮格式為gzipbzip2zip

    說明

    一個壓縮包不允許多檔案打包壓縮。

  • 多個Object可以支援並發讀取。

  • 單個Object(File)不支援多線程並發讀取。

  • 單個Object在壓縮情況下,從技術上無法支援多線程並發讀取。

  • 單個Object(File)不超過100 GB。

重要

準備OSS資料時,如果資料為CSV檔案,則必須為標準格式的CSV檔案。例如,如果列內容在半形引號(")內,需要替換成兩個半形引號(""),否則會造成檔案被錯誤分割。

離線寫

OSS Writer實現了從資料同步協議轉為OSS中的文字檔功能,OSS本身是無結構化資料存放區,目前OSS Writer支援的功能如下。

支援

不支援

  • 支援且僅支援寫入文本類型(不支援BLOB,如視頻和圖片)的檔案,並要求文字檔中的Schema為一張二維表。

  • 支援類CSV格式檔案,自訂分隔字元。

  • 支援ORC、PARQUET格式。

  • 支援多線程寫入,每個線程寫入不同的子檔案。

  • 檔案支援滾動,當檔案大於某個size值時,支援檔案切換。

  • 單個檔案不能支援並發寫入。

  • OSS本身不提供資料類型,OSS Writer均以STRING類型寫入OSS對象。

  • 如果OSS的Bucket儲存類型為冷Archive Storage,則不支援寫入。

類型分類

Data Integrationcolumn配置類型

整數類

LONG

字串類

STRING

浮點類

DOUBLE

布爾類

BOOLEAN

日期時間類

DATE

即時寫

  • 支援即時寫入的能力。

  • 支援即時寫入Hudi格式版本:0.12.x。

資料同步任務開發

OSS資料同步任務的配置入口和通用配置流程指導可參見下文的配置指導,詳細的配置參數解釋可在配置介面查看對應參數的文案提示。

建立資料來源

在進行資料同步任務開發時,您需要在DataWorks上建立一個對應的資料來源,操作流程請參見建立並管理資料來源

單表離線同步任務配置指導

單表即時同步任務配置指導

操作流程請參見配置單表增量資料即時同步DataStudio側即時同步任務配置

整庫(即時)全增量同步處理配置指導

操作流程請參見Data Integration側同步任務配置

常見問題

附錄:指令碼Demo與參數說明

附錄:離線任務指令碼配置方式

如果您配置離線任務時使用指令碼模式的方式進行配置,您需要在任務指令碼中按照指令碼的統一格式要求編寫指令碼中的reader參數和writer參數,指令碼模式的統一要求請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下的資料來源的Reader參數和Writer參數的指導詳情。

OSS Reader指令碼Demo:通用樣本

{
    "type":"job",
    "version":"2.0",//版本號碼。
    "steps":[
        {
            "stepType":"oss",//外掛程式名。
            "parameter":{
                "nullFormat":"",//定義可以表示為null的字串。
                "compress":"",//文本壓縮類型。
                "datasource":"",//資料來源。
                "column":[//欄位。
                    {
                        "index":0,//列序號。
                        "type":"string"//資料類型。
                    },
                    {
                        "index":1,
                        "type":"long"
                    },
                    {
                        "index":2,
                        "type":"double"
                    },
                    {
                        "index":3,
                        "type":"boolean"
                    },
                    {
                        "format":"yyyy-MM-dd HH:mm:ss", //時間格式。
                        "index":4,
                        "type":"date"
                    }
                ],
                "skipHeader":"",//類CSV格式檔案可能存在表頭為標題情況,需要跳過。
                "encoding":"",//編碼格式。
                "fieldDelimiter":",",//欄位分隔符號。
                "fileFormat": "",//文本類型。
                "object":[]//object首碼。
            },
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":""//錯誤記錄數。
        },
        "speed":{
            "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
            "concurrent":1 //作業並發數。
            "mbps":"12",//限流,此處1mbps = 1MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

OSS Reader指令碼Demo:ORC或Parquet檔案讀取OSS

目前通過複用HDFS Reader的方式完成OSS讀取ORC或Parquet格式的檔案,在OSS Reader已有參數的基礎上,增加了PathFileFormat等擴充配置參數。

  • 以ORC檔案格式讀取OSS,樣本如下。

    {
    "stepType": "oss",
    "parameter": {
    "datasource": "",
    "fileFormat": "orc",
    "path": "/tests/case61/orc__691b6815_9260_4037_9899_****",
    "column": [
    {
    "index": 0,
    "type": "long"
    },
    {
    "index": "1",
    "type": "string"
    },
    {
    "index": "2",
    "type": "string"
    }
    ]
    }
    }
  • 以Parquet檔案格式讀取OSS,樣本如下。

    {
      "type":"job",
        "version":"2.0",
        "steps":[
        {
          "stepType":"oss",
          "parameter":{
            "nullFormat":"",
            "compress":"",
            "fileFormat":"parquet",
            "path":"/*",
            "parquetSchema":"message m { optional BINARY registration_dttm (UTF8); optional Int64 id; optional BINARY first_name (UTF8); optional BINARY last_name (UTF8); optional BINARY email (UTF8); optional BINARY gender (UTF8); optional BINARY ip_address (UTF8); optional BINARY cc (UTF8); optional BINARY country (UTF8); optional BINARY birthdate (UTF8); optional DOUBLE salary; optional BINARY title (UTF8); optional BINARY comments (UTF8); }",
            "column":[
              {
                "index":"0",
                "type":"string"
              },
              {
                "index":"1",
                "type":"long"
              },
              {
                "index":"2",
                "type":"string"
              },
              {
                "index":"3",
                "type":"string"
              },
              {
                "index":"4",
                "type":"string"
              },
              {
                "index":"5",
                "type":"string"
              },
              {
                "index":"6",
                "type":"string"
              },
              {
                "index":"7",
                "type":"string"
              },
              {
                "index":"8",
                "type":"string"
              },
              {
                "index":"9",
                "type":"string"
              },
              {
                "index":"10",
                "type":"double"
              },
              {
                "index":"11",
                "type":"string"
              },
              {
                "index":"12",
                "type":"string"
              }
            ],
            "skipHeader":"false",
            "encoding":"UTF-8",
            "fieldDelimiter":",",
            "fieldDelimiterOrigin":",",
            "datasource":"wpw_demotest_oss",
            "envType":0,
            "object":[
              "wpw_demo/userdata1.parquet"
            ]
          },
          "name":"Reader",
          "category":"reader"
        },
        {
          "stepType":"odps",
          "parameter":{
            "partition":"dt=${bizdate}",
            "truncate":true,
            "datasource":"0_odps_wpw_demotest",
            "envType":0,
            "column":[
              "id"
            ],
            "emptyAsNull":false,
            "table":"wpw_0827"
          },
          "name":"Writer",
          "category":"writer"
        }
      ],
        "setting":{
        "errorLimit":{
          "record":""
        },
        "locale":"zh_CN",
          "speed":{
          "throttle":false,
            "concurrent":2
        }
      },
      "order":{
        "hops":[
          {
            "from":"Reader",
            "to":"Writer"
          }
        ]
      }
    }

OSS Reader指令碼參數

參數

描述

是否必選

預設值

datasource

資料來源名稱,指令碼模式支援添加資料來源,此配置項填寫的內容必須要與添加的資料來源名稱保持一致。

Object

OSS的Object資訊,此處可以支援填寫多個Object。例如xxx的bucket中有yunshi檔案夾,檔案夾中有ll.txt檔案,則Object直接填yunshi/ll.txt。 支援使用調度參數並配合調度,靈活產生Object檔案名稱與路徑。

  • 當指定單個OSS Object時,OSS Reader暫時只能使用單線程進行資料幫浦。後期將考慮在非壓縮檔情況下針對單個Object可以進行多線程並發讀取。

  • 當指定多個OSS Object時,OSS Reader支援使用多線程進行資料幫浦。可以根據具體要求配置線程並發數。

  • 當指定萬用字元時,OSS Reader嘗試遍曆出多個Object資訊。例如配置為abc*[0-9]時,可以匹配到abc0abc1abc2abc3等;配置為abc?.txt時,可以匹配到以abc開頭、 .txt結尾、中間有1個任一字元的檔案。

    配置萬用字元會導致記憶體溢出,通常不建議您進行配置。詳情請參見OSS產品概述

說明
  • 資料同步系統會將一個作業下同步的所有Object視作同一張資料表。您必須保證所有的Object能夠適配同一套Schema資訊。

  • 請注意控制單個目錄下的檔案個數,否則可能會觸發系統OutOfMemoryError報錯。如果遇到此情況,請將檔案拆分到不同目錄後再嘗試進行同步。

parquetSchema

以Parquet檔案格式讀取OSS時配置,若且唯若fileFormatparquet時生效,具體表示parquet儲存的類型說明。您需要確保填寫parquetSchema後,整體配置符合JSON文法。

message MessageType名 {
是否必填, 資料類型, 列名;
......................;
}

parquetSchema的配置格式說明如下:

  • MessageType名:填寫名稱。

  • 是否必填:required表示非空,optional表示可為空白。推薦全填optional。

  • 資料類型:Parquet檔案支援BOOLEAN、Int32、Int64、Int96、FLOAT、DOUBLE、BINARY(如果是字串類型,請填BINARY)和fixed_len_byte_array類型。

  • 每行列設定必須以分號結尾,最後一行也要寫上分號。

配置樣本如下所示。

"parquetSchema": "message m { optional int32 minute_id; optional int32 dsp_id; optional int32 adx_pid; optional int64 req; optional int64 res; optional int64 suc; optional int64 imp; optional double revenue; }"

column

讀取欄位列表,type指定來源資料的類型,index指定當前列來自於文本第幾列(以0開始),value指定當前類型為常量,不是從源標頭檔讀取資料,而是根據value值自動產生對應的列。

預設情況下,您可以全部按照String類型讀取資料,配置如下。

"column": ["*"]

您可以指定column欄位資訊,配置如下。

"column":
    {
       "type": "long",
       "index": 0    //從OSS文本第一列擷取int欄位。
    },
    {
       "type": "string",
       "value": "alibaba"  //從OSSReader內部產生alibaba的字串欄位作為當前欄位。
    }
說明

對於您指定的column資訊,type必須填寫,index/value必須選擇其一。

全部按照STRING類型讀取。

fileFormat

文本類型。源頭OSS的檔案類型。例如csv、text,兩種格式均支援自訂分隔字元。

csv

fieldDelimiter

讀取的欄位分隔符號。

說明

OSS Reader在讀取資料時,需要指定欄位分割符,如果不指定預設為(,),介面配置中也會預設填寫為(,)。

如果分隔字元不可見,請填寫Unicode編碼。例如,\u001b\u007c

,

lineDelimiter

讀取的行分隔字元。

說明

當fileFormat取值為text時,本參數有效。

compress

文本壓縮類型,預設不填寫(即不壓縮)。支援壓縮類型為gzipbzip2zip

不壓縮

encoding

讀取檔案的編碼配置。

utf-8

nullFormat

文字檔中無法使用標準字串定義null(null 指標),資料同步提供nullFormat定義哪些字串可以表示為null。 例如:

  • 配置nullFormat:"null",等同於“可見字元”,如果源頭資料是null,則資料同步視作null欄位。

  • 配置nullFormat:"\u0001",等同於“不可見字元”,如果源頭資料是字串"\u0001",則資料同步視作null欄位。

  • 不寫"nullFormat"這個參數,等同於“未配置”,代表來源是什麼資料就直接按照什麼資料寫入目標端,不做任何轉換。

skipHeader

類CSV格式檔案可能存在表頭為標題情況,需要跳過。預設不跳過,壓縮檔模式下不支援skipHeader

false

csvReaderConfig

讀取CSV類型檔案參數配置,Map類型。讀取CSV類型檔案使用的CsvReader進行讀取,會有很多配置,不配置則使用預設值。

OSS Writer指令碼Demo:通用樣本

{
    "type":"job",
    "version":"2.0",
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"oss",//外掛程式名。
            "parameter":{
                "nullFormat":"",//資料同步系統提供nullFormat,定義哪些字串可以表示為null。
                "dateFormat":"",//日期格式。
                "datasource":"",//資料來源。
                "writeMode":"",//寫入模式。
                "writeSingleObject":"false", //表示是否將同步資料寫入單個oss檔案。
                "encoding":"",//編碼格式。
                "fieldDelimiter":","//欄位分隔符號。
                "fileFormat":"",//文本類型。
                "object":""//Object首碼。
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//錯誤記錄數。
        },
        "speed":{
            "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
            "concurrent":1, //作業並發數。
            "mbps":"12"//限流,此處1mbps = 1MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

OSS Writer指令碼Demo:ORC或Parquet檔案寫入OSS指令碼配置demo

目前通過複用HDFS Writer的方式完成OSS寫ORC或Parquet格式的檔案。在OSS Writer已有參數的基礎上,增加了PathFileFormat等擴充配置參數,參數含義請參見HDFS Writer

ORC或Parquet檔案寫入OSS的樣本如下:

重要

以下僅為樣本,請根據您自己具體的列名稱和類型修改對應的參數,請勿直接複製使用。

  • 以ORC檔案格式寫入OSS

    寫ORC檔案,當前僅支援指令碼模式,您需要轉指令碼模式配置,其中fileFormat需要配置為orcpath需要配置為寫入檔案的路徑,column配置格式為 {"name":"your column name","type": "your column type"}

    當前支援寫入的ORC類型如下:

    欄位類型

    離線寫OSS(ORC格式)

    TINYINT

    支援

    SMALLINT

    支援

    INT

    支援

    BIGINT

    支援

    FLOAT

    支援

    DOUBLE

    支援

    TIMESTAMP

    支援

    DATE

    支援

    VARCHAR

    支援

    STRING

    支援

    CHAR

    支援

    BOOLEAN

    支援

    DECIMAL

    支援

    BINARY

    支援

    {
    "stepType": "oss",
    "parameter": {
    "datasource": "",
    "fileFormat": "orc",
    "path": "/tests/case61",
    "fileName": "orc",
    "writeMode": "append",
    "column": [
    {
    "name": "col1",
    "type": "BIGINT"
    },
    {
    "name": "col2",
    "type": "DOUBLE"
    },
    {
    "name": "col3",
    "type": "STRING"
    }
    ],
    "writeMode": "append",
    "fieldDelimiter": "\t",
    "compress": "NONE",
    "encoding": "UTF-8"
    }
    }
  • 以Parquet檔案格式寫入OSS

    {
    "stepType": "oss",
    "parameter": {
    "datasource": "",
    "fileFormat": "parquet",
    "path": "/tests/case61",
    "fileName": "test",
    "writeMode": "append",
    "fieldDelimiter": "\t",
    "compress": "SNAPPY",
    "encoding": "UTF-8",
    "parquetSchema": "message test { required int64 int64_col;\n required binary str_col (UTF8);\nrequired group params (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired binary value (UTF8);\n}\n}\nrequired group params_arr (LIST) {\nrepeated group list {\nrequired binary element (UTF8);\n}\n}\nrequired group params_struct {\nrequired int64 id;\n required binary name (UTF8);\n }\nrequired group params_arr_complex (LIST) {\nrepeated group list {\nrequired group element {\n required int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_complex (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired group value {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_struct_complex {\nrequired int64 id;\n required group detail {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}",
    "dataxParquetMode": "fields"
    }
    }

OSS Writer指令碼參數

參數

描述

是否必選

預設值

datasource

資料來源名稱,指令碼模式支援添加資料來源,該配置項填寫的內容必須與添加的資料來源名稱保持一致。

object

OSS Writer寫入的檔案名稱,OSS使用檔案名稱類比目錄的實現。OSS對於Object的名稱有以下限制:

  • 使用"object": "datax",寫入的Object以datax開頭,尾碼添加隨機字串。

  • 使用"object": "cdo/datax",寫入的Object以/cdo/datax開頭,尾碼隨機添加字串,OSS類比目錄的分隔字元為(/)。

如果您不需要尾碼隨機UUID,建議您配置"writeSingleObject" : "true",詳情請參見writeSingleObject說明。

writeMode

OSS Writer寫入前,資料的處理:

  • truncate:寫入前清理Object名稱首碼匹配的所有Object。例如"object":"abc",將清理所有abc開頭的Object。

  • append:寫入前不進行任何處理,Data IntegrationOSS Writer直接使用Object名稱寫入,並使用隨機UUID的尾碼名來保證檔案名稱不衝突。例如您指定的Object名為Data Integration,實際寫入為DI_****_****_****

  • nonConflict:如果指定路徑出現首碼匹配的Object,直接報錯。例如"object":"abc",如果存在abc123的Object,將直接報錯。

writeSingleObject

OSS寫資料時,是否寫單個檔案:

  • true:表示寫單個檔案,當讀不到任何資料時, 不會產生空檔案。

  • false:表示寫多個檔案,當讀不到任何資料時,若設定檔頭,會輸出空檔案只包含檔案頭,否則只輸出空檔案。

說明

當寫入ORC、parquet類型資料時,writeSingleObject參數不生效,即使用該參數無法在多並發情境下,寫入單個ORC或parquet檔案。若要寫入單個檔案,您可以將並發設定為1,但檔案名稱會添加隨機尾碼,並且設定並發為1時,將影響同步任務的速度。

false

fileFormat

檔案寫出的格式,支援以下幾種格式:

  • csv:僅支援嚴格的csv格式。如果待寫資料包括資料行分隔符號,則會根據csv的轉義文法轉義,轉義符號為雙引號(")。

  • text:使用資料行分隔符號簡單分割待寫資料,對於待寫資料包括資料行分隔符號情況下不進行轉義。

  • parquet:若使用此檔案類型,必須增加parquetschema參數定義資料類型。

    重要
  • ORC:若使用此種格式,需要轉指令碼模式。

text

compress

寫入OSS的資料檔案的壓縮格式(需使用指令碼模式任務配置)。

說明

csv、text文本類型不支援壓縮,parquet/orc檔案支援gzip、snappy等壓縮。

fieldDelimiter

寫入的欄位分隔符號。

,

encoding

寫出檔案的編碼配置。

utf-8

parquetSchema

以Parquet檔案格式寫入OSS的必填項,用來描述目標檔案的結構,所以此項若且唯若fileFormatparquet時生效,格式如下。

message MessageType名 {
是否必填, 資料類型, 列名;
......................;
}

配置項說明如下:

  • MessageType名:填寫名稱。

  • 是否必填:required表示非空,optional表示可為空白。推薦全填optional。

  • 資料類型:Parquet檔案支援BOOLEAN、INT32、INT64、INT96、FLOAT、DOUBLE、BINARY(如果是字串類型,請填BINARY)和FIXED_LEN_BYTE_ARRAY等類型。

說明

每行列設定必須以分號結尾,最後一行也要寫上分號。

樣本如下。

message m {
optional int64 id;
optional int64 date_id;
optional binary datetimestring;
optional int32 dspId;
optional int32 advertiserId;
optional int32 status;
optional int64 bidding_req_num;
optional int64 imp;
optional int64 click_num;
}

nullFormat

文字檔中無法使用標準字串定義null(null 指標),資料同步系統提供nullFormat定義可以表示為null的字串。例如,您配置nullFormat="null",如果源頭資料是null,資料同步系統會視作null欄位。

header

OSS寫出時的表頭,例如,["id", "name", "age"]

maxFileSize(進階配置,嚮導模式不支援)

OSS寫出時單個Object檔案的最大值,預設為10,000*10MB,類似於在列印log4j日誌時,控制記錄檔的大小。OSS分塊上傳時,每個分塊大小為10MB(也是日誌輪轉檔案最小粒度,即小於10MB的maxFileSize會被作為10MB),每個OSS InitiateMultipartUploadRequest支援的分塊最大數量為10,000。

輪轉寄生時,Object名字規則是在原有Object首碼加UUID隨機數的基礎上,拼接_1,_2,_3等尾碼。

說明

預設單位為MB。

配置樣本:"maxFileSize":300, 表示設定單個檔案大小為300M。

100,000

suffix(進階配置,嚮導模式不支援)

資料同步寫出時,產生的檔案名稱尾碼。例如,配置suffix.csv,則最終寫出的檔案名稱為fileName****.csv

附錄2:parquet類型資料的轉化策略

如果您沒有配置parquetSchema,那麼DataWorks側會根據源端欄位類型,按照一定的策略進行相應資料類型轉換,轉換策略如下。

轉換後的資料類型

Parquet type

Parquet logical type

CHAR / VARCHAR / STRING

BINARY

UTF8

BOOLEAN

BOOLEAN

不涉及

BINARY / VARBINARY

BINARY

不涉及

DECIMAL

FIXED_LEN_BYTE_ARRAY

DECIMAL

TINYINT

INT32

INT_8

SMALLINT

INT32

INT_16

INT/INTEGER

INT32

不涉及

BIGINT

INT64

不涉及

FLOAT

FLOAT

不涉及

DOUBLE

DOUBLE

不涉及

DATE

INT32

DATE

TIME

INT32

TIME_MILLIS

TIMESTAMP/DATETIME

INT96

不涉及