This topic describes how to use the data copy tool Jindo DistCp.
Prerequisites
- Java Development Kit (JDK) 8 is installed on your computer.
- An E-MapReduce (EMR) cluster of the 3.28.0 version or later is created. For information about how to create a cluster, see Create a cluster.
Use Jindo DistCp
- Log on to the master node of the EMR cluster in SSH mode.
For more information, see Connect to the master node of an EMR cluster in SSH mode.
- Run the following command to obtain help information:
jindo distcp --help
The following information is returned:
--help - Print help text
--src=VALUE - Directory to copy files from
--dest=VALUE - Directory to copy files to
--parallelism=VALUE - Copy task parallelism
--outputManifest=VALUE - The name of the manifest file
--previousManifest=VALUE - The path to an existing manifest file
--requirePreviousManifest=VALUE - Require that a previous manifest is present if specified
--copyFromManifest - Copy from a manifest instead of listing a directory
--srcPrefixesFile=VALUE - File containing a list of source URI prefixes
--srcPattern=VALUE - Include only source files matching this pattern
--deleteOnSuccess - Delete input files after a successful copy
--outputCodec=VALUE - Compression codec for output files
--groupBy=VALUE - Pattern to group input files by
--targetSize=VALUE - Target size for output files
--enableBalancePlan - Enable plan copy task to make balance
--enableDynamicPlan - Enable plan copy task dynamically
--enableTransaction - Enable transation on Job explicitly
--diff - show the difference between src and dest filelist
--ossKey=VALUE - Specify your oss key if needed
--ossSecret=VALUE - Specify your oss secret if needed
--ossEndPoint=VALUE - Specify your oss endPoint if needed
--policy=VALUE - Specify your oss storage policy
--cleanUpPending - clean up the incomplete upload when distcp job finish
--queue=VALUE - Specify yarn queuename if needed
--bandwidth=VALUE - Specify bandwidth per map/reduce in MB if needed
--s3Key=VALUE - Specify your s3 key
--s3Secret=VALUE - Specify your s3 Sercet
--s3EndPoint=VALUE - Specify your s3 EndPoint
--src and --dest
--src specifies the source directory, and --dest specifies the destination directory.
By default, Jindo DistCp copies all files in the directory specified by --src to the directory specified by --dest. If the specified root directory does not exist, Jindo DistCp automatically creates it.
jindo distcp --src /opt/tmp --dest oss://yang-hhht/tmp
--parallelism
--parallelism specifies the value of the mapreduce.job.reduces parameter for the MapReduce job that is executed to copy files. The default value is 7. You can customize --parallelism based on the available cluster resources to control how many reduce tasks run in parallel.
jindo distcp --src /opt/tmp --dest oss://yang-hhht/tmp --parallelism 20
--srcPattern
--srcPattern specifies a regular expression that filters files for the copy operation. The regular expression must match a full file path.
For example, to copy all log files in the /data/incoming/hourly_table/2017-02-01/03 directory, set --srcPattern to .*\.log.
[root@emr-header-1 opt]# hdfs dfs -ls /data/incoming/hourly_table/2017-02-01/03
Found 6 items
-rw-r----- 2 root hadoop 2252 2020-04-17 20:42 /data/incoming/hourly_table/2017-02-01/03/000151.sst
-rw-r----- 2 root hadoop 4891 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/1.log
-rw-r----- 2 root hadoop 4891 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/2.log
-rw-r----- 2 root hadoop 4891 2020-04-17 20:42 /data/incoming/hourly_table/2017-02-01/03/OPTIONS-000109
-rw-r----- 2 root hadoop 1016 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/emp01.txt
-rw-r----- 2 root hadoop 1016 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/emp06.txt
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --srcPattern .*\.log --parallelism 20
[root@emr-header-1 opt]# hdfs dfs -ls oss://yang-hhht/hourly_table/2017-02-01/03
Found 2 items
-rw-rw-rw- 1 4891 2020-04-17 20:52 oss://yang-hhht/hourly_table/2017-02-01/03/1.log
-rw-rw-rw- 1 4891 2020-04-17 20:52 oss://yang-hhht/hourly_table/2017-02-01/03/2.log
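Because the pattern is matched against the full path, you can sanity-check a --srcPattern expression locally before running a large copy. The following sketch only assumes a POSIX shell with grep and does not invoke Jindo DistCp; the sample paths are taken from the listing above:

```shell
# Check which full paths the --srcPattern expression .*\.log would select.
# The pattern must match the whole path, so anchor it with ^...$ here.
printf '%s\n' \
  /data/incoming/hourly_table/2017-02-01/03/000151.sst \
  /data/incoming/hourly_table/2017-02-01/03/1.log \
  /data/incoming/hourly_table/2017-02-01/03/emp01.txt |
  grep -E '^.*\.log$'
# Prints only /data/incoming/hourly_table/2017-02-01/03/1.log
```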
--deleteOnSuccess
--deleteOnSuccess
enables Jindo DistCp to delete the copied files from the source directory after a
copy operation succeeds.
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --deleteOnSuccess --parallelism 20
--outputCodec
--outputCodec
specifies the compression codec that is used to compress copied files online. Example:
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --outputCodec=gz --parallelism 20
[root@emr-header-1 opt]# hdfs dfs -ls oss://yang-hhht/hourly_table/2017-02-01/03
Found 6 items
-rw-rw-rw- 1 938 2020-04-17 20:58 oss://yang-hhht/hourly_table/2017-02-01/03/000151.sst.gz
-rw-rw-rw- 1 1956 2020-04-17 20:58 oss://yang-hhht/hourly_table/2017-02-01/03/1.log.gz
-rw-rw-rw- 1 1956 2020-04-17 20:58 oss://yang-hhht/hourly_table/2017-02-01/03/2.log.gz
-rw-rw-rw- 1 1956 2020-04-17 20:58 oss://yang-hhht/hourly_table/2017-02-01/03/OPTIONS-000109.gz
-rw-rw-rw- 1 506 2020-04-17 20:58 oss://yang-hhht/hourly_table/2017-02-01/03/emp01.txt.gz
-rw-rw-rw- 1 506 2020-04-17 20:58 oss://yang-hhht/hourly_table/2017-02-01/03/emp06.txt.gz
The --outputCodec parameter also supports the following keywords:
- none: Jindo DistCp does not compress copied files. If the files have been compressed, Jindo DistCp decompresses them.
- keep: Jindo DistCp copies files with no change in the compression.
--outputManifest and --requirePreviousManifest
--outputManifest generates a manifest file that contains information about all the files copied by Jindo DistCp, including the destination files, source files, and file sizes.
If you use --outputManifest and no previous manifest file exists, set --requirePreviousManifest to false. By default, the manifest file is compressed in the gz format. This is the only supported format.
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --outputManifest=manifest-2020-04-17.gz --requirePreviousManifest=false --parallelism 20
[root@emr-header-1 opt]# hadoop fs -text oss://yang-hhht/hourly_table/manifest-2020-04-17.gz > before.lst
[root@emr-header-1 opt]# cat before.lst
{"path":"oss://yang-hhht/hourly_table/2017-02-01/03/000151.sst","baseName":"2017-02-01/03/000151.sst","srcDir":"oss://yang-hhht/hourly_table","size":2252}
{"path":"oss://yang-hhht/hourly_table/2017-02-01/03/1.log","baseName":"2017-02-01/03/1.log","srcDir":"oss://yang-hhht/hourly_table","size":4891}
{"path":"oss://yang-hhht/hourly_table/2017-02-01/03/2.log","baseName":"2017-02-01/03/2.log","srcDir":"oss://yang-hhht/hourly_table","size":4891}
{"path":"oss://yang-hhht/hourly_table/2017-02-01/03/OPTIONS-000109","baseName":"2017-02-01/03/OPTIONS-000109","srcDir":"oss://yang-hhht/hourly_table","size":4891}
{"path":"oss://yang-hhht/hourly_table/2017-02-01/03/emp01.txt","baseName":"2017-02-01/03/emp01.txt","srcDir":"oss://yang-hhht/hourly_table","size":1016}
{"path":"oss://yang-hhht/hourly_table/2017-02-01/03/emp06.txt","baseName":"2017-02-01/03/emp06.txt","srcDir":"oss://yang-hhht/hourly_table","size":1016}
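Each line of the decompressed manifest is a JSON record. As an illustrative local check that assumes only standard shell tools (the two sample records are copied from the manifest above), you can sum the size fields to estimate the copied data volume:

```shell
# Sum the "size" fields of a decompressed manifest file.
cat > sample-manifest.lst <<'EOF'
{"path":"oss://yang-hhht/hourly_table/2017-02-01/03/1.log","baseName":"2017-02-01/03/1.log","srcDir":"oss://yang-hhht/hourly_table","size":4891}
{"path":"oss://yang-hhht/hourly_table/2017-02-01/03/2.log","baseName":"2017-02-01/03/2.log","srcDir":"oss://yang-hhht/hourly_table","size":4891}
EOF
grep -o '"size":[0-9]*' sample-manifest.lst | cut -d: -f2 | awk '{s+=$1} END {print s}'
# Prints 9782
```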
--outputManifest and --previousManifest
--previousManifest specifies the manifest file generated by a previous copy job and lists the previously copied files. When you specify it together with --outputManifest, the new manifest file contains a list of both previously and newly copied files. This way, you can recreate the full history of operations and see which files are copied by the current job:
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --outputManifest=manifest-2020-04-18.gz --previousManifest=oss://yang-hhht/hourly_table/manifest-2020-04-17.gz --parallelism 20
[root@emr-header-1 opt]# hadoop fs -text oss://yang-hhht/hourly_table/manifest-2020-04-18.gz > current.lst
[root@emr-header-1 opt]# diff before.lst current.lst
3a4,5
> {"path":"oss://yang-hhht/hourly_table/2017-02-01/03/5.log","baseName":"2017-02-01/03/5.log","srcDir":"oss://yang-hhht/hourly_table","size":4891}
> {"path":"oss://yang-hhht/hourly_table/2017-02-01/03/6.log","baseName":"2017-02-01/03/6.log","srcDir":"oss://yang-hhht/hourly_table","size":4891}
--copyFromManifest
Use --copyFromManifest together with --previousManifest to copy the files listed in a manifest file that was previously generated by --outputManifest to the destination directory. Example:
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --previousManifest=oss://yang-hhht/hourly_table/manifest-2020-04-17.gz --copyFromManifest --parallelism 20
--srcPrefixesFile
--srcPrefixesFile enables Jindo DistCp to copy files in multiple folders at a time. The specified file contains a list of source URI prefixes, one per line:
[root@emr-header-1 opt]# cat folders.txt
hdfs://emr-header-1.cluster-50466:9000/data/incoming/hourly_table/2017-02-01
hdfs://emr-header-1.cluster-50466:9000/data/incoming/hourly_table/2017-02-02
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --srcPrefixesFile file:///opt/folders.txt --parallelism 20
[root@emr-header-1 opt]# hdfs dfs -ls oss://yang-hhht/hourly_table
Found 4 items
drwxrwxrwx - 0 1970-01-01 08:00 oss://yang-hhht/hourly_table/2017-02-01
drwxrwxrwx - 0 1970-01-01 08:00 oss://yang-hhht/hourly_table/2017-02-02
--groupBy and --targetSize
Reading a large number of small files from HDFS affects the data processing performance. Therefore, we recommend that you use Jindo DistCp to merge small files into large files of a specified size. This optimizes analysis performance and reduces costs.
[root@emr-header-1 opt]# hdfs dfs -ls /data/incoming/hourly_table/2017-02-01/03
Found 8 items
-rw-r----- 2 root hadoop 2252 2020-04-17 20:42 /data/incoming/hourly_table/2017-02-01/03/000151.sst
-rw-r----- 2 root hadoop 4891 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/1.log
-rw-r----- 2 root hadoop 4891 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/2.log
-rw-r----- 2 root hadoop 4891 2020-04-17 21:08 /data/incoming/hourly_table/2017-02-01/03/5.log
-rw-r----- 2 root hadoop 4891 2020-04-17 21:08 /data/incoming/hourly_table/2017-02-01/03/6.log
-rw-r----- 2 root hadoop 4891 2020-04-17 20:42 /data/incoming/hourly_table/2017-02-01/03/OPTIONS-000109
-rw-r----- 2 root hadoop 1016 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/emp01.txt
-rw-r----- 2 root hadoop 1016 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/emp06.txt
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --targetSize=10 --groupBy='.*/([a-z]+).*.txt' --parallelism 20
[root@emr-header-1 opt]# hdfs dfs -ls oss://yang-hhht/hourly_table/2017-02-01/03/
Found 1 items
-rw-rw-rw- 1 2032 2020-04-17 21:18 oss://yang-hhht/hourly_table/2017-02-01/03/emp2
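The first capture group in the --groupBy pattern determines the group key: files whose paths produce the same captured value are merged into one output file. The following sketch uses only sed and does not invoke Jindo DistCp (the dot before txt is escaped here for clarity); it shows the key that the example pattern extracts:

```shell
# Extract the --groupBy group key (the first capture group) for a path.
key() { echo "$1" | sed -E 's|.*/([a-z]+).*\.txt|\1|'; }
key /data/incoming/hourly_table/2017-02-01/03/emp01.txt   # prints emp
key /data/incoming/hourly_table/2017-02-01/03/emp06.txt   # prints emp, so both files fall into one group
```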
--enableBalancePlan
Use --enableBalancePlan to optimize the job allocation plan. This improves the copy performance of Jindo DistCp. If you do not specify an allocation plan, files are randomly allocated.
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --enableBalancePlan --parallelism 20
This parameter cannot be used together with --groupBy or --targetSize.
--enableDynamicPlan
Use --enableDynamicPlan to dynamically optimize the job allocation plan. This improves the copy performance of Jindo DistCp.
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --enableDynamicPlan --parallelism 20
This parameter cannot be used together with --groupBy or --targetSize.
--enableTransaction
--enableTransaction ensures integrity at the job level and supports transactions among jobs. Example:
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --enableTransaction --parallelism 20
--diff
After files are copied, you can use --diff to check the differences between the file lists of the source and destination directories.
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --diff
If all files are copied, the following information is returned:
INFO distcp.JindoDistCp: distcp has been done completely
If some files fail to be copied, Jindo DistCp generates a manifest file that lists those files. You can then use --copyFromManifest and --previousManifest to copy the files in the list to the destination directory. The comparison covers the data volume and the file quantity. If Jindo DistCp performed compression or decompression during the copy, --diff does not return accurate file size differences.
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --previousManifest=file:///opt/manifest-2020-04-17.gz --copyFromManifest --parallelism 20
When you use --diff with an HDFS path, specify --dest in the format of /path, hdfs://hostname:ip/path, or hdfs://headerIp:ip/path. Other formats, such as hdfs:///path and hdfs:/path, are not supported.
--queue
--queue specifies the name of the YARN queue where the current DistCp task resides.
jindo distcp --src /data/incoming/hourly_table --dest oss://<your_bucket>/hourly_table --queue yarnqueue
--bandwidth
--bandwidth specifies the bandwidth allocated to each map or reduce task of the current DistCp job, in MB. This prevents the tasks from using excessive bandwidth.
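For example, the following command limits each task to about 10 MB of bandwidth (10 is an illustrative value; choose one that fits your network):

```shell
jindo distcp --src /data/incoming/hourly_table --dest oss://<your_bucket>/hourly_table --bandwidth 10 --parallelism 20
```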
Use an AccessKey pair to access OSS
If you want to access OSS from an instance outside EMR or AccessKey-free access is not supported, you can use an AccessKey pair to access OSS. Set the --ossKey, --ossSecret, and --ossEndPoint parameters in the command to specify an AccessKey pair.
jindo distcp --src /data/incoming/hourly_table --dest oss://<your_bucket>/hourly_table --ossKey yourkey --ossSecret yoursecret --ossEndPoint oss-cn-hangzhou.aliyuncs.com --parallelism 20
Write data into OSS in IA or Archive mode
You can use the --policy parameter to write data into OSS in the Infrequent Access (IA) or Archive storage class.
- Example of the Archive mode (--policy archive):
jindo distcp --src /data/incoming/hourly_table --dest oss://<your_bucket>/hourly_table --policy archive --parallelism 20
- Example of the IA mode (--policy ia):
jindo distcp --src /data/incoming/hourly_table --dest oss://<your_bucket>/hourly_table --policy ia --parallelism 20
Clean up residual files
When you run a DistCp task, files that are not correctly uploaded may remain in your destination directory. These files are managed by OSS based on upload IDs and may be invisible to users. In this case, you can specify the --cleanUpPending parameter in the command so that the system automatically cleans up the residual files after the task is completed. Alternatively, you can clean up the files in the OSS console.
jindo distcp --src /data/incoming/hourly_table --dest oss://<your_bucket>/hourly_table --cleanUpPending --parallelism 20
Use Amazon S3 as a data source
You can use the --s3Key, --s3Secret, and --s3EndPoint parameters in a command to specify information related to Amazon S3.
jindo distcp --src s3a://yourbucket/ --dest oss://<your_bucket>/hourly_table --s3Key yourkey --s3Secret yoursecret --s3EndPoint s3-us-west-1.amazonaws.com
Alternatively, you can configure the Amazon S3 AccessKey pair in the core-site.xml file of Hadoop:
<configuration>
<property>
<name>fs.s3a.access.key</name>
<value>xxx</value>
</property>
<property>
<name>fs.s3a.secret.key</name>
<value>xxx</value>
</property>
<property>
<name>fs.s3a.endpoint</name>
<value>s3-us-west-1.amazonaws.com</value>
</property>
</configuration>
After the configuration is complete, you can omit the key parameters. Example: jindo distcp --src s3://smartdata1/ --dest s3://smartdata1/tmp --s3EndPoint s3-us-west-1.amazonaws.com
Check DistCp counters
You can check the counters in the output of the MapReduce job to verify the copy result. Example:
Distcp Counters
Bytes Destination Copied=11010048000
Bytes Source Read=11010048000
Files Copied=1001
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
If some files fail to be copied, the values of Bytes Destination Copied and Bytes Source Read may be different.
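When you script copy jobs, you can compare the two byte counters automatically. The following sketch assumes the job output was saved to a local file named counters.txt (a hypothetical name; the sample values are copied from the output above):

```shell
# Verify that the source and destination byte counters match.
cat > counters.txt <<'EOF'
Bytes Destination Copied=11010048000
Bytes Source Read=11010048000
Files Copied=1001
EOF
src=$(grep 'Bytes Source Read' counters.txt | cut -d= -f2)
dst=$(grep 'Bytes Destination Copied' counters.txt | cut -d= -f2)
[ "$src" = "$dst" ] && echo "copy verified" || echo "mismatch: check with --diff"
# Prints "copy verified"
```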