All Products
Search
Document Center

DataWorks:OSS data source

Last Updated:Jun 27, 2024

DataWorks provides OSS Reader and OSS Writer for you to read data from and write data to Object Storage Service (OSS) data sources. This topic describes the capabilities of synchronizing data from or to OSS data sources.

Data type mappings and limits

Batch data read

OSS Reader reads data from OSS and converts the data to a format that is readable to Data Integration. OSS stores only unstructured data. The following table lists the features that are supported and not supported by OSS Reader:

Supported

Not supported

  • Reads data from TXT objects. The data in the objects must be logical two-dimensional tables.

  • Writes CSV-like files with custom delimiters to OSS.

  • Reads data from files in the ORC and Parquet formats.

  • Reads data of various types as strings and supports constants and column pruning.

  • Supports recursive data read and object name-based filtering.

  • Supports object compression. The following compression formats are supported: GZIP, BZIP2, and ZIP.

    Note

    You cannot compress multiple objects into one package.

  • Uses parallel threads to read data from multiple objects.

  • Uses parallel threads to read data from a single object.

  • Uses parallel threads to read a compressed object.

  • Reads data from an object that exceeds 100 GB in size.

Important

If data in OSS is stored as CSV files, the data must comply with the standard CSV format. For example, if the data in a column of a CSV file is enclosed in a pair of single quotation marks ('), you must replace this pair of single quotation marks with a pair of double quotation marks ("). Otherwise, the data in the CSV file may be incorrectly parsed.

Batch data write

OSS stores only unstructured data. Therefore, OSS Writer converts the data obtained from a reader to text files and writes the files to OSS. The following table lists the features that are supported and not supported by OSS Writer:

Supported

Not supported

  • Writes only text files to OSS. The data in the files must be organized as logical two-dimensional tables. OSS Writer cannot write files that store binary large object (BLOB) data, such as video data and image data, to OSS.

  • Writes CSV-like files with custom delimiters to OSS.

  • Writes files in the ORC and Parquet formats to OSS.

  • Uses parallel threads to write files to OSS. Each thread writes one file to OSS.

  • Supports object rotation. Files are written to OSS as objects. If the size of a file exceeds a specific threshold, OSS Writer writes excess data as another object.

  • Uses parallel threads to write a single file to OSS.

  • Distinguishes between data types. OSS does not distinguish between data types. OSS Writer writes all data as strings to OSS.

  • Writes data to an OSS bucket whose storage class is Cold Archive.

Category

Data Integration data type

Integer

LONG

String

STRING

Floating point

DOUBLE

Boolean

BOOLEAN

Date and time

DATE

Real-time data write

  • Real-time data write is supported.

  • You can write data from OSS to Hudi 0.12.x in real time.

Develop a data synchronization task

For information about the entry point for and the procedure of configuring a data synchronization task, see the following sections. For information about the parameter settings, view the infotip of each parameter on the configuration tab of the task.

Add a data source

Before you configure a data synchronization task to synchronize data from or to a specific data source, you must add the data source to DataWorks. For more information, see Add and manage data sources.

Configure a batch synchronization task to synchronize data of a single table

Configure a real-time synchronization task to synchronize data of a single table

For more information about the configuration procedure, see Create a real-time synchronization task to synchronize incremental data from a single table and Configure a real-time synchronization task in DataStudio.

Configure synchronization settings to implement real-time synchronization of full or incremental data in a database

For more information about the configuration procedure, see Configure a synchronization task in Data Integration.

FAQ

Appendix: Code and parameters

Appendix: Configure a batch synchronization task by using the code editor

If you use the code editor to configure a batch synchronization task, you must configure parameters for the reader and writer of the related data source based on the format requirements in the code editor. For more information about the format requirements, see Configure a batch synchronization task by using the code editor. The following information describes the configuration details of parameters for the reader and writer in the code editor.

Common code for OSS Reader

{
    "type":"job",
    "version":"2.0",// The version number. 
    "steps":[
        {
            "stepType":"oss",// The plug-in name. 
            "parameter":{
                "nullFormat":"",// The string that represents a null pointer. 
                "compress":"",// The format in which objects are compressed. 
                "datasource":"", // The name of the data source. 
                "column":[// The names of the columns. 
                    {
                        "index":0,// The ID of a column in the source object. 
                        "type":"string"// The source data type. 
                    },
                    {
                        "index":1,
                        "type":"long"
                    },
                    {
                        "index":2,
                        "type":"double"
                    },
                    {
                        "index":3,
                        "type":"boolean"
                    },
                    {
                        "format":"yyyy-MM-dd HH:mm:ss", // The time format. 
                        "index":4,
                        "type":"date"
                    }
                ],
                "skipHeader":"",// Specifies whether to skip the headers in a CSV-like object if the object has headers. 
                "encoding":"",// The encoding format. 
                "fieldDelimiter":",",// The column delimiter. 
                "fileFormat": "",// The format of the object. 
                "object":[]// The name of the object. 
            },
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":""// The maximum number of dirty data records allowed. 
        },
        "speed":{
            "throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
            "concurrent":1 // The maximum number of parallel threads. 
            "mbps":"12",// The maximum transmission rate. Unit: MB/s. 
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

Code for OSS Reader: Read data from ORC or Parquet objects in OSS

OSS Reader reads data from ORC or Parquet objects in the way in which HDFS Reader reads data. In addition to the original parameters, OSS Reader provides extended parameters such as path and fileFormat.

  • The following code provides an example on how to configure OSS Reader to read data from ORC objects in OSS:

    {
    "stepType": "oss",
    "parameter": {
    "datasource": "",
    "fileFormat": "orc",
    "path": "/tests/case61/orc__691b6815_9260_4037_9899_****",
    "column": [
    {
    "index": 0,
    "type": "long"
    },
    {
    "index": "1",
    "type": "string"
    },
    {
    "index": "2",
    "type": "string"
    }
    ]
    }
    }
  • The following code provides an example on how to configure OSS Reader to read data from Parquet objects in OSS:

    {
      "type":"job",
        "version":"2.0",
        "steps":[
        {
          "stepType":"oss",
          "parameter":{
            "nullFormat":"",
            "compress":"",
            "fileFormat":"parquet",
            "path":"/*",
            "parquetSchema":"message m { optional BINARY registration_dttm (UTF8); optional Int64 id; optional BINARY first_name (UTF8); optional BINARY last_name (UTF8); optional BINARY email (UTF8); optional BINARY gender (UTF8); optional BINARY ip_address (UTF8); optional BINARY cc (UTF8); optional BINARY country (UTF8); optional BINARY birthdate (UTF8); optional DOUBLE salary; optional BINARY title (UTF8); optional BINARY comments (UTF8); }",
            "column":[
              {
                "index":"0",
                "type":"string"
              },
              {
                "index":"1",
                "type":"long"
              },
              {
                "index":"2",
                "type":"string"
              },
              {
                "index":"3",
                "type":"string"
              },
              {
                "index":"4",
                "type":"string"
              },
              {
                "index":"5",
                "type":"string"
              },
              {
                "index":"6",
                "type":"string"
              },
              {
                "index":"7",
                "type":"string"
              },
              {
                "index":"8",
                "type":"string"
              },
              {
                "index":"9",
                "type":"string"
              },
              {
                "index":"10",
                "type":"double"
              },
              {
                "index":"11",
                "type":"string"
              },
              {
                "index":"12",
                "type":"string"
              }
            ],
            "skipHeader":"false",
            "encoding":"UTF-8",
            "fieldDelimiter":",",
            "fieldDelimiterOrigin":",",
            "datasource":"wpw_demotest_oss",
            "envType":0,
            "object":[
              "wpw_demo/userdata1.parquet"
            ]
          },
          "name":"Reader",
          "category":"reader"
        },
        {
          "stepType":"odps",
          "parameter":{
            "partition":"dt=${bizdate}",
            "truncate":true,
            "datasource":"0_odps_wpw_demotest",
            "envType":0,
            "column":[
              "id"
            ],
            "emptyAsNull":false,
            "table":"wpw_0827"
          },
          "name":"Writer",
          "category":"writer"
        }
      ],
        "setting":{
        "errorLimit":{
          "record":""
        },
        "locale":"zh_CN",
          "speed":{
          "throttle":false,
            "concurrent":2
        }
      },
      "order":{
        "hops":[
          {
            "from":"Reader",
            "to":"Writer"
          }
        ]
      }
    }

Parameters in code for OSS Reader

Parameter

Description

Required

Default value

datasource

The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor.

Yes

No default value

Object

The name of the OSS object from which you want to read data. You can specify multiple object names. For example, a bucket has a directory named yunshi, and this directory contains an object named ll.txt. In this case, you can set this parameter to yunshi/ll.txt. This parameter can work with scheduling parameters to flexibly generate the name and path of an OSS object.

  • If you specify a single OSS object name, OSS Reader uses only a single thread to read data. The feature of using parallel threads to read data from a single uncompressed object will be available in the future.

  • If you specify multiple OSS object names, OSS Reader uses parallel threads to read data. You can configure the number of parallel threads based on your business requirements.

  • If you specify a name that contains a wildcard, OSS Reader reads data from all objects that match the name. For example, if you set this parameter to abc*[0-9], OSS Reader reads data from objects such as abc0, abc1, abc2, and abc3. If you set this parameter to abc?.txt, OSS Reader reads data from objects whose names start with abc, end with .txt, and contain an arbitrary character between abc and .txt.

    We recommend that you do not use wildcards because an out of memory (OOM) error may occur. For more information, see What is OSS?

Note
  • Data Integration considers all objects in a synchronization task as a single table. Make sure that all objects in each synchronization task use the same schema.

  • Control the number of objects stored in a directory. If a directory contains excessive objects, an OOM error may occur. In this case, store the objects in different directories before you synchronize data.

Yes

No default value

parquetSchema

The schema of Parquet files that you want to read. If you set the fileFormat parameter to parquet, you must configure the parquetSchema parameter. Make sure that the entire script complies with the JSON syntax.

message MessageTypeName {
required, dataType, columnName;
......................;
}

The parquetSchema parameter contains the following fields:

  • MessageTypeName: the name of the MessageType object.

  • required: indicates that the column cannot be left empty. You can also specify optional based on your business requirements. We recommend that you specify optional for all columns.

  • dataType: Parquet files support various field types, such as BOOLEAN, INT32, INT64, INT96, FLOAT, DOUBLE, BINARY, and FIXED_LEN_BYTE_ARRAY. Set this parameter to BINARY if the field stores strings.

  • Each line, including the last one, must end with a semicolon (;).

Configuration example:

"parquetSchema": "message m { optional int32 minute_id; optional int32 dsp_id; optional int32 adx_pid; optional int64 req; optional int64 res; optional int64 suc; optional int64 imp; optional double revenue; }"

No

No default value

column

The names of the columns from which you want to read data. The type parameter specifies the source data type. The index parameter specifies the ID of the column in the source object, starting from 0. The value parameter specifies the column value if the column is a constant column. OSS Reader does not read a constant column from the source. Instead, OSS Reader generates a constant column based on the column value that you specify.

By default, OSS Reader reads all data as strings. You can specify the column parameter in the following format:

"column": ["*"]

You can also configure the column parameter in the following format:

"column":
    {
       "type": "long",
       "index": 0    // The first INT-type column in the object from which you want to read data. 
    },
    {
       "type": "string",
       "value": "alibaba"  // The value of the current column. In this code, the value is the constant alibaba. 
    }
Note

For the column parameter, you must configure the type parameter and configure either the index or value parameter.

Yes

"column": ["*"]

fileFormat

The format of the OSS object from which you want to read data. Examples: csv and text. OSS objects in the csv or text format support custom delimiters.

Yes

csv

fieldDelimiter

The column delimiter that is used in the OSS object from which you want to read data.

Note

You must specify a column delimiter for OSS Reader. The default column delimiter is commas (,). If you do not specify a column delimiter, the default column delimiter is used.

If the delimiter is non-printable, enter a value encoded in Unicode, such as \u001b and \u007c.

Yes

,

lineDelimiter

The row delimiter that is used in the OSS object from which you want to read data.

Note

This parameter takes effect only when the fileFormat parameter is set to text.

No

No default value

compress

The format in which objects are compressed. By default, this parameter is left empty, which indicates that objects are not compressed. OSS Reader supports the following compression formats: GZIP, BZIP2, and ZIP.

No

No default value

encoding

The encoding format of the object from which you want to read data.

No

utf-8

nullFormat

The string that represents a null pointer. No standard strings can represent a null pointer in TXT files. You can use this parameter to define a string that represents a null pointer. Examples:

  • If you specify nullFormat:"null", the reader considers the printable string null as a null value.

  • If you specify nullFormat:"\u0001", the reader considers the non-printable string \u0001 as a null value.

  • If you do not configure the nullFormat parameter, OSS Reader does not convert source data.

No

No default value

skipHeader

Specifies whether to skip the headers in a CSV-like object if the object has headers. By default, the headers are not skipped. The skipHeader parameter is unavailable for compressed objects.

No

false

csvReaderConfig

The configurations required to read CSV objects. The parameter value must match the MAP type. You can use a CSV object reader to read data from CSV objects. The CSV object reader supports multiple configurations.

No

No default value

Common code for OSS Writer

{
    "type":"job",
    "version":"2.0",
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"oss",// The plug-in name. 
            "parameter":{
                "nullFormat":"",// The string that represents a null pointer. 
                "dateFormat":"",// The format in which data of the DATE data type is serialized in objects. 
                "datasource":"", // The name of the data source. 
                "writeMode":"",// The write mode. 
                "writeSingleObject":"false", // Specifies whether to write a single file to OSS at a time. 
                "encoding":"",// The encoding format. 
                "fieldDelimiter":","// The column delimiter. 
                "fileFormat":"",// The format in which OSS Writer writes files to OSS. 
                "object":""// The prefix for the names of the files that you want to write to OSS. 
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// The maximum number of dirty data records allowed. 
        },
        "speed":{
            "throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
            "concurrent":1, // The maximum number of parallel threads. 
            "mbps":"12",// The maximum transmission rate. Unit: MB/s. 
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

Code for OSS Writer: Write ORC or Parquet files to OSS

OSS Writer writes ORC or Parquet files to OSS in the way in which HDFS Writer writes data to Hadoop Distributed File System (HDFS). In addition to the parameters for OSS Writer, extended parameters, such as path and fileFormat, are added for OSS Writer. For more information about the extended parameters, see HDFS Writer.

The following code provides examples on how to configure a synchronization task to write an ORC file to OSS and how to configure a synchronization task to write a Parquet file to OSS.

Important

The following code is only for reference. You can modify the parameters based on your business requirements.

  • Write an ORC file to OSS

    If you want to write ORC files to OSS, you can use only the code editor. You must set the fileFormat parameter to orc, the path parameter to the path where the file to be written is stored, and the column parameter to a value in the {"name":"your column name","type": "your column type"} format.

    The following table lists the data types of data in ORC files that OSS Writer can write to OSS.

    Data Type

    OSS Writer for batch data write (ORC files)

    TINYINT

    Supported

    SMALLINT

    Supported

    INT

    Supported

    BIGINT

    Supported

    FLOAT

    Supported

    DOUBLE

    Supported

    TIMESTAMP

    Supported

    DATE

    Supported

    VARCHAR

    Supported

    STRING

    Supported

    CHAR

    Supported

    BOOLEAN

    Supported

    DECIMAL

    Supported

    BINARY

    Supported

    {
    "stepType": "oss",
    "parameter": {
    "datasource": "",
    "fileFormat": "orc",
    "path": "/tests/case61",
    "fileName": "orc",
    "writeMode": "append",
    "column": [
    {
    "name": "col1",
    "type": "BIGINT"
    },
    {
    "name": "col2",
    "type": "DOUBLE"
    },
    {
    "name": "col3",
    "type": "STRING"
    }
    ],
    "writeMode": "append",
    "fieldDelimiter": "\t",
    "compress": "NONE",
    "encoding": "UTF-8"
    }
    }
  • Write a Parquet file to OSS

    {
    "stepType": "oss",
    "parameter": {
    "datasource": "",
    "fileFormat": "parquet",
    "path": "/tests/case61",
    "fileName": "test",
    "writeMode": "append",
    "fieldDelimiter": "\t",
    "compress": "SNAPPY",
    "encoding": "UTF-8",
    "parquetSchema": "message test { required int64 int64_col;\n required binary str_col (UTF8);\nrequired group params (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired binary value (UTF8);\n}\n}\nrequired group params_arr (LIST) {\nrepeated group list {\nrequired binary element (UTF8);\n}\n}\nrequired group params_struct {\nrequired int64 id;\n required binary name (UTF8);\n }\nrequired group params_arr_complex (LIST) {\nrepeated group list {\nrequired group element {\n required int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_complex (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired group value {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_struct_complex {\nrequired int64 id;\n required group detail {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}",
    "dataxParquetMode": "fields"
    }
    }

Parameters in code for OSS Writer

Parameter

Description

Required

Default value

datasource

The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor.

Yes

No default value

object

The prefix for the names of the files that you want to write to OSS. OSS simulates the directory effect by adding delimiters to file names. Examples:

  • If you set the object parameter to datax, the names of the files start with datax and end with random strings.

  • If you set the object parameter to cdo/datax, the names of the files start with /cdo/datax and end with random strings. OSS uses forward slashes (/) in file names to simulate the directory effect.

If you do not want to add a random universally unique identifier (UUID) as the suffix, we recommend that you set the writeSingleObject parameter to true. For more information, see the description of the writeSingleObject parameter.

Yes

No default value

writeMode

The write mode. Valid values:

  • truncate: OSS Writer deletes all existing objects whose names start with the specified prefix before it writes files to OSS. For example, if you set the object parameter to abc, OSS Writer deletes all the objects whose names start with abc before it writes files to OSS.

  • append: OSS Writer writes all files to OSS and suffixes the file names with random UUIDs to ensure that the names of the files are different from the names of existing objects. For example, if you set the object parameter to DI, the actual names of the files written to OSS are in the DI_****_****_**** format.

  • nonConflict: If OSS contains objects whose names start with the specified prefix, OSS Writer returns an error. For example, if you set the object parameter to abc and OSS contains an object named abc123, OSS Writer returns an error.

Yes

No default value

writeSingleObject

Specifies whether to write a single file to OSS at a time. Valid values:

  • true: writes a single file to OSS at a time. If no data is read, no empty file is generated.

  • false: writes multiple files to OSS at a time. If no data is read and a file header is configured, an empty file that contains only the file header is generated. Otherwise, an empty file is generated.

Note

The writeSingleObject parameter does not take effect for ORC or Parquet files. This indicates that you cannot write a single ORC or Parquet file to OSS if multiple threads are used at a time to synchronize data. If you want to write a single file, you can set the concurrent parameter to 1. In this case, a random suffix is added to the file name. However, the data synchronization speed is affected.

No

false

fileFormat

The format in which OSS Writer writes files to OSS. Valid values:

  • csv: If a file is written as a CSV file, the file must follow CSV specifications. If the data in the file contains column delimiters, the column delimiters are escaped by double quotation marks (").

  • text: If a file is written as a text file, the data in the file is separated by column delimiters. In this case, OSS Writer does not escape the column delimiters.

  • parquet: OSS Writer can write Parquet files to OSS. If you want to write Parquet files to OSS, you must configure the parquetschema parameter to define the related data type.

    Important
  • orc: If you want to write ORC files to OSS, you must use the code editor.

No

text

compress

The compression type of the files that you want to write to OSS. This parameter is available only in the code editor.

Note

CSV and text files cannot be compressed. Parquet and ORC files can be compressed in a format such as Snappy and GZIP.

No

No default value

fieldDelimiter

The column delimiter that is used in the files that you want to write to OSS.

No

,

encoding

The encoding format of the files that you want to write to OSS.

No

utf-8

parquetSchema

The schema of the Parquet files that you want to write to OSS. If you set the fileFormat parameter to parquet, you must configure this parameter. Format:

message MessageTypeName {
required, dataType, columnName;
......................;
}

Fields:

  • MessageTypeName: the name of the MessageType object.

  • required: indicates that the column cannot be left empty. You can also specify optional based on your business requirements. We recommend that you specify optional for all columns.

  • dataType: Parquet files support various data types, such as BOOLEAN, INT32, INT64, INT96, FLOAT, DOUBLE, BINARY, and FIXED_LEN_BYTE_ARRAY. Set this parameter to BINARY if the column stores strings.

Note

Each line, including the last one, must end with a semicolon (;).

Example:

message m {
optional int64 id;
optional int64 date_id;
optional binary datetimestring;
optional int32 dspId;
optional int32 advertiserId;
optional int32 status;
optional int64 bidding_req_num;
optional int64 imp;
optional int64 click_num;
}

No

No default value

nullFormat

The string that represents a null pointer. No standard strings can represent a null pointer in text files. You can use this parameter to define a string that represents a null pointer. For example, if you set nullFormat to null, Data Integration considers null as a null pointer.

No

No default value

header

The headers in the files that you want to write to OSS. Example: ['id', 'name', 'age'].

No

No default value

maxFileSize (advanced parameter, which is available only in the code editor)

The maximum size of a single file that can be written to OSS. Default value: 100,000. Unit: MB. OSS Writer performs object rotation based on the value of this parameter. Object rotation is similar to log rotation of Log4j. When a file is uploaded to OSS in multiple parts, the maximum size of a part is 10 MB. This size is the minimum granularity used for object rotation. If you set this parameter to a value that is less than 10 MB, the maximum size of a single file that can be written to OSS is still 10 MB. The InitiateMultipartUploadRequest operation can be used to upload a file in a maximum of 10,000 parts at a time.

If object rotation occurs, suffixes, such as _1, _2, and _3, are appended to the new object names that consist of prefixes and random UUIDs.

Note

The default unit is MB.

For example, if you set the maxFileSize parameter to 300, the maximum size of a single file that can be written to OSS is 300 MB.

No

100,000

suffix (advanced parameter, which is available only in the code editor)

The file name extension of the files that you want to write to OSS. For example, if you set the suffix parameter to .csv, the final name of a file written to OSS is in the fileName****.csv format.

No

No default value

Appendix 2: Convert the data types of data in Parquet files

If you do not configure the parquetSchema parameter, DataWorks converts the data types of data in source Parquet files. The following table provides the conversion policy.

Data type after conversion

Parquet type

Parquet logical type

CHAR / VARCHAR / STRING

BINARY

UTF8

BOOLEAN

BOOLEAN

N/A

BINARY / VARBINARY

BINARY

N/A

DECIMAL

FIXED_LEN_BYTE_ARRAY

DECIMAL

TINYINT

INT32

INT_8

SMALLINT

INT32

INT_16

INT/INTEGER

INT32

N/A

BIGINT

INT64

N/A

FLOAT

FLOAT

N/A

DOUBLE

DOUBLE

N/A

DATE

INT32

DATE

TIME

INT32

TIME_MILLIS

TIMESTAMP/DATETIME

INT96

N/A