This post is for day 19 of the Safie Engineers' Blog! Advent Calendar.
AWS Kinesis Data Streams and Apache Flink are two of the most popular tools for building streaming data processing applications. In today's post, we will explore how these two tools can complement each other and how Apache Flink compares to its AWS counterpart, the Kinesis Client Library (KCL).
- What is Apache Flink
- When to use Apache Flink over KCL
- Application Code example
- DataStreamAPI example
- TableAPI example
- Deployment on AWS cloud
- Conclusion
What is Apache Flink
In order to understand how Apache Flink can enhance an AWS Kinesis application, we first need to understand its purpose and basic characteristics.
Apache Flink is an open-source stream processing framework developed to go hand in hand with data streaming systems, including, but not limited to, the open-source message broker Apache Kafka. It is designed to handle both unbounded and bounded data streams, in real-time and batch processing. One of its strengths is the ability to connect to various data sources and destinations (downstream data sinks), including data streaming services such as AWS Kinesis Data Streams, filesystems such as AWS S3, and databases directly, using JDBC drivers.
Flink was designed with cloud-native deployment in mind, providing a robust and scalable platform for building data-driven applications and offering features such as event-time processing, stateful computations, fault tolerance, and scalability.
When to use Apache Flink over KCL
While KCL allows checkpointing of the record processing position of each shard for recovery, it does not support state recovery. Flink, on the other hand, supports not only checkpoint-based record recovery but also recovery of complex data structures held in application state.
Similar to checkpoint settings, the user can define not only how often state is saved, but also the backend location: it can be kept locally, on a filesystem such as S3, or in an embedded key-value store. In case of a failure, Flink restores the state from the latest checkpoint and replays the stream up to the current point. The number of restart attempts and the delay between attempts are also configurable. This robust state recovery process provides better fault tolerance and consistency (exactly-once processing).
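To give a rough idea of what this looks like in code, the sketch below configures the checkpoint frequency, the state backend, durable checkpoint storage, and a restart strategy on the execution environment. The checkpoint interval, S3 path, and restart settings here are placeholder values for illustration, not recommendations.

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FaultToleranceSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint of all operator state every 60 seconds
        env.enableCheckpointing(60_000);

        // Keep working state on the TaskManager heap...
        env.setStateBackend(new HashMapStateBackend());
        // ...and persist checkpoints to a durable filesystem (placeholder bucket name)
        env.getCheckpointConfig().setCheckpointStorage("s3://my-checkpoint-bucket/checkpoints");

        // On failure, restart the job up to 3 times, waiting 10 seconds between attempts
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

        // Sources, transformations, and sinks would be defined here, followed by env.execute(...)
    }
}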
Another advantage of Flink over KCL is its ability to process records using event-time semantics. KCL does not provide event-time semantics: all processing logic is based on arrival time. Event-time processing can be implemented on top of KCL with custom buffering logic for out-of-order events, but that is cumbersome and error prone, and adds processing overhead.
Flink natively handles out-of-order records using a watermark strategy, which can either assume monotonically increasing timestamps (only accept records with a higher timestamp than the previous one) or allow a fixed amount of lateness (accept records within a defined duration behind the current watermark). This system of event-time semantics enables accurate window aggregations and time-based analysis, and makes processing more scalable and resilient under unstable data flow conditions.
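As a minimal sketch of the two built-in strategies mentioned above (the event type and the 10-second bound are arbitrary choices for illustration, not part of the later example):

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class WatermarkSketch {
    // Hypothetical event type carrying an event-time timestamp in milliseconds
    public static class SensorEvent {
        public long eventTimeMillis;
    }

    public static void main(String[] args) {
        // Monotonically increasing timestamps: any record older than the last seen one is treated as late
        WatermarkStrategy<SensorEvent> monotonic =
                WatermarkStrategy.<SensorEvent>forMonotonousTimestamps()
                        .withTimestampAssigner((event, recordTimestamp) -> event.eventTimeMillis);

        // Bounded out-of-orderness: records up to 10 seconds behind the watermark are still accepted
        WatermarkStrategy<SensorEvent> bounded =
                WatermarkStrategy.<SensorEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((event, recordTimestamp) -> event.eventTimeMillis);

        // Either strategy would then be passed to env.fromSource(source, strategy, "source name")
    }
}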
The architecture and scalability characteristics of Flink and KCL also differ in many ways. While KCL allows for horizontal scalability, ultimately the number of workers is limited by the number of active shards. Each KCL instance subscribes to one or more shards, and while multiple KCL instances can subscribe to the same shard, that can lead to a record being processed multiple times. Flink uses a multi-node architecture, allowing for horizontal scalability that is not bounded by the number of shards.
Flink's overall architecture is also more resilient and robust against failure. Process control is based on Job Manager and Task Manager instances, with the Job Manager acting as the central coordinator for the Task Managers, which perform the actual work. The Job Manager is also responsible for detecting and handling processing failures, including starting new Task Manager nodes and restoring the previous state and checkpoint. Each Task Manager provides task slots that run individual sub-tasks and can share data with one another.
One point where KCL clearly excels is its simplicity to deploy to production. While a Flink application has many moving parts that must be tuned and managed, KCL is an out-of-the-box solution in the AWS environment. Although modern deployment options, such as Amazon Kinesis Data Analytics for Apache Flink, and solution vendors can lower the difficulty of deploying a Flink application, a high level of understanding of its inner architecture and configuration is still necessary to get the full potential out of it.
Application Code example
Now, to get a feeling for how such an application is written, let's build a simple example around a hypothetical, but realistic, IoT data analysis and processing scenario.
One of the first choices to make when writing a Flink application is which API to use. Both the DataStreamAPI and the TableAPI can be used to transform and analyse the data. The DataStreamAPI is a lower-level abstraction that allows higher flexibility, while the TableAPI is a higher-level abstraction based on SQL that requires less code and thus enables quicker development cycles. Below we will write one example with each API to get a feeling for how they are used.
Similar to KCL, Flink's core is written in Java, but a Python wrapper (PyFlink) is also available. In this example, we will use its native Java API to show a few of its characteristics.
So, now to the code. In order to connect to an AWS Kinesis Data Stream, we first need to include the corresponding connector in our dependencies. The easiest way to do this is to use a Java build tool such as Maven or Gradle.
maven
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-aws-kinesis-streams</artifactId>
    <version>5.0.0-1.20</version>
</dependency>
gradle
implementation 'org.apache.flink:flink-connector-aws-kinesis-streams:5.0.0-1.20'
While the data stream can be consumed unstructured and undefined, it will help a lot in the long run if we define the data schema we expect to receive. For this particular case, let's assume the data comes from a device that periodically sends a packet of data in bytes and a value related to its status. Keep in mind that Flink also has a native AWS Glue Schema Registry decoder that can be used for bigger projects where the schema must be shared across several services.
public class DeviceRecord implements Serializable {
    public String device_id;
    public long timestamp;
    public double value;
    public String data;

    public DeviceRecord() {
    }

    public DeviceRecord(String device_id, long timestamp, double value, String data) {
        this.device_id = device_id;
        this.timestamp = timestamp;
        this.value = value;
        this.data = data;
    }

    public String getDeviceId() {
        return device_id;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public double getValue() {
        return value;
    }

    public byte[] getData() {
        return data != null ? data.getBytes() : null;
    }

    @Override
    public String toString() {
        return "DeviceRecord{" +
                "device_id='" + device_id + '\'' +
                ", timestamp=" + timestamp +
                ", value=" + value +
                ", data=" + data +
                '}';
    }
}
The next step is to define how to deserialize the data received from the stream, taking the raw byte array and turning it into a DeviceRecord instance.
public class DeviceRecordDeserializationSchema implements DeserializationSchema<DeviceRecord> {
    private static final long serialVersionUID = 1L;

    @Override
    public DeviceRecord deserialize(byte[] message) throws IOException {
        String line = new String(message, StandardCharsets.UTF_8);
        ObjectMapper objectMapper = new ObjectMapper();
        DeviceRecord deviceRecord = objectMapper.readValue(line, DeviceRecord.class);
        return deviceRecord;
    }

    @Override
    public boolean isEndOfStream(DeviceRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<DeviceRecord> getProducedType() {
        return TypeInformation.of(DeviceRecord.class);
    }
}
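The deserializer above assumes that each Kinesis record carries a UTF-8 encoded JSON document whose field names match DeviceRecord, for example (a made-up record for illustration):

{"device_id": "device-001", "timestamp": 1734577200, "value": 23.5, "data": "status-ok"}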
DataStreamAPI example
This is the actual implementation of the processing job that connects to the Kinesis Data Stream. For now no actual processing is being done; we will implement the sink connector later.
public class BasicStreamingJob {
    public static void main(String[] args) throws Exception {
        Configuration sourceConfig = new Configuration();
        sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION,
                KinesisSourceConfigOptions.InitialPosition.LATEST);

        // Create a new KinesisStreamsSource to read from the specified Kinesis stream.
        KinesisStreamsSource<DeviceRecord> kdsSource =
                KinesisStreamsSource.<DeviceRecord>builder()
                        .setStreamArn("your-stream-arn")
                        .setSourceConfig(sourceConfig)
                        .setDeserializationSchema(new DeviceRecordDeserializationSchema()) // The DeviceRecord deserializer
                        .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // Optional, uniformShardAssigner is the default.
                        .build();

        WatermarkStrategy<DeviceRecord> watermarkStrategy =
                WatermarkStrategy.<DeviceRecord>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.enableCheckpointing(5000);

        // Define the data processing pipeline. The stream is keyed (partitioned) by device_id.
        KeyedStream<DeviceRecord, String> stream = env
                .fromSource(kdsSource, watermarkStrategy, "Kinesis Source")
                .returns(TypeInformation.of(DeviceRecord.class))
                .keyBy(value -> value.getDeviceId());

        // Print the records to the console
        stream.print();

        // Execute the Flink job
        env.execute("Order Records by Partition Key");
    }
}
In this example, we will retrieve the field "data" from the DeviceRecord object and write it to an AWS S3 bucket. The first step is to decide how to serialize this data, which can be done with an encoder class that writes the data into an OutputStream.
public class DeviceRecordEncoder implements Encoder<DeviceRecord> {
    @Override
    public void encode(DeviceRecord element, OutputStream stream) throws IOException {
        byte[] data = element.getData();
        if (data != null) { // getData() returns null when the record carries no payload
            stream.write(data);
        }
    }
}
After that, we need to decide where to put our objects. In this case, we will create a daily bucket path combined with the device ID.
public class DeviceRecordBucketAssigner implements BucketAssigner<DeviceRecord, String> {
    private static final long serialVersionUID = 1L;
    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    @Override
    public String getBucketId(DeviceRecord element, Context context) {
        LocalDate date = Instant.ofEpochSecond(element.getTimestamp())
                .atZone(ZoneId.of("Z"))
                .toLocalDate();
        return date.format(formatter) + "/" + element.getDeviceId();
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}
Finally, we can combine all these elements and create our processing job that writes the data into an S3 bucket, by adding the following code to the main function.
// Define the S3 sink
FileSink<DeviceRecord> s3Sink = FileSink
        .forRowFormat(new Path("s3://testbucket/"), new DeviceRecordEncoder()) // Use the custom encoder
        .withBucketAssigner(new DeviceRecordBucketAssigner()) // Use the custom bucket assigner
        .withRollingPolicy(DefaultRollingPolicy.builder()
                .withRolloverInterval(Duration.ofSeconds(5)) // Roll the file every 5 seconds
                .withInactivityInterval(Duration.ofMinutes(1))
                .withMaxPartSize(MemorySize.ofMebiBytes(16))
                .build())
        .build();

// Add the S3 sink to the pipeline
stream.sinkTo(s3Sink);
With this piece of code, the "data" field from each Kinesis record is written to S3 according to the rolling policy as it is received.
TableAPI example
For the TableAPI example, let's consider a job that inserts the streaming data directly into an RDS database. In this example, we will insert the DeviceRecord field "value" into the table as we receive it.
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Define the Kinesis source table
tableEnv.executeSql(
        "CREATE TABLE kinesis_table (" +
        "  device_id VARCHAR(32)," +
        "  `timestamp` BIGINT," +
        "  `value` DOUBLE" +
        ") " +
        "PARTITIONED BY (device_id) WITH (" +
        "  'connector' = 'kinesis'," +
        "  'stream.arn' = 'your-stream-arn'," +
        "  'format' = 'json'" +
        ")"
);

// Define the PostgreSQL sink table
tableEnv.executeSql(
        "CREATE TABLE postgres_table (" +
        "  device_id VARCHAR(32)," +
        "  `timestamp` BIGINT," +
        "  `value` DOUBLE" +
        ") WITH (" +
        "  'connector' = 'jdbc'," +
        "  'url' = 'jdbc:postgresql://db-host:port/dbname'," +
        "  'table-name' = 'yourtablename'" +
        ")"
);

// Insert data from the Kinesis source table into the PostgreSQL sink table
tableEnv.executeSql(
        "INSERT INTO postgres_table " +
        "SELECT device_id, `timestamp`, `value` FROM kinesis_table"
);
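Note that, to actually run this TableAPI example, a few extra dependencies are needed on top of the DataStreamAPI ones: the SQL flavor of the Kinesis connector, the JDBC connector, and the PostgreSQL driver. The coordinates below are my assumption of the relevant artifacts, and the versions are placeholders, so check Maven Central for the release matching your Flink version.

implementation 'org.apache.flink:flink-sql-connector-aws-kinesis-streams:5.0.0-1.20' // SQL/Table API Kinesis connector (assumed artifact)
implementation 'org.apache.flink:flink-connector-jdbc:3.2.0-1.19'                    // JDBC connector for the sink table (placeholder version)
implementation 'org.postgresql:postgresql:42.7.3'                                    // PostgreSQL JDBC driver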
The TableAPI example above shows how it trades granularity and control for simplicity and quick deployment. It is ideal for working with highly structured data, or for cases that require JOINs between several data sources. And it is not an either/or choice: both APIs can be mixed freely within the same Flink application, so pick whichever fits each part of your pipeline.
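As a quick sketch of that mixing (reusing env and the keyed stream of DeviceRecord from the DataStreamAPI example; the query and view name are hypothetical), a DataStream can be exposed to SQL and the query result turned back into a DataStream:

// Requires org.apache.flink.table.api.Table, org.apache.flink.table.api.bridge.java.StreamTableEnvironment and org.apache.flink.types.Row
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Register the DataStream as a view that SQL queries can read
Table deviceTable = tableEnv.fromDataStream(stream);
tableEnv.createTemporaryView("device_records", deviceTable);

// Filter with SQL...
Table highValues = tableEnv.sqlQuery(
        "SELECT device_id, `value` FROM device_records WHERE `value` > 100");

// ...and continue with the DataStreamAPI on the result
DataStream<Row> highValueStream = tableEnv.toDataStream(highValues);
highValueStream.print();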
Deployment on AWS cloud
With your Flink application ready, you can now deploy it and start analyzing the stream data. Strictly speaking, the deployment can even be done locally, but to really reap the benefits of scaling and parallel processing, deployment in the cloud is recommended. Since we are already using AWS Kinesis Data Streams, let's consider the options for deployment on the AWS cloud. The deployment can be done on several different services, each with its own benefits and trade-offs.
- Amazon Kinesis Data Analytics for Apache Flink (previously called Amazon Kinesis Data Analytics)
- EMR (Elastic MapReduce)
- AWS Fargate with Kubernetes (EKS)
- ECS
- AWS Lambda
For architectures based on nodes, such as EKS, another choice is whether the application will run in Session mode or Application mode. In Session mode, resources are shared between jobs inside a single cluster, lowering the overall overhead but sacrificing isolation; it is best suited for quick or small jobs. In Application mode, each application gets its own dedicated cluster, trading higher resource usage for better isolation.
Some special considerations must be taken into account when configuring a Flink application in the cloud. A critical point with a direct impact on the cost of operation is resource allocation: the number of task slots should be decided based on the parallelism level the Task Managers will use to process the job, while the instance size should be decided based on the expected load. While Flink allows these parameters to be set in its configuration file, flink-conf.yaml, most services, such as EMR and Amazon Kinesis Data Analytics for Apache Flink, let you integrate autoscaling with AWS-native metrics and tooling.
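As a rough illustration of those knobs, the corresponding entries in flink-conf.yaml look like this (the values are placeholders to show the settings, not recommendations):

# Number of parallel task slots offered by each Task Manager
taskmanager.numberOfTaskSlots: 4
# Default parallelism for jobs that do not set it explicitly
parallelism.default: 8
# Total memory allotted to each Task Manager process
taskmanager.memory.process.size: 4096m
# State backend and checkpoint interval, matching the fault-tolerance settings discussed earlier
state.backend.type: hashmap
execution.checkpointing.interval: 60s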
Conclusion
In this post we showed how Apache Flink can be used to analyze and process data coming from an AWS Kinesis Data Stream. Compared to the AWS-native solution, KCL, Flink has a few advantages, such as greater flexibility and the ability to save and restore state. On the flip side, it also increases the complexity of the system, and consequently the need for in-house skills. The choice of stream processing framework should be made based on your specific needs, but having Apache Flink among the available tools will surely expand how far you can take your data analysis and transformation processes.