Configuring the number of sub-partitions for a reduce task
Without hash table-based aggregation, the number of partitions equals the number of reduce tasks, and each reduce task fetches only the map output for its own partition from the map tasks across the cluster. When hash table-based aggregation is enabled, the framework splits each partition into sub-partitions, and each reduce task handles multiple sub-partitions from each map task's output.
About this task
Configure the number of sub-partitions into which the output of each map task is split through the pmr.subpartition.num parameter. Creating sub-partitions avoids memory issues: a reduce task that emits output only at the end of a partition can use too much memory. With this parameter, sub-partitions are more fine-grained than partitions, so the reduce task emits output at the end of each sub-partition within a partition.
- To avoid memory issues, set pmr.subpartition.num to be greater than the number of reduce tasks (mapred.reduce.tasks), and set pmr.framework.aggregation to hash or none.
- If you do not have memory concerns, set pmr.subpartition.num equal to mapred.reduce.tasks, and set pmr.framework.aggregation to hash or none.
Assuming pmr.subpartition.num is N and mapred.reduce.tasks is M, each reduce task gets at least N/M (rounded down) sub-partitions, and the first N%M reduce tasks each get one extra sub-partition. For example, if pmr.subpartition.num is 8 and mapred.reduce.tasks is 3, reduce task 1 gets sub-partitions 0, 1, and 2; reduce task 2 gets sub-partitions 3, 4, and 5; and reduce task 3 gets only sub-partitions 6 and 7.
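The assignment rule above can be sketched in Python (an illustrative model of the distribution, not part of the product):

```python
def assign_subpartitions(num_subpartitions, num_reduce_tasks):
    """Distribute sub-partitions 0..N-1 across M reduce tasks.

    Each task gets at least N // M sub-partitions; the first N % M
    tasks each get one extra, matching the rule described above.
    """
    base, extra = divmod(num_subpartitions, num_reduce_tasks)
    assignments = []
    start = 0
    for task in range(num_reduce_tasks):
        count = base + (1 if task < extra else 0)
        assignments.append(list(range(start, start + count)))
        start += count
    return assignments

# Example from the text: N=8 sub-partitions, M=3 reduce tasks.
print(assign_subpartitions(8, 3))  # [[0, 1, 2], [3, 4, 5], [6, 7]]
```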
- Enable services to be pre-started through the preStartApplication element in the Consumer section of the application profile.
- If you do not require strict data sorting, set pmr.framework.aggregation=hash. If you want to implement aggregation in your reducer class code, set pmr.framework.aggregation=none.
- When pmr.framework.aggregation is set to hash or none, configuring pmr.subpartition.num to be greater than mapred.reduce.tasks avoids out-of-memory issues. For a balanced workload, we recommend that you configure pmr.subpartition.num=X*mapred.reduce.tasks. Determine the value of pmr.subpartition.num according to the ratio between input size and map output size, the maximum input size, and the number of reduce tasks. For example, if the ratio of map input to intermediate data is 1:2 (1 MB of input generates 2 MB of intermediate data), the maximum input size is 55 MB, and the total number of input files is 100, the total intermediate data size is 2*55 MB*100 = 11000 MB. In this case, split the 11000 MB into enough sub-partitions to fit within the reducer memory limit.
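The sizing calculation above can be sketched as follows (an illustrative estimate; the reducer memory limit of 1000 MB is a hypothetical value, not a product default):

```python
import math

def estimate_subpartition_num(input_to_intermediate_ratio,
                              max_input_size_mb,
                              num_input_files,
                              reducer_memory_limit_mb):
    """Estimate pmr.subpartition.num so each sub-partition fits in memory.

    Total intermediate data = ratio * max input size * number of files;
    round up so no sub-partition exceeds the reducer memory limit.
    """
    total_intermediate_mb = (input_to_intermediate_ratio
                             * max_input_size_mb
                             * num_input_files)
    return math.ceil(total_intermediate_mb / reducer_memory_limit_mb)

# Worked example from the text: 1:2 ratio, 55 MB max input, 100 files
# gives 2 * 55 * 100 = 11000 MB of intermediate data. With a
# hypothetical 1000 MB reducer memory limit:
print(estimate_subpartition_num(2, 55, 100, 1000))  # 11
```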