The NiFi flow
I created the following sample NiFi flow which you can download here
This flow has 3 generate GenerateFlowFile processors. They generate the following 3 messages. Note only the eventTime and eventInstanceId differ;
EvaluateJsonPath
MergeContent
The MergeContent processor merges the separate JSON messages into one large JSON flowfile for a period of 10 seconds. Mind that the maximum number of messages is in this example set to 1000. If more than 1000 messages arrive with the same objectIdentifier within 10 seconds, the surplus messages will go to the failure relationship.
JoltTransformJSON
You can download the transformation here. It first sorts based on eventtime and then it takes the latest one. In this example, the message with the eventTime "2021-04-16T11:41:28Z" (the second message). If there are several messages with the same eventTime, the latest message received, is passed along. N.b. this transformation can most likely be improved upon.
Finally
This seems a roundabout way to do something which seems like it could be a standard feature. If you find a more suitable solution to achieve the same, please let me know and I'll update this blog post.
No comments:
Post a Comment