Alap funkcionalitás
További kiegészítési, javítási és optimalizálási fejezetek
1️⃣ Partíciókezelés
🔹 Range-partíciózás: Dátum szerint (pl. LOAD_DATE
alapján havonta)
🔹 Hash-partíciózás: Nagy táblák egyenletes elosztásához
🔹 List-partíciózás: Ha előre definiált kategóriák (pl. ország, régió) szerint osztanánk fel
2️⃣ Időzített futtatás
🔹 Oracle DBMS_SCHEDULER: PL/SQL alapon beállítható időzítés
🔹 Crontab/Python Scheduler: Ha külső szkripttel (pl. Python) szeretnéd időzíteni
3️⃣ Optimalizálás
🔹 Indexek: A forrás- és stage-táblák optimalizálása
🔹 Parallel Processing: Több szálas betöltés
🔹 Bulk Insert vs. Merge: Teljesítmény és inkrementális töltés javítása
4️⃣ Időzített futtatás az adattárház stage töltéséhez
5️⃣ Optimalizálás az adattárház stage töltéséhez
1️⃣ Partíciókezelés adattárház stage táblákhoz
A partíciózás nagyobb adatmennyiségeknél segít a lekérdezési és betöltési teljesítmény javításában.
📌 1.1 Range-partíciózás (dátum szerint)
👉 Ha a LOAD_DATE
vagy LAST_UPDATE_DATE
mező alapján havonta bontjuk az adatokat.
📌 Példa: Havi bontású partíciók
🔹 Előnyök:
✅ Régebbi adatokat könnyen archiválhatunk (DROP PARTITION).
✅ Új hónapokhoz új partíciót kell létrehozni.
📌 1.2 Hash-partíciózás (nagy táblák egyenletes elosztásához)
👉 Ha nincs jól definiált időbélyeg, és egyenletesen akarjuk elosztani az adatokat.
📌 Példa: Hash-partíciózás ID
szerint 4 partícióba
🔹 Előnyök:
✅ Egyenletes terhelés, elkerüli a "forró" partíciókat.
✅ Nem kell manuálisan karbantartani az új időszakokhoz.
📌 1.3 List-partíciózás (előre definiált kategóriák szerint)
👉 Ha egy adott mező (pl. ország, régió) szerint szeretnénk szeparálni az adatokat.
📌 Példa: Országonkénti bontás
🔹 Előnyök:
✅ Országonkénti gyors keresés (pl. WHERE COUNTRY='USA'
).
✅ Speciális tárolási beállítások országok szerint.
📌 1.4 Dinamikus partíciókezelés (új partíció létrehozása)
Ha egy új hónaphoz új partíció kell, ezt automatizálhatjuk PL/SQL triggerrel vagy DBMS_SCHEDULER-rel.
📌 Új havi partíció automatikus létrehozása
🔹 Előnyök:
✅ Nem kell manuálisan létrehozni az új hónapokat.
✅ Napi/heti/havi időzített futtatás DBMS_SCHEDULER-rel.
2️⃣ Időzített futtatás (Scheduler megoldások)
A stage táblák töltését időzítve kell futtatni, hogy az adatok frissüljenek és optimalizáltan érkezzenek. Ehhez két fő megoldás létezik:
-
Oracle DBMS_SCHEDULER – Adatbázison belüli időzítés PL/SQL-lel
-
Külső időzítés (Crontab/Python Scheduler) – Ha az adatbetöltés külső szkriptekből indul
📌 2.1 Oracle DBMS_SCHEDULER – PL/SQL alapú időzítés
👉 Ha a betöltést az adatbázison belülről akarjuk kezelni
Lépések
1️⃣ Létrehozunk egy tárolt eljárást, amely a betöltést végzi
2️⃣ Létrehozunk egy DBMS_SCHEDULER jobot, ami időzítve futtatja
📌 2.1.1 Tárolt eljárás a stage töltéshez
Ez az eljárás betölti a forrás adatait a stage táblába.
📌 2.1.2 Időzített job létrehozása (DBMS_SCHEDULER)
Példa: Minden nap éjfélkor fusson
📌 Ellenőrizhetjük a scheduler állapotát:
🔹 Előnyök:
✅ Nem kell külső eszköz, minden az adatbázison belül fut
✅ Rugalmas időzítési beállítások
📌 2.2 Külső időzítés (Crontab vagy Python Scheduler)
2.2.1 Crontab (Linux/Mac esetén)
Ha az adatbetöltést egy külső szkript végzi, például Pythonból vagy SQLPlus-ból.
🔹 Példa: Minden nap éjfélkor futtat egy SQLPlus szkriptet
2.2.2 Python Scheduler (apscheduler)
Ha a betöltési folyamatot Pythonból akarjuk vezérelni, használhatunk egy Python-based schedulert.
📌 Telepítés
📌 Python kód az adatbetöltéshez
🔹 Előnyök:
✅ Külső eszközről vezérelhető
✅ Logolás és hibakezelés beépíthető
Összegzés – Melyiket válasszuk?
Megoldás | Előnyök | Hátrányok |
---|---|---|
DBMS_SCHEDULER | Beépített, megbízható, nincs külső dependency | Csak adatbázison belüli futtatás |
Crontab (Linux) | Könnyű beállítás, shell script kompatibilis | Nem Windows-kompatibilis |
Python Scheduler | Rugalmas, logolható, könnyen bővíthető | Külső függőségek, Python kell hozzá |
3️⃣ Optimalizálás az adattárház stage töltéséhez
Az optimalizálás célja:
✅ Gyorsabb betöltés
✅ Kevesebb erőforrás-használat
✅ Hatékonyabb indexelés és párhuzamos feldolgozás
Az optimalizáció három fő területre oszlik:
-
Indexek és partíciókra optimalizált indexelés
-
Párhuzamos feldolgozás (Parallel Processing)
-
Bulk Insert vs. Merge stratégia
📌 3.1 Indexek – Milyen indexeket használjunk?
Stage táblákra általában kétféle indexet célszerű használni:
1️⃣ Bitmap Index – Ha kevés az egyedi érték, pl. státuszmező (STATUS
)
2️⃣ B-tree Index – Ha nagy méretű egyedi értékekre keresünk, pl. TRANSACTION_ID
📌 3.1.1 B-tree index létrehozása (pl. azonosítókra)
✅ Előny: Gyors lekérdezés nagy mennyiségű adat esetén
📌 3.1.2 Bitmap index létrehozása (pl. státuszokra)
✅ Előny: Kevés egyedi értéknél (pl. 'PENDING', 'COMPLETED', 'FAILED') sokkal gyorsabb
📌 3.1.3 Lokális index partíciózott táblákhoz
Ha partíciózott stage táblát használunk, az indexeket is partíciónként célszerű létrehozni.
✅ Előny: Gyorsabb lekérdezés egy adott időszakra
📌 3.2 Párhuzamos feldolgozás (Parallel Processing)
A nagy adattömeg betöltése több szálon párhuzamosan is történhet, így gyorsabb lesz.
📌 3.2.1 Parallel Hint használata az INSERT-ben
🔹 Mit jelent ez?
✅ 4 szálon párhuzamosan fut a betöltés
📌 3.2.2 Párhuzamos mód aktiválása a táblán
🔹 Előny:
✅ Minden művelet (INSERT, SELECT, INDEX) párhuzamosan fut
📌 3.3 Bulk Insert vs. Merge – Melyiket használjuk?
Módszer | Előny | Mikor használjuk? |
---|---|---|
Bulk Insert | Gyorsabb, ha mindig teljes betöltést végzünk | Ha a teljes táblát töröljük és újratöltjük |
Merge (Upsert) | Megőrzi az adatokat, csak a módosult sorokat frissíti | Inkrementális töltéshez |
📌 3.3.1 Bulk Insert – Ha minden rekordot újratöltünk
🔹 Mit csinál?
✅ Törli a táblát és újra feltölti
✅ Nagyon gyors, ha nincs szükség előző adatokra
📌 3.3.2 Merge – Ha csak a módosult sorokat frissítjük
🔹 Mit csinál?
✅ Ha az ID létezik, frissíti az adatokat
✅ Ha az ID még nem létezik, beszúrja az új rekordot
📌 Összegzés: Mit érdemes használni?
Téma | Javasolt megoldás |
---|---|
Indexek | B-tree index az egyedi mezőkre, bitmap index a kategóriákra, lokális index partícióknál |
Párhuzamosítás | Parallel Processing (Parallel hint, táblák parallel módba állítása) |
Bulk Insert vagy Merge? | Bulk Insert teljes törlésnél, Merge inkrementális töltésnél |
🔹 1. TEMP Tables – Ideiglenes táblák használata
Nagy adattömeg esetén a közvetlen betöltés helyett érdemes ideiglenes táblákba (Global Temporary Tables, GTT) írni, majd onnan átrakni az adatokat.
🔹 Miért hasznos?
✅ Csökkenti a versengést a Stage tábla és az éles rendszer között
✅ Gyorsítja az írási műveleteket
📌 1.1 Ideiglenes tábla létrehozása
✅ ON COMMIT PRESERVE ROWS – A session végéig megtartja az adatokat
📌 1.2 Használat betöltéskor
🔹 Előnyök:
-
Gyorsabb INSERT, mert az ideiglenes tábla nem ír redo logot
-
Csak az éles Stage táblába történő második INSERT generál redo logot
🔹 2. NOLOGGING – Redo log minimalizálása
Az UNDO és REDO logok felesleges írása lassíthatja a betöltést.
🔹 Megoldás: NOLOGGING és Direct Path Insert
✅ Mit csinál?
-
A
NOLOGGING
letiltja a redo log írást -
Az
APPEND
direct path insertet használ
🔹 Figyelem!
-
Ha a
NOLOGGING
aktív, az adatok nem állíthatók vissza egy DB crash után -
Csak akkor használd, ha az adatok újragenerálhatók
🔹 3. Partition Exchange Load (PEL) – Partíciócsere betöltéshez
Ha a Stage tábla partícionált, sokkal gyorsabb lehet a betöltés egy partíciócserével.
📌 3.1 Lépések
1️⃣ Létrehozunk egy ideiglenes Stage táblát (ugyanazzal a struktúrával)
2️⃣ Ide töltjük be az adatokat
3️⃣ Partíciócsere a fő Stage táblával
✅ Előny: A csere szinte azonnali, nem kell fizikailag mozgatni az adatokat!
🔹 4. Adaptive Query Optimization – Oracle automatikus finomhangolása
Ha Oracle 12c vagy újabb verziót használsz, akkor a SQL optimalizálás automatikusan is megtörténhet.
📌 4.1 Adaptive Query Optimization engedélyezése
✅ Automatikusan optimalizálja a Parallel Query Execution-t
✅ Megfigyeli a futási statisztikákat és módosítja a futási terveket
🔹 5. Bulk Collect és FORALL (PL/SQL-ben)
Ha a betöltést PL/SQL ciklusokkal végzed, a Bulk Collect és FORALL használata sokkal gyorsabb lesz!
📌 5.1 Példa Bulk Collect-re
✅ Előny: Egyetlen művelettel több ezer sort írhatunk be egyszerre, nem soronként!
Összegzés: Melyik optimalizálási technika mikor hasznos?
Optimalizálási technika | Előny | Mikor használd? |
---|---|---|
Temp Tables (GTT) | Gyorsabb ideiglenes írás | Ha a Stage tábla versengést okoz |
NOLOGGING | Minimalizálja a redo logot | Ha az adatok újragenerálhatók |
Partition Exchange Load (PEL) | Szinte azonnali partíciócsere | Ha a Stage tábla partícionált |
Adaptive Query Optimization | Oracle automatikusan finomhangolja | Ha Oracle 12c+ verziót használsz |
Bulk Collect + FORALL | PL/SQL tömeges adatkezelés | Ha PL/SQL procedúrában töltöd az adatokat |
4️⃣ Időzített futtatás az adattárház stage töltéséhez
Az időzített futtatás célja:
✅ Automatikus futtatás meghatározott időközönként
✅ Terheléselosztás (pl. éjszakai futtatás)
✅ Hibakezelési mechanizmusok beépítése
Az időzített futtatás három fő módszere:
-
Oracle DBMS_SCHEDULER (ajánlott) – Beépített ütemező az Oracle-ben
-
Crontab/Python Scheduler – Külső ütemezők
-
Event-Driven Scheduling – Adatváltozások alapján triggerelve
📌 4.1 DBMS_SCHEDULER – Az Oracle beépített ütemezője
Az Oracle DBMS_SCHEDULER egy fejlett ütemező, amely lehetővé teszi PL/SQL eljárások vagy SQL szkriptek futtatását időzítve.
📌 4.1.1 Ütemezett Stage töltés létrehozása
Ebben a példában a betöltés minden nap éjfélkor (00:00) lefut:
✅ Mit csinál?
-
Naponta, éjfélkor futtatja a
LOAD_STAGE_TRANSACTIONS
eljárást -
Automatikusan elindul minden nap
📌 4.1.2 Futtatás 10 percenként (gyors frissítés esetén)
✅ Mit csinál?
-
Minden 10 percben újra lefuttatja a Stage töltést
📌 4.1.3 Az ütemezett futtatás ellenőrzése
🔹 Mit látunk?
-
STATE = RUNNING
→ Jelenleg fut -
STATE = SCHEDULED
→ Vár a következő indításra
📌 4.1.4 Ütemezés letiltása vagy törlése
Ha le szeretnéd állítani az ütemezett futtatást:
Ha teljesen törölni szeretnéd:
📌 4.2 Crontab vagy Python alapú ütemezés
Ha Oracle-on kívüli megoldást keresel (pl. Python vagy Linux környezetben):
📌 4.2.1 Crontab – Unix/Linux időzített futtatás
🔹 Crontab beállítás (éjfélkor történő futtatásra)
✅ Mit csinál?
-
Minden nap éjfélkor futtat egy shell scriptet
Ha a script egy SQL parancsot futtat, akkor így nézhet ki:
📌 4.2.2 Python alapú időzítés
A Python schedule
modulját használhatjuk, ha egy külső Python alkalmazásból vezéreljük a futtatást.
✅ Mit csinál?
-
Minden nap éjfélkor futtatja a Stage betöltést
-
Automatikusan újrafuttatja hiba esetén
📌 4.3 Event-Driven Scheduling – Ha az adatváltozás indítja el a betöltést
Ha nem időzítés, hanem az új adatok érkezése alapján akarjuk indítani a betöltést, akkor TRIGGER-eket vagy Change Data Capture (CDC) megoldásokat használhatunk.
📌 4.3.1 Triggeres megoldás – Ha a forrástábla frissül, indul a betöltés
✅ Mit csinál?
-
Ha új adat érkezik a forrástáblába, automatikusan elindítja a betöltést
🔹 Hátrány
-
Nagyon sűrű frissítések esetén túl sok futtatást generálhat
📌 4.4 Összegzés: Melyik időzítési módszert mikor használd?
Időzítési módszer | Előny | Mikor használd? |
---|---|---|
DBMS_SCHEDULER | Beépített, könnyen kezelhető, Oracle belső megoldás | Ha az adatbázison belül kell időzíteni |
Crontab/Linux Scheduler | Egyszerű, stabil, OS-alapú | Ha shell script vagy külső rendszer vezérli a futtatást |
Python Scheduler | Rugalmas, más rendszerekbe is beépíthető | Ha egy Python alkalmazás részeként kell kezelni |
Event-Driven Scheduling (Trigger) | Azonnali feldolgozás, ha változik az adat | Ha csak akkor kell futnia, amikor új adat érkezik |
5️⃣ Optimalizálás az adattárház stage töltéséhez
A stage tábla töltésének optimalizálása kulcsfontosságú az adatbetöltési idő csökkentése, a CPU/memória kihasználtság optimalizálása, valamint a tárolási költségek minimalizálása érdekében.
Az optimalizálás fő területei:
-
Indexek és tömörítés
-
Parallel Processing – párhuzamos végrehajtás
-
Bulk Insert vs. Merge – tömeges beszúrás vs. intelligens frissítés
-
SQL optimalizálás és hint-ek
-
I/O műveletek csökkentése (Direct Path Load)
📌 5.1 Indexek és tömörítés
A stage tábla esetében az indexek és tömörítés megfelelő beállítása jelentősen javíthatja a betöltési teljesítményt.
📌 5.1.1 Indexek kezelése betöltéskor
Ha a stage táblán túl sok index van, az lassíthatja a beszúrást. Megoldás:
✅ Beszúrás előtt az indexek letiltása, utána újraépítésük:
Ha nagy mennyiségű adatot töltesz, az indexek újraépítése gyorsabb lehet, mint ha folyamatosan frissítenéd őket.
📌 5.1.2 Tábla tömörítés a tárhely csökkentése érdekében
Az Oracle table compression funkciója csökkentheti a tárhelyigényt és gyorsíthatja a lekérdezéseket.
Tömörített Stage tábla létrehozása (OLTP mód)
✅ Előny: A kisebb méretű tárolás gyorsabb olvasást biztosít.
🔹 Hátrány: Nagyon gyakori DML műveletek esetén CPU-t használ.
📌 5.2 Parallel Processing – Párhuzamos végrehajtás
Az Oracle parallel query és parallel DML lehetőséget biztosít a nagyobb teljesítményű adatmozgatásra.
📌 5.2.1 Párhuzamos beszúrás engedélyezése
✅ Mit csinál?
-
Parallel(4) → 4 szálon futtatja a beszúrást
-
Append → Közvetlenül a szegmens végére ír, minimalizálva az UNDO-t
📌 5.2.2 Táblaszintű parallel beállítás
Ha egy tábla mindig párhuzamosan legyen feldolgozva:
🔹 Megjegyzés: Ha a tábla mérete kicsi (<1M sor), a párhuzamosítás nem biztos, hogy hasznos.
📌 5.3 Bulk Insert vs. Merge – Tömeges beszúrás vagy frissítés?
Ha a stage tábla mindig teljes töltést kap, az INSERT + TRUNCATE gyorsabb.
Ha csak az új rekordokat kell frissíteni, a MERGE a hatékonyabb.
📌 5.3.1 Bulk Insert – Ha mindig teljes töltést végzünk
✅ Gyors, mert nincs UPDATE művelet
🔹 Csak akkor jó, ha nincs szükség történeti adatokra.
📌 5.3.2 Merge – Ha az új és meglévő adatok keverednek
✅ Hatékony, ha az adatok változnak és nem akarunk duplikációt
🔹 Lassabb, mint egy egyszerű beszúrás, mert összehasonlítja a meglévő rekordokat.
📌 5.4 SQL optimalizálás és hint-ek
Ha az SQL teljesítményét akarjuk növelni, használhatunk hint-eket.
📌 5.4.1 Full Table Scan vs. Index használat
Ha a tábla kicsi, indexet érdemes használni:
Ha a tábla nagy és szinte az összes sort beolvassuk, inkább full table scan:
📌 5.4.2 Query Rewriting – Materialized View
Ha a stage adatok gyakran változnak, egy materialized view gyorsíthatja a betöltést.
✅ Előny: Az adatok inkrementálisan frissülnek, gyorsabb lekérdezések
🔹 Hátrány: Extra karbantartást igényel
📌 5.5 I/O műveletek csökkentése (Direct Path Load)
Ha nagy adatmennyiséget töltünk be, az Oracle Direct Path Load csökkentheti az UNDO és REDO terhelést.
📌 5.5.1 Direct Path Load az INSERT-ben
✅ Előny: Nem használ UNDO/REDO naplózást, gyorsabb írás
🔹 Hátrány: Nem lehet rollback-elni
Ha ez nem elég, az SQL*Loader-t is használhatjuk.
📌 5.6 Összegzés: Melyik optimalizálási módszert mikor használd?
Optimalizálási technika | Előny | Mikor használd? |
---|---|---|
Indexek letiltása és újraépítése | Beszúrás gyorsítása | Ha sok index van |
Táblatömörítés (OLTP/Hybrid) | Kevesebb tárhely, gyorsabb olvasás | Ha nagy az adatmennyiség |
Parallel Processing | Többszálú végrehajtás | Ha nagyobb adatmennyiséget kell betölteni |
Bulk Insert (TRUNCATE + INSERT) | Gyors teljes újratöltés | Ha nincs szükség az előző adatokra |
MERGE (UPSERT) | Adatfrissítés duplikáció nélkül | Ha az adatok változnak |
SQL optimalizálás (Hint-ek) | Index vagy Full Table Scan beállítása | Ha az Oracle nem optimálisan választja meg a végrehajtási tervet |
Direct Path Load | UNDO/REDO terhelés csökkentése | Nagy mennyiségű adatnál |
Konkrét példák
Adatgyűjtés:
Forrás: egy OLTP rendszerben található tranzakciós tábla (
SOURCE_TRANSACTIONS
).Az adatok kinyerése egy Oracle External Table segítségével:
Betöltés és szum rekordszám ellenőrzés:
CREATE TABLE source_metadata (
table_name VARCHAR2(128),
column_name VARCHAR2(128),
data_type VARCHAR2(128),
PRIMARY KEY (table_name, column_name)
);
-- 2. Stage tábla generálásának ellenőrzése és létrehozása
CREATE OR REPLACE PROCEDURE generate_stage_table(p_source_table VARCHAR2) AS
v_stage_table VARCHAR2(128);
v_sql CLOB;
BEGIN
v_stage_table := 'STG_' || p_source_table;
v_sql := 'CREATE TABLE ' || v_stage_table || ' (';
FOR rec IN (SELECT column_name, data_type FROM source_metadata WHERE table_name = p_source_table) LOOP
v_sql := v_sql || rec.column_name || ' ' || rec.data_type || ', ';
END LOOP;
v_sql := v_sql || 'load_timestamp TIMESTAMP DEFAULT SYSTIMESTAMP, batch_id NUMBER)';
EXECUTE IMMEDIATE v_sql;
END;
/
-- 3. Adatok betöltő SQL generálása
CREATE OR REPLACE PROCEDURE generate_insert_sql(p_source_table VARCHAR2) AS
v_stage_table VARCHAR2(128);
v_sql CLOB;
BEGIN
v_stage_table := 'STG_' || p_source_table;
v_sql := 'INSERT INTO ' || v_stage_table || ' (';
FOR rec IN (SELECT column_name FROM source_metadata WHERE table_name = p_source_table) LOOP
v_sql := v_sql || rec.column_name || ', ';
END LOOP;
v_sql := v_sql || 'load_timestamp, batch_id) SELECT ';
FOR rec IN (SELECT column_name FROM source_metadata WHERE table_name = p_source_table) LOOP
v_sql := v_sql || rec.column_name || ', ';
END LOOP;
v_sql := v_sql || 'SYSTIMESTAMP, :batch_id FROM ' || p_source_table;
EXECUTE IMMEDIATE v_sql USING 1;
END;
/
-- 4.1. Betöltés ellenőrző SQL
CREATE OR REPLACE FUNCTION validate_load(p_source_table VARCHAR2) RETURN NUMBER AS
v_stage_table VARCHAR2(128);
v_count_source NUMBER;
v_count_stage NUMBER;
BEGIN
v_stage_table := 'STG_' || p_source_table;
EXECUTE IMMEDIATE 'SELECT COUNT(*) FROM ' || p_source_table INTO v_count_source;
EXECUTE IMMEDIATE 'SELECT COUNT(*) FROM ' || v_stage_table INTO v_count_stage;
RETURN CASE WHEN v_count_source = v_count_stage THEN 1 ELSE 0 END;
END;
/
Python vezérlő kód minta stage betöltéshez
import cx_Oracle
def connect_to_db(user, password, dsn):
"""Kapcsolódás az Oracle adatbázishoz."""
connection = cx_Oracle.connect(user, password, dsn)
return connection
def load_metadata(cursor, source_table):
"""Lekéri a forrástábla oszlopait és adattípusait, majd beilleszti a source_metadata táblába."""
cursor.execute("SELECT column_name, data_type FROM all_tab_columns WHERE table_name = :1", [source_table.upper()])
columns = cursor.fetchall()
cursor.execute("DELETE FROM source_metadata WHERE table_name = :1", [source_table])
for column, data_type in columns:
cursor.execute("INSERT INTO source_metadata (table_name, column_name, data_type) VALUES (:1, :2, :3)",
[source_table, column, data_type])
def generate_stage_table(cursor, source_table):
"""Létrehozza a stage táblát a metaadatok alapján."""
cursor.execute("SELECT column_name, data_type FROM source_metadata WHERE table_name = :1", [source_table])
columns = cursor.fetchall()
stage_table = f"STG_{source_table}"
sql = f"CREATE TABLE {stage_table} ("
sql += ", ".join([f"{col} {dtype}" for col, dtype in columns])
sql += ", load_timestamp TIMESTAMP DEFAULT SYSTIMESTAMP, batch_id NUMBER)"
cursor.execute(f"DROP TABLE {stage_table} PURGE") # Törli, ha már létezik
cursor.execute(sql)
def generate_insert_sql(cursor, source_table):
"""Generál egy INSERT INTO SQL-t a stage tábla betöltésére."""
cursor.execute("SELECT column_name FROM source_metadata WHERE table_name = :1", [source_table])
columns = [col[0] for col in cursor.fetchall()]
stage_table = f"STG_{source_table}"
sql = f"INSERT INTO {stage_table} ({', '.join(columns)}, load_timestamp, batch_id) "
sql += f"SELECT {', '.join(columns)}, SYSTIMESTAMP, :batch_id FROM {source_table}"
cursor.execute(sql, [1])
def validate_load(cursor, source_table):
"""Ellenőrzi a forrás és stage tábla közti eltéréseket."""
cursor.execute("SELECT column_name FROM source_metadata WHERE table_name = :1", [source_table])
columns = [col[0] for col in cursor.fetchall()]
stage_table = f"STG_{source_table}"
validation_sql = f"SELECT s.*, t.*, CASE "
for col in columns:
validation_sql += f"WHEN s.{col} IS DISTINCT FROM t.{col} THEN '{col} MISMATCH' ELSE NULL END AS {col}_diff, "
validation_sql = validation_sql.rstrip(', ')
validation_sql += f" FROM {source_table} s FULL OUTER JOIN {stage_table} t ON ("
validation_sql += " AND ".join([f"s.{col} = t.{col}" for col in columns])
validation_sql += ") WHERE ("
validation_sql += " OR ".join([f"s.{col} IS DISTINCT FROM t.{col}" for col in columns])
validation_sql += ")"
cursor.execute(validation_sql)
return cursor.fetchall()
if __name__ == "__main__":
user, password, dsn = "your_user", "your_password", "your_dsn"
source_table = "YOUR_SOURCE_TABLE"
conn = connect_to_db(user, password, dsn)
cur = conn.cursor()
load_metadata(cur, source_table)
generate_stage_table(cur, source_table)
generate_insert_sql(cur, source_table)
mismatches = validate_load(cur, source_table)
print("Eltérések:", mismatches)
conn.commit()
cur.close()
conn.close()
Megjegyzések
Megjegyzés küldése