From 4b10d15e620accca1869d6ca9af023892e911d32 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Sat, 23 Jan 2021 22:59:24 +0300 Subject: [PATCH] [#3259] Background replication of objects' attachments into `media` table. --- lib/pleroma/application.ex | 3 +- lib/pleroma/data_migration.ex | 46 ++++ lib/pleroma/ecto_enums.ex | 8 + lib/pleroma/migrators/media_table_migrator.ex | 299 +++++++++++++++++++++ .../migrators/media_table_migrator/state.ex | 31 +++ lib/pleroma/repo.ex | 6 +- .../20210105195018_create_data_migrations.exs | 17 ++ ...0111172254_create_data_migration_failed_ids.exs | 14 + ..._data_migration_create_populate_media_table.exs | 14 + 9 files changed, 434 insertions(+), 4 deletions(-) create mode 100644 lib/pleroma/data_migration.ex create mode 100644 lib/pleroma/migrators/media_table_migrator.ex create mode 100644 lib/pleroma/migrators/media_table_migrator/state.ex create mode 100644 priv/repo/migrations/20210105195018_create_data_migrations.exs create mode 100644 priv/repo/migrations/20210111172254_create_data_migration_failed_ids.exs create mode 100644 priv/repo/migrations/20210122100954_data_migration_create_populate_media_table.exs diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 203a95004..36065f48b 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -104,7 +104,8 @@ defmodule Pleroma.Application do chat_child(chat_enabled?()) ++ [ Pleroma.Web.Endpoint, - Pleroma.Gopher.Server + Pleroma.Gopher.Server, + Pleroma.Migrators.MediaTableMigrator ] # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html diff --git a/lib/pleroma/data_migration.ex b/lib/pleroma/data_migration.ex new file mode 100644 index 000000000..497fb00ea --- /dev/null +++ b/lib/pleroma/data_migration.ex @@ -0,0 +1,46 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2021 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.DataMigration do + use Ecto.Schema + + alias Pleroma.DataMigration + alias Pleroma.DataMigration.State + alias Pleroma.Repo + + import Ecto.Changeset + + schema "data_migrations" do + field(:name, :string) + field(:state, State, default: :pending) + field(:feature_lock, :boolean, default: false) + field(:params, :map, default: %{}) + field(:data, :map, default: %{}) + + timestamps() + end + + def changeset(data_migration, params \\ %{}) do + data_migration + |> cast(params, [:name, :state, :feature_lock, :params, :data]) + |> validate_required([:name]) + |> unique_constraint(:name) + end + + def update(data_migration, params \\ %{}) do + data_migration + |> changeset(params) + |> Repo.update() + end + + def update_state(data_migration, new_state) do + update(data_migration, %{state: new_state}) + end + + def get_by_name(name) do + Repo.get_by(DataMigration, name: name) + end + + def populate_media_table, do: get_by_name("populate_media_table") +end diff --git a/lib/pleroma/ecto_enums.ex b/lib/pleroma/ecto_enums.ex index f198cccb7..2a9addabc 100644 --- a/lib/pleroma/ecto_enums.ex +++ b/lib/pleroma/ecto_enums.ex @@ -17,3 +17,11 @@ defenum(Pleroma.FollowingRelationship.State, follow_accept: 2, follow_reject: 3 ) + +defenum(Pleroma.DataMigration.State, + pending: 1, + running: 2, + complete: 3, + failed: 4, + manual: 5 +) diff --git a/lib/pleroma/migrators/media_table_migrator.ex b/lib/pleroma/migrators/media_table_migrator.ex new file mode 100644 index 000000000..811f2b85a --- /dev/null +++ b/lib/pleroma/migrators/media_table_migrator.ex @@ -0,0 +1,299 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2021 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Migrators.MediaTableMigrator do + use GenServer + + require Logger + + import Ecto.Query + + alias __MODULE__.State + alias Pleroma.Config + alias Pleroma.DataMigration + alias Pleroma.Media + alias Pleroma.Object + alias Pleroma.Repo + + defdelegate state(), to: State, as: :get + defdelegate put_stat(key, value), to: State, as: :put + defdelegate increment_stat(key, increment), to: State, as: :increment + + defdelegate data_migration(), to: DataMigration, as: :populate_media_table + + @reg_name {:global, __MODULE__} + + def whereis, do: GenServer.whereis(@reg_name) + + def start_link(_) do + case whereis() do + nil -> + GenServer.start_link(__MODULE__, nil, name: @reg_name) + + pid -> + {:ok, pid} + end + end + + @impl true + def init(_) do + {:ok, nil, {:continue, :init_state}} + end + + @impl true + def handle_continue(:init_state, _state) do + {:ok, _} = State.start_link(nil) + + update_status(:init) + + data_migration = data_migration() + manual_migrations = Config.get([:instance, :manual_data_migrations], []) + + cond do + Config.get(:env) == :test -> + update_status(:noop) + + is_nil(data_migration) -> + update_status(:halt, "Data migration does not exist.") + + data_migration.state == :manual or data_migration.name in manual_migrations -> + update_status(:noop, "Data migration is in manual execution state.") + + data_migration.state == :complete -> + handle_success(data_migration) + + true -> + send(self(), :process_attachments) + end + + {:noreply, nil} + end + + @impl true + def handle_info(:process_attachments, state) do + data_migration = data_migration() + + persistent_data = Map.take(data_migration.data, ["max_processed_id"]) + + {:ok, data_migration} = + DataMigration.update(data_migration, %{state: :running, data: persistent_data}) + + update_status(:running) + put_stat(:started_at, NaiveDateTime.utc_now()) + + Logger.info("Starting creating `media` records for objects' attachments...") + + max_processed_id = data_migration.data["max_processed_id"] || 0 + + query() + |> where([object], object.id > ^max_processed_id) + |> Repo.chunk_stream(100, :batches, timeout: :infinity) + |> Stream.each(fn objects -> + object_ids = Enum.map(objects, & &1.id) + + failed_ids = + objects + |> Enum.map(&process_object_attachments(&1)) + |> Enum.filter(&(elem(&1, 0) == :error)) + |> Enum.map(&elem(&1, 1)) + + for failed_id <- failed_ids do + _ = + Repo.query( + "INSERT INTO data_migration_failed_ids(data_migration_id, record_id) " <> + "VALUES ($1, $2) ON CONFLICT DO NOTHING;", + [data_migration.id, failed_id] + ) + end + + _ = + Repo.query( + "DELETE FROM data_migration_failed_ids " <> + "WHERE data_migration_id = $1 AND record_id = ANY($2)", + [data_migration.id, object_ids -- failed_ids] + ) + + max_object_id = Enum.at(object_ids, -1) + + put_stat(:max_processed_id, max_object_id) + increment_stat(:processed_count, length(object_ids)) + increment_stat(:failed_count, length(failed_ids)) + + put_stat( + :records_per_second, + state()[:processed_count] / + Enum.max([NaiveDateTime.diff(NaiveDateTime.utc_now(), state()[:started_at]), 1]) + ) + + persist_stats(data_migration) + + # A quick and dirty approach to controlling the load this background migration imposes + sleep_interval = Config.get([:populate_media_table, :sleep_interval_ms], 0) + Process.sleep(sleep_interval) + end) + |> Stream.run() + + with 0 <- failures_count(data_migration.id) do + {:ok, data_migration} = DataMigration.update_state(data_migration, :complete) + + handle_success(data_migration) + else + _ -> + _ = DataMigration.update_state(data_migration, :failed) + + update_status(:failed, "Please check data_migration_failed_ids records.") + end + + {:noreply, state} + end + + def query do + from( + object in Object, + where: + fragment( + "(?)->'attachment' IS NOT NULL AND \ +(?)->'attachment' != ANY(ARRAY['null'::jsonb, '[]'::jsonb])", + object.data, + object.data + ), + select: %{ + id: object.id, + attachment: fragment("(?)->'attachment'", object.data), + actor: fragment("(?)->'actor'", object.data) + } + ) + end + + defp process_object_attachments(object) do + attachments = + if Map.has_key?(object, :attachment), do: object.attachment, else: object.data["attachment"] + + actor = if Map.has_key?(object, :actor), do: object.actor, else: object.data["actor"] + + Repo.transaction(fn -> + with {_, true} <- {:any, Enum.any?(attachments || [], &is_nil(&1["id"]))}, + updated_attachments = + Enum.map(attachments, fn attachment -> + if is_nil(attachment["id"]) do + with {:ok, media} <- + Media.create_from_object_data(attachment, %{ + actor: actor, + object_id: object.id + }) do + Map.put(attachment, "id", media.id) + else + {:error, e} -> + error = + "ERROR: could not process attachment of object #{object.id}: " <> + "#{attachment["href"]}: #{inspect(e)}" + + Logger.error(error) + Repo.rollback(object.id) + end + else + attachment + end + end), + {:ok, _} <- + Object.update_data(%Object{id: object.id}, %{"attachment" => updated_attachments}) do + object.id + else + {:any, false} -> + object.id + + {:error, e} -> + error = "ERROR: could not update attachments of object #{object.id}: #{inspect(e)}" + + Logger.error(error) + Repo.rollback(object.id) + end + end) + end + + @doc "Approximate count for current iteration (including processed records count)" + def count(force \\ false, timeout \\ :infinity) do + stored_count = state()[:count] + + if stored_count && !force do + stored_count + else + processed_count = state()[:processed_count] || 0 + max_processed_id = data_migration().data["max_processed_id"] || 0 + query = where(query(), [object], object.id > ^max_processed_id) + + count = Repo.aggregate(query, :count, :id, timeout: timeout) + processed_count + put_stat(:count, count) + count + end + end + + defp persist_stats(data_migration) do + runner_state = Map.drop(state(), [:status]) + _ = DataMigration.update(data_migration, %{data: runner_state}) + end + + defp handle_success(_data_migration) do + update_status(:complete) + end + + def failed_objects_query do + from(o in Object) + |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"), + on: dmf.record_id == o.id + ) + |> where([_o, dmf], dmf.data_migration_id == ^data_migration().id) + |> order_by([o], asc: o.id) + end + + def failures_count(data_migration_id \\ nil) do + data_migration_id = data_migration_id || data_migration().id + + with {:ok, %{rows: [[count]]}} <- + Repo.query( + "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;", + [data_migration_id] + ) do + count + end + end + + def retry_failed do + data_migration = data_migration() + + failed_objects_query() + |> Repo.chunk_stream(100, :one) + |> Stream.each(fn object -> + with {:ok, _} <- process_object_attachments(object) do + _ = + Repo.query( + "DELETE FROM data_migration_failed_ids " <> + "WHERE data_migration_id = $1 AND record_id = $2", + [data_migration.id, object.id] + ) + end + end) + |> Stream.run() + end + + def force_continue do + send(whereis(), :process_attachments) + end + + def force_restart do + {:ok, _} = DataMigration.update(data_migration(), %{state: :pending, data: %{}}) + force_continue() + end + + def force_complete do + {:ok, data_migration} = DataMigration.update_state(data_migration(), :complete) + + handle_success(data_migration) + end + + defp update_status(status, message \\ nil) do + put_stat(:status, status) + put_stat(:message, message) + end +end diff --git a/lib/pleroma/migrators/media_table_migrator/state.ex b/lib/pleroma/migrators/media_table_migrator/state.ex new file mode 100644 index 000000000..7a4e43456 --- /dev/null +++ b/lib/pleroma/migrators/media_table_migrator/state.ex @@ -0,0 +1,31 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2021 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Migrators.MediaTableMigrator.State do + use Agent + + @init_state %{} + @reg_name {:global, __MODULE__} + + def start_link(_) do + Agent.start_link(fn -> @init_state end, name: @reg_name) + end + + def get do + Agent.get(@reg_name, & &1) + end + + def put(key, value) do + Agent.update(@reg_name, fn state -> + Map.put(state, key, value) + end) + end + + def increment(key, increment \\ 1) do + Agent.update(@reg_name, fn state -> + updated_value = (state[key] || 0) + increment + Map.put(state, key, updated_value) + end) + end +end diff --git a/lib/pleroma/repo.ex b/lib/pleroma/repo.ex index 4556352d0..61b64ed3e 100644 --- a/lib/pleroma/repo.ex +++ b/lib/pleroma/repo.ex @@ -63,8 +63,8 @@ defmodule Pleroma.Repo do iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500, :batches) """ @spec chunk_stream(Ecto.Query.t(), integer(), atom()) :: Enumerable.t() - def chunk_stream(query, chunk_size, returns_as \\ :one) do - # We don't actually need start and end funcitons of resource streaming, + def chunk_stream(query, chunk_size, returns_as \\ :one, query_options \\ []) do + # We don't actually need start and end functions of resource streaming, # but it seems to be the only way to not fetch records one-by-one and # have individual records be the elements of the stream, instead of # lists of records @@ -76,7 +76,7 @@ defmodule Pleroma.Repo do |> order_by(asc: :id) |> where([r], r.id > ^last_id) |> limit(^chunk_size) - |> all() + |> all(query_options) |> case do [] -> {:halt, last_id} diff --git a/priv/repo/migrations/20210105195018_create_data_migrations.exs b/priv/repo/migrations/20210105195018_create_data_migrations.exs new file mode 100644 index 000000000..5f2e8d96c --- /dev/null +++ b/priv/repo/migrations/20210105195018_create_data_migrations.exs @@ -0,0 +1,17 @@ +defmodule Pleroma.Repo.Migrations.CreateDataMigrations do + use Ecto.Migration + + def change do + create_if_not_exists table(:data_migrations) do + add(:name, :string, null: false) + add(:state, :integer, default: 1) + add(:feature_lock, :boolean, default: false) + add(:params, :map, default: %{}) + add(:data, :map, default: %{}) + + timestamps() + end + + create_if_not_exists(unique_index(:data_migrations, [:name])) + end +end diff --git a/priv/repo/migrations/20210111172254_create_data_migration_failed_ids.exs b/priv/repo/migrations/20210111172254_create_data_migration_failed_ids.exs new file mode 100644 index 000000000..ba0be98af --- /dev/null +++ b/priv/repo/migrations/20210111172254_create_data_migration_failed_ids.exs @@ -0,0 +1,14 @@ +defmodule Pleroma.Repo.Migrations.CreateDataMigrationFailedIds do + use Ecto.Migration + + def change do + create_if_not_exists table(:data_migration_failed_ids, primary_key: false) do + add(:data_migration_id, references(:data_migrations), null: false) + add(:record_id, :bigint, null: false) + end + + create_if_not_exists( + unique_index(:data_migration_failed_ids, [:data_migration_id, :record_id]) + ) + end +end diff --git a/priv/repo/migrations/20210122100954_data_migration_create_populate_media_table.exs b/priv/repo/migrations/20210122100954_data_migration_create_populate_media_table.exs new file mode 100644 index 000000000..e4f24e544 --- /dev/null +++ b/priv/repo/migrations/20210122100954_data_migration_create_populate_media_table.exs @@ -0,0 +1,14 @@ +defmodule Pleroma.Repo.Migrations.DataMigrationCreatePopulateMediaTable do + use Ecto.Migration + + def up do + dt = NaiveDateTime.utc_now() + + execute( + "INSERT INTO data_migrations(name, inserted_at, updated_at) " <> + "VALUES ('populate_media_table', '#{dt}', '#{dt}') ON CONFLICT DO NOTHING;" + ) + end + + def down, do: :ok +end