Получение данных из сотен источников менее чем за минуту

Представьте, что у вас есть сотни магазинов, которые используют определенный тип базы данных (например, Oracle) и, несмотря на наличие механизмов, экспортирующих данные из них в базу данных вашей компании. Вам необходимо проверить, не пропало ли что-нибудь.

Цель

Обычно вы получаете список магазинов и перебираете их для получения данных, объединения с целевыми данными и поиска недостающих данных.

Получение всех данных, например, чеков, и проверка их соответствия в целом с помощью классического цикла может занять много времени.

Теория

Не проще ли получить рассчитанное количество продаж за соответствующий день и сравнить его с данными внутренней системы, а в случае обнаружения несоответствия — искать только в тех магазинах, где оно было обнаружено?

Производительность

Если обычно цикл for loop не так уж плох, то идея перебора 500 магазинов — это идея.

Однако в Python есть модуль параллелизма, который позволяет порождать несколько процессов одновременно в цикле и ускорять обработку с 30 минут до 50 секунд.

Начальная точка кода

Поскольку нам необходимо подключить Oracle к SQL Server, мы будем использовать пользовательские классы, которые позволят нам получать данные непосредственно во фреймы данных Pandas и обрабатывать данные для наших нужд.

Учитывая, что наши классы написаны примерно так, сначала нам нужно создать функцию, которая запрашивает данные из хранилищ:

from datetime import date, timedelta
from dateutil.relativedelta import relativedelta

yesterday: str = str((date.today()) - timedelta(days=1))
last_three_days: str = str((date.today()) - timedelta(days=3))
last_seven_days: str = str((date.today()) - timedelta(days=7))
last_half_month: str = str((date.today()) - timedelta(days=14))
last_month: str = str((date.today()) - relativedelta(months=+1))
last_quarter: str = str((date.today()) - relativedelta(months=+3))
last_year: str = str((date.today()) - relativedelta(months=+12))
today: str = str((date.today()))

def query_data_from_store(store: str):
    """
    Fetch data from stores

    Args:
      store -> str  
    """

    date_from = last_year

    logger.info(f"Connecting to store: {store} with date from: {date_from}")
    store_connection = OracleConnection(
        f"{store}ORASERV", STORE_USR, STORE_PWD, STORE_SID, 666)
    result = store_connection.query(
        orasql_store_sales_count_query.format(date_from, yesterday))

    return result
Вход в полноэкранный режим Выход из полноэкранного режима

Обычно я добавляю query в функцию, но поскольку мы всегда будем использовать один и тот же запрос для этой работы, его можно жестко закодировать в самой функции.

Данные, получаемые из каждого магазина, выглядят следующим образом:

магазин дата продажи количество продаж
X001 2022-05-02 100
X001 2022-05-03 412
X001 2022-05-04 96

Получение данных с помощью потоков за секунды

Затем мы создаем функцию, которая выполняет итерации по query_data_from_store со списком магазинов, используя 20 потоков одновременно:

def process_data():
    """
    Process data fetched from stores    
    """

    # Create connection with pre-erp database
    logger.info("Connecting to PRE-ERP Database and getting the sales count.")
    warehouse = SqlServerConnection(STAGING_HOST, STAGING_DB)

    # Get production stores into list
    stores_df = warehouse.query(mssql_prod_stores_list)
    stores_list = stores_df["store"].to_list()

    # Remove badly configured stores, for testing / debug purposes:
    unwanted_stores = {"X1234", "X002", "X3596","X991"}
    stores_list = [element for element in stores_list if element not in unwanted_stores]

    # Iterate over query function using 20 threads using stores list
    pool = multiprocessing.Pool(20)
    result = pool.map(query_data_from_store, stores_list)
    pool.close()
    pool.join()

    return result
Вход в полноэкранный режим Выход из полноэкранного режима

Анализ аномалий

Последнее — это анализ данных (обычно мы заносим результаты в таблицу, а затем визуализируем их в виде ошибок в BI).

Для целей статьи мы записываем в файлы все несоответствия, которые мы нашли во время сравнения:

def check_store_receipts_integrity():
    """Check sales data integrity using direct stores connection."""

    date_from = last_year

    multiprocessing.freeze_support()

    start_data_fetch_time = timer()
    results = process_data()
    pos_quantity = pd.concat(results, ignore_index=True, axis=0)
    end_data_fetch_time = timer()

    # Create connection with PRE-ERP database
    warehouse = SqlServerConnection(STAGING_HOST, STAGING_DB)
    start_processing_time = timer()

    # Get receipts count per store and day from sales table
    target_quantity = warehouse.query(
        mssql_get_target_receipts_count.format(date_from, yesterday))

    # Some type refactoring
    store_quantity.store = store_quantity.store.astype(str)
    store_quantity.sale_date = store_quantity.sale_date.astype(str)
    target_quantity.store = target_quantity.store.astype(str)
    target_quantity.sale_date = target_quantity.sale_date.astype(str)

    # Merge dataframes with stores quantity
    logger.info("Merging both data sources into one.")
    difference_temp = pd.merge(target_quantity, store_quantity,
                               how="left",
                               on=["store", "sale_date"])

    # Generate differences
    logger.info("Generating differences.")
    difference_temp["difference"] = (
            difference_temp["store_quantity"] - difference_temp["target_quantity"])
    missing_in_target = difference_temp[difference_temp["difference"] > 0]

    # Print info about differences in console
    logger.info("Checking if differences exist.")
    if len(missing_in_target) > 0:
        logger.warning("The inconsistency in data has been found. Starting row by row comparison...")

        # Get missing in target sales data
        logger.info("Generating temp variables.")
        receipts_from_store= pd.DataFrame()
        receipts_from_target = pd.DataFrame()
        results_from_stores_and_target = pd.DataFrame()

        if str(type(missing_in_target)) in """<class 'pandas.core.frame.DataFrame'>""":

            missing_in_target_list = missing_in_target[["store", "receipt_date"]]

            for i, row in missing_in_terg_list.iterrows():
                store: str = str(row["store"])
                sale_date: str = str(row["sale_date"])
                server: str = str(store + "ORASERV")

                logger.info(f"Connecting to store: {store} with date: {sale_date}")

                store_connection = OracleConnection(
                    server, STORE_USR, STORE_PWD, STORE_SID, 666)

                temp_stores = store_connection.query(
                    orasql_get_store_receipts_for_specific_day.format(receipt_date))
                receipts_from_store = pd.concat([receipts_from_store, temp_stores], ignore_index=True, axis=0)

                temp_target = warehouse.query(mssql_get_target_receipts_for_specific_day.format(store, receipt_date))
                receipts_from_target = pd.concat([receipts_from_target, temp_target], ignore_index=True, axis=0)

                receipts_from_store["erp_number"] = receipts_from_store["store"] + "/" + receipts_from_market[
                    "salenumber"]
                receipts_from_target["erp_number"] = receipts_from_target["store"] + "/" + receipts_from_target[
                    "document_number"]

                if receipts_from_store.equals(receipts_from_target) is False:
                    results_from_stores_and_target= pd.merge(receipts_from_stores, receipts_from_target, how="left",
                                                            on="ax_number"
                                                            , indicator=True)

                    results_from_stores_and_target = results_from_stores_and_target.rename(columns={
                        "store_x": "store",
                        "sale_date_x": "date",
                        "sale_id_x": "id",
                        "sale_number_x": "number",
                        "sale_value_x": "value",
                        "erp_number": "erpnumber",
                        "store_y": "store",
                        "sale_date_y": "saledate",
                        "sale_id_y": "saleid",
                        "sale_number_y": "salenumber",
                        "sale_value_y": "salevalue",
                        "_merge": "ismissing"}
                    )

                else:
                    logger.info("Compared data row by row and found nothing.")

            if len(results_from_stores_and_target) > 0:
                # warehouse.push_data(results_from_stores_and_target, 'stores_integrity_report', 'staging')
                if os.path.exists("./data/processed/store_integrity_checks.csv"):
                    os.remove("./data/processed/store_integrity_checks.csv")
                    logger.info("Cleaning .csv file for reimport.")
                else:
                    logger.warning("The .csv file does not exist...")

                if os.path.exists("./data/processed/store_integrity_checks.xlsx"):
                    os.remove("./data/processed/store_integrity_checks.xlsx")
                    logger.info("Cleaning .xlsx file for reimport.")
                else:
                    logger.warning("The .xlsx file does not exist...")

                results_from_stores_and_target = results_from_stores_and_target[
                    results_from_stores_and_target["ismissing"] == "left_only"]
                results_from_stores_and_target.to_csv("./data/processed/pos_integrity_checks.csv", index=False)
                results_from_stores_and_target.to_excel("./data/processed/pos_integrity_checks.xlsx", index=False)

                if len(results_from_stores_and_target) > 0:
                    logger.warning(
                        f"Found and exported: {len(results_from_stores_and_target)} inconsistencies to file.")
                else:
                    logger.info("There are no missing sales in PRE-ERP.")

    end_processing_time = timer()

    logger.info(f"Fetched rows from stores in: {(end_data_fetch_time - start_data_fetch_time):0.4f} "
                f"seconds, while data processing took: {(end_processing_time - start_processing_time):0.4f}.")
Войти в полноэкранный режим Выйти из полноэкранного режима

Вула!

Мы не только сравнили данные из нескольких источников с нашей целевой базой данных, но и использовали параллелизм, что ускорило нашу обработку с 30 минут до минуты или около того.

Оцените статью
Procodings.ru
Добавить комментарий