Ugrás a fő tartalomra

Adattárház alapok, elvek

Adattárház alapok


Az adatház építésénél három fő szintet különböztetünk meg:

  1. Stage (előfeldolgozási réteg)

  2. Integrációs szint (központi egységes adattár réteg)

  3. Adatpiac (Data Mart, riportolási réteg)

A példában ügyfél, szerződés, szolgáltatás és kapcsolat tábláid vannak, az alábbiakban bemutatom, hogyan töltsd be az adatokat ezekbe a szintekbe, figyelembe véve az SCD elveket és a változáskövetési mechanizmusokat.


1. Stage réteg (Nyers adatok fogadása)

  • Cél: Az operatív forrásrendszerekből érkező adatok fogadása minimális transzformációval.

  • Javasolt mezők:

    • Forrásrendszer azonosító (pl. source_system)

    • Rekord beolvasási dátuma (load_date)

    • Technikai kulcs (hash vagy surrogate key)

    • Minden bejövő adatot teljes formában tárolsz, nincs még történeti követés

Példa Stage táblára (Ügyfél)

--- sql

CREATE TABLE stage_ugyfel ( source_system VARCHAR(50), source_id VARCHAR(50), nev VARCHAR(255), email VARCHAR(255), telefonszam VARCHAR(50), load_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP );

🔹 Itt az source_id a forrásrendszerben lévő egyedi azonosító.


2. Integrációs szint (egységes központi adattár)

  • Cél: Az adatok normalizálása, redundanciák csökkentése és változások követése.

  • Technikai mezők:

    • Surrogate Key (SK) – egyedi mesterséges kulcs az azonosításhoz

    • Natural Key (NK) – üzleti azonosító (pl. ügyfél azonosító)

    • Validitási időtartomány (valid_from, valid_to)

    • Műveleti jelző (operation_flag) – új (I), módosított (U), törölt (D)

SCD elvek figyelembevétele

  • SCD Type 1 (Felülírás) – ha nincs szükség történeti adatokra, egyszerű UPDATE

  • SCD Type 2 (Történeti változás követése) – új sor beszúrása, időbélyegekkel (valid_from, valid_to, is_active flag)

  • SCD Type 3 (Előző érték megőrzése egy mezőben) – külön oszlop a régi értéknek

Példa táblára (Ügyfél – SCD Type 2 szerint)

---sql

CREATE TABLE ods_ugyfel ( ugyfel_sk INT PRIMARY KEY AUTO_INCREMENT, ugyfel_nk VARCHAR(50), nev VARCHAR(255), email VARCHAR(255), telefonszam VARCHAR(50), valid_from TIMESTAMP DEFAULT CURRENT_TIMESTAMP, valid_to TIMESTAMP DEFAULT NULL, is_active BOOLEAN DEFAULT TRUE );

🔹 Ha egy ügyfél adata módosul, egy új sor jön létre valid_from és valid_to időbélyegekkel.


3. Adatpiac szint (Data Mart)

  • Cél: Riportok és analitikák támogatása, denormalizált, könnyen lekérdezhető formában.

  • Javasolt mezők:

    • Idődimenzió (date_key)

    • Szegmensek (customer_segment, contract_status)

    • Aggregációs mezők (pl. összesített szerződések száma)

Példa adatpiaci tábla (Ügyfél – szolgáltatás riportoláshoz)

---sql

CREATE TABLE data_mart_ugyfel ( ugyfel_sk INT, nev VARCHAR(255), email VARCHAR(255), szerzodes_szama INT, aktiv_szolgaltatasok_szama INT, datum DATE, PRIMARY KEY (ugyfel_sk, datum) );

🔹 Itt már általában denormalizált formában tároljuk az adatokat, az aggregált értékekkel.


Összegzés és ajánlások

  • Stage: Forrásrendszerből érkező adatokat tárolod, minimális transzformációval.

  • Integració: Az adatok normalizálása, változáskövetéssel (SCD Type 2 ajánlott).

  • Data Mart: Riportokhoz optimalizált, denormalizált adatmodell.



Adatmodell és táblák

Tételezzük fel, hogy az alábbi forrásadatokat (source data) kapjuk:

Customer_IDNameEmailPhoneUpdated_At
1001Kovács Péterpeter@email.com+36301112222024-01-01 12:00:00
1002Nagy Annaanna@email.com+36302223332024-01-05 14:30:00

1. Stage szint – Nyers adatok fogadása

A Stage réteg az operatív forrásrendszerekből érkező adatok elsődleges gyűjtési pontja. Ide minden változás beérkezik.

Stage ügyfél tábla

---sql

CREATE TABLE stage_customer ( source_system VARCHAR(50), -- Forrásrendszer azonosítója source_id INT, -- Eredeti ügyfélazonosító a forrásrendszerben name VARCHAR(255), email VARCHAR(255), phone VARCHAR(50), updated_at TIMESTAMP, -- Frissítés időpontja a forrásrendszerben load_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP -- Betöltési időpont );

🔹 Fontos elemek:

  • source_system: Segít azonosítani, honnan származik az adat.

  • updated_at: Megmutatja, mikor módosult utoljára az ügyfél.

  • load_date: Mikor töltöttük be az adatházba.


2. Integrációs szint – SCD Type 2 alkalmazása

Az integrációs rétegben követjük az adatok történetiségét. Ha egy ügyfél adatai megváltoznak, új rekordot szúrunk be, és a régit lezárjuk (valid_to időbélyeggel).

ügyfél tábla (SCD Type 2)

---sql

CREATE TABLE ods_customer ( customer_sk INT PRIMARY KEY AUTO_INCREMENT, -- Mesterséges kulcs customer_id INT, -- Üzleti kulcs (Natural Key) name VARCHAR(255), email VARCHAR(255), phone VARCHAR(50), valid_from TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- Érvényesség kezdete valid_to TIMESTAMP DEFAULT NULL, -- NULL, ha az aktuális rekord érvényes is_active BOOLEAN DEFAULT TRUE -- Csak az aktív rekordok vannak TRUE állapotban );

🔹 Fontos szabályok:

  • Ha nincs változás, akkor nem módosítunk semmit.

  • Ha van változás, az előző verziót lezárjuk (valid_to beállítva), és egy új sort hozunk létre.

Példa egy módosítás esetére

Az ügyfél email címe megváltozik:

Customer_SKCustomer_IDNameEmailPhoneValid_FromValid_ToIs_Active
11001Kovács Péterpeter@email.com+36301112222024-01-01 12:00:002024-03-10 10:00:00FALSE
21001Kovács Péterpeter.new@email.com+36301112222024-03-10 10:00:00NULLTRUE

✅ Az új rekord az aktuális verzió, míg a régi archiválódott.


3. Adatpiac szint (Data Mart) – Riportolás

Az adatpiac réteg aggregált, riportolásra optimalizált adatokat tartalmaz. Itt nem kell minden részletet tárolni, hanem összesített információkat készítünk.

Ügyfél-szolgáltatás riport tábla

---sql

CREATE TABLE data_mart_customer_service ( customer_sk INT, -- Külföldi kulcs az ODS ügyfél táblára name VARCHAR(255), active_contracts INT, -- Ügyfél aktív szerződéseinek száma active_services INT, -- Ügyfél aktív szolgáltatásainak száma report_date DATE, -- Riport dátuma PRIMARY KEY (customer_sk, report_date) );

🔹 Ebben a táblában aggregált adatokat tárolunk.
Például egy adott időpontban hány aktív szerződése és szolgáltatása van az ügyfélnek.

Példa riport adatokra

Customer_SKNameActive_ContractsActive_ServicesReport_Date
2Kovács Péter352024-03-10
3Nagy Anna122024-03-10


Összegzés – Adatáramlás folyamata

  1. Stage réteg: A forrásrendszerből beérkező adatok minimális transzformációval tárolva.

  2. Itegrációs réteg: Az ügyfél adatai SCD Type 2 szerint követve, történeti változáskezeléssel.

  3. Data Mart réteg: Aggregált, riportolásra kész adatok.



📌 Miért hasznos ez a megoldás?

  • Történeti adatok visszakereshetősége: Tudjuk, hogy mikor milyen állapotban volt egy ügyfél.

  • Hatékony riportkészítés: Az adatpiaci réteg segít gyorsan lekérdezni a fontos üzleti mutatókat.

  • SCD Type 2 biztosítja a változások precíz követését.

 


ETL folyamat


Az ETL (Extract, Transform, Load) folyamat három fő lépésből áll:

  1. Extract (Kinyerés) – Az adatok begyűjtése a forrásrendszerekből.

  2. Transform (Transzformáció) – Az adatok átalakítása, tisztítása, történeti követés alkalmazása.

  3. Load (Betöltés) – Az átalakított adatok betöltése az adatpiacba és a riportolási rétegbe.


1. Extract – Adatok kinyerése a forrásrendszerből

A forrásadatokat lehet fájlokból (CSV, JSON), relációs adatbázisokból (MySQL, PostgreSQL, Oracle) vagy API-kból kinyerni.

Példa SQL-lekérdezés egy forrásrendszerből:

--sql

SELECT customer_id, name, email, phone, updated_at FROM source_customer WHERE updated_at > (SELECT MAX(load_date) FROM stage_customer);

🔹 Ez biztosítja, hogy csak az új vagy módosult rekordokat hozzuk el. (delta töltés)

Extract eszközök:

  • SQL alapú ETL: Talend, Apache Nifi, Airflow, ODI, Pentaho

  • Adatbázis replikáció: Debezium, Oracle GoldenGate

  • API integráció: Python (requests, pandas), Apache Kafka


2. Transform – Adatok előkészítése és transzformáció

Ebben a lépésben az adatokat megtisztítjuk, kiegészítjük és a megfelelő struktúrába helyezzük.

2.1 Adattisztítás

  • Hiányzó adatok kezelése:

    ---sql

    UPDATE stage_customer SET phone = 'N/A' WHERE phone IS NULL;
  • Adatformátumok egységesítése:

    ---sql

    UPDATE stage_customer SET email = LOWER(email);

2.2 SCD Type 2 változáskezelés

Az SCD Type 2 elv szerint az ODS-be történő betöltéskor:

  • Ha nincs változás, nem csinálunk semmit.

  • Ha van változás, az előző verziót lezárjuk (valid_to frissítés), és egy új rekordot beszúrunk.

Ellenőrizzük, hogy változott-e az ügyféladat:

---sql

SELECT * FROM ods_customer ods JOIN stage_customer stg ON ods.customer_id = stg.source_id WHERE (ods.name <> stg.name OR ods.email <> stg.email OR ods.phone <> stg.phone) AND ods.valid_to IS NULL;

Ha van változás, frissítjük a régi rekordot és beszúrjuk az újat:

---sql

-- Előző rekord lezárása UPDATE ods_customer SET valid_to = NOW(), is_active = FALSE WHERE customer_id IN (SELECT source_id FROM stage_customer); -- Új rekord beszúrása INSERT INTO ods_customer (customer_id, name, email, phone, valid_from, valid_to, is_active) SELECT source_id, name, email, phone, NOW(), NULL, TRUE FROM stage_customer;

3. Load – Adatok betöltése az adatpiacba (Data Mart)

A Data Mart rétegben már összesített, riportolásra kész adatok lesznek.

Példa aggregált riport betöltésére:

---sql

INSERT INTO data_mart_customer_service (customer_sk, name, active_contracts, active_services, report_date) SELECT ods.customer_sk, ods.name, (SELECT COUNT(*) FROM ods_contract WHERE ods_contract.customer_id = ods.customer_id AND ods_contract.is_active = TRUE) AS active_contracts, (SELECT COUNT(*) FROM ods_service WHERE ods_service.customer_id = ods.customer_id AND ods_service.is_active = TRUE) AS active_services, CURRENT_DATE FROM ods_customer ods WHERE ods.is_active = TRUE;

🔹 Ezzel minden nap frissül a riport az aktuális állapotokkal.


Automatizált ETL Folyamat Airflow-ban

Ha szeretnéd, az ETL folyamatot automatizálhatod Apache Airflow segítségével.
Egy egyszerű Python DAG (Directed Acyclic Graph) így néz ki:

---python

from airflow import DAG from airflow.operators.postgres_operator import PostgresOperator from datetime import datetime default_args = { 'owner': 'admin', 'start_date': datetime(2024, 3, 10), 'retries': 1, } dag = DAG('etl_customer_pipeline', default_args=default_args, schedule_interval='@daily') extract_task = PostgresOperator( task_id='extract_data', sql="INSERT INTO stage_customer SELECT * FROM source_customer WHERE updated_at > (SELECT MAX(load_date) FROM stage_customer);", postgres_conn_id='postgres_default', dag=dag ) transform_task = PostgresOperator( task_id='transform_data', sql="CALL scd_type2_update();", # Tárolt eljárás az SCD Type 2 kezelésére postgres_conn_id='postgres_default', dag=dag ) load_task = PostgresOperator( task_id='load_data_mart', sql="INSERT INTO data_mart_customer_service SELECT * FROM ods_customer;", postgres_conn_id='postgres_default', dag=dag ) extract_task >> transform_task >> load_task

Összegzés

  • Extract: Az új vagy módosult adatok lekérése.

  • Transform: Az adatok tisztítása, normálása és SCD Type 2 kezelése.

  • Load: Az adatok betöltése az adatpiacba riportolásra.

Automatizálás: Apache Airflow vagy SQL tárolt eljárások segítségével.


ODS 

ODS (Operational Data Store) – Az Adattárház Integrációs Szintje

Az ODS (Operational Data Store) egy olyan köztes réteg az adattárház architektúrában, amely integrálja a különböző forrásrendszerek adatait, biztosítva azok történetiségét és változáskövetését.

Az ODS célja, hogy rugalmas, részletes és friss adatokkal szolgáljon a riporting és analitikai rendszerek számára, miközben minimalizálja az operatív rendszerek terhelését.


1. ODS Fő Jellemzői

Köztes tároló a forrásadatok és az adatpiac (Data Mart) között
Részletes, integrált adatok tárolása
Történeti adatok kezelése (SCD Type 2, Type 3, Type 4)
Gyors adathozzáférés biztosítása a riporting és BI rendszerek számára
Időbélyegzett adatok az állapotváltozások követésére


2. ODS Felépítése és Táblaszerkezet

Az ODS réteg táblái az operatív rendszerekből érkező adatok történeti és aktuális állapotait tárolják.

2.1 Ügyfél (Customer) Tábla – SCD Type 2 Példa

---sql

CREATE TABLE ods_customer ( customer_sk INT PRIMARY KEY AUTO_INCREMENT, -- Mesterséges kulcs customer_id INT NOT NULL, -- Eredeti ügyfélazonosító a forrásrendszerből name VARCHAR(255), email VARCHAR(255), phone VARCHAR(50), valid_from TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- Érvényesség kezdete valid_to TIMESTAMP DEFAULT NULL, -- NULL, ha az aktuális rekord érvényes is_active BOOLEAN DEFAULT TRUE -- Csak az aktív rekordok vannak TRUE állapotban );

🔹 Példa adatok:

Customer_SKCustomer_IDNameEmailPhoneValid_FromValid_ToIs_Active
11001Kovács Péterpeter@email.com+36301112222024-01-01 12:00:002024-03-10 10:00:00FALSE
21001Kovács Péterpeter.new@email.com+36301112222024-03-10 10:00:00NULLTRUE

Miért fontos ez?

  • Bármikor visszanézhetjük, hogy egy ügyfél milyen adatokkal rendelkezett egy adott időpontban.

  • Az ODS támogatja a riportokat, ahol fontos lehet egy adott időpillanat állapotának visszakeresése.


2.2 Szerződés (Contract) Tábla – SCD Type 2 Példa

--- sql

CREATE TABLE ods_contract ( contract_sk INT PRIMARY KEY AUTO_INCREMENT, contract_id INT NOT NULL, customer_sk INT, -- Kapcsolat az ügyfél táblával service_id INT, contract_start DATE, contract_end DATE, status VARCHAR(50), -- pl. 'Aktív', 'Lezárt' valid_from TIMESTAMP DEFAULT CURRENT_TIMESTAMP, valid_to TIMESTAMP DEFAULT NULL, is_active BOOLEAN DEFAULT TRUE, FOREIGN KEY (customer_sk) REFERENCES ods_customer(customer_sk) );

🔹 Ha egy szerződés módosul, az előző verzió archiválódik, és egy új rekord keletkezik.

🔹 Példa adatok:

Contract_SKContract_IDCustomer_SKService_IDContract_StartContract_EndStatusValid_FromValid_ToIs_Active
1500121012024-01-102025-01-10Aktív2024-01-10 10:00:00NULLTRUE
2500221022024-02-012025-02-01Lezárt2024-02-01 10:00:002024-03-15 12:00:00FALSE

2.3 Szolgáltatás (Service) Tábla – SCD Type 1 Példa

Ha egy szolgáltatás adatai nem igényelnek történetiséget, akkor SCD Type 1 megközelítést alkalmazunk, azaz egyszerűen frissítjük az adatokat.

---sql

CREATE TABLE ods_service ( service_id INT PRIMARY KEY, service_name VARCHAR(255), description TEXT, price DECIMAL(10,2), updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP );

🔹 Ebben az esetben nincs "valid_from" vagy "valid_to" mező, mindig a legfrissebb adatot tároljuk.


3. ODS Betöltési Folyamat (ETL)

Az ODS rétegbe való betöltés az SCD szabályok alapján történik.

3.1 Stage-ből ODS-be való betöltés – SCD Type 2

--sql

-- Előző ügyfélrekordok lezárása, ha változás történt UPDATE ods_customer SET valid_to = NOW(), is_active = FALSE WHERE customer_id IN (SELECT source_id FROM stage_customer) AND (name, email, phone) NOT IN (SELECT name, email, phone FROM stage_customer); -- Új verzió beszúrása INSERT INTO ods_customer (customer_id, name, email, phone, valid_from, is_active) SELECT source_id, name, email, phone, NOW(), TRUE FROM stage_customer WHERE (source_id, name, email, phone) NOT IN (SELECT customer_id, name, email, phone FROM ods_customer);

🔹 Mi történik itt?

  • Ha az adatok nem változtak, semmi sem történik.

  • Ha van változás, a régi rekordot lezárjuk (valid_to = NOW()), és egy új verziót hozunk létre.

3.2 Riportokhoz használt gyors lekérdezések

Mivel az ODS nagy mennyiségű adatot tartalmaz, érdemes indexeket és particionálást alkalmazni a gyors lekérdezésekhez.

Aktív ügyfelek listája:

---sql

SELECT customer_id, name, email, phone FROM ods_customer WHERE is_active = TRUE;

Egy adott dátum szerinti állapot visszakeresése:

---sql

SELECT * FROM ods_customer WHERE '2024-02-01' BETWEEN valid_from AND COALESCE(valid_to, '9999-12-31');

4. ODS és Adatpiac (Data Mart) Kapcsolata

  • Az ODS részletes, történeti adatokat tárol, de nem ideális riportkészítésre.

  • Az adatpiaci réteg (Data Mart) összegző lekérdezéseket, aggregált adatokat tartalmaz, amelyeket a BI rendszerek könnyebben tudnak használni.

🔹 Példa riport készítésére (Data Mart betöltés)

---sql

INSERT INTO data_mart_customer_service (customer_sk, name, active_contracts, active_services, report_date) SELECT ods.customer_sk, ods.name, COUNT(DISTINCT ods_contract.contract_id) AS active_contracts, COUNT(DISTINCT ods_service.service_id) AS active_services, CURRENT_DATE FROM ods_customer ods LEFT JOIN ods_contract ON ods.customer_sk = ods_contract.customer_sk AND ods_contract.is_active = TRUE LEFT JOIN ods_service ON ods_contract.service_id = ods_service.service_id WHERE ods.is_active = TRUE GROUP BY ods.customer_sk, ods.name;

Összegzés

📌 Az ODS egy köztes réteg, amely biztosítja az adatok integrálását és változáskövetését.
📌 Az SCD Type 2 módszert alkalmazzuk a változások nyomon követésére.
📌 Az ODS adatai az adatpiaci rétegbe kerülnek aggregálás után.




Megjegyzések