Apache Flink is an open source platform for distributed stream and batch data processing. It is commonly used with Apache Kafka for data input and output: event streams are pushed to Kafka and then consumed by Apache Flink jobs.

In this example, we will create a simple Java application to demonstrate how to read data from a Kafka topic, process it, and then push it to a different Kafka topic using Apache Flink.

Before continuing, please refer to our Getting started with Aiven Kafka article to set up your Apache Kafka service. You'll need to create two topics; in this example we call them test-flink-input and test-flink-output.
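
If you prefer to create the topics from code rather than from the Aiven Console, here is a minimal sketch using the Kafka AdminClient. It assumes you have the kafka-clients library on your classpath and the same SSL keystore and truststore files used in the Flink example further down; the class name and the partition and replication values are only illustrative.

CreateTopics.java

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;
import java.util.Properties;

public class CreateTopics {

    public static void main(String[] args) throws Exception {
        // Same connection and SSL settings as the Flink example below
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-uri:kafka-port");
        props.put("security.protocol", "SSL");
        props.put("ssl.endpoint.identification.algorithm", "");
        props.put("ssl.truststore.location", "client.truststore.jks");
        props.put("ssl.truststore.password", "secret");
        props.put("ssl.keystore.type", "PKCS12");
        props.put("ssl.keystore.location", "client.keystore.p12");
        props.put("ssl.keystore.password", "secret");
        props.put("ssl.key.password", "secret");

        try (AdminClient admin = AdminClient.create(props)) {
            // One partition with replication factor 3 is illustrative;
            // pick values that fit your service plan
            admin.createTopics(Arrays.asList(
                    new NewTopic("test-flink-input", 1, (short) 3),
                    new NewTopic("test-flink-output", 1, (short) 3)
            )).all().get();
        }
    }
}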

If you're using Apache Maven, add the following dependencies to your pom.xml file to include the Apache Flink libraries in your Java project:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.11.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.11.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.11.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.11.0</version>
</dependency>

(Please refer to the Apache Flink documentation for the latest version information.)

First, we will need a map function. This is a simple class that takes some input and maps (i.e. transforms) it into something different. In our function, we'll simply take whatever string input we receive and convert it to uppercase.

WordsCapitalizer.java

import org.apache.flink.api.common.functions.MapFunction;

public class WordsCapitalizer implements MapFunction<String, String> {

    // Transform each incoming string to uppercase
    @Override
    public String map(String s) {
        return s.toUpperCase();
    }
}
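
For example, calling new WordsCapitalizer().map("hello flink") returns "HELLO FLINK". Flink will apply this function to every record that flows through the stream.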

Next, the main class of the application has the job of setting up Flink's streaming execution environment and creating a pipeline.

We create an instance of a Flink Kafka consumer to read from our first topic, test-flink-input. Adding this to the environment forms an input stream, to which we attach our map function; finally, we direct the output to a Kafka producer sink that writes to a second topic, test-flink-output.

FlinkExample.java

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class FlinkExample {

    public static void main(String[] args) throws Exception {

        // Connection and SSL settings for the Aiven Kafka service
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-uri:kafka-port");
        props.put("security.protocol", "SSL");
        props.put("ssl.endpoint.identification.algorithm", "");
        props.put("ssl.truststore.location", "client.truststore.jks");
        props.put("ssl.truststore.password", "secret");
        props.put("ssl.keystore.type", "PKCS12");
        props.put("ssl.keystore.location", "client.keystore.p12");
        props.put("ssl.keystore.password", "secret");
        props.put("ssl.key.password", "secret");
        props.put("group.id", "test-flink-input-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: consume strings from the input topic
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("test-flink-input", new SimpleStringSchema(), props);
        DataStream<String> stringInputStream = environment.addSource(consumer);

        // Sink: produce strings to the output topic
        FlinkKafkaProducer<String> producer =
                new FlinkKafkaProducer<>("test-flink-output", new SimpleStringSchema(), props);

        // Pipeline: map each record to uppercase, then write it to the sink
        stringInputStream.map(new WordsCapitalizer()).addSink(producer);

        environment.execute("FlinkExample");
    }
}

That's it... build and run the code! The application will start up and wait for messages to arrive. If you've enabled the Kafka REST API from the Aiven Console, you can use the UI to produce a test message to your first topic and then watch it arrive (now uppercase, thanks to Apache Flink) on your second topic.
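
Alternatively, you can produce the test message from Java. Here is a minimal sketch using the standard KafkaProducer, again assuming the kafka-clients library and the same SSL files; the class name TestMessageProducer is just for illustration.

TestMessageProducer.java

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class TestMessageProducer {

    public static void main(String[] args) {
        // Same connection and SSL settings as the Flink example above
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-uri:kafka-port");
        props.put("security.protocol", "SSL");
        props.put("ssl.endpoint.identification.algorithm", "");
        props.put("ssl.truststore.location", "client.truststore.jks");
        props.put("ssl.truststore.password", "secret");
        props.put("ssl.keystore.type", "PKCS12");
        props.put("ssl.keystore.location", "client.keystore.p12");
        props.put("ssl.keystore.password", "secret");
        props.put("ssl.key.password", "secret");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Send a single test message to the input topic;
            // close() flushes it before the application exits
            producer.send(new ProducerRecord<>("test-flink-input", "hello flink"));
        }
    }
}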

Additional articles may be found on our Aiven Support page.

If you have any questions about the integration, please feel free to reach out to Support and let us know.
