Создание приложений реального времени с помощью FastAPI, WebSockets и Kafka

Что такое FastAPI, WebSockets и Kafka?
Прежде чем мы начнем, давайте быстро разберемся с ключевыми компонентами, с которыми мы будем работать:

  1. FastAPI: FastAPI — это современная высокопроизводительная веб-платформа Python для создания API. Он обеспечивает отличную производительность благодаря своей асинхронной природе и использует подсказки типов для автоматического документирования API.

  2. WebSockets: WebSockets — это протокол связи, который обеспечивает полнодуплексные каналы связи через одно TCP-соединение. Он обеспечивает передачу данных в реальном времени между клиентом и сервером, что позволяет создавать интерактивные и динамические приложения.

  3. Kafka: Kafka — это распределенная платформа потоковой передачи событий, предназначенная для высокопроизводительной, отказоустойчивой и масштабируемой потоковой передачи данных. Он обеспечивает модель публикации-подписки, при которой производители публикуют сообщения в темах, а потребители подписываются на эти темы, чтобы получать сообщения.

Настройка среды:
Чтобы начать, убедитесь, что в вашей системе установлен Python. Создать виртуальную среду и установить необходимые пакеты можно с помощью следующих команд:

$ python -m venv myenv
$ source myenv/bin/activate
$ pip install fastapi kafka-python websockets

Создание приложения FastAPI.
Давайте создадим приложение FastAPI, которое интегрирует WebSockets и Kafka. Начнем с простого примера, демонстрирующего двустороннюю связь между сервером и клиентом с помощью WebSockets.

from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        data = await websocket.receive_text()
        await websocket.send_text(f"Server received: {data}")

В этом фрагменте кода мы определяем конечную точку WebSocket /ws, которая принимает входящие соединения. Сервер принимает соединение, получает текстовые данные от клиента и отправляет ответ обратно клиенту.

Интеграция Kafka:
Чтобы интегрировать Kafka с нашим приложением FastAPI, мы можем использовать библиотеку kafka-python. Давайте изменим наш предыдущий пример, чтобы публиковать входящие сообщения WebSocket в теме Kafka и использовать сообщения из другой конечной точки.

from fastapi import FastAPI, WebSocket
from kafka import KafkaProducer, KafkaConsumer
app = FastAPI()
producer = KafkaProducer(bootstrap_servers="localhost:9092")
consumer = KafkaConsumer("my_topic", bootstrap_servers="localhost:9092")
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        data = await websocket.receive_text()
        producer.send("my_topic", data.encode("utf-8"))
@app.get("/messages")
async def read_messages():
    messages = []
    for message in consumer:
        messages.append(message.value.decode("utf-8"))
    return {"messages": messages}

В этом обновленном коде мы создаем производителя и потребителя Kafka. Производитель публикует сообщения WebSocket в теме Kafka «my_topic», а конечная точка потребителя /messagesизвлекает сообщения из темы.

В этой статье мы рассмотрели, как создавать приложения реального времени с использованием FastAPI, WebSockets и Kafka. Мы узнали, как создать приложение FastAPI с поддержкой WebSocket и интегрировать Kafka для потоковой передачи событий. Объединив эти технологии, вы сможете разрабатывать мощные и масштабируемые приложения реального времени.

Не забудьте убедиться, что Kafka настроен и запущен локально, чтобы примеры кода работали правильно. Не стесняйтесь экспериментировать дальше и изучать дополнительные возможности и возможности, предлагаемые FastAPI, WebSockets и Kafka.