Class BeamPythonFunctionRunner
- java.lang.Object
-
- org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner
-
- All Implemented Interfaces:
AutoCloseable, PythonFunctionRunner
- Direct Known Subclasses:
BeamDataStreamPythonFunctionRunner, BeamTablePythonFunctionRunner
@Internal public abstract class BeamPythonFunctionRunner extends Object implements PythonFunctionRunner
A BeamPythonFunctionRunner used to execute Python functions.
-
-
Field Summary
Fields
Modifier and Type | Field | Description
protected FlinkFnApi.CoderInfoDescriptor | inputCoderDescriptor |
protected static org.slf4j.Logger | LOG |
protected org.apache.beam.sdk.fn.data.FnDataReceiver<org.apache.beam.sdk.util.WindowedValue<byte[]>> | mainInputReceiver | The receiver which forwards the input elements to a remote environment for processing.
protected FlinkFnApi.CoderInfoDescriptor | outputCoderDescriptor |
protected LinkedBlockingQueue<org.apache.flink.api.java.tuple.Tuple2<String,byte[]>> | resultBuffer | Buffers the Python function execution result which has still not been processed.
protected Map<String,FlinkFnApi.CoderInfoDescriptor> | sideOutputCoderDescriptors |
-
Constructor Summary
Constructors
Constructor | Description
BeamPythonFunctionRunner(org.apache.flink.runtime.execution.Environment environment, String taskName, ProcessPythonEnvironmentManager environmentManager, FlinkMetricContainer flinkMetricContainer, org.apache.flink.runtime.state.KeyedStateBackend<?> keyedStateBackend, org.apache.flink.runtime.state.OperatorStateBackend operatorStateBackend, org.apache.flink.api.common.typeutils.TypeSerializer<?> keySerializer, org.apache.flink.api.common.typeutils.TypeSerializer<?> namespaceSerializer, TimerRegistration timerRegistration, org.apache.flink.runtime.memory.MemoryManager memoryManager, double managedMemoryFraction, FlinkFnApi.CoderInfoDescriptor inputCoderDescriptor, FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor, Map<String,FlinkFnApi.CoderInfoDescriptor> sideOutputCoderDescriptors) |
-
Method Summary
Methods
Modifier and Type | Method | Description
protected abstract void | buildTransforms(org.apache.beam.model.pipeline.v1.RunnerApi.Components.Builder componentsBuilder) |
void | close() | Tears down the Python function runner.
org.apache.beam.runners.fnexecution.control.JobBundleFactory | createJobBundleFactory(org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Struct pipelineOptions) |
void | drainUnregisteredTimers() |
void | flush() | Forces the processing of the current bundle of elements to finish.
protected abstract Optional<org.apache.beam.model.pipeline.v1.RunnerApi.Coder> | getOptionalTimerCoderProto() |
protected abstract List<org.apache.beam.runners.core.construction.graph.TimerReference> | getTimers(org.apache.beam.model.pipeline.v1.RunnerApi.Components components) |
void | notifyNoMoreResults() | Interrupts the progress of takeResult.
void | open(org.apache.flink.configuration.ReadableConfig config) | Prepares the Python function runner, such as preparing the Python execution environment, etc.
org.apache.flink.api.java.tuple.Tuple3<String,byte[],Integer> | pollResult() | Retrieves the Python function result.
void | process(byte[] data) | Executes the Python function with the input byte array.
void | processTimer(byte[] timerData) | Sends the triggered timer to the Python function.
protected void | startBundle() |
org.apache.flink.api.java.tuple.Tuple3<String,byte[],Integer> | takeResult() | Retrieves the Python function result, waiting if necessary until an element becomes available.
-
-
-
Field Detail
-
LOG
protected static final org.slf4j.Logger LOG
-
inputCoderDescriptor
protected final FlinkFnApi.CoderInfoDescriptor inputCoderDescriptor
-
outputCoderDescriptor
protected final FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor
-
sideOutputCoderDescriptors
protected final Map<String,FlinkFnApi.CoderInfoDescriptor> sideOutputCoderDescriptors
-
resultBuffer
@VisibleForTesting protected transient LinkedBlockingQueue<org.apache.flink.api.java.tuple.Tuple2<String,byte[]>> resultBuffer
Buffers the Python function execution result which has still not been processed.
-
mainInputReceiver
@VisibleForTesting protected transient org.apache.beam.sdk.fn.data.FnDataReceiver<org.apache.beam.sdk.util.WindowedValue<byte[]>> mainInputReceiver
The receiver which forwards the input elements to a remote environment for processing.
-
-
Constructor Detail
-
BeamPythonFunctionRunner
public BeamPythonFunctionRunner(org.apache.flink.runtime.execution.Environment environment, String taskName, ProcessPythonEnvironmentManager environmentManager, @Nullable FlinkMetricContainer flinkMetricContainer, @Nullable org.apache.flink.runtime.state.KeyedStateBackend<?> keyedStateBackend, @Nullable org.apache.flink.runtime.state.OperatorStateBackend operatorStateBackend, @Nullable org.apache.flink.api.common.typeutils.TypeSerializer<?> keySerializer, @Nullable org.apache.flink.api.common.typeutils.TypeSerializer<?> namespaceSerializer, @Nullable TimerRegistration timerRegistration, org.apache.flink.runtime.memory.MemoryManager memoryManager, double managedMemoryFraction, FlinkFnApi.CoderInfoDescriptor inputCoderDescriptor, FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor, Map<String,FlinkFnApi.CoderInfoDescriptor> sideOutputCoderDescriptors)
-
-
Method Detail
-
open
public void open(org.apache.flink.configuration.ReadableConfig config) throws Exception
Description copied from interface: PythonFunctionRunner
Prepares the Python function runner, such as preparing the Python execution environment, etc.
- Specified by:
open in interface PythonFunctionRunner
- Throws:
Exception
-
close
public void close() throws Exception
Description copied from interface: PythonFunctionRunner
Tears down the Python function runner.
- Specified by:
close in interface AutoCloseable
- Specified by:
close in interface PythonFunctionRunner
- Throws:
Exception
-
process
public void process(byte[] data) throws Exception
Description copied from interface: PythonFunctionRunner
Executes the Python function with the input byte array.
- Specified by:
process in interface PythonFunctionRunner
- Parameters:
data - the byte array data.
- Throws:
Exception
-
drainUnregisteredTimers
public void drainUnregisteredTimers()
- Specified by:
drainUnregisteredTimers in interface PythonFunctionRunner
-
processTimer
public void processTimer(byte[] timerData) throws Exception
Description copied from interface: PythonFunctionRunner
Sends the triggered timer to the Python function.
- Specified by:
processTimer in interface PythonFunctionRunner
- Throws:
Exception
-
startBundle
@VisibleForTesting protected void startBundle()
-
pollResult
public org.apache.flink.api.java.tuple.Tuple3<String,byte[],Integer> pollResult() throws Exception
Description copied from interface: PythonFunctionRunner
Retrieves the Python function result.
- Specified by:
pollResult in interface PythonFunctionRunner
- Returns:
the head of the Python function result buffer, or null if the result buffer is empty. Given the Tuple3<String,byte[],Integer> return type, f1 is the byte array buffer which stores the Python function result and f2 is the length of that byte array.
- Throws:
Exception
-
takeResult
public org.apache.flink.api.java.tuple.Tuple3<String,byte[],Integer> takeResult() throws Exception
Description copied from interface: PythonFunctionRunner
Retrieves the Python function result, waiting if necessary until an element becomes available.
- Specified by:
takeResult in interface PythonFunctionRunner
- Returns:
the head of the Python function result buffer. Given the Tuple3<String,byte[],Integer> return type, f1 is the byte array buffer which stores the Python function result and f2 is the length of that byte array.
- Throws:
Exception
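The difference between pollResult and takeResult matches the non-blocking poll() and blocking take() calls of java.util.concurrent.LinkedBlockingQueue, the type of the resultBuffer field. The following is a minimal sketch of that contrast only. ResultBufferSketch and emit are hypothetical names, plain byte[] elements stand in for the tuple types, and nothing here is taken from the Flink implementation.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a result buffer; this is not Flink code.
class ResultBufferSketch {
    private final LinkedBlockingQueue<byte[]> buffer = new LinkedBlockingQueue<>();

    // Hypothetical helper: a producer adds a finished result to the buffer.
    void emit(byte[] result) {
        buffer.add(result);
    }

    // Non-blocking: returns the head of the buffer, or null if it is empty.
    byte[] pollResult() {
        return buffer.poll();
    }

    // Blocking: waits until an element becomes available, then returns it.
    byte[] takeResult() throws InterruptedException {
        return buffer.take();
    }

    public static void main(String[] args) throws Exception {
        ResultBufferSketch sketch = new ResultBufferSketch();
        // Nothing has been emitted yet, so polling returns null immediately.
        System.out.println("empty poll is null: " + (sketch.pollResult() == null));

        // A separate thread produces one result while the main thread waits in takeResult().
        Thread producer = new Thread(() -> sketch.emit(new byte[] {1, 2, 3}));
        producer.start();
        byte[] taken = sketch.takeResult();
        producer.join();
        System.out.println("taken length: " + taken.length);
    }
}
```

A caller that must not stall its own thread would use the polling form and handle null; a dedicated consumer thread can use the blocking form.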
-
flush
public void flush() throws Exception
Description copied from interface: PythonFunctionRunner
Forces the processing of the current bundle of elements to finish. It flushes the data cached in the data buffer for processing and retrieves the state mutations (if any) made by the Python function. The call blocks until all of the outputs produced by this bundle have been received.
- Specified by:
flush in interface PythonFunctionRunner
- Throws:
Exception
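The methods above suggest a call order of open, process, flush, pollResult, and close. The sketch below walks through that order with a hypothetical in-memory class, BundleLifecycleSketch, that mirrors only the documented method names. It does not start a Python process, and it assumes for simplicity that results appear only at flush, which may not hold for the real runner. The uppercase transformation is an arbitrary placeholder for the Python function.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Queue;

// Hypothetical in-memory runner; it mirrors method names only and is not Flink code.
class BundleLifecycleSketch implements AutoCloseable {
    private final List<byte[]> pending = new ArrayList<>();   // inputs of the current bundle
    private final Queue<byte[]> results = new ArrayDeque<>(); // results not yet consumed
    private boolean opened;

    void open() {
        opened = true;
    }

    void process(byte[] data) {
        if (!opened) {
            throw new IllegalStateException("open() must be called before process()");
        }
        pending.add(data);
    }

    // Finishes the current bundle: every buffered input yields one result.
    void flush() {
        for (byte[] input : pending) {
            String upper = new String(input, StandardCharsets.UTF_8).toUpperCase(Locale.ROOT);
            results.add(upper.getBytes(StandardCharsets.UTF_8));
        }
        pending.clear();
    }

    // Returns the next result, or null if none is buffered.
    byte[] pollResult() {
        return results.poll();
    }

    @Override
    public void close() {
        opened = false;
        pending.clear();
        results.clear();
    }

    // Drives one bundle through the full lifecycle and collects the results.
    static List<String> run(List<String> inputs) {
        List<String> out = new ArrayList<>();
        try (BundleLifecycleSketch runner = new BundleLifecycleSketch()) {
            runner.open();
            for (String s : inputs) {
                runner.process(s.getBytes(StandardCharsets.UTF_8));
            }
            runner.flush();
            byte[] result;
            while ((result = runner.pollResult()) != null) {
                out.add(new String(result, StandardCharsets.UTF_8));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b")));
    }
}
```

Because the runner is AutoCloseable, try-with-resources guarantees that close is called even if process or flush throws.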
-
notifyNoMoreResults
public void notifyNoMoreResults()
Interrupts the progress of takeResult.
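One common way to release a consumer that is blocked in a take-style call is to enqueue a sentinel element that the consumer recognizes as "no more results". The sketch below illustrates that general technique. Whether BeamPythonFunctionRunner works this way is an assumption, not something this page states, and NoMoreResultsSketch, emit, and END are hypothetical names.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sentinel-based sketch; this is not Flink code.
class NoMoreResultsSketch {
    // Sentinel instance, recognized by identity rather than by content.
    private static final byte[] END = new byte[0];
    private final LinkedBlockingQueue<byte[]> buffer = new LinkedBlockingQueue<>();

    // Hypothetical helper: a producer adds a finished result to the buffer.
    void emit(byte[] result) {
        buffer.add(result);
    }

    // Wakes up a consumer that is waiting in takeResult().
    void notifyNoMoreResults() {
        buffer.add(END);
    }

    // Blocks until a result or the sentinel arrives; returns null for the sentinel.
    byte[] takeResult() throws InterruptedException {
        byte[] head = buffer.take();
        return head == END ? null : head;
    }

    public static void main(String[] args) throws Exception {
        NoMoreResultsSketch sketch = new NoMoreResultsSketch();
        Thread consumer = new Thread(() -> {
            try {
                byte[] result;
                while ((result = sketch.takeResult()) != null) {
                    System.out.println("got " + result.length + " byte(s)");
                }
                System.out.println("no more results");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        sketch.emit(new byte[] {7});
        sketch.notifyNoMoreResults();
        consumer.join();
    }
}
```

Results emitted before the sentinel are still delivered in order, so the consumer drains everything that was produced before it stops waiting.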
-
buildTransforms
protected abstract void buildTransforms(org.apache.beam.model.pipeline.v1.RunnerApi.Components.Builder componentsBuilder)
-
getTimers
protected abstract List<org.apache.beam.runners.core.construction.graph.TimerReference> getTimers(org.apache.beam.model.pipeline.v1.RunnerApi.Components components)
-
getOptionalTimerCoderProto
protected abstract Optional<org.apache.beam.model.pipeline.v1.RunnerApi.Coder> getOptionalTimerCoderProto()
-
-