Для разработки потоковых приложений существует несколько методов и технологий. Давайте рассмотрим некоторые из популярных из них вместе с примерами кода:
- Apache Kafka:
Apache Kafka — это распределенная платформа потоковой передачи, позволяющая создавать приложения потоковой передачи в реальном времени. Он обеспечивает архитектуру очереди сообщений и поддерживает высокопроизводительную, отказоустойчивую и масштабируемую обработку потоков.
Пример кода:
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
records.forEach(record -> System.out.printf("Received message: key = %s, value = %s%n",
record.key(), record.value()));
}
}
}
- Apache Flink:
Apache Flink — это мощная платформа обработки потоков с открытым исходным кодом. Он предоставляет API для создания отказоустойчивых приложений потоковой обработки с отслеживанием состояния. Flink поддерживает обработку времени событий, работу с окнами, а также различные источники и приемники данных.
Пример кода:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class FlinkWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.socketTextStream("localhost", 9999);
DataStream<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer())
.keyBy(0)
.sum(1);
counts.print();
env.execute("Flink WordCount");
}
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] words = value.toLowerCase().split("\\W+");
for (String word : words) {
if (word.length() > 0) {
out.collect(new Tuple2<>(word, 1));
}
}
}
}
}
- Apache Storm:
Apache Storm — это распределенная система вычислений в реальном времени. Он предоставляет отказоустойчивую и масштабируемую платформу для обработки потоковых данных. Storm использует направленный ациклический граф (DAG) для определения и выполнения топологий потоковой обработки.
Пример кода:
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
public class StormWordCount {
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("wordSpout", new WordSpout(), 2);
builder.setBolt("splitBolt", new SplitBolt(), 4).shuffleGrouping("wordSpout");
builder.setBolt("countBolt", new CountBolt(), 4).fieldsGrouping("splitBolt", new Fields("word"));
Config config = new Config();
config.setDebug(true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("wordCountTopology", config, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("wordCountTopology");
cluster.shutdown();
}
}
// WordSpout emits random words
class WordSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private String[] words = {"apple", "banana", "cherry", "date", "elderberry"};
@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
@Override
public void nextTuple() {
Random rand = new Random();
String word = words[rand.nextInt(words.length)];
collector.emit(new Values(word));
Utils.sleep(100);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
// SplitBolt splits words into tuples
class SplitBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getStringByField("word");
String[] splitWords = word.split("");
for (String splitWord : splitWords) {
collector.emit(new Values(splitWord));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("splitWord"));
}
}
// CountBolt counts the occurrences of words
class CountBolt extends BaseBasicBolt {
private Map<String, Integer> counts = new HashMap<>();
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String splitWord = tuple.getStringByField("splitWord");
int count = counts.getOrDefault(splitWord, 0) + 1;
counts.put(splitWord, count);
collector.emit(new Values(splitWord, count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("splitWord", "count"));
}
}
Это всего лишь несколько примеров технологий, которые можно использовать для разработки потоковых приложений. Другие популярные варианты включают Apache Samza, Spring Cloud Stream и AWS Kinesis.