Custom operator development

Our example application includes the LineSplitter operator, which is not part of the Apex library, so we will use it to illustrate the process of developing a custom operator.

Splitting a line into words is, of course, a simple stateless operation. Connectors and stateful transformations are more involved, and the Apex library contains many examples of both.

Here is the line splitter:

import java.util.regex.Pattern;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class LineSplitter extends BaseOperator
{
  // default pattern for word-separators
  private static final Pattern nonWordDefault = Pattern.compile("[\\p{Punct}\\s]+");

  private String nonWordStr;          // configurable regex
  private transient Pattern nonWord;  // compiled regex

  /**
   * Output port on which words from the current file are emitted
   */
  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();

  /**
   * Input port on which lines from the current file are received
   */
  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
  {
    @Override
    public void process(String line)
    {
      // line; split it into words and emit them
      final String[] words = nonWord.split(line);
      for (String word : words) {
        // split() can yield empty strings (e.g. a line starting with a separator)
        if (word.isEmpty()) {
          continue;
        }
        output.emit(word);
      }
    }
  };

  /**
   * Returns the regular expression that matches strings between words
   * @return Regular expression for strings that separate words
   */
  public String getNonWordStr()
  {
    return nonWordStr;
  }

  /**
   * Sets the regular expression that matches strings between words
   * @param regex New regular expression for strings that separate words
   */
  public void setNonWordStr(String regex)
  {
    nonWordStr = regex;
  }

  /**
   * {@inheritDoc}
   * Set nonWord to the default pattern if necessary
   */
  @Override
  public void setup(OperatorContext context)
  {
    if (null == nonWordStr) {
      nonWord = nonWordDefault;
    } else {
      nonWord = Pattern.compile(nonWordStr);
    }
  }
}

The operator has two ports: one input port to receive the lines and one output port to emit the words that result from splitting the lines. The processing logic (line splitting) is in the process method of the input port, and that is really the logic of the operator. The rest of the code is not related to splitting lines, but rather to the usability and efficiency of the operator.
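To see what the splitting logic does in isolation, the following sketch reproduces the body of the process method in plain Java, without any Apex dependency. The class SplitDemo and the method splitLine are hypothetical names introduced only for this illustration; words are collected in a list instead of being emitted on a port.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class SplitDemo {
  // same default separator pattern as in LineSplitter
  private static final Pattern NON_WORD = Pattern.compile("[\\p{Punct}\\s]+");

  // mirrors process(): split the line, skip empty strings, collect instead of emit
  static List<String> splitLine(String line) {
    List<String> words = new ArrayList<>();
    for (String word : NON_WORD.split(line)) {
      if (word.isEmpty()) {
        continue; // a leading separator produces an empty first element
      }
      words.add(word);
    }
    return words;
  }

  public static void main(String[] args) {
    // prints [Hello, world, It, s, Apex]
    System.out.println(splitLine("  Hello, world! It's Apex."));
  }
}
```

Note that the apostrophe is part of the punctuation class, so "It's" is split into two words. The check for empty strings matters because a line that begins with a separator yields an empty first element from split.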

The operator has a property, nonWordStr (with getter and setter), which allows the user to configure the regular expression used to split the lines. Later in this chapter, we will see how operators can be configured without changing the Java code of the pipeline. This is only possible if the operator exposes the corresponding properties.

There is also a setup method, which is used to perform one-time initialization. This is an opportunity to perform operations that we don't want repeated on every event, for example, because they take time or would waste resources. In this case, the regular expression needs to be compiled only once, as there is no need to do it for every event. The compiled regex is assigned to a transient field, indicating that it isn't part of the operator state and should not be checkpointed. If the operator fails, it will be redeployed and setup will be executed again. This operator is stateless: it does not have any fields that the engine needs to manage as part of checkpointing for fault tolerance. If the operator were a counter, for example, then it would have state that needs to be recovered after a failure, and we would want the fields that represent the counter's state to be checkpointed. Hence, they would not be transient.
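The difference between transient and non-transient fields can be sketched in plain Java. The example below is an illustration under stated assumptions, not Apex code: WordCounter is a hypothetical stand-in for a stateful operator, and a round trip through standard Java serialization stands in for the engine's checkpoint and recovery, which may use a different serialization mechanism.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.regex.Pattern;

public class CheckpointDemo {
  // hypothetical stand-in for a stateful operator; not an Apex class
  static class WordCounter implements Serializable {
    private static final long serialVersionUID = 1L;

    long count;                 // operator state: must survive recovery, so not transient
    transient Pattern nonWord;  // derived value: rebuilt in setup(), so transient

    void setup() {
      nonWord = Pattern.compile("[\\p{Punct}\\s]+");
    }

    void process(String line) {
      for (String word : nonWord.split(line)) {
        if (!word.isEmpty()) {
          count++;
        }
      }
    }
  }

  // simulates checkpoint followed by recovery (assumption: Java serialization as stand-in)
  static WordCounter checkpointAndRecover(WordCounter op)
      throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(op);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (WordCounter) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    WordCounter op = new WordCounter();
    op.setup();
    op.process("to be or not to be");

    WordCounter recovered = checkpointAndRecover(op);
    System.out.println(recovered.count);            // 6: the count was saved
    System.out.println(recovered.nonWord == null);  // true: the regex was not saved

    recovered.setup();                              // one-time initialization runs again
    recovered.process("that is the question");
    System.out.println(recovered.count);            // 10
  }
}
```

The count survives the round trip because it is a regular field, while the compiled pattern comes back as null and must be recreated by calling setup again, which mirrors what happens when a failed operator is redeployed.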