Создание потоковых приложений: руководство по методам и примерам

Для разработки потоковых приложений существует несколько методов и технологий. Давайте рассмотрим некоторые из популярных из них вместе с примерами кода:

  1. Apache Kafka:
    Apache Kafka — это распределенная платформа потоковой передачи, позволяющая создавать приложения потоковой передачи в реальном времени. Он обеспечивает архитектуру очереди сообщений и поддерживает высокопроизводительную, отказоустойчивую и масштабируемую обработку потоков.

Пример кода:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-consumer-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            records.forEach(record -> System.out.printf("Received message: key = %s, value = %s%n",
                    record.key(), record.value()));
        }
    }
}
  1. Apache Flink:
    Apache Flink — это мощная платформа обработки потоков с открытым исходным кодом. Он предоставляет API для создания отказоустойчивых приложений потоковой обработки с отслеживанием состояния. Flink поддерживает обработку времени событий, работу с окнами, а также различные источники и приемники данных.

Пример кода:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class FlinkWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.socketTextStream("localhost", 9999);
        DataStream<Tuple2<String, Integer>> counts = text
                .flatMap(new Tokenizer())
                .keyBy(0)
                .sum(1);
        counts.print();
        env.execute("Flink WordCount");
    }
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] words = value.toLowerCase().split("\\W+");
            for (String word : words) {
                if (word.length() > 0) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}
  1. Apache Storm:
    Apache Storm — это распределенная система вычислений в реальном времени. Он предоставляет отказоустойчивую и масштабируемую платформу для обработки потоковых данных. Storm использует направленный ациклический граф (DAG) для определения и выполнения топологий потоковой обработки.

Пример кода:

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
public class StormWordCount {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("wordSpout", new WordSpout(), 2);
        builder.setBolt("splitBolt", new SplitBolt(), 4).shuffleGrouping("wordSpout");
        builder.setBolt("countBolt", new CountBolt(), 4).fieldsGrouping("splitBolt", new Fields("word"));
        Config config = new Config();
        config.setDebug(true);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("wordCountTopology", config, builder.createTopology());
        Utils.sleep(10000);
        cluster.killTopology("wordCountTopology");
        cluster.shutdown();
    }
}
// WordSpout emits random words
class WordSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private String[] words = {"apple", "banana", "cherry", "date", "elderberry"};
    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }
    @Override
    public void nextTuple() {
        Random rand = new Random();
        String word = words[rand.nextInt(words.length)];
        collector.emit(new Values(word));
        Utils.sleep(100);
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
// SplitBolt splits words into tuples
class SplitBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getStringByField("word");
        String[] splitWords = word.split("");
        for (String splitWord : splitWords) {
            collector.emit(new Values(splitWord));
        }
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("splitWord"));
    }
}
// CountBolt counts the occurrences of words
class CountBolt extends BaseBasicBolt {
    private Map<String, Integer> counts = new HashMap<>();
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String splitWord = tuple.getStringByField("splitWord");
        int count = counts.getOrDefault(splitWord, 0) + 1;
        counts.put(splitWord, count);
        collector.emit(new Values(splitWord, count));
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("splitWord", "count"));
    }
}

Это всего лишь несколько примеров технологий, которые можно использовать для разработки потоковых приложений. Другие популярные варианты включают Apache Samza, Spring Cloud Stream и AWS Kinesis.