Изучение различных способов выполнения запуска DAG с конфигурацией на Python

Метод 1: интерфейс командной строки (CLI)
Самый простой способ запустить запуск DAG с настройкой — через интерфейс командной строки Airflow. Вы можете использовать команду airflow dags triggerдля передачи файла конфигурации JSON или пар ключ-значение в качестве аргументов. Вот пример:

airflow dags trigger my_dag --conf '{"param1": "value1", "param2": "value2"}'

Метод 2: REST API
Если вы предпочитаете программный подход, вы можете использовать REST API Airflow для запуска запуска DAG с конфигурацией. Вы можете отправить POST-запрос к конечной точке /api/v1/dags/<DAG_ID>/dagRunsи включить конфигурацию в формате JSON в тело запроса. Вот фрагмент кода Python с использованием библиотеки requests:

import requests
url = 'http://localhost:8080/api/v1/dags/my_dag/dagRuns'
config = {"param1": "value1", "param2": "value2"}
response = requests.post(url, json=config)

Метод 3. Конфигурация динамического запуска DAG
Airflow позволяет определять динамические запуски DAG с настройкой с помощью TriggerDagRunOperator. Вы можете передать конфигурацию в виде словаря методу trigger_dagrun. Вот пример:

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime
dag = DAG('my_dag', ...)
trigger = TriggerDagRunOperator(
    task_id='trigger_dag_run',
    trigger_dag_id='my_dag',
    execution_date=datetime.now(),
    conf={"param1": "value1", "param2": "value2"},
    dag=dag
)

Метод 4: файлы конфигурации
Другой подход — определить конфигурацию в отдельном файле и передать ее при запуске DAG. Вы можете использовать YAML, JSON или любой другой формат, поддерживаемый Airflow. Вот пример использования файла YAML:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from airflow.configuration import conf
import yaml
with open(conf.get('core', 'dag_config_file')) as file:
    config = yaml.safe_load(file)
dag = DAG('my_dag', ...)
task = DummyOperator(
    task_id='my_task',
    dag=dag,
    params=config
)

В этой статье мы рассмотрели различные методы выполнения запуска DAG с настройкой в ​​Apache Airflow. Мы рассмотрели использование интерфейса командной строки Airflow, REST API, динамической конфигурации запуска DAG и файлов конфигурации. Каждый метод имеет свои преимущества, поэтому выберите тот, который лучше всего соответствует вашим требованиям. Используя настройку, вы можете сделать группы обеспечения доступности баз данных более гибкими и адаптируемыми к различным сценариям. Приятного воздушного потока!