大数据开发之Spark Shuffle 原理数据分析
湟中娱乐新闻网 2025-10-01
val dep = rddAndDep._2
// While we use the old shuffle fetch protocol, we use partitionId as mapId in the
// ShuffleBlockId construction.
val mapId = if (SparkEnv.get.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
partitionId
} else context.taskAttemptId()
dep.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition)
并不需要ShuffleWriter在ShuffleWriteProcessor的write作法其所首不须通过ShuffleManager给予writer实例,然后再次由除此以外的writer拒绝执行说明的write语义:
def write(
rdd: RDD[_],
dep: ShuffleDependency[_, _, _],
mapId: Long,
context: TaskContext,
partition: Partition): MapStatus = {
var writer: ShuffleWriter[Any, Any] = null
try {
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](
dep.shuffleHandle,
mapId,
context,
createMetricsReporter(context))
writer.write(
rdd.iterator(partition, context).asInstanceOf[Iterator[_
Spark根据ShuffleHandle的相异适用除此以外的ShuffleWriter的意味着,包括:UnsafeShuffleWriter、BypassMergeSortShuffleWriter和SortShuffleWriter三种:
override def getWriter[K, V](
handle: ShuffleHandle,
mapId: Long,
context: TaskContext,
metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
handle.shuffleId, _ => new OpenHashSet[Long](16))
mapTaskIds.synchronized { mapTaskIds.add(mapId) }
val env = SparkEnv.get
handle match {
case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
new UnsafeShuffleWriter(
case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
new BypassMergeSortShuffleWriter(
case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
new SortShuffleWriter(other, mapId, context, shuffleExecutorComponents)
}
而说明的ShuffleHandle的并不需要是根据shuffle等价基本上的partition有数、是否是并不需要拒绝执行依序或者转化成等情形来确认的:
override def registerShuffle[K, V, C](
shuffleId: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
new BypassMergeSortShuffleHandle[K, V](
shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
new SerializedShuffleHandle[K, V](
shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else {
new BaseShuffleHandle(shuffleId, dependency)
}
}
上头分别详述这三种ShuffleHandle的并不需要语义:
1、BypassMergeSortShuffleHandle
BypassMergeSortShuffleHandle相异BypassMergeSortShuffleWriter,当不并不需要做map后端的转化成,并且中区有数高于SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD(配置文件200)时适用这种方式也,可以跳过在磁盘其所依序和转化成的方式中:
if (dep.mapSideCombine) {
false
} else {
val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)
dep.partitioner.numPartitions
}
BypassMergeSortShuffleWriter不并不需要将shuffle纪录不须录入磁盘文件系统结构设计其所,而是根据资讯的key最大或许到reduce中区,并创始相异的DiskBlockObjectWriter并不一定将资讯纪录必要录入到各中区相异的临时副本其所;【关注唯硅谷,巧妙学IT】最后再次将相异中区的临时副本新设生产data和index副本即可。
2、SerializedShuffleHandle,该方式也适用了tungsten基于磁盘压缩的前提,消除shuffle方式中其所的磁盘压力从而意味着shuffle加速。
适用该方式也并不需要实现三个条件:
if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
s"${dependency.serializer.getClass.getName}, does not support object relocation")
false
} else if (dependency.mapSideCombine) {
log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " +
s"map-side aggregation")
false
} else if (numPartitions> MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
false
} else {
log.debug(s"Can use serialized shuffle for shuffle $shufId")
true
}
1)dependency的序列化支架大力支持relocation
如果用户程序其所适用DataFrame、DataSet资讯模型则等中层适用SparkSQLLinux,当注意到shuffle的情形下,优化支架在制定天体物理学计划亦会构建ShuffleExchangeExec终端,并适用UnsafeRowSerializer,该序列化支架的supportsRelocationOfSerializedObjects物件为true,即大力支持对序列化并不一定顺利完成依序;另外,如果用户指明适用KryoSerializer序列化支架或者纪录的key和value为原生资讯子类或者string子类也适用KryoSerializer序列化支架,此时upportsRelocationOfSerializedObjects物件为true;否则适用配置文件的JavaSerializer,该物件的最大值为false。
2)不并不需要拒绝执行map后端新设:
如果适用非转化成类等价例如join相关等价时相异dependency的mapSideCombine物件最大值为false;如果适用转化成类等价如reduceByKey、aggregateByKey、combineByKey等mapSideCombine物件为true;请注意拒绝执行groupByKey等价时该物件也为false:
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
3)shuffledependency的partition有数高于MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE,即16777215。
3、如果没有适用从前两种ShuffleHandle,则适用BaseShuffleHandle,相异ShuffleWriter为SortShuffleWriter。
区域性以上数据分析,Spark根据ShuffleHandle的相异而并不需要除此以外的ShuffleWriter的意味着,再次一我们来详尽阐述这三种ShuffleWriter其所最为典型的意味着方式也SortShuffleWriter的拒绝执行基本原理,在更更进一步的Spark磁盘行政的文章其所我们将对UnsafeShuffleWriter以磁盘的角度顺利完成阐述;而BypassMergeSortShuffleWriter则是SortShuffleWriter的特殊情形,即跳过了map依序和转化成大多。
SortShuffleWriterSortShuffleWriter通过insertAll作法首不须将参加shuffle的资讯录入到shuffle文件系统条目其所,当文件系统条目的密闭增大到没有继续录入时则将资讯溢写成到硬盘其所。
Shuffle文件系统条目的意味着有两种文件系统:如果参加shuffle的等价并不需要做转化成则将资讯纪录录入到文件系统PartitionedAppendOnlyMap其所,该结构设计是一个HashMap,key为partitionId和纪录的key最大值,并且每处理一个纪录外亦会更新相异的key的value最大值;如果等价不并不需要做转化成则适用PartitionedPairBuffer的文件系统,并将纪录的key和value排序断开到buffer有字符串其所:
if (shouldCombine) {
// Combine values in-memory first using our AppendOnlyMap
val mergeValue = aggregator.get.mergeValue
val createCombiner = aggregator.get.createCombiner
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
}
while (records.hasNext) {
addElementsRead()
kv = records.next()
map.changeValue((getPartition(kv._1), kv._1), update) //更新hashmap其所的value最大值
maybeSpillCollection(usingMap = true)
}
} else {
// Stick values into our buffer
while (records.hasNext) {
addElementsRead()
val kv = records.next()
buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C]) //将纪录排序断开到buffer其所
maybeSpillCollection(usingMap = false)
}
}
由此可见,适用PartitionedAppendOnlyMap这种文件系统可以节约磁盘密闭、增大硬盘溢写成以及shuffle拉取的互联开销,这也是reduceByKey比groupByKey效能更快的因素;同时,也是为什么shuffle在没有转化成的情形下并不需要适用tungsten的传输方式也来更进一步提升拒绝执行效能。
每次录入纪录在此之后则亦会断定是否是并不需要将磁盘其所的资讯顺利完成溢写成,主要的断定语义是当shuffle文件系统的资讯量远超当从前的阈最大值在此之后尝试适配shuffle文件系统条目,当适配在此之后的密闭仍然不足的情形下则开始拒绝执行溢写成语义:
if (elementsRead % 32 == 0 && currentMemory>= myMemoryThreshold) {
// Claim up to double our current memory from the shuffle memory pool
val amountToRequest = 2 * currentMemory - myMemoryThreshold
val granted = acquireMemory(amountToRequest)
myMemoryThreshold += granted
// If we were granted too little memory to grow further (either tryToAcquire returned 0,
// or we already had more memory than myMemoryThreshold), spill the current collection
shouldSpill = currentMemory>= myMemoryThreshold
}
shouldSpill = shouldSpill || _elementsRead> numElementsForceSpillThreshold
// Actually spill
if (shouldSpill) {
_spillCount += 1
logSpillage(currentMemory)
spill(collection)
_elementsRead = 0
_memoryBytesSpilled += currentMemory
releaseMemory()
}
依序:如果在shuffle依赖其所指明了依序的排序或者转化成搜索算法则定义依序线性keyComparator:
private def comparator: Option[Comparator[K]] = {
if (ordering.isDefined || aggregator.isDefined) {
Some(keyComparator)
} else {
None
}
}
在具有依序线性的情形下,PartitionedAppendOnlyMap和PartitionedPairBuffer分别意味着了partitionedDestructiveSortedIterator线性,对资讯纪录首不须根据中区依序,然后再次根据key顺利完成依序:
/**
* A comparator for (Int, K) pairs that orders them both by their partition ID and a key ordering.
*/
def partitionKeyComparator[K](keyComparator: Comparator[K]): Comparator[(Int, K)] =
(a: (Int, K), b: (Int, K)) => {
val partitionDiff = a._1 - b._1
if (partitionDiff != 0) {
partitionDiff
} else {
keyComparator.compare(a._2, b._2)
}
}
硬盘刷写成:通过从在后的方式中将并不需要溢写成的资讯在磁盘其所依序并PCB到一个迭代支架并不一定inMemoryIterator其所,然后再次通过ExternalSorter呼叫spillMemoryIteratorToDisk作法将依序后的资讯写成到IO控制器关键点,当远超控制器关键点的容量大上限(固定式项:spark.shuffle.file.buffer,配置文件32K) 或者纪录的个有数超过SHUFFLE_SPILL_BATCH_SIZE的最大值(固定式项:spark.shuffle.spill.batchSize,配置文件10000),则将资讯flush到硬盘。因此如果一个扫雷shuffle溢写成的资讯量较少,可以尽量调大相关固定式参有数从而减轻硬盘IO的效能开销:
val (blockId, file) = diskBlockManager.createTempShuffleBlock() //创始临时溢写成副本,并为其转化成blockId
val writer: DiskBlockObjectWriter =
blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics) //构建fileBufferSize大小的关键点writer
try {
while (inMemoryIterator.hasNext) {
val partitionId = inMemoryIterator.nextPartition()
require(partitionId>= 0 && partitionId
s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")
inMemoryIterator.writeNext(writer) //将并不一定的键最大值对写成到关键点
elementsPerPartition(partitionId) += 1
objectsWritten += 1
if (objectsWritten == serializerBatchSize) { //当并不一定有数远超上限则flush到硬盘其所
flush()
}
将磁盘其所的纪录不须顺利完成依序并刷写成到临时硬盘副本在此之后,再次将该副本追加到spills条目其所,spills条目是一个ArrayBuffer[SpilledFile]的文件系统,坚称一个task所有的待新设副本的给定:
val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
spills += spillFile
新设溢写成副本:当所有并不需要shuffle的纪录外处理完成并溢写成在此之后,ExternalSorter针对每个map中区呼叫writePartitionedMapOutput作法将溢写成到硬盘的临时副本和以及磁盘其所资讯顺利完成归并依序,并录入到一个data副本其所,说明意味着方式中如下:
1.根据shuffleId、mapId创始该中区溢写成副本新设后的data副本,副本名为:
name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"
2.首不须针对每个reduce中区,迭代所有的临时溢写成副本和磁盘其所的纪录,将属于该中区的纪录根据key最大值顺利完成转化成运算;如果并不需要依序,则不须对纪录顺利完成归并依序再次根据key最大值做转化成;最后转化成一个(partitionId,partitionId相异的纪录条目) 的二元组迭代支架。
private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
: Iterator[(Int, Iterator[Product2[K, C]])] = {
val readers = spills.map(new SpillReader(_))
val inMemBuffered = inMemory.buffered
(0 until numPartitions).iterator.map { p =>
val inMemIterator = new IteratorForPartition(p, inMemBuffered)
val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
if (aggregator.isDefined) {
// Perform partial aggregation across partitions
(p, mergeWithAggregation(
iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
} else if (ordering.isDefined) {
// No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
// sort the elements without trying to merge them
(p, mergeSort(iterators, ordering.get))
} else {
(p, iterators.iterator.flatten)
}
}
}
3.迭代第2步其所转化成的二元组迭代支架,依次为每个reduce partitionId创始一个ShufflePartitionPairsWriter并不一定,并将partitionId相异的所有纪录的key和value最大值录入到在方式中1其所创始的data副本其所:
for ((id, elements)
val blockId = ShuffleBlockId(shuffleId, mapId, id)
var partitionWriter: ShufflePartitionWriter = null
var partitionPairsWriter: ShufflePartitionPairsWriter = null
TryUtils.tryWithSafeFinally {
partitionWriter = mapOutputWriter.getPartitionWriter(id)
partitionPairsWriter = new ShufflePartitionPairsWriter(
partitionWriter,
serializerManager,
serInstance,
blockId,
context.taskMetrics().shuffleWriteMetrics)
if (elements.hasNext) {
for (elem
partitionPairsWriter.write(elem._1, elem._2)
}
}
}
并不需要说明的是,在将shuffle资讯顺利完成新设的方式中还亦会累计各个patition相异资讯所占去用的传输密闭的大小,这些资讯适用partitionLengths有字符串顺利完成纪录,partitionLengths有字符串是一个下标为partitionId、最大值为相异中区的资讯宽度的宽整形有字符串。
构建查找副本:由于在创始的data副本的方式中其所还构建了partitionLengths有字符串,就可以方便的知道各中区的资讯在data副本其所的的单位,以便于在reduce之前并能集成data副本其所的资讯,消除了大量shuffle副本的全量扫描,从而提高shuffle读过之前的处理效能。再次一详述为每个data副本构建查找副本的方式中:
1.在IndexShuffleBlockResolver类的writeIndexFileAndCommit作法其所,根据shuffleId、mapId即"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index" 作为查找副本的名称,并且该副本名也相异传输系统其所的BlockId,然后通过相异executor的DiskBlockManager并不一定在localDir(一般是spark.local.dir固定式项)目录其所创始一个index副本;
def getIndexFile(
shuffleId: Int,
mapId: Long,
dirs: Option[Array[String]] = None): File = {
val blockId = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
dirs
.map(ExecutorDiskUtils.getFile(_, blockManager.subDirsPerLocalDir, blockId.name))
.getOrElse(blockManager.diskBlockManager.getFile(blockId))
}
2.根据partitionLengths有字符串其所的length最大值顺利完成一一以此类推计算,赢得每个reduce task的资讯在data副本其所的起始的单位offset,并将其纪录在index副本其所,用于更更进一步并能集成相异中区的shuffle资讯:
var offset = 0L
out.writeLong(offset)
for (length
offset += length
out.writeLong(offset)
}
Reduce 之前的资讯处理在shuffle的Map之前也即shuffle write之前完成了资讯的溢写成和新设,再次一重回shuffle的Reduce之前也即shuffle read 之前。
我们知道,所有RDD则亦会拒绝执行其compute作法,在ShuffleRDD的compute作法其所亦会子程序一个reader并不一定并呼叫其read()作法并在意味着了如上语义:
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
val metrics = context.taskMetrics().createTempShuffleReadMetrics()
SparkEnv.get.shuffleManager.getReader(
dep.shuffleHandle, split.index, split.index + 1, context, metrics)
.read()
.asInstanceOf[Iterator[(K, C)]]
}
子程序BlockStoreShuffleReader并不一定在reduce之前,SortShuffleManager首不须通过MapOutputTracker根据shuffleId从mapStatuses其所给予blocksByAddress并不一定,该并不一定的文件系统为:Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])],坚称一个reduce中区所需拉取的shuffle副本的传输资讯(包括BlockManagerId以及BlockId、元组有数、mapId的给定);再次一创始BlockStoreShuffleReader并不一定用于擦除blocksByAddress其所所指明的shuffle副本:
override def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
new BlockStoreShuffleReader(
handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
}
其其所,BlockId为ShuffleBlockId的实例,其编码方式也为"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId,block所占去元组有数通过MapStatus给予:
for (part
val size = status.getSizeForBlock(part)
if (size != 0) {
splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) +=
((ShuffleBlockId(shuffleId, status.mapId, part), size, mapIndex))
}
}
拒绝执行read()作法BlockStoreShuffleReader通过拒绝执行其read()作法从本地给予或者通过互联从其他终端拉取shuffle资讯,并对这些资讯顺利完成更进一步处理,再次一我们来看一下read()作法的说明意味着:
1、给予shuffle map资讯在read()作法其所首不须构建了ShuffleBlockFetcherIterator实例,并通过ShuffleBlockFetcherIterator的initialize()作法来意味着shuffle纪录的擦除:
1)呼叫partitionBlocksByFetchMode作法根据shuffle副本的方位资讯区分为相异的队列:
根据blocksByAddress其所带上的shuffle副本的位址资讯,如果blockManager相异的executor与当从前reduce 勤务的executor完全一致,则将该的blockManager相异的shuffle副本传输资讯放入localBlocks条目其所;否则,如果blockManager所在的终端与当从前reduce 勤务的终端完全一致,则将该blockManager相异的shuffle副本传输资讯抽出hostLocalBlocks条目其所;否则shuffle副本资讯发挥作用于远程终端其所,将相异的shuffle副本传输资讯抽出fetchRequests队列其所:
for ((address, blockInfos)
//blockManager相异的executor与当从前reduce 勤务的executor完全一致
if (Seq(blockManager.blockManagerId.executorId, fallback).contains(address.executorId)) {
checkBlockSizes(blockInfos)
val mergedBlockInfos = mergeContinuousShuffleBlockIdsIfNeeded(
blockInfos.map(info => FetchBlockInfo(info._1, info._2, info._3)), doBatchFetch)
numBlocksToFetch += mergedBlockInfos.size
localBlocks ++= mergedBlockInfos.map(info => (info.blockId, info.mapIndex))
localBlockBytes += mergedBlockInfos.map(_.size).sum
} else if (blockManager.hostLocalDirManager.isDefined &&
address.host == blockManager.blockManagerId.host) { //blockManager所在的终端与当从前reduce 勤务的终端完全一致
checkBlockSizes(blockInfos)
val mergedBlockInfos = mergeContinuousShuffleBlockIdsIfNeeded(
blockInfos.map(info => FetchBlockInfo(info._1, info._2, info._3)), doBatchFetch)
numBlocksToFetch += mergedBlockInfos.size
val blocksForAddress =
mergedBlockInfos.map(info => (info.blockId, info.size, info.mapIndex))
hostLocalBlocksByExecutor += address -> blocksForAddress
hostLocalBlocks ++= blocksForAddress.map(info => (info._1, info._3))
hostLocalBlockBytes += mergedBlockInfos.map(_.size).sum
} else { //否则shuffle副本资讯发挥作用于远程终端其所
remoteBlockBytes += blockInfos.map(_._2).sum
collectFetchRequests(address, blockInfos, collectedRemoteRequests)
}
最大或许请注意的是,从远后端拉取资讯的情形下如果资讯量太大容易导致互联阻碍,因此spark其所通过targetRemoteRequestSize 来限制reduce task每次远程拉取的资讯量,如果超过该阈最大值则将当从前的blockPCB为一个FetchRequest并放置到collectedRemoteRequests条目其所作为更更进一步资讯拉取的一个基本短剧:
if (curRequestSize>= targetRemoteRequestSize || mayExceedsMaxBlocks) {
curBlocks = createFetchRequests(curBlocks, address, isLast = false,
collectedRemoteRequests)
private def createFetchRequests(
curBlocks: Seq[FetchBlockInfo],
address: BlockManagerId,
isLast: Boolean,
collectedRemoteRequests: ArrayBuffer[FetchRequest]): Seq[FetchBlockInfo] = {
val mergedBlocks = mergeContinuousShuffleBlockIdsIfNeeded(curBlocks, doBatchFetch)
numBlocksToFetch += mergedBlocks.size
var retBlocks = Seq.empty[FetchBlockInfo]
if (mergedBlocks.length
collectedRemoteRequests += createFetchRequest(mergedBlocks, address)
} else {
mergedBlocks.grouped(maxBlocksInFlightPerAddress).foreach { blocks =>
if (blocks.length == maxBlocksInFlightPerAddress || isLast) {
collectedRemoteRequests += createFetchRequest(blocks, address)
其其所,targetRemoteRequestSize 的最大值为 math.max(maxBytesInFlight / 5, 1L),maxBytesInFlight 通过固定式项SparkEnv.get.conf.get(config.REDUCER_MAX_SIZE_IN_FLIGHT) * 1024 * 1024(配置文件48M)来指明;
如果互联带宽不是停滞并且并不需要拉取的shuffle资讯量较少,则可以尽量调大REDUCER_MAX_SIZE_IN_FLIGHT即固定式项spark.reducer.maxSizeInFlight的最大值,而会。
2)从在后的方式中根据shuffle副本传输的方位相异给予了三个催促条目,再次一分别给予各个条目其所的资讯:
// Send out initial requests for blocks, up to our maxBytesInFlight
fetchUpToMaxBytes() //跨终端拉取资讯
// Get Local Blocks
fetchLocalBlocks()
if (hostLocalBlocks.nonEmpty) {
blockManager.hostLocalDirManager.foreach(fetchHostLocalBlocks)
}
以给予远后端的shuffle副本拉取为例,ShuffleBlockFetcherIterator迭代fetchRequests,首不须给予各个request相异的blockManager,然后向该blockManager递送资讯拉取催促:
while (isRemoteBlockFetchable(fetchRequests)) {
val request = fetchRequests.dequeue()
val remoteAddress = request.address
send(remoteAddress, request)
}
}
基本上的催促递送是在NettyBlockTransferService的fetchBlocks作法其所意味着的,首不须创始TransportClient实例,然后由OneForOneBlockFetcher根据TransportClient实例、appId、executorId等向资讯所在的BlockManager递送拉取传闻FetchShuffleBlocks并处理调回的结果:
@Override
public void fetchBlocks(
String host,
int port,
String execId,
String[] blockIds,
BlockFetchingListener listener,
DownloadFileManager downloadFileManager) {
RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
(inputBlockId, inputListener) -> {
// Unless this client is closed.
if (clientFactory != null) {
TransportClient client = clientFactory.createClient(host, port, maxRetries> 0);
new OneForOneBlockFetcher(client, appId, execId,
inputBlockId, inputListener, conf, downloadFileManager).start();
当相异BlockManager的NettyBlockRpcServer接收到FetchShuffleBlocks传闻后,则根据ShuffleBlockId呼叫BlockManager的getLocalBlockData作法从本地的shuffle副本其所擦除所需的资讯:
case fetchShuffleBlocks: FetchShuffleBlocks =>
val blocks = fetchShuffleBlocks.mapIds.zipWithIndex.flatMap { case (mapId, index) =>
if (!fetchShuffleBlocks.batchFetchEnabled) {
fetchShuffleBlocks.reduceIds(index).map { reduceId =>
blockManager.getLocalBlockData(
ShuffleBlockId(fetchShuffleBlocks.shuffleId, mapId, reduceId))
}
最大或许请注意的是,在getLocalBlockData作法的意味着代码其所我们看到了从在后提到的IndexShuffleBlockResolver的实例:
override def getLocalBlockData(blockId: BlockId): ManagedBuffer = {
if (blockId.isShuffle) {
logDebug(s"Getting local shuffle block ${blockId}")
try {
shuffleManager.shuffleBlockResolver.getBlockData(blockId)
}
由于IndexShuffleBlockResolver并不一定在shuffle map之前副本新设的方式中其所创始了index副本,在reduce之前就可以根据shuffleId、mapId等资讯给予说明的index副本,然后根据reduceId给予相异中区的资讯宽度最大值在index副本其所的的单位,并能地从data副本其所有别于到相异partition的资讯:
override def getBlockData(
blockId: BlockId,
dirs: Option[Array[String]]): ManagedBuffer = {
val (shuffleId, mapId, startReduceId, endReduceId) = blockId match {
case id: ShuffleBlockId =>
(id.shuffleId, id.mapId, id.reduceId, id.reduceId + 1)
val indexFile = getIndexFile(shuffleId, mapId, dirs)
val channel = Files.newByteChannel(indexFile.toPath)
channel.position(startReduceId * 8L)
val in = new DataInputStream(Channels.newInputStream(channel))
try {
val startOffset = in.readLong()
channel.position(endReduceId * 8L)
val endOffset = in.readLong()
val actualPosition = channel.position()
val expectedPosition = endReduceId * 8L + 8
if (actualPosition != expectedPosition) {
throw new Exception(s"SPARK-22982: Incorrect channel position after index file reads: " +
s"expected $expectedPosition but actual position was $actualPosition.")
}
new FileSegmentManagedBuffer(
transportConf,
getDataFile(shuffleId, mapId, dirs),
startOffset,
endOffset - startOffset)
}
2、拒绝执行转化成如果指明了转化成线性则呼叫转化成支架(Aggregator)的combine CombinersByKey作法在reduce后端对资讯顺利完成转化成:
val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) {
// We are reading values that are already combined
val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
} else {
// We don't know the value type, but also don't care ;还有 the dependency *should*
// have made sure its compatible w/ this aggregator, which will convert the value
// type to the combined type C
val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
}
}
3、拒绝执行依序如果并不需要根据key做依序(例如sortByKey等价),则呼叫ExternalSorter的insertAll作法对资讯顺利完成文件系统,当文件系统密闭没有适配时则不须在磁盘其所依序然后拒绝执行溢写成,这个方式中和map之前insertAll作法十分相似,reduce之前的控制器又可以作为下一个shuffle map之前或者是action的资讯源:
// Sort the output if there is a sort ordering defined.
val resultIter = dep.keyOrdering match {
case Some(keyOrd: Ordering[K]) =>
// Create an ExternalSorter to sort the data.
val sorter =
new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
sorter.insertAll(aggregatedIter)
context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
// Use completion callback to stop sorter if task was finished/cancelled.
context.addTaskCompletionListener[Unit](_ => {
sorter.stop()
})
CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
总结Shuffle的处理方式中颇为复杂,并且由于其依序、转化成、硬盘溢写成以及资讯分发等方式中显然亦会造成CPU、磁盘、硬盘IO以及互联通信等上都的效能开销,我们在基本上的业务开发设计其所并不需要尽量消除shuffle,或者通过资讯过滤和其所间结果文件系统等方式也最大限度shuffle带来的效能影响。
上头根据本文的内容,Shuffle的基本原理主要总结如下:
1、Shuffle的造成从根本上是由从父三子RDD的中区支架是否是完全一致决定的,中区支架相异则显然造成勤务相互间的资讯分发;
2、Shuffle的方式中主要分为map和reduce两个之前,也即shuffle write和shuffle read之前:
在shuffle write之前,根据ShuffleHandle的相异,shuffle写成硬盘的方式中将适用相异的ShuffleWriter的意味着类,本文详尽详述了其其所最经典的意味着方式也SortShuffleWriter,该来进行通过PartitionedAppendOnlyMap文件系统在map后端将key最大值相异的资讯转化成在此之后再次顺利完成依序和溢写成,适用该结构设计可以增大资讯纪录占去用的磁盘密闭从而提升shuffle的拒绝执行效能;BypassMergeSortShuffleWriter 则是跳过了磁盘新设和依序的方式中,必要将shuffle资讯溢写成到相异中区的临时副本其所;而适用UnsafeShuffleWriter可以并用到Tungsten磁盘来进行的折扣,通过元组有字符串来民间组织资讯纪录,不仅增大了磁盘密闭的占去用,而且逐年增大了资讯并不一定的创始从而减轻JVM的开始运行压力。
在shuffle read之前,首不须根据shuffleId从mapStatuses其所给予相异的MapStatus条目,然后结合reduceId赢得reduce勤务相异的所有shuffle副本的传输资讯,并根据副本所在的传输方位将shuffle纪录分摊到相异的条目其所并分别拒绝执行资讯拉取;如果定义了新设和依序搜索算法则不须在磁盘其所顺利完成新设和依序在此之后再次溢写成到硬盘其所,否则必要将该中区的资讯录入相异的硬盘副本其所,并作为下一个shuffle read之前或者action等价的输入。
写成作者:焦媛
推荐读者过:
大资讯开发设计 Spark 模块之SparkSQL
大资讯开发设计之Spark基础知识
大资讯开发设计之Spark 基础基础知识学习
「转」大资讯开发设计之Spark面试读过书人
。云南妇科医院合肥白癜风治疗费用是多少
小孩鼻炎吃再林阿莫西林颗粒有用吗
郑州看白癜风哪家好
武汉妇科医院哪家正规
孩子感冒咳嗽怎么办
肝癌晚期怎么办
钇90微球治疗在哪家医院能做
肝癌中晚期能活多久一般
钇90选择性内放射治疗肝癌怎么样
-
ICE油菜籽继续反弹,基准止收高0.7%,周线下跌2.7%
视频 2025-10-23渥太华7年末8日消息:周五洲际融资所(ICE)美国油菜籽期货股市高企,其中基准期内收高0.7%,继续从周初缔造的现阶段低点处反弹。 截至股市,11年末期内收高6加元,报收855.10加元吨
-
列宁遗骸仅剩10,为何普京仍不同意下葬?不惜每年花费百万维护
视频 2025-10-23,终于霍查的皮下脂肪都被石蜡和甘冷水等食品所代替了,而后睫毛和眼球也都换成了真的。 相传,有人认真观察过霍查的遗骸,相提并论能看出来人工的痕迹,因此霍查遗骸的留存情况并不乐观。可即以后是
-
【棉花公检】2021年度全国棉花公证筛选量543.15万吨(截至2022.7.10)
时尚 2025-10-23中国纤维质量监测中心 截止到2022年7同年10日24点,2021小麦亚太区全国分别为1072家小麦精制企业按照小麦质量化验体制改革方案的要求精制小麦并进行公证化验,化验使用量240492
-
ANEC:2021/22等奖项巴西玉米出口将达到创纪录的4300万吨
星闻 2025-10-23博易大师 外媒7同月10日消息:哥斯达黎加全国高校谷物过境商联合会(ANEC)称,202122本年度哥斯达黎加小麦过境国预计达到年有的4300万吨,将是人均过境2080万吨的两倍以上,当时
-
CBOT芥菜周报(7月4日-7月8日)
视频 2025-10-23纽约7月底10日最新消息;截至2022年7月底8日当周,纽约衍生品交易所(CBOT)白米衍生品先抑后扬,最后收复了周初收复失地,指标合约比一周前所微涨0.2%。随着通货膨胀关切急剧下降,邻池小麦场反弹