This topic describes how to use the data copy tool Jindo DistCp.
Use Jindo DistCp
Run the following command to obtain the help information:
[root@emr-header-1 opt]# jindo distcp --help
This topic covers the following parameters and operations:
- --src and --dest
- --parallelism
- --srcPattern
- --deleteOnSuccess
- --outputCodec
- --outputManifest and --requirePreviousManifest
- --outputManifest and --previousManifest
- --copyFromManifest
- --srcPrefixesFile
- --groupBy and --targetSize
- --enableBalancePlan
- --enableDynamicPlan
- --enableTransaction
- --diff
- Check DistCp Counters
- Use an AccessKey pair to access OSS
- Write data into OSS in IA or Archive mode
- Clean up residual files
--src and --dest
--src specifies the source directory, and --dest specifies the destination directory.
By default, Jindo DistCp copies all files in the directory specified by --src to the directory specified by --dest. If the destination root directory does not exist, Jindo DistCp automatically creates it.
jindo distcp --src /opt/tmp --dest oss://yang-hhht/tmp
--parallelism
--parallelism specifies the mapreduce.job.reduces parameter for the MapReduce job that copies the files. The default value is 7. You can customize --parallelism based on your available cluster resources to control how many reduce tasks run in parallel.
jindo distcp --src /opt/tmp --dest oss://yang-hhht/tmp --parallelism 20
--srcPattern
--srcPattern specifies a regular expression that filters files for the copy operation. The regular expression must match the full file path, not just the file name.
For example, to copy all log files in the /data/incoming/hourly_table/2017-02-01/03 directory, set --srcPattern to .*\.log.
[root@emr-header-1 opt]# hdfs dfs -ls /data/incoming/hourly_table/2017-02-01/03
Found 6 items
-rw-r----- 2 root hadoop 2252 2020-04-17 20:42 /data/incoming/hourly_table/2017-02-01/03/000151.sst
-rw-r----- 2 root hadoop 4891 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/1.log
-rw-r----- 2 root hadoop 4891 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/2.log
-rw-r----- 2 root hadoop 4891 2020-04-17 20:42 /data/incoming/hourly_table/2017-02-01/03/OPTIONS-000109
-rw-r----- 2 root hadoop 1016 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/emp01.txt
-rw-r----- 2 root hadoop 1016 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/emp06.txt
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --srcPattern .*\.log --parallelism 20
[root@emr-header-1 opt]# hdfs dfs -ls oss://yang-hhht/hourly_table/2017-02-01/03
Found 2 items
-rw-rw-rw- 1 4891 2020-04-17 20:52 oss://yang-hhht/hourly_table/2017-02-01/03/1.log
-rw-rw-rw- 1 4891 2020-04-17 20:52 oss://yang-hhht/hourly_table/2017-02-01/03/2.log
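To see why the pattern has to cover the full path, the selection rule can be sketched in a few lines of Python (an illustration of the matching semantics, not Jindo DistCp's actual implementation; the sample paths are taken from the listing above):

```python
import re

# --srcPattern is applied against the full file path, so a bare "\.log"
# would not match; ".*\.log" is needed to consume the leading directories.
pattern = re.compile(r".*\.log")

paths = [
    "/data/incoming/hourly_table/2017-02-01/03/000151.sst",
    "/data/incoming/hourly_table/2017-02-01/03/1.log",
    "/data/incoming/hourly_table/2017-02-01/03/2.log",
    "/data/incoming/hourly_table/2017-02-01/03/emp01.txt",
]

# fullmatch mirrors the "must match a full path" rule.
selected = [p for p in paths if pattern.fullmatch(p)]
print(selected)  # only the two .log files
```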
--deleteOnSuccess
--deleteOnSuccess deletes the copied files from the source directory after the copy operation succeeds.
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --deleteOnSuccess --parallelism 20
--outputCodec
--outputCodec specifies the compression codec that is used to compress the copied files on the fly. Example:
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --outputCodec=gz --parallelism 20
[root@emr-header-1 opt]# hdfs dfs -ls oss://yang-hhht/hourly_table/2017-02-01/03
Found 6 items
-rw-rw-rw- 1 938 2020-04-17 20:58 oss://yang-hhht/hourly_table/2017-02-01/03/000151.sst.gz
-rw-rw-rw- 1 1956 2020-04-17 20:58 oss://yang-hhht/hourly_table/2017-02-01/03/1.log.gz
-rw-rw-rw- 1 1956 2020-04-17 20:58 oss://yang-hhht/hourly_table/2017-02-01/03/2.log.gz
-rw-rw-rw- 1 1956 2020-04-17 20:58 oss://yang-hhht/hourly_table/2017-02-01/03/OPTIONS-000109.gz
-rw-rw-rw- 1 506 2020-04-17 20:58 oss://yang-hhht/hourly_table/2017-02-01/03/emp01.txt.gz
-rw-rw-rw- 1 506 2020-04-17 20:58 oss://yang-hhht/hourly_table/2017-02-01/03/emp06.txt.gz
In addition to compression codecs such as gz, --outputCodec supports the following keywords:
- none: Jindo DistCp saves the copied files without compression. If the source files are compressed, Jindo DistCp decompresses them.
- keep: Jindo DistCp copies the files with their compression unchanged.
--outputManifest and --requirePreviousManifest
--outputManifest generates a manifest file that contains information about all the files copied by Jindo DistCp, including the destination files, source files, and file sizes. To generate the manifest file, you must also set --requirePreviousManifest to false, which indicates that no previous manifest file is required. By default, the manifest file is compressed in the GZ format, which is the only supported format.
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --outputManifest=manifest-2020-04-17.gz --requirePreviousManifest=false --parallelism 20
[root@emr-header-1 opt]# hadoop fs -text oss://yang-hhht/hourly_table/manifest-2020-04-17.gz > before.lst
[root@emr-header-1 opt]# cat before.lst
{"path":"oss://yang-hhht/hourly_table/2017-02-01/03/000151.sst","baseName":"2017-02-01/03/000151.sst","srcDir":"oss://yang-hhht/hourly_table","size":2252}
{"path":"oss://yang-hhht/hourly_table/2017-02-01/03/1.log","baseName":"2017-02-01/03/1.log","srcDir":"oss://yang-hhht/hourly_table","size":4891}
{"path":"oss://yang-hhht/hourly_table/2017-02-01/03/2.log","baseName":"2017-02-01/03/2.log","srcDir":"oss://yang-hhht/hourly_table","size":4891}
{"path":"oss://yang-hhht/hourly_table/2017-02-01/03/OPTIONS-000109","baseName":"2017-02-01/03/OPTIONS-000109","srcDir":"oss://yang-hhht/hourly_table","size":4891}
{"path":"oss://yang-hhht/hourly_table/2017-02-01/03/emp01.txt","baseName":"2017-02-01/03/emp01.txt","srcDir":"oss://yang-hhht/hourly_table","size":1016}
{"path":"oss://yang-hhht/hourly_table/2017-02-01/03/emp06.txt","baseName":"2017-02-01/03/emp06.txt","srcDir":"oss://yang-hhht/hourly_table","size":1016}
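Because each manifest line is a standalone JSON object, the file is easy to post-process once it has been decompressed with hadoop fs -text as shown above. This hypothetical Python snippet totals the copied bytes for two of the entries:

```python
import json

# Two manifest lines copied from the output above.
manifest_lines = [
    '{"path":"oss://yang-hhht/hourly_table/2017-02-01/03/1.log","baseName":"2017-02-01/03/1.log","srcDir":"oss://yang-hhht/hourly_table","size":4891}',
    '{"path":"oss://yang-hhht/hourly_table/2017-02-01/03/emp01.txt","baseName":"2017-02-01/03/emp01.txt","srcDir":"oss://yang-hhht/hourly_table","size":1016}',
]

entries = [json.loads(line) for line in manifest_lines]
total_size = sum(entry["size"] for entry in entries)
print(total_size)  # 4891 + 1016 = 5907
```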
--outputManifest and --previousManifest
--outputManifest generates a manifest file that lists both previously and newly copied files. --previousManifest specifies the manifest file that was generated by the previous copy job and lists the previously copied files. This way, you can recreate the full history of operations and see which files are copied by the current job:
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --outputManifest=manifest-2020-04-18.gz --previousManifest=oss://yang-hhht/hourly_table/manifest-2020-04-17.gz --parallelism 20
[root@emr-header-1 opt]# hadoop fs -text oss://yang-hhht/hourly_table/manifest-2020-04-18.gz > current.lst
[root@emr-header-1 opt]# diff before.lst current.lst
3a4,5
> {"path":"oss://yang-hhht/hourly_table/2017-02-01/03/5.log","baseName":"2017-02-01/03/5.log","srcDir":"oss://yang-hhht/hourly_table","size":4891}
> {"path":"oss://yang-hhht/hourly_table/2017-02-01/03/6.log","baseName":"2017-02-01/03/6.log","srcDir":"oss://yang-hhht/hourly_table","size":4891}
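The same per-line JSON structure makes it straightforward to compute which files a job added. The set difference below is a sketch of what the diff output above expresses, reduced to two illustrative entries:

```python
import json

# One entry from the previous manifest, and the same entry plus a new
# file in the current manifest (values taken from the listings above).
before_lines = [
    '{"path":"oss://yang-hhht/hourly_table/2017-02-01/03/1.log","baseName":"2017-02-01/03/1.log","srcDir":"oss://yang-hhht/hourly_table","size":4891}',
]
current_lines = before_lines + [
    '{"path":"oss://yang-hhht/hourly_table/2017-02-01/03/5.log","baseName":"2017-02-01/03/5.log","srcDir":"oss://yang-hhht/hourly_table","size":4891}',
]

# Files copied by the current job = current manifest minus previous manifest.
new_paths = (
    {json.loads(line)["path"] for line in current_lines}
    - {json.loads(line)["path"] for line in before_lines}
)
print(sorted(new_paths))  # ['oss://yang-hhht/hourly_table/2017-02-01/03/5.log']
```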
--copyFromManifest
Use --copyFromManifest together with --previousManifest to specify a manifest file that was previously generated by --outputManifest and copy the files listed in that manifest file to the destination directory. Example:
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --previousManifest=oss://yang-hhht/hourly_table/manifest-2020-04-17.gz --copyFromManifest --parallelism 20
--srcPrefixesFile
--srcPrefixesFile enables Jindo DistCp to copy files from multiple directories at a time. The specified file contains one source directory per line.
[root@emr-header-1 opt]# hdfs dfs -ls oss://yang-hhht/hourly_table
Found 4 items
drwxrwxrwx - 0 1970-01-01 08:00 oss://yang-hhht/hourly_table/2017-02-01
drwxrwxrwx - 0 1970-01-01 08:00 oss://yang-hhht/hourly_table/2017-02-02
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --srcPrefixesFile file:///opt/folders.txt --parallelism 20
[root@emr-header-1 opt]# cat folders.txt
hdfs://emr-header-1.cluster-50466:9000/data/incoming/hourly_table/2017-02-01
hdfs://emr-header-1.cluster-50466:9000/data/incoming/hourly_table/2017-02-02
--groupBy and --targetSize
Reading a large number of small files from HDFS degrades data processing performance. Therefore, we recommend that you use Jindo DistCp to merge small files into large files of a specified size. This optimizes analysis performance and reduces costs. --groupBy specifies a regular expression whose capture group determines which files are merged together, and --targetSize specifies the size of the merged files.
[root@emr-header-1 opt]# hdfs dfs -ls /data/incoming/hourly_table/2017-02-01/03
Found 8 items
-rw-r----- 2 root hadoop 2252 2020-04-17 20:42 /data/incoming/hourly_table/2017-02-01/03/000151.sst
-rw-r----- 2 root hadoop 4891 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/1.log
-rw-r----- 2 root hadoop 4891 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/2.log
-rw-r----- 2 root hadoop 4891 2020-04-17 21:08 /data/incoming/hourly_table/2017-02-01/03/5.log
-rw-r----- 2 root hadoop 4891 2020-04-17 21:08 /data/incoming/hourly_table/2017-02-01/03/6.log
-rw-r----- 2 root hadoop 4891 2020-04-17 20:42 /data/incoming/hourly_table/2017-02-01/03/OPTIONS-000109
-rw-r----- 2 root hadoop 1016 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/emp01.txt
-rw-r----- 2 root hadoop 1016 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/emp06.txt
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --targetSize=10 --groupBy='.*/([a-z]+).*\.txt' --parallelism 20
[root@emr-header-1 opt]# hdfs dfs -ls oss://yang-hhht/hourly_table/2017-02-01/03/
Found 1 items
-rw-rw-rw- 1 2032 2020-04-17 21:18 oss://yang-hhht/hourly_table/2017-02-01/03/emp2
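The merging above can be mimicked in Python to show how the --groupBy capture group selects files and decides which of them belong together (a sketch of the assumed grouping semantics, not the tool's actual code; the naming of the merged output file, such as emp2 above, is handled by Jindo DistCp itself):

```python
import re
from collections import defaultdict

# Files whose full path matches the regex are merged; the value of the
# capture group decides which files end up in the same output file.
group_by = re.compile(r".*/([a-z]+).*\.txt")

paths = [
    "/data/incoming/hourly_table/2017-02-01/03/emp01.txt",
    "/data/incoming/hourly_table/2017-02-01/03/emp06.txt",
    "/data/incoming/hourly_table/2017-02-01/03/1.log",  # no match: not merged
]

groups = defaultdict(list)
for path in paths:
    match = group_by.fullmatch(path)
    if match:
        groups[match.group(1)].append(path)

print(dict(groups))  # both .txt files fall into the "emp" group
```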
--enableBalancePlan
Use --enableBalancePlan to optimize the job allocation plan and improve the copy performance of Jindo DistCp. If no allocation plan is specified, files are randomly allocated.
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --enableBalancePlan --parallelism 20
--enableBalancePlan cannot be used together with --groupBy or --targetSize.
--enableDynamicPlan
Use --enableDynamicPlan to optimize the job allocation plan and improve the copy performance of Jindo DistCp.
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --enableDynamicPlan --parallelism 20
--enableDynamicPlan cannot be used together with --groupBy or --targetSize.
--enableTransaction
--enableTransaction ensures integrity at the job level and provides transaction support among jobs. Example:
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --enableTransaction --parallelism 20
--diff
After files are copied, you can use --diff to check the differences between the source and destination directories.
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --diff
If all files are copied, the following information is returned:
INFO distcp.JindoDistCp: distcp has been done completely
Otherwise, a manifest file is generated that lists the files that failed to be copied. In this case, you can use --copyFromManifest and --previousManifest to copy the files in the manifest to the destination directory. This way, the data volume and file quantity are verified. If Jindo DistCp performed compression or decompression during the copy process, --diff does not return accurate file size differences.
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --previousManifest=file:///opt/manifest-2020-04-17.gz --copyFromManifest --parallelism 20
Specify --dest in the format of /path, hdfs://hostname:ip/path, or hdfs://headerIp:ip/path. Other formats, such as hdfs:///path and hdfs:/path, are not supported.
Check DistCp Counters
You can check the copy results in the counters of the MapReduce job:
Distcp Counters
Bytes Destination Copied=11010048000
Bytes Source Read=11010048000
Files Copied=1001
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
If files are compressed or decompressed during the copy process, the values of Bytes Destination Copied and Bytes Source Read may be different.
Use an AccessKey pair to access OSS
If you want to access OSS from an instance outside EMR or AccessKey-free access is not supported, you can use an AccessKey pair to access OSS. Set the --ossKey, --ossSecret, and --ossEndPoint parameters in the command to specify an AccessKey pair.
jindo distcp --src /data/incoming/hourly_table --dest oss://<your_bucket>/hourly_table --ossKey yourkey --ossSecret yoursecret --ossEndPoint oss-cn-hangzhou.aliyuncs.com --parallelism 20
Write data into OSS in IA or Archive mode
- Example of the Archive mode (--policy archive):
jindo distcp --src /data/incoming/hourly_table --dest oss://<your_bucket>/hourly_table --policy archive --parallelism 20
- Example of the IA mode (--policy ia):
jindo distcp --src /data/incoming/hourly_table --dest oss://<your_bucket>/hourly_table --policy ia --parallelism 20
Clean up residual files
When you run a DistCp task, files that are not correctly uploaded may be generated in your destination directory. These files are managed by OSS based on uploadId and may be invisible to users. In this case, you can specify the --cleanUpPending parameter in the command so that the system automatically cleans up the residual files after the task is completed. Alternatively, you can clean up the files in the OSS console.
jindo distcp --src /data/incoming/hourly_table --dest oss://<your_bucket>/hourly_table --cleanUpPending --parallelism 20