Lindorm provides the bulkload feature to allow you to import data in batches. The bulkload feature can be used to quickly import data in a stable manner. This topic describes how to use the bulkload feature to import data in batches.
Feature
The bulkload feature loads data in bypass mode. The bulkload feature does not load data by using API operations or computing resources of your Lindorm instance. Compared with the method of calling API operations to import data, the bulkload feature provides the following advantages:
The data importing rate of bulkload is more than 10 times higher than the rate of importing data by using API operations.
Bulkload jobs do not affect your online services because bulkload jobs do not require the computing resources of your online services.
Resource usage for bulkload jobs is more flexible because the system uses Spark computing resources to run bulkload jobs.
The bulkload feature can be used to import data from various types of data sources, including Comma Separated Values (CSV) files, Optimized Row Columnar (ORC) files, Parquet files, and tables in Alibaba Cloud MaxCompute.
The bulkload feature is easy to use. You do not need to write a line of code to load data in batches in bypass mode.
Cost-effectiveness. Lindorm Tunnel Service (LTS) provides resources for bulkloading based on the cloud native elastic capability of serverless Spark. LTS provides computing resources that can be scaled based on your business requirements, and the resources that are used for bulkloading are billed by using the pay-as-you-go billing method. You do not need to purchase computing resources for bulkload jobs for a long period of time. This helps you reduce resource costs.
Prerequisites
LTS is activated for your Lindorm instance and you are logged on to the LTS web user interface (UI) of your Lindorm instance. For more information, see Activate and log on to LTS.
Lindorm Distributed Processing System (LDPS) is activated for your Lindorm instance. For more information, see Activate LDPS and modify the configurations.
A Spark data source is added. For more information, see Add a Spark data source.
Supported data sources
Source | Destination |
MaxCompute Table | Wide tables in the Lindorm wide table engine service (LindormTable) |
HDFS CSV or OSS CSV | |
HDFS Parquet or OSS Parquet | |
HDFS ORC or OSS ORC |
How to create a bulkload job
You can create a bulkload job by using one of the following methods:
Create a bulkload job in the LTS console
Log on to the LTS web UI of your Lindorm instance. For more information, see Activate and log on to LTS.
In the left-side navigation pane, choose Data Source Management > Add Data Source.
Create a MaxCompute data source. For information about how to create a MaxCompute data source, see ODPS data source.
Create a LindormTable data source. For information about how to create a LindormTable data source, see Add a LindormTable data source.
Create a Hadoop Distributed File System (HDFS) data source. For information about how to create an HDFS data source, see Add an HDFS data source.
In the left-side navigation pane, choose Data Import > Bulkload.
Click create new job. Then, configure the parameters on the page that appears. The following table describes the parameters.
Section
Parameter
Description
Select Datasource
Source
Select the MaxCompute data source or HDFS data source from which you want to import data to Lindorm.
Target
Select the LindormTable data source to which you want to import data.
Configuration
Reader Config
If the data source is a table in MaxCompute, the configuration of the reader must include the following parameters:
table: the name of the source table in MaxCompute.
column: the names of the columns from which you want to import data to Lindorm.
partition: If the source table is a partitioned table, specify the partitions from which you want to import data to Lindorm. If the source table is a non-partitioned table, this parameter can be left empty.
numPartitions: the number of threads that you want the reader to use to read data from the source table.
If the data source is a CSV file in an HDFS system, the configuration of the reader must include the following parameters:
filePath: the directory where the CSV file resides.
header: specifies whether the CSV file includes a header line.
delimiter: the delimiter that is used in the CSV file.
column: the names and data types of the columns in the CSV file.
If the data source is a Parquet file in an HDFS system, the configuration of the reader must include the following parameters:
filePath: the directory where the Parquet file reside.
column: the names of the columns in the Parquet file.
NoteFor more information about how to configure a reader, see Sample configurations.
Writer Config
namespace: the namespace of the LindormTable cluster.
lindormTable: the name of the destination wide table in LindormTable.
compression: the compression algorithm that you want to use. Only the Zstandard (Zstd) compression algorithm is supported. If you do not want to use a compression algorithm, specify none as the value of this parameter.
columns: the names of the columns in the destination wide table.
If the destination wide table is a Lindorm wide table that supports SQL queries, specify the names of the columns that are included in the Lindorm wide table as the value of this parameter. The columns must correspond to the columns that are specified in the reader configuration.
If the destination wide table is a Lindorm wide table that is compatible with ApsaraDB for HBase, specify the standard names of the columns in the destination table as the value of this parameter. The columns must correspond to the columns that are specified in the reader configuration.
timestamp: the timestamp of the data in the wide table. The following data types are supported:
13-digit Long values.
Strings in the yyyy-MM-dd HH:mm:ss or yyyy-MM-dd HH:mm:ss SSS formats.
NoteFor more information about how to configure a writer, see Sample configurations.
Job Config
Spark Driver Spec
Select the instance type of the Spark driver of the bulkload job.
Spark Executor Spec
Select the instance type of the Spark executors of the bulkload job.
Spark Executor Instances
Specify the number of Spark executors that you want to use to run the bulkload job.
Spark Config
Optional. The extended configurations of the bulkload job.
Click Submit.
On the Bulkload page, click the name of the bulkload job to view information about the job.
Click the name of the bulkload job. Then, view information about the job on the Spark web UI of the job.
Click Details to view the execution logs of the job.
NoteIf you want to import data to a Lindorm wide table in which data is evenly distributed to each partition, the size of data is 100 GB, and the compression ratio is 1:4, the system requires approximately 1 hour to import the data. In actual business scenarios, the import duration is determined based on the size of the data that is imported.
Call the related API operation to create a bulkload job
Create a bulkload job
API operation:
http://{BDSMaster}:12311/pro/proc/bulkload/create
. The HTTP request method is POST. {BDSMaster}: specifies the domain name of the primary node of your Lindorm instance. To obtain the domain name, log on to the LTS web UI of the Lindorm instance and view the information in the Basic Info section on the Cluster Info page.The following table describes the request parameters.
Parameter
Description
src
The name of the source data source.
dst
The name of the destination data source.
readerConfig
The configuration of the reader. The configuration data must be in the JSON format. For more information, see Sample configurations.
writerConfig
The configuration of the writer. The configuration data must be in the JSON format. For more information, see Sample configurations.
driverSpec
The instance type of the driver of the bulkload job. Valid values: small, medium, large, and xlarge. We recommend that you set this parameter to large.
instances
The number of Spark executors that you want to use to run the bulkload job.
fileType
The type of the source file. If the source data source is an HDFS data source, the file type is CSV or Parquet.
sparkAdditionalParams
Optional. Extension parameters.
Example:
curl -d "src=hdfs&dst=ld&readerConfig={\"filePath\":\"parquet/\",\"column\":[\"id\",\"intcol\",\"doublecol\",\"stringcol\",\"string1col\",\"decimalcol\"]}&writerConfig={\"columns\":[\"ROW||String\",\"f:intcol||Int\",\"f:doublecol||Double\",\"f:stringcol||String\",\"f:string1col||String\",\"f:decimalcol||Decimal\"],\"namespace\":\"default\",\"lindormTable\":\"bulkload_test\",\"compression\":\"zstd\"}&driverSpec=large&instances=5&fileType=Parquet" -H "Content-Type: application/x-www-form-urlencoded" -X POST http://{LTSMaster}:12311/pro/proc/bulkload/create
The following result is returned. The message parameter in the result indicates the ID of the job that is created.
{"success":"true","message":"proc-91-ff383c616e5242888b398e51359c****"}
Query information about a bulkload job.
API operation:
http://{LTSMaster}:12311/pro/proc/{procId}/info
. The HTTP request method is GET. {LTSMaster}: specifies the domain name of the primary node of your Lindorm instance. To obtain the domain name, log on to the LTS web UI of the Lindorm instance and view the information in the Basic Info section on the Cluster Info page.Request parameter: procId. The procId parameter specifies the ID of the job.
Example:
curl http://{LTSMaster}:12311/pro/proc/proc-91-ff383c616e5242888b398e51359c****/info
The following result is returned:
{ "data":{ "checkJobs":Array, "procId":"proc-91-ff383c616e5242888b398e51359c****", //Indicates the job ID. "incrJobs":Array, "procConfig":Object, "stage":"WAIT_FOR_SUCCESS", "fullJobs":Array, "mergeJobs":Array, "srcDS":"hdfs", //Indicates the source data source. "sinkDS":"ld-uf6el41jkba96****", //Indicates the destination data source. "state":"RUNNING", //Indicates the status of the job. "schemaJob":Object, "procType":"SPARK_BULKLOAD" //Indicates the type of the job. }, "success":"true" }
Terminate a bulkload job
API operation:
http://{LTSMaster}:12311/pro/proc/{procId}/abort
. The HTTP request method is GET. {LTSMaster}: specifies the domain name of the primary node of your Lindorm instance. To obtain the domain name, log on to the LTS web UI of the Lindorm instance and view the information in the Basic Info section on the Cluster Info page.Request parameter: procId. The procId parameter specifies the ID of the job.
Example:
curl http://{LTSMaster}:12311/pro/proc/proc-91-ff383c616e5242888b398e51359c****/abort
The following result is returned:
{"success":"true","message":"ok"}
Retry a bulkload job
API operation:
http://{LTSMaster}:12311/pro/proc/{procId}/retry
. The HTTP request method is GET. {LTSMaster}: specifies the domain name of the primary node of your Lindorm instance. To obtain the domain name, log on to the LTS cluster of the Lindorm instance and view the information in the Basic Info section on the Cluster Info page.Request parameter: procId. The procId parameter specifies the ID of the job.
Example:
curl http://{LTSMaster}:12311/pro/proc/proc-91-ff383c616e5242888b398e51359c****/retry
The following result is returned:
{"success":"true","message":"ok"}
Delete a bulkload job
API operation:
http://{LTSMaster}:12311/pro/proc/{procId}/delete
. The HTTP request method is GET. {LTSMaster}: specifies the domain name of the primary node of your Lindorm instance. To obtain the domain name, log on to the LTS cluster of the Lindorm instance and view the information in the Basic Info section on the Cluster Info page.Request parameter: procId. The procId parameter specifies the ID of the job.
Example:
curl http://{LTSMaster}:12311/pro/proc/proc-91-ff383c616e5242888b398e51359c****/delete
The following result is returned:
{"success":"true","message":"ok"}
Sample configurations
Sample configurations that are used to read data from a data source.
The following sample code provides an example on how to configure the reader to read data from a MaxCompute data source.
{ "table": "test", "column": [ "id", "intcol", "doublecol", "stringcol", "string1col", "decimalcol" ], "partition": [ "pt=1" ], "numPartitions":10 }
The following sample code provides an example on how to configure the reader to read data from a CSV file in an HDFS data source.
{ "filePath":"csv/", "header": false, "delimiter": ",", "column": [ "id|string", "intcol|int", "doublecol|double", "stringcol|string", "string1col|string", "decimalcol|decimal" ] }
The following sample code provides an example on how to configure the reader to read data from a Parquet file in an HDFS data source.
{ "filePath":"parquet/", "column": [ // The names of columns in the Parquet file. "id", "intcol", "doublecol", "stringcol", "string1col", "decimalcol" ] }
Sample configurations that are used to write data to a destination table.
The following sample code provides an example on how to configure the writer to write data to a Lindorm wide table that supports SQL queries.
{ "namespace": "default", "lindormTable": "xxx", "compression":"zstd", "timestamp":"2022-07-01 10:00:00", "columns": [ "id", "intcol", "doublecol", "stringcol", "string1col", "decimalcol" ] }
The following sample code provides an example on how to configure the writer to write data to a Lindorm wide table that is compatible with ApsaraDB for HBase.
{ "namespace": "default", "lindormTable": "xxx", "compression":"zstd", "timestamp":"2022-07-01 10:00:00", "columns": [ "ROW||String", //ROW indicates the row key, and String indicates the data type of the column. "f:intcol||Int", // Column family:Column name||Data type "f:doublecol||Double", "f:stringcol||String", "f:string1col||String", "f:decimalcol||Decimal" ] }