This topic describes how to use the data copy tool Jindo DistCp.
Prerequisites
- Java Development Kit (JDK) 8 is installed on your computer.
- An E-MapReduce (EMR) cluster is created. For more information, see Create a cluster.
Use Jindo DistCp
- Connect to the master node of the EMR cluster in SSH mode.
For more information, see Connect to the master node of an EMR cluster in SSH mode.
- Run the following command to obtain help information:
jindo distcp --help
The following information is returned:
--help - Print help text
--src=VALUE - Directory to copy files from
--dest=VALUE - Directory to copy files to
--parallelism=VALUE - Copy task parallelism
--outputManifest=VALUE - The name of the manifest file
--previousManifest=VALUE - The path to an existing manifest file
--requirePreviousManifest=VALUE - Require that a previous manifest is present if specified
--copyFromManifest - Copy from a manifest instead of listing a directory
--srcPrefixesFile=VALUE - File containing a list of source URI prefixes
--srcPattern=VALUE - Include only source files matching this pattern
--deleteOnSuccess - Delete input files after a successful copy
--outputCodec=VALUE - Compression codec for output files
--groupBy=VALUE - Pattern to group input files by
--targetSize=VALUE - Target size for output files
--enableBalancePlan - Enable plan copy task to make balance
--enableDynamicPlan - Enable plan copy task dynamically
--enableTransaction - Enable transaction on Job explicitly
--diff - show the difference between src and dest filelist
--ossKey=VALUE - Specify your oss key if needed
--ossSecret=VALUE - Specify your oss secret if needed
--ossEndPoint=VALUE - Specify your oss endPoint if needed
--policy=VALUE - Specify your oss storage policy
--cleanUpPending - clean up the incomplete upload when distcp job finish
--queue=VALUE - Specify yarn queuename if needed
--bandwidth=VALUE - Specify bandwidth per map/reduce in MB if needed
--s3Key=VALUE - Specify your s3 key
--s3Secret=VALUE - Specify your s3 Secret
--s3EndPoint=VALUE - Specify your s3 EndPoint
--enableCMS - Enable CMS
--update - Update target, copying only missing files or directories
--filters=VALUE - Specify a path of file containing patterns to exclude source files
--src and --dest
--src specifies the source directory. --dest specifies the destination directory.
By default, Jindo DistCp copies all files in the directory specified by --src to the directory specified by --dest. If you do not specify a root directory, Jindo DistCp automatically creates a root directory.
jindo distcp --src /opt/tmp --dest oss://<yourBucketName>/tmp
--parallelism
--parallelism specifies the mapreduce.job.reduces parameter of the MapReduce job that is run to copy files. The default value is 7. You can customize --parallelism based on your available cluster resources to control the number of reduce tasks that run in parallel.
jindo distcp --src /opt/tmp --dest oss://<yourBucketName>/tmp --parallelism 20
--srcPattern
--srcPattern specifies a regular expression that filters files for the copy operation. The regular expression must match a full path.
For example, to copy all log files in the /data/incoming/hourly_table/2017-02-01/03 directory, set --srcPattern to .*\.log.
hdfs dfs -ls /data/incoming/hourly_table/2017-02-01/03
Found 6 items
-rw-r----- 2 root hadoop 2252 2020-04-17 20:42 /data/incoming/hourly_table/2017-02-01/03/000151.sst
-rw-r----- 2 root hadoop 4891 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/1.log
-rw-r----- 2 root hadoop 4891 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/2.log
-rw-r----- 2 root hadoop 4891 2020-04-17 20:42 /data/incoming/hourly_table/2017-02-01/03/OPTIONS-000109
-rw-r----- 2 root hadoop 1016 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/emp01.txt
-rw-r----- 2 root hadoop 1016 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/emp06.txt
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --srcPattern .*\.log --parallelism 20
hdfs dfs -ls oss://<yourBucketName>/hourly_table/2017-02-01/03
Found 2 items
-rw-rw-rw- 1 4891 2020-04-17 20:52 oss://<yourBucketName>/hourly_table/2017-02-01/03/1.log
-rw-rw-rw- 1 4891 2020-04-17 20:52 oss://<yourBucketName>/hourly_table/2017-02-01/03/2.log
--deleteOnSuccess
--deleteOnSuccess enables Jindo DistCp to delete the copied files from the source directory after a copy operation succeeds.
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --deleteOnSuccess --parallelism 20
--outputCodec
--outputCodec specifies the compression codec that is used to compress the copied files on the fly. Example:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --outputCodec=gz --parallelism 20
hdfs dfs -ls oss://<yourBucketName>/hourly_table/2017-02-01/03
Found 6 items
-rw-rw-rw- 1 938 2020-04-17 20:58 oss://<yourBucketName>/hourly_table/2017-02-01/03/000151.sst.gz
-rw-rw-rw- 1 1956 2020-04-17 20:58 oss://<yourBucketName>/hourly_table/2017-02-01/03/1.log.gz
-rw-rw-rw- 1 1956 2020-04-17 20:58 oss://<yourBucketName>/hourly_table/2017-02-01/03/2.log.gz
-rw-rw-rw- 1 1956 2020-04-17 20:58 oss://<yourBucketName>/hourly_table/2017-02-01/03/OPTIONS-000109.gz
-rw-rw-rw- 1 506 2020-04-17 20:58 oss://<yourBucketName>/hourly_table/2017-02-01/03/emp01.txt.gz
-rw-rw-rw- 1 506 2020-04-17 20:58 oss://<yourBucketName>/hourly_table/2017-02-01/03/emp06.txt.gz
--outputCodec also accepts the following keywords, as shown in the example after this list:
- none: Jindo DistCp does not compress the copied files. If the files are already compressed, Jindo DistCp decompresses them.
- keep: Jindo DistCp copies the files without changing their compression.
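For example, the following command is a minimal sketch that uses the keep keyword to copy files without changing their compression:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --outputCodec=keep --parallelism 20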
--outputManifest and --requirePreviousManifest
--outputManifest generates a manifest file that contains information about all the files copied by Jindo DistCp, including the destination files, source files, and file sizes.
If no manifest file was generated in a previous copy operation, set --requirePreviousManifest to false. By default, the manifest file is compressed in the GZ format, which is the only supported format.
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --outputManifest=manifest-2020-04-17.gz --requirePreviousManifest=false --parallelism 20
hadoop fs -text oss://<yourBucketName>/hourly_table/manifest-2020-04-17.gz > before.lst
cat before.lst
{"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/000151.sst","baseName":"2017-02-01/03/000151.sst","srcDir":"oss://<yourBucketName>/hourly_table","size":2252}
{"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/1.log","baseName":"2017-02-01/03/1.log","srcDir":"oss://<yourBucketName>/hourly_table","size":4891}
{"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/2.log","baseName":"2017-02-01/03/2.log","srcDir":"oss://<yourBucketName>/hourly_table","size":4891}
{"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/OPTIONS-000109","baseName":"2017-02-01/03/OPTIONS-000109","srcDir":"oss://<yourBucketName>/hourly_table","size":4891}
{"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/emp01.txt","baseName":"2017-02-01/03/emp01.txt","srcDir":"oss://<yourBucketName>/hourly_table","size":1016}
{"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/emp06.txt","baseName":"2017-02-01/03/emp06.txt","srcDir":"oss://<yourBucketName>/hourly_table","size":1016}
--outputManifest and --previousManifest
--outputManifest generates a manifest file that lists both previously and newly copied files. --previousManifest specifies an existing manifest file that lists previously copied files. This way, you can recreate the full history of operations and see which files are copied by the current job.
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --outputManifest=manifest-2020-04-18.gz --previousManifest=oss://<yourBucketName>/hourly_table/manifest-2020-04-17.gz --parallelism 20
hadoop fs -text oss://<yourBucketName>/hourly_table/manifest-2020-04-18.gz > current.lst
diff before.lst current.lst
3a4,5
> {"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/5.log","baseName":"2017-02-01/03/5.log","srcDir":"oss://<yourBucketName>/hourly_table","size":4891}
> {"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/6.log","baseName":"2017-02-01/03/6.log","srcDir":"oss://<yourBucketName>/hourly_table","size":4891}
--copyFromManifest
Use --copyFromManifest together with --previousManifest to specify a manifest file that was previously generated by --outputManifest and copy the files listed in that manifest file to the destination directory. Example:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --previousManifest=oss://<yourBucketName>/hourly_table/manifest-2020-04-17.gz --copyFromManifest --parallelism 20
--srcPrefixesFile
--srcPrefixesFile enables Jindo DistCp to copy files from multiple folders at a time. The specified file contains a list of source URI prefixes, one per line. Example:
cat folders.txt
hdfs://emr-header-1.cluster-50466:9000/data/incoming/hourly_table/2017-02-01
hdfs://emr-header-1.cluster-50466:9000/data/incoming/hourly_table/2017-02-02
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --srcPrefixesFile file:///opt/folders.txt --parallelism 20
hdfs dfs -ls oss://<yourBucketName>/hourly_table
Found 4 items
drwxrwxrwx - 0 1970-01-01 08:00 oss://<yourBucketName>/hourly_table/2017-02-01
drwxrwxrwx - 0 1970-01-01 08:00 oss://<yourBucketName>/hourly_table/2017-02-02
--groupBy and --targetSize
Reading a large number of small files from HDFS degrades data processing performance. Therefore, we recommend that you use Jindo DistCp to merge small files into larger files of a specified size. This optimizes analysis performance and reduces costs.
hdfs dfs -ls /data/incoming/hourly_table/2017-02-01/03
Found 8 items
-rw-r----- 2 root hadoop 2252 2020-04-17 20:42 /data/incoming/hourly_table/2017-02-01/03/000151.sst
-rw-r----- 2 root hadoop 4891 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/1.log
-rw-r----- 2 root hadoop 4891 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/2.log
-rw-r----- 2 root hadoop 4891 2020-04-17 21:08 /data/incoming/hourly_table/2017-02-01/03/5.log
-rw-r----- 2 root hadoop 4891 2020-04-17 21:08 /data/incoming/hourly_table/2017-02-01/03/6.log
-rw-r----- 2 root hadoop 4891 2020-04-17 20:42 /data/incoming/hourly_table/2017-02-01/03/OPTIONS-000109
-rw-r----- 2 root hadoop 1016 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/emp01.txt
-rw-r----- 2 root hadoop 1016 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/emp06.txt
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --targetSize=10 --groupBy='.*/([a-z]+).*.txt' --parallelism 20
hdfs dfs -ls oss://<yourBucketName>/hourly_table/2017-02-01/03/
Found 1 items
-rw-rw-rw- 1 2032 2020-04-17 21:18 oss://<yourBucketName>/hourly_table/2017-02-01/03/emp2
--enableBalancePlan
Use --enableBalancePlan to optimize the job allocation plan and improve the copy performance of Jindo DistCp. If you do not specify an allocation plan, files are randomly allocated to copy tasks.
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --enableBalancePlan --parallelism 20
Note: This parameter cannot be used together with --groupBy or --targetSize.
--enableDynamicPlan
Use --enableDynamicPlan to dynamically optimize the job allocation plan and improve the copy performance of Jindo DistCp.
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --enableDynamicPlan --parallelism 20
Note: This parameter cannot be used together with --groupBy or --targetSize.
--enableTransaction
--enableTransaction ensures job-level data integrity and provides transaction support among jobs. Example:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --enableTransaction --parallelism 20
--diff
After files are copied, you can use --diff to check the differences between the file lists of the source and destination directories.
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --diff
If all files are copied, the following information is returned:
INFO distcp.JindoDistCp: Jindo DistCp job exit with 0
If some files fail to be copied, Jindo DistCp generates a manifest file that lists those files. You can then use --copyFromManifest and --previousManifest to copy the files in the manifest to the destination directory. This way, the data volume and file quantity can be verified. If Jindo DistCp performed compression or decompression operations during the copy, --diff does not return accurate file size differences.
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --previousManifest=file:///opt/manifest-2020-04-17.gz --copyFromManifest --parallelism 20
If --dest points to an HDFS directory, specify the path in the /path, hdfs://hostname:port/path, or hdfs://headerIp:port/path format. Other formats, such as hdfs:///path and hdfs:/path, are not supported.
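For example, the following command is an illustrative sketch of a supported HDFS destination format. It reuses the emr-header-1 hostname from the folders.txt example above; the /backup/hourly_table path is hypothetical:
jindo distcp --src /data/incoming/hourly_table --dest hdfs://emr-header-1.cluster-50466:9000/backup/hourly_table --parallelism 20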
--queue
--queue specifies the name of the YARN queue to which the current DistCp job is submitted.
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --queue yarnqueue
--bandwidth
--bandwidth specifies the bandwidth, in MB/s, that is allocated to a single node of the current DistCp job. This prevents the job from occupying excessive bandwidth.
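For example, the following command is a minimal sketch that limits the bandwidth of each node to 6 MB/s; the value 6 is an arbitrary value chosen for illustration:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --bandwidth 6 --parallelism 20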
--update
--update enables incremental copy. Jindo DistCp skips the files and directories that are identical to those in the destination and copies only new and updated files and directories from the source to the destination.
hadoop jar jindo-distcp-3.5.0.jar --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --update --parallelism 20
--filters
--filters specifies the path of a file in which each line contains one regular expression. The regular expressions match the files that you do not want to copy or compare in the current DistCp job.
hadoop jar jindo-distcp-3.5.0.jar --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --filters /path/to/filterfile.txt --parallelism 20
Sample filter file:
.*\.tmp.*
.*\.staging.*
If the preceding sample file is used, the DistCp job skips the files whose paths match .*\.tmp.* or .*\.staging.* in the /data/incoming/hourly_table directory during copy and --diff operations.
Use an AccessKey pair to access OSS
If you access OSS from an instance outside EMR, or if AccessKey-free access is not supported, you can use an AccessKey pair to access OSS. Set the --ossKey, --ossSecret, and --ossEndPoint parameters in the command to specify the AccessKey pair and the endpoint.
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --ossKey <yourAccessKeyId> --ossSecret <yourAccessKeySecret> --ossEndPoint oss-cn-hangzhou.aliyuncs.com --parallelism 20
In the preceding command, <yourAccessKeyId> is the AccessKey ID of your Alibaba Cloud account, and <yourAccessKeySecret> is the AccessKey secret of the account.
Write data to OSS Cold Archive, Archive, or IA storage
Use --policy to specify a storage class.
- Example of the Cold Archive storage class (--policy coldArchive):
hadoop jar jindo-distcp-3.5.0.jar --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --policy coldArchive --parallelism 20
Note: The Cold Archive storage class is available only in some regions. For more information about the supported regions, see Overview.
- Example of the Archive storage class (--policy archive):
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --policy archive --parallelism 20
- Example of the Infrequent Access (IA) storage class (--policy ia):
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --policy ia --parallelism 20
Use CloudMonitor
You can use CloudMonitor to collect the metrics of Alibaba Cloud resources and your custom metrics, detect service availability, and configure alerts for metrics. This helps you obtain the Alibaba Cloud resource usage and the status of applications and handle exceptions at the earliest opportunity to ensure the normal running of your applications.
You can specify whether CloudMonitor reports a failure if the current DistCp task fails. You can perform the following steps to configure the alerting feature in the CloudMonitor console:
- Create an alert contact or an alert contact group. For more information, see Create an alert contact or alert group.
- Obtain an alert token.
- In the left-side navigation pane, choose Alerts > Alert Contacts.
- On the Alert Contacts page, click the Alert Contact Group tab.
- Find your alert contact group and click Access External alert.
Record the alert token that is displayed in the panel that appears.
- In the panel, click Test Command to configure the environment variables described in the following table.
Parameter | Description |
---|---|
cmsAccessKeyId | The AccessKey ID of your Alibaba Cloud account. |
cmsAccessSecret | The AccessKey secret of your Alibaba Cloud account. |
cmsRegion | The ID of the region where the cluster resides, such as cn-hangzhou. |
cmsToken | The alert token that is obtained in Step 2. |
cmsLevel | The alert level. Valid values: INFO (email and DingTalk chatbot), WARN (text message, email, and DingTalk chatbot), and CRITICAL (phone call, text message, email, and DingTalk chatbot). |
Example:
export cmsAccessKeyId=<your_key_id>
export cmsAccessSecret=<your_key_secret>
export cmsRegion=cn-hangzhou
export cmsToken=<your_cms_token>
export cmsLevel=WARN
hadoop jar jindo-distcp-3.5.0.jar \
--src /data/incoming/hourly_table \
--dest oss://<yourBucketName>/hourly_table \
--enableCMS
Clean up residual files
When you run a DistCp task, files that are not correctly uploaded may remain in the destination directory. These files are managed by OSS based on the uploadId and may be invisible to users. In this case, you can specify the --cleanUpPending parameter in the command so that the system automatically cleans up the residual files after the task is complete. Alternatively, you can clean up the files in the OSS console.
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --cleanUpPending --parallelism 20
Use Amazon S3 as a data source
You can use the --s3Key, --s3Secret, and --s3EndPoint parameters in a command to specify the information that is required to access Amazon S3.
jindo distcp --src s3a://yourbucket/ --dest oss://<your_bucket>/hourly_table --s3Key yourkey --s3Secret yoursecret --s3EndPoint s3-us-west-1.amazonaws.com
Alternatively, you can predefine the S3 AccessKey pair and endpoint in your Hadoop configuration so that you do not need to specify them in each command:
<configuration>
<property>
<name>fs.s3a.access.key</name>
<value>xxx</value>
</property>
<property>
<name>fs.s3a.secret.key</name>
<value>xxx</value>
</property>
<property>
<name>fs.s3.endpoint</name>
<value>s3-us-west-1.amazonaws.com</value>
</property>
</configuration>
Example:
jindo distcp --src s3://smartdata1/ --dest s3://smartdata1/tmp --s3EndPoint s3-us-west-1.amazonaws.com
Check DistCp counters
You can check the counters in the output of a DistCp job to verify the result of the job. Example:
JindoDistcpCounter
BYTES_EXPECTED=10000
BYTES_SKIPPED=10000
FILES_EXPECTED=11
FILES_SKIPPED=11
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
Task counter | Description |
---|---|
COPY_FAILED | The number of files that fail to be copied. An alert is reported when this counter is not 0. |
CHECKSUM_DIFF | The number of files that fail to pass the checksum verification. The number is added to the value of COPY_FAILED. |
FILES_EXPECTED | The number of files that need to be copied. |
FILES_COPIED | The number of files that are copied. |
FILES_SKIPPED | The number of files that are skipped during incremental updates. |
BYTES_SKIPPED | The number of bytes that are skipped during incremental updates. |
DIFF_FILES | The number of files that have differences. The files are obtained by using --diff. An alert is reported when this counter is not 0. |
SAME_FILES | The number of files that have no differences. The files are obtained by using --diff. |
DST_MISS | The number of files that do not exist in the destination directory. The number is added to the value of DIFF_FILES. |
LENGTH_DIFF | The number of files that are of different sizes in the source and destination directories. The number is added to the value of DIFF_FILES. |
CHECKSUM_DIFF | The number of files that fail to pass the checksum verification. The number is added to the value of DIFF_FILES. |
DIFF_FAILED | The number of files for which --diff-related errors are reported. You can view details about the errors in log files. |