Merge branch 'feature/object-hashtags-rework' into build-docker/feature/object-hashtags-rework

This commit is contained in:
Mark Felder 2021-02-18 16:13:58 -06:00
commit 7d40f594c5
6 changed files with 119 additions and 43 deletions

View File

@ -657,6 +657,8 @@ config :pleroma, :oauth2,
config :pleroma, :database, rum_enabled: false
config :pleroma, :populate_hashtags_table, fault_rate_allowance: 0.01
config :pleroma, :env, Mix.env()
config :http_signatures,

View File

@ -480,6 +480,13 @@ config :pleroma, :config_description, [
description: "`populate_hashtags_table` background migration settings",
children: [
%{
key: :fault_rate_allowance,
type: :float,
description:
"Max rate of failed objects to actually processed objects in order to enable the feature (any value from 0.0 which tolerates no errors to 1.0 which will enable the feature even if hashtags transfer failed for all records).",
suggestions: [0.01]
},
%{
key: :sleep_interval_ms,
type: :integer,
description:

View File

@ -70,6 +70,7 @@ To add configuration to your config file, you can copy it from the base config.
## Background migrations
* `populate_hashtags_table/sleep_interval_ms`: Sleep interval between each chunk of processed records in order to decrease the load on the system (defaults to 0 and should be kept at the default on most instances).
* `populate_hashtags_table/fault_rate_allowance`: Maximum allowed ratio of failed objects to actually processed objects for the feature to be enabled (any value from 0.0, which tolerates no errors, to 1.0, which will enable the feature even if the hashtags transfer failed for all records).
## Welcome
* `direct_message`: - welcome message sent as a direct message.

View File

@ -15,7 +15,8 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
alias Pleroma.Object
alias Pleroma.Repo
defdelegate data_migration(), to: State
defdelegate data_migration(), to: Pleroma.DataMigration, as: :populate_hashtags_table
defdelegate data_migration_id(), to: State
defdelegate state(), to: State
defdelegate persist_state(), to: State, as: :persist_to_db
@ -23,10 +24,13 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
defdelegate put_stat(key, value), to: State, as: :put_data_key
defdelegate increment_stat(key, increment), to: State, as: :increment_data_key
@feature_config_path [:database, :improved_hashtag_timeline]
@reg_name {:global, __MODULE__}
# Pid of the globally registered migrator process, or nil when it is not running.
def whereis do
  GenServer.whereis(@reg_name)
end
# Current value of the [:database, :improved_hashtag_timeline] config flag.
def feature_state do
  Config.get(@feature_config_path)
end
def start_link(_) do
case whereis() do
nil ->
@ -46,8 +50,6 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
def handle_continue(:init_state, _state) do
{:ok, _} = State.start_link(nil)
update_status(:pending)
data_migration = data_migration()
manual_migrations = Config.get([:instance, :manual_data_migrations], [])
@ -56,10 +58,14 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
update_status(:noop)
is_nil(data_migration) ->
update_status(:failed, "Data migration does not exist.")
message = "Data migration does not exist."
update_status(:failed, message)
Logger.error("#{__MODULE__}: #{message}")
data_migration.state == :manual or data_migration.name in manual_migrations ->
update_status(:manual, "Data migration is in manual execution state.")
message = "Data migration is in manual execution or manual fix mode."
update_status(:manual, message)
Logger.warn("#{__MODULE__}: #{message}")
data_migration.state == :complete ->
on_complete(data_migration)
@ -78,7 +84,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
update_status(:running)
put_stat(:started_at, NaiveDateTime.utc_now())
%{id: data_migration_id} = data_migration()
data_migration_id = data_migration_id()
max_processed_id = get_stat(:max_processed_id, 0)
Logger.info("Transferring embedded hashtags to `hashtags` (from oid: #{max_processed_id})...")
@ -89,12 +95,19 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
|> Stream.each(fn objects ->
object_ids = Enum.map(objects, & &1.id)
results = Enum.map(objects, &transfer_object_hashtags(&1))
failed_ids =
objects
|> Enum.map(&transfer_object_hashtags(&1))
results
|> Enum.filter(&(elem(&1, 0) == :error))
|> Enum.map(&elem(&1, 1))
# Count of objects with hashtags (`{:noop, id}` is returned for objects having other AS2 tags)
chunk_affected_count =
results
|> Enum.filter(&(elem(&1, 0) == :ok))
|> length()
for failed_id <- failed_ids do
_ =
Repo.query(
@ -116,6 +129,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
put_stat(:max_processed_id, max_object_id)
increment_stat(:processed_count, length(object_ids))
increment_stat(:failed_count, length(failed_ids))
increment_stat(:affected_count, chunk_affected_count)
put_stat(:records_per_second, records_per_second())
persist_state()
@ -125,17 +139,42 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
end)
|> Stream.run()
with 0 <- failures_count(data_migration_id) do
_ = delete_non_create_activities_hashtags()
set_complete()
else
_ ->
update_status(:failed, "Please check data_migration_failed_ids records.")
fault_rate = fault_rate()
put_stat(:fault_rate, fault_rate)
fault_rate_allowance = Config.get([:populate_hashtags_table, :fault_rate_allowance], 0)
cond do
fault_rate == 0 ->
set_complete()
is_float(fault_rate) and fault_rate <= fault_rate_allowance ->
message = """
Done with fault rate of #{fault_rate} which doesn't exceed #{fault_rate_allowance}.
Putting data migration to manual fix mode. Check `retry_failed/0`.
"""
Logger.warn("#{__MODULE__}: #{message}")
update_status(:manual, message)
on_complete(data_migration())
true ->
message = "Too many failures. Check data_migration_failed_ids records / `retry_failed/0`."
Logger.error("#{__MODULE__}: #{message}")
update_status(:failed, message)
end
persist_state()
{:noreply, state}
end
# Ratio of failed records to records counted in :affected_count; the
# denominator is floored at 1 so a zero affected count cannot divide by zero.
# Returns :error when failures_count/0 does not yield an integer.
def fault_rate do
  case failures_count() do
    failures when is_integer(failures) ->
      failures / max(get_stat(:affected_count, 0), 1)

    _ ->
      :error
  end
end
# Average processing throughput since the migration started; the running
# time is clamped to at least 1 to avoid division by zero.
defp records_per_second do
  elapsed = max(running_time(), 1)
  get_stat(:processed_count, 0) / elapsed
end
@ -194,6 +233,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
|> where([_o, hashtags_objects], is_nil(hashtags_objects.object_id))
end
@spec transfer_object_hashtags(Map.t()) :: {:noop | :ok | :error, integer()}
defp transfer_object_hashtags(object) do
embedded_tags = if Map.has_key?(object, :tag), do: object.tag, else: object.data["tag"]
hashtags = Object.object_data_hashtags(%{"tag" => embedded_tags})
@ -201,7 +241,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
if Enum.any?(hashtags) do
transfer_object_hashtags(object, hashtags)
else
{:ok, object.id}
{:noop, object.id}
end
end
@ -209,13 +249,11 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
Repo.transaction(fn ->
with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do
maps = Enum.map(hashtag_records, &%{hashtag_id: &1.id, object_id: object.id})
expected_rows = length(hashtag_records)
base_error =
"ERROR when inserting #{expected_rows} hashtags_objects for obj. #{object.id}"
base_error = "ERROR when inserting hashtags_objects for object with id #{object.id}"
try do
with {^expected_rows, _} <- Repo.insert_all("hashtags_objects", maps) do
with {rows_count, _} when is_integer(rows_count) <-
Repo.insert_all("hashtags_objects", maps, on_conflict: :nothing) do
object.id
else
e ->
@ -260,11 +298,11 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
data_migration.feature_lock ->
:noop
not is_nil(Config.get([:database, :improved_hashtag_timeline])) ->
not is_nil(feature_state()) ->
:noop
true ->
Config.put([:database, :improved_hashtag_timeline], true)
Config.put(@feature_config_path, true)
:ok
end
end
@ -274,38 +312,41 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
|> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"),
on: dmf.record_id == o.id
)
|> where([_o, dmf], dmf.data_migration_id == ^data_migration().id)
|> where([_o, dmf], dmf.data_migration_id == ^data_migration_id())
|> order_by([o], asc: o.id)
end
def failures_count(data_migration_id \\ nil) do
data_migration_id = data_migration_id || data_migration().id
def failures_count do
with {:ok, %{rows: [[count]]}} <-
Repo.query(
"SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;",
[data_migration_id]
[data_migration_id()]
) do
count
end
end
def retry_failed do
data_migration = data_migration()
data_migration_id = data_migration_id()
failed_objects_query()
|> Repo.chunk_stream(100, :one)
|> Stream.each(fn object ->
with {:ok, _} <- transfer_object_hashtags(object) do
with {res, _} when res != :error <- transfer_object_hashtags(object) do
_ =
Repo.query(
"DELETE FROM data_migration_failed_ids " <>
"WHERE data_migration_id = $1 AND record_id = $2",
[data_migration.id, object.id]
[data_migration_id, object.id]
)
end
end)
|> Stream.run()
put_stat(:failed_count, failures_count())
persist_state()
force_continue()
end
def force_continue do

View File

@ -7,7 +7,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator.State do
alias Pleroma.DataMigration
defdelegate data_migration(), to: DataMigration, as: :populate_hashtags_table
defdelegate data_migration(), to: Pleroma.Migrators.HashtagsTableMigrator
@reg_name {:global, __MODULE__}
@ -99,4 +99,6 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator.State do
# Catch-all clause: state changes other than data changes are not persisted.
defp persist_non_data_change(_key, _value), do: nil
# Reads the data migration id cached in the state map (nil when absent).
def data_migration_id do
  Map.get(state(), :data_migration_id)
end
end

View File

@ -787,19 +787,42 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
end
defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
from(
[_activity, object] in query,
where:
fragment(
"""
EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?::citext[])
AND hashtags_objects.object_id = ? LIMIT 1)
""",
^tags,
object.id
# TODO: refactor: debug / experimental feature
if Config.get([:database, :improved_hashtag_timeline]) == :preselect_hashtag_ids do
hashtag_ids =
from(ht in Pleroma.Hashtag,
where: fragment("name = ANY(?::citext[])", ^tags),
select: ht.id
)
)
|> Repo.all()
from(
[_activity, object] in query,
where:
fragment(
"""
EXISTS (
SELECT 1 FROM hashtags_objects WHERE hashtag_id = ANY(?) AND object_id = ? LIMIT 1)
""",
^hashtag_ids,
object.id
)
)
else
from(
[_activity, object] in query,
where:
fragment(
"""
EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?::citext[])
AND hashtags_objects.object_id = ? LIMIT 1)
""",
^tags,
object.id
)
)
end
end
defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do