Детальный взгляд на рабочий процесс ETL Conductor

В нашей предыдущей статье об использовании Conductor для анализа данных мы обсудили рабочий процесс Netflix Conductor, который извлекает данные из GitHub, преобразует их, а затем загружает результаты в Orbit. По сути, это описывает процесс ETL (Extract, Transform, Load) — автоматизированный рабочий процесс Conductor. В этой заметке мы подробно рассмотрим, как построен рабочий процесс — изучим, что делает каждая задача. Этот рабочий процесс будет выполняться ежедневно в полночь по Гринвичу, гарантируя, что данные в нашем экземпляре Orbit всегда соответствуют данным на GitHub.

Наш рабочий процесс выглядит следующим образом:

Запуск рабочего процесса

Мы запускаем рабочий процесс со следующими входными данными:

{ "gh_account": "netflix", "gh_repo": "conductor", "star_offset": 3800, "gh_token": "<github_api_key>", "orbit_workspace": "oss-stats", "activity_name": "starredConductor", "orbit_apikey": "<orbit_api_key>"}
Войти в полноэкранный режим Выйти из полноэкранного режима

Первые 4 записи — это параметры GitHub:

  • Учетная запись GitHub, на которой размещается репозиторий
  • Имя репозитория GitHub
  • Смещение звезды (нет смысла получать звезды 0-3800 — они будут неизменны)
  • токен API GitHub

Последние 3 записи — это параметры Orbit:

  • Рабочая область Orbit для загрузки данных.
  • Каждая активность в Orbit имеет имя. starredConductor — это то, как мы отмечаем, когда проводник был отмечен звездой.
  • Ключ API Orbit для аутентификации наших загрузок.

Имея эти данные, мы можем теперь пройти через весь рабочий процесс. Мы начнем с первых двух задач, которые используются для дальнейшей настройки рабочего процесса:

Задачи настройки рабочего процесса

Первые две задачи в нашем рабочем процессе — это задачи настройки, которые помогают подготовить сцену и успешно завершить рабочий процесс:

  • calculate_start_cutoff — это INLINE-задача. Поскольку этот рабочий процесс запускается каждые 24 часа, мы хотим добавить только «новые звезды», которые произошли за последние 24 часа. Эта задача использует JavaScript для получения времени отсечки за 24 часа до этого:
{ "name": "calculate_start_cutoff", "taskReferenceName": "calculate_start_cutoff_ref", "inputParameters": { "evaluatorType": "javascript", "expression": "new Date(Date.now() - 86400 * 1000).toISOString();" }, "type": "INLINE" }
Войти в полноэкранный режим Выход из полноэкранного режима

Выражение берет текущее время и вычитает нужное количество миллисекунд, чтобы получить время 24 часа назад.

  • Get_repo_details : Это HTTP-задача, которая опрашивает репозиторий GitHub. Это даст нам общую информацию о репозитории (в данном случае netflix/conductor). Одним из элементов является общее количество звезд. Мы используем его, чтобы понять верхнюю границу нашего запроса stargazer (начало — 3800, конец — любое значение).

Получив эти данные, мы можем приступить к ETL, извлекая и преобразуя данные из GitHub.

Извлечение данных из GitHub

Следующий раздел рабочего процесса извлекает данные из GitHub. Основной особенностью этого раздела является цикл DO/WHILE под названием get_all_stars. API GitHub предоставляет только 100 результатов за один раз, поэтому мы можем использовать цикл для получения более 100 записей из GitHub. У нас есть «начальное» значение cutoff из входного JSON и конечное значение из общего количества звезд, поэтому этот цикл вычисляет, сколько раз нужно сделать запрос, а затем перебирает эти запросы.

Вот как DO/WHILE завершает вычисления:

 "inputParameters": { "offset": "${workflow.input.star_offset}", "stargazers": "${Get_repo_details_ref.output.response.body.stargazers_count}" }, "loopCondition": "if ($.get_all_stars_loop_ref['iteration'] < Math.ceil(($.stargazers-$.offset)/100)) { true; } else { false; }",
Вход в полноэкранный режим Выход из полноэкранного режима

Входные параметры дают нам начальный номер (входной параметр star_offset — который в данном примере равен 3800), и общее количество звезд (stargazers).

Условие цикла будет выполняться до тех пор, пока есть звезды, которые можно захватить. Например, 16 мая 2022 года для Netflix Conductor имеется 4319 звездочетов. 4319-3800 (начальное смещение) — это 519. Поскольку каждый ответ GitHub содержит максимум 100 элементов, нам потребуется 5,19 обращений к GitHub. Math.ceil округляет это число до 6.

При таком механизме цикла в каждом цикле выполняется три задачи:

  1. pagination_calc_ref : Звезды GitHub 3800 — 3899 появятся на 39-й странице результатов GitHub (существует проблема, связанная с тем, что страница 1 состоит из записей 1-99). Однако итератор DO/WHILE начинается со значения 1. Эта INLINE-задача использует JavaScript для создания счетчика страниц GitHub: parseInt(offset/100) + iterator — цикл 1 начнется со страницы 39 в нашем примере.

  2. 100_stargazers : Используя входные данные рабочего процесса — владелец GitHub, репозиторий & ключ API — вместе с пагинацией, рассчитанной в предыдущей задаче, мы можем извлечь 100 записей из GitHub.

Преобразование данных из GitHub

  • jq_cleanup_stars Эта задача берет большие выходные данные с GitHub и упрощает их, чтобы отобразить только те значения, которые нам нужны для Orbit. Мы также отформатируем JSON, чтобы он соответствовал формату, который Orbit требует для ввода данных.

Для этого мы используем задачу JQ Transform Task. Здесь много всего происходит, поэтому мы будем распаковывать это шаг за шагом:

{ "name": "jq_cleanup_stars", "taskReferenceName": "jq_cleanup_stars_ref", "inputParameters": { "activityName": "${workflow.input.activity_name}", "starlist": "${hundred_stargazers_ref.output.response.body}", "queryExpression": "[.starlist[] | select (.starred_at > "${calculate_start_cutoff_ref.output.result}") |{occurred_at:.starred_at, title: "${workflow.input.activity_name}", member: {github: .user.login}}]" }, "type": "JSON_JQ_TRANSFORM" }
Войти в полноэкранный режим Выход из полноэкранного режима

Для входных данных мы берем имя активности (в данном случае starredConductor), список звезд starlist из предыдущей задачи, а затем прогоняем его через JQ-запрос.

Выражение queryExpression.

  1. В нашем выражении запроса используется массив startlist[].

  2. Первый аргумент (между трубами): select (.starred_at > "${calculate_start_cutoff_ref.output.result}") Это сравнивает значение starred_at в выводе GitHub с рассчитанным значением cutoff. Это фильтрует список звезд GitHub, отбирая только те, где действие звезды произошло за последние 24 часа. Если запись является «новой», мы можем продолжить преобразование.

  3. Теперь, когда мы определили, что данные соответствуют нашим временным критериям, мы можем создать входной JSON-файл в формате, который требует Orbit: |{occurred_at:.starred_at, title: "${workflow.input.activity_name}", member: {github: .user.login}}].

После завершения цикла DO/WHILE мы получим всех наших новых пользователей, отмеченных звездами, идентифицированных и разобранных в массив JSON, который Orbit понимает для загрузки.

Дополнительные преобразования

Выходом нашего цикла DO/WHILE является массив JSON с результатами всех задач. Это означает, что в нашем примере данных у нас есть массив 6 с результатами всех трех задач.

Первая задача после цикла — jq_stars_combine. Это еще одно преобразование JQ, которое объединяет все входные данные в формате Orbit в один массив JSON:

{ "name": "jq_stars_combine", "taskReferenceName": "jq_stars_combine", "inputParameters": { "bigList": "${get_all_stars_loop_ref.output}", "queryExpression": ".bigList[].jq_cleanup_stars_ref?.resultList?[][]" }, "type": "JSON_JQ_TRANSFORM", "decisionCases": {}, "defaultCase": [], "forkTasks": [], "startDelay": 0, "joinOn": [], "optional": false, "defaultExclusiveJoinTask": [], "asyncComplete": false, "loopOver": [] },
Войти в полноэкранный режим Выйти из полноэкранного режима

bigList — это большой вывод из списка DO/WHILE. JQ-запрос извлекает только результаты каждой итерации jq_cleanup_stars_ref и объединяет их в один массив JSON.

Загрузка данных на Orbit

Если бы API Orbit позволял нам загружать все записи сразу, мы могли бы сделать это с результатами последней задачи. Но это не так, каждая активность должна быть выгружена отдельно.

Для этого используется еще один цикл DO/WHILE loop_through_users для перебора всех записей в jq_stars_combine:

 "inputParameters": { "activities": "${jq_stars_combine.output.resultList}" }, "loopCondition": "if ($.loop_through_users_ref['iteration'] < $.activities.length) { true; } else { false; }",
Вход в полноэкранный режим Выход из полноэкранного режима

Для каждого вида деятельности выполняется 2 задачи:

{ "name": "zero_offset_fix", "taskReferenceName": "zero_offset_fix", "inputParameters": { "iterator": "${loop_through_users_ref.output.iteration}", "jsonList": "${jq_stars_combine.output.resultList}", "evaluatorType": "javascript", "expression": " $.jsonList[$.iterator -1];" }, "type": "INLINE" },
Войти в полноэкранный режим Выход из полноэкранного режима
{ "name": "post_to_orbit", "taskReferenceName": "post_to_orbit_ref", "inputParameters": { "http_request": { "uri": "https://app.orbit.love/api/v1/${workflow.input.orbit_workspace}/activities", "method": "POST", "headers": { "Authorization": "Bearer ${workflow.input.orbit_apikey}" }, "body": "${zero_offset_fix.output.result}", "readTimeOut": 2000, "connectionTimeOut": 2000 } }, "type": "HTTP" }
Войти в полноэкранный режим Выход из полноэкранного режима

Избегание ограничения скорости

API Orbit имеет ограничение скорости 120 записей в минуту, и если ежедневная загрузка превышает 120 записей, возможно, что последняя задача будет ограничена Orbit по скорости. Чтобы избежать ограничения скорости, мы можем расширить задачу. Это означает, что мы создадим задачу в нашем экземпляре Conductor под названием post_to_orbit и добавим два дополнительных параметра:

  "rateLimitPerFrequency": 100, "rateLimitFrequencyInSeconds": 60,
Войти в полноэкранный режим Выйти из полноэкранного режима

Это указывает Conductor, что любые вызовы задачи post_to_orbit — независимо от того, какой рабочий процесс в нашем экземпляре Conductor вызывает эту задачу — должны быть ограничены 100 вызовами в 60 секунд. Это предотвратит сбой нашего рабочего процесса из-за ограничения скорости со стороны Orbit API.

Планирование рабочего процесса

В Orkes Cloud (и в Orkes Playground) есть функция, называемая планировщиком (скоро она появится в Open Source Conductor). Планировщик дает возможность планировать ваши рабочие процессы на регулярный интервал. Для рабочего процесса GitHub to Orbit выбран интервал 0 0 0 0 * * * ?, что означает, что рабочий процесс будет запускаться каждый день в 12:00 по Гринвичу.

Благодаря автоматическому планированию рабочего процесса данные Netflix Conductor Star ежедневно загружаются с GitHub на Orbit без участия человека!

Заключение

Этот рабочий процесс является примером рабочего процесса ETL — мы извлекаем данные из GitHub, преобразуем их, а затем загружаем в Orbit. Он выполняется автоматически каждые 24 часа.

До создания этого рабочего процесса данные приходилось извлекать вручную, обновлять их в электронной таблице, а затем загружать в Orbit через CSV. Автоматизировав этот процесс с помощью Netflix Conductor (в Orkes Cloud), мы обеспечиваем точное и регулярное обновление «звездочек» Netflix Conductor в нашем рабочем пространстве данных Orbit.

Оцените статью
Procodings.ru
Добавить комментарий