В предыдущей статье «Концепция дизайна лучшего проекта с открытым исходным кодом о больших данных и data lakehouse» была представлена концепция дизайна и принцип частичной реализации фреймворка LakeSoul с открытым исходным кодом и потоковым пакетным интегрированным поверхностным хранилищем. Первоначальный замысел дизайна LakeSoul заключается в решении различных проблем, которые трудно решить в традиционных сценариях хранилища данных Hive, включая обновление Upsert, слияние при чтении и одновременную запись. В этой статье будут продемонстрированы основные возможности LakeSoul на примере типичного сценария применения: создание библиотеки образцов машинного обучения в реальном времени.
1. Общие сведения о бизнес-требованиях
1.1 Онлайновая рекомендательная система
В Интернете, финансовой и других отраслях многие бизнес-сценарии, такие как поиск, реклама, рекомендации, контроль рисков и т.д., могут быть обобщены как онлайновая персонализированная рекомендательная система. Например, в бизнесе электронной коммерции персонализированные рекомендации «угадай, что тебе нравится», основанные на системе персонализированных рекомендаций, могут улучшить показатели кликов и покупок пользователей. В рекламном бизнесе персонализированные рекомендации являются основной системой для достижения правильной ориентации и повышения ROI. В сфере контроля финансовых рисков необходимо реализовать прогнозирование в реальном времени способности пользователей к погашению кредита и возможности просрочки, а также предоставить персонализированную кредитную линию и цикл погашения кредита для каждого пользователя.
Рекомендательная система широко используется в различных отраслях. Построение онлайновой рекомендательной системы промышленного уровня требует подключения множества связей и систем, затрачивая большое количество времени на разработку. Фреймворк MetaSpore, разработанный компанией DmetaSoul, предоставляет универсальное решение для разработки рекомендательной системы. Пожалуйста, обратитесь к моему предыдущему посту «Концепция дизайна всемогущего Opensource-проекта о платформе машинного обучения».
Эта статья посвящена созданию базы данных образцов в реальном времени для реализации полного замкнутого цикла «обратная связь с пользователем — итерация модели». Рекомендательная система может обучаться и итерироваться независимо и быстро улавливать изменения пользовательского интереса.
1.2 Что такое библиотека образцов машинного обучения рекомендательной системы?
В рекомендательной системе основной частью является модель алгоритма персонализированной сортировки. Обучение модели начинается с построения выборок и изучения предпочтений каждого пользователя с помощью различных характеристик и меток обратной связи о поведении пользователя. Библиотека образцов обычно состоит из нескольких частей:
- Характеристика пользователя: включает основные атрибуты, исторические модели поведения и недавние модели поведения пользователей в реальном времени. Основные атрибуты пользователей могут быть получены из онлайн-запросов в реальном времени или меток поведения, добытых автономным DMP. Историческое поведение пользователей и их поведение в реальном времени обычно включает события с поведением обратной связи в истории пользователей и некоторые соответствующие совокупные статистические показатели.
- Элемент-функция: Элемент — это объект, который должен быть рекомендован пользователям, это могут быть товары, новости, реклама и т.д. Характеристики обычно включают все виды атрибутов статей, в том числе дискретные атрибуты и непрерывные атрибуты статистических значений.
- Отзывы пользователей: является меткой в модели алгоритма. Ярлыки — это все виды поведения обратной связи с пользователем, такие как показать, нажать, преобразовать и т.д. Алгоритмические модели должны научиться моделировать предпочтения пользователя через взаимосвязь между признаками и метками.
*1.3 Проблемы создания библиотеки образцов машинного обучения.
Существует несколько видов проблем и вызовов при создании библиотеки образцов машинного обучения:
- Требования реального времени. Обучение модели основной рекомендательной системы в отрасли развивается в направлении онлайн и реального времени. Чем своевременнее обновляется модель, тем быстрее она может улавливать изменения в интересах пользователей, обеспечивая тем самым более точные результаты рекомендаций и улучшая бизнес-эффект, что требует от библиотеки образцов поддержки высокой пропускной способности записи.
- Многопотоковые обновления. На многие онлайн-функции необходимо влиять в режиме реального времени для дальнейшего обучения модели после расчета сортировки моделью в режиме онлайн. Отзывы пользователей также должны поступать в библиотеку образцов, часто с несколькими потоками отзывов пользователей в реальном времени. В этом случае несколько живых потоков одновременно записываются в разные столбцы в библиотеке образцов обновлений. Традиционные хранилища данных Hive обычно не могут поддерживать обновления в реальном времени и должны быть реализованы через полный Join. Однако эффективность операции низка, если окно Join велико и большое количество данных является избыточным. Оконный Join в Flink также имеет проблему огромных данных состояния и высоких затрат на эксплуатацию и обслуживание.
- Параллельные эксперименты. В практической деятельности инженерам, разрабатывающим алгоритмы, часто приходится проводить параллельные эксперименты с несколькими моделями для одновременного сравнения результатов. Для разных моделей могут потребоваться разные колонки признаков и меток, обновляемые по-разному. Например, автономные пакетные вычисления генерируют некоторые признаки, и эти пакетные данные также должны быть вставлены в базу данных признаков.
- Откат признаков. При алгоритмическом развитии бизнеса иногда требуется добавление признаков, то же самое относится и к моделям с обратным отслеживанием, требующим пакетного обновления новых признаков по историческим данным. Это также сложно эффективно реализовать в Hive.
Существует множество проблем при построении базы данных выборки в реальном времени в сценарии алгоритма рекомендательной системы. Основные проблемы этих задач заключаются в том, что функции и производительность хранилища данных Hive слабы, а такие сценарии, как пакетная интеграция потоков, инкрементное обновление и одновременная запись, не могут быть хорошо поддержаны. Bytedance и другие компании ранее делились решениями на основе Hudi для создания образцов рекомендательных систем в потоковой и пакетной интеграции. Однако у Hudi все еще есть проблемы, такие как одновременное обновление в реальном использовании.
Разработанный компанией DMetaSoul и являющийся открытым исходным кодом потоковый пакетный фреймворк хранения таблиц с одним телом, LakeSoul может хорошо решить эти проблемы. В следующей статье подробно описано, как использовать LakeSoul для создания библиотеки образцов рекомендательных систем промышленного уровня.
2 Создание библиотеки образцов машинного обучения в реальном времени
LakeSoul — это структура хранения таблиц, разработанная для потоковых пакетных сценариев и имеющая следующие ключевые особенности:
- Обновление на уровне столбцов (Upsert)
- Поддержка функции Merge on Read, которая объединяет данные во время чтения для повышения пропускной способности записи.
- Поддержка хранения объектов без семантики файлов
- одновременная запись, которая может поддерживать несколько потоков и пакетных заданий для обновления одного и того же раздела
- Распределенное управление метаданными для улучшения масштабируемости метаданных
- Эволюция схемы позволяет добавлять и удалять столбцы таблицыОбщий дизайн построения библиотеки примеров машинного обучения LakeSoul заключается в использовании Upsert вместо Join для записи нескольких групп признаков и меток в одну таблицу потоковым и пакетным способом, соответственно, достигая высокой пропускной способности одновременной записи и чтения/записи. В следующей части подробно описывается конкретный процесс реализации и фундаментальные принципы.
2.1 Проектирование первичного ключа
Чтобы обеспечить эффективное слияние, LakeSoul предоставляет возможность задавать первичные ключи. Разделите столбцы первичного ключа в таблице на определенное количество хэш-бакетов равномерно в соответствии с количеством хэш-бакетов. В каждом ведре столбцы первичных ключей сортируются и записываются. Объединить несколько инкрементных файлов с упорядоченным первичным ключом во время чтения, получив результат Merge.
При рекомендации библиотек образцов системы обратный поток всех функций и тегов помечен идентификатором запроса, сгенерированным во время онлайн-запроса, который используется в качестве ключа Join Key в сценариях offline Join. Поэтому вы можете использовать идентификатор запроса в качестве первичного ключа таблицы образцов LakeSoul, а час — в качестве раздела Range. Создайте таблицу LakeSoul в задании Spark следующим образом:
LakeSoulTable.createTable(data, path).shortTableName("sample").hashPartitions("request_id").hashBucketNum(100).rangePartitions("hour").create()
Это создаст таблицу с ‘request_id’ в качестве первичного ключа, хэш-букетами 100 и часами в качестве раздела Range.
2.2 Запись данных и одновременное обновление
Поскольку характеристики и метки поступают из разных потоков и партий, нам нужны задания из нескольких потоков или партий для одновременного обновления таблицы SAMPLE. Каждые данные должны иметь столбец request_id и столбец hour. При выполнении LakeSoulTable.Upsert, LakeSoul Spark Writer автоматически переразбивает ведра на основе request_id и записывает данные в соответствующее ведро разбиения в соответствии с колонкой часа. Пакет записанных данных может иметь значения нескольких Range partitions.
LakeSoul поддерживает многопотоковый одновременный Upsert, который может удовлетворить потребности многопотоковых обновлений базы данных образца в реальном времени. Например, есть два потока, а именно данные о рефлюксе признаков и рефлюксе меток, которые могут быть обновлены в базе данных образцов в режиме реального времени путем выполнения Upsert:
// Read feature reflow, update sample table
val featureStreamDF = spark.readStream...
val lakeSoulTable = LakeSoulTable.forPath(tablePath)
lakeSoulTable.upsert(featureStreamDF)
// Read label reflow, update sample sheet
val labelStreamDF = spark.readStream...
val lakeSoulTable = LakeSoulTable.forPath(tablePath)
lakeSoulTable.upsert(labelStreamDF)
Поскольку операция Merge не требуется, записываются только текущие инкрементные данные, что обеспечивает высокую пропускную способность. В реальных тестах скорость записи каждого ядра в облачное хранилище объектов составляет более 30 МБ/с, то есть 30 исполнителей Spark, что означает, что скорость записи может достигать 1 ГБ при одном ядре CPU, выделенном для каждого.
2.3 Слияние при чтении
LakeSoul автоматически объединяет _Upsert _данные при чтении. Таким образом, интерфейс для чтения ничем не отличается от интерфейса для чтения таблицы:
val lakeSoulTable = LakeSoulTable.forPath(path)
lakeSoulTable.toDF.select("*").show()
Его также можно запросить с помощью операторов SQL Select. В базовой реализации для каждого хэш-ведра, поскольку первичный ключ уже упорядочен, требуется только внешнее объединение нескольких упорядоченных списков, как показано ниже:
На рисунке показано, что и поток образцов, и поток меток выполняют множественные _Upsert_s. LakeSoul автоматически найдет инкрементный обновленный файл на основе записи обновления службы метаданных и выполнит упорядоченное внешнее слияние, когда работает задание на чтение. LakeSoul реализует упорядоченное слияние файлов Parquet и улучшает производительность многостороннего упорядоченного слияния за счет оптимизации конструкции малой верхней кучи.
2.4 Обратное заполнение данных
Поскольку LakeSoul поддерживает Upsert любых данных, разбитых на диапазоны, нет никакой разницы между обратной загрузкой и потоковой записью. Когда данные, которые необходимо вставить, готовы, Spark выполняет Upsert для обновления исторических данных. LakeSoul автоматически распознает изменения схемы. Обновление метаинформации таблиц для реализации эволюции схемы. LakeSoul обеспечивает полную функцию хранения таблиц хранилища данных, и каждый исторический раздел может быть запрошен и обновлен. По сравнению со схемой оконного соединения Flink, она решает проблему невидимых промежуточных состояний и может быстро реализовать массовое обновление и прослеживаемость исторических данных.
Заключение
Эта статья представляет применение LakeSoul в типичном сценарии пакетной интеграции потоков, создавая образец библиотеки машинного обучения рекомендательной системы. Пакетная интеграция потоков LakeSoul с возможностью Merge on Read может поддерживать крупномасштабное многопотоковое обновление в реальном времени, решать некоторые проблемы, существующие в Hive warehouse mass Join и Flink window Join.