Раскрытие возможностей Apache Storm: подсчет слов в HDInsight

В мире обработки больших данных Apache Storm стал мощным инструментом для анализа в реальном времени и потоковой обработки. Благодаря своей способности обрабатывать огромные объемы данных и обрабатывать их в режиме реального времени, Storm стал идеальным решением для организаций, работающих с высокоскоростными потоками данных. В этой статье блога мы рассмотрим различные методы подсчета слов с помощью Apache Storm в HDInsight, облачной платформе больших данных Microsoft.

Метод 1: простая топология WordCount
Давайте начнем с базовой топологии WordCount в Apache Storm. Вот пример кода на Java:

public class WordCountTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("sentence-spout", new SentenceSpout(), 1);
        builder.setBolt("split-bolt", new SplitBolt(), 1).shuffleGrouping("sentence-spout");
        builder.setBolt("count-bolt", new CountBolt(), 1).fieldsGrouping("split-bolt", new Fields("word"));
        Config config = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word-count-topology", config, builder.createTopology());
        Thread.sleep(5000);
        cluster.shutdown();
    }
}

Метод 2: подсчет слов без учета регистра
Если вы хотите выполнить подсчет слов без учета регистра, вы можете изменить SplitBolt, чтобы преобразовать все слова в нижний регистр перед их отправкой. Вот пример:

public class SplitBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.toLowerCase().split(" ");

        for (String word : words) {
            collector.emit(new Values(word));
        }
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

Метод 3: подсчет слов в окне
Чтобы выполнить подсчет слов в течение скользящего окна времени, вы можете использовать WindowedBolt в Apache Storm. Это позволяет подсчитывать слова в течение определенного периода времени. Вот пример:

public class WindowedCountBolt extends BaseWindowedBolt {

    private static final int WINDOW_LENGTH = 5000; // 5 seconds

    private Map<String, Integer> wordCounts = new HashMap<>();
    @Override
    public void execute(TupleWindow inputWindow) {
        for (Tuple tuple : inputWindow.get()) {
            String word = tuple.getStringByField("word");
            int count = wordCounts.getOrDefault(word, 0) + 1;
            wordCounts.put(word, count);
        }
// Output word counts
        for (Map.Entry<String, Integer> entry : wordCounts.entrySet()) {
            String word = entry.getKey();
            int count = entry.getValue();
            System.out.println(word + ": " + count);
        }
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // No output fields in this example
    }

    @Override
    public BaseWindowedBolt.Duration getWindowDuration() {
        return Duration.seconds(WINDOW_LENGTH);
    }
}

Apache Storm — мощный инструмент для обработки и анализа в реальном времени, а HDInsight предоставляет масштабируемую и управляемую среду для запуска приложений Storm. В этой статье мы рассмотрели различные методы подсчета слов с помощью Apache Storm в HDInsight. Мы рассмотрели простую топологию WordCount, подсчет слов без учета регистра и подсчет слов в оконном режиме. Эти методы можно настроить и расширить в соответствии с вашими конкретными требованиями. Итак, вперед и раскройте возможности Apache Storm для обработки и анализа высокоскоростных потоков данных!