Недавно команда LakeSoul r&d помогла пользователям решить практическую бизнес-задачу с помощью Hudi. Вот краткое описание и запись. Бизнес-процесс заключается в том, что восходящая система извлекает исходные данные из онлайн-таблицы DB в формат JSON и записывает их в Kafka. Нисходящая система использует Spark для чтения сообщений в Kafka. Данные обновляются и агрегируются с помощью Hudi и отправляются в базу данных для анализа.
Некоторые данные в Kafka – это только некоторые поля исходной таблицы. Образец данных Kafka: {A: A1, C: C4, D: D6, E: E7} {A: A2, B: B4, E: E6} {A: A3, B: B5, C: C5, D: D5, E: E5}. В последующих обновлениях данных используйте последние исторические данные вместо отсутствующего значения поля без обновления.
Следующий рисунок упрощает процесс обновления данных. В исходной таблице пять полей – A, B, C, D и E. Поле A является первичным ключом, а его тип – строка. Spark считывает пакетные данные из Kafka и преобразует их в формат, требуемый Upsert (DataFrame фиксированной схемы). MOR (Merge on Read) считывает новое содержимое таблицы.
В настоящее время для реализации этого бизнес-процесса используется Hudi’s Merge on Read, при этом фиксированная схема для вышеуказанных JSON-данных не поддерживается. Реализация вышеуказанного бизнес-процесса в Hudi’s Merge on Read была бы невозможна без фиксированной схемы для вышеуказанных JSON-данных. Недействительное значение NULL перезаписывает исходное содержимое, если отсутствующее поле заполнено нулевым значением. Копирование при записи ухудшается, если используется Merge Into, и производительность записи не соответствует требованиям. Обходным решением является получение неизмененных данных из исходной таблицы для каждого заполнения данных. Однако это увеличивает затраты ресурсов и нагрузку на разработчиков, что не соответствует ожиданиям пользователей.
LakeSoul поддерживает пользовательский MergeOperator. Каждому полю при выполнении Upsert может быть передан определяемый пользователем MergeOperator. Параметрами являются исходное значение поля и новое значение Upsert. Именно здесь результаты слияния могут быть определены на основе бизнес-требований. UDF такой же, как и родной UDF Spark. При использовании Upsert необходимо указать значение первичного ключа. Поэтому несколько дельта-файлов могут иметь различные значения для одного и того же первичного ключа и одного и того же поля. MergeOperator управляет поведением слияния этих значений. Реализация MergeOperator по умолчанию выглядит следующим образом:
class DefaultMergeOp[T] extends MergeOperator[T] {
override def mergeData(input: Seq[T]): T = {
input.last
}
}
В этом сценарии вы можете определить MergeOperator. Для неопределенных полей MergeOperator по-прежнему заполняет нулевые значения в качестве уникальных маркеров (служба гарантирует, что уникальные маркеры не конфликтуют с обычными данными). MergeOperator игнорируется во время Merge и возвращает исходные значения. Таким образом, когда Spark обрабатывает данные JSON и выполняет Upsert, null игнорируется. Исходное содержимое не перезаписывается, что позволяет уменьшить количество отсутствующих данных полей в процессе первоначального заполнения данных, значительно повысить эффективность выполнения и упростить логику кода. Код для этого пользовательского MergeOperator выглядит следующим образом:
class MergeNonNullOp[T] extends MergeOperator[T] {
override def mergeData(input: Seq[T]): T = {
val output=input.filter(_!=null)
output.filter(!_.equals("null")).last
}
}
Как видите, простая пользовательская реализация MergeOperator решает сложную бизнес-задачу.
Команда LakeSoul планирует интегрировать MergeOperator, который игнорирует пустые поля, во встроенную систему LakeSoul, с глобальными опциями для управления тем, включен ли этот тип MergeOperator по умолчанию или нет, что еще больше повысит эффективность разработки. См. Github Issue: https://github.com/meta-soul/LakeSoul/issues/30.
В будущем LakeSoul также будет поддерживать синтаксис Merge Into SQL для определения поведения Upsert и Merge on Read для дальнейшего улучшения выражения обновлений пакетной записи потока.
Для получения дополнительной информации об облачном нативном фреймворке LakeSoul Stream Batch All-in-one для поверхностного хранения данных обратитесь к предыдущей статье:
Построение библиотеки примеров машинного обучения в реальном времени с использованием лучшего проекта с открытым исходным кодом о больших данных и озерном хранилище данных, LakeSoul
Концепция дизайна лучшего opensource-проекта о больших данных и data lakehouse
4 лучших проекта с открытым исходным кодом о больших данных, которые вы должны попробовать