在对spark源码进行改进的过程中,想要尝试拿到从spark-shell输入的原始数据,并在控制台中输出相关数据,但是spark将原始数据处理为internalRow格式,使用println函数输出后得到的数据为多元组,类似[0,1000000001,36]的格式,其中可以发现最后一列对应的数据为输入数据的十六进制格式,但是采用任何方法都无法得到原始数据,请问是否有了解相关技术的专家给出相应的解决方案,即如何从spark源码中分离到原始数据。
val aggregationIterator =
new TungstenAggregationIterator(
partIndex,
groupingExpressions,
aggregateExpressions,
aggregateAttributes,
initialInputBufferOffset,
resultExpressions,
(expressions, inputSchema) =>
MutableProjection.create(expressions, inputSchema),
inputAttributes,
iter,
testFallbackStartsAt,
numOutputRows,
peakMemory,
spillSize,
avgHashProbe)
if (!hasInput && groupingExpressions.isEmpty) {
numOutputRows += 1
Iterator.single[UnsafeRow](aggregationIterator.outputForEmptyGroupingKeyWithoutInput())
} else {
aggregationIterator
}
上面的代码是spark hashaggregationExec 的核心代码,其创建了一个迭代器,其中iter即为存储数据的迭代器。
能够将原始输入数据从spark中分离出来