Wednesday, December 28, 2022

Apache NiFi: Filter events and only let through the latest in a timeframe

In the IoT world, some devices generate large volumes of events that can be difficult for back-end systems to process in real time. Of course you can use NiFi to throttle messages. However, this will not be sufficient if the flow of events is consistently higher than what the back-end system can handle. A way to deal with this is to let Apache NiFi group messages on a specific attribute and, per device, only let through the latest message in a certain timeframe. In this blog post I'll illustrate how you can do this. The trick is to merge several messages together using the MergeContent processor and then select the latest one using a Jolt transformation.


The NiFi flow

I created the following sample NiFi flow, which you can download here.


GenerateFlowFile

This flow has 3 GenerateFlowFile processors. They generate the following 3 messages. Note that only the eventTime and eventInstanceId differ.


EvaluateJsonPath


This processor reads the objectIdentifier from the flowfile content and puts it in the objectIdentifier attribute, so the next processor can group on it.
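To make this step concrete, here is a minimal Python sketch of what it amounts to. It is not NiFi code. It assumes the objectIdentifier is a top-level field of the JSON message (roughly a JsonPath of $.objectIdentifier); the sample message, its values and the function name evaluate_json_path are hypothetical. Only the field names come from this post.

```python
import json


def evaluate_json_path(content: str, attributes: dict) -> dict:
    """Copy the top-level objectIdentifier from the JSON content into the attributes.

    The flowfile content itself is left untouched; only the attributes change.
    """
    message = json.loads(content)
    updated = dict(attributes)  # do not mutate the caller's attributes
    updated["objectIdentifier"] = str(message["objectIdentifier"])
    return updated


# Hypothetical sample message; the values are made up for illustration.
content = json.dumps({
    "objectIdentifier": "device-1",
    "eventInstanceId": "id-1",
    "eventTime": "2021-04-16T11:41:27Z",
})
attrs = evaluate_json_path(content, {"filename": "example.json"})
```

After this call, attrs holds the original attributes plus objectIdentifier, which is what the merge step can then use as its grouping key.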

MergeContent

The MergeContent processor collects the separate JSON messages for a period of 10 seconds and merges them into one large JSON flowfile. Mind that the maximum number of messages per merged flowfile is set to 1000 in this example. If more than 1000 messages with the same objectIdentifier arrive within 10 seconds, the bin is full and is merged early; the surplus messages end up in a new bin, so more than one message for that device can get through in that timeframe.
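The grouping idea can be sketched in plain Python as follows. This is a simplified model, not how MergeContent is implemented: it assumes messages are grouped on the objectIdentifier attribute, it only closes a group when a later message for the same device arrives after the window has passed (NiFi closes bins on a timer), and it ignores the 1000-message cap. The function name merge_by_device and the sample data are hypothetical.

```python
def merge_by_device(flowfiles, window_seconds=10):
    """Group messages per objectIdentifier into batches spanning at most window_seconds.

    flowfiles: iterable of (arrival_seconds, attributes, message) in arrival order.
    Returns a list of batches; each batch is a list of messages for one device.
    """
    open_bins = {}  # objectIdentifier -> (time the bin was opened, messages so far)
    merged = []
    for arrival, attributes, message in flowfiles:
        key = attributes["objectIdentifier"]
        opened = open_bins.get(key)
        # The bin for this device is older than the window: emit it and start over.
        if opened is not None and arrival - opened[0] >= window_seconds:
            merged.append(opened[1])
            opened = None
        if opened is None:
            opened = (arrival, [])
            open_bins[key] = opened
        opened[1].append(message)
    # Flush whatever is still open at the end of the input.
    merged.extend(messages for _, messages in open_bins.values())
    return merged


# Hypothetical arrivals: (seconds since start, attributes, message)
flowfiles = [
    (0, {"objectIdentifier": "device-1"}, {"eventInstanceId": "a"}),
    (3, {"objectIdentifier": "device-2"}, {"eventInstanceId": "b"}),
    (5, {"objectIdentifier": "device-1"}, {"eventInstanceId": "c"}),
    (12, {"objectIdentifier": "device-1"}, {"eventInstanceId": "d"}),
]
batches = merge_by_device(flowfiles)
```

Here messages a and c for device-1 end up in the same batch, while d arrives after the 10-second window and starts a new one.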

JoltTransformJSON

You can download the transformation here. It first sorts the merged messages by eventTime and then takes the latest one. In this example, that is the message with the eventTime "2021-04-16T11:41:28Z" (the second message). If several messages have the same eventTime, the one received last is passed along. N.B. this transformation can most likely be improved upon.
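For readers who do not want to open the Jolt spec, the selection logic can be sketched in Python. This is not the Jolt transformation itself, only an illustration of the same rule. It assumes the merged flowfile is a JSON array of messages in the order they were received and that eventTime always has the format shown above. The function names and all sample values except the eventTime "2021-04-16T11:41:28Z" are hypothetical.

```python
from datetime import datetime


def parse_event_time(value: str) -> datetime:
    # Assumes timestamps shaped like "2021-04-16T11:41:28Z".
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")


def select_latest(messages):
    """Return the message with the highest eventTime.

    Messages are expected in the order they were received; on equal
    eventTime the message received last wins (hence >= rather than >).
    """
    latest = None
    for message in messages:
        if latest is None or (
            parse_event_time(message["eventTime"])
            >= parse_event_time(latest["eventTime"])
        ):
            latest = message
    return latest


# Hypothetical merged content: three messages for the same device.
merged = [
    {"objectIdentifier": "device-1", "eventInstanceId": "id-1", "eventTime": "2021-04-16T11:41:27Z"},
    {"objectIdentifier": "device-1", "eventInstanceId": "id-2", "eventTime": "2021-04-16T11:41:28Z"},
    {"objectIdentifier": "device-1", "eventInstanceId": "id-3", "eventTime": "2021-04-16T11:41:26Z"},
]
latest = select_latest(merged)
```

With this sample data, latest is the second message, the one carrying eventTime "2021-04-16T11:41:28Z".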


Finally

This seems like a roundabout way to do something that could well be a standard feature. If you find a more suitable solution to achieve the same, please let me know and I'll update this blog post.
