Apache Airflow — это платформа с открытым исходным кодом, используемая для организации и планирования сложных конвейеров данных. Одной из ключевых функций Airflow является его способность подключаться к базам данных для взаимодействия с хранящимися в них данными. В этой статье блога мы рассмотрим различные методы установления соединений с базой данных в Apache Airflow 2.0, а также приведем примеры кода для каждого метода.
Метод 1: использование SQLAlchemy
SQLAlchemy — это популярный набор инструментов Python SQL и библиотека объектно-реляционного сопоставления (ORM). Airflow использует SQLAlchemy для установления соединений с различными базами данных.
from airflow import models
from sqlalchemy import create_engine
def create_db_connection():
conn_id = 'my_database_connection' # Connection ID defined in Airflow UI
engine = create_engine(models.Variable.get(conn_id))
conn = engine.connect()
return conn
Метод 2: использование классов-перехватчиков
Airflow предоставляет классы-перехватчики, которые инкапсулируют логику подключения к различным базам данных. Вот пример подключения к базе данных MySQL с использованием класса MySqlHook
.
from airflow.providers.mysql.hooks.mysql import MySqlHook
def create_db_connection():
conn_id = 'my_mysql_connection' # Connection ID defined in Airflow UI
hook = MySqlHook(mysql_conn_id=conn_id)
conn = hook.get_conn()
return conn
Метод 3: использование URL-адресов подключения
В Airflow 2.0 введена концепция URL-адресов подключения, которая позволяет вам определять детали подключения к базе данных непосредственно в файле конфигурации Airflow. Вот пример подключения к базе данных PostgreSQL с использованием URL-адреса подключения.
from airflow import settings
from airflow.models import Connection
def create_db_connection():
conn_id = 'my_postgres_connection' # Connection ID defined in Airflow UI
conn_url = "postgresql://username:password@localhost:5432/mydatabase"
conn = Connection(conn_id=conn_id, uri=conn_url)
session = settings.Session()
session.add(conn)
session.commit()
return conn
Метод 4: использование переменных
Airflow позволяет хранить сведения о соединении в виде переменных в своей базе данных метаданных. Вот пример подключения к базе данных MongoDB с использованием переменных.
from airflow import models
def create_db_connection():
conn_id = 'my_mongodb_connection' # Connection ID defined in Airflow UI
conn_uri = models.Variable.get(conn_id + "_uri")
conn = pymongo.MongoClient(conn_uri)
return conn
Установление подключений к базе данных — важный шаг в построении конвейеров данных с помощью Apache Airflow 2.0. В этой статье мы рассмотрели несколько методов подключения к базам данных, включая использование SQLAlchemy, классов перехватчиков, URL-адресов подключений и переменных. Каждый метод обеспечивает гибкость и подходит для различных случаев использования. Используя эти методы, инженеры по данным могут легко интегрировать Airflow с широким спектром баз данных, обеспечивая эффективную обработку данных и оркестрацию конвейеров.