Class BaseHybridHashTable
- java.lang.Object
-
- org.apache.flink.table.runtime.hashtable.BaseHybridHashTable
-
- All Implemented Interfaces:
org.apache.flink.core.memory.MemorySegmentSource, MemorySegmentPool
- Direct Known Subclasses:
BinaryHashTable, LongHybridHashTable
public abstract class BaseHybridHashTable extends Object implements MemorySegmentPool
Base table for LongHybridHashTable and BinaryHashTable.
-
-
Field Summary
Fields
Modifier and Type | Field | Description
protected long | buildRowCount |
protected int | buildSpillRetBufferNumbers | The number of buffers in the build spill return buffer queue that are not actually write-behind buffers, but regular buffers that simply have not yet been returned.
protected LinkedBlockingQueue<org.apache.flink.core.memory.MemorySegment> | buildSpillReturnBuffers | The queue of buffers that can be used for write-behind.
protected AtomicBoolean | closed | Flag indicating that the closing logic has been invoked.
protected int | compressionBlockSize |
protected org.apache.flink.runtime.io.compression.BlockCompressionFactory | compressionCodecFactory |
protected boolean | compressionEnabled |
protected org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator | currentEnumerator | The channel enumerator that is used while processing the current partition to create channels for the spill partitions it requires.
protected int | currentRecursionDepth | The recursion depth of the partition that is currently processed.
protected org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView | currentSpilledBuildSide | The reader for the spilled file of the build partition that is currently read.
protected org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView | currentSpilledProbeSide | The reader for the spilled file of the probe partition that is currently read.
protected int | initPartitionFanOut |
protected LazyMemorySegmentPool | internalPool |
protected org.apache.flink.runtime.io.disk.iomanager.IOManager | ioManager | The I/O manager used to instantiate writers for the spilled partitions.
protected static org.slf4j.Logger | LOG |
protected static int | MAX_NUM_PARTITIONS | The maximum number of partitions, which defines the spilling granularity.
protected static int | MAX_RECURSION_DEPTH | The maximum number of recursive partitionings that the join does before giving up.
protected long | numSpillFiles |
protected int | segmentSize | The size of the segments used by the hash join buckets.
int | segmentSizeBits |
int | segmentSizeMask |
protected long | spillInBytes |
protected int | totalNumBuffers | The total reserved number of memory segments available to the hash join.
boolean | tryDistinctBuildRow | Try to make the buildSide rows distinct.
-
Constructor Summary
Constructors
Constructor | Description
BaseHybridHashTable(Object owner, boolean compressionEnabled, int compressionBlockSize, org.apache.flink.runtime.memory.MemoryManager memManager, long reservedMemorySize, org.apache.flink.runtime.io.disk.iomanager.IOManager ioManager, int avgRecordLen, long buildRowCount, boolean tryDistinctBuildRow) |
-
Method Summary
Modifier and Type | Method | Description
protected abstract void | clearPartitions() |
void | close() | Closes the hash table.
protected org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView | createInputView(org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID id, int blockCount, int lastSegmentLimit) |
void | ensureNumBuffersReturned(int minRequiredAvailable) | This method makes sure that at least a certain number of memory segments are in the list of free segments.
void | free() |
void | freeCurrent() | Free the memory not used.
int | freePages() |
org.apache.flink.core.memory.MemorySegment | getNextBuffer() | Gets the next buffer to be used with the hash table, either for an in-memory partition or for the table buckets.
org.apache.flink.core.memory.MemorySegment[] | getNextBuffers(int bufferSize) | Bulk memory acquisition.
protected org.apache.flink.core.memory.MemorySegment | getNotNullNextBuffer() |
long | getNumSpillFiles() |
long | getSpillInBytes() |
long | getUsedMemoryInBytes() |
static int | hash(int hashCode, int level) | The level parameter is needed so that we can have different hash functions when we recursively apply the partitioning, so that the working set eventually fits into memory.
int | maxInitBufferOfBucketArea(int partitions) | Gives the bucket area up to one-sixth of the memory.
protected int | maxNumPartition() | The bucket area needs at least one segment and the data needs at least one segment.
org.apache.flink.core.memory.MemorySegment | nextSegment() | This is the method called by the partitions to request memory to serialize records.
int | pageSize() | Get the page size of each page this pool holds.
protected List<org.apache.flink.core.memory.MemorySegment> | readAllBuffers(org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID id, int blockCount) |
void | releaseMemoryCacheForSMJ() | Because adaptive hash join has been introduced, the cached memory segments should be released to the MemoryManager before switching to sort merge join.
int | remainBuffers() |
void | returnAll(List<org.apache.flink.core.memory.MemorySegment> memory) | Return all pages back into this pool.
void | returnPage(org.apache.flink.core.memory.MemorySegment segment) |
protected abstract int | spillPartition() |
-
-
-
Field Detail
-
LOG
protected static final org.slf4j.Logger LOG
-
MAX_RECURSION_DEPTH
protected static final int MAX_RECURSION_DEPTH
The maximum number of recursive partitionings that the join does before giving up.
- See Also:
- Constant Field Values
-
MAX_NUM_PARTITIONS
protected static final int MAX_NUM_PARTITIONS
The maximum number of partitions, which defines the spilling granularity. In each recursion, the data is divided into at most that many partitions, which are processed in one chunk.
- See Also:
- Constant Field Values
-
initPartitionFanOut
protected final int initPartitionFanOut
-
buildRowCount
protected final long buildRowCount
-
totalNumBuffers
protected final int totalNumBuffers
The total reserved number of memory segments available to the hash join.
-
internalPool
protected final LazyMemorySegmentPool internalPool
-
ioManager
protected final org.apache.flink.runtime.io.disk.iomanager.IOManager ioManager
The I/O manager used to instantiate writers for the spilled partitions.
-
segmentSize
protected final int segmentSize
The size of the segments used by the hash join buckets. All segments must be of equal size to ease offset computations.
-
buildSpillReturnBuffers
protected final LinkedBlockingQueue<org.apache.flink.core.memory.MemorySegment> buildSpillReturnBuffers
The queue of buffers that can be used for write-behind. Any buffer that is written asynchronously to disk is returned through this queue.
-
segmentSizeBits
public final int segmentSizeBits
-
segmentSizeMask
public final int segmentSizeMask
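The fields segmentSize, segmentSizeBits and segmentSizeMask suggest the usual power-of-two addressing trick: when every segment has the same power-of-two size, a flat position can be split into a segment index and an in-segment offset with a shift and a mask. The following is a minimal sketch under that assumption; the class SegmentAddressing and its methods are hypothetical and not part of Flink.

```java
/** Hypothetical helper for illustration only; not a Flink class. */
final class SegmentAddressing {
    final int segmentSize;
    final int segmentSizeBits;
    final int segmentSizeMask;

    SegmentAddressing(int segmentSize) {
        // Assumption: the segment size is a power of two, so shift/mask arithmetic is exact.
        if (segmentSize <= 0 || Integer.bitCount(segmentSize) != 1) {
            throw new IllegalArgumentException("segment size must be a power of two");
        }
        this.segmentSize = segmentSize;
        this.segmentSizeBits = Integer.numberOfTrailingZeros(segmentSize); // log2(segmentSize)
        this.segmentSizeMask = segmentSize - 1;                            // low bits set
    }

    /** Index of the segment that contains the given flat position. */
    int segmentIndex(long pointer) {
        return (int) (pointer >>> segmentSizeBits);
    }

    /** Byte offset of the given flat position inside its segment. */
    int offsetInSegment(long pointer) {
        return (int) (pointer & segmentSizeMask);
    }

    public static void main(String[] args) {
        SegmentAddressing a = new SegmentAddressing(32 * 1024);
        long pointer = 3L * 32 * 1024 + 100;
        // prints 3 / 100
        System.out.println(a.segmentIndex(pointer) + " / " + a.offsetInSegment(pointer));
    }
}
```

With unequal segment sizes, the same lookup would need a search over segment boundaries instead of two bit operations.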
-
closed
protected AtomicBoolean closed
Flag indicating that the closing logic has been invoked.
-
tryDistinctBuildRow
public final boolean tryDistinctBuildRow
Try to make the buildSide rows distinct.
-
currentRecursionDepth
protected int currentRecursionDepth
The recursion depth of the partition that is currently processed. The initial table has a recursion depth of 0. Partitions spilled from a table that is built for a partition with recursion depth n have a recursion depth of n+1.
-
buildSpillRetBufferNumbers
protected int buildSpillRetBufferNumbers
The number of buffers in the build spill return buffer queue that are not actually write-behind buffers, but regular buffers that simply have not yet been returned. This is part of an optimization so that the spilling code need not wait until the partition is completely spilled before proceeding.
-
currentSpilledBuildSide
protected org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView currentSpilledBuildSide
The reader for the spilled-file of the build partition that is currently read.
-
currentSpilledProbeSide
protected org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView currentSpilledProbeSide
The reader for the spilled-file of the probe partition that is currently read.
-
currentEnumerator
protected org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator currentEnumerator
The channel enumerator that is used while processing the current partition to create channels for the spill partitions it requires.
-
compressionEnabled
protected final boolean compressionEnabled
-
compressionCodecFactory
protected final org.apache.flink.runtime.io.compression.BlockCompressionFactory compressionCodecFactory
-
compressionBlockSize
protected final int compressionBlockSize
-
numSpillFiles
protected transient long numSpillFiles
-
spillInBytes
protected transient long spillInBytes
-
-
Constructor Detail
-
BaseHybridHashTable
public BaseHybridHashTable(Object owner, boolean compressionEnabled, int compressionBlockSize, org.apache.flink.runtime.memory.MemoryManager memManager, long reservedMemorySize, org.apache.flink.runtime.io.disk.iomanager.IOManager ioManager, int avgRecordLen, long buildRowCount, boolean tryDistinctBuildRow)
-
-
Method Detail
-
maxNumPartition
protected int maxNumPartition()
The bucket area needs at least one segment and the data needs at least one segment. In the initialization phase, (totalNumBuffers - numWriteBehindBuffers) segments can be used. In the buildTableFromSpilledPartition phase, however, only (totalNumBuffers - numWriteBehindBuffers - 2) segments can be used, because two buffers are needed to read the data.
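The segment budget described for maxNumPartition() can be worked through with a little arithmetic. The sketch below is an assumption-laden illustration: the class PartitionBudget is hypothetical, and dividing the usable segments by two is only an inference from the statement that each partition needs one bucket segment and one data segment. It is not taken from the Flink source.

```java
/** Hypothetical arithmetic sketch; not the Flink implementation. */
final class PartitionBudget {

    /** Segments usable for partitions in the given phase, per the description above. */
    static int usableSegments(int totalNumBuffers, int numWriteBehindBuffers, boolean rebuildingFromSpill) {
        int usable = totalNumBuffers - numWriteBehindBuffers;
        if (rebuildingFromSpill) {
            usable -= 2; // two buffers are reserved for reading the spilled data back
        }
        return Math.max(0, usable);
    }

    /** Assumption: each partition needs one bucket segment plus one data segment. */
    static int maxPartitions(int usableSegments) {
        return usableSegments / 2;
    }

    public static void main(String[] args) {
        int init = usableSegments(100, 6, false);    // 94 segments
        int rebuild = usableSegments(100, 6, true);  // 92 segments
        // prints 47 and 46
        System.out.println(maxPartitions(init) + " and " + maxPartitions(rebuild));
    }
}
```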
-
getNextBuffer
public org.apache.flink.core.memory.MemorySegment getNextBuffer()
Gets the next buffer to be used with the hash table, either for an in-memory partition or for the table buckets. This method returns null if no more buffers are available. Spilling a partition may then free up buffers.
- Returns:
- The next buffer to be used by the hash-table, or null, if no buffer remains.
-
getNextBuffers
public org.apache.flink.core.memory.MemorySegment[] getNextBuffers(int bufferSize)
Bulk memory acquisition. NOTE: Failure to get memory will throw an exception.
-
getNotNullNextBuffer
protected org.apache.flink.core.memory.MemorySegment getNotNullNextBuffer()
-
nextSegment
public org.apache.flink.core.memory.MemorySegment nextSegment()
This is the method called by the partitions to request memory to serialize records. It automatically spills partitions if memory runs out.
- Specified by:
nextSegment in interface org.apache.flink.core.memory.MemorySegmentSource
- Returns:
- The next available memory segment.
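The difference between getNextBuffer(), which may return null, and nextSegment(), which spills when memory runs out, can be illustrated with a toy pool. This is a simplified sketch, not Flink code: byte arrays stand in for MemorySegment, the class SpillingPoolSketch is hypothetical, and "spilling" here merely recycles the pages of the in-memory partition instead of writing them to disk.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of a pool that spills when it runs dry; not the Flink implementation. */
final class SpillingPoolSketch {
    private final Deque<byte[]> free = new ArrayDeque<>();
    private final Deque<byte[]> inMemoryPartitionPages = new ArrayDeque<>();
    int spills = 0;

    SpillingPoolSketch(int numPages, int pageSize) {
        for (int i = 0; i < numPages; i++) {
            free.add(new byte[pageSize]);
        }
    }

    /** Returns the next free page, or null if none remains. */
    byte[] getNextBuffer() {
        return free.poll();
    }

    /** Stand-in for spilling: pretends the pages were written out and recycles them. */
    int spillPartition() {
        int freed = inMemoryPartitionPages.size();
        free.addAll(inMemoryPartitionPages);
        inMemoryPartitionPages.clear();
        spills++;
        return freed;
    }

    /** Never returns null: spills once and retries if the pool is empty. */
    byte[] nextSegment() {
        byte[] page = getNextBuffer();
        if (page == null) {
            if (spillPartition() == 0) {
                throw new IllegalStateException("no memory available even after spilling");
            }
            page = getNextBuffer();
        }
        inMemoryPartitionPages.add(page);
        return page;
    }
}
```

Callers of getNextBuffer() have to handle null themselves, while callers of nextSegment() either receive a page or get an exception.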
-
freePages
public int freePages()
- Specified by:
freePages in interface MemorySegmentPool
- Returns:
- The number of free pages.
-
pageSize
public int pageSize()
Description copied from interface: MemorySegmentPool
Get the page size of each page this pool holds.
- Specified by:
pageSize in interface MemorySegmentPool
- Returns:
- the page size
-
returnAll
public void returnAll(List<org.apache.flink.core.memory.MemorySegment> memory)
Description copied from interface: MemorySegmentPool
Return all pages back into this pool.
- Specified by:
returnAll in interface MemorySegmentPool
- Parameters:
memory - the pages to be returned.
-
spillPartition
protected abstract int spillPartition() throws IOException
- Throws:
IOException
-
ensureNumBuffersReturned
public void ensureNumBuffersReturned(int minRequiredAvailable)
This method makes sure that at least a certain number of memory segments are in the list of free segments. Free memory can be in the list of free segments, or in the return queue where segments used for write-behind are put. The number of segments that are in that return queue but are actually reclaimable is tracked. This method makes sure that at least a certain number of buffers are reclaimed.
- Parameters:
minRequiredAvailable - The minimum number of buffers that need to be reclaimed.
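The reclaim logic described for ensureNumBuffersReturned can be sketched with a free list and a blocking return queue. This is an illustrative model under stated assumptions: byte arrays stand in for MemorySegment, the class ReturnQueueSketch is hypothetical, and reclaimableInQueue plays the role that the description assigns to buildSpillRetBufferNumbers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy model of reclaiming write-behind buffers; not the Flink implementation. */
final class ReturnQueueSketch {
    final List<byte[]> freeSegments = new ArrayList<>();
    final LinkedBlockingQueue<byte[]> returnQueue = new LinkedBlockingQueue<>();
    int reclaimableInQueue; // buffers in (or on their way to) the queue that may be reclaimed

    void ensureNumBuffersReturned(int minRequiredAvailable) {
        if (minRequiredAvailable > freeSegments.size() + reclaimableInQueue) {
            throw new IllegalArgumentException("more buffers requested than can ever be reclaimed");
        }
        while (freeSegments.size() < minRequiredAvailable) {
            try {
                // Blocks until an asynchronous writer hands a buffer back.
                freeSegments.add(returnQueue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for a buffer", e);
            }
            reclaimableInQueue--;
        }
    }
}
```

The upfront check matters in this sketch: without it, a request larger than the reclaimable total would block forever on the queue.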
-
close
public void close()
Closes the hash table. This effectively releases all internal structures, closes all open files, and removes them. The call to this method is valid both as a cleanup after the complete inputs were properly processed, and as a cancellation call, which cleans up all resources that are currently held by the hash join.
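Because close() may be called both after normal completion and on cancellation, it has to tolerate repeated and concurrent calls. A common way to achieve this with an AtomicBoolean flag such as closed is a compare-and-set guard. The sketch below shows that pattern in isolation; the class CloseOnceSketch and its cleanup counter are hypothetical, and whether the actual class uses exactly this guard is an assumption.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Illustrates a close-once guard; not the Flink implementation. */
final class CloseOnceSketch {
    private final AtomicBoolean closed = new AtomicBoolean();
    int cleanups = 0; // counts how often the cleanup body actually ran

    void close() {
        // Only the first caller flips the flag from false to true; all others return early.
        if (!closed.compareAndSet(false, true)) {
            return;
        }
        cleanups++; // stand-in for releasing memory and deleting spill files
    }

    boolean isClosed() {
        return closed.get();
    }
}
```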
-
clearPartitions
protected abstract void clearPartitions()
-
free
public void free()
-
freeCurrent
public void freeCurrent()
Free the memory not used.
-
releaseMemoryCacheForSMJ
public void releaseMemoryCacheForSMJ()
Because adaptive hash join has been introduced, the cached memory segments should be released to the MemoryManager before switching to sort merge join. Otherwise, opening the sort merge join operator may fail because of insufficient memory.
Note: this method should only be invoked for sort merge join.
-
returnPage
public void returnPage(org.apache.flink.core.memory.MemorySegment segment)
-
remainBuffers
public int remainBuffers()
-
getUsedMemoryInBytes
public long getUsedMemoryInBytes()
-
getNumSpillFiles
public long getNumSpillFiles()
-
getSpillInBytes
public long getSpillInBytes()
-
maxInitBufferOfBucketArea
public int maxInitBufferOfBucketArea(int partitions)
Gives the bucket area up to one-sixth of the memory.
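As a rough worked example of the one-sixth rule, the sketch below assumes that one-sixth of the total buffers is split evenly across the partitions, with a floor of one buffer per partition. Both the formula and the class BucketAreaBudget are assumptions made for illustration; the documentation above does not state the exact computation.

```java
/** Hypothetical arithmetic sketch of a one-sixth bucket budget; not the Flink formula. */
final class BucketAreaBudget {

    /** Assumed rule: at most one-sixth of all buffers, shared by the partitions, at least one each. */
    static int maxInitBufferOfBucketArea(int totalNumBuffers, int partitions) {
        if (partitions <= 0) {
            throw new IllegalArgumentException("partitions must be positive");
        }
        return Math.max(1, (totalNumBuffers / 6) / partitions);
    }

    public static void main(String[] args) {
        // 120 buffers -> 20 for the bucket area -> 5 per partition with 4 partitions
        System.out.println(maxInitBufferOfBucketArea(120, 4)); // prints 5
        // With very little memory the floor of one buffer per partition applies
        System.out.println(maxInitBufferOfBucketArea(10, 4));  // prints 1
    }
}
```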
-
readAllBuffers
protected List<org.apache.flink.core.memory.MemorySegment> readAllBuffers(org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID id, int blockCount) throws IOException
- Throws:
IOException
-
createInputView
protected org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView createInputView(org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID id, int blockCount, int lastSegmentLimit) throws IOException
- Throws:
IOException
-
hash
public static int hash(int hashCode, int level)
The level parameter is needed so that we can have different hash functions when we recursively apply the partitioning, so that the working set eventually fits into memory.
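To see why the level matters: if a spilled partition were re-partitioned with the same hash function, all of its records would land in the same sub-partition again and nothing would shrink. One way to obtain a different function per level is to rotate the bits of the hash code by a level-dependent amount and then fold the result into the non-negative range. The sketch below shows this idea; the rotation step of 11 bits and the class LevelHashSketch are assumptions for illustration, not a statement of what the method actually does.

```java
/** Illustrative level-dependent hash; the rotation constant is an assumption. */
final class LevelHashSketch {

    static int hash(int hashCode, int level) {
        // A different rotation per level changes which bits select the partition.
        int rotated = Integer.rotateLeft(hashCode, level * 11);
        // Fold negatives into the non-negative range without overflow on Integer.MIN_VALUE.
        return rotated >= 0 ? rotated : -(rotated + 1);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        // The same hash code maps to different partitions on different levels.
        System.out.println(hash(1, 0) % numPartitions); // prints 1
        System.out.println(hash(1, 1) % numPartitions); // prints 0
    }
}
```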
-
-