AWS Lambda, Kinesis and Java : The Producer
 (Part two of a three part blog.)

This is the second post in the series, in which we look at the different Kinesis producers and how to use them from a Java SDK environment. For details on Kinesis itself or on the consumer, see the other posts in the series:

  1. A Kinesis stream : Kinesis fundamentals and the design we follow in this series
  2. Kinesis producer : creating a stream and publishing into it using the SDK API, the KPL and the Kinesis Agent
  3. A Kinesis consumer : a Lambda function which runs whenever a message is published into the stream

The code for this post can be found here.


Kinesis Producer: 

This is a Spring Boot application meant to run as an independent application. It could be deployed to an EC2 instance or ECS, or run by a third party. For our purposes we run it locally to publish messages into the stream. There are 4 different mechanisms for producing messages: 2 using the SDK APIs and another 2 using the KPL library. All 4 are covered here. Let us dive into the code.

KinesisClient.java

For illustration, assume that the producer publishes some sort of location data into the stream and the consumer reads it. This class initialises both the SDK client and the KPL producer. In a real-world scenario there would be one or the other, not both; this example initialises both to demonstrate the differences between the producers.



Creates the stream if it doesn’t exist.





Note : Because this runs from a local IDE, the credentials and region are picked up from the AWS Toolkit plugin, which holds the details of the profiles and permissions. When deployed on an EC2 instance, the best practice is to attach an appropriate IAM role to the instance, from which the application can pick up the details. If running from a third party, appropriate access tokens need to be generated and supplied.

Domain Objects : GeoLoc.java and VirtualLoc.java

The two classes have fields to indicate latitude and longitude as well as the publisher type. The constructors initialise random values along with the type of source publishing.



The geoLocationToJson() method converts an object to a JSON string. Since Kinesis accepts record data only as bytes, the string is converted to bytes before publishing, which keeps the payload a plain JSON structure.
The same applies to VirtualLoc.java.
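Since the original class is not reproduced here, the following is only a minimal sketch of what such a domain object might look like. The class name GeoLocSketch, its field names and the toRecordData() helper are assumptions made for illustration; only geoLocationToJson() is a name taken from the post. It uses plain JDK classes and hand-rolled JSON rather than a JSON library.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical stand-in for the post's GeoLoc class; field names are assumed.
class GeoLocSketch {
    final double latitude;
    final double longitude;
    final String source; // which publisher produced the record

    // Random coordinates plus the publisher type, as described for the constructors.
    GeoLocSketch(String source) {
        this(ThreadLocalRandom.current().nextDouble(-90.0, 90.0),
             ThreadLocalRandom.current().nextDouble(-180.0, 180.0),
             source);
    }

    GeoLocSketch(double latitude, double longitude, String source) {
        this.latitude = latitude;
        this.longitude = longitude;
        this.source = source;
    }

    // Hand-rolled JSON; assumes source contains no characters that need escaping.
    String geoLocationToJson() {
        return String.format(Locale.ROOT,
                "{\"latitude\":%.6f,\"longitude\":%.6f,\"source\":\"%s\"}",
                latitude, longitude, source);
    }

    // Record data is sent as bytes, so the JSON string is encoded as UTF-8.
    ByteBuffer toRecordData() {
        return ByteBuffer.wrap(geoLocationToJson().getBytes(StandardCharsets.UTF_8));
    }
}
```

A ByteBuffer is used because that is the type the SDK and KPL put calls take for record data; a real project would more likely build the JSON with a library such as Jackson or Gson.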

KinesisSDKProducer.java

This class produces events into the stream using two methods.


publishToStreamOneByOne():




The checkStreamStatus() method checks whether the stream is available. Since the stream may have only just been created when the application starts, this method checks the stream status at most 3 times, waiting 1 minute between checks. If the stream becomes ready within these retries, the messages are published; otherwise the application throws an error.
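The retry logic can be sketched without the SDK as a bounded polling loop. This is a rough illustration, not the post's actual method: the class StreamStatusCheck, the method waitUntilActive() and its parameters are hypothetical names, and the stream status is supplied by a Supplier so the loop can be exercised without AWS. "ACTIVE" is the status Kinesis reports for a stream that is ready for use.

```java
import java.util.function.Supplier;

// Hypothetical helper; the names here are assumptions, not the post's code.
class StreamStatusCheck {

    // Polls the status up to maxAttempts times, sleeping waitMillis between attempts.
    // Returns true as soon as the stream reports ACTIVE, false if it never does.
    static boolean waitUntilActive(Supplier<String> statusSupplier,
                                   int maxAttempts, long waitMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if ("ACTIVE".equals(statusSupplier.get())) {
                return true;
            }
            if (attempt < maxAttempts) { // no point sleeping after the last check
                try {
                    Thread.sleep(waitMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return false;
                }
            }
        }
        return false;
    }
}
```

With the values described above, the call would be waitUntilActive(statusSupplier, 3, 60_000L), where the supplier would presumably wrap a describe-stream call on the SDK client; the caller throws an error when the method returns false.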




Then, in a for loop, the requests are built and put into the stream one by one. Each put returns a response like the following:

Put result {ShardId: shardId-000000000000,SequenceNumber: 49641467651587751887421032360435671554803639492957700098,}

 

publishToStreamAllAtOnce()


This method publishes a batch of messages into the Kinesis stream in a single call. For that, all the entries are first collected into a list of PutRecordsRequestEntry.




The result of this put operation contains the shard ID and sequence number for each message that has been put. The difference from the single put is that it covers all the messages at once and also includes the count of failed records.

Put result {FailedRecordCount: 0,Records: [{SequenceNumber: 49641467651587751887421032363341929225157208166390300674,ShardId: shardId-000000000000,}, …..]}
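When FailedRecordCount is greater than zero, the failed entries can be picked out for a retry. The sketch below relies on two documented properties of the PutRecords response: its records come back in the same order as the request entries, and a failed record carries an error code instead of a sequence number. The Result class and the failedEntries() helper are hypothetical stand-ins written with plain JDK types, not the SDK's own result classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of selecting failed entries from a batch put for retry.
class BatchRetrySketch {

    // Assumed mirror of one entry in a batch put response.
    static final class Result {
        final String sequenceNumber; // null when the record failed
        final String errorCode;      // null when the record succeeded

        Result(String sequenceNumber, String errorCode) {
            this.sequenceNumber = sequenceNumber;
            this.errorCode = errorCode;
        }
    }

    // Returns the request entries whose matching result (same index) has an error code.
    static <T> List<T> failedEntries(List<T> requestEntries, List<Result> results) {
        if (requestEntries.size() != results.size()) {
            throw new IllegalArgumentException("request and response sizes differ");
        }
        List<T> failed = new ArrayList<>();
        for (int i = 0; i < results.size(); i++) {
            if (results.get(i).errorCode != null) {
                failed.add(requestEntries.get(i));
            }
        }
        return failed;
    }
}
```

The returned list could then be sent again in a new batch, ideally after a short pause so that a throttled shard has time to recover.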


KinesisKPLProducer.java:


This class uses the KPL library for producing. KPL producers are asynchronous by default; if we want to wait for the response or react to it, we have to work with the returned Future or register a FutureCallback, depending on the requirement. All 3 approaches are implemented here.

KPL Non-Blocking : Do not wait for the response:

This approach sends the request using the KPL library and forgets about it. It is the job of the KPL to retry in case of failure. The client will not know whether publishing succeeded. The addUserRecord() method of the KPL does the actual publishing.


KPL Blocking : Wait for the response before publishing the next message

As each record is published, its response Future is added to a list of Futures.

Using this approach we can wait for the response of each addUserRecord() call. Until we get that response, the response of the next addUserRecord() call is not processed. This is what makes it blocking.



Since the calls are asynchronous in nature, we store the results in a list of Futures and process them one by one.
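The blocking pattern can be illustrated with plain JDK futures. This is a sketch under assumptions, not the post's code: BlockingPublishSketch and awaitAll() are hypothetical names, and a Future<String> carrying a shard ID stands in for the future that addUserRecord() returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical sketch: wait on each pending publish result in submission order.
class BlockingPublishSketch {

    // Blocks on every future in turn and records whether that publish succeeded.
    static List<String> awaitAll(List<Future<String>> pending) {
        List<String> outcomes = new ArrayList<>();
        for (Future<String> future : pending) {
            try {
                // get() blocks until this particular record has a result.
                outcomes.add("OK " + future.get());
            } catch (ExecutionException e) {
                // The publish itself failed; the cause holds the real error.
                outcomes.add("FAILED " + e.getCause().getMessage());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                outcomes.add("INTERRUPTED");
                break;
            }
        }
        return outcomes;
    }
}
```

Because get() is called in list order, a slow record delays the processing of every result behind it, which is the cost of the blocking approach.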




KPL Async Non-Blocking with Response

This is more complicated than the other methods of publishing. It requires an executor framework and a FutureCallback mechanism.

FutureCallback Implementation:






The FutureCallback implementation overrides onSuccess and onFailure to handle the two outcomes.



Next comes a Runnable object that publishes the records. This object is passed to the executor framework for execution. On closer inspection, the result of addUserRecord() is stored in a ListenableFuture, and the callback is added to it in the last line. This is what makes the approach asynchronous.
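The same idea can be shown with the JDK alone. In the sketch below, CompletableFuture.whenComplete() is swapped in for the ListenableFuture and FutureCallback pair, so this is an analogy rather than the KPL or Guava API. The class CallbackPublishSketch, publishAsync() and publishWithCallback() are hypothetical names, and the fake publish simply fails on an empty payload.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: react to publish results through a callback instead of blocking.
class CallbackPublishSketch {
    final AtomicInteger succeeded = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    // Stand-in for an asynchronous publish call; fails for an empty payload.
    CompletableFuture<String> publishAsync(String payload, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            if (payload.isEmpty()) {
                throw new IllegalArgumentException("empty payload");
            }
            return "shardId-000000000000";
        }, executor);
    }

    // Registers the callback and returns immediately; the caller is never blocked.
    CompletableFuture<String> publishWithCallback(String payload, Executor executor) {
        return publishAsync(payload, executor).whenComplete((shardId, error) -> {
            if (error == null) {
                succeeded.incrementAndGet(); // plays the role of onSuccess
            } else {
                failed.incrementAndGet();    // plays the role of onFailure
            }
        });
    }
}
```

The counters are atomic because the callbacks run on executor threads, not on the thread that submitted the records.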



 
Finally, the Runnable job is submitted to the executor framework.


Kinesis Agent


This is an agent built on top of the KPL with all the functionality of the KPL. It runs in the background, collects data from a specified folder and keeps streaming it to the Kinesis stream.
Currently it runs only on Linux-based servers.
It can be installed using standard sudo commands. The internals of this agent are beyond the scope of this post; it is mentioned here only so that you are aware such tools exist.

 
