Apache Flink is an open source platform for distributed stream and batch data processing. It is commonly used with Apache Kafka for data input and output: you can push event streams to Kafka and then consume them with Apache Flink jobs.

The example in this article shows you how to create a simple Java application to read data from a Kafka topic, process it, and then push it to a different Kafka topic using Apache Flink.

Requirements:

  • An Aiven for Apache Kafka service (see this article for details on creating a service)

  • Two topics added to the service: test-flink-input and test-flink-output (you can create these in the Aiven web console, or from code as shown in the sketch after this list)

  • If you are using Apache Maven, include the following dependencies in your pom.xml file to add the Apache Flink libraries (check the Apache Flink documentation for the latest information):

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.11.0</version>
    </dependency>
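
If you prefer to create the two topics from code instead of the Aiven web console, the following sketch uses the standard Kafka AdminClient, available through the kafka-clients library (which flink-connector-kafka pulls in transitively). The bootstrap.servers value and the keystore and truststore file names are the same placeholders used later in this article, and the partition count of 1 and replication factor of 2 are assumptions; adjust them to match your service plan:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    import java.util.Arrays;
    import java.util.Properties;

    public class CreateTopics {

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Placeholder connection settings; use your service URI and the
            // keystore/truststore files created from your service certificates.
            props.put("bootstrap.servers", "kafka-uri:kafka-port");
            props.put("security.protocol", "SSL");
            props.put("ssl.truststore.location", "client.truststore.jks");
            props.put("ssl.truststore.password", "secret");
            props.put("ssl.keystore.type", "PKCS12");
            props.put("ssl.keystore.location", "client.keystore.p12");
            props.put("ssl.keystore.password", "secret");
            props.put("ssl.key.password", "secret");

            try (AdminClient admin = AdminClient.create(props)) {
                // 1 partition and replication factor 2 are assumed values.
                admin.createTopics(Arrays.asList(
                        new NewTopic("test-flink-input", 1, (short) 2),
                        new NewTopic("test-flink-output", 1, (short) 2)))
                    .all().get();
            }
        }
    }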

To try this example:

  1. Create a map function.
    This is a class that takes input and maps, or transforms, it into something different. This example takes any received string input and makes it uppercase:

    import org.apache.flink.api.common.functions.MapFunction;

    // Transforms each incoming string to uppercase.
    public class WordsCapitalizer implements MapFunction<String, String> {

        @Override
        public String map(String s) {
            return s.toUpperCase();
        }
    }

  2. Save the function as WordsCapitalizer.java.

  3. Create the main class for the application to set up the execution environment for Flink streaming and to create a pipeline.
    This example creates a Flink Kafka consumer to read from the test-flink-input topic, which forms the input stream for the map function. The output is then directed to a Flink Kafka producer sink, which writes the data to the test-flink-output topic:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    import java.util.Properties;

    public class FlinkExample {

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Replace kafka-uri:kafka-port with the service URI shown in the Aiven web console.
            props.put("bootstrap.servers", "kafka-uri:kafka-port");
            // SSL settings: the truststore and keystore files are created from the
            // CA and client certificates of your Aiven for Apache Kafka service.
            props.put("security.protocol", "SSL");
            props.put("ssl.endpoint.identification.algorithm", "");
            props.put("ssl.truststore.location", "client.truststore.jks");
            props.put("ssl.truststore.password", "secret");
            props.put("ssl.keystore.type", "PKCS12");
            props.put("ssl.keystore.location", "client.keystore.p12");
            props.put("ssl.keystore.password", "secret");
            props.put("ssl.key.password", "secret");
            props.put("group.id", "test-flink-input-group");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

            // Source: read strings from the test-flink-input topic.
            FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("test-flink-input", new SimpleStringSchema(), props);
            DataStream<String> stringInputStream = environment.addSource(consumer);

            // Sink: write the transformed strings to the test-flink-output topic.
            FlinkKafkaProducer<String> producer =
                new FlinkKafkaProducer<>("test-flink-output", new SimpleStringSchema(), props);
            stringInputStream.map(new WordsCapitalizer()).addSink(producer);

            environment.execute("FlinkExample");
        }
    }

  4. Save the class as FlinkExample.java.

  5. Build and run the example code.
    The application starts up and waits for messages to arrive.

  6. Add a test message to the test-flink-input topic.
    If you have enabled the Kafka REST API in the Aiven web console, you can add this message through the web console; alternatively, you can send it from code, as shown in the sketch after these steps. Apache Flink processes the message, and it appears in the test-flink-output topic in uppercase.
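
If the Kafka REST API is not enabled, the following sketch sends the test message with a standalone kafka-clients producer. The class name TestMessageProducer and the message text are illustrative, and the connection settings are the same placeholders used in FlinkExample:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Properties;

    public class TestMessageProducer {

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Same placeholder connection settings as in FlinkExample.
            props.put("bootstrap.servers", "kafka-uri:kafka-port");
            props.put("security.protocol", "SSL");
            props.put("ssl.truststore.location", "client.truststore.jks");
            props.put("ssl.truststore.password", "secret");
            props.put("ssl.keystore.type", "PKCS12");
            props.put("ssl.keystore.location", "client.keystore.p12");
            props.put("ssl.keystore.password", "secret");
            props.put("ssl.key.password", "secret");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Send a single test message; the running Flink job should write it
                // to test-flink-output in uppercase.
                producer.send(new ProducerRecord<>("test-flink-input", "hello flink")).get();
            }
        }
    }

You can then check the test-flink-output topic, for example in the Aiven web console, to confirm that the uppercase result arrived.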

If you have any questions about Apache Flink integration, please contact Aiven support.
