Ugrás a fő tartalomra

Adattárház töltés alapok : Stage töltés

 Alap funkcionalitás


Ez egy tipikus adattárház fejlesztési feladat, amely során a forrásadatok stage táblába történő betöltése automatizálásra kerül. Az alábbi két megoldást mutatom be:

  1. PL/SQL megoldás: A PL/SQL kód lekérdezi a forrástábla szerkezetét a USER_TAB_COLUMNS nézetből, majd generál egy CREATE TABLE parancsot és az INSERT utasítást.

  2. Python megoldás: Az Oracle adatbázishoz csatlakozva (pl. cx_Oracle könyvtár segítségével) ugyanezt a folyamatot végrehajtja.


PL/SQL Megoldás

Ez a PL/SQL szkript automatikusan létrehozza a stage táblát, majd az adattöltő SQL utasítást is elkészíti.

---plsql

DECLARE v_table_name VARCHAR2(100) := 'FORRAS_TABLA'; -- Forrás tábla neve v_stage_table VARCHAR2(100) := 'STAGE_' || v_table_name; v_sql_create CLOB; v_sql_insert CLOB; v_columns CLOB; v_cursor SYS_REFCURSOR; v_column_name VARCHAR2(100); v_data_type VARCHAR2(100); v_column_list CLOB; BEGIN -- 1. Stage tábla törlése, ha létezik EXECUTE IMMEDIATE 'DROP TABLE ' || v_stage_table || ' CASCADE CONSTRAINTS PURGE'; -- 2. Forrás tábla oszlopainak lekérdezése v_columns := ''; v_column_list := ''; OPEN v_cursor FOR SELECT COLUMN_NAME, DATA_TYPE FROM USER_TAB_COLUMNS WHERE TABLE_NAME = UPPER(v_table_name); LOOP FETCH v_cursor INTO v_column_name, v_data_type; EXIT WHEN v_cursor%NOTFOUND; -- Oszlopok összeállítása v_columns := v_columns || v_column_name || ' ' || v_data_type || ', '; v_column_list := v_column_list || v_column_name || ', '; END LOOP; CLOSE v_cursor; -- Technikai mezők hozzáadása v_columns := v_columns || 'TECH_LOAD_DATE DATE DEFAULT SYSDATE, TECH_SOURCE_TABLE VARCHAR2(100)'; v_column_list := v_column_list || 'SYSDATE, ''' || v_table_name || ''''; -- Stage tábla létrehozásának SQL utasítása v_sql_create := 'CREATE TABLE ' || v_stage_table || ' (' || v_columns || ')'; EXECUTE IMMEDIATE v_sql_create; -- 3. Adatok átmásolása a stage táblába v_sql_insert := 'INSERT INTO ' || v_stage_table || ' SELECT ' || v_column_list || ' FROM ' || v_table_name; EXECUTE IMMEDIATE v_sql_insert; DBMS_OUTPUT.PUT_LINE('Stage tábla létrehozva és feltöltve: ' || v_stage_table); EXCEPTION WHEN OTHERS THEN DBMS_OUTPUT.PUT_LINE('Hiba történt: ' || SQLERRM); END; /

Python Megoldás (cx_Oracle)

Ez a Python szkript az Oracle adatbázisból lekéri a forrás tábla szerkezetét, majd dinamikusan létrehozza a stage táblát, és elvégzi a teljes betöltést.

---python

import cx_Oracle # Adatbázis kapcsolat conn = cx_Oracle.connect("user/password@hostname:port/service_name") cursor = conn.cursor() source_table = "FORRAS_TABLA" stage_table = f"STAGE_{source_table}" # 1. Stage tábla törlése, ha létezik try: cursor.execute(f"DROP TABLE {stage_table} CASCADE CONSTRAINTS PURGE") except cx_Oracle.DatabaseError: pass # Ha nincs tábla, nem baj # 2. Forrás tábla szerkezetének lekérdezése cursor.execute(f"SELECT COLUMN_NAME, DATA_TYPE FROM USER_TAB_COLUMNS WHERE TABLE_NAME = UPPER('{source_table}')") columns = cursor.fetchall() # Oszlopok összeállítása column_definitions = [] column_names = [] for col_name, col_type in columns: column_definitions.append(f"{col_name} {col_type}") column_names.append(col_name) # Technikai mezők hozzáadása column_definitions.append("TECH_LOAD_DATE DATE DEFAULT SYSDATE") column_definitions.append("TECH_SOURCE_TABLE VARCHAR2(100)") column_names.append("SYSDATE") column_names.append(f"'{source_table}'") # 3. Stage tábla létrehozása create_sql = f"CREATE TABLE {stage_table} ({', '.join(column_definitions)})" cursor.execute(create_sql) # 4. Adatok áttöltése insert_sql = f"INSERT INTO {stage_table} SELECT {', '.join(column_names)} FROM {source_table}" cursor.execute(insert_sql) conn.commit() cursor.close() conn.close() print(f"Stage tábla '{stage_table}' létrehozva és feltöltve.")

Mit csinál a kód?

  1. Lekérdezi a forrás tábla oszlopait az USER_TAB_COLUMNS nézetből.

  2. Összeállítja a stage tábla szerkezetét úgy, hogy az eredeti oszlopokat kiegészíti:

    • TECH_LOAD_DATE DATE DEFAULT SYSDATE (betöltési dátum)

    • TECH_SOURCE_TABLE VARCHAR2(100) (forrás tábla neve)

  3. Létrehozza a stage táblát az új szerkezettel.

  4. Végrehajt egy teljes (FULL) betöltést, amely átmásolja az összes adatot a technikai mezőkkel együtt.

Mindkét megoldás automatikusan kezeli a forrás tábla változó szerkezetét, így jól alkalmazható egy általánosított ETL folyamat részeként.



Folyamat bővítése

Az alábbiakkal frissítettem a PL/SQL és Python megoldásokat :

Inkrementális töltés: A betöltési dátum (LAST_UPDATE_DATE vagy MODIFIED_DATE) alapján történő szűrés.
Hibakezelés: Hibaüzenetek naplózása egy LOAD_LOG táblába.
Naplózás: Betöltési események naplózása, beleértve a rekordok számát és a futási időt.


1️⃣ PL/SQL Megoldás

--- plsql

DECLARE v_table_name VARCHAR2(100) := 'FORRAS_TABLA'; v_stage_table VARCHAR2(100) := 'STAGE_' || v_table_name; v_last_load_date DATE; v_sql_create CLOB; v_sql_insert CLOB; v_columns CLOB; v_column_list CLOB; v_cursor SYS_REFCURSOR; v_column_name VARCHAR2(100); v_data_type VARCHAR2(100); v_start_time TIMESTAMP; v_end_time TIMESTAMP; v_row_count NUMBER := 0; BEGIN v_start_time := SYSTIMESTAMP; -- 1. Napló tábla létrehozása, ha nem létezik EXECUTE IMMEDIATE 'CREATE TABLE IF NOT EXISTS LOAD_LOG ( LOG_ID NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY, TABLE_NAME VARCHAR2(100), LOAD_TYPE VARCHAR2(20), START_TIME TIMESTAMP, END_TIME TIMESTAMP, ROW_COUNT NUMBER, STATUS VARCHAR2(20), ERROR_MESSAGE CLOB )'; -- 2. Utolsó betöltés dátumának lekérdezése BEGIN SELECT MAX(START_TIME) INTO v_last_load_date FROM LOAD_LOG WHERE TABLE_NAME = v_stage_table AND STATUS = 'SUCCESS'; EXCEPTION WHEN NO_DATA_FOUND THEN v_last_load_date := NULL; END; -- 3. Stage tábla létrehozása, ha nem létezik EXECUTE IMMEDIATE 'CREATE TABLE IF NOT EXISTS ' || v_stage_table || ' AS SELECT * FROM ' || v_table_name || ' WHERE 1=0'; EXECUTE IMMEDIATE 'ALTER TABLE ' || v_stage_table || ' ADD (TECH_LOAD_DATE DATE DEFAULT SYSDATE, TECH_SOURCE_TABLE VARCHAR2(100))'; -- 4. Inkrementális adatlekérdezés előkészítése v_sql_insert := 'INSERT INTO ' || v_stage_table || ' SELECT *, SYSDATE, ''' || v_table_name || ''' FROM ' || v_table_name; IF v_last_load_date IS NOT NULL THEN v_sql_insert := v_sql_insert || ' WHERE LAST_UPDATE_DATE > TO_DATE(''' || TO_CHAR(v_last_load_date, 'YYYY-MM-DD HH24:MI:SS') || ''', ''YYYY-MM-DD HH24:MI:SS'')'; END IF; -- 5. Adatok betöltése EXECUTE IMMEDIATE v_sql_insert; v_row_count := SQL%ROWCOUNT; v_end_time := SYSTIMESTAMP; -- 6. Naplózás INSERT INTO LOAD_LOG (TABLE_NAME, LOAD_TYPE, START_TIME, END_TIME, ROW_COUNT, STATUS) VALUES (v_stage_table, 'INCREMENTAL', v_start_time, v_end_time, v_row_count, 'SUCCESS'); EXCEPTION WHEN OTHERS THEN v_end_time := SYSTIMESTAMP; INSERT INTO LOAD_LOG (TABLE_NAME, LOAD_TYPE, START_TIME, END_TIME, ROW_COUNT, STATUS, ERROR_MESSAGE) VALUES (v_stage_table, 'INCREMENTAL', v_start_time, v_end_time, v_row_count, 'FAILED', SQLERRM); END; /

2️⃣ Python Megoldás (cx_Oracle)

--- python

import cx_Oracle from datetime import datetime # Adatbázis kapcsolat conn = cx_Oracle.connect("user/password@hostname:port/service_name") cursor = conn.cursor() source_table = "FORRAS_TABLA" stage_table = f"STAGE_{source_table}" # 1. Napló tábla létrehozása cursor.execute(""" BEGIN EXECUTE IMMEDIATE 'CREATE TABLE IF NOT EXISTS LOAD_LOG ( LOG_ID NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY, TABLE_NAME VARCHAR2(100), LOAD_TYPE VARCHAR2(20), START_TIME TIMESTAMP, END_TIME TIMESTAMP, ROW_COUNT NUMBER, STATUS VARCHAR2(20), ERROR_MESSAGE CLOB )'; END; """) # 2. Utolsó sikeres betöltés dátumának lekérdezése cursor.execute(f"SELECT MAX(START_TIME) FROM LOAD_LOG WHERE TABLE_NAME = '{stage_table}' AND STATUS = 'SUCCESS'") last_load_date = cursor.fetchone()[0] # 3. Stage tábla létrehozása, ha nem létezik cursor.execute(f""" BEGIN EXECUTE IMMEDIATE 'CREATE TABLE IF NOT EXISTS {stage_table} AS SELECT * FROM {source_table} WHERE 1=0'; EXECUTE IMMEDIATE 'ALTER TABLE {stage_table} ADD (TECH_LOAD_DATE DATE DEFAULT SYSDATE, TECH_SOURCE_TABLE VARCHAR2(100))'; END; """) # 4. Adatok betöltése (FULL/INKREMENTÁLIS) load_start_time = datetime.now() if last_load_date: insert_sql = f""" INSERT INTO {stage_table} SELECT *, SYSDATE, '{source_table}' FROM {source_table} WHERE LAST_UPDATE_DATE > TO_DATE('{last_load_date.strftime('%Y-%m-%d %H:%M:%S')}', 'YYYY-MM-DD HH24:MI:SS') """ load_type = 'INCREMENTAL' else: insert_sql = f""" INSERT INTO {stage_table} SELECT *, SYSDATE, '{source_table}' FROM {source_table} """ load_type = 'FULL' try: cursor.execute(insert_sql) row_count = cursor.rowcount conn.commit() load_end_time = datetime.now() # 5. Naplózás cursor.execute(""" INSERT INTO LOAD_LOG (TABLE_NAME, LOAD_TYPE, START_TIME, END_TIME, ROW_COUNT, STATUS) VALUES (:1, :2, :3, :4, :5, 'SUCCESS') """, (stage_table, load_type, load_start_time, load_end_time, row_count)) conn.commit() print(f"Stage tábla '{stage_table}' frissítve ({row_count} sor).") except cx_Oracle.DatabaseError as e: load_end_time = datetime.now() error_msg = str(e) cursor.execute(""" INSERT INTO LOAD_LOG (TABLE_NAME, LOAD_TYPE, START_TIME, END_TIME, ROW_COUNT, STATUS, ERROR_MESSAGE) VALUES (:1, :2, :3, :4, :5, 'FAILED', :6) """, (stage_table, load_type, load_start_time, load_end_time, 0, error_msg)) conn.rollback() print(f"Hiba történt: {error_msg}") finally: cursor.close() conn.close()

🚀 Újdonságok a kódban

🔹 Inkrementális töltés: Ha már volt sikeres betöltés, akkor csak az azóta módosult rekordok kerülnek be.
🔹 Hibakezelés: A LOAD_LOG táblába bejegyzés történik hiba esetén is.
🔹 Naplózás: A betöltés kezdési és befejezési időpontja, betöltött sorok száma naplózásra kerül.
🔹 Automatikus tábla-létrehozás: A CREATE TABLE IF NOT EXISTS biztosítja, hogy a szükséges táblák létrejöjjenek.



További kiegészítési, javítási és optimalizálási fejezetek


1️⃣ Partíciókezelés

🔹 Range-partíciózás: Dátum szerint (pl. LOAD_DATE alapján havonta)
🔹 Hash-partíciózás: Nagy táblák egyenletes elosztásához
🔹 List-partíciózás: Ha előre definiált kategóriák (pl. ország, régió) szerint osztanánk fel

2️⃣ Időzített futtatás

🔹 Oracle DBMS_SCHEDULER: PL/SQL alapon beállítható időzítés
🔹 Crontab/Python Scheduler: Ha külső szkripttel (pl. Python) szeretnéd időzíteni

3️⃣ Optimalizálás

🔹 Indexek: A forrás- és stage-táblák optimalizálása
🔹 Parallel Processing: Több szálas betöltés
🔹 Bulk Insert vs. Merge: Teljesítmény és inkrementális töltés javítása


4️⃣ Időzített futtatás az adattárház stage töltéséhez

5️⃣ Optimalizálás az adattárház stage töltéséhez



1️⃣ Partíciókezelés adattárház stage táblákhoz

A partíciózás nagyobb adatmennyiségeknél segít a lekérdezési és betöltési teljesítmény javításában.

📌 1.1 Range-partíciózás (dátum szerint)

👉 Ha a LOAD_DATE vagy LAST_UPDATE_DATE mező alapján havonta bontjuk az adatokat.

📌 Példa: Havi bontású partíciók

--- sql

CREATE TABLE STAGE_TRANSACTIONS ( ID NUMBER, TRANSACTION_DATE DATE, AMOUNT NUMBER, TECH_LOAD_DATE DATE DEFAULT SYSDATE, TECH_SOURCE_TABLE VARCHAR2(100) ) PARTITION BY RANGE (TRANSACTION_DATE) ( PARTITION p_202401 VALUES LESS THAN (TO_DATE('2024-02-01', 'YYYY-MM-DD')), PARTITION p_202402 VALUES LESS THAN (TO_DATE('2024-03-01', 'YYYY-MM-DD')), PARTITION p_202403 VALUES LESS THAN (TO_DATE('2024-04-01', 'YYYY-MM-DD')), PARTITION p_max VALUES LESS THAN (MAXVALUE) );

🔹 Előnyök:
✅ Régebbi adatokat könnyen archiválhatunk (DROP PARTITION).
✅ Új hónapokhoz új partíciót kell létrehozni.


📌 1.2 Hash-partíciózás (nagy táblák egyenletes elosztásához)

👉 Ha nincs jól definiált időbélyeg, és egyenletesen akarjuk elosztani az adatokat.

📌 Példa: Hash-partíciózás ID szerint 4 partícióba

---sql

CREATE TABLE STAGE_CUSTOMERS ( CUSTOMER_ID NUMBER, NAME VARCHAR2(100), COUNTRY VARCHAR2(50), TECH_LOAD_DATE DATE DEFAULT SYSDATE, TECH_SOURCE_TABLE VARCHAR2(100) ) PARTITION BY HASH (CUSTOMER_ID) PARTITIONS 4;

🔹 Előnyök:
✅ Egyenletes terhelés, elkerüli a "forró" partíciókat.
✅ Nem kell manuálisan karbantartani az új időszakokhoz.


📌 1.3 List-partíciózás (előre definiált kategóriák szerint)

👉 Ha egy adott mező (pl. ország, régió) szerint szeretnénk szeparálni az adatokat.

📌 Példa: Országonkénti bontás

---sql

CREATE TABLE STAGE_SALES ( SALE_ID NUMBER, COUNTRY VARCHAR2(50), SALES_AMOUNT NUMBER, TECH_LOAD_DATE DATE DEFAULT SYSDATE, TECH_SOURCE_TABLE VARCHAR2(100) ) PARTITION BY LIST (COUNTRY) ( PARTITION p_USA VALUES ('USA'), PARTITION p_CANADA VALUES ('CANADA'), PARTITION p_EUROPE VALUES ('UK', 'GERMANY', 'FRANCE'), PARTITION p_OTHER VALUES (DEFAULT) );

🔹 Előnyök:
Országonkénti gyors keresés (pl. WHERE COUNTRY='USA').
Speciális tárolási beállítások országok szerint.


📌 1.4 Dinamikus partíciókezelés (új partíció létrehozása)

Ha egy új hónaphoz új partíció kell, ezt automatizálhatjuk PL/SQL triggerrel vagy DBMS_SCHEDULER-rel.

📌 Új havi partíció automatikus létrehozása

--- plsql

DECLARE v_new_partition VARCHAR2(100); v_partition_date DATE; BEGIN v_partition_date := TRUNC(SYSDATE, 'MM') + INTERVAL '1' MONTH; v_new_partition := 'p_' || TO_CHAR(v_partition_date, 'YYYYMM'); EXECUTE IMMEDIATE 'ALTER TABLE STAGE_TRANSACTIONS ADD PARTITION ' || v_new_partition || ' VALUES LESS THAN (TO_DATE(''' || TO_CHAR(v_partition_date + INTERVAL '1' MONTH, 'YYYY-MM-DD') || ''', ''YYYY-MM-DD''))'; END; /

🔹 Előnyök:
✅ Nem kell manuálisan létrehozni az új hónapokat.
Napi/heti/havi időzített futtatás DBMS_SCHEDULER-rel.


2️⃣ Időzített futtatás (Scheduler megoldások)

A stage táblák töltését időzítve kell futtatni, hogy az adatok frissüljenek és optimalizáltan érkezzenek. Ehhez két fő megoldás létezik:

  1. Oracle DBMS_SCHEDULER – Adatbázison belüli időzítés PL/SQL-lel

  2. Külső időzítés (Crontab/Python Scheduler) – Ha az adatbetöltés külső szkriptekből indul


📌 2.1 Oracle DBMS_SCHEDULER – PL/SQL alapú időzítés

👉 Ha a betöltést az adatbázison belülről akarjuk kezelni

Lépések

1️⃣ Létrehozunk egy tárolt eljárást, amely a betöltést végzi
2️⃣ Létrehozunk egy DBMS_SCHEDULER jobot, ami időzítve futtatja


📌 2.1.1 Tárolt eljárás a stage töltéshez

Ez az eljárás betölti a forrás adatait a stage táblába.

--- plsql

CREATE OR REPLACE PROCEDURE load_stage_transactions IS BEGIN INSERT INTO STAGE_TRANSACTIONS (ID, TRANSACTION_DATE, AMOUNT, TECH_LOAD_DATE, TECH_SOURCE_TABLE) SELECT ID, TRANSACTION_DATE, AMOUNT, SYSDATE, 'SOURCE_TRANSACTIONS' FROM SOURCE_TRANSACTIONS; COMMIT; END; /

📌 2.1.2 Időzített job létrehozása (DBMS_SCHEDULER)

Példa: Minden nap éjfélkor fusson

--- plsql

BEGIN DBMS_SCHEDULER.create_job ( job_name => 'JOB_LOAD_STAGE_TRANSACTIONS', job_type => 'PLSQL_BLOCK', job_action => 'BEGIN load_stage_transactions; END;', start_date => SYSTIMESTAMP, repeat_interval => 'FREQ=DAILY; BYHOUR=0; BYMINUTE=0; BYSECOND=0', enabled => TRUE ); END; /

📌 Ellenőrizhetjük a scheduler állapotát:

--- sql

SELECT job_name, state, last_start_date, next_run_date FROM dba_scheduler_jobs WHERE job_name = 'JOB_LOAD_STAGE_TRANSACTIONS';

🔹 Előnyök:
✅ Nem kell külső eszköz, minden az adatbázison belül fut
✅ Rugalmas időzítési beállítások


📌 2.2 Külső időzítés (Crontab vagy Python Scheduler)

2.2.1 Crontab (Linux/Mac esetén)

Ha az adatbetöltést egy külső szkript végzi, például Pythonból vagy SQLPlus-ból.

🔹 Példa: Minden nap éjfélkor futtat egy SQLPlus szkriptet

---bash

0 0 * * * /path/to/sqlplus user/password@db @load_stage.sql

2.2.2 Python Scheduler (apscheduler)

Ha a betöltési folyamatot Pythonból akarjuk vezérelni, használhatunk egy Python-based schedulert.

📌 Telepítés

---bash

pip install apscheduler cx_Oracle

📌 Python kód az adatbetöltéshez

--- python

from apscheduler.schedulers.blocking import BlockingScheduler import cx_Oracle def load_stage_transactions(): conn = cx_Oracle.connect("user/password@db") cursor = conn.cursor() cursor.execute(""" INSERT INTO STAGE_TRANSACTIONS (ID, TRANSACTION_DATE, AMOUNT, TECH_LOAD_DATE, TECH_SOURCE_TABLE) SELECT ID, TRANSACTION_DATE, AMOUNT, SYSDATE, 'SOURCE_TRANSACTIONS' FROM SOURCE_TRANSACTIONS """) conn.commit() cursor.close() conn.close() print("Stage betöltés sikeres!") scheduler = BlockingScheduler() scheduler.add_job(load_stage_transactions, 'cron', hour=0) # Minden nap éjfélkor futtatja scheduler.start()

🔹 Előnyök:
✅ Külső eszközről vezérelhető
✅ Logolás és hibakezelés beépíthető


Összegzés – Melyiket válasszuk?

MegoldásElőnyökHátrányok
DBMS_SCHEDULERBeépített, megbízható, nincs külső dependencyCsak adatbázison belüli futtatás
Crontab (Linux)Könnyű beállítás, shell script kompatibilisNem Windows-kompatibilis
Python SchedulerRugalmas, logolható, könnyen bővíthetőKülső függőségek, Python kell hozzá



3️⃣ Optimalizálás az adattárház stage töltéséhez

Az optimalizálás célja:
Gyorsabb betöltés
Kevesebb erőforrás-használat
Hatékonyabb indexelés és párhuzamos feldolgozás

Az optimalizáció három fő területre oszlik:

  1. Indexek és partíciókra optimalizált indexelés

  2. Párhuzamos feldolgozás (Parallel Processing)

  3. Bulk Insert vs. Merge stratégia


📌 3.1 Indexek – Milyen indexeket használjunk?

Stage táblákra általában kétféle indexet célszerű használni:
1️⃣ Bitmap Index – Ha kevés az egyedi érték, pl. státuszmező (STATUS)
2️⃣ B-tree Index – Ha nagy méretű egyedi értékekre keresünk, pl. TRANSACTION_ID


📌 3.1.1 B-tree index létrehozása (pl. azonosítókra)

--- sql

CREATE INDEX IDX_STAGE_TRANSACTIONS_ID ON STAGE_TRANSACTIONS (ID);

Előny: Gyors lekérdezés nagy mennyiségű adat esetén


📌 3.1.2 Bitmap index létrehozása (pl. státuszokra)

--- sql

CREATE BITMAP INDEX IDX_STAGE_TRANSACTIONS_STATUS ON STAGE_TRANSACTIONS (STATUS);

Előny: Kevés egyedi értéknél (pl. 'PENDING', 'COMPLETED', 'FAILED') sokkal gyorsabb


📌 3.1.3 Lokális index partíciózott táblákhoz

Ha partíciózott stage táblát használunk, az indexeket is partíciónként célszerű létrehozni.

--- sql

CREATE INDEX IDX_STAGE_TRANSACTIONS_DATE ON STAGE_TRANSACTIONS (TRANSACTION_DATE) LOCAL;

Előny: Gyorsabb lekérdezés egy adott időszakra


📌 3.2 Párhuzamos feldolgozás (Parallel Processing)

A nagy adattömeg betöltése több szálon párhuzamosan is történhet, így gyorsabb lesz.

📌 3.2.1 Parallel Hint használata az INSERT-ben

--- sql

INSERT /*+ PARALLEL(4) */ INTO STAGE_TRANSACTIONS SELECT * FROM SOURCE_TRANSACTIONS;

🔹 Mit jelent ez?
4 szálon párhuzamosan fut a betöltés


📌 3.2.2 Párhuzamos mód aktiválása a táblán

---sql

ALTER TABLE STAGE_TRANSACTIONS PARALLEL 4;

🔹 Előny:
Minden művelet (INSERT, SELECT, INDEX) párhuzamosan fut


📌 3.3 Bulk Insert vs. Merge – Melyiket használjuk?

MódszerElőnyMikor használjuk?
Bulk InsertGyorsabb, ha mindig teljes betöltést végzünkHa a teljes táblát töröljük és újratöltjük
Merge (Upsert)Megőrzi az adatokat, csak a módosult sorokat frissítiInkrementális töltéshez

📌 3.3.1 Bulk Insert – Ha minden rekordot újratöltünk

---sql

TRUNCATE TABLE STAGE_TRANSACTIONS; INSERT /*+ APPEND PARALLEL(4) */ INTO STAGE_TRANSACTIONS SELECT * FROM SOURCE_TRANSACTIONS; COMMIT;

🔹 Mit csinál?
Törli a táblát és újra feltölti
Nagyon gyors, ha nincs szükség előző adatokra


📌 3.3.2 Merge – Ha csak a módosult sorokat frissítjük

--- sql

MERGE INTO STAGE_TRANSACTIONS tgt USING SOURCE_TRANSACTIONS src ON (tgt.ID = src.ID) WHEN MATCHED THEN UPDATE SET tgt.AMOUNT = src.AMOUNT, tgt.TRANSACTION_DATE = src.TRANSACTION_DATE WHEN NOT MATCHED THEN INSERT (ID, TRANSACTION_DATE, AMOUNT, TECH_LOAD_DATE, TECH_SOURCE_TABLE) VALUES (src.ID, src.TRANSACTION_DATE, src.AMOUNT, SYSDATE, 'SOURCE_TRANSACTIONS');

🔹 Mit csinál?
Ha az ID létezik, frissíti az adatokat
Ha az ID még nem létezik, beszúrja az új rekordot


📌 Összegzés: Mit érdemes használni?

TémaJavasolt megoldás
IndexekB-tree index az egyedi mezőkre, bitmap index a kategóriákra, lokális index partícióknál
PárhuzamosításParallel Processing (Parallel hint, táblák parallel módba állítása)
Bulk Insert vagy Merge?Bulk Insert teljes törlésnél, Merge inkrementális töltésnél


🔹 1. TEMP Tables – Ideiglenes táblák használata

Nagy adattömeg esetén a közvetlen betöltés helyett érdemes ideiglenes táblákba (Global Temporary Tables, GTT) írni, majd onnan átrakni az adatokat.

🔹 Miért hasznos?
✅ Csökkenti a versengést a Stage tábla és az éles rendszer között
✅ Gyorsítja az írási műveleteket

📌 1.1 Ideiglenes tábla létrehozása

--- sql

CREATE GLOBAL TEMPORARY TABLE TMP_STAGE_TRANSACTIONS ( ID NUMBER, TRANSACTION_DATE DATE, AMOUNT NUMBER, TECH_LOAD_DATE DATE ) ON COMMIT PRESERVE ROWS;

ON COMMIT PRESERVE ROWS – A session végéig megtartja az adatokat

📌 1.2 Használat betöltéskor

--- sql

INSERT /*+ APPEND */ INTO TMP_STAGE_TRANSACTIONS SELECT * FROM SOURCE_TRANSACTIONS; INSERT INTO STAGE_TRANSACTIONS SELECT * FROM TMP_STAGE_TRANSACTIONS; COMMIT;

🔹 Előnyök:

  • Gyorsabb INSERT, mert az ideiglenes tábla nem ír redo logot

  • Csak az éles Stage táblába történő második INSERT generál redo logot


🔹 2. NOLOGGING – Redo log minimalizálása

Az UNDO és REDO logok felesleges írása lassíthatja a betöltést.

🔹 Megoldás: NOLOGGING és Direct Path Insert

---sql

ALTER TABLE STAGE_TRANSACTIONS NOLOGGING; INSERT /*+ APPEND */ INTO STAGE_TRANSACTIONS SELECT * FROM SOURCE_TRANSACTIONS; COMMIT;

Mit csinál?

  • A NOLOGGING letiltja a redo log írást

  • Az APPEND direct path insertet használ

🔹 Figyelem!

  • Ha a NOLOGGING aktív, az adatok nem állíthatók vissza egy DB crash után

  • Csak akkor használd, ha az adatok újragenerálhatók


🔹 3. Partition Exchange Load (PEL) – Partíciócsere betöltéshez

Ha a Stage tábla partícionált, sokkal gyorsabb lehet a betöltés egy partíciócserével.

📌 3.1 Lépések

1️⃣ Létrehozunk egy ideiglenes Stage táblát (ugyanazzal a struktúrával)
2️⃣ Ide töltjük be az adatokat
3️⃣ Partíciócsere a fő Stage táblával

--- sql

ALTER TABLE STAGE_TRANSACTIONS EXCHANGE PARTITION P202403 WITH TABLE TMP_STAGE_TRANSACTIONS;

Előny: A csere szinte azonnali, nem kell fizikailag mozgatni az adatokat!


🔹 4. Adaptive Query Optimization – Oracle automatikus finomhangolása

Ha Oracle 12c vagy újabb verziót használsz, akkor a SQL optimalizálás automatikusan is megtörténhet.

📌 4.1 Adaptive Query Optimization engedélyezése

---sql

ALTER SESSION SET optimizer_adaptive_features = TRUE;

Automatikusan optimalizálja a Parallel Query Execution-t
Megfigyeli a futási statisztikákat és módosítja a futási terveket


🔹 5. Bulk Collect és FORALL (PL/SQL-ben)

Ha a betöltést PL/SQL ciklusokkal végzed, a Bulk Collect és FORALL használata sokkal gyorsabb lesz!

📌 5.1 Példa Bulk Collect-re

--- plsql

DECLARE TYPE t_stage IS TABLE OF STAGE_TRANSACTIONS%ROWTYPE; v_data t_stage; BEGIN SELECT * BULK COLLECT INTO v_data FROM SOURCE_TRANSACTIONS; FORALL i IN 1..v_data.COUNT INSERT INTO STAGE_TRANSACTIONS VALUES v_data(i); COMMIT; END; /

Előny: Egyetlen művelettel több ezer sort írhatunk be egyszerre, nem soronként!


Összegzés: Melyik optimalizálási technika mikor hasznos?

Optimalizálási technikaElőnyMikor használd?
Temp Tables (GTT)Gyorsabb ideiglenes írásHa a Stage tábla versengést okoz
NOLOGGINGMinimalizálja a redo logotHa az adatok újragenerálhatók
Partition Exchange Load (PEL)Szinte azonnali partíciócsereHa a Stage tábla partícionált
Adaptive Query OptimizationOracle automatikusan finomhangoljaHa Oracle 12c+ verziót használsz
Bulk Collect + FORALLPL/SQL tömeges adatkezelésHa PL/SQL procedúrában töltöd az adatokat



4️⃣ Időzített futtatás az adattárház stage töltéséhez

Az időzített futtatás célja:
Automatikus futtatás meghatározott időközönként
Terheléselosztás (pl. éjszakai futtatás)
Hibakezelési mechanizmusok beépítése

Az időzített futtatás három fő módszere:

  1. Oracle DBMS_SCHEDULER (ajánlott) – Beépített ütemező az Oracle-ben

  2. Crontab/Python Scheduler – Külső ütemezők

  3. Event-Driven Scheduling – Adatváltozások alapján triggerelve


📌 4.1 DBMS_SCHEDULER – Az Oracle beépített ütemezője

Az Oracle DBMS_SCHEDULER egy fejlett ütemező, amely lehetővé teszi PL/SQL eljárások vagy SQL szkriptek futtatását időzítve.

📌 4.1.1 Ütemezett Stage töltés létrehozása

Ebben a példában a betöltés minden nap éjfélkor (00:00) lefut:

---sql

BEGIN DBMS_SCHEDULER.CREATE_JOB ( job_name => 'STAGE_TABLE_LOAD', job_type => 'PLSQL_BLOCK', job_action => 'BEGIN LOAD_STAGE_TRANSACTIONS; END;', start_date => SYSTIMESTAMP, repeat_interval => 'FREQ=DAILY; BYHOUR=0; BYMINUTE=0; BYSECOND=0', enabled => TRUE ); END; /

Mit csinál?

  • Naponta, éjfélkor futtatja a LOAD_STAGE_TRANSACTIONS eljárást

  • Automatikusan elindul minden nap


📌 4.1.2 Futtatás 10 percenként (gyors frissítés esetén)

---sql

BEGIN DBMS_SCHEDULER.CREATE_JOB ( job_name => 'STAGE_TABLE_LOAD_FREQUENT', job_type => 'PLSQL_BLOCK', job_action => 'BEGIN LOAD_STAGE_TRANSACTIONS; END;', start_date => SYSTIMESTAMP, repeat_interval => 'FREQ=MINUTELY; INTERVAL=10', enabled => TRUE ); END; /

Mit csinál?

  • Minden 10 percben újra lefuttatja a Stage töltést


📌 4.1.3 Az ütemezett futtatás ellenőrzése

---sql

SELECT job_name, state, next_run_date FROM dba_scheduler_jobs WHERE job_name = 'STAGE_TABLE_LOAD';

🔹 Mit látunk?

  • STATE = RUNNING → Jelenleg fut

  • STATE = SCHEDULED → Vár a következő indításra


📌 4.1.4 Ütemezés letiltása vagy törlése

Ha le szeretnéd állítani az ütemezett futtatást:

---sql

BEGIN DBMS_SCHEDULER.DISABLE('STAGE_TABLE_LOAD'); END; /

Ha teljesen törölni szeretnéd:

--- sql

BEGIN DBMS_SCHEDULER.DROP_JOB('STAGE_TABLE_LOAD'); END; /

📌 4.2 Crontab vagy Python alapú ütemezés

Ha Oracle-on kívüli megoldást keresel (pl. Python vagy Linux környezetben):

📌 4.2.1 Crontab – Unix/Linux időzített futtatás

🔹 Crontab beállítás (éjfélkor történő futtatásra)

--- bash

0 0 * * * /path/to/script.sh

Mit csinál?

  • Minden nap éjfélkor futtat egy shell scriptet

Ha a script egy SQL parancsot futtat, akkor így nézhet ki:

--- bash

sqlplus user/password@db <<EOF EXEC LOAD_STAGE_TRANSACTIONS; EXIT; EOF

📌 4.2.2 Python alapú időzítés

A Python schedule modulját használhatjuk, ha egy külső Python alkalmazásból vezéreljük a futtatást.

--- python

import schedule import time import cx_Oracle def run_stage_load(): connection = cx_Oracle.connect("user/password@db") cursor = connection.cursor() cursor.callproc("LOAD_STAGE_TRANSACTIONS") connection.commit() cursor.close() connection.close() print("Stage betöltés futott.") # Ütemezés minden éjfélkor schedule.every().day.at("00:00").do(run_stage_load) while True: schedule.run_pending() time.sleep(60)

Mit csinál?

  • Minden nap éjfélkor futtatja a Stage betöltést

  • Automatikusan újrafuttatja hiba esetén


📌 4.3 Event-Driven Scheduling – Ha az adatváltozás indítja el a betöltést

Ha nem időzítés, hanem az új adatok érkezése alapján akarjuk indítani a betöltést, akkor TRIGGER-eket vagy Change Data Capture (CDC) megoldásokat használhatunk.

📌 4.3.1 Triggeres megoldás – Ha a forrástábla frissül, indul a betöltés

--- sql

CREATE OR REPLACE TRIGGER trg_source_update AFTER INSERT OR UPDATE ON SOURCE_TRANSACTIONS FOR EACH ROW BEGIN DBMS_SCHEDULER.RUN_JOB('STAGE_TABLE_LOAD'); END; /

Mit csinál?

  • Ha új adat érkezik a forrástáblába, automatikusan elindítja a betöltést

🔹 Hátrány

  • Nagyon sűrű frissítések esetén túl sok futtatást generálhat


📌 4.4 Összegzés: Melyik időzítési módszert mikor használd?

Időzítési módszerElőnyMikor használd?
DBMS_SCHEDULERBeépített, könnyen kezelhető, Oracle belső megoldásHa az adatbázison belül kell időzíteni
Crontab/Linux SchedulerEgyszerű, stabil, OS-alapúHa shell script vagy külső rendszer vezérli a futtatást
Python SchedulerRugalmas, más rendszerekbe is beépíthetőHa egy Python alkalmazás részeként kell kezelni
Event-Driven Scheduling (Trigger)Azonnali feldolgozás, ha változik az adatHa csak akkor kell futnia, amikor új adat érkezik


5️⃣ Optimalizálás az adattárház stage töltéséhez

A stage tábla töltésének optimalizálása kulcsfontosságú az adatbetöltési idő csökkentése, a CPU/memória kihasználtság optimalizálása, valamint a tárolási költségek minimalizálása érdekében.

Az optimalizálás fő területei:

  1. Indexek és tömörítés

  2. Parallel Processing – párhuzamos végrehajtás

  3. Bulk Insert vs. Merge – tömeges beszúrás vs. intelligens frissítés

  4. SQL optimalizálás és hint-ek

  5. I/O műveletek csökkentése (Direct Path Load)


📌 5.1 Indexek és tömörítés

A stage tábla esetében az indexek és tömörítés megfelelő beállítása jelentősen javíthatja a betöltési teljesítményt.

📌 5.1.1 Indexek kezelése betöltéskor

Ha a stage táblán túl sok index van, az lassíthatja a beszúrást. Megoldás:
Beszúrás előtt az indexek letiltása, utána újraépítésük:

sql
ALTER INDEX stage_table_idx UNUSABLE; INSERT INTO stage_table (...) SELECT ... FROM source_table; ALTER INDEX stage_table_idx REBUILD;

Ha nagy mennyiségű adatot töltesz, az indexek újraépítése gyorsabb lehet, mint ha folyamatosan frissítenéd őket.


📌 5.1.2 Tábla tömörítés a tárhely csökkentése érdekében

Az Oracle table compression funkciója csökkentheti a tárhelyigényt és gyorsíthatja a lekérdezéseket.

Tömörített Stage tábla létrehozása (OLTP mód)

sql
CREATE TABLE stage_table ( id NUMBER, name VARCHAR2(100), created_at DATE ) COMPRESS FOR OLTP;

Előny: A kisebb méretű tárolás gyorsabb olvasást biztosít.
🔹 Hátrány: Nagyon gyakori DML műveletek esetén CPU-t használ.


📌 5.2 Parallel Processing – Párhuzamos végrehajtás

Az Oracle parallel query és parallel DML lehetőséget biztosít a nagyobb teljesítményű adatmozgatásra.

📌 5.2.1 Párhuzamos beszúrás engedélyezése

sql
ALTER SESSION ENABLE PARALLEL DML; INSERT /*+ APPEND PARALLEL(4) */ INTO stage_table SELECT /*+ PARALLEL(4) */ * FROM source_table;

Mit csinál?

  • Parallel(4) → 4 szálon futtatja a beszúrást

  • Append → Közvetlenül a szegmens végére ír, minimalizálva az UNDO-t


📌 5.2.2 Táblaszintű parallel beállítás

Ha egy tábla mindig párhuzamosan legyen feldolgozva:

sql
ALTER TABLE stage_table PARALLEL 4;

🔹 Megjegyzés: Ha a tábla mérete kicsi (<1M sor), a párhuzamosítás nem biztos, hogy hasznos.


📌 5.3 Bulk Insert vs. Merge – Tömeges beszúrás vagy frissítés?

Ha a stage tábla mindig teljes töltést kap, az INSERT + TRUNCATE gyorsabb.
Ha csak az új rekordokat kell frissíteni, a MERGE a hatékonyabb.

📌 5.3.1 Bulk Insert – Ha mindig teljes töltést végzünk

sql
TRUNCATE TABLE stage_table; INSERT /*+ APPEND */ INTO stage_table SELECT * FROM source_table;

Gyors, mert nincs UPDATE művelet
🔹 Csak akkor jó, ha nincs szükség történeti adatokra.


📌 5.3.2 Merge – Ha az új és meglévő adatok keverednek

sql
MERGE INTO stage_table t USING source_table s ON (t.id = s.id) WHEN MATCHED THEN UPDATE SET t.name = s.name, t.updated_at = SYSTIMESTAMP WHEN NOT MATCHED THEN INSERT (id, name, created_at) VALUES (s.id, s.name, SYSTIMESTAMP);

Hatékony, ha az adatok változnak és nem akarunk duplikációt
🔹 Lassabb, mint egy egyszerű beszúrás, mert összehasonlítja a meglévő rekordokat.


📌 5.4 SQL optimalizálás és hint-ek

Ha az SQL teljesítményét akarjuk növelni, használhatunk hint-eket.

📌 5.4.1 Full Table Scan vs. Index használat

Ha a tábla kicsi, indexet érdemes használni:

sql
SELECT /*+ INDEX(stage_table stage_table_idx) */ * FROM stage_table;

Ha a tábla nagy és szinte az összes sort beolvassuk, inkább full table scan:

sql
SELECT /*+ FULL(stage_table) */ * FROM stage_table;

📌 5.4.2 Query Rewriting – Materialized View

Ha a stage adatok gyakran változnak, egy materialized view gyorsíthatja a betöltést.

---sql

CREATE MATERIALIZED VIEW stage_mv REFRESH FAST AS SELECT * FROM source_table;

Előny: Az adatok inkrementálisan frissülnek, gyorsabb lekérdezések
🔹 Hátrány: Extra karbantartást igényel


📌 5.5 I/O műveletek csökkentése (Direct Path Load)

Ha nagy adatmennyiséget töltünk be, az Oracle Direct Path Load csökkentheti az UNDO és REDO terhelést.

📌 5.5.1 Direct Path Load az INSERT-ben

--- sql

INSERT /*+ APPEND */ INTO stage_table SELECT * FROM source_table;

Előny: Nem használ UNDO/REDO naplózást, gyorsabb írás
🔹 Hátrány: Nem lehet rollback-elni

Ha ez nem elég, az SQL*Loader-t is használhatjuk.


📌 5.6 Összegzés: Melyik optimalizálási módszert mikor használd?

Optimalizálási technikaElőnyMikor használd?
Indexek letiltása és újraépítéseBeszúrás gyorsításaHa sok index van
Táblatömörítés (OLTP/Hybrid)Kevesebb tárhely, gyorsabb olvasásHa nagy az adatmennyiség
Parallel ProcessingTöbbszálú végrehajtásHa nagyobb adatmennyiséget kell betölteni
Bulk Insert (TRUNCATE + INSERT)Gyors teljes újratöltésHa nincs szükség az előző adatokra
MERGE (UPSERT)Adatfrissítés duplikáció nélkülHa az adatok változnak
SQL optimalizálás (Hint-ek)Index vagy Full Table Scan beállításaHa az Oracle nem optimálisan választja meg a végrehajtási tervet
Direct Path LoadUNDO/REDO terhelés csökkentéseNagy mennyiségű adatnál



Konkrét példák


Adatgyűjtés:

  • Forrás: egy OLTP rendszerben található tranzakciós tábla (SOURCE_TRANSACTIONS).

  • Az adatok kinyerése egy Oracle External Table segítségével:

---sql

CREATE TABLE ext_transactions ( transaction_id NUMBER, customer_id NUMBER, amount NUMBER(10,2), transaction_date DATE ) ORGANIZATION EXTERNAL ( TYPE ORACLE_LOADER DEFAULT DIRECTORY data_dir ACCESS PARAMETERS ( RECORDS DELIMITED BY NEWLINE FIELDS TERMINATED BY ',' (transaction_id, customer_id, amount, transaction_date DATE 'YYYY-MM-DD') ) LOCATION ('transactions.csv') );


Betöltés és szum rekordszám ellenőrzés:


-- 1. Metaadat tábla létrehozása a forrásrendszeri tábla adataihoz
CREATE TABLE source_metadata (
    table_name VARCHAR2(128),
    column_name VARCHAR2(128),
    data_type VARCHAR2(128),
    PRIMARY KEY (table_name, column_name)
);
-- 2. Stage tábla generálásának ellenőrzése és létrehozása
CREATE OR REPLACE PROCEDURE generate_stage_table(p_source_table VARCHAR2) AS
    v_stage_table VARCHAR2(128);
    v_sql CLOB;
BEGIN
    v_stage_table := 'STG_' || p_source_table;
    v_sql := 'CREATE TABLE ' || v_stage_table || ' (';
    FOR rec IN (SELECT column_name, data_type FROM source_metadata WHERE table_name = p_source_table) LOOP
        v_sql := v_sql || rec.column_name || ' ' || rec.data_type || ', ';
    END LOOP;
    v_sql := v_sql || 'load_timestamp TIMESTAMP DEFAULT SYSTIMESTAMP, batch_id NUMBER)';
    EXECUTE IMMEDIATE v_sql;
END;
/
-- 3. Adatok betöltő SQL generálása
CREATE OR REPLACE PROCEDURE generate_insert_sql(p_source_table VARCHAR2) AS
    v_stage_table VARCHAR2(128);
    v_sql CLOB;
BEGIN
    v_stage_table := 'STG_' || p_source_table;
    v_sql := 'INSERT INTO ' || v_stage_table || ' (';
    FOR rec IN (SELECT column_name FROM source_metadata WHERE table_name = p_source_table) LOOP
        v_sql := v_sql || rec.column_name || ', ';
    END LOOP;
    v_sql := v_sql || 'load_timestamp, batch_id) SELECT ';
    FOR rec IN (SELECT column_name FROM source_metadata WHERE table_name = p_source_table) LOOP
        v_sql := v_sql || rec.column_name || ', ';
    END LOOP;
    v_sql := v_sql || 'SYSTIMESTAMP, :batch_id FROM ' || p_source_table;
    EXECUTE IMMEDIATE v_sql USING 1;
END;
/
-- 4.1. Betöltés ellenőrző SQL
CREATE OR REPLACE FUNCTION validate_load(p_source_table VARCHAR2) RETURN NUMBER AS
    v_stage_table VARCHAR2(128);
    v_count_source NUMBER;
    v_count_stage NUMBER;
BEGIN
    v_stage_table := 'STG_' || p_source_table;
    EXECUTE IMMEDIATE 'SELECT COUNT(*) FROM ' || p_source_table INTO v_count_source;
    EXECUTE IMMEDIATE 'SELECT COUNT(*) FROM ' || v_stage_table INTO v_count_stage;
    RETURN CASE WHEN v_count_source = v_count_stage THEN 1 ELSE 0 END;
END;
/
-- 4.2. Betöltés ellenőrző SQL
CREATE OR REPLACE FUNCTION validate_load(p_source_table VARCHAR2) RETURN CLOB AS
    v_stage_table VARCHAR2(128);
    v_sql CLOB;
    v_result CLOB := '';
BEGIN
    v_stage_table := 'STG_' || p_source_table;
    v_sql := 'SELECT s.*, t.* FROM ' || p_source_table || ' s FULL OUTER JOIN ' || v_stage_table || ' t ON (';
    FOR rec IN (SELECT column_name FROM source_metadata WHERE table_name = p_source_table) LOOP
        v_sql := v_sql || 's.' || rec.column_name || ' = t.' || rec.column_name || ' AND ';
    END LOOP;
    v_sql := RTRIM(v_sql, ' AND ');
    v_sql := v_sql || ') WHERE (';
    FOR rec IN (SELECT column_name FROM source_metadata WHERE table_name = p_source_table) LOOP
        v_sql := v_sql || 's.' || rec.column_name || ' <> t.' || rec.column_name || ' OR ';
    END LOOP;
    v_sql := RTRIM(v_sql, ' OR ');
    v_sql := v_sql || ') OR s.' || (SELECT MIN(column_name) FROM source_metadata WHERE table_name = p_source_table) || ' IS NULL OR t.' || (SELECT MIN(column_name) FROM source_metadata WHERE table_name = p_source_table) || ' IS NULL';
    RETURN v_sql;
END;
/

-- 4.3. Betöltés ellenőrző SQL generálása
CREATE OR REPLACE FUNCTION validate_load(p_source_table VARCHAR2) RETURN CLOB AS
    v_stage_table VARCHAR2(128);
    v_sql CLOB;
    v_result CLOB := '';
BEGIN
    v_stage_table := 'STG_' || p_source_table;
    v_sql := 'SELECT s.*, t.*, CASE ';
    FOR rec IN (SELECT column_name FROM source_metadata WHERE table_name = p_source_table) LOOP
        v_sql := v_sql || 'WHEN s.' || rec.column_name || ' IS DISTINCT FROM t.' || rec.column_name || ' THEN ''' || rec.column_name || ' MISMATCH'' ELSE NULL END AS ' || rec.column_name || '_diff, ';
    END LOOP;
    v_sql := RTRIM(v_sql, ', ');
    v_sql := v_sql || ' FROM ' || p_source_table || ' s FULL OUTER JOIN ' || v_stage_table || ' t ON (';
    FOR rec IN (SELECT column_name FROM source_metadata WHERE table_name = p_source_table) LOOP
        v_sql := v_sql || 's.' || rec.column_name || ' = t.' || rec.column_name || ' AND ';
    END LOOP;
    v_sql := RTRIM(v_sql, ' AND ');
    v_sql := v_sql || ') WHERE (';
    FOR rec IN (SELECT column_name FROM source_metadata WHERE table_name = p_source_table) LOOP
        v_sql := v_sql || 's.' || rec.column_name || ' IS DISTINCT FROM t.' || rec.column_name || ' OR ';
    END LOOP;
    v_sql := RTRIM(v_sql, ' OR ');
    v_sql := v_sql || ') OR s.' || (SELECT MIN(column_name) FROM source_metadata WHERE table_name = p_source_table) || ' IS NULL OR t.' || (SELECT MIN(column_name) FROM source_metadata WHERE table_name = p_source_table) || ' IS NULL';
    RETURN v_sql;
END;
/


Python vezérlő kód minta stage betöltéshez


--- Python
import cx_Oracle
def connect_to_db(user, password, dsn):
    """Kapcsolódás az Oracle adatbázishoz."""
    connection = cx_Oracle.connect(user, password, dsn)
    return connection
def load_metadata(cursor, source_table):
    """Lekéri a forrástábla oszlopait és adattípusait, majd beilleszti a source_metadata táblába."""
    cursor.execute("SELECT column_name, data_type FROM all_tab_columns WHERE table_name = :1", [source_table.upper()])
    columns = cursor.fetchall()
    cursor.execute("DELETE FROM source_metadata WHERE table_name = :1", [source_table])
    for column, data_type in columns:
        cursor.execute("INSERT INTO source_metadata (table_name, column_name, data_type) VALUES (:1, :2, :3)", 
                       [source_table, column, data_type])
def generate_stage_table(cursor, source_table):
    """Létrehozza a stage táblát a metaadatok alapján."""
    cursor.execute("SELECT column_name, data_type FROM source_metadata WHERE table_name = :1", [source_table])
    columns = cursor.fetchall()
    stage_table = f"STG_{source_table}"
    sql = f"CREATE TABLE {stage_table} ("
    sql += ", ".join([f"{col} {dtype}" for col, dtype in columns])
    sql += ", load_timestamp TIMESTAMP DEFAULT SYSTIMESTAMP, batch_id NUMBER)"
    cursor.execute(f"DROP TABLE {stage_table} PURGE")  # Törli, ha már létezik
    cursor.execute(sql)
def generate_insert_sql(cursor, source_table):
    """Generál egy INSERT INTO SQL-t a stage tábla betöltésére."""
    cursor.execute("SELECT column_name FROM source_metadata WHERE table_name = :1", [source_table])
    columns = [col[0] for col in cursor.fetchall()]
    stage_table = f"STG_{source_table}"
    sql = f"INSERT INTO {stage_table} ({', '.join(columns)}, load_timestamp, batch_id) "
    sql += f"SELECT {', '.join(columns)}, SYSTIMESTAMP, :batch_id FROM {source_table}"
    cursor.execute(sql, [1])
def validate_load(cursor, source_table):
    """Ellenőrzi a forrás és stage tábla közti eltéréseket."""
    cursor.execute("SELECT column_name FROM source_metadata WHERE table_name = :1", [source_table])
    columns = [col[0] for col in cursor.fetchall()]
    stage_table = f"STG_{source_table}"
    validation_sql = f"SELECT s.*, t.*, CASE "
    for col in columns:
        validation_sql += f"WHEN s.{col} IS DISTINCT FROM t.{col} THEN '{col} MISMATCH' ELSE NULL END AS {col}_diff, "
    validation_sql = validation_sql.rstrip(', ')
    validation_sql += f" FROM {source_table} s FULL OUTER JOIN {stage_table} t ON ("
    validation_sql += " AND ".join([f"s.{col} = t.{col}" for col in columns])
    validation_sql += ") WHERE ("
    validation_sql += " OR ".join([f"s.{col} IS DISTINCT FROM t.{col}" for col in columns])
    validation_sql += ")"
    cursor.execute(validation_sql)
    return cursor.fetchall()
if __name__ == "__main__":
    user, password, dsn = "your_user", "your_password", "your_dsn"
    source_table = "YOUR_SOURCE_TABLE"
    conn = connect_to_db(user, password, dsn)
    cur = conn.cursor()
    load_metadata(cur, source_table)
    generate_stage_table(cur, source_table)
    generate_insert_sql(cur, source_table)
    mismatches = validate_load(cur, source_table)
    print("Eltérések:", mismatches)
    conn.commit()
    cur.close()
    conn.close()






Megjegyzések