Class AbstractOperator
- java.lang.Object
-
- com.ibm.streams.operator.AbstractOperator
-
- All Implemented Interfaces:
- Operator
- Direct Known Subclasses:
- AbstractWindowOperator
public abstract class AbstractOperator extends java.lang.Object implements Operator
An abstract implementation of Operator that developers may use as the base for their own Operator implementations.
It provides simple implementations of the required methods and a number of utility methods.
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface com.ibm.streams.operator.Operator
Operator.TagNames
-
-
Field Summary
Fields (Modifier and Type, Field)
static java.lang.String IBM_COPYRIGHT
-
Constructor Summary
Constructors
AbstractOperator()
-
Method Summary
Methods (Modifier and Type, Method, Description)
void allPortsReady()
  Called once initialization is complete and all input and output ports are connected and ready to process and submit tuples.
void checkPorts(int numberInputPorts, int numberOutputPorts)
  Deprecated. As of InfoSphere Streams 3.0, replaced by input and output port configurations in the Java primitive operator model.
java.lang.Thread createAvoidCompletionThread()
  Create a thread to avoid completion of the operator by the SPL runtime.
static java.lang.Thread createAvoidCompletionThread(OperatorContext context)
  Create a thread that avoids the SPL runtime completing the operator.
java.lang.Thread createAvoidCompletionThreadIfNoInputs()
  Create a thread to avoid completion of the operator by the SPL runtime if the operator has no input ports.
ControlPlaneContext getControlPlaneContext()
  Get the optional ControlPlaneContext for the Job Control Plane in the application.
StreamingInput<Tuple> getInput(int port)
  Shorthand method to get the StreamingInput object for a given port.
OperatorContext getOperatorContext()
  Return the OperatorContext object describing the execution environment of this operator.
StreamingOutput<OutputTuple> getOutput(int port)
  Shorthand method to get the StreamingOutput object for a given port.
void initialize(OperatorContext context)
  Initialize this operator.
void process(StreamingInput<Tuple> stream, Tuple tuple)
  Process an incoming tuple that arrived on the specified port.
void processPunctuation(StreamingInput<Tuple> stream, StreamingData.Punctuation mark)
  Process an incoming punctuation mark on the specified port.
void setLoggerAspects(java.lang.String loggerName, java.lang.String... aspects)
  Set the aspects associated with a named Logger.
void setTagData(java.lang.String tagName, java.util.Map<java.lang.String,java.lang.String> tagValues)
  Set tag data for the operator.
void shutdown()
  Instruct this operator to shut down.
-
-
-
Field Detail
-
IBM_COPYRIGHT
public static final java.lang.String IBM_COPYRIGHT
- See Also:
- Constant Field Values
-
-
Method Detail
-
initialize
public void initialize(OperatorContext context) throws java.lang.Exception
Initialize this operator. Called once before any tuples are processed.
This implementation initializes the operator to ensure its utility methods return the correct information.
Sub-classes must call super.initialize(context) if they override this method.
- Specified by:
initialize in interface Operator
- Parameters:
context - OperatorContext for this operator.
- Throws:
java.lang.Exception - Operator failure, will cause the enclosing PE to terminate.
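The requirement to call super.initialize(context) can be illustrated with a minimal, self-contained sketch. The types below (SketchContext, SketchBaseOperator, CountingOperator) are hypothetical stand-ins and are not part of com.ibm.streams.operator; they only mimic the pattern in which the base class records the context that its utility methods later rely on.

```java
// Hypothetical stand-in for OperatorContext; not the real Streams API.
final class SketchContext {
    private final String name;
    SketchContext(String name) { this.name = name; }
    String getName() { return name; }
}

// Hypothetical stand-in for AbstractOperator: initialize() records the context
// so that utility methods such as getContext() can return it later.
abstract class SketchBaseOperator {
    private SketchContext context;

    public void initialize(SketchContext context) throws Exception {
        this.context = context;
    }

    public final SketchContext getContext() {
        return context;
    }
}

// A sub-class that overrides initialize() and calls super.initialize() first.
class CountingOperator extends SketchBaseOperator {
    private String label;

    @Override
    public void initialize(SketchContext context) throws Exception {
        super.initialize(context); // without this call, getContext() would return null
        label = "op:" + getContext().getName();
    }

    String getLabel() { return label; }
}
```

In this sketch, omitting the super call would leave getContext() returning null, which is the kind of failure the rule above is meant to prevent.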
-
allPortsReady
public void allPortsReady() throws java.lang.Exception
Called once initialization is complete and all input and output ports are connected and ready to process and submit tuples. Operators that submit tuples independently of incoming tuples, e.g. source type operators, must wait for this method to be called before submitting any tuples.
Because this is a notification that all input and output ports are ready, process and processPunctuation may be called before allPortsReady, due to thread scheduling.
Implementations must not block and must return control to the caller.
This operator's OperatorContextMXBean will also issue a notification when all ports are ready.
Code may register actions to be invoked at all ports ready using the utility method addAllPortsReadyAction or by adding notification listeners to this operator's OperatorContextMXBean.
This implementation does nothing.
- Specified by:
allPortsReady in interface Operator
- Throws:
java.lang.Exception - Operator failure, will cause the enclosing PE to terminate.
- See Also:
OperatorContextMXBean, OperatorLifeCycle.addAllPortsReadyAction(OperatorContext, javax.management.NotificationListener)
-
getOperatorContext
public final OperatorContext getOperatorContext()
Return the OperatorContext object describing the execution environment of this operator.
- Returns:
the OperatorContext for this operator
-
process
public void process(StreamingInput<Tuple> stream, Tuple tuple) throws java.lang.Exception
Process an incoming tuple that arrived on the specified port.
This implementation does nothing; sub-classes that need to process incoming tuples must override this method.
-
processPunctuation
public void processPunctuation(StreamingInput<Tuple> stream, StreamingData.Punctuation mark) throws java.lang.Exception
Process an incoming punctuation mark on the specified port.
This implementation punctuates all (if any) output ports with the incoming mark if it is a WINDOW_MARKER. Sub-classes may override this method to provide different behavior.
- Specified by:
processPunctuation in interface Operator
- Parameters:
stream - Port the punctuation mark arrived on
mark - Value of the mark
- Throws:
java.lang.Exception - Operator failure, will cause the enclosing PE to terminate.
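The default forwarding behavior described above can be sketched in isolation. SketchMark and SketchOutputPort are hypothetical stand-ins for StreamingData.Punctuation and an output port, introduced only so the example compiles without the Streams libraries; this is an illustration of the rule, not the library's implementation.

```java
import java.util.List;

// Hypothetical stand-in for StreamingData.Punctuation.
enum SketchMark { WINDOW_MARKER, FINAL_MARKER }

// Hypothetical stand-in for an output port that can be punctuated.
interface SketchOutputPort {
    void punctuate(SketchMark mark);
}

final class PunctuationForwarder {
    private PunctuationForwarder() {}

    // Forward the mark to every output port only when it is a window marker.
    // Returns the number of ports that were punctuated.
    static int forward(SketchMark mark, List<SketchOutputPort> outputs) {
        if (mark != SketchMark.WINDOW_MARKER) {
            return 0;
        }
        for (SketchOutputPort out : outputs) {
            out.punctuate(mark);
        }
        return outputs.size();
    }
}
```

A sub-class that wants different behavior, for example to swallow window markers, would override processPunctuation instead of relying on this default.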
-
shutdown
public void shutdown() throws java.lang.Exception
Instruct this operator to shut down. Called by the SPL runtime when the processing element (PE) hosting the operator is being shut down. Implementations should complete or terminate any asynchronous activity and release any resources, such as closing open files.
Prior to this call, the SPL runtime initiates an orderly shutdown on the Operator's ThreadFactory and ScheduledExecutorService so that no new threads or tasks can be created.
Once this method returns, the SPL runtime completes the shutdown of the Operator's ThreadFactory (calling Thread.interrupt on any of its active threads) and ScheduledExecutorService (calling ExecutorService.shutdownNow()).
This operator's OperatorContextMXBean will also issue a shutdown notification and a shutdown complete notification.
Code may register actions to be invoked at shutdown using the utility method addShutdownAction or by adding notification listeners to this operator's OperatorContextMXBean.
This implementation does nothing.
- Specified by:
shutdown in interface Operator
- Throws:
java.lang.Exception - Error while shutting down.
- See Also:
OperatorContext.getThreadFactory(), OperatorContext.getScheduledExecutorService(), OperatorContextMXBean, OperatorLifeCycle.addShutdownAction(OperatorContext, javax.management.NotificationListener), OperatorLifeCycle.closeOnShutdown(OperatorContext, java.io.Closeable)
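The ordering described for the ScheduledExecutorService can be sketched with the standard library alone. This is a minimal illustration under stated assumptions: a plain java.util.concurrent executor stands in for the operator's ScheduledExecutorService, and the operatorShutdown callback is a hypothetical placeholder for the operator's own shutdown() logic.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;

final class ShutdownSequenceSketch {
    private ShutdownSequenceSketch() {}

    // Mimics the documented order on a plain executor:
    // 1. orderly shutdown, after which no new tasks are accepted,
    // 2. the operator's own shutdown logic,
    // 3. shutdownNow() to interrupt anything still running.
    // Returns true if a task submitted after step 1 was rejected.
    static boolean run(ScheduledExecutorService service, Runnable operatorShutdown) {
        service.shutdown();
        boolean rejected = false;
        try {
            service.submit(() -> { });
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        operatorShutdown.run();
        service.shutdownNow();
        return rejected;
    }
}
```

The practical consequence is that shutdown() implementations should not try to schedule new work on the operator's executor; cleanup has to run inline or on threads obtained elsewhere.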
-
checkPorts
@Deprecated public final void checkPorts(int numberInputPorts, int numberOutputPorts)
Deprecated. As of InfoSphere Streams 3.0, replaced by input and output port configurations in the Java primitive operator model.
Check that the correct number of ports exists for the operator.
Use of this deprecated method is not recommended: it fixes the ports of an Operator implementation class, which prevents sub-classes from adding optional ports and so limits the flexibility of the class.
Instead, it is recommended that input and output port configurations in the Java primitive operator model are used.
Typically called by a sub-class' initialize method. If the number of ports is positive then exactly that many ports must exist according to the context. If the number of ports is negative then at least the absolute value of that number must exist.
As an example, checkPorts(-1,1) indicates one or more input ports and exactly one output port.
- Parameters:
numberInputPorts - expected number of input ports
numberOutputPorts - expected number of output ports
- Throws:
java.lang.IllegalStateException - if ports do not match the requirements
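The positive/negative convention can be made concrete with a small stand-alone helper. PortCountRule is a hypothetical class written for illustration; it is not the library's implementation, and its treatment of an expected value of zero (exactly zero ports) is an assumption, since the description above only covers positive and negative values.

```java
final class PortCountRule {
    private PortCountRule() {}

    // Returns true when 'actual' satisfies 'expected':
    // a negative expected value requires at least |expected| ports,
    // otherwise exactly 'expected' ports are required.
    // Assumption: an expected value of 0 means exactly zero ports.
    static boolean matches(int expected, int actual) {
        if (expected < 0) {
            return actual >= -expected;
        }
        return actual == expected;
    }

    // Mirrors the documented failure mode by throwing IllegalStateException.
    static void check(int expectedInputs, int expectedOutputs,
                      int actualInputs, int actualOutputs) {
        if (!matches(expectedInputs, actualInputs)
                || !matches(expectedOutputs, actualOutputs)) {
            throw new IllegalStateException("ports do not match the requirements");
        }
    }
}
```

Under this rule, an expectation of (-1, 1) accepts an operator with three input ports and one output port, and rejects one with no input ports.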
-
getInput
public final StreamingInput<Tuple> getInput(int port)
Shorthand method to get the StreamingInput object for a given port.
- Parameters:
port - input port number
- Returns:
the description for the input port
-
getOutput
public final StreamingOutput<OutputTuple> getOutput(int port)
Shorthand method to get the StreamingOutput object for a given port.
- Parameters:
port - output port number
- Returns:
the description for the output port
-
setLoggerAspects
public final void setLoggerAspects(java.lang.String loggerName, java.lang.String... aspects)
Set the aspects associated with a named Logger. The aspects replace any existing aspects associated with any Logger of the given name. Aspects are not inherited by child Loggers.
- Parameters:
loggerName - Logger name
aspects - Aspects to associate with the named logger.
- Since:
InfoSphere® Streams Version 3.0
-
createAvoidCompletionThread
public java.lang.Thread createAvoidCompletionThread()
Create a thread to avoid completion of the operator by the SPL runtime. This is equivalent to calling AbstractOperator.createAvoidCompletionThread(getOperatorContext()).
This method must be called at Operator.initialize(OperatorContext) time.
- Since:
InfoSphere® Streams Version 3.1
- See Also:
createAvoidCompletionThread(OperatorContext)
-
createAvoidCompletionThreadIfNoInputs
public java.lang.Thread createAvoidCompletionThreadIfNoInputs()
Create a thread to avoid completion of the operator by the SPL runtime if the operator has no input ports. If the operator has no input ports then this is equivalent to calling createAvoidCompletionThread().
If the operator has input ports then no thread is created and operator completion will occur when all the input ports have received final marks.
This method must be called at Operator.initialize(OperatorContext) time.
- Since:
InfoSphere® Streams Version 3.1
- See Also:
createAvoidCompletionThread(), createAvoidCompletionThread(OperatorContext)
-
createAvoidCompletionThread
public static java.lang.Thread createAvoidCompletionThread(OperatorContext context)
Create a thread that avoids the SPL runtime completing the operator. The SPL runtime will complete an operator once it has no remaining active threads or tasks and all of its input ports have received final marks. Completing an operator will send final marks to all of its output ports, thus closing the ports.
Source operators that produce tuples in threads that are not visible to the SPL runtime may immediately complete after Operator.allPortsReady() and thus not produce any tuples on their output ports. This typically happens when the source operator is producing tuples from an event framework. In this case the operator has registered an event handler with the event framework, but the framework is executing using its own threads that are not visible to the SPL runtime.
The correct mechanism to handle this is to pass the operator's thread factory or execution service when initializing the framework.
If the framework does not support passing in a thread factory or execution service then the operator needs to create a thread to avoid completion. This convenience method is called at Operator.initialize(OperatorContext) to create such a thread.
This method:
- Creates a thread using OperatorContext.getThreadFactory()
- Sets the thread to not be a daemon thread
- Registers a notification listener with the operator's OperatorContextMXBean to start the thread at OperatorContextMXBean.ALL_PORTS_READY
- Registers a notification listener with the operator's OperatorContextMXBean to interrupt the thread at OperatorContextMXBean.SHUTDOWN
The thread's run() method simply waits to be interrupted. This method must be called at Operator.initialize(OperatorContext) time.
- Parameters:
context - Context for the operator avoiding completion
- Since:
InfoSphere® Streams Version 3.1
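The thread described above can be approximated with the standard library. This is a sketch under stated assumptions rather than the library's code: Executors.defaultThreadFactory() stands in for OperatorContext.getThreadFactory(), and the explicit start() and interrupt() calls stand in for the ALL_PORTS_READY and SHUTDOWN notification listeners. AvoidCompletionSketch is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

final class AvoidCompletionSketch {
    private AvoidCompletionSketch() {}

    // Creates (but does not start) a non-daemon thread whose run() method
    // simply blocks until the thread is interrupted.
    static Thread create(ThreadFactory factory) {
        Thread thread = factory.newThread(() -> {
            try {
                // The latch is never counted down, so await() ends only via interrupt.
                new CountDownLatch(1).await();
            } catch (InterruptedException e) {
                // Interrupted: fall through and let the thread end.
            }
        });
        thread.setDaemon(false);
        return thread;
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumption: the default factory stands in for the operator's thread factory.
        Thread thread = create(Executors.defaultThreadFactory());
        thread.start();     // the documented method does this at ALL_PORTS_READY
        thread.interrupt(); // the documented method does this at SHUTDOWN
        thread.join();
        System.out.println("alive after interrupt: " + thread.isAlive());
    }
}
```

Because the thread is created from a factory the runtime can see, its mere existence counts as an active thread, which is what keeps the operator from being completed.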
-
getControlPlaneContext
public final ControlPlaneContext getControlPlaneContext()
Get the optional ControlPlaneContext for the Job Control Plane in the application.
- Returns:
the optional ControlPlaneContext
- Since:
InfoSphere® Streams Version 4.0
-
setTagData
public void setTagData(java.lang.String tagName, java.util.Map<java.lang.String,java.lang.String> tagValues)
Set tag data for the operator. If the tag name is Operator.TagNames.OperatorIGC, the tags will be registered in the IG catalog. Other tag names may be defined in future releases.
- Specified by:
setTagData in interface Operator
- Since:
InfoSphere® Streams Version 4.1
-
-