Ugrás a fő tartalomra

Airflow alapok


Apache Airflow alapok


Apache Airflow egy nyílt forráskódú munkafolyamat-ütemező és -kezelő eszköz, amely lehetővé teszi összetett adatfolyamatok (DAG – Directed Acyclic Graph) létrehozását, ütemezését és felügyeletét. Fő célja az adatintegrációs, adattranszformációs és adatfeldolgozási feladatok automatizálása.

A DAG-ok lehetővé teszik a felhasználók számára, hogy megjelenítsék a feladatokat és azok függőségeit, biztosítva a feladatok pontos sorrendben történő végrehajtását, a hibák kezelését és az újrapróbálkozásokat automatikusan. Mivel a DAG-ok Python nyelven készültek, dinamikusak és skálázhatók, így az Airflow az egyszerű automatizálási feladatoktól a hatalmas adatmunkafolyamatokig sokféle alkalmazásra alkalmas.


Az Airflow főbb jellemzői a következők:

  • Pipeline ütemezés: Automatizálja a munkafolyamatok végrehajtását meghatározott ütemezések alapján.
  • Függőségkezelés: Biztosítja, hogy a feladatok a megfelelő sorrendben futjanak.
  • Valós idejű megfigyelés: Áttekintést biztosít a munkafolyamat állapotáról és teljesítményéről.
  • Skálázhatóság : Bármilyen léptékű munkafolyamatot kezel, a kicsitől a vállalati szintig.

Az Airflow zökkenőmentesen integrálható olyan felhőplatformokkal, mint az AWS, a Google Cloud és az Azure. Több mint 1500 előre beépített modullal, például kezelőkkel, horgokkal és érzékelőkkel, leegyszerűsíti a munkafolyamatok összehangolását különböző környezetekben.


Főbb jellemzők:

  • DAG-alapú modellezés: A munkafolyamatokat DAG formájában definiáljuk Pythonban.

  • Dinamikus ütemezés: Időzített vagy eseményvezérelt futtatás.

  • Paralelizmus és skálázhatóság: Több feladat párhuzamos futtatása különböző végrehajtókon (pl. Celery, Kubernetes).

  • Megfigyelhetőség és naplózás: Webes UI és loggolás támogatás.


Példa Airflow munkafolyamatra

Tegyük fel, hogy egy napi adatfeldolgozási folyamatot kell létrehozni, amely:

  1. Lekéri az adatokat egy API-ból.

  2. Betölti az adatokat egy adatbázisba.

  3. ETL (Extract, Transform, Load) transzformációt hajt végre.

  4. Elkészít egy jelentést és e-mailt küld.

Példa Airflow DAG kódrészlet:

---python

from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.email import EmailOperator from datetime import datetime import requests # API-ból adatok lekérése def fetch_data(): response = requests.get("https://api.example.com/data") with open("/tmp/data.json", "w") as f: f.write(response.text) # DAG definiálása default_args = { 'owner': 'airflow', 'start_date': datetime(2025, 3, 23), 'retries': 1 } dag = DAG('daily_etl', default_args=default_args, schedule_interval='@daily') fetch_task = PythonOperator( task_id='fetch_data', python_callable=fetch_data, dag=dag ) email_task = EmailOperator( task_id='send_report', to='user@example.com', subject='ETL Futtatás Kész', html_content="Az ETL folyamat sikeresen lefutott.", dag=dag ) fetch_task >> email_task # Függőségek beállítása

Ebben a példában:

  • Egy API-ból lekérjük az adatokat.

  • E-mailben értesítést küldünk a folyamat sikerességéről.


Airflow vs Oracle Data Integrator (ODI)

TulajdonságApache AirflowOracle Data Integrator (ODI)
FunkcióMunkafolyamat- és ETL-automatizációETL, ELT és adattranszformációs megoldás
KonfigurációPython alapú DAG-okGrafikus felületen konfigurálható
ÜtemezésBeépített ütemező (cron, eseményvezérelt)ODI Agent használata
SkálázhatóságKönnyen skálázható Kubernetes, Celery használatávalJól illeszkedik az Oracle ökoszisztémába
MegfigyelhetőségWebes UI, logolás, metrikákOracle Enterprise Manager integráció

Mikor melyiket érdemes használni?

  • Airflow: Ha nyílt forráskódú, Python-alapú megoldást keresünk összetett adatintegrációs folyamatokhoz.

  • ODI: Ha Oracle-alapú adatbázisokkal dolgozunk, és inkább egy vizuális felületen szeretnénk konfigurálni a folyamatokat.

 


Apache Airflow implementáció lépésről lépésre


Ebben az útmutatóban egy teljesen működőképes Airflow rendszert állítunk be egy egyszerű ETL folyamatra, amely:

  1. Lekér egy CSV fájlt egy nyilvános URL-ről.

  2. Feldolgozza és transzformálja az adatokat.

  3. Betölti az adatokat egy PostgreSQL adatbázisba.


1. Airflow telepítése és konfigurálása

Ha még nincs telepítve az Airflow, használhatjuk a Docker Compose-t egy gyors és egyszerű beállításhoz.

Docker alapú telepítés (Ajánlott)

  1. Hozz létre egy új mappát és lépj bele:

    ---bash

    mkdir airflow_project && cd airflow_project
  2. Töltsd le az Airflow hivatalos docker-compose.yaml fájlját:

    ---bash

    curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
  3. Állítsd be a szükséges környezeti változókat:

    ---bash

    echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
  4. Indítsd el az Airflow-t:

    ---bash

    docker-compose up -d

    Ellenőrizheted az UI-t a következő címen: http://localhost:8080
    (Alapértelmezett bejelentkezési adatok: airflow / airflow)


2. Az Airflow DAG létrehozása

Hozzunk létre egy új DAG fájlt a következő helyen:
dags/simple_etl.py

---python

from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.postgres.hooks.postgres import PostgresHook import pandas as pd import requests from datetime import datetime # Adatlekérő függvény def fetch_data(): url = "https://people.sc.fsu.edu/~jburkardt/data/csv/hw_200.csv" response = requests.get(url) with open("/tmp/raw_data.csv", "wb") as f: f.write(response.content) # Adattranszformáció def transform_data(): df = pd.read_csv("/tmp/raw_data.csv", names=["Index", "Height", "Weight"]) df["BMI"] = df["Weight"] / (df["Height"] / 100) ** 2 # BMI számítása df.to_csv("/tmp/transformed_data.csv", index=False) # Adatbetöltés PostgreSQL-be def load_data(): df = pd.read_csv("/tmp/transformed_data.csv") pg_hook = PostgresHook(postgres_conn_id="my_postgres_conn") engine = pg_hook.get_sqlalchemy_engine() df.to_sql("bmi_data", con=engine, if_exists="replace", index=False) # DAG konfiguráció default_args = { "owner": "airflow", "start_date": datetime(2025, 3, 23), "retries": 1 } dag = DAG("simple_etl", default_args=default_args, schedule_interval="@daily") fetch_task = PythonOperator( task_id="fetch_data", python_callable=fetch_data, dag=dag ) transform_task = PythonOperator( task_id="transform_data", python_callable=transform_data, dag=dag ) load_task = PythonOperator( task_id="load_data", python_callable=load_data, dag=dag ) # Feladatok sorrendje fetch_task >> transform_task >> load_task

3. PostgreSQL kapcsolat beállítása

Az Airflow UI-ban Connections menüben állítsd be a PostgreSQL kapcsolatot:

  • Conn Id: my_postgres_conn

  • Conn Type: Postgres

  • Host: localhost (vagy Docker esetén a konténer neve)

  • Schema: postgres

  • Login: airflow

  • Password: airflow

Ha nincs PostgreSQL-ed, gyorsan telepítheted Dockerrel:

---bash

docker run --name postgres -e POSTGRES_USER=airflow -e POSTGRES_PASSWORD=airflow -p 5432:5432 -d postgres

4. DAG futtatása

Miután elhelyeztük a fájlt a dags/ könyvtárban, menjünk az Airflow UI-ra (http://localhost:8080), és:

  1. Engedélyezzük a DAG-ot.

  2. Manuálisan futtassuk le az első alkalommal.

Ha minden jól megy, az Airflow automatikusan végrehajtja a három lépést.


5. Ellenőrzés

  • Az Airflow UI Graph View nézetében láthatod a DAG futását.

  • A PostgreSQL-ben ellenőrizheted, hogy bekerültek-e az adatok:

    ---sql

    SELECT * FROM bmi_data;

Összegzés

Telepítettük és beállítottuk az Airflow-t.
Létrehoztunk egy DAG-ot egy egyszerű ETL folyamattal.
Adatokat töltöttünk le, transzformáltunk és PostgreSQL-be betöltöttünk.



Apache Airflow alapvetően egy Python-alapú eszköz, és a munkafolyamatokat (DAG-okat) Pythonban kell definiálni. Azonban vannak módok arra, hogy minimális Python tudással is használd:


 Airflow UI használata (Python nélkül is kezelhető)

📌 Az Airflow Web UI lehetővé teszi a következőket:
✔ DAG-ok engedélyezése és ütemezése
✔ Futások manuális indítása
✔ Munkafolyamat állapotának figyelése
✔ Naplók és hibaüzenetek ellenőrzése

Tehát ha már vannak előre megírt DAG-ok, akkor azok üzemeltetéséhez és figyeléséhez nincs szükség Python tudásra.





2. Airflow konfigurálása JSON/YAML segítségével

Ha egy DAG-ot szeretnél létrehozni Python nélkül, akkor egy lehetőség az, hogy egy külső adatforrásból (pl. JSON vagy YAML) generálod a munkafolyamatot.

Példa egy JSON-alapú DAG definícióra:

---json

{ "dag_id": "json_etl", "schedule": "@daily", "tasks": [ { "task_id": "extract", "operator": "BashOperator", "bash_command": "echo 'Adatlekérés...'" }, { "task_id": "load", "operator": "BashOperator", "bash_command": "echo 'Adatbetöltés...'", "dependencies": ["extract"] } ] }

Ehhez egy Python script kell, ami ezt JSON-ból Airflow DAG-á alakítja, de a DAG definícióját nem kell manuálisan kódolni.


3. Apache Airflow REST API használata

Airflow rendelkezik egy API-val, amely segítségével külső eszközökből is lehet DAG-okat kezelni.

📌 Példa egy DAG futtatására API-n keresztül (Python nélkül, pl. Postmanből vagy curl-lel):

--- bash

curl -X POST "http://localhost:8080/api/v1/dags/my_dag/dagRuns" \ -H "Content-Type: application/json" \ --user "airflow:airflow" \ -d '{"conf": {}}'

Ezzel akár egy grafikus felületű alkalmazásból is lehet DAG-okat indítani.


4. Airflow BashOperator és SQL használata

Ha nem akarsz Python kódot írni, használhatod az Airflow beépített BashOperator-át és SQL-lekérdezéseket.

Példa egy teljes DAG-ra Python minimális használatával:

---python

from airflow import DAG from airflow.operators.bash import BashOperator from airflow.providers.postgres.operators.postgres import PostgresOperator from datetime import datetime dag = DAG("simple_sql_dag", start_date=datetime(2025, 3, 23), schedule="@daily") extract = BashOperator( task_id="extract_data", bash_command="curl -o /tmp/data.csv https://example.com/data.csv", dag=dag ) load = PostgresOperator( task_id="load_data", postgres_conn_id="my_postgres_conn", sql="COPY my_table FROM '/tmp/data.csv' WITH CSV HEADER;", dag=dag ) extract >> load

📌 Ebben a példában:

  • Nincs adatfeldolgozás Pythonban, minden Bash és SQL segítségével történik.

  • BashOperator hív egy külső API-t.

  • PostgresOperator betölti az adatokat egy adatbázisba.


Összegzés

Teljesen Python nélkül nem lehet használni az Airflow-t, mert a DAG-ok Pythonban vannak definiálva.
Viszont Python minimális tudással is lehet használni:

  • Az Airflow UI-val kezelheted a DAG-okat.

  • JSON/YAML segítségével definiálhatók DAG-ok (de valakinek kell egy átalakító Python scriptet írnia).

  • Az Airflow API segítségével lehet DAG-okat indítani.

  • BashOperator és SQL használatával Python kódolás nélkül is megvalósíthatók bizonyos ETL folyamatok.


Link:

https://airflow.apache.org/

https://github.com/apache/airflow

https://airbnb.io/projects/airflow/

https://www.astronomer.io/airflow/






Megjegyzések