Ugrás a fő tartalomra

JIRA adatok adattárházba töltése : folyamatok

JIRA adatok adattárházba töltése : folyamatok


Alakítsuk ki a JIRA → JSON → CSV/Oracle staging → DW adatbetöltési folyamatot hibatűréssel, naplózással és testreszabott modullal.


✅ Cél: Python modul REST lekéréssel, naplózással, JSON mentéssel, CSV exporttal, előkészítve Oracle betöltésre


📁 Projekt struktúra javaslat

---bash
jira_etl/ ├── .env # API token, JIRA URL ├── main.py # Modul indítása ├── jira_api.py # REST hívások és paging ├── logger.py # Egységes naplózás ├── exporter.py # JSON / CSV fájlmentés ├── transformers.py # Dictionary mapping, mezőátalakítás ├── loaders/ │ └── oracle_loader.py # Oracle staging INSERT-ek (cx_Oracle vagy sqlalchemy) └── logs/ └── jira_etl.log # Naplózott események

🧱 1. .env fájl (konfiguráció)

---env
JIRA_BASE_URL=https://yourcompany.atlassian.net JIRA_USERNAME=valodi.felhasznalo@ceg.hu JIRA_API_TOKEN=your_real_token_here

🧰 2. logger.py – Naplózó modul

---python
import logging from datetime import datetime import os log_dir = "logs" os.makedirs(log_dir, exist_ok=True) log_file = os.path.join(log_dir, f"jira_etl_{datetime.now().strftime('%Y%m%d')}.log") logging.basicConfig( filename=log_file, filemode='a', format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO ) def get_logger(): return logging.getLogger("JIRA_ETL")

🌐 3. jira_api.py – REST lekérdező modul

Ez fogja lekérni az issues, projects, users adatokat, JSON fájlba menteni és visszaadni feldolgozásra.

--- python
import requests from requests.auth import HTTPBasicAuth from dotenv import load_dotenv import os from logger import get_logger load_dotenv() logger = get_logger() JIRA_BASE_URL = os.getenv("JIRA_BASE_URL") JIRA_USERNAME = os.getenv("JIRA_USERNAME") JIRA_API_TOKEN = os.getenv("JIRA_API_TOKEN") auth = HTTPBasicAuth(JIRA_USERNAME, JIRA_API_TOKEN) headers = {"Accept": "application/json"} def get_paginated(endpoint, params=None, limit=100): all_data = [] start_at = 0 while True: p = params.copy() if params else {} p["startAt"] = start_at p["maxResults"] = limit url = f"{JIRA_BASE_URL}{endpoint}" try: response = requests.get(url, headers=headers, params=p, auth=auth) response.raise_for_status() data = response.json() items = data.get("issues", []) if "issues" in data else data if not items: break all_data.extend(items) logger.info(f"{endpoint} → lekérve {len(items)} rekord (startAt={start_at})") if len(items) < limit or "total" in data and start_at + limit >= data["total"]: break start_at += limit except Exception as e: logger.error(f"Hiba {endpoint} lekérdezésénél: {str(e)}") break return all_data def get_issues(jql='project = DEMO ORDER BY created DESC'): return get_paginated("/rest/api/2/search", params={"jql": jql, "fields": "summary,status,customfield_10021"}) def get_projects(): return get_paginated("/rest/api/2/project") def get_users(): return get_paginated("/rest/api/3/users/search")

💾 4. exporter.py – JSON és CSV fájlmentés

---python
import json import csv import os from logger import get_logger logger = get_logger() def save_json(data, filename): with open(filename, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) logger.info(f"Mentve JSON: {filename}") def save_csv(data, filename, fieldnames): with open(filename, "w", encoding="utf-8", newline="") as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() for row in data: writer.writerow(row) logger.info(f"Mentve CSV: {filename}")

🔁 5. transformers.py – Mező értelmezés, dictionary mapping

---python
PRIORITY_DICT = { "Alacsony": "Low", "Közepes": "Normal", "Magas": "Customer Critical" } def transform_issues(raw_issues): results = [] for i in raw_issues: fields = i["fields"] results.append({ "issue_id": i["id"], "key": i["key"], "summary": fields.get("summary"), "status": fields.get("status", {}).get("name"), "custom_priority": PRIORITY_DICT.get(fields.get("customfield_10021", {}).get("value", ""), "Unknown") }) return results

🧵 6. main.py – Fővezérlés

---python
from jira_api import get_issues, get_projects, get_users from exporter import save_json, save_csv from transformers import transform_issues def main(): issues_raw = get_issues() save_json(issues_raw, "jira_issues_raw.json") issues_transformed = transform_issues(issues_raw) save_csv(issues_transformed, "jira_issues.csv", ["issue_id", "key", "summary", "status", "custom_priority"]) projects = get_projects() save_json(projects, "jira_projects.json") users = get_users() save_json(users, "jira_users.json") if __name__ == "__main__": main()



🔧 1. Oracle staging tábla – példa

Hozz létre egy staging táblát Oracle-ben, például:

---sql
CREATE TABLE jira_issues_stg ( issue_id VARCHAR2(50), key VARCHAR2(50), summary VARCHAR2(500), status VARCHAR2(100), custom_priority VARCHAR2(100), load_date DATE DEFAULT SYSDATE );

📦 2. Python Oracle connector: cx_Oracle telepítés

Telepítsd a szükséges csomagot:

---bash
pip install cx_Oracle python-dotenv

🔑 3. .env – Oracle elérés beállítása

---env
ORACLE_USER=adatbetolto ORACLE_PASSWORD=TitkosJelszo123 ORACLE_DSN=yourdbhost:1521/yourservicename

🧩 4. loaders/oracle_loader.py – Betöltő modul

---python
import cx_Oracle import os from dotenv import load_dotenv from logger import get_logger load_dotenv() logger = get_logger() ORACLE_USER = os.getenv("ORACLE_USER") ORACLE_PASSWORD = os.getenv("ORACLE_PASSWORD") ORACLE_DSN = os.getenv("ORACLE_DSN") def insert_issues_to_staging(issue_rows): conn = None try: conn = cx_Oracle.connect(user=ORACLE_USER, password=ORACLE_PASSWORD, dsn=ORACLE_DSN, encoding="UTF-8") cursor = conn.cursor() insert_sql = """ INSERT INTO jira_issues_stg (issue_id, key, summary, status, custom_priority) VALUES (:1, :2, :3, :4, :5) """ rows_to_insert = [ (i["issue_id"], i["key"], i["summary"], i["status"], i["custom_priority"]) for i in issue_rows ] cursor.executemany(insert_sql, rows_to_insert) conn.commit() logger.info(f"{len(rows_to_insert)} sor betöltve a staging táblába.") except Exception as e: logger.error(f"Hiba a staging betöltésnél: {str(e)}") if conn: conn.rollback() finally: if conn: conn.close()

🧵 5. main.py frissítve – betöltéssel együtt

---python
from jira_api import get_issues
from exporter import save_json, save_csv from transformers import transform_issues from loaders.oracle_loader import insert_issues_to_staging def main(): issues_raw = get_issues() save_json(issues_raw, "jira_issues_raw.json") issues_transformed = transform_issues(issues_raw) save_csv(issues_transformed, "jira_issues.csv", ["issue_id", "key", "summary", "status", "custom_priority"]) insert_issues_to_staging(issues_transformed) if __name__ == "__main__": main()

✅ Eredmény

Futtatás után:

  • jira_issues_raw.json → teljes JSON mentés

  • jira_issues.csv → könnyen betölthető CSV backup

  • jira_issues_stg → staging tábla Oracle-ben, naprakészen


🔄 Következő lépés: végső adattárház tábla betöltése

A staging tábla után egy PL/SQL vagy Python által meghívott SQL script végzi a feltöltést a final DW táblába – deduplikálás, lookup-ok, konverziók stb.

 Egy Pythonból indított MERGE INTO Oracle betöltést, ami a staging táblából (pl. jira_issues_stg) egy végső adattárház táblába (jira_issues_dw) tölti az adatokat – upsert (update + insert) logikával.



🎯 Cél

  • Új JIRA rekord → INSERT

  • Létező rekord (pl. issue ID alapján) → UPDATE, ha az adatok változtak

  • Betöltést Python indítja (cx_Oracle)

  • DW tábla: jira_issues_dw


🧱 1. Oracle DW tábla létrehozása

---sql
CREATE TABLE jira_issues_dw ( issue_id VARCHAR2(50) PRIMARY KEY, key VARCHAR2(50), summary VARCHAR2(500), status VARCHAR2(100), custom_priority VARCHAR2(100), inserted_at DATE DEFAULT SYSDATE, updated_at DATE );

⚙️ 2. MERGE SQL – üzleti logika

---sql
MERGE INTO jira_issues_dw dw
USING ( SELECT issue_id, key, summary, status, custom_priority FROM jira_issues_stg ) stg ON (dw.issue_id = stg.issue_id) WHEN MATCHED THEN UPDATE SET dw.key = stg.key, dw.summary = stg.summary, dw.status = stg.status, dw.custom_priority = stg.custom_priority, dw.updated_at = SYSDATE WHERE dw.key != stg.key OR dw.summary != stg.summary OR dw.status != stg.status OR dw.custom_priority != stg.custom_priority WHEN NOT MATCHED THEN INSERT (issue_id, key, summary, status, custom_priority, inserted_at) VALUES (stg.issue_id, stg.key, stg.summary, stg.status, stg.custom_priority, SYSDATE);

💡 A WHERE záradék az UPDATE részben biztosítja, hogy csak akkor történjen frissítés, ha valóban változott az adat.


🧩 3. Python modul: oracle_loader.py kiegészítése

---python
def merge_into_dw(): conn = None try: conn = cx_Oracle.connect(user=ORACLE_USER, password=ORACLE_PASSWORD, dsn=ORACLE_DSN, encoding="UTF-8") cursor = conn.cursor() merge_sql = """ MERGE INTO jira_issues_dw dw USING ( SELECT issue_id, key, summary, status, custom_priority FROM jira_issues_stg ) stg ON (dw.issue_id = stg.issue_id) WHEN MATCHED THEN UPDATE SET dw.key = stg.key, dw.summary = stg.summary, dw.status = stg.status, dw.custom_priority = stg.custom_priority, dw.updated_at = SYSDATE WHERE dw.key != stg.key OR dw.summary != stg.summary OR dw.status != stg.status OR dw.custom_priority != stg.custom_priority WHEN NOT MATCHED THEN INSERT (issue_id, key, summary, status, custom_priority, inserted_at) VALUES (stg.issue_id, stg.key, stg.summary, stg.status, stg.custom_priority, SYSDATE) """ cursor.execute(merge_sql) conn.commit() logger.info("MERGE INTO: staging → DW sikeres.") except Exception as e: logger.error(f"Hiba a MERGE INTO végrehajtása közben: {str(e)}") if conn: conn.rollback() finally: if conn: conn.close()

🧵 4. main.py frissítése

---python
from loaders.oracle_loader import insert_issues_to_staging, merge_into_dw def main(): # ... [adat lekérés és parsing] ... insert_issues_to_staging(issues_transformed) merge_into_dw()

✅ Mi történik végrehajtáskor?

  1. Lekérjük a JIRA adatokat REST API-n keresztül.

  2. A JSON adatokat CSV-be és staging Oracle táblába töltjük (jira_issues_stg).

  3. A Pythonból indított MERGE INTO lekérdezés:

    • beszúrja az új sorokat (INSERT)

    • frissíti a meglévőket, ha változott bármelyik mező (UPDATE)

    • így az adattárház tábla mindig naprakész marad


🔍 Tobábbi bővítési lehetőségek

  • Audit tábla: naplóz, ki és mikor töltött be mit

  • Hash alapú változásdetektálás: gyorsabb összehasonlítás

  • Historikus DW táblák: változáskövetés (pl. SCD Type 2)

  • Automatikus időzítés (cron, Airflow, dbms_scheduler)


A hash-alapú változásdetektálás egy hatékony technika az adattárház betöltés során, különösen nagy adatmennyiség vagy sok mező összehasonlítása esetén.

Ez a módszer segít gyorsan eldönteni, hogy egy rekord változott-e a staging és a cél (DW) tábla között, anélkül hogy minden mezőt külön-külön kellene összehasonlítani a MERGE-ben.


🎯 Mi a cél?

👉 Minimalizálni az UPDATE műveleteket a MERGE INTO-ban, csak ha tényleg változott az adat.


🧠 Ötlet lényege:

  1. Kiszámítunk egy hash értéket a mezőkből a staging táblában (pl. summary, status, priority, stb.)

  2. Ugyanilyen hash érték van tárolva a DW táblában

  3. MERGE során csak akkor történik UPDATE, ha a staging és DW hash értékei eltérnek


🧱 1. Oracle DW tábla kiegészítése

---sql
ALTER TABLE jira_issues_dw ADD hash_val VARCHAR2(64);

⚙️ 2. Hash érték kiszámítása – Oracle oldalon

Használjunk pl. STANDARD_HASH függvényt:

---sql
SELECT STANDARD_HASH(summary || status || custom_priority, 'SHA256') AS hash_val FROM jira_issues_stg;

🧩 3. MERGE INTO módosítva – hash logikával

---sql
MERGE INTO jira_issues_dw dw
USING ( SELECT issue_id, key, summary, status, custom_priority, STANDARD_HASH(summary || status || custom_priority, 'SHA256') AS new_hash FROM jira_issues_stg ) stg ON (dw.issue_id = stg.issue_id) WHEN MATCHED THEN UPDATE SET dw.key = stg.key, dw.summary = stg.summary, dw.status = stg.status, dw.custom_priority = stg.custom_priority, dw.updated_at = SYSDATE, dw.hash_val = stg.new_hash WHERE dw.hash_val IS DISTINCT FROM stg.new_hash WHEN NOT MATCHED THEN INSERT (issue_id, key, summary, status, custom_priority, inserted_at, hash_val) VALUES (stg.issue_id, stg.key, stg.summary, stg.status, stg.custom_priority,
SYSDATE, stg.new_hash);

🔍 Előny: A WHERE záradékban egy mezőt hasonlítunk össze (hash_val), nem négyet.


🐍 4. Python alternatíva: hash érték kiszámítása előre

Ha a hash-t inkább Pythonban szeretnéd előállítani, használhatod:

---python
import hashlib def compute_hash(summary, status, priority): hash_input = f"{summary}|{status}|{priority}" return hashlib.sha256(hash_input.encode("utf-8")).hexdigest()

Majd az issue_rows rekordhoz hozzáadhatod a hash_val mezőt, és betöltöd a stagingbe.

Ez akkor hasznos, ha:

  • az Oracle hash függvény nem elérhető (pl. régi verzió)

  • a logikát teljesen Pythonban szeretnéd kontrollálni


🧵 Összefoglalva

ElőnyMiért jó a hash-alapú változásdetektálás?
✅ GyorsCsak egy mezőt hasonlít össze
✅ EgyszerűNem kell minden mezőhöz OR logika
✅ Skálázható10–20 mező esetén is könnyen kezelhető
✅ KompatibilisOracle STANDARD_HASH vagy Python hashlib



 Bemutatok egy teljes Python példát, amely:

  1. Lekérdezi a JIRA issues adatokat REST API-n keresztül (egyszerűsített dummy adat)

  2. Kiszámítja a hash-t Pythonban (hashlib segítségével)

  3. Staging Oracle táblába tölti be az adatokat (jira_issues_stg)

  4. Majd Oracle oldalról MERGE INTO-val tölti a jira_issues_dw táblába a hash alapján, változásdetektálással



📁 Mappastruktúra (ajánlott)

-----
project/ │ ├── config.py ← Oracle és JIRA beállítások ├── utils.py ← Hash funkció ├── jira_loader.py ← JSON parsing és staging betöltés ├── dw_loader.py ← MERGE INTO DW └── main.py ← Végrehajtás

📌 1. config.py

---python
JIRA_URL = "https://your-domain.atlassian.net" JIRA_USER = "your.email@domain.com" JIRA_API_TOKEN = "your-api-token" ORACLE_DSN = "host:port/service_name" ORACLE_USER = "user" ORACLE_PASSWORD = "password"

🧮 2. utils.py – hash számítás

---python
import hashlib def compute_hash(summary, status, priority): hash_input = f"{summary}|{status}|{priority}" return hashlib.sha256(hash_input.encode("utf-8")).hexdigest()

🔄 3. jira_loader.py – REST hívás + hash + staging betöltés

---python
import requests import cx_Oracle from config import JIRA_URL, JIRA_USER, JIRA_API_TOKEN, ORACLE_DSN, ORACLE_USER, ORACLE_PASSWORD from utils import compute_hash import base64 def fetch_issues(): auth = (JIRA_USER, JIRA_API_TOKEN) url = f"{JIRA_URL}/rest/api/2/search?jql=project=DEMO&maxResults=10" headers = {"Accept": "application/json"} response = requests.get(url, headers=headers, auth=auth) response.raise_for_status() data = response.json() return data["issues"] def parse_issues(raw_issues): rows = [] for issue in raw_issues: issue_id = issue["id"] key = issue["key"] fields = issue["fields"] summary = fields.get("summary", "") status = fields.get("status", {}).get("name", "") priority = fields.get("priority", {}).get("name", "None") hash_val = compute_hash(summary, status, priority) rows.append((issue_id, key, summary, status, priority, hash_val)) return rows def insert_into_staging(rows): conn = None try: conn = cx_Oracle.connect(ORACLE_USER, ORACLE_PASSWORD, ORACLE_DSN, encoding="UTF-8") cursor = conn.cursor() cursor.execute("TRUNCATE TABLE jira_issues_stg") cursor.executemany(""" INSERT INTO jira_issues_stg (issue_id, key, summary, status, custom_priority, hash_val) VALUES (:1, :2, :3, :4, :5, :6) """, rows) conn.commit() print(f"{len(rows)} sor beszúrva a staging táblába.") except Exception as e: print(f"Hiba: {str(e)}") if conn: conn.rollback() finally: if conn: conn.close()

🏗️ 4. Oracle staging + DW tábla

Staging tábla:

---sql
CREATE TABLE jira_issues_stg ( issue_id VARCHAR2(50), key VARCHAR2(50), summary VARCHAR2(500), status VARCHAR2(100), custom_priority VARCHAR2(100), hash_val VARCHAR2(64) );

DW tábla:

---sql
CREATE TABLE jira_issues_dw ( issue_id VARCHAR2(50) PRIMARY KEY, key VARCHAR2(50), summary VARCHAR2(500), status VARCHAR2(100), custom_priority VARCHAR2(100), hash_val VARCHAR2(64), inserted_at DATE DEFAULT SYSDATE, updated_at DATE );

🚀 5. dw_loader.pyMERGE INTO

---python
import cx_Oracle from config import ORACLE_DSN, ORACLE_USER, ORACLE_PASSWORD def merge_into_dw(): conn = None try: conn = cx_Oracle.connect(ORACLE_USER, ORACLE_PASSWORD, ORACLE_DSN, encoding="UTF-8") cursor = conn.cursor() merge_sql = """ MERGE INTO jira_issues_dw dw USING ( SELECT issue_id, key, summary, status, custom_priority, hash_val FROM jira_issues_stg ) stg ON (dw.issue_id = stg.issue_id) WHEN MATCHED THEN UPDATE SET dw.key = stg.key, dw.summary = stg.summary, dw.status = stg.status, dw.custom_priority = stg.custom_priority, dw.hash_val = stg.hash_val, dw.updated_at = SYSDATE WHERE dw.hash_val IS DISTINCT FROM stg.hash_val WHEN NOT MATCHED THEN INSERT (issue_id, key, summary, status, custom_priority, hash_val, inserted_at) VALUES (stg.issue_id, stg.key, stg.summary, stg.status, stg.custom_priority, stg.hash_val, SYSDATE) """ cursor.execute(merge_sql) conn.commit() print("MERGE INTO sikeres.") except Exception as e: print(f"Hiba a MERGE során: {str(e)}") if conn: conn.rollback() finally: if conn: conn.close()

🧪 6. main.py – futtatás

---python
from jira_loader import fetch_issues, parse_issues, insert_into_staging
from dw_loader import merge_into_dw def main(): raw_issues = fetch_issues() parsed_rows = parse_issues(raw_issues) insert_into_staging(parsed_rows) merge_into_dw() if __name__ == "__main__": main()

✅ Mit nyertünk?

FunkcióMegoldás
REST API lekérésrequests
JSON parsingPython native
Hash számítás változásrahashlib
Oracle staging betöltéscx_Oracle.executemany()
DW betöltés csak változásraMERGE INTO + hash összehasonlítás


SCD2 elvek és használatuk


SCD Type 2 (Slowly Changing Dimension Type 2) logikára  jellemző  Megőrzi az adatok történetét, nem csak frissít, hanem új sort hoz létre, ha egy érték változik (pl. státusz vagy prioritás megváltozik egy JIRA issue-n).


🧠 Alapelvek – SCD Type 2

MezőSzerepe
issue_idBusiness kulcs (azonosítja az entitást)
start_dateÉrvényesség kezdete
end_dateÉrvényesség vége (NULL = aktuális rekord)
current_flagY / N jelzi, hogy az adott rekord az aktuális verzió-e
hash_valSegít detektálni, hogy változott-e valami
Egyéb mezőksummary, status, priority, stb. – változhatnak időben

🏗️ 1. DW Tábla SCD 2 struktúrával

---sql
CREATE TABLE jira_issues_hist ( surrogate_key NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, issue_id VARCHAR2(50), key VARCHAR2(50), summary VARCHAR2(500), status VARCHAR2(100), custom_priority VARCHAR2(100), hash_val VARCHAR2(64), start_date DATE, end_date DATE, current_flag CHAR(1), -- 'Y' vagy 'N' inserted_at DATE DEFAULT SYSDATE ); CREATE INDEX idx_jira_issues_hist_active ON jira_issues_hist(issue_id, current_flag)
WHERE current_flag = 'Y';

🔁 2. MERGE helyett → SCD2 logika (Python + SQL együtt)

Python logika:

  • Hash-t kiszámítod, staging-be betöltöd (ahogy eddig)

  • DW-ben megnézed: van-e aktív rekord (current_flag = 'Y') az issue_id-re

  • Ha nincs: INSERT

  • Ha van, de a hash_val különbözik:

    1. Lezárod a régit (end_date, current_flag='N')

    2. Újat nyitsz (current_flag='Y', end_date=NULL)


🧩 3. SCD Type 2 betöltés Pythonban – scd2_loader.py

python
import cx_Oracle
from config import ORACLE_DSN, ORACLE_USER, ORACLE_PASSWORD def load_scd2(): conn = None try: conn = cx_Oracle.connect(ORACLE_USER, ORACLE_PASSWORD, ORACLE_DSN, encoding="UTF-8") cursor = conn.cursor() # 1. Olvasd be a staging adatokat cursor.execute(""" SELECT issue_id, key, summary, status, custom_priority, hash_val FROM jira_issues_stg """) rows = cursor.fetchall() for row in rows: issue_id, key, summary, status, custom_priority, hash_val = row # 2. Van-e aktív sor erre az issue_id-re? cursor.execute(""" SELECT surrogate_key, hash_val FROM jira_issues_hist WHERE issue_id = :id AND current_flag = 'Y' """, {'id': issue_id}) current = cursor.fetchone() if current is None: # 3. Nincs még ilyen → INSERT új (új issue) cursor.execute(""" INSERT INTO jira_issues_hist ( issue_id, key, summary, status, custom_priority, hash_val, start_date, end_date, current_flag ) VALUES ( :id, :key, :summary, :status, :priority, :hash, SYSDATE, NULL, 'Y' ) """, { 'id': issue_id, 'key': key, 'summary': summary, 'status': status, 'priority': custom_priority, 'hash': hash_val }) else: old_key, old_hash = current if old_hash != hash_val: # 4. Lezárjuk a régi sort cursor.execute(""" UPDATE jira_issues_hist SET end_date = SYSDATE, current_flag = 'N' WHERE surrogate_key = :key """, {'key': old_key}) # 5. Beszúrjuk az új sort cursor.execute(""" INSERT INTO jira_issues_hist ( issue_id, key, summary, status, custom_priority, hash_val, start_date, end_date, current_flag ) VALUES ( :id, :key, :summary, :status, :priority, :hash, SYSDATE, NULL, 'Y' ) """, { 'id': issue_id, 'key': key, 'summary': summary, 'status': status, 'priority': custom_priority, 'hash': hash_val }) conn.commit() print("SCD Type 2 betöltés kész.") except Exception as e: print(f"Hiba a SCD2 betöltésnél: {e}") if conn: conn.rollback() finally: if conn: conn.close()

🔄 4. main.py frissítése

---python
from jira_loader import fetch_issues, parse_issues, insert_into_staging from scd2_loader import load_scd2 def main(): raw_issues = fetch_issues() parsed_rows = parse_issues(raw_issues) insert_into_staging(parsed_rows) load_scd2() if __name__ == "__main__": main()

✅ Eredmény: Történetmegőrzés DW-ben

issue_idsummarystatusstart_dateend_datecurrent_flag
10001"Bug A""Open"2024-01-012024-03-01N
10001"Bug A""Resolved"2024-03-01NULLY

✨ Bővíthető funkciók

  • updated_by mező (ki frissítette)

  • change_reason mező (miért lett változás?)

  • is_deleted jelölés törölt elemekre

  • Historikus custom_field dictionary-vel bővítés





Megjegyzések