OSS資料來源為您提供讀取和寫入OSS的雙向通道,本文為您介紹DataWorks的OSS資料同步的能力支援情況。
支援的欄位類型與使用限制
離線讀
OSS Reader實現了從OSS讀取資料並轉為Data Integration協議的功能,OSS本身是無結構化資料存放區。對於Data Integration而言,OSS Reader支援的功能如下。
支援 | 不支援 |
|
|
準備OSS資料時,如果資料為CSV檔案,則必須為標準格式的CSV檔案。例如,如果列內容在半形引號(")內,需要替換成兩個半形引號(""),否則會造成檔案被錯誤分割。
OSS屬於非結構化資料來源,裡面存放的都是檔案類型資料,因此在使用同步時,需要先自行確認同步的欄位結構是否符合預期。同理,非結構化資料來源中資料結構發生變化時必須要在任務配置中重新確認欄位結構,否則可能會造成同步資料錯亂。
離線寫
OSS Writer實現了從資料同步協議轉為OSS中的文字檔功能,OSS本身是無結構化資料存放區,目前OSS Writer支援的功能如下。
支援 | 不支援 |
|
|
類型分類 | Data Integrationcolumn配置類型 |
整數類 | LONG |
字串類 | STRING |
浮點類 | DOUBLE |
布爾類 | BOOLEAN |
日期時間類 | DATE |
即時寫
支援即時寫入的能力。
支援即時寫入Hudi格式版本:0.12.x。
建立資料來源
在進行資料同步任務開發時,您需要在DataWorks上建立一個對應的資料來源,操作流程請參見建立並管理資料來源,詳細的配置參數解釋可在配置介面查看對應參數的文案提示。
資料同步任務開發
資料同步任務的配置入口和通用配置流程可參見下文的配置指導。
單表離線同步任務配置指導
操作流程請參見通過嚮導模式配置離線同步任務、通過指令碼模式配置離線同步任務。
指令碼模式配置的全量參數和指令碼Demo請參見下文的附錄:指令碼Demo與參數說明。
單表即時同步任務配置指導
操作流程請參見配置單表增量資料即時同步、DataStudio側即時同步任務配置。
整庫(即時)全增量同步處理配置指導
操作流程請參見Data Integration側同步任務配置。
常見問題
附錄:指令碼Demo與參數說明
離線任務指令碼配置方式
如果您配置離線任務時使用指令碼模式的方式進行配置,您需要按照統一的指令碼格式要求,在任務指令碼中編寫相應的參數,詳情請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下資料來源的參數配置詳情。
Reader指令碼Demo:通用樣本
{
"type":"job",
"version":"2.0",//版本號碼。
"steps":[
{
"stepType":"oss",//外掛程式名。
"parameter":{
"nullFormat":"",//定義可以表示為null的字串。
"compress":"",//文本壓縮類型。
"datasource":"",//資料來源。
"column":[//欄位。
{
"index":0,//列序號。
"type":"string"//資料類型。
},
{
"index":1,
"type":"long"
},
{
"index":2,
"type":"double"
},
{
"index":3,
"type":"boolean"
},
{
"format":"yyyy-MM-dd HH:mm:ss", //時間格式。
"index":4,
"type":"date"
}
],
"skipHeader":"",//類CSV格式檔案可能存在表頭為標題情況,需要跳過。
"encoding":"",//編碼格式。
"fieldDelimiter":",",//欄位分隔符號。
"fileFormat": "",//文本類型。
"object":[]//object首碼。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":""//錯誤記錄數。
},
"speed":{
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1 //作業並發數。
"mbps":"12",//限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Reader指令碼Demo:ORC或Parquet檔案讀取OSS
目前通過複用HDFS Reader的方式完成OSS讀取ORC或Parquet格式的檔案,在OSS Reader已有參數的基礎上,增加了Path、FileFormat等擴充配置參數。
以ORC檔案格式讀取OSS,樣本如下。
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "orc", "path": "/tests/case61/orc__691b6815_9260_4037_9899_****", "column": [ { "index": 0, "type": "long" }, { "index": "1", "type": "string" }, { "index": "2", "type": "string" } ] } }
以Parquet檔案格式讀取OSS,樣本如下。
{ "type":"job", "version":"2.0", "steps":[ { "stepType":"oss", "parameter":{ "nullFormat":"", "compress":"", "fileFormat":"parquet", "path":"/*", "parquetSchema":"message m { optional BINARY registration_dttm (UTF8); optional Int64 id; optional BINARY first_name (UTF8); optional BINARY last_name (UTF8); optional BINARY email (UTF8); optional BINARY gender (UTF8); optional BINARY ip_address (UTF8); optional BINARY cc (UTF8); optional BINARY country (UTF8); optional BINARY birthdate (UTF8); optional DOUBLE salary; optional BINARY title (UTF8); optional BINARY comments (UTF8); }", "column":[ { "index":"0", "type":"string" }, { "index":"1", "type":"long" }, { "index":"2", "type":"string" }, { "index":"3", "type":"string" }, { "index":"4", "type":"string" }, { "index":"5", "type":"string" }, { "index":"6", "type":"string" }, { "index":"7", "type":"string" }, { "index":"8", "type":"string" }, { "index":"9", "type":"string" }, { "index":"10", "type":"double" }, { "index":"11", "type":"string" }, { "index":"12", "type":"string" } ], "skipHeader":"false", "encoding":"UTF-8", "fieldDelimiter":",", "fieldDelimiterOrigin":",", "datasource":"wpw_demotest_oss", "envType":0, "object":[ "wpw_demo/userdata1.parquet" ] }, "name":"Reader", "category":"reader" }, { "stepType":"odps", "parameter":{ "partition":"dt=${bizdate}", "truncate":true, "datasource":"0_odps_wpw_demotest", "envType":0, "column":[ "id" ], "emptyAsNull":false, "table":"wpw_0827" }, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"" }, "locale":"zh_CN", "speed":{ "throttle":false, "concurrent":2 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
Reader指令碼參數
參數 | 描述 | 是否必選 | 預設值 |
datasource | 資料來源名稱,指令碼模式支援添加資料來源,此配置項填寫的內容必須要與添加的資料來源名稱保持一致。 | 是 | 無 |
Object | OSS的Object資訊,此處可以支援填寫多個Object。例如xxx的bucket中有yunshi檔案夾,檔案夾中有ll.txt檔案,則Object直接填yunshi/ll.txt。 支援使用調度參數並配合調度,靈活產生Object檔案名稱與路徑。
說明
| 是 | 無 |
parquetSchema | 以Parquet檔案格式讀取OSS時配置,若且唯若fileFormat為parquet時生效,具體表示parquet儲存的類型說明。您需要確保填寫parquetSchema後,整體配置符合JSON文法。
parquetSchema的配置格式說明如下:
配置樣本如下所示。
| 否 | 無 |
column | 讀取欄位列表,type指定來源資料的類型,index指定當前列來自於文本第幾列(以0開始),value指定當前類型為常量,不是從源標頭檔讀取資料,而是根據value值自動產生對應的列。 預設情況下,您可以全部按照String類型讀取資料,配置如下。
您可以指定column欄位資訊,配置如下。
說明 對於您指定的column資訊,type必須填寫,index/value必須選擇其一。 | 是 | 全部按照STRING類型讀取。 |
fileFormat | 文本類型。源頭OSS的檔案類型。例如csv、text,兩種格式均支援自訂分隔字元。 | 是 | csv |
fieldDelimiter | 讀取的欄位分隔符號。 說明 OSS Reader在讀取資料時,需要指定欄位分割符,如果不指定預設為(,),介面配置中也會預設填寫為(,)。 如果分隔字元不可見,請填寫Unicode編碼。例如,\u001b、\u007c。 | 是 | , |
lineDelimiter | 讀取的行分隔字元。 說明 當fileFormat取值為text時,本參數有效。 | 否 | 無 |
compress | 文本壓縮類型,預設不填寫(即不壓縮)。支援壓縮類型為gzip、bzip2和zip。 | 否 | 不壓縮 |
encoding | 讀取檔案的編碼配置。 | 否 | utf-8 |
nullFormat | 文字檔中無法使用標準字串定義null(null 指標),資料同步提供nullFormat定義哪些字串可以表示為null。 例如:
| 否 | 無 |
skipHeader | 類CSV格式檔案可能存在表頭為標題情況,需要跳過。預設不跳過,壓縮檔模式下不支援skipHeader。 | 否 | false |
csvReaderConfig | 讀取CSV類型檔案參數配置,Map類型。讀取CSV類型檔案使用的CsvReader進行讀取,會有很多配置,不配置則使用預設值。 | 否 | 無 |
Writer指令碼Demo:通用樣本
{
"type":"job",
"version":"2.0",
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"oss",//外掛程式名。
"parameter":{
"nullFormat":"",//資料同步系統提供nullFormat,定義哪些字串可以表示為null。
"dateFormat":"",//日期格式。
"datasource":"",//資料來源。
"writeMode":"",//寫入模式。
"writeSingleObject":"false", //表示是否將同步資料寫入單個oss檔案。
"encoding":"",//編碼格式。
"fieldDelimiter":","//欄位分隔符號。
"fileFormat":"",//文本類型。
"object":""//Object首碼。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//錯誤記錄數。
},
"speed":{
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1, //作業並發數。
"mbps":"12"//限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Writer指令碼Demo:ORC或Parquet檔案寫入OSS指令碼配置demo
目前通過複用HDFS Writer的方式完成OSS寫ORC或Parquet格式的檔案。在OSS Writer已有參數的基礎上,增加了Path、FileFormat等擴充配置參數,參數含義請參見HDFS Writer。
ORC或Parquet檔案寫入OSS的樣本如下:
以下僅為樣本,請根據您自己具體的列名稱和類型修改對應的參數,請勿直接複製使用。
以ORC檔案格式寫入OSS
寫ORC檔案,當前僅支援指令碼模式,您需要轉指令碼模式配置,其中fileFormat需要配置為
orc
,path需要配置為寫入檔案的路徑,column配置格式為{"name":"your column name","type": "your column type"}
。當前支援寫入的ORC類型如下:
欄位類型
離線寫OSS(ORC格式)
TINYINT
支援
SMALLINT
支援
INT
支援
BIGINT
支援
FLOAT
支援
DOUBLE
支援
TIMESTAMP
支援
DATE
支援
VARCHAR
支援
STRING
支援
CHAR
支援
BOOLEAN
支援
DECIMAL
支援
BINARY
支援
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "orc", "path": "/tests/case61", "fileName": "orc", "writeMode": "append", "column": [ { "name": "col1", "type": "BIGINT" }, { "name": "col2", "type": "DOUBLE" }, { "name": "col3", "type": "STRING" } ], "writeMode": "append", "fieldDelimiter": "\t", "compress": "NONE", "encoding": "UTF-8" } }
以Parquet檔案格式寫入OSS
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "parquet", "path": "/tests/case61", "fileName": "test", "writeMode": "append", "fieldDelimiter": "\t", "compress": "SNAPPY", "encoding": "UTF-8", "parquetSchema": "message test { required int64 int64_col;\n required binary str_col (UTF8);\nrequired group params (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired binary value (UTF8);\n}\n}\nrequired group params_arr (LIST) {\nrepeated group list {\nrequired binary element (UTF8);\n}\n}\nrequired group params_struct {\nrequired int64 id;\n required binary name (UTF8);\n }\nrequired group params_arr_complex (LIST) {\nrepeated group list {\nrequired group element {\n required int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_complex (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired group value {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_struct_complex {\nrequired int64 id;\n required group detail {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}", "dataxParquetMode": "fields" } }
Writer指令碼參數
參數 | 描述 | 是否必選 | 預設值 |
datasource | 資料來源名稱,指令碼模式支援添加資料來源,該配置項填寫的內容必須與添加的資料來源名稱保持一致。 | 是 | 無 |
object | OSS Writer寫入的檔案名稱,OSS使用檔案名稱類比目錄的實現。OSS對於Object的名稱有以下限制:
如果您不需要尾碼隨機UUID,建議您配置 | 是 | 無 |
writeMode | OSS Writer寫入前,資料的處理:
| 是 | 無 |
writeSingleObject | OSS寫資料時,是否寫單個檔案:
說明 當寫入ORC、parquet類型資料時,writeSingleObject參數不生效,即使用該參數無法在多並發情境下,寫入單個ORC或parquet檔案。若要寫入單個檔案,您可以將並發設定為1,但檔案名稱會添加隨機尾碼,並且設定並發為1時,將影響同步任務的速度。 | 否 | false |
fileFormat | 檔案寫出的格式,支援以下幾種格式:
| 否 | text |
compress | 寫入OSS的資料檔案的壓縮格式(需使用指令碼模式任務配置)。 說明 csv、text文本類型不支援壓縮,parquet/orc檔案支援gzip、snappy等壓縮。 | 否 | 無 |
fieldDelimiter | 寫入的欄位分隔符號。 | 否 | , |
encoding | 寫出檔案的編碼配置。 | 否 | utf-8 |
parquetSchema | 以Parquet檔案格式寫入OSS的必填項,用來描述目標檔案的結構,所以此項若且唯若fileFormat為parquet時生效,格式如下。
配置項說明如下:
說明 每行列設定必須以分號結尾,最後一行也要寫上分號。 樣本如下。
| 否 | 無 |
nullFormat | 文字檔中無法使用標準字串定義null(null 指標),資料同步系統提供nullFormat定義可以表示為null的字串。例如,您配置 | 否 | 無 |
header | OSS寫出時的表頭,例如, | 否 | 無 |
maxFileSize(進階配置,嚮導模式不支援) | OSS寫出時單個Object檔案的最大值,預設為10,000*10MB,類似於在列印log4j日誌時,控制記錄檔的大小。OSS分塊上傳時,每個分塊大小為10MB(也是日誌輪轉檔案最小粒度,即小於10MB的maxFileSize會被作為10MB),每個OSS InitiateMultipartUploadRequest支援的分塊最大數量為10,000。 輪轉寄生時,Object名字規則是在原有Object首碼加UUID隨機數的基礎上,拼接_1,_2,_3等尾碼。 說明 預設單位為MB。 配置樣本:"maxFileSize":300, 表示設定單個檔案大小為300M。 | 否 | 100,000 |
suffix(進階配置,嚮導模式不支援) | 資料同步寫出時,產生的檔案名稱尾碼。例如,配置suffix為.csv,則最終寫出的檔案名稱為fileName****.csv。 | 否 | 無 |
附錄:parquet類型資料的轉化策略
如果您沒有配置parquetSchema,那麼DataWorks側會根據源端欄位類型,按照一定的策略進行相應資料類型轉換,轉換策略如下。
轉換後的資料類型 | Parquet type | Parquet logical type |
CHAR / VARCHAR / STRING | BINARY | UTF8 |
BOOLEAN | BOOLEAN | 不涉及 |
BINARY / VARBINARY | BINARY | 不涉及 |
DECIMAL | FIXED_LEN_BYTE_ARRAY | DECIMAL |
TINYINT | INT32 | INT_8 |
SMALLINT | INT32 | INT_16 |
INT/INTEGER | INT32 | 不涉及 |
BIGINT | INT64 | 不涉及 |
FLOAT | FLOAT | 不涉及 |
DOUBLE | DOUBLE | 不涉及 |
DATE | INT32 | DATE |
TIME | INT32 | TIME_MILLIS |
TIMESTAMP/DATETIME | INT96 | 不涉及 |