Подключение к базе данных в Apache Airflow 2.0: подробное руководство с примерами кода

Apache Airflow — это платформа с открытым исходным кодом, используемая для организации и планирования сложных конвейеров данных. Одной из ключевых функций Airflow является его способность подключаться к базам данных для взаимодействия с хранящимися в них данными. В этой статье блога мы рассмотрим различные методы установления соединений с базой данных в Apache Airflow 2.0, а также приведем примеры кода для каждого метода.

Метод 1: использование SQLAlchemy
SQLAlchemy — это популярный набор инструментов Python SQL и библиотека объектно-реляционного сопоставления (ORM). Airflow использует SQLAlchemy для установления соединений с различными базами данных.

from airflow import models
from sqlalchemy import create_engine
def create_db_connection():
    conn_id = 'my_database_connection'  # Connection ID defined in Airflow UI
    engine = create_engine(models.Variable.get(conn_id))
    conn = engine.connect()
    return conn

Метод 2: использование классов-перехватчиков
Airflow предоставляет классы-перехватчики, которые инкапсулируют логику подключения к различным базам данных. Вот пример подключения к базе данных MySQL с использованием класса MySqlHook.

from airflow.providers.mysql.hooks.mysql import MySqlHook
def create_db_connection():
    conn_id = 'my_mysql_connection'  # Connection ID defined in Airflow UI
    hook = MySqlHook(mysql_conn_id=conn_id)
    conn = hook.get_conn()
    return conn

Метод 3: использование URL-адресов подключения
В Airflow 2.0 введена концепция URL-адресов подключения, которая позволяет вам определять детали подключения к базе данных непосредственно в файле конфигурации Airflow. Вот пример подключения к базе данных PostgreSQL с использованием URL-адреса подключения.

from airflow import settings
from airflow.models import Connection
def create_db_connection():
    conn_id = 'my_postgres_connection'  # Connection ID defined in Airflow UI
    conn_url = "postgresql://username:password@localhost:5432/mydatabase"
    conn = Connection(conn_id=conn_id, uri=conn_url)
    session = settings.Session()
    session.add(conn)
    session.commit()
    return conn

Метод 4: использование переменных
Airflow позволяет хранить сведения о соединении в виде переменных в своей базе данных метаданных. Вот пример подключения к базе данных MongoDB с использованием переменных.

from airflow import models
def create_db_connection():
    conn_id = 'my_mongodb_connection'  # Connection ID defined in Airflow UI
    conn_uri = models.Variable.get(conn_id + "_uri")
    conn = pymongo.MongoClient(conn_uri)
    return conn

Установление подключений к базе данных — важный шаг в построении конвейеров данных с помощью Apache Airflow 2.0. В этой статье мы рассмотрели несколько методов подключения к базам данных, включая использование SQLAlchemy, классов перехватчиков, URL-адресов подключений и переменных. Каждый метод обеспечивает гибкость и подходит для различных случаев использования. Используя эти методы, инженеры по данным могут легко интегрировать Airflow с широким спектром баз данных, обеспечивая эффективную обработку данных и оркестрацию конвейеров.