分布式文件系统按块(Block)存放数据,文件大小比块大小(64MB)小的文件称为小文件。分布式系统不可避免会产生小文件,比如SQL或其他分布式引擎的计算结果、Tunnel数据采集。合并小文件可以达到优化系统性能的目的。本文为您介绍如何在MaxCompute中合并小文件。
背景信息
小文件过多,会带来以下问题:
MaxCompute处理单个大文件比处理多个小文件更有效率,小文件过多会影响整体的执行性能。
小文件过多会给Pangu文件系统带来一定的压力,影响存储空间的有效利用。
因此从存储和性能两方面考虑,都需要将计算过程中产生的小文件合并。MaxCompute在小文件处理方面的功能日趋完善,主要体现在以下方面:
默认情况下,当作业完成之后,如果满足一定的条件,系统会自动分配一个Fuxi Task进行小文件合并,即使用过程中经常看到的MergeTask。
默认情况下,一个Fuxi Instance不再只能处理一个小文件,而是最多可以处理100个小文件,同时可以通过
odps.sql.mapper.merge.limit.size
参数来控制读取文件总大小。MaxCompute后台会定期扫描元数据库,对小文件较多的表或分区进行小文件合并,这个合并过程对您透明。
但是通过元数据发现仍然存在大量的小文件未被合并掉,例如有的表一直在写入,无法自动执行合并操作,需要您先将写入作业停止,然后再手工进行小文件合并操作。
注意事项
使用合并小文件功能需要用到计算资源,如果您购买的实例是按量计费,会产生相关费用,具体计费规则与SQL按量计费保持一致,详情请参见计算费用。
合并小文件命令不支持Transactional表,如果需要对Transactional表进行小文件合并,请参见COMPACTION。
查看表的文件数
命令语法
您可以通过如下命令查看表的文件数:
desc extended <table_name> [partition (<pt_spec>)];
参数说明
table_name:必填。待查看表的名称。
pt_spec:可选。待查看分区表的指定分区。格式为
(partition_col1 = partition_col_value1, partition_col2 = partition_col_value2, ...)
。
结果示例
如下图所示,
odl_bpm_wfc_task_log
表的示例分区有3607个文件, 分区大小274MB (287869658Byte),平均文件大小约为0.07MB,需要进行小文件合并。
解决方案
通过元数据检测,分区中含有100个以上的文件且平均文件大小小于64MB的都可以进行小文件合并,合并的方案有如下两种。
即时合并
使用如下命令进行小文件即时合并。
ALTER TABLE <table_name> [partition (<pt_spec>)] MERGE SMALLFILES;
对
odl_bpm_wfc_task_log
表执行即时合并操作,完成后如下所示文件数减少为19个,存储大小也减少到37MB,在合并小文件的同时,存储效率也有了明显提升。一般情况下,使用默认参数可以达到合并小文件的效果。但MaxCompute同时提供一些参数完成定制需求,常用的一些参数如下:
set odps.merge.cross.paths=true|false
设置是否跨路径合并,对于表下面有多个分区的情况,合并过程会将多个分区生成独立的MergeAction进行合并,所以对于
odps.merge.cross.paths
设置为true,并不会改变路径个数,只是分别去合并每个路径下的小文件。set odps.merge.smallfile.filesize.threshold = 64
设置合并文件的小文件大小阈值,文件大小超过该阈值,则不进行合并,单位为MB。此参数可以不进行设置,不设置时,则使用全局变量
odps_g_merge_filesize_threshold
,该参数值默认为32MB,设置时必须大于32MB。set odps.merge.maxmerged.filesize.threshold = 500
设置合并输出文件量的大小,输出文件大于该阈值,则创建新的输出文件,单位为MB。此参数可以不进行设置,不设置时,则使用全局变
odps_g_max_merged_filesize_threshold
,该参数值默认为500MB,设置时必须大于500MB。
PyODPS脚本合并
通过PyODPS异步提交任务,合并前一天任务产出的小文件,脚本示例如下:
import os from odps import ODPS # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID, # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret, # 不建议直接使用 Access Key ID / Access Key Secret 字符串 o = ODPS( os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point', ) #注意需要替换$table_name为所需的表名 table_name = $table_name t = odps.get_table(table_name) #设置merge选项 hints = {'odps.merge.maxmerged.filesize.threshold': 256} #examples for multi partition insts = [] #iterate_partitions list列举ds=$datetime下所有分区,分区格式也可能是其他格式 for partition in t.iterate_partitions(spec="ds=%s" % $datetime): instance=odps.run_merge_files(table_name, str(partition), hints=hints) #从这个logview的Waiting Queue点进去,才可以找到真正执行的logview print(instance.get_logview_address()) insts.append(instance) #等待分区结果 for inst in insts: inst.wait_for_completion()
运行上述脚本需要提前安装PyODPS,安装方法请参见PyODPS。
使用案例
tbcdm.dwd_tb_log_pv_di
是数据稳定性体系识别出来的需要合并小文件的物理表,通过元数据tbcdm.dws_rmd_merge_task_1d
提供的信息,如下图所示,可以看出此表相关分区的文件个数大部分都在1000以上,多的甚至达到7000以上,但平均文件大小有些还不到1MB。使用如下命令采用即时合并方案对其进行小文件合并。
set odps.merge.cross.paths=true;
set odps.merge.smallfile.filesize.threshold=128;
set odps.merge.max.filenumber.per.instance = 2000;
alter table tbcdm.dwd_tb_log_pv_di partition (ds='20151116') merge smallfiles;
合并小文件后结果如下图所示: