Ugrás a fő tartalomra

ODI és groovy alapok





ODI (Oracle Data Integrator) az oracle ETL eszköze, amely groovy szkriptekkel még hatékonyabban használható.

Most még is kicsit ketté választom.

ODI -ban lekérdezhető a futtatott Load Plan MAP-jainak SQL parancsai, paraméterei, futás idő, érintet rekordszám, ...:

 Select a.object_name,
       a.object_type_name,
       a.object_long_name,
       sr.step_name,
       sc.scen_name,
       s.step_beg,
       s.step_end,
       s.step_dur,
       s.step_status,
       s.nb_row,
       s.error_message      As error_msg,
       se.startup_variables,
       s.sess_no,
       a.id,
       a.parent_id,
       a.sid,
       a.status,
       a.error_message,
       a.record_count,
       sr.lschema_name,
       sr.mod_code,
       sr.table_name,
       sr.res_name,
       st.task_name3,
       st.col_conn_name,
       st.def_conn_name,
       st.task_beg,
       st.task_end,
       st.task_status,
       st.nb_row,
       st.def_txt,
       st.error_message     As task_error
  From ebh_odi_repo.snp_step_log      s,
       ebh_meta.mt_lp_all_executions  a,
       ebh_odi_repo.snp_step_report   sr,
       ebh_odi_repo.snp_scen          sc,
       ebh_odi_repo.snp_session       se,
       ebh_odi_repo.snp_sess_task_log st
Where s.sess_no = a.external_session_id(+)
   And s.sess_no = sr.scen_run_no
   and SR.SCEN_NO = SC.SCEN_NO(+)
   And s.step_beg > Date '2019-03-16'
   And s.sess_no = se.sess_no
   and S.SESS_NO = ST.SESS_NO
   and sc.scen_name like 'MAP_O%'
   and DEF_TXT like 'insert %' 
   and OBJECT_TYPE_NAME ='SCENARIO'

Két mező érdekes a számunkra :
- startup_variables :  futtatási paramétereket tartalmazza
def_txt                 :  futtatandó SQL parancs váz mely a paraméter hivatkozási címkéket tartalmazza

Ha ezt a két mezőt egy-egy txt filebe irányítjuk akkor az alábbi groovy szkript elvégzi a behelyettesítéseket és a ténylegesen futtatandó SQL parancsot kiírja egy filebe.

def words = []
def readies = []
def di_words =[][]
//-------------------
new File( 'star_values.txt' ).eachLine { ///// line ->  || it
                                         ///// words << it || words.add(it)
    if ( ( it.substring(0,1) == '"') && (it) )  {
       words.add(it.substring(1,))       // Idézőjel kezdet kihagyása
       di_words.add(it.substring(1,).split(/=/))
    } else {      
       words.add(it)
       di_words.add(it.split(/=/))
    }
}
//--------------------
new File( 'def_txt.txt' ).eachLine { line ->
      di_words.each { csere ->  
        if ( csere[0].length() > 1 ){
            line = line.replaceAll(":" + csere[0], csere[1])
        }       
      }
      readies.add(line + "\n")
}
//--- Kiírás a képernyőre is
readies.each {
    println it
}

///------------------------

def file = new File("out.txt")
file.write ""
readies.each {
  //// file.write "First line\n"
  //// file << "Second line\n"
  //// file.append("hello\n")
   file.append(it)
}




Hogy is nézz ki egy paraméter file:

"GLOBAL.P_LOAD_ID=1507058
GLOBAL.P_EFFECTIVE_LOAD_DATE=20190319000000
GLOBAL.P_GLOBAL_DEPENDENCIES= 
GLOBAL.P_MANUAL_MAPPING_EXECUTION=N
"



Hogyan nézz ki a SQL váz:

...
(ATRAN_HISTORY.TXDATETIME <= TO_DATE(:GLOBAL.P_EFFECTIVE_LOAD_DATE,'YYYYMMDDHH24MISS')) 
..


Persze ugyanezt megtehetjük, hogy groovy szkript bekérdez az adatbázisba kiolvassa a mezőket elvégzi  a cseréket és kiírja a kész SQL parancsot egy txt filebe.

Megturbozhatjuk hibakezeléssel, ..:


import groovy.sql.Sql
import java.sql.Driver
//---------------------
class kl_db_minta_csere {
   static void main(String[] args) {
//-------------------

def words = []
def readies = []
def di_words =[][]

//-------------------

try {
def driver = Class.forName('oracle.jdbc.OracleDriver').newInstance() as Driver
def props = new Properties()
props.setProperty("user", "user")
props.setProperty("password", "KLjelszo")
def conn = driver.connect("jdbc:oracle:thin:@ora:1152: T10", props)
def sql = new Sql(conn)
def sql_parancs ="""
Select a.object_name,
       a.object_long_name,
       se.startup_variables,
       st.def_txt,
       st.error_message     As task_error
  From ebh_odi_repo.snp_step_log      s,
       ebh_meta.mt_lp_all_executions  a,
       ebh_odi_repo.snp_step_report   sr,
       ebh_odi_repo.snp_scen          sc,
       ebh_odi_repo.snp_session       se,
       ebh_odi_repo.snp_sess_task_log st
   Where s.sess_no = a.external_session_id(+)
   And s.sess_no = sr.scen_run_no
   and SR.SCEN_NO = SC.SCEN_NO(+)
   And s.step_beg > Date '2019-03-16'
   And s.sess_no = se.sess_no
   and S.SESS_NO = ST.SESS_NO
   and sc.scen_name like 'MAP_B%'
   and st.col_conn_name ='EDW' and st.def_conn_name ='EDW'
   and DEF_TXT like 'insert %' 
   and OBJECT_TYPE_NAME ='SCENARIO'
   and s.nb_row = 22065
   """
//--------------------------  
def results = sql.firstRow( sql_parancs )  ///sql.rows( sql_parancs )
def parameters = " "
def sql_nyers = " "
sql_nyers = results['def_txt'].asciiStream.text                           /// sql (CLOB tipusu mező miatt konverzió
parameters = results['startup_variables'].asciiStream.text.split(/\n/)    /// parameter tömbösítás

//---------------------

   parameters.each { parameter ->  
//      println(parameter)  // kiíratás a képernyőre
      di_words.add(parameter.split(/=/))  
   }

//--------------------
      di_words.each { csere ->  
        if ( csere[0].length() > 1 ){
            sql_nyers = sql_nyers.replaceAll(":" + csere[0], csere[1])
//            print( csere[0])
//            print(' --> ')
//            println( csere[1])
        }       
      }
      readies.add(sql_nyers + "\n")
     
//------------------------


def file = new File("out2.txt")
file.write ""
readies.each {
   file.append(it)
}

//---------------------------
println file.text  //// file tartalmának kiírása
//---------------------------

/// Erőforrás felszabadítás
conn.close()
} catch(Exception ex) {
         println("HIBA: az adat nincs meg");
}


Persze a két mező két ODI táblából is közvetlenül is lekérdezhető és csak a munkamenet/session azonosító kell az azonosításhoz.

--- munkamenet futtáskor futtatott SQL maszk
select sess_no, def_txt as sqltxt, NNO,               NB_RUN,            SCEN_TASK_NO,             TASK_BEG,         TASK_END,                TASK_DUR,        TASK_STATUS, NB_ROW, NB_INS
from ebh_odi_repo.SNP_sess_task_log where 1=1
and sess_no = 39
and def_txt is not null and def_txt like 'insert%'
--  order by length(def_txt) desc
-----------------
--- munkamenethez tartozó paraméterek
select sess_no, startup_variables as act_vars, SESS_NAME, SESS_BEG, SESS_END, SCEN_NAME, NB_ROW, NB_INS, NB_UPD, NB_DEL, NB_ERR, SB_NO
from EBH_ODI_REPO.SNP_SESSION
where sess_no = 39




Az ODI-n belül az ODI groovy osztályok használatával egy hasonló funkció így nézz ki:



import groovy.sql.Sql
import oracle.odi.domain.project.finder.IOdiVariableFinder
import oracle.odi.domain.project.finder.IOdiSequenceFinder

import groovy.swing.SwingBuilder

tme = odiInstance.getTransactionalEntityManager()
varList = ((IOdiVariableFinder) tme.getFinder(OdiVariable.class)).findAll()
seqList = ((IOdiSequenceFinder) tme.getFinder(OdiSequence.class)).findAll()
actVarMap = new HashMap<String,String>()

// SQL
url = "jdbc:oracle:thin://@Aszerver:1152/D10"
user = "REPO"
pw = "Ejelszo"
sql = Sql.newInstance(url, user, pw, "oracle.jdbc.driver.OracleDriver")

// ehhez a sessionhoz tartozó logot dolgozza fel (régi működés)
// *** ha -1, akkor az sqltxt változóban megadott stringet dolgozza fel ***

tmp_sess_id = 39 //-1
actual_vars_fl = true // ha igen, akkor az aktuális paraméterekkel helyettesít, egyébként az alapértelmezettekkel

if ( tmp_sess_id < 0 ) {
  sqltxt = """
  """
} else {
 stmt = """
        select def_txt as sqltxt
         from SNP_sess_task_log
         where sess_no = :p_sess_id
           and def_txt is not null
         order by length(def_txt) desc
         """;
  stmt2 = """
        select startup_variables as act_vars
        from EBH_ODI_REPO.SNP_SESSION
        where sess_no = :p_sess_id
        """;

  sqltxt = sql.firstRow(stmt, [p_sess_id: tmp_sess_id]).sqltxt.getAsciiStream().getText() // SQL log
  tmp = sql.firstRow(stmt2, [p_sess_id: tmp_sess_id]).act_vars.getAsciiStream().getText() // aktuális paraméterek
  sql.close()

  for ( l in tmp.split('\n') ) {
    tmp2 = l.split('=')
    try {
      actVarMap.put(tmp2[0], tmp2[1])   
    } catch (Exception e) { null }
  }

}
sqltxt = sqltxt.replace(':', '')
sqltxt = sqltxt.replace('#', '')

if ( actual_vars_fl == true ) {
    // legújabb működés

    // aktuális változók
    for ( i in actVarMap ) {
      from = i.key
      to = i.value + ' /*' + i.key + '*/'
      sqltxt = sqltxt.replaceAll('\\b' + from + '\\b', to) // regexp word boundary check
    }
}else {
   // változók
    for ( i in varList ) {
      from = i.getQualifiedName()
        to = i.getDefaultValue() + ' /*' + i.getQualifiedName() + '*/'

      //sqltxt = sqltxt.replace(from, to)
      sqltxt = sqltxt.replaceAll('\\b' + from + '\\b', to) // regexp word boundary check

    }

}


for (i in seqList ) {
  from = i.getQualifiedName() + '_NEXTVAL'
  to = 'DWH.' + i.getNativeSequenceName() + '.nextval'
  //sqltxt = sqltxt.replace(from, to)
  sqltxt = sqltxt.replaceAll('\\b' + from + '\\b', to)  
}

print sqltxt + '\n;\n\n'
def swing = new SwingBuilder()

swing.edt{
    frame( title: 'Session: ' + tmp_sess_id, pack:true, show:true ){
        panel(){
            scrollPane( preferredSize:[800, 800] ){
                editorPane( contentType: ("text/sql"), text: sqltxt)
            }
        }
    }
}

}

}

----------------
set JAVA_HOME=c:\Program Files\Java\jdk1.8.0_131
set ODI_HOME=c:\Oracle\ODI123

set PATH=%JAVA_HOME%\bin;%PATH%
set CLASSPATH=%ODI_HOME%\odi\sdk\lib\odi-core.jar;%ODI_HOME%\odi\sdk\lib\spring-core.jar;%ODI_HOME%\odi\sdk\lib\spring-tx.jar;%ODI_HOME%\odi\sdk\lib\commons-lang-2.2.jar;%ODI_HOME%\odi\sdk\lib\spring-jdbc.jar
set CLASSPATH=%CLASSPATH%;%ODI_HOME%\odi\sdk\lib\spring-beans.jar;%ODI_HOME%\odi\sdk\lib\commons-logging-1.1.1.jar;%ODI_HOME%\odi\sdk\lib\commons-codec-1.3.jar;%ODI_HOME%\odi\sdk\lib\commons-collections-3.2.jar
set CLASSPATH=%CLASSPATH%;%ODI_HOME%\odi\sdk\lib\bsh-2.0b2.jar;odireleasehandler.jar;%ODI_HOME%\oracle_common\modules\oracle.jps\jps-manifest.jar;%ODI_HOME%\oracle_common\modules\javax.management.j2ee.jar
set CLASSPATH=%CLASSPATH%;%ODI_HOME%\oracle_common\modules\org.springframework_3.1.0.jar;%ODI_HOME%\oracle_common\modules\oracle.ucp.jar;%ODI_HOME%\odi\sdk\lib\javolution.jar;.

java -Xmx4096m odireleasehandler.ReleaseHandler -action backup

java -Xmx4096m odireleasehandler.ReleaseHandler -action delete

java -Xmx4096m odireleasehandler.ReleaseHandler -action import

java odireleasehandler.ReleaseHandler -action create

java -Xmx4096m odireleasehandler.ReleaseHandler -action scengen












Megjegyzések