spark ShuffleInMemorySorter 源码
spark ShuffleInMemorySorter 代码
文件路径:/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.shuffle.sort;
import java.util.Comparator;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.collection.Sorter;
import org.apache.spark.util.collection.unsafe.sort.RadixSort;
/**
 * In-memory sorter for shuffle records. Each inserted record is stored as a single packed long
 * (see {@link PackedRecordPointer}) in a {@link LongArray} obtained from a
 * {@link MemoryConsumer}; sorting is done on those packed longs by partition id.
 *
 * Once the pointer array has been released (via {@link #free()} or a failed {@link #reset()}),
 * {@code usableCapacity} is kept at zero so that {@link #hasSpaceForAnotherRecord()} reports
 * {@code false} and {@link #insertRecord(long, int)} fails with an
 * {@link IllegalStateException} rather than dereferencing a null array.
 */
final class ShuffleInMemorySorter {

  /** Orders packed record pointers by ascending partition id; nothing else is compared. */
  private static final class SortComparator implements Comparator<PackedRecordPointer> {
    @Override
    public int compare(PackedRecordPointer left, PackedRecordPointer right) {
      return Integer.compare(left.getPartitionId(), right.getPartitionId());
    }
  }

  private static final SortComparator SORT_COMPARATOR = new SortComparator();

  /** The memory consumer through which the pointer array is allocated and freed. */
  private final MemoryConsumer consumer;

  /**
   * An array of record pointers and partition ids that have been encoded by
   * {@link PackedRecordPointer}. The sort operates on this array instead of directly manipulating
   * records.
   *
   * Only part of the array will be used to store the pointers, the rest part is preserved as
   * temporary buffer for sorting.
   *
   * This field is {@code null} after {@link #free()} and while {@link #reset()} is re-allocating.
   */
  private LongArray array;

  /**
   * Whether to use radix sort for sorting in-memory partition ids. Radix sort is much faster
   * but requires additional memory to be reserved as pointers are added.
   */
  private final boolean useRadixSort;

  /**
   * The position in the pointer array where new records can be inserted.
   */
  private int pos = 0;

  /**
   * How many records could be inserted, because part of the array should be left for sorting.
   * Kept at zero whenever {@code array} is {@code null}.
   */
  private int usableCapacity = 0;

  /** The size requested for the pointer array at construction and on every {@link #reset()}. */
  private final int initialSize;

  /**
   * Creates a sorter and immediately allocates its pointer array from {@code consumer}.
   *
   * @param consumer the memory consumer used to allocate and free the pointer array
   * @param initialSize the size passed to {@code consumer.allocateArray}; asserted positive
   * @param useRadixSort whether {@link #getSortedIterator()} uses radix sort
   */
  ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize, boolean useRadixSort) {
    this.consumer = consumer;
    assert (initialSize > 0);
    this.initialSize = initialSize;
    this.useRadixSort = useRadixSort;
    this.array = consumer.allocateArray(initialSize);
    this.usableCapacity = getUsableCapacity();
  }

  /**
   * Computes how many entries of the current array may hold records, leaving the remainder as
   * scratch space for the sort. Must only be called while {@code array} is non-null.
   */
  private int getUsableCapacity() {
    // Radix sort requires same amount of used memory as buffer, Tim sort requires
    // half of the used memory as buffer.
    return (int) (array.size() / (useRadixSort ? 2 : 1.5));
  }

  /**
   * Releases the pointer array back to the memory consumer. Calling this more than once is a
   * no-op.
   */
  public void free() {
    if (array != null) {
      consumer.freeArray(array);
      array = null;
      // Same invariant that reset() maintains: with no backing array there must be no usable
      // capacity, otherwise insertRecord() would pass its capacity check and then hit a null
      // array.
      usableCapacity = 0;
    }
  }

  /** Returns the number of records inserted since construction or the last {@link #reset()}. */
  public int numRecords() {
    return pos;
  }

  /**
   * Discards all inserted records, releases the current pointer array (if any) and allocates a
   * fresh one of the initial size.
   */
  public void reset() {
    // Reset `pos` here so that `spill` triggered by the below `allocateArray` will be no-op.
    pos = 0;
    // `array` may already be null here (after free(), or after an earlier reset() whose
    // allocateArray threw), so only hand it back to the consumer when it exists.
    if (array != null) {
      consumer.freeArray(array);
    }
    // As `array` has been released, we should set it to `null` to avoid accessing it before
    // `allocateArray` returns. `usableCapacity` is also set to `0` to avoid any codes writing
    // data to `ShuffleInMemorySorter` when `array` is `null` (e.g., in
    // ShuffleExternalSorter.growPointerArrayIfNecessary, we may try to access
    // `ShuffleInMemorySorter` when `allocateArray` throws SparkOutOfMemoryError).
    array = null;
    usableCapacity = 0;
    array = consumer.allocateArray(initialSize);
    usableCapacity = getUsableCapacity();
  }

  /**
   * Replaces the pointer array with a larger one. The first {@code pos} entries (8 bytes each)
   * are copied into {@code newArray}, the old array is freed, and the usable capacity is
   * recomputed from the new array's size.
   *
   * @param newArray the replacement array; asserted to be larger than the current one
   */
  public void expandPointerArray(LongArray newArray) {
    assert(newArray.size() > array.size());
    Platform.copyMemory(
      array.getBaseObject(),
      array.getBaseOffset(),
      newArray.getBaseObject(),
      newArray.getBaseOffset(),
      pos * 8L
    );
    consumer.freeArray(array);
    array = newArray;
    usableCapacity = getUsableCapacity();
  }

  /** Returns whether at least one more record can be inserted without growing the array. */
  public boolean hasSpaceForAnotherRecord() {
    return pos < usableCapacity;
  }

  /**
   * Returns the size of the pointer array in bytes (8 bytes per entry), or 0 when the array has
   * been released.
   */
  public long getMemoryUsage() {
    return array == null ? 0L : array.size() * 8L;
  }

  /**
   * Inserts a record to be sorted.
   *
   * @param recordPointer a pointer to the record, encoded by the task memory manager. Due to
   *                      certain pointer compression techniques used by the sorter, the sort can
   *                      only operate on pointers that point to locations in the first
   *                      {@link PackedRecordPointer#MAXIMUM_PAGE_SIZE_BYTES} bytes of a data page.
   * @param partitionId the partition id, which must be less than or equal to
   *                    {@link PackedRecordPointer#MAXIMUM_PARTITION_ID}.
   * @throws IllegalStateException if {@link #hasSpaceForAnotherRecord()} is {@code false}
   */
  public void insertRecord(long recordPointer, int partitionId) {
    if (!hasSpaceForAnotherRecord()) {
      throw new IllegalStateException("There is no space for new record");
    }
    array.set(pos, PackedRecordPointer.packPointer(recordPointer, partitionId));
    pos++;
  }

  /**
   * An iterator-like class that's used instead of Java's Iterator in order to facilitate inlining.
   */
  public static final class ShuffleSorterIterator {
    private final LongArray pointerArray;
    /** Exclusive end index in {@code pointerArray}: starting position plus record count. */
    private final int limit;
    /** Reused holder that {@link #loadNext()} overwrites with the current entry. */
    final PackedRecordPointer packedRecordPointer = new PackedRecordPointer();
    private int position = 0;

    /**
     * @param numRecords how many entries to iterate over
     * @param pointerArray the array holding the packed pointers
     * @param startingPosition index of the first entry to read
     */
    ShuffleSorterIterator(int numRecords, LongArray pointerArray, int startingPosition) {
      this.limit = numRecords + startingPosition;
      this.pointerArray = pointerArray;
      this.position = startingPosition;
    }

    /** Returns whether another entry remains before {@code limit}. */
    public boolean hasNext() {
      return position < limit;
    }

    /** Loads the entry at the current position into {@code packedRecordPointer} and advances. */
    public void loadNext() {
      packedRecordPointer.set(pointerArray.get(position));
      position++;
    }
  }

  /**
   * Return an iterator over record pointers in sorted order.
   */
  public ShuffleSorterIterator getSortedIterator() {
    int offset = 0;
    if (useRadixSort) {
      // The value returned by RadixSort.sort is used as the index at which iteration starts.
      offset = RadixSort.sort(
        array, pos,
        PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX,
        PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX, false, false);
    } else {
      // Expose the part of the array after the inserted records as the sort's scratch buffer.
      MemoryBlock unused = new MemoryBlock(
        array.getBaseObject(),
        array.getBaseOffset() + pos * 8L,
        (array.size() - pos) * 8L);
      LongArray buffer = new LongArray(unused);
      Sorter<PackedRecordPointer, LongArray> sorter =
        new Sorter<>(new ShuffleSortDataFormat(buffer));
      sorter.sort(array, 0, pos, SORT_COMPARATOR);
    }
    return new ShuffleSorterIterator(pos, array, offset);
  }
}
相关信息
相关文章
spark BypassMergeSortShuffleWriter 源码
spark ShuffleExternalSorter 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦