Changed default hashtags filtering strategy to non-aggregate approach.build-docker/feature/object-hashtags-rework
@@ -940,7 +940,7 @@ config :pleroma, :config_description, [ | |||||
key: :improved_hashtag_timeline, | key: :improved_hashtag_timeline, | ||||
type: :keyword, | type: :keyword, | ||||
description: | description: | ||||
"If `true` / `:prefer_aggregation` / `:avoid_aggregation`, hashtags table and selected strategy will be used for hashtags timeline. When `false`, object-embedded hashtags will be used (slower). Is auto-set to `true` (unless overridden) when HashtagsTableMigrator completes." | |||||
"If `true` / `:prefer_aggregation`, hashtags table and selected strategy will be used for hashtags timeline. When `false`, object-embedded hashtags will be used (slower). Is auto-set to `true` (unless overridden) when HashtagsTableMigrator completes." | |||||
} | } | ||||
] | ] | ||||
}, | }, | ||||
@@ -109,8 +109,9 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do | |||||
_ = | _ = | ||||
Repo.query( | Repo.query( | ||||
"DELETE FROM data_migration_failed_ids WHERE id = ANY($1)", | |||||
[object_ids -- failed_ids] | |||||
"DELETE FROM data_migration_failed_ids " <> | |||||
"WHERE data_migration_id = $1 AND record_id = ANY($2)", | |||||
[data_migration.id, object_ids -- failed_ids] | |||||
) | ) | ||||
max_object_id = Enum.at(object_ids, -1) | max_object_id = Enum.at(object_ids, -1) | ||||
@@ -133,12 +134,8 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do | |||||
end) | end) | ||||
|> Stream.run() | |> Stream.run() | ||||
with {:ok, %{rows: [[0]]}} <- | |||||
Repo.query( | |||||
"SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;", | |||||
[data_migration.id] | |||||
) do | |||||
_ = DataMigration.update_state(data_migration, :complete) | |||||
with 0 <- failures_count(data_migration.id) do | |||||
{:ok, data_migration} = DataMigration.update_state(data_migration, :complete) | |||||
handle_success(data_migration) | handle_success(data_migration) | ||||
else | else | ||||
@@ -167,7 +164,8 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do | |||||
end | end | ||||
defp transfer_object_hashtags(object) do | defp transfer_object_hashtags(object) do | ||||
hashtags = Object.object_data_hashtags(%{"tag" => object.tag}) | |||||
embedded_tags = (Map.has_key?(object, :tag) && object.tag) || object.data["tag"] | |||||
hashtags = Object.object_data_hashtags(%{"tag" => embedded_tags}) | |||||
Repo.transaction(fn -> | Repo.transaction(fn -> | ||||
with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do | with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do | ||||
@@ -246,6 +244,36 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do | |||||
|> order_by([o], asc: o.id) | |> order_by([o], asc: o.id) | ||||
end | end | ||||
def failures_count(data_migration_id \\ nil) do | |||||
data_migration_id = data_migration_id || data_migration().id | |||||
with {:ok, %{rows: [[count]]}} <- | |||||
Repo.query( | |||||
"SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;", | |||||
[data_migration_id] | |||||
) do | |||||
count | |||||
end | |||||
end | |||||
def retry_failed do | |||||
data_migration = data_migration() | |||||
failed_objects_query() | |||||
|> Repo.chunk_stream(100, :one) | |||||
|> Stream.each(fn object -> | |||||
with {:ok, _} <- transfer_object_hashtags(object) do | |||||
_ = | |||||
Repo.query( | |||||
"DELETE FROM data_migration_failed_ids " <> | |||||
"WHERE data_migration_id = $1 AND record_id = $2", | |||||
[data_migration.id, object.id] | |||||
) | |||||
end | |||||
end) | |||||
|> Stream.run() | |||||
end | |||||
def force_continue do | def force_continue do | ||||
send(whereis(), :migrate_hashtags) | send(whereis(), :migrate_hashtags) | ||||
end | end | ||||
@@ -255,6 +283,12 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do | |||||
force_continue() | force_continue() | ||||
end | end | ||||
def force_complete do | |||||
{:ok, data_migration} = DataMigration.update_state(data_migration(), :complete) | |||||
handle_success(data_migration) | |||||
end | |||||
defp update_status(status, message \\ nil) do | defp update_status(status, message \\ nil) do | ||||
put_stat(:status, status) | put_stat(:status, status) | ||||
put_stat(:message, message) | put_stat(:message, message) | ||||
@@ -727,6 +727,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do | |||||
|> Enum.map(&List.wrap(&1)) | |> Enum.map(&List.wrap(&1)) | ||||
end | end | ||||
# Note: times out on larger instances (with default timeout), intended for complex queries | |||||
defp restrict_hashtag_agg(query, opts) do | defp restrict_hashtag_agg(query, opts) do | ||||
[tag_any, tag_all, tag_reject] = hashtag_conditions(opts) | [tag_any, tag_all, tag_reject] = hashtag_conditions(opts) | ||||
has_conditions = Enum.any?([tag_any, tag_all, tag_reject], &Enum.any?(&1)) | has_conditions = Enum.any?([tag_any, tag_all, tag_reject], &Enum.any?(&1)) | ||||
@@ -1290,25 +1291,15 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do | |||||
hashtag_timeline_strategy == :prefer_aggregation -> | hashtag_timeline_strategy == :prefer_aggregation -> | ||||
restrict_hashtag_agg(query, opts) | restrict_hashtag_agg(query, opts) | ||||
hashtag_timeline_strategy == :avoid_aggregation or avoid_hashtags_aggregation?(opts) -> | |||||
true -> | |||||
query | query | ||||
|> distinct([activity], true) | |> distinct([activity], true) | ||||
|> restrict_hashtag_any(opts) | |> restrict_hashtag_any(opts) | ||||
|> restrict_hashtag_all(opts) | |> restrict_hashtag_all(opts) | ||||
|> restrict_hashtag_reject_any(opts) | |> restrict_hashtag_reject_any(opts) | ||||
true -> | |||||
restrict_hashtag_agg(query, opts) | |||||
end | end | ||||
end | end | ||||
defp avoid_hashtags_aggregation?(opts) do | |||||
[tag_any, tag_all, tag_reject] = hashtag_conditions(opts) | |||||
joins_count = length(tag_all) + if Enum.any?(tag_any), do: 1, else: 0 | |||||
Enum.empty?(tag_reject) and joins_count <= 2 | |||||
end | |||||
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do | def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do | ||||
list_memberships = Pleroma.List.memberships(opts[:user]) | list_memberships = Pleroma.List.memberships(opts[:user]) | ||||
@@ -217,7 +217,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubTest do | |||||
{:ok, status_two} = CommonAPI.post(user, %{status: ". #essais"}) | {:ok, status_two} = CommonAPI.post(user, %{status: ". #essais"}) | ||||
{:ok, status_three} = CommonAPI.post(user, %{status: ". #test #reject"}) | {:ok, status_three} = CommonAPI.post(user, %{status: ". #test #reject"}) | ||||
for hashtag_timeline_strategy <- [true, :prefer_aggregation, :avoid_aggregation, false] do | |||||
for hashtag_timeline_strategy <- [true, :prefer_aggregation, false] do | |||||
clear_config([:instance, :improved_hashtag_timeline], hashtag_timeline_strategy) | clear_config([:instance, :improved_hashtag_timeline], hashtag_timeline_strategy) | ||||
fetch_one = ActivityPub.fetch_activities([], %{type: "Create", tag: "test"}) | fetch_one = ActivityPub.fetch_activities([], %{type: "Create", tag: "test"}) | ||||