Apache Flink is an open-source platform for distributed stream and batch data processing. It is commonly used with Apache Kafka for data input and output: event streams are pushed to Kafka and then consumed by Flink jobs.

To get started, please refer to our Getting started with Aiven Kafka article to set up your service and create the topics to be used by Flink.

In this example we will create two topics, test-flink-input and test-flink-output, to demonstrate consuming and producing data with Apache Flink. You can create them from the Aiven console, or programmatically as in the sketch below.
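If you prefer to create the topics from code, here is a minimal sketch using the standard Kafka AdminClient. The service URI and keystore/truststore values are placeholders matching the properties used later in this article, and the partition and replication counts are example values only:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopics {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholders: use your Aiven Kafka service URI and the
        // keystore/truststore files downloaded from the Aiven console
        props.put("bootstrap.servers", "kafka-uri:kafka-port");
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "client.truststore.jks");
        props.put("ssl.truststore.password", "secret");
        props.put("ssl.keystore.type", "PKCS12");
        props.put("ssl.keystore.location", "client.keystore.p12");
        props.put("ssl.keystore.password", "secret");
        props.put("ssl.key.password", "secret");

        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 3: example values only
            admin.createTopics(Arrays.asList(
                    new NewTopic("test-flink-input", 1, (short) 3),
                    new NewTopic("test-flink-output", 1, (short) 3)
            )).all().get();
        }
    }
}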

To add Flink to your project, include the following Maven dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.9.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
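The example below also uses Flink's DataStream API; depending on how your project is set up, you may additionally need the flink-streaming-java dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.9.0</version>
</dependency>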


Please refer to the Apache Flink documentation for the latest version information.

In this example, the application reads data from the test-flink-input topic, capitalizes it, and pushes the result to the test-flink-output topic. The capitalization step is a simple MapFunction:

import org.apache.flink.api.common.functions.MapFunction;

// Converts each incoming record to upper case
public class WordsCapitalizer implements MapFunction<String, String> {
    @Override
    public String map(String s) {
        return s.toUpperCase();
    }
}

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class FlinkExample {

    public static void main(String[] args) throws Exception {

        // Connection and SSL settings for the Aiven Kafka service;
        // replace the placeholders with your service URI and the
        // keystore/truststore files downloaded from the Aiven console
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-uri:kafka-port");
        props.put("security.protocol", "SSL");
        props.put("ssl.endpoint.identification.algorithm", "");
        props.put("ssl.truststore.location", "client.truststore.jks");
        props.put("ssl.truststore.password", "secret");
        props.put("ssl.keystore.type", "PKCS12");
        props.put("ssl.keystore.location", "client.keystore.p12");
        props.put("ssl.keystore.password", "secret");
        props.put("ssl.key.password", "secret");
        props.put("group.id", "test-flink-input-group");

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // Consume records from test-flink-input; SimpleStringSchema handles
        // (de)serialization, so no Kafka deserializer properties are needed
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("test-flink-input", new SimpleStringSchema(), props);
        DataStream<String> stringInputStream = environment.addSource(consumer);

        // Produce the capitalized records to test-flink-output,
        // reusing the same connection properties
        FlinkKafkaProducer<String> producer =
                new FlinkKafkaProducer<>("test-flink-output", new SimpleStringSchema(), props);
        stringInputStream.map(new WordsCapitalizer()).addSink(producer);

        // Nothing runs until the job is executed
        environment.execute("Kafka string capitalizer");
    }
}
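Once the job is running, you can verify the pipeline end to end with a plain Kafka producer and consumer. The sketch below is illustrative only: it reuses the same placeholder connection properties, and the topic names match the example above.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PipelineCheck {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Same placeholder connection/SSL settings as in FlinkExample
        props.put("bootstrap.servers", "kafka-uri:kafka-port");
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "client.truststore.jks");
        props.put("ssl.truststore.password", "secret");
        props.put("ssl.keystore.type", "PKCS12");
        props.put("ssl.keystore.location", "client.keystore.p12");
        props.put("ssl.keystore.password", "secret");
        props.put("ssl.key.password", "secret");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "test-flink-output-check");
        props.put("auto.offset.reset", "earliest");

        // Send a lower-case message to the input topic
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-flink-input", "hello flink"));
        }

        // Read the capitalized result from the output topic
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-flink-output"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value()); // expected: HELLO FLINK
            }
        }
    }
}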


Additional articles may be found on our Aiven Support page.

If you have any questions about the integration, please feel free to reach out to our support team and let us know.
