- Цель
- Контекст
- Обновление агрегата
- В сторону: агрегаты звучат медленно
- Продолжительность жизни
- Снимки
- Эффективная датировка события удаления
- Чтение записей
- Что насчет существующих событий?
- Обновление модели чтения
- Подготовка базы данных
- Обновление проектора
- Принудительное повторное заполнение базы данных
- Скрытие удаленных элементов
- На чем мы остановились?
- Новая функция: восстановление удаленных элементов
- Реализация
- Создайте команду RestoreTodo.
- Создайте событие TodoRestored
- Обновление агрегата
- Направить команду на агрегат
- Обновление проектора
- Добавление метода API
- Добавьте метод в модуль контекста
- Добавьте метод в контроллер
- Зарегистрировать маршрут
- Валидация
- Резюме
Цель
Реализовать мягкое удаление в API из первой части, позволяющее восстанавливать элементы после удаления.
Следуйте тому, чему я научился в процессе итераций над проектом под названием todo_backend_commanded
. В его git-истории показаны шаги, необходимые для реализации функциональности мягкого удаления и восстановления.
Контекст
В предыдущем посте я преобразовал ванильный Phoenix API в CQRS с помощью Commanded.
Это приложение пишет в базу данных, используя хекс-пакет commanded_ecto_projections
, который подписывается на события и проецирует их состояние в таблицы, управляемые библиотекой базы данных Ecto.
Поскольку основная модель данных этого приложения представляет собой журнал с неизменяемыми данными (append-only), события могут быть воспроизведены, а модель чтения может быть кардинально изменена с использованием существующих данных.
Обновление агрегата
(Ссылка на соответствующий коммит)
Первым шагом будет добавление поля deleted_at
в агрегат Todo
.
defmodule TodoBackend.Todos.Aggregates.Todo do
defstruct [
:uuid,
:title,
:completed,
- :order
+ :order,
+ deleted_at: nil
]
...
- def apply(%Todo{uuid: uuid}, %TodoDeleted{uuid: uuid}) do
- nil
- end
+ def apply(%Todo{uuid: uuid} = todo, %TodoDeleted{uuid: uuid}) do
+ %Todo{todo | deleted_at: DateTime.utc_now()}
+ end
Агрегат — это структура, представляющая состояние объекта в приложении. В Commanded агрегаты создаются в памяти из диспетчеризированных событий. Структура агрегата используется для определения побочных эффектов команды, если команда вообще должна быть принята.
Этот diff заставляет поле deleted_at
агрегата быть установленным в качестве времени даты, когда журнал событий гидратируется. Мы еще вернемся к этому, поскольку было бы предпочтительнее фиксировать время, когда элемент todo был удален.
В сторону: агрегаты звучат медленно
Если агрегат должен гидратироваться из своих событий каждый раз, когда он используется, то чем это не ужасная нагрузка на производительность?
На практике у отдельного агрегата не должно быть много событий. Сколько взаимодействий может быть у одного элемента todo? Если окажется, что некоторые элементы имеют большие журналы событий, есть несколько рычагов, за которые можно потянуть в Commanded.
Продолжительность жизни
(Ссылка на соответствующий коммит)
Итак, я солгал сказал полуправду об агрегатах. Они не гидратируются в памяти для каждой команды/события. В действительности, агрегаты реализованы с помощью GenServer
, каждый из которых кэширует своё состояние и управляется деревом супервизоров управляемого приложения (в конечном итоге, DynamicSupervisor
под названием Commanded.Aggregates.Supervisor
, если быть точным).
Поскольку агрегаты кэшируются как длительно выполняющиеся процессы, нам нужно получить каждое событие только один раз.
Таймаут процесса агрегации может быть настроен через реализацию поведения AggregateLifespan
. Время жизни по умолчанию, DefaultLifespan
, устанавливает бесконечный тайм-аут, если не возникнет исключение.
В зависимости от количества агрегатных экземпляров в живой системе, это может создать другую проблему — эти процессы могут оставаться живыми в течение всей жизни сервера. Это похоже на OOM, который только и ждет, чтобы произойти.
Чтобы решить эту проблему, я реализовал время жизни для агрегата Todo, которое сохраняет агрегаты в течение минуты, пока они не получат событие TodoDeleted
. Возможность сопоставления с образцом и реализации этого в контексте приложения означает, что таймауты могут отражать специфические детали вашего домена.
Снимки
Для агрегатов с большим количеством событий в Commanded есть снимки состояния агрегатов. Их можно настроить, добавив секцию snapshotting
в приложение Commanded в config/config.exs
.
Существует две опции для каждого агрегата: snapshot_every
и snapshot_version
. snapshot_every
используется для создания моментальных снимков после определенного количества событий. Это значение действует как верхняя граница событий, которые должны быть оценены при гидратации агрегатного состояния. snapshot_version
— неотрицательное целое число, которое должно обновляться при каждом обновлении агрегата. Оно используется для аннулирования старых снимков, в которых могут отсутствовать новые поля или которые были созданы с помощью старых методов apply
.
Снимки могут быть полезны в приложениях с длинными журналами событий, но в данном todo API они не представляются целесообразными. Если бы мы использовали моментальные снимки, добавление поля deleted_at
потребовало бы увеличения snapshot_version
.
Чтобы узнать больше о совокупной производительности, ознакомьтесь с этой статьей в блоге, в которой организация значительно улучшила производительность и площадь приложения Commanded с помощью периодов жизни и моментальных снимков.
Эффективная датировка события удаления
(Ссылка на соответствующий коммит)
Как я отметил при добавлении поля deleted_at
в агрегат, не следует использовать время даты, когда событие было применено. Это приведет к тому, что значение deleted_at
будет отличаться при каждом развертывании службы или перезапуске агрегированного генсервера. Гораздо лучше было бы фиксировать время создания события, чтобы оно сохранялось в журнале событий.
Сначала я добавил поле datetime
в структуру события TodoDeleted
:
defmodule TodoBackend.Todos.Events.TodoDeleted do
@derive Jason.Encoder
defstruct [
- :uuid
+ :uuid,
+ :datetime
]
end
Затем я заполнил это значение в методе execute
агрегата todo, когда была получена команда DeleteTodo
. Поскольку execute
вызывается, когда команда отправляется, это значение будет установлено в дату, когда будет вызвана конечная точка удаления.
Я также обновил метод apply
, чтобы использовать время даты из события, а не текущее время. Это значение будет получено из журнала событий.
defmodule TodoBackend.Todos.Aggregates.Todo do
...
def execute(%Todo{uuid: uuid}, %DeleteTodo{uuid: uuid}) do
- %TodoDeleted{uuid: uuid}
+ %TodoDeleted{uuid: uuid, datetime: DateTime.utc_now()}
end
...
- def apply(%Todo{uuid: uuid} = todo, %TodoDeleted{uuid: uuid}) do
- %Todo{todo | deleted_at: DateTime.utc_now()}
+ def apply(%Todo{uuid: uuid} = todo, %TodoDeleted{uuid: uuid, datetime: effective_datetime}) do
+ %Todo{todo | deleted_at: effective_datetime}
end
Чтение записей
Журнал хранит события в формате JSON. Поскольку JSON не имеет собственного формата даты-времени, на структуре событий должен быть реализован протокол Commanded.Serialization.JsonDecoder
.
defmodule TodoBackend.Todos.Events.TodoDeleted do
...
defimpl Commanded.Serialization.JsonDecoder, for: TodoDeleted do
@doc """
Parse the datetime included in the aggregate state
"""
def decode(%TodoDeleted{} = state) do
%TodoDeleted{datetime: datetime} = state
{:ok, dt, _} = DateTime.from_iso8601(datetime)
%TodoDeleted{state | datetime: dt}
end
end
end
Что насчет существующих событий?
Это хорошо, но что делать со всеми теми событиями, которые уже есть в журнале без даты?
Так как журнал только для добавления и неизменяем, способ расширить определение события — это добавить запасное значение для нового поля.
В данном случае я изменил метод декодера JSON, подставив значение nil
для текущего времени даты. Поскольку это значение никогда не записывалось в старые события, фактическое время удаления теряется. Лучшее, что мы можем сделать, это предоставить некоторое значение и обеспечить запись времени даты во все будущие события.
defmodule TodoBackend.Todos.Events.TodoDeleted do
...
defimpl Commanded.Serialization.JsonDecoder, for: TodoDeleted do
@doc """
Parse the datetime included in the aggregate state
"""
def decode(%TodoDeleted{} = state) do
%TodoDeleted{datetime: datetime} = state
-
- {:ok, dt, _} = DateTime.from_iso8601(datetime)
-
- %TodoDeleted{state | datetime: dt}
+
+ if datetime == nil do
+ %TodoDeleted{state | datetime: DateTime.utc_now()}
+ else
+ {:ok, dt, _} = DateTime.from_iso8601(datetime)
+
+ %TodoDeleted{state | datetime: dt}
+ end
+ end
end
end
Обновление модели чтения
(Ссылка на соответствующий коммит)
В производственной среде лучше всего будет создать новую таблицу базы данных и проектор, содержащий поле deleted_at
, развернуть приложение для создания таблицы, обновить вызывающие команды для использования новой таблицы, а затем удалить старую таблицу. Это безопасно, поскольку все записи происходят на уровне событий — модель только для чтения.
В некоторых случаях целесообразно также обновить проектор таким образом, чтобы данные можно было перенести на место. Это показалось мне более сложной задачей, чем та, которую стоит затратить для статьи в блоге, поэтому я выбрал опасный вариант: уничтожить таблицу и построить ее заново с обновленным проектором.
Подготовка базы данных
Сначала я создал миграцию для добавления столбца deleted_at
в таблицу todos
:
mix ecto.gen.migration add_deleted_at_to_todo
В созданном файле миграции я добавил столбец и индекс:
defmodule TodoBackend.Repo.Migrations.AddDeletedAtToTodo do
use Ecto.Migration
def change do
alter table(:todos) do
add :deleted_at, :naive_datetime_usec
end
create index(:todos, [:deleted_at])
end
end
Наконец, я добавил поле в проекцию:
defmodule TodoBackend.Todos.Projections.Todo do
...
schema "todos" do
field :completed, :boolean, default: false
field :title, :string
field :order, :integer, default: 0
+ field :deleted_at, :naive_datetime_usec, default: nil
timestamps()
end
end
После запуска mix ecto.migrate
сторона базы данных готова к работе.
Обновление проектора
Обновление проектора очень простое: вместо удаления строки из таблицы просто обновите колонку deleted_at
.
defmodule TodoBackend.Todos.Projectors.Todo do
...
- project(%TodoDeleted{uuid: uuid}, _, fn multi ->
- Ecto.Multi.delete(multi, :todo, fn _ -> %Todo{uuid: uuid} end)
+ project(%TodoDeleted{uuid: uuid, datetime: effective_datetime}, _, fn multi ->
+ case Repo.get(Todo, uuid) do
+ nil ->
+ multi
+
+ todo ->
+ Ecto.Multi.update(
+ multi,
+ :todo,
+ Todo.delete_changeset(todo, %{deleted_at: effective_datetime})
+ )
+ end
+ end)
...
end
Я добавил delete_changeset
в проекцию Todo
, чтобы представить конкретное желаемое изменение. Это гарантирует, что мы обновим только столбец deleted_at
. Ссылка на него содержится в новом методе проекции TodoDeleted
.
defmodule TodoBackend.Todos.Projections.Todo do
...
def delete_changeset(todo, attrs \ %{}) do
todo
|> cast(attrs, [:deleted_at])
end
end
Принудительное повторное заполнение базы данных
Как я уже говорил, это ужасная идея — усекать базу данных в производстве. Вот как я сделал это локально, чтобы изучить Commanded. Возможно, это также можно сделать, обновив значение name
, предоставленное вызову use Commanded.Projections.Ecto
в TodoBackend.Todos.Projections.Todo
и перезапустив сервер.
Если в журнале нет существующих данных, этот шаг можно пропустить.
В оболочке iex (iex -S mix phx.server
):
# Delete the projected values
TodoBackend.Repo.delete_all(TodoBackend.Todos.Projections.Todo)
# Remove the version tracking entry for the ecto projection
TodoBackend.Repo.delete(%TodoBackend.Todos.Projectors.Todo.ProjectionVersion{projection_name: "Todos.Projectors.Todo"})
alias Commanded.Event.Handler
alias Commanded.Registration
# Trigger a reset of the projector
registry_name = Handler.name(TodoBackend.App, "Todos.Projectors.Todo")
projector = Registration.whereis_name(TodoBackend.App, registry_name)
send(projector, :reset)
Скрытие удаленных элементов
Теперь, когда удаленные элементы присутствуют в таблице todos
, важно обновить модуль контекста Todos
, чтобы отфильтровать удаленные элементы. Это может быть сделано в контексте без необходимости обновления контроллера, поскольку весь доступ инкапсулирован. Как я уже отмечал в предыдущем сообщении, это действительно ценное свойство вдохновленного DDD подхода современных приложений Phoenix.
defmodule TodoBackend.Todos do
...
def list_todos do
- Repo.all(Todo)
+ from(t in Todo,
+ where: is_nil(t.deleted_at)
+ )
+ |> Repo.all()
end
...
- def get_todo!(uuid), do: Repo.get_by!(Todo, uuid: uuid)
+ def get_todo!(uuid) do
+ from(t in Todo,
+ where: is_nil(t.deleted_at)
+ )
+ |> Repo.get_by!(uuid: uuid)
+ end
end
На чем мы остановились?
Реализовано мягкое удаление. Модель чтения была реконструирована на основе существующих данных, включая ранее удаленные элементы.
В приложениях CQRS данные никогда не удаляются — в большинстве случаев они просто удаляются из модели чтения.
Не юридическая консультация по работе с GDPR: некоторые приложения могут шифровать записи журнала с помощью ключа шифрования для каждого человека, чтобы удовлетворить постоянные запросы на удаление данных. Для полного удаления данных ключ шифрования может быть выброшен. Это приведет к невозможности прочтения событий при их воспроизведении. Потребуется специальная обработка, чтобы предотвратить аварийное завершение работы приложения на объектах, удаленных в соответствии с GDPR. Это явно выходит за рамки данной статьи в блоге, но это очень интересная концепция.
Michiel Rook написал два сообщения на эту тему, которые можно найти здесь и здесь.
Новая функция: восстановление удаленных элементов
Теперь, когда мягкое удаление реализовано, пришло время написать новую функцию: удаление удаленных элементов!
Реализация
(Ссылка на соответствующий коммит)
Давайте быстро пройдемся по изменениям кода, необходимым для реализации восстановления удаленных элементов todo:
Создайте команду RestoreTodo
.
defmodule TodoBackend.Todos.Commands.RestoreTodo do
defstruct [
:uuid
]
end
Создайте событие TodoRestored
defmodule TodoBackend.Todos.Events.TodoRestored do
@derive Jason.Encoder
defstruct [
:uuid
]
end
Обновление агрегата
defmodule TodoBackend.Todos.Aggregates.Todo do
...
+ def execute(%Todo{uuid: uuid}, %RestoreTodo{uuid: uuid}) do
+ %TodoRestored{uuid: uuid}
+ end
...
+ def apply(%Todo{uuid: uuid} = todo, %TodoRestored{uuid: uuid}) do
+ %Todo{todo | deleted_at: nil}
+ end
...
end
Направить команду на агрегат
defmodule TodoBackend.Router do
...
- dispatch([CreateTodo, DeleteTodo, UpdateTodo],
+ dispatch([CreateTodo, DeleteTodo, UpdateTodo, RestoreTodo],
to: Todo,
identity: :uuid,
lifespan: Todo
)
end
Обновление проектора
defmodule TodoBackend.Todos.Projectors.Todo do
...
+ project(%TodoRestored{uuid: uuid}, _, fn multi ->
+ case Repo.get(Todo, uuid) do
+ nil ->
+ multi
+
+ todo ->
+ Ecto.Multi.update(multi, :todo, Todo.delete_changeset(todo, %{deleted_at: nil}))
+ end
+ end)
end
На данный момент мы реализовали все необходимое для восстановления удаленных элементов todo, за исключением фактической отправки команды RestoreTodo
.
Добавление метода API
(Ссылка на соответствующий коммит)
Для диспетчеризации команды RestoreTodo
я добавил PUT /api/todos/:id/restore
в API.
Добавьте метод в модуль контекста
defmodule TodoBackend.Todos do
...
def restore_todo(id) do
command = %RestoreTodo{uuid: id}
with :ok <- App.dispatch(command, consistency: :strong) do
{:ok, get_todo!(id)}
else
reply -> reply
end
end
end
Добавьте метод в контроллер
defmodule TodoBackendWeb.TodoController do
...
def restore(conn, %{"id" => id}) do
with {:ok, %Todo{} = todo} <- Todos.restore_todo(id) do
render(conn, "show.json", todo: todo)
end
end
end
Зарегистрировать маршрут
defmodule TodoBackendWeb.Router do
...
scope "/api", TodoBackendWeb do
pipe_through :api
resources "/todos", TodoController
delete "/todos", TodoController, :delete_all
+ put "/todos/:id/restore", TodoController, :restore
end
end
Валидация
Впервые в этой системе рассмотрим, как можно использовать агрегатное состояние и обработчик команды execute
для предотвращения удаления уже удаленных элементов (и восстановления не удаленных).
Проверка команд является основным свойством конструкций CQRS. После того, как события испущены, они никогда не изменяются. Единственный способ обеспечить валидацию — это принимать или отклонять команды.
Первое подтверждение, которое я добавлю, это то, что элементы, которые в данный момент удалены, не могут быть удалены:
defmodule TodoBackend.Todos.Aggregates.Todo do
...
- def execute(%Todo{uuid: uuid}, %DeleteTodo{uuid: uuid}) do
+ def execute(%Todo{uuid: uuid, deleted_at: nil}, %DeleteTodo{uuid: uuid}) do
%TodoDeleted{uuid: uuid, datetime: DateTime.utc_now()}
end
...
+ def execute(%Todo{}, %DeleteTodo{}) do
+ {:error, "Can not delete todo that is already deleted"}
+ end
end
Этот метод, соответствующий шаблону, будет испускать событие TodoDeleted
только тогда, когда существующее агрегатное состояние содержит значение deleted_at
равное nil
. Во всех остальных случаях он будет переходить к реализации, которая возвращает кортеж {:error, message}
.
В производственной системе должны возвращаться структурированные ошибки, чтобы предоставить клиентам API полезные ответы. В настоящее время эта реализация просто вызывает ошибку 500.
Другая проверка предотвращает восстановление не удаленных элементов. Она читается почти так же, как и первая.
defmodule TodoBackend.Todos.Aggregates.Todo do
...
+ def execute(%Todo{deleted_at: nil}, %RestoreTodo{}) do
+ {:error, "Can only restore deleted todos"}
+ end
+
def execute(%Todo{uuid: uuid}, %RestoreTodo{uuid: uuid}) do
%TodoRestored{uuid: uuid}
end
end
Резюме
В этой статье я рассказал о том, как реализация CQRS API для списка дел упростила добавление функциональности мягкого удаления и восстановления.
Я добавил валидацию и показал, как преобразовать существующие события, чтобы обеспечить их совместимость с развитием приложения Commanded.
В большинстве проектов это, вероятно, невозможно, если только в ORM не используется расширение для отслеживания таблиц. Даже при включенном отслеживании изменений с помощью таких расширений, как paper trail или Django simple history, восстановить удаленные сущности может быть непросто. Отслеживание объектов должно быть включено до того, как возникнет необходимость в восстановлении данных.
Когда неизменяемый журнал, доступный только для приложений, является источником истины для приложения, он может быть обновлен в соответствии с требованиями, которые не были известны на начальном этапе проекта.