DataWorks provides Lindorm Reader and Lindorm Writer for you to read data from and write data to Lindorm data sources. This topic describes the capabilities of synchronizing data from or to Lindorm data sources.
Limits
You must add a Lindorm data source to DataWorks before you can use Lindorm Reader or Lindorm Writer to read data from or write data to Lindorm.
Lindorm Reader and Lindorm Writer support only exclusive resource groups for Data Integration.
LindormTSDB cannot be used as a data source for data synchronization.
The configuration parameter is required for Lindorm Reader and Lindorm Writer and must be configured in the JSON format. You can log on to the ApsaraDB for Lindorm console to obtain the configuration items that are required for Data Integration to connect to an ApsaraDB for Lindorm cluster.
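The configuration items obtained from the console must be assembled into a single JSON object. The following Python sketch shows one way to build that object; the lindorm.client.* keys are taken from the samples later in this topic, and the placeholder values are illustrative only.

```python
import json

# Placeholder values; replace them with the configuration items that you
# obtain from the ApsaraDB for Lindorm console.
configuration = {
    "lindorm.client.seedserver": "xxxxxxx:30020",
    "lindorm.client.username": "xxxxxx",
    "lindorm.client.namespace": "default",
    "lindorm.client.password": "xxxxxx",
}

# The configuration parameter must be a JSON object in the
# {"key1":"value1","key2":"value2"} form.
print(json.dumps(configuration, separators=(",", ":")))
```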
Note: ApsaraDB for Lindorm is a multi-model database. Lindorm Reader can read data from tables of the table and wideColumn types in ApsaraDB for Lindorm databases, and Lindorm Writer can write data to such tables. For more information, contact on-duty Lindorm engineers on DingTalk.
Data type mappings
Lindorm Reader and Lindorm Writer support most ApsaraDB for Lindorm data types. Make sure that the data types of your database are supported.
The following table lists the data type mappings based on which Lindorm Reader and Lindorm Writer convert data types.
Category | Lindorm data type |
Integer | INT, LONG, and SHORT |
Floating point | FLOAT and DOUBLE |
String | STRING |
Date and time | DATE |
Boolean | BOOLEAN |
Binary | BINARYSTRING |
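The mapping above can be expressed as a small lookup table for pre-flight checks before you configure a task. This is an illustrative sketch, not part of any Lindorm or DataWorks SDK; the LINDORM_TYPE_CATEGORY table and the is_supported helper are hypothetical names.

```python
# The data type mappings from the table above, expressed as a lookup table.
LINDORM_TYPE_CATEGORY = {
    "INT": "Integer", "LONG": "Integer", "SHORT": "Integer",
    "FLOAT": "Floating point", "DOUBLE": "Floating point",
    "STRING": "String",
    "DATE": "Date and time",
    "BOOLEAN": "Boolean",
    "BINARYSTRING": "Binary",
}

def is_supported(lindorm_type: str) -> bool:
    # Case-insensitive check against the supported Lindorm data types.
    return lindorm_type.upper() in LINDORM_TYPE_CATEGORY

print(is_supported("DOUBLE"))  # → True
```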
Develop a data synchronization task
For information about the configuration procedure, see Configure a batch synchronization task by using the code editor.
For information about all parameters that are configured and the code that is run when you use the code editor to configure a batch synchronization task, see Appendix: Code and parameters.
Appendix: Code and parameters
If you use the code editor to configure a batch synchronization task, you must configure parameters for the reader and writer of the related data source based on the format requirements in the code editor. For more information about the format requirements, see Configure a batch synchronization task by using the code editor. The following information describes the configuration details of parameters for the reader and writer in the code editor.
Code for Lindorm Reader
In the following code, a batch synchronization task is configured to synchronize data from a table of the table type in an ApsaraDB for Lindorm database to a MySQL database.
{
  "type": "job",
  "version": "2.0",
  "steps": [
    {
      "stepType": "lindorm",
      "parameter": {
        "mode": "FixedColumn",
        "caching": 128,
        // The configuration items that are required for Data Integration to connect to the ApsaraDB for Lindorm cluster. You can obtain the configuration items in the ApsaraDB for Lindorm console. This parameter must be configured in the JSON format.
        "configuration": {
          "lindorm.client.username": "",
          "lindorm.client.seedserver": "seedserver.et2sqa.tbsite.net:30020",
          "lindorm.client.namespace": "namespace",
          "lindorm.client.password": ""
        },
        "columns": [
          "id",
          "name",
          "age",
          "birthday",
          "gender"
        ],
        // The selects parameter shards the source table so that multiple threads can read data in parallel.
        "selects": [
          "where(compare(\"id\", LESS, 5))",
          "where(and(compare(\"id\", GREATER_OR_EQUAL, 5), compare(\"id\", LESS, 10)))",
          "where(compare(\"id\", GREATER_OR_EQUAL, 10))"
        ],
        "envType": 1,
        "datasource": "_LINDORM",
        "namespace": "namespace",
        "table": "lindorm_table"
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "mysql",
      "parameter": {
        "postSql": [],
        "datasource": "_IDB.TAOBAO",
        "session": [],
        "envType": 1,
        "column": [
          "id",
          "name",
          "age",
          "birthday",
          "gender"
        ],
        "socketTimeout": 3600000,
        "guid": "",
        "writeMode": "insert",
        "batchSize": 1024,
        "encoding": "UTF-8",
        "table": "",
        "preSql": []
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "jvmOption": "",
    "executeMode": null,
    "speed": {
      // The maximum transmission rate, in Byte/s. Data Integration tries to reach but cannot exceed the rate specified by this parameter.
      "byte": 1048576
    },
    // The settings related to dirty data records.
    "errorLimit": {
      // The maximum number of dirty data records allowed. If the number of dirty data records generated during data synchronization exceeds the value of this parameter, an error is reported.
      "record": 0,
      // The maximum percentage of dirty data records allowed. 1.0 indicates 100%, and 0.02 indicates 2%.
      "percentage": 0.02
    }
  },
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  }
}
In the following code, a batch synchronization task is configured to synchronize data from a table of the wideColumn type in an ApsaraDB for Lindorm database to a MySQL database.
{
  "type": "job",
  "version": "2.0",
  "steps": [
    {
      "stepType": "lindorm",
      "parameter": {
        "mode": "FixedColumn",
        // The configuration items that are required for Data Integration to connect to the ApsaraDB for Lindorm cluster. You can obtain the configuration items in the ApsaraDB for Lindorm console. This parameter must be configured in the JSON format.
        "configuration": {
          "lindorm.client.username": "",
          "lindorm.client.seedserver": "seedserver.et2sqa.tbsite.net:30020",
          "lindorm.client.namespace": "namespace",
          "lindorm.client.password": ""
        },
        "columns": [
          "STRING|rowkey",
          "INT|f:a",
          "DOUBLE|f:b"
        ],
        "envType": 1,
        "datasource": "_LINDORM",
        "namespace": "namespace",
        "tableMode": "wideColumn",
        "table": "yourTableName"
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "mysql",
      "parameter": {
        "postSql": [],
        "datasource": "_IDB.TAOBAO",
        "session": [],
        "envType": 1,
        "column": [
          "id",
          "value"
        ],
        "socketTimeout": 3600000,
        "guid": "",
        "writeMode": "insert",
        "batchSize": 1024,
        "encoding": "UTF-8",
        "table": "",
        "preSql": []
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "jvmOption": "",
    "executeMode": null,
    "speed": {
      // The maximum transmission rate, in Byte/s. Data Integration tries to reach but cannot exceed the rate specified by this parameter.
      "byte": 1048576
    },
    // The settings related to dirty data records.
    "errorLimit": {
      // The maximum number of dirty data records allowed. If the number of dirty data records generated during data synchronization exceeds the value of this parameter, an error is reported.
      "record": 0,
      // The maximum percentage of dirty data records allowed. 1.0 indicates 100%, and 0.02 indicates 2%.
      "percentage": 0.02
    }
  },
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  }
}
Parameters in code for Lindorm Reader
Parameter | Description | Required | Default value |
configuration | The configuration items that are required for Data Integration to connect to the ApsaraDB for Lindorm cluster. You can log on to the ApsaraDB for Lindorm console to obtain the configuration items. Then, the administrator of the ApsaraDB for Lindorm database must convert the configuration items to the following JSON format: {"key1":"value1","key2":"value2"}. Example: {"lindorm.zookeeper.quorum":"????","lindorm.zookeeper.property.clientPort":"????"}. Note: If you manually write the JSON code, you must escape the double quotation marks (") in values as \". | Yes | No default value |
mode | The data read mode. Valid values: FixedColumn and DynamicColumn. | Yes | FixedColumn |
tableMode | The type of the table from which you want to read data. Valid values: table and wideColumn. If you leave this parameter empty, data is read from a table of the table type. | No | table |
table | The name of the table from which you want to read data. The table name is case-sensitive. | Yes | No default value |
namespace | The namespace of the table from which you want to read data. The namespace name is case-sensitive. | Yes | No default value |
encoding | The encoding method. Valid values: UTF-8 and GBK. This parameter is used to convert Lindorm byte[] data that is stored in binary mode to strings. | No | UTF-8 |
caching | The number of data records to read at a time. An appropriate value greatly reduces the number of interactions between Data Integration and Lindorm and increases throughput. If you set this parameter to an excessively large value, an out of memory (OOM) error may occur during data synchronization. | No | 100 |
selects | The setting related to data sharding. When Lindorm Reader reads data from a table of the table type, automatic data sharding is not supported, and a single thread runs for the batch synchronization task by default. To enable data sharding, you must manually configure the selects parameter. Example: ["where(compare(\"id\", LESS, 5))", "where(and(compare(\"id\", GREATER_OR_EQUAL, 5), compare(\"id\", LESS, 10)))", "where(compare(\"id\", GREATER_OR_EQUAL, 10))"] | No | No default value |
columns | The names of the columns from which you want to read data. Lindorm Reader allows you to read data from specific columns of a source table, in an order that can differ from the column order in the schema of the source table. | Yes | No default value |
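The escaping note in the configuration row can be mechanized. The following Python sketch shows how serializing the configuration object a second time escapes the inner double quotation marks as \", which is what you need when the whole object must be pasted as a single JSON string value. The quorum values below are the placeholder ???? strings from the example above.

```python
import json

# The example configuration from the table above, with placeholder values.
configuration = {
    "lindorm.zookeeper.quorum": "????",
    "lindorm.zookeeper.property.clientPort": "????",
}

# First pass: the {"key":"value"} form expected by the configuration parameter.
conf_json = json.dumps(configuration, separators=(",", ":"))

# Second pass: escapes every inner " as \" so the result can be embedded
# as one JSON string value when you write the job code by hand.
escaped = json.dumps(conf_json)
print(escaped)
```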
Code for Lindorm Writer
In the following code, a batch synchronization task is configured to write data from a MySQL data source to a table of the table type in an ApsaraDB for Lindorm database.
{
  "type": "job",
  "version": "2.0",
  "steps": [
    {
      "stepType": "mysql",
      "parameter": {
        "checkSlave": true,
        "datasource": " ",
        "envType": 1,
        "column": [
          "id",
          "value",
          "table"
        ],
        "socketTimeout": 3600000,
        "masterSlave": "slave",
        "connection": [
          {
            "datasource": " ",
            "table": []
          }
        ],
        "where": "",
        "splitPk": "",
        "encoding": "UTF-8",
        "print": true
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "lindorm",
      "parameter": {
        "configuration": {
          "lindorm.client.seedserver": "xxxxxxx:30020",
          "lindorm.client.username": "xxxxxx",
          "lindorm.client.namespace": "default",
          "lindorm.client.password": "xxxxxx"
        },
        "nullMode": "skip",
        "datasource": "",
        "writeMode": "api",
        "envType": 1,
        "columns": [
          "id",
          "name",
          "age",
          "birthday",
          "gender"
        ],
        "dynamicColumn": "false",
        "table": "lindorm_table",
        "encoding": "utf8"
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "jvmOption": "",
    "executeMode": null,
    "speed": {
      // The maximum transmission rate, in Byte/s. Data Integration tries to reach but cannot exceed the rate specified by this parameter.
      "byte": 1048576
    },
    // The settings related to dirty data records.
    "errorLimit": {
      // The maximum number of dirty data records allowed. If the number of dirty data records generated during data synchronization exceeds the value of this parameter, an error is reported.
      "record": 0,
      // The maximum percentage of dirty data records allowed. 1.0 indicates 100%, and 0.02 indicates 2%.
      "percentage": 0.02
    }
  },
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  }
}
In the following code, a batch synchronization task is configured to write data from a MySQL data source to a table of the wideColumn type in an ApsaraDB for Lindorm database.
{
  "type": "job",
  "version": "2.0",
  "steps": [
    {
      "stepType": "mysql",
      "parameter": {
        "envType": 0,
        "datasource": " ",
        "column": [
          "id",
          "name",
          "age",
          "birthday",
          "gender"
        ],
        "connection": [
          {
            "datasource": " ",
            "table": []
          }
        ],
        "where": "",
        "splitPk": "",
        "encoding": "UTF-8"
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "lindorm",
      "parameter": {
        "configuration": {
          "lindorm.client.seedserver": "xxxxxxx:30020",
          "lindorm.client.username": "xxxxxx",
          "lindorm.client.namespace": "default",
          "lindorm.client.password": "xxxxxx"
        },
        "writeMode": "api",
        "namespace": "default",
        "table": "xxxxxx",
        "encoding": "utf8",
        "nullMode": "skip",
        "dynamicColumn": "false",
        "caching": 128,
        "columns": [
          "ROW|STRING",
          "cf:id|STRING",
          "cf:age|INT",
          "cf:birthday|STRING"
        ]
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "jvmOption": "",
    "errorLimit": {
      "record": "0"
    },
    "speed": {
      "concurrent": 3,
      "throttle": false
    }
  },
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  }
}
Parameters in code for Lindorm Writer
Parameter | Description | Required | Default value |
configuration | The configuration items that are required for Data Integration to connect to the ApsaraDB for Lindorm cluster. You can log on to the ApsaraDB for Lindorm console to obtain the configuration items. Then, the administrator of the ApsaraDB for Lindorm database must convert the configuration items to the following JSON format: {"key1":"value1","key2":"value2"}. Example: {"lindorm.zookeeper.quorum":"????","lindorm.zookeeper.property.clientPort":"????"}. Note: If you manually write the JSON code, you must escape the double quotation marks (") in values as \". | Yes | No default value |
table | The name of the table to which you want to write data. The table name is case-sensitive. | Yes | No default value |
namespace | The namespace of the table to which you want to write data. The namespace name is case-sensitive. | Yes | No default value |
encoding | The encoding method. Valid values: UTF-8 and GBK. This parameter is used to convert Lindorm byte[] data that is stored in binary mode to strings. | No | UTF-8 |
columns | The names of the columns to which you want to write data. Lindorm Writer allows you to write data to specific columns of a destination table, in an order that can differ from the column order in the schema of the destination table. | Yes | No default value |
nullMode | The policy that Lindorm Writer uses when a source field contains a NULL value. | No | EMPTY_BYTES |
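The wideColumn column specs shown in the writer sample (for example, cf:id|STRING) combine a column family, a qualifier, and a Lindorm data type. The helper below is inferred from the samples in this topic and is only a sketch of the format, not the parser that Lindorm Writer actually uses.

```python
# Split a writer column spec such as "cf:id|STRING" into its parts.
# The exact rules Lindorm Writer applies are not documented here,
# so treat this as an illustrative sketch.
def parse_writer_column(spec: str):
    name, type_ = spec.rsplit("|", 1)
    if ":" in name:
        family, qualifier = name.split(":", 1)
    else:
        family, qualifier = None, name  # e.g. the "ROW|STRING" row key spec
    return family, qualifier, type_

for spec in ["ROW|STRING", "cf:id|STRING", "cf:age|INT"]:
    print(parse_writer_column(spec))
```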