В мире обработки больших данных Apache Storm стал мощным инструментом для анализа в реальном времени и потоковой обработки. Благодаря своей способности обрабатывать огромные объемы данных и обрабатывать их в режиме реального времени, Storm стал идеальным решением для организаций, работающих с высокоскоростными потоками данных. В этой статье блога мы рассмотрим различные методы подсчета слов с помощью Apache Storm в HDInsight, облачной платформе больших данных Microsoft.
Метод 1: простая топология WordCount
Давайте начнем с базовой топологии WordCount в Apache Storm. Вот пример кода на Java:
public class WordCountTopology {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentence-spout", new SentenceSpout(), 1);
builder.setBolt("split-bolt", new SplitBolt(), 1).shuffleGrouping("sentence-spout");
builder.setBolt("count-bolt", new CountBolt(), 1).fieldsGrouping("split-bolt", new Fields("word"));
Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count-topology", config, builder.createTopology());
Thread.sleep(5000);
cluster.shutdown();
}
}
Метод 2: подсчет слов без учета регистра
Если вы хотите выполнить подсчет слов без учета регистра, вы можете изменить SplitBolt, чтобы преобразовать все слова в нижний регистр перед их отправкой. Вот пример:
public class SplitBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String sentence = tuple.getStringByField("sentence");
String[] words = sentence.toLowerCase().split(" ");
for (String word : words) {
collector.emit(new Values(word));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
Метод 3: подсчет слов в окне
Чтобы выполнить подсчет слов в течение скользящего окна времени, вы можете использовать WindowedBolt в Apache Storm. Это позволяет подсчитывать слова в течение определенного периода времени. Вот пример:
public class WindowedCountBolt extends BaseWindowedBolt {
private static final int WINDOW_LENGTH = 5000; // 5 seconds
private Map<String, Integer> wordCounts = new HashMap<>();
@Override
public void execute(TupleWindow inputWindow) {
for (Tuple tuple : inputWindow.get()) {
String word = tuple.getStringByField("word");
int count = wordCounts.getOrDefault(word, 0) + 1;
wordCounts.put(word, count);
}
// Output word counts
for (Map.Entry<String, Integer> entry : wordCounts.entrySet()) {
String word = entry.getKey();
int count = entry.getValue();
System.out.println(word + ": " + count);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// No output fields in this example
}
@Override
public BaseWindowedBolt.Duration getWindowDuration() {
return Duration.seconds(WINDOW_LENGTH);
}
}
Apache Storm — мощный инструмент для обработки и анализа в реальном времени, а HDInsight предоставляет масштабируемую и управляемую среду для запуска приложений Storm. В этой статье мы рассмотрели различные методы подсчета слов с помощью Apache Storm в HDInsight. Мы рассмотрели простую топологию WordCount, подсчет слов без учета регистра и подсчет слов в оконном режиме. Эти методы можно настроить и расширить в соответствии с вашими конкретными требованиями. Итак, вперед и раскройте возможности Apache Storm для обработки и анализа высокоскоростных потоков данных!