Добавление мягкого удаления в API с командами Феникса (CQRS)


Цель

Реализовать мягкое удаление в API из первой части, позволяющее восстанавливать элементы после удаления.

Следуйте тому, чему я научился в процессе итераций над проектом под названием todo_backend_commanded. В его git-истории показаны шаги, необходимые для реализации функциональности мягкого удаления и восстановления.

Контекст

В предыдущем посте я преобразовал ванильный Phoenix API в CQRS с помощью Commanded.

Это приложение пишет в базу данных, используя хекс-пакет commanded_ecto_projections, который подписывается на события и проецирует их состояние в таблицы, управляемые библиотекой базы данных Ecto.

Поскольку основная модель данных этого приложения представляет собой журнал с неизменяемыми данными (append-only), события могут быть воспроизведены, а модель чтения может быть кардинально изменена с использованием существующих данных.

Обновление агрегата

(Ссылка на соответствующий коммит)

Первым шагом будет добавление поля deleted_at в агрегат Todo.

 defmodule TodoBackend.Todos.Aggregates.Todo do
   defstruct [
     :uuid,
     :title,
     :completed,
-    :order
+    :order,
+    deleted_at: nil
   ]

   ...
-  def apply(%Todo{uuid: uuid}, %TodoDeleted{uuid: uuid}) do
-    nil
-  end
+  def apply(%Todo{uuid: uuid} = todo, %TodoDeleted{uuid: uuid}) do
+    %Todo{todo | deleted_at: DateTime.utc_now()}
+  end
Вход в полноэкранный режим Выход из полноэкранного режима

Агрегат – это структура, представляющая состояние объекта в приложении. В Commanded агрегаты создаются в памяти из диспетчеризированных событий. Структура агрегата используется для определения побочных эффектов команды, если команда вообще должна быть принята.

Этот diff заставляет поле deleted_at агрегата быть установленным в качестве времени даты, когда журнал событий гидратируется. Мы еще вернемся к этому, поскольку было бы предпочтительнее фиксировать время, когда элемент todo был удален.

В сторону: агрегаты звучат медленно

Если агрегат должен гидратироваться из своих событий каждый раз, когда он используется, то чем это не ужасная нагрузка на производительность?

На практике у отдельного агрегата не должно быть много событий. Сколько взаимодействий может быть у одного элемента todo? Если окажется, что некоторые элементы имеют большие журналы событий, есть несколько рычагов, за которые можно потянуть в Commanded.

Продолжительность жизни

(Ссылка на соответствующий коммит)

Итак, я солгал сказал полуправду об агрегатах. Они не гидратируются в памяти для каждой команды/события. В действительности, агрегаты реализованы с помощью GenServer, каждый из которых кэширует своё состояние и управляется деревом супервизоров управляемого приложения (в конечном итоге, DynamicSupervisor под названием Commanded.Aggregates.Supervisor, если быть точным).

Поскольку агрегаты кэшируются как длительно выполняющиеся процессы, нам нужно получить каждое событие только один раз.

Таймаут процесса агрегации может быть настроен через реализацию поведения AggregateLifespan. Время жизни по умолчанию, DefaultLifespan, устанавливает бесконечный тайм-аут, если не возникнет исключение.

В зависимости от количества агрегатных экземпляров в живой системе, это может создать другую проблему – эти процессы могут оставаться живыми в течение всей жизни сервера. Это похоже на OOM, который только и ждет, чтобы произойти.

Чтобы решить эту проблему, я реализовал время жизни для агрегата Todo, которое сохраняет агрегаты в течение минуты, пока они не получат событие TodoDeleted. Возможность сопоставления с образцом и реализации этого в контексте приложения означает, что таймауты могут отражать специфические детали вашего домена.

Снимки

Для агрегатов с большим количеством событий в Commanded есть снимки состояния агрегатов. Их можно настроить, добавив секцию snapshotting в приложение Commanded в config/config.exs.

Существует две опции для каждого агрегата: snapshot_every и snapshot_version. snapshot_every используется для создания моментальных снимков после определенного количества событий. Это значение действует как верхняя граница событий, которые должны быть оценены при гидратации агрегатного состояния. snapshot_version – неотрицательное целое число, которое должно обновляться при каждом обновлении агрегата. Оно используется для аннулирования старых снимков, в которых могут отсутствовать новые поля или которые были созданы с помощью старых методов apply.

Снимки могут быть полезны в приложениях с длинными журналами событий, но в данном todo API они не представляются целесообразными. Если бы мы использовали моментальные снимки, добавление поля deleted_at потребовало бы увеличения snapshot_version.

Чтобы узнать больше о совокупной производительности, ознакомьтесь с этой статьей в блоге, в которой организация значительно улучшила производительность и площадь приложения Commanded с помощью периодов жизни и моментальных снимков.

Эффективная датировка события удаления

(Ссылка на соответствующий коммит)

Как я отметил при добавлении поля deleted_at в агрегат, не следует использовать время даты, когда событие было применено. Это приведет к тому, что значение deleted_at будет отличаться при каждом развертывании службы или перезапуске агрегированного генсервера. Гораздо лучше было бы фиксировать время создания события, чтобы оно сохранялось в журнале событий.

Сначала я добавил поле datetime в структуру события TodoDeleted:

 defmodule TodoBackend.Todos.Events.TodoDeleted do
   @derive Jason.Encoder
   defstruct [
-    :uuid
+    :uuid,
+    :datetime
   ]
end
Вход в полноэкранный режим Выход из полноэкранного режима

Затем я заполнил это значение в методе execute агрегата todo, когда была получена команда DeleteTodo. Поскольку execute вызывается, когда команда отправляется, это значение будет установлено в дату, когда будет вызвана конечная точка удаления.

Я также обновил метод apply, чтобы использовать время даты из события, а не текущее время. Это значение будет получено из журнала событий.

 defmodule TodoBackend.Todos.Aggregates.Todo do
   ...
   def execute(%Todo{uuid: uuid}, %DeleteTodo{uuid: uuid}) do
-    %TodoDeleted{uuid: uuid}
+    %TodoDeleted{uuid: uuid, datetime: DateTime.utc_now()}
   end

   ...
-  def apply(%Todo{uuid: uuid} = todo, %TodoDeleted{uuid: uuid}) do
-    %Todo{todo | deleted_at: DateTime.utc_now()}
+  def apply(%Todo{uuid: uuid} = todo, %TodoDeleted{uuid: uuid, datetime: effective_datetime}) do
+    %Todo{todo | deleted_at: effective_datetime}
   end
Вход в полноэкранный режим Выход из полноэкранного режима

Чтение записей

Журнал хранит события в формате JSON. Поскольку JSON не имеет собственного формата даты-времени, на структуре событий должен быть реализован протокол Commanded.Serialization.JsonDecoder.

defmodule TodoBackend.Todos.Events.TodoDeleted do
  ...

  defimpl Commanded.Serialization.JsonDecoder, for: TodoDeleted do
    @doc """
    Parse the datetime included in the aggregate state
    """
    def decode(%TodoDeleted{} = state) do
      %TodoDeleted{datetime: datetime} = state

      {:ok, dt, _} = DateTime.from_iso8601(datetime)

      %TodoDeleted{state | datetime: dt}
    end
  end
end
Вход в полноэкранный режим Выход из полноэкранного режима

Что насчет существующих событий?

Это хорошо, но что делать со всеми теми событиями, которые уже есть в журнале без даты?

Так как журнал только для добавления и неизменяем, способ расширить определение события – это добавить запасное значение для нового поля.

В данном случае я изменил метод декодера JSON, подставив значение nil для текущего времени даты. Поскольку это значение никогда не записывалось в старые события, фактическое время удаления теряется. Лучшее, что мы можем сделать, это предоставить некоторое значение и обеспечить запись времени даты во все будущие события.

 defmodule TodoBackend.Todos.Events.TodoDeleted do
   ...

   defimpl Commanded.Serialization.JsonDecoder, for: TodoDeleted do
     @doc """
     Parse the datetime included in the aggregate state
     """
     def decode(%TodoDeleted{} = state) do
       %TodoDeleted{datetime: datetime} = state
-
-      {:ok, dt, _} = DateTime.from_iso8601(datetime)
-
-      %TodoDeleted{state | datetime: dt}
+
+      if datetime == nil do
+        %TodoDeleted{state | datetime: DateTime.utc_now()}
+      else
+        {:ok, dt, _} = DateTime.from_iso8601(datetime)
+
+        %TodoDeleted{state | datetime: dt}
+      end
+    end
   end
 end
Вход в полноэкранный режим Выход из полноэкранного режима

Обновление модели чтения

(Ссылка на соответствующий коммит)

В производственной среде лучше всего будет создать новую таблицу базы данных и проектор, содержащий поле deleted_at, развернуть приложение для создания таблицы, обновить вызывающие команды для использования новой таблицы, а затем удалить старую таблицу. Это безопасно, поскольку все записи происходят на уровне событий – модель только для чтения.

В некоторых случаях целесообразно также обновить проектор таким образом, чтобы данные можно было перенести на место. Это показалось мне более сложной задачей, чем та, которую стоит затратить для статьи в блоге, поэтому я выбрал опасный вариант: уничтожить таблицу и построить ее заново с обновленным проектором.

Подготовка базы данных

Сначала я создал миграцию для добавления столбца deleted_at в таблицу todos:

mix ecto.gen.migration add_deleted_at_to_todo
Вход в полноэкранный режим Выход из полноэкранного режима

В созданном файле миграции я добавил столбец и индекс:

defmodule TodoBackend.Repo.Migrations.AddDeletedAtToTodo do
  use Ecto.Migration

  def change do
    alter table(:todos) do
      add :deleted_at, :naive_datetime_usec
    end

    create index(:todos, [:deleted_at])
  end
end
Войти в полноэкранный режим Выйти из полноэкранного режима

Наконец, я добавил поле в проекцию:

 defmodule TodoBackend.Todos.Projections.Todo do
   ...
   schema "todos" do
     field :completed, :boolean, default: false
     field :title, :string
     field :order, :integer, default: 0
+    field :deleted_at, :naive_datetime_usec, default: nil

     timestamps()
   end
 end
Войти в полноэкранный режим Выйти из полноэкранного режима

После запуска mix ecto.migrate сторона базы данных готова к работе.

Обновление проектора

Обновление проектора очень простое: вместо удаления строки из таблицы просто обновите колонку deleted_at.

 defmodule TodoBackend.Todos.Projectors.Todo do
   ...
-  project(%TodoDeleted{uuid: uuid}, _, fn multi ->
-    Ecto.Multi.delete(multi, :todo, fn _ -> %Todo{uuid: uuid} end)
+  project(%TodoDeleted{uuid: uuid, datetime: effective_datetime}, _, fn multi ->
+    case Repo.get(Todo, uuid) do
+      nil ->
+        multi
+
+      todo ->
+        Ecto.Multi.update(
+          multi,
+          :todo,
+          Todo.delete_changeset(todo, %{deleted_at: effective_datetime})
+        )
+    end
+  end)
   ...
 end
Вход в полноэкранный режим Выход из полноэкранного режима

Я добавил delete_changeset в проекцию Todo, чтобы представить конкретное желаемое изменение. Это гарантирует, что мы обновим только столбец deleted_at. Ссылка на него содержится в новом методе проекции TodoDeleted.

defmodule TodoBackend.Todos.Projections.Todo do
  ...

  def delete_changeset(todo, attrs \ %{}) do
    todo
    |> cast(attrs, [:deleted_at])
  end
end
Вход в полноэкранный режим Выход из полноэкранного режима

Принудительное повторное заполнение базы данных

Как я уже говорил, это ужасная идея – усекать базу данных в производстве. Вот как я сделал это локально, чтобы изучить Commanded. Возможно, это также можно сделать, обновив значение name, предоставленное вызову use Commanded.Projections.Ecto в TodoBackend.Todos.Projections.Todo и перезапустив сервер.

Если в журнале нет существующих данных, этот шаг можно пропустить.

В оболочке iex (iex -S mix phx.server):

# Delete the projected values
TodoBackend.Repo.delete_all(TodoBackend.Todos.Projections.Todo)

# Remove the version tracking entry for the ecto projection
TodoBackend.Repo.delete(%TodoBackend.Todos.Projectors.Todo.ProjectionVersion{projection_name: "Todos.Projectors.Todo"})

alias Commanded.Event.Handler
alias Commanded.Registration

# Trigger a reset of the projector
registry_name = Handler.name(TodoBackend.App, "Todos.Projectors.Todo")
projector = Registration.whereis_name(TodoBackend.App, registry_name)
send(projector, :reset)
Войдите в полноэкранный режим Выйти из полноэкранного режима

Скрытие удаленных элементов

Теперь, когда удаленные элементы присутствуют в таблице todos, важно обновить модуль контекста Todos, чтобы отфильтровать удаленные элементы. Это может быть сделано в контексте без необходимости обновления контроллера, поскольку весь доступ инкапсулирован. Как я уже отмечал в предыдущем сообщении, это действительно ценное свойство вдохновленного DDD подхода современных приложений Phoenix.

 defmodule TodoBackend.Todos do
   ...

   def list_todos do
-    Repo.all(Todo)
+    from(t in Todo,
+      where: is_nil(t.deleted_at)
+    )
+    |> Repo.all()
   end

   ...
-  def get_todo!(uuid), do: Repo.get_by!(Todo, uuid: uuid)
+  def get_todo!(uuid) do
+    from(t in Todo,
+      where: is_nil(t.deleted_at)
+    )
+    |> Repo.get_by!(uuid: uuid)
+  end
 end
Вход в полноэкранный режим Выход из полноэкранного режима

На чем мы остановились?

Реализовано мягкое удаление. Модель чтения была реконструирована на основе существующих данных, включая ранее удаленные элементы.

В приложениях CQRS данные никогда не удаляются – в большинстве случаев они просто удаляются из модели чтения.

Не юридическая консультация по работе с GDPR: некоторые приложения могут шифровать записи журнала с помощью ключа шифрования для каждого человека, чтобы удовлетворить постоянные запросы на удаление данных. Для полного удаления данных ключ шифрования может быть выброшен. Это приведет к невозможности прочтения событий при их воспроизведении. Потребуется специальная обработка, чтобы предотвратить аварийное завершение работы приложения на объектах, удаленных в соответствии с GDPR. Это явно выходит за рамки данной статьи в блоге, но это очень интересная концепция.

Michiel Rook написал два сообщения на эту тему, которые можно найти здесь и здесь.

Новая функция: восстановление удаленных элементов

Теперь, когда мягкое удаление реализовано, пришло время написать новую функцию: удаление удаленных элементов!

Реализация

(Ссылка на соответствующий коммит)

Давайте быстро пройдемся по изменениям кода, необходимым для реализации восстановления удаленных элементов todo:

Создайте команду RestoreTodo.

defmodule TodoBackend.Todos.Commands.RestoreTodo do
  defstruct [
    :uuid
  ]
end
Войти в полноэкранный режим Выйти из полноэкранного режима

Создайте событие TodoRestored

defmodule TodoBackend.Todos.Events.TodoRestored do
  @derive Jason.Encoder
  defstruct [
    :uuid
  ]
end
Вход в полноэкранный режим Выход из полноэкранного режима

Обновление агрегата

 defmodule TodoBackend.Todos.Aggregates.Todo do
   ...
+  def execute(%Todo{uuid: uuid}, %RestoreTodo{uuid: uuid}) do
+    %TodoRestored{uuid: uuid}
+  end

   ...
+  def apply(%Todo{uuid: uuid} = todo, %TodoRestored{uuid: uuid}) do
+    %Todo{todo | deleted_at: nil}
+  end

   ...
end
Войти в полноэкранный режим Выйти из полноэкранного режима

Направить команду на агрегат

 defmodule TodoBackend.Router do
   ...
-  dispatch([CreateTodo, DeleteTodo, UpdateTodo],
+  dispatch([CreateTodo, DeleteTodo, UpdateTodo, RestoreTodo],
    to: Todo,
    identity: :uuid,
    lifespan: Todo
  )
end
Войти в полноэкранный режим Выход из полноэкранного режима

Обновление проектора

 defmodule TodoBackend.Todos.Projectors.Todo do
   ...
+  project(%TodoRestored{uuid: uuid}, _, fn multi ->
+    case Repo.get(Todo, uuid) do
+      nil ->
+        multi
+
+      todo ->
+        Ecto.Multi.update(multi, :todo, Todo.delete_changeset(todo, %{deleted_at: nil}))
+    end
+  end)
 end
Войти в полноэкранный режим Выход из полноэкранного режима

На данный момент мы реализовали все необходимое для восстановления удаленных элементов todo, за исключением фактической отправки команды RestoreTodo.

Добавление метода API

(Ссылка на соответствующий коммит)

Для диспетчеризации команды RestoreTodo я добавил PUT /api/todos/:id/restore в API.

Добавьте метод в модуль контекста

defmodule TodoBackend.Todos do
  ...

  def restore_todo(id) do
    command = %RestoreTodo{uuid: id}

    with :ok <- App.dispatch(command, consistency: :strong) do
      {:ok, get_todo!(id)}
    else
      reply -> reply
    end
  end
end
Войти в полноэкранный режим Выйти из полноэкранного режима

Добавьте метод в контроллер

defmodule TodoBackendWeb.TodoController do
  ...

  def restore(conn, %{"id" => id}) do
    with {:ok, %Todo{} = todo} <- Todos.restore_todo(id) do
      render(conn, "show.json", todo: todo)
    end
  end
end
Войти в полноэкранный режим Выйти из полноэкранного режима

Зарегистрировать маршрут

 defmodule TodoBackendWeb.Router do
   ...

   scope "/api", TodoBackendWeb do
     pipe_through :api

     resources "/todos", TodoController
     delete "/todos", TodoController, :delete_all
+    put "/todos/:id/restore", TodoController, :restore
   end
 end
Войти в полноэкранный режим Выйти из полноэкранного режима

Валидация

Впервые в этой системе рассмотрим, как можно использовать агрегатное состояние и обработчик команды execute для предотвращения удаления уже удаленных элементов (и восстановления не удаленных).

Проверка команд является основным свойством конструкций CQRS. После того, как события испущены, они никогда не изменяются. Единственный способ обеспечить валидацию – это принимать или отклонять команды.

Первое подтверждение, которое я добавлю, это то, что элементы, которые в данный момент удалены, не могут быть удалены:

 defmodule TodoBackend.Todos.Aggregates.Todo do
   ...
-  def execute(%Todo{uuid: uuid}, %DeleteTodo{uuid: uuid}) do
+  def execute(%Todo{uuid: uuid, deleted_at: nil}, %DeleteTodo{uuid: uuid}) do
     %TodoDeleted{uuid: uuid, datetime: DateTime.utc_now()}
   end

   ...
+  def execute(%Todo{}, %DeleteTodo{}) do
+    {:error, "Can not delete todo that is already deleted"}
+  end
 end
Войти в полноэкранный режим Выйти из полноэкранного режима

Этот метод, соответствующий шаблону, будет испускать событие TodoDeleted только тогда, когда существующее агрегатное состояние содержит значение deleted_at равное nil. Во всех остальных случаях он будет переходить к реализации, которая возвращает кортеж {:error, message}.

В производственной системе должны возвращаться структурированные ошибки, чтобы предоставить клиентам API полезные ответы. В настоящее время эта реализация просто вызывает ошибку 500.

Другая проверка предотвращает восстановление не удаленных элементов. Она читается почти так же, как и первая.

 defmodule TodoBackend.Todos.Aggregates.Todo do
   ...
+  def execute(%Todo{deleted_at: nil}, %RestoreTodo{}) do
+    {:error, "Can only restore deleted todos"}
+  end
+
  def execute(%Todo{uuid: uuid}, %RestoreTodo{uuid: uuid}) do
    %TodoRestored{uuid: uuid}
  end
end
Войти в полноэкранный режим Выход из полноэкранного режима

Резюме

В этой статье я рассказал о том, как реализация CQRS API для списка дел упростила добавление функциональности мягкого удаления и восстановления.

Я добавил валидацию и показал, как преобразовать существующие события, чтобы обеспечить их совместимость с развитием приложения Commanded.

В большинстве проектов это, вероятно, невозможно, если только в ORM не используется расширение для отслеживания таблиц. Даже при включенном отслеживании изменений с помощью таких расширений, как paper trail или Django simple history, восстановить удаленные сущности может быть непросто. Отслеживание объектов должно быть включено до того, как возникнет необходимость в восстановлении данных.

Когда неизменяемый журнал, доступный только для приложений, является источником истины для приложения, он может быть обновлен в соответствии с требованиями, которые не были известны на начальном этапе проекта.

Оцените статью
Procodings.ru
Добавить комментарий