DruidDeterminePartitionsJob Source Code Analysis

This article walks through the source code of Druid's DeterminePartitionsJob.
Purpose: before a hadoop_index task imports data into Druid to form segments, a preliminary job computes statistics over the raw data to produce the concrete configuration describing how the segment data will be partitioned. When tuningConfig.partitionsSpec.type is "single_dim" (or "dimension" in older versions), this preliminary statistics job takes the DeterminePartitionsJob path.
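For orientation, here is a minimal sketch of what such a tuningConfig might look like; the values are illustrative only, and exact field names can differ between Druid versions:

"tuningConfig": {
  "type": "hadoop",
  "partitionsSpec": {
    "type": "single_dim",
    "partitionDimension": "host",
    "targetRowsPerSegment": 5000000,
    "assumeGrouped": false
  }
}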
The main flow of DeterminePartitionsJob consists of three parts:
1. The determine_partitions_groupby task. Whether it is launched depends on the assumeGrouped setting; assumeGrouped indicates whether the input data is already grouped. This group-by task runs an MR job that counts how many rows remain after aggregating the raw data by (timestamp, dimensions), which corresponds to the number of rows that will end up in the final Druid segments.
The classes involved are mainly DeterminePartitionsGroupByMapper and DeterminePartitionsGroupByReducer.
2. The determine_partitions_dimselection task. Using either the output of determine_partitions_groupby or the raw data, it chooses the dimension to partition on and, based on that dimension's actual values, splits the data into multiple ranges.
The classes involved are DeterminePartitionsDimSelectionPostGroupByMapper, DeterminePartitionsDimSelectionAssumeGroupedMapper, and DeterminePartitionsDimSelectionReducer. Most of the important logic sits in DeterminePartitionsDimSelectionReducer, which after a series of steps decides which dimension to partition on and splits the data into ranges by that dimension's values.
3. Loading the partition information from the previous step to build a List<HadoopyShardSpec>. Downstream, this List<HadoopyShardSpec> drives the job that actually imports the data into Druid and generates segments (IndexGeneratorJob).
Logical processing flow diagram:

[Figure: DeterminePartitionsJob logical processing flow]

Step 1: the determine_partitions_groupby task. "Group by (timestamp, dimensions) so we can correctly count dimension values as they would appear in the final segment."
The MapReduce job of this phase combines each row's interval start time with all of its dimension values, which is effectively a GROUP BY over (timestamp, dimensions): rows with the same bucketed timestamp and identical dimension values collapse into a single key.
This MR phase is not always present in a hadoop_index task; the determine_partitions_groupby job is only launched when assumeGrouped is false.
Mapper logic:
Format the current record as <interval start timestamp, current inputRow> and write it as the Mapper's output key:
protected void innerMap(InputRow inputRow, Context context) throws IOException, InterruptedException
{
  final List<Object> groupKey = Rows.toGroupKey(
      rollupGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
      inputRow
  );
  context.write(
      new BytesWritable(HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsBytes(groupKey)),
      NullWritable.get()
  );
  context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_COUNTER).increment(1);
}

Reducer logic: it simply writes each distinct key once, which performs the deduplication.
protected void reduce(BytesWritable key, Iterable<NullWritable> values, Context context)
    throws IOException, InterruptedException
{
  context.write(key, NullWritable.get());
}

Input and output of the determine_partitions_groupby phase:
[Figure: determine_partitions_groupby input and output]

Step 2: the determine_partitions_dimselection task, i.e. the "Read grouped data and determine appropriate partitions" phase.
If assumeGrouped is false, the determine_partitions_dimselection MapReduce job takes the output of step 1 as its input, read by DeterminePartitionsDimSelectionPostGroupByMapper.
If assumeGrouped is true, the raw data is assumed to already be grouped, so the raw data itself is used as input, read by DeterminePartitionsDimSelectionAssumeGroupedMapper.
Whichever Mapper is used, the goal is the same: compute DimValueCount objects, which record the number of rows for each dimension and dimension value. The structure is stored as:
private DimValueCount(String dim, String value, long numRows)
{
  this.dim = dim;
  this.value = value;
  this.numRows = numRows;
}

The core of the Mapper logic is the call helper.emitDimValueCounts(context, DateTimes.utc(inputRow.getTimestampFromEpoch()), dims). Inside emitDimValueCounts(), each row of a groupKey first emits a DimValueCount("", "", 1) that is used to count the total number of rows, and then a per-dimension count is emitted for the same groupKey. The counting logic is annotated in the code, and a small illustrative sketch follows it:
void emitDimValueCounts(
    TaskInputOutputContext<?, ?, BytesWritable, Text> context,
    DateTime timestamp,
    Map<String, Iterable<String>> dims
) throws IOException, InterruptedException
{
  final Optional<Interval> maybeInterval = config.getGranularitySpec().bucketInterval(timestamp);

  if (!maybeInterval.isPresent()) {
    throw new ISE("No bucket found for timestamp: %s", timestamp);
  }

  final Interval interval = maybeInterval.get();
  final int intervalIndex = intervalIndexes.get(interval.getStartMillis());

  final ByteBuffer buf = ByteBuffer.allocate(4 + 8);
  buf.putInt(intervalIndex);
  buf.putLong(interval.getStartMillis());
  final byte[] groupKey = buf.array();

  // Emit row-counter value. (counts the total number of rows)
  write(context, groupKey, new DimValueCount("", "", 1));

  for (final Map.Entry<String, Iterable<String>> dimAndValues : dims.entrySet()) {
    final String dim = dimAndValues.getKey();

    if (partitionDimension == null || partitionDimension.equals(dim)) {
      final Iterable<String> dimValues = dimAndValues.getValue();

      if (Iterables.size(dimValues) == 1) {
        // Emit this value. (counts rows per dimension value)
        write(context, groupKey, new DimValueCount(dim, Iterables.getOnlyElement(dimValues), 1));
      } else {
        // This dimension is unsuitable for partitioning. Poison it by emitting a negative value.
        write(context, groupKey, new DimValueCount(dim, "", -1));
      }
    }
  }
}
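
To make the emission concrete, here is a small, self-contained sketch (not Druid code; the dimension names and values are made up) of what the mapper above would emit for one grouped row:

// Illustrative only: simulates the DimValueCount records emitted for a single grouped row.
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EmitDimValueCountSketch
{
  public static void main(String[] args)
  {
    // One grouped row: host and country are single-valued, tags is multi-valued.
    Map<String, List<String>> dims = new LinkedHashMap<>();
    dims.put("host", Arrays.asList("a.example.com"));
    dims.put("country", Arrays.asList("US"));
    dims.put("tags", Arrays.asList("t1", "t2"));

    // Row-counter entry: dim="", value="", numRows=1 (counts the total number of grouped rows).
    System.out.println("dim=\"\" value=\"\" numRows=1");

    for (Map.Entry<String, List<String>> e : dims.entrySet()) {
      if (e.getValue().size() == 1) {
        // Single-valued dimension: one count per (dim, value).
        System.out.printf("dim=\"%s\" value=\"%s\" numRows=1%n", e.getKey(), e.getValue().get(0));
      } else {
        // Multi-valued dimension cannot be used for range partitioning: poison it with -1.
        System.out.printf("dim=\"%s\" value=\"\" numRows=-1%n", e.getKey());
      }
    }
  }
}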

Let's look at the data flow of the determine_partitions_dimselection phase from the perspective of DeterminePartitionsDimSelectionReducer's input and output.
[Figure: DeterminePartitionsDimSelectionReducer input and output]

DeterminePartitionsDimSelectionReducer logic:
After the Mapper phase of the determine_partitions_dimselection task finishes, DeterminePartitionsDimSelectionReducer pulls in all the DimValueCount records.
1. Split the data into ranges and collect candidate partitions for every dimension. Based on the Mapper's per-dimension counts, combined with the targetRowsPerSegment and maxRowsPerSegment parameters, the candidate boundary values for every dimension are gathered.
In the while (iterator.hasNext()) loop (around line 642 of the source), the reducer walks the value counts of a dimension and computes data ranges [null, end), [start, end), [start, end), ..., [start, null):
First, the initial range has start = null and end = some value.
Next, an intermediate range is closed whenever the condition currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows > config.getTargetPartitionSize() holds, fixing that range's start and end values.
Finally, the last range has start = some value and end = null.
This produces structures like the following (a simplified sketch of the splitting rule is shown after the log output):
Adding possible shard with 9 rows and 1 unique values: SingleDimensionShardSpec{dimension='country', start='US', end='null', partitionNum=1, numCorePartitions=-1}
Adding possible shard with 2 rows and 2 unique values: SingleDimensionShardSpec{dimension='host', start='null', end='c.example.com', partitionNum=0, numCorePartitions=-1}
Adding possible shard with 2 rows and 2 unique values: SingleDimensionShardSpec{dimension='host', start='c.example.com', end='e.example.com', partitionNum=1, numCorePartitions=-1}
Adding possible shard with 2 rows and 2 unique values: SingleDimensionShardSpec{dimension='host', start='e.example.com', end='g.example.com', partitionNum=2, numCorePartitions=-1}
Adding possible shard with 2 rows and 2 unique values: SingleDimensionShardSpec{dimension='host', start='g.example.com', end='i.example.com', partitionNum=3, numCorePartitions=-1}
Adding possible shard with 2 rows and 2 unique values: SingleDimensionShardSpec{dimension='host', start='i.example.com', end='null', partitionNum=4, numCorePartitions=-1}
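
As promised above, here is a simplified, self-contained sketch (not the actual reducer code) of how the sorted value counts of one dimension can be folded into [start, end) ranges driven by a target size; the host values and counts are illustrative:

// Illustrative only: folds per-value row counts into [start, end) ranges by a target size.
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeSplitSketch
{
  public static void main(String[] args)
  {
    // Sorted value -> grouped-row count for the "host" dimension (made-up numbers).
    Map<String, Integer> counts = new LinkedHashMap<>();
    counts.put("a.example.com", 1);
    counts.put("b.example.com", 1);
    counts.put("c.example.com", 1);
    counts.put("d.example.com", 1);
    counts.put("e.example.com", 1);

    int target = 2;              // plays the role of the target partition size
    List<String[]> ranges = new ArrayList<>();
    String start = null;         // first range is open on the left: [null, end)
    int rowsInRange = 0;

    for (Map.Entry<String, Integer> e : counts.entrySet()) {
      // Close the current range before a value that would push it past the target.
      if (rowsInRange > 0 && rowsInRange + e.getValue() > target) {
        ranges.add(new String[]{start, e.getKey()});
        start = e.getKey();
        rowsInRange = 0;
      }
      rowsInRange += e.getValue();
    }
    ranges.add(new String[]{start, null});  // last range is open on the right: [start, null)

    for (int i = 0; i < ranges.size(); i++) {
      System.out.printf("partitionNum=%d start=%s end=%s%n", i, ranges.get(i)[0], ranges.get(i)[1]);
    }
  }
}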

2. The "Choose best dimension" step: pick the best dimension. Given the candidate ranges computed for all dimensions in step 1, decide which dimension to use as the partition dimension. The loop below skips dimensions that are not present in every row or that would produce oversized shards, and tracks two candidates: the dimension with the highest cardinality, and the dimension whose shard sizes are closest (in squared distance) to the target partition size.
for (final DimPartitions dimPartitions : dimPartitionss.values()) {
  if (dimPartitions.getRows() != totalRows) {
    log.info(
        "Dimension[%s] is not present in all rows (row count %,d != expected row count %,d)",
        dimPartitions.dim,
        dimPartitions.getRows(),
        totalRows
    );
    continue;
  }

  // Make sure none of these shards are oversized
  boolean oversized = false;
  final SingleDimensionPartitionsSpec partitionsSpec = (SingleDimensionPartitionsSpec) config.getPartitionsSpec();
  for (final DimPartition partition : dimPartitions.partitions) {
    if (partition.rows > partitionsSpec.getMaxRowsPerSegment()) {
      log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dim, partition.shardSpec);
      oversized = true;
    }
  }

  if (oversized) {
    continue;
  }

  final int cardinality = dimPartitions.getCardinality();
  final long distance = dimPartitions.getDistanceSquaredFromTarget(config.getTargetPartitionSize());

  if (cardinality > maxCardinality) {
    maxCardinality = cardinality;
    maxCardinalityPartitions = dimPartitions;
  }

  if (distance < minDistance) {
    minDistance = distance;
    minDistancePartitions = dimPartitions;
  }
}
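
getDistanceSquaredFromTarget() itself is not shown above. As an illustration only (the exact Druid formula may include extra scaling), one plausible form of such a metric is a sum of squared deviations of each shard's row count from the target:

// Illustrative sketch of a "distance squared from target" metric; not the exact Druid implementation.
public class DistanceSketch
{
  static long distanceSquaredFromTarget(long[] shardRowCounts, long target)
  {
    long distance = 0;
    for (long rows : shardRowCounts) {
      long diff = rows - target;
      distance += diff * diff;  // shards far from the target size increase the distance
    }
    return distance;
  }

  public static void main(String[] args)
  {
    // Five host shards of 2 rows each against a target of 2 rows: distance 0, i.e. a perfect fit.
    System.out.println(distanceSquaredFromTarget(new long[]{2, 2, 2, 2, 2}, 2));
  }
}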

Finally, with assumeGrouped=false, targetRowsPerSegment=null, maxRowsPerSegment=2 and interval=2014-10-20T00:00:00Z/P1D, the Reducer produces the output below. Note that country is rejected because its single shard holds 9 rows, which exceeds maxRowsPerSegment=2, so host is chosen as the partition dimension:
[ {"type":"single","dimension":"host","start":null,"end":"c.example.com","partitionNum":0,"numCorePartitions":-1}, {"type":"single","dimension":"host","start":"c.example.com","end":"e.example.com","partitionNum":1,"numCorePartitions":-1}, {"type":"single","dimension":"host","start":"e.example.com","end":"g.example.com","partitionNum":2,"numCorePartitions":-1}, {"type":"single","dimension":"host","start":"g.example.com","end":"i.example.com","partitionNum":3,"numCorePartitions":-1}, {"type":"single","dimension":"host","start":"i.example.com","end":null,"partitionNum":4,"numCorePartitions":-1} ]

Step 3: load the partitions to build a List<HadoopyShardSpec>, the "Load partitions determined by the previous job" phase.
As in DetermineHashedPartitionsJob, this phase converts the output of the preceding steps into a List<HadoopyShardSpec>.
That List<HadoopyShardSpec> then guides IndexGeneratorJob in importing the Hadoop data into Druid and forming segments. A rough sketch of the loading step is shown below.
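
A minimal sketch of the loading step, using hypothetical stand-in types rather than the real ShardSpec/HadoopyShardSpec classes (which carry much more information): each interval's chosen shard specs are wrapped with a globally increasing shard number.

// Illustrative only: Spec/HadoopySpec are stand-ins, not Druid classes.
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LoadPartitionsSketch
{
  record Spec(String dimension, String start, String end, int partitionNum) {}
  record HadoopySpec(Spec actualSpec, int shardNum) {}

  public static void main(String[] args)
  {
    // Reducer output keyed by interval start millis (1413763200000 = 2014-10-20T00:00:00Z).
    Map<Long, List<Spec>> reducerOutputPerInterval = new LinkedHashMap<>();
    reducerOutputPerInterval.put(1413763200000L, List.of(
        new Spec("host", null, "c.example.com", 0),
        new Spec("host", "c.example.com", "e.example.com", 1)
    ));

    Map<Long, List<HadoopySpec>> shardSpecs = new LinkedHashMap<>();
    int shardCount = 0;  // global shard number across all intervals

    for (Map.Entry<Long, List<Spec>> e : reducerOutputPerInterval.entrySet()) {
      List<HadoopySpec> actual = new ArrayList<>();
      for (Spec spec : e.getValue()) {
        actual.add(new HadoopySpec(spec, shardCount++));
      }
      shardSpecs.put(e.getKey(), actual);
    }

    System.out.println(shardSpecs);
  }
}
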
Data processing logic diagram:
[Figure: data processing logic for loading partitions]

Open questions:
1. Why is there a determine_partitions_dimselection phase at all, given that single_dim lets you specify the partition dimension explicitly?
2. What exactly are the logic and rules for choosing the best dimension?
Roughly: prefer the dimension with the larger data cardinality (subject to the checks in the code above).
3. How should the detailed logic of the "Choose best dimension" step be explained?
For the tuningConfig.partitionsSpec.type="hashed" logic, see: https://blog.51cto.com/u_10120275/3530686
