Динамический способ выполнения ETL с помощью Pyspark

Вместо того чтобы писать ETL для каждой таблицы отдельно, вы можете использовать технику динамического выполнения ETL с помощью базы данных (MySQL, PostgreSQL, SQL-Server) и Pyspark. Выполните несколько шагов для написания кода, для лучшего понимания я разбиваю его на шаги.

Шаг 1

Создайте две таблицы в базе данных (я использую SQL-SERVER) с именем TEST_DWH:
таблица etl_metadata для хранения основных данных ETL (исходная и целевая информация).

CREATE TABLE [dbo].[etl_metadata](
    [id] [int] IDENTITY(1,1) NOT NULL,
    [source_type] [varchar](max) NULL,
    [source_info] [text] NULL,
    [destination_db] [varchar](max) NULL,
    [destination_schema] [varchar](max) NULL,
    [destination_table] [varchar](max) NULL,
    [etl_type] [varchar](max) NULL
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
Войти в полноэкранный режим Выход из полноэкранного режима

таблица etl_metadaata_schedule для хранения хода выполнения ежедневного ETL

CREATE TABLE [dbo].[etl_metadata_schedule](
    [id] [int] NULL,
    [source_type] [varchar](max) NULL,
    [source_info] [text] NULL,
    [destination_db] [varchar](max) NULL,
    [destination_schema] [varchar](max) NULL,
    [destination_table] [varchar](max) NULL,
    [etl_type] [varchar](max) NULL,
    [status] [varchar](max) NULL,
    [started_at] [datetime] NULL,
    [completed_at] [datetime] NULL,
    [schedule_date] [datetime] NULL
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
Войти в полноэкранный режим Выйти из полноэкранного режима

Шаг 2

Теперь напишите ETL на языке python с помощью Pyspark

  1. Получите данные в pandas, чтобы зациклить процесс ETL
  2. используйте etl_type для переключения источников чтения (В моем случае я использовал два варианта, CSV и базу данных)
  3. записывать данные в место назначения, информация о месте назначения будет использоваться из etl_metadata

"""
Created on Thu Mar 17 11:06:28 2022

@author: Administrator
"""

#SPARK LIBRARIES
from pyspark.sql import SparkSession
import pyodbc
import pandas as pd
#initiate spark env
import findspark
findspark.init()
findspark.find()
#print(findspark.find())




spark = SparkSession 
    .builder 
    .appName("Python ETL script for TEST") 
    .master("local[*]")
    .config("spark.driver.memory", '8g')
    .config("spark.sql.ansi.enabled ",True)
    .config("spark.jars", "C:Driverssqljdbc42.jar") 
    .getOrCreate()





source_type = ''
source_info = ''
destination_db=''
destination_schema=''
destination_table = ''
etl_type = ''
query_string = '' 

##Initiatong variable for query establishhing

#- timedelta(43)
#today = (date.today())
#print("Today's date:", "select a.*,null status,null status_description ,null started_at,null completed_at,GETDATE() schedule_date from dbo.etl_metadata_schedule_staging where schedule_date = "+"'"+str(today)+"'")

#set variable to be used to connect the database




database = "TEST_DWH"
user = "user"
password  = "password"


query_string="SELECT a.*,CONCAT(ISNULL(b.status,'Pending'),b.status) status,null status_description ,null started_at,null completed_at FROM (SELECT *,getdate()  schedule_date FROM dbo.etl_metadata ) a LEFT JOIN [dbo].[etl_metadata_schedule] b ON a.id = b.id and  CAST(b.schedule_date AS date)= CAST(getdate() AS date) where ISNULL(b.status,'A') != 'completed'"

#Read ETL Meta Data
etl_meta_data_staging = spark.read
    .format("jdbc") 
    .option("url", "jdbc:sqlserver://localhost:1433;databaseName={"+database+"};") 
    .option("query", query_string) 
    .option("user", user) 
    .option("password", password) 
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") 
    .load()    


#-------------------CREATE NEW SCHEDULE----------------------------#
etl_meta_data_staging.filter("status == 'Pending'").show()


#THEN READ BASE META DATA AND  CREATE ONE ELSE DONT
etl_meta_data_staging.filter("status == 'Pending'").write 
        .format("jdbc") 
        .option("url", "jdbc:sqlserver://localhost:1433;databaseName={"+database+"};") 
        .option("dbtable", "dbo.etl_metadata_schedule") 
        .option("user", user) 
        .option("password", password) 
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") 
        .mode("append")
        .save()


#-------------------END CREATE NEW SCHEDULE----------------------------#


#--------------SQL SERVER CONNECTION TO MAINTAIN ERROR STATE-------------#
conn = pyodbc.connect("Driver={ODBC Driver 17 for SQL Server};"
                      "Server=localhost,1433;"
                      "Database="+database+";"
                      "UID="+user+";"
                      "PWD="+password+";")

cursor = conn.cursor()
#--------------END SQL SERVER CONNECTION TO MAINTAIN ERROR STATE-------------#

df_etl_meta_data_staging  = etl_meta_data_staging.toPandas()

df_etl_meta_data_staging=df_etl_meta_data_staging.sort_values('id')
#---LOOP : read FROM SOURCE  (ext☺ract) and write to destination.---#
for etl_id in df_etl_meta_data_staging['id']:



   status = 'In Progress'
   print("Starting for "+ str(etl_id))
   #---------------UPDATE In Progress Status---------------#
   cursor.
       execute('''UPDATE  [TEST_DWH].[dbo].[etl_metadata_schedule] 
               SET [status]='''' 
               +status+ "',[started_at]= CURRENT_TIMESTAMP where id= '"+ str(etl_id)+"';")
   conn.commit()
   #---------------UPDATE In Progress Status---------------#





   # load meta data into variables
   source_type = df_etl_meta_data_staging['source_type'][df_etl_meta_data_staging['id']==etl_id].values[0]
   source_info = df_etl_meta_data_staging['source_info'][df_etl_meta_data_staging['id']==etl_id].values[0]
   destination_db = df_etl_meta_data_staging['destination_db'][df_etl_meta_data_staging['id']==etl_id].values[0]
   destination_schema = df_etl_meta_data_staging['destination_schema'][df_etl_meta_data_staging['id']==etl_id].values[0]
   destination_table = df_etl_meta_data_staging['destination_table'][df_etl_meta_data_staging['id']==etl_id].values[0]
   etl_type = df_etl_meta_data_staging['etl_type'][df_etl_meta_data_staging['id']==etl_id].values[0]





   # initialize empty status for each run
   status = ''

   # Read  data from spurce try to read otherwise through exception

   #print(url_link)
   #print("Reading via ", source_info)
   # Read module data

   try: 
        print("Reading via ", source_info)
        # Read module data 
        if source_type == 'CSV':
           jdbcDF = spark.read
                       .format("csv") 
                       .option("header", "true") 
                       .option("quote", """) 
                       .option("escape", """) 
                       .load(source_info) 

           status= 'read_successful'
           jdbcDF.show()

        elif source_type == 'sqlserver':
            jdbcDF = spark.read
                .format("jdbc") 
                .option("url", "jdbc:sqlserver://localhost:1433;databaseName={"+database+"};") 
                .option("query", source_info) 
                .option("user", user) 
                .option("password", password) 
                .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") 
                .load() 

        #Try to Write Extracted data relevant to destination table
        try:
          jdbcDF.write 
                  .format("jdbc") 
                  .option("url", "jdbc:sqlserver://localhost:1433;databaseName={"+destination_db+"};") 
                  .option("dbtable", destination_schema+"."+destination_table) 
                  .option("user", user) 
                  .option("password", password) 
                  .option("truncate", "true") 
                  .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") 
                  .mode("overwrite")
                  .save()

          status = 'completed'
          print("Write Successful")
          #---------------UPDATE Success Status---------------#
          cursor.
              execute('''UPDATE  [TEST_DWH].[dbo].[etl_metadata_schedule] 
                      SET [status]='''' 
                      +status+ "',[completed_at]= CURRENT_TIMESTAMP where id= '"+ str(etl_id)+"';")
          conn.commit()
          #---------------UPDATE Success Status---------------#

        #except of Write Extracted data relevant to destination table
        #---------------UPDATE Success Status---------------#
        except Exception as e :
              print('some error in writing')
              status = 'error in writing to destination db, '+str(e)
              #---------------UPDATE Error Status---------------#
              cursor.
                  execute('''UPDATE  [TEST_DWH].[dbo].[etl_metadata_schedule] 
                          SET [status]='''' 
                          +status+ "',[completed_at]= CURRENT_TIMESTAMP where id= '"+ str(etl_id)+"';")
              conn.commit()
              #---------------UPDATE Error Status---------------#
    #except of Read module data
   except Exception as e :
         print("some error in reading from source")
         status = 'error reading source , '+str(e)
         print(status)
         #---------------UPDATE Error Status---------------#
         cursor.
             execute('''UPDATE  [TEST_DWH].[dbo].[etl_metadata_schedule] 
                     SET [status]='''' 
                     +status+ "',[completed_at]= CURRENT_TIMESTAMP where id= '"+ str(etl_id)+"';")
         conn.commit()
         #---------------UPDATE Error Status---------------#

Войти в полноэкранный режим Выйти из полноэкранного режима

Оцените статью
Procodings.ru
Добавить комментарий