Sunday, February 7, 2016

Getting close to Apache Flink, albeit in a Träge manner - 3


In the last two blogs on Flink, I hope to have been able to underline the primacy of Windows in Apache Flink's streaming model. I have shared my understanding of two types of Windows that can be attached to a stream of Events, namely (a) CountWindow and (b) TimeWindow. Variations of these types are offered too; for example, one can put to use a Sliding TimeWindow for analysing incoming events. In this blog, I will try and elaborate a bit on how these Windows work.

Some key points to recall

In Flink, a Window is a collection of Events (also referred to as Event Elements): the events which are otherwise a part of a continuous Stream.
In a way, a Window is our tool to grab and focus on a finite number of elements of the Stream, so that we can deduce something meaningful out of them (in that sense, a Window is perhaps better described as a Pane)!
In order to deduce something meaningful out of them, we employ specialized functions (I refer to them as Evaluators here). The elements held in a Window are handed over to such functions and the latter carry out the job. In Flink's terminology, we say that the Window is evaluated (by such functions). Flink provides a handful of such functions (maxBy(), minBy() etc.); we can write our own too.

Evaluator needs to access the elements

The question is: when does a Window hand over the elements to the Evaluator?

Flink puts in place a mechanism that helps the Window decide this. A Window can provide a piece of logic (code) of its choice, to be executed whenever an element is added to it. Flink arranges to call that code at the appropriate time. Using this piece of code, one can determine (and instruct Flink) exactly when to invoke the Evaluator. This piece of code is referred to as the Trigger. Every Window has a trigger associated with it.

Let us take an example to understand it better.

Flink provides a specialized Window operator called countWindowAll(), as we have seen earlier. We assign a countWindowAll to a stream when we want to derive something meaningful out of a certain number (count) of successive elements. For example, for every set of 10 cars passing through a junction, we want to notify some observer. In this case, we use a countWindowAll(10).
Such a countWindowAll() already has a trigger associated with it, which Flink makes available to us. Here's a portion of countWindowAll's trigger (from Flink's source code), aptly named CountTrigger:

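@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws IOException {
    // Per-window counter kept in Flink-managed state, starting at 0
    OperatorState<Long> count = ctx.getKeyValueState("count", 0L);
    long currentCount = count.value() + 1;
    count.update(currentCount);
    if (currentCount >= maxCount) {
        // Limit reached: reset the counter and ask for the Window to be evaluated
        count.update(0L);
        return TriggerResult.FIRE;
    }
    // Limit not reached yet: keep collecting elements
    return TriggerResult.CONTINUE;
}
// ........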


Our point of current interest is the overridden method onElement(). Flink calls this method of a trigger whenever an element is assigned to the Window. Obviously, this is the best place to keep count of the elements in the Window. After the code updates its bookkeeping, it returns one of two possible results: TriggerResult.FIRE or TriggerResult.CONTINUE. If the current count of elements in the Window has reached the limit (maxCount, passed during construction), the trigger returns FIRE and the elements are passed on to the Evaluator; otherwise it returns CONTINUE and processing continues as if nothing has happened.
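To see the FIRE/CONTINUE rhythm in isolation, here is a minimal sketch in plain Java. It does not use Flink at all: the class CountTriggerSketch, its Result enum and the decisions() helper are names I made up for illustration, and a plain field stands in for the "count" state that Flink manages for the real trigger.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a count-based trigger. None of these types are
// Flink classes; they only mimic the behaviour described above.
final class CountTriggerSketch {

    enum Result { CONTINUE, FIRE }

    private final long maxCount;
    private long count = 0; // plays the role of the "count" key-value state

    CountTriggerSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    // Called once for every element that is added to the window.
    Result onElement(Object element) {
        count++;
        if (count >= maxCount) {
            count = 0;          // reset for the next batch
            return Result.FIRE; // ask for the window to be evaluated
        }
        return Result.CONTINUE; // keep collecting
    }

    // Feeds n dummy elements through a fresh trigger and records each decision.
    static List<Result> decisions(long maxCount, int n) {
        CountTriggerSketch trigger = new CountTriggerSketch(maxCount);
        List<Result> out = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            out.add(trigger.onElement("car-" + i));
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [CONTINUE, CONTINUE, FIRE, CONTINUE, CONTINUE, FIRE, CONTINUE]
        System.out.println(decisions(3, 7));
    }
}
```

With a limit of 3, every third element produces FIRE and the counter starts over, which is the behaviour the real onElement() achieves through count.update(0L).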

Let's refer to an application snippet to elaborate:

val readings =
  readIncomingReadings(env, "./sampleIOTTiny.csv")
    .map(e => (e.sensorUUID, e.ambientTemperature))
    .countWindowAll(4)
    .maxBy(1)

After creating the stream by mapping the input tuples, we assign a Window to it by calling countWindowAll(). Ending the expression is an Evaluator function, maxBy(). Flink arranges to hand over the current contents of the Window to this function whenever the trigger returns TriggerResult.FIRE. 'FIRE' is the instruction to fire the evaluation, so to speak. In this particular case, maxBy(1) is called whenever 4 elements have gathered in the Window. Apache Flink provides other pre-defined Windows besides the aforementioned countWindowAll(), such as timeWindowAll().
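As a rough picture of what the snippet computes, the following plain-Java sketch groups readings into batches of 4 and keeps the one with the highest temperature from each batch. It is not Flink code: the Reading class, the sample values and the maxPerBatch() helper are all invented for illustration, and the buffer is simply emptied after each evaluation so that every batch starts afresh.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class MaxByCountWindowSketch {

    // Hypothetical (sensorId, temperature) pair standing in for the mapped tuple.
    static final class Reading {
        final String sensorId;
        final double temperature;

        Reading(String sensorId, double temperature) {
            this.sensorId = sensorId;
            this.temperature = temperature;
        }
    }

    // Made-up sample data: nine readings, so the ninth never completes a batch.
    static List<Reading> sampleReadings() {
        return Arrays.asList(
            new Reading("a", 20.5), new Reading("b", 22.0),
            new Reading("c", 19.0), new Reading("d", 21.5),
            new Reading("a", 18.0), new Reading("b", 17.5),
            new Reading("c", 23.0), new Reading("d", 22.5),
            new Reading("a", 30.0));
    }

    // Returns the hottest reading of every complete batch of batchSize elements.
    static List<Reading> maxPerBatch(List<Reading> stream, int batchSize) {
        List<Reading> window = new ArrayList<>();
        List<Reading> results = new ArrayList<>();
        for (Reading r : stream) {
            window.add(r);
            if (window.size() == batchSize) { // the trigger would return FIRE here
                Reading max = window.get(0);
                for (Reading candidate : window) {
                    if (candidate.temperature > max.temperature) {
                        max = candidate;
                    }
                }
                results.add(max);
                window.clear(); // start collecting the next batch
            }
        }
        return results;
    }

    public static void main(String[] args) {
        // prints "b 22.0" and then "c 23.0"
        for (Reading max : maxPerBatch(sampleReadings(), 4)) {
            System.out.println(max.sensorId + " " + max.temperature);
        }
    }
}
```

The ninth reading is the hottest of all, yet it never shows up in the output: no evaluation happens until the trigger fires again.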

A little aside

For every stream, Flink keeps a WindowAssigner. A WindowAssigner is an object whose job is to determine which Window(s) a newly arrived element should be assigned to. This implies that more than one Window can co-exist and that an element can belong (be assigned) to several of them, but I will explain this particular aspect in a later blog. For the time being, let's keep things simple and assume that there exists only one Window for a stream.

Disposal of elements in the Window

I presume a point is nagging you, just as it nagged me when I saw the code the first time: when does countWindowAll's trigger dispose of the elements held in the Window? If it doesn't, elements continue to be collected and occupy memory until, eventually, we run out of it. Surely, there has to be a way.
Let's refer to the CountTrigger code snippet above. Two kinds of TriggerResults are mentioned there, namely FIRE and CONTINUE. But those are not all. Other kinds of TriggerResults are available too. From the API documentation of Flink, there are four such types:
  • CONTINUE
  • FIRE
  • PURGE
  • FIRE_AND_PURGE
We have seen the first two already. Let us take a look at the last two.

If a trigger returns TriggerResult.FIRE_AND_PURGE, the contents of the Window are handed over to the Evaluator and are then disposed of, freeing space. However, if a trigger returns TriggerResult.PURGE, the elements are not handed over to the Evaluator; they are just disposed of.
Wiser with this background, if we look at the CountTrigger again, we find no mention of a PURGE. Then how does Flink ensure that elements are disposed of when we use countWindowAll() in our application? Well, CountTrigger is not the only trigger that countWindowAll() uses. There is another trigger that works along with it. This is called, rather unsurprisingly, PurgingTrigger. A PurgingTrigger's job is simply to sweep out the contents of a Window; in other words: Purge!
This is how Flink implements countWindow(n), the keyed-stream counterpart of countWindowAll(n), which is built from the same pieces:

public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
    return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}

As we can see, once a Window is created, a PurgingTrigger is associated with it, and that PurgingTrigger in turn wraps a CountTrigger. Whenever CountTrigger returns TriggerResult.FIRE, PurgingTrigger takes care to cleanse the Window of its elements after the Evaluator has seen them.
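The wrapping can be pictured with a small plain-Java sketch. It is only a model of the idea, not Flink's implementation: the Trigger interface, the Result enum, and the helpers countTrigger(), purging() and sizesSeenByEvaluator() are all hypothetical names. The "Evaluator" here merely records how many elements it was handed, which is enough to show the difference purging makes.

```java
import java.util.ArrayList;
import java.util.List;

final class PurgingCountWindowSketch {

    enum Result { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

    // Hypothetical, heavily simplified trigger contract.
    interface Trigger {
        Result onElement(double element);
    }

    // Fires on every maxCount-th element, never purges.
    static Trigger countTrigger(long maxCount) {
        long[] count = {0};
        return element -> {
            if (++count[0] >= maxCount) {
                count[0] = 0;
                return Result.FIRE;
            }
            return Result.CONTINUE;
        };
    }

    // Wraps another trigger and upgrades its FIRE to FIRE_AND_PURGE.
    static Trigger purging(Trigger nested) {
        return element -> {
            Result r = nested.onElement(element);
            return r == Result.FIRE ? Result.FIRE_AND_PURGE : r;
        };
    }

    // Pushes n elements into one window and records how many elements
    // the evaluator is handed each time the trigger fires.
    static List<Integer> sizesSeenByEvaluator(int n, Trigger trigger) {
        List<Double> window = new ArrayList<>();
        List<Integer> sizes = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            window.add((double) i);
            Result r = trigger.onElement(i);
            if (r == Result.FIRE || r == Result.FIRE_AND_PURGE) {
                sizes.add(window.size()); // evaluation happens first
            }
            if (r == Result.PURGE || r == Result.FIRE_AND_PURGE) {
                window.clear();           // then the contents are disposed of
            }
        }
        return sizes;
    }

    public static void main(String[] args) {
        // Without purging the window keeps growing: prints [4, 8, 12]
        System.out.println(sizesSeenByEvaluator(12, countTrigger(4)));
        // With purging every evaluation sees a fresh batch: prints [4, 4, 4]
        System.out.println(sizesSeenByEvaluator(12, purging(countTrigger(4))));
    }
}
```

In this model the count trigger alone would let the buffer grow without bound, while the purging wrapper keeps each evaluation to exactly the batch that caused it.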

Enter the Evictor

There is another character in this whole story who we have not mentioned yet: Evictor! An Evictor is an object very similar to a Trigger, but its job is different. To understand what an Evictor does, let us take a look at the behaviour of a variation of countWindowAll() that we have seen in the earlier blog:

val readings =
  readIncomingReadings(env, "./sampleIOTTiny.csv")
    .map(e => (e.sensorUUID, e.ambientTemperature))
    .countWindowAll(4, 1)
    .maxBy(1)

Recall from the same blog that in this case the contents of the Window roll: the Window is evaluated every time a new element arrives, and the Evaluator should always see only the latest 4 elements. So, when the 5th element arrives, the oldest element has to be removed before the rest are handed over to the Evaluator. This job of evicting element(s) from the contents, so that the Evaluator computes over only those we want it to, is that of the Evictor. Of course, the act of eviction also frees up the space occupied by the evicted elements. An Evictor is optional though; if it is absent, all the contents that the trigger lets through reach the Evaluator unfiltered.
Thus, when we use countWindowAll(n), Flink employs only a Trigger, but when we use countWindowAll(m,n), Flink employs a combination of a Trigger and an Evictor to bring forth the desired behaviour. Flink allows us to combine a Trigger with an Evictor in this manner. One has to be careful though: a trigger's behaviour must be consistent with that of the corresponding Evictor. This is how Flink implements countWindow(m,n), the keyed-stream counterpart of countWindowAll(m,n):

public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
    return window(GlobalWindows.create())
        .evictor(CountEvictor.of(size))
        .trigger(CountTrigger.of(slide));
}
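To get a feel for how the two cooperate, here is a minimal plain-Java simulation of a count window with a size and a slide. It is a sketch under my own assumptions rather than Flink's code: windowsSeen() is a made-up helper in which a simple counter plays the trigger (fire after every `slide` elements) and a loop plays the evictor (drop the oldest elements until at most `size` remain).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

final class SlidingCountWindowSketch {

    // Returns the contents handed to the evaluator at every firing.
    static List<List<Integer>> windowsSeen(List<Integer> stream, int size, int slide) {
        Deque<Integer> buffer = new ArrayDeque<>();
        List<List<Integer>> seen = new ArrayList<>();
        int sinceLastFire = 0;
        for (int element : stream) {
            buffer.addLast(element);
            if (++sinceLastFire >= slide) {        // trigger: fire every `slide` elements
                sinceLastFire = 0;
                while (buffer.size() > size) {     // evictor: keep only the latest `size`
                    buffer.removeFirst();
                }
                seen.add(new ArrayList<>(buffer)); // what the evaluator gets to see
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        // prints [[1], [1, 2], [1, 2, 3], [1, 2, 3, 4], [2, 3, 4, 5], [3, 4, 5, 6]]
        System.out.println(windowsSeen(Arrays.asList(1, 2, 3, 4, 5, 6), 4, 1));
    }
}
```

Note that in this model the first few evaluations see fewer than 4 elements, because the trigger fires on every arrival while the evictor only ever caps the contents at 4.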


To reinforce our understanding, here's a concise presentation of these two types of Windows (from the official documentation of Flink):
When we ask for:
<stream>.countWindow(1000)
We actually want:
<stream>.window(GlobalWindows.create())
    .trigger(CountTrigger.of(1000))
    .evictor(CountEvictor.of(1000))

When we ask for:
<stream>.countWindow(1000, 100)
We actually want:
<stream>.window(GlobalWindows.create())
    .evictor(CountEvictor.of(1000))
    .trigger(CountTrigger.of(100))

I find it easier to remember the arrangement this way.

Evictor can be a misnomer, beware

There is one important aspect of an Evictor that I have not mentioned yet: the way its parameter is to be interpreted. When we say that we need an Evictor(m), what Apache Flink understands is that we want 'm' elements to be retained in the Window, not disposed of. Let me illustrate this:
[Figure: Blog-Picture-Flink-3 (2).jpg]

So, we should remember that evict (m) => keepLatest (m)!
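The same rule can be written down as a tiny plain-Java sketch. The names CountEvictorSketch, evict() and retained() are mine and purely illustrative; the point is only that the parameter m says how many elements stay, and the number of elements to drop is derived from it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CountEvictorSketch {

    // How many of the oldest elements must go so that at most maxCount remain.
    static int evict(int windowSize, int maxCount) {
        return windowSize > maxCount ? windowSize - maxCount : 0;
    }

    // The elements that survive eviction: always the latest maxCount (or fewer).
    static <T> List<T> retained(List<T> elements, int maxCount) {
        int toDrop = evict(elements.size(), maxCount);
        return new ArrayList<>(elements.subList(toDrop, elements.size()));
    }

    public static void main(String[] args) {
        List<String> window = Arrays.asList("e1", "e2", "e3", "e4", "e5", "e6");
        System.out.println(evict(window.size(), 4)); // prints 2
        System.out.println(retained(window, 4));     // prints [e3, e4, e5, e6]
        System.out.println(evict(3, 4));             // prints 0: nothing to evict yet
    }
}
```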

Summary

  1. We want to subject the contents of a Window to an Evaluator function. This function helps us extract meaning out of the stream of events that this Window is attached to. In short, this is what our application is intended for.
  2. As event elements arrive at a Window (from an incoming stream), they are added to the Window. Thus, the Window (its buffer, to be more precise) consumes memory. We should be careful to free this memory before the application starts to suffer from a lack of it.
  3. Every Window has a Trigger associated with it. This trigger sees every element as it is added to the Window and decides how the current contents should be handled. Depending on the Trigger's logic, the current contents are left as they are, or are handed over to the Evaluator. The logic also dictates whether the elements are disposed of, freeing memory.
  4. For finer control, we can optionally attach an Evictor to a Window. This Evictor gets to see the contents of the Window after the Trigger decides to hand over the elements to the Evaluator but before the Evaluator sees the elements. It decides how many of these elements should be retained before the Evaluator begins execution.
  5. After the Evaluator has finished, the contents of the Window are completely disposed of, but only if that is what the Trigger dictates.

I hope that I have been able to describe the functionality of the Trigger, the Evictor and the Evaluator function adequately. My intention is to help you grasp the fundamental points behind how these interact and how we can control the way they do so. As before, you may want to read fantastic blogs like this, for even better understanding.

In the next blogs, I will try and explain how to store intermediate computed results and use them in a Flink application.
I would love to hear from the readers here, especially pointing out any obvious or inadvertent mistakes that I have made. Help me in learning Apache Flink better.