Fix/1019 refactor See merge request pleroma/pleroma!1397tags/v1.1.4
@@ -250,13 +250,7 @@ config :pleroma, :instance, | |||
skip_thread_containment: true, | |||
limit_to_local_content: :unauthenticated, | |||
dynamic_configuration: false, | |||
external_user_synchronization: [ | |||
enabled: false, | |||
# every 2 hours | |||
interval: 60 * 60 * 2, | |||
max_retries: 3, | |||
limit: 500 | |||
] | |||
external_user_synchronization: true | |||
config :pleroma, :markup, | |||
# XXX - unfortunately, inline images must be enabled by default right now, because | |||
@@ -126,11 +126,7 @@ config :pleroma, Pleroma.Emails.Mailer, | |||
* `skip_thread_containment`: Skip filter out broken threads. The default is `false`. | |||
* `limit_to_local_content`: Limit unauthenticated users to search for local statutes and users only. Possible values: `:unauthenticated`, `:all` and `false`. The default is `:unauthenticated`. | |||
* `dynamic_configuration`: Allow transferring configuration to DB with the subsequent customization from Admin api. | |||
* `external_user_synchronization`: Following/followers counters synchronization settings. | |||
* `enabled`: Enables synchronization | |||
* `interval`: Interval between synchronization. | |||
* `max_retries`: Max rettries for host. After exceeding the limit, the check will not be carried out for users from this host. | |||
* `limit`: Users batch size for processing in one time. | |||
* `external_user_synchronization`: Enabling following/followers counters synchronization for external users. | |||
@@ -151,11 +151,7 @@ defmodule Pleroma.Application do | |||
start: {Pleroma.Web.Endpoint, :start_link, []}, | |||
type: :supervisor | |||
}, | |||
%{id: Pleroma.Gopher.Server, start: {Pleroma.Gopher.Server, :start_link, []}}, | |||
%{ | |||
id: Pleroma.User.SynchronizationWorker, | |||
start: {Pleroma.User.SynchronizationWorker, :start_link, []} | |||
} | |||
%{id: Pleroma.Gopher.Server, start: {Pleroma.Gopher.Server, :start_link, []}} | |||
] | |||
# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html | |||
@@ -52,6 +52,7 @@ defmodule Pleroma.User do | |||
field(:avatar, :map) | |||
field(:local, :boolean, default: true) | |||
field(:follower_address, :string) | |||
field(:following_address, :string) | |||
field(:search_rank, :float, virtual: true) | |||
field(:search_type, :integer, virtual: true) | |||
field(:tags, {:array, :string}, default: []) | |||
@@ -107,6 +108,10 @@ defmodule Pleroma.User do | |||
def ap_followers(%User{follower_address: fa}) when is_binary(fa), do: fa | |||
def ap_followers(%User{} = user), do: "#{ap_id(user)}/followers" | |||
@spec ap_following(User.t()) :: Sring.t() | |||
def ap_following(%User{following_address: fa}) when is_binary(fa), do: fa | |||
def ap_following(%User{} = user), do: "#{ap_id(user)}/following" | |||
def user_info(%User{} = user, args \\ %{}) do | |||
following_count = | |||
if args[:following_count], do: args[:following_count], else: following_count(user) | |||
@@ -128,6 +133,7 @@ defmodule Pleroma.User do | |||
Cachex.put(:user_cache, "user_info:#{user.id}", user_info(user, args)) | |||
end | |||
@spec restrict_deactivated(Ecto.Query.t()) :: Ecto.Query.t() | |||
def restrict_deactivated(query) do | |||
from(u in query, | |||
where: not fragment("? \\? 'deactivated' AND ?->'deactivated' @> 'true'", u.info, u.info) | |||
@@ -162,9 +168,10 @@ defmodule Pleroma.User do | |||
if changes.valid? do | |||
case info_cng.changes[:source_data] do | |||
%{"followers" => followers} -> | |||
%{"followers" => followers, "following" => following} -> | |||
changes | |||
|> put_change(:follower_address, followers) | |||
|> put_change(:following_address, following) | |||
_ -> | |||
followers = User.ap_followers(%User{nickname: changes.changes[:nickname]}) | |||
@@ -196,7 +203,14 @@ defmodule Pleroma.User do | |||
|> User.Info.user_upgrade(params[:info]) | |||
struct | |||
|> cast(params, [:bio, :name, :follower_address, :avatar, :last_refreshed_at]) | |||
|> cast(params, [ | |||
:bio, | |||
:name, | |||
:follower_address, | |||
:following_address, | |||
:avatar, | |||
:last_refreshed_at | |||
]) | |||
|> unique_constraint(:nickname) | |||
|> validate_format(:nickname, local_nickname_regex()) | |||
|> validate_length(:bio, max: 5000) | |||
@@ -1012,42 +1026,20 @@ defmodule Pleroma.User do | |||
) | |||
end | |||
@spec sync_follow_counter() :: :ok | |||
def sync_follow_counter, | |||
do: PleromaJobQueue.enqueue(:background, __MODULE__, [:sync_follow_counters]) | |||
@spec perform(:sync_follow_counters) :: :ok | |||
def perform(:sync_follow_counters) do | |||
{:ok, _pid} = Agent.start_link(fn -> %{} end, name: :domain_errors) | |||
config = Pleroma.Config.get([:instance, :external_user_synchronization]) | |||
:ok = sync_follow_counters(config) | |||
Agent.stop(:domain_errors) | |||
end | |||
@spec sync_follow_counters(keyword()) :: :ok | |||
def sync_follow_counters(opts \\ []) do | |||
users = external_users(opts) | |||
if length(users) > 0 do | |||
errors = Agent.get(:domain_errors, fn state -> state end) | |||
{last, updated_errors} = User.Synchronization.call(users, errors, opts) | |||
Agent.update(:domain_errors, fn _state -> updated_errors end) | |||
sync_follow_counters(max_id: last.id, limit: opts[:limit]) | |||
else | |||
:ok | |||
end | |||
@spec external_users_query() :: Ecto.Query.t() | |||
def external_users_query do | |||
User.Query.build(%{ | |||
external: true, | |||
active: true, | |||
order_by: :id | |||
}) | |||
end | |||
@spec external_users(keyword()) :: [User.t()] | |||
def external_users(opts \\ []) do | |||
query = | |||
User.Query.build(%{ | |||
external: true, | |||
active: true, | |||
order_by: :id, | |||
select: [:id, :ap_id, :info] | |||
}) | |||
external_users_query() | |||
|> select([u], struct(u, [:id, :ap_id, :info])) | |||
query = | |||
if opts[:max_id], | |||
@@ -1,60 +0,0 @@ | |||
# Pleroma: A lightweight social networking server | |||
# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/> | |||
# SPDX-License-Identifier: AGPL-3.0-only | |||
defmodule Pleroma.User.Synchronization do | |||
alias Pleroma.HTTP | |||
alias Pleroma.User | |||
@spec call([User.t()], map(), keyword()) :: {User.t(), map()} | |||
def call(users, errors, opts \\ []) do | |||
do_call(users, errors, opts) | |||
end | |||
defp do_call([user | []], errors, opts) do | |||
updated = fetch_counters(user, errors, opts) | |||
{user, updated} | |||
end | |||
defp do_call([user | others], errors, opts) do | |||
updated = fetch_counters(user, errors, opts) | |||
do_call(others, updated, opts) | |||
end | |||
defp fetch_counters(user, errors, opts) do | |||
%{host: host} = URI.parse(user.ap_id) | |||
info = %{} | |||
{following, errors} = fetch_counter(user.ap_id <> "/following", host, errors, opts) | |||
info = if following, do: Map.put(info, :following_count, following), else: info | |||
{followers, errors} = fetch_counter(user.ap_id <> "/followers", host, errors, opts) | |||
info = if followers, do: Map.put(info, :follower_count, followers), else: info | |||
User.set_info_cache(user, info) | |||
errors | |||
end | |||
defp available_domain?(domain, errors, opts) do | |||
max_retries = Keyword.get(opts, :max_retries, 3) | |||
not (Map.has_key?(errors, domain) && errors[domain] >= max_retries) | |||
end | |||
defp fetch_counter(url, host, errors, opts) do | |||
with true <- available_domain?(host, errors, opts), | |||
{:ok, %{body: body, status: code}} when code in 200..299 <- | |||
HTTP.get( | |||
url, | |||
[{:Accept, "application/activity+json"}] | |||
), | |||
{:ok, data} <- Jason.decode(body) do | |||
{data["totalItems"], errors} | |||
else | |||
false -> | |||
{nil, errors} | |||
_ -> | |||
{nil, Map.update(errors, host, 1, &(&1 + 1))} | |||
end | |||
end | |||
end |
@@ -1,32 +0,0 @@ | |||
# Pleroma: A lightweight social networking server | |||
# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/> | |||
# SPDX-License-Identifier: AGPL-3.0-onl | |||
defmodule Pleroma.User.SynchronizationWorker do | |||
use GenServer | |||
def start_link do | |||
config = Pleroma.Config.get([:instance, :external_user_synchronization]) | |||
if config[:enabled] do | |||
GenServer.start_link(__MODULE__, interval: config[:interval]) | |||
else | |||
:ignore | |||
end | |||
end | |||
def init(opts) do | |||
schedule_next(opts) | |||
{:ok, opts} | |||
end | |||
def handle_info(:sync_follow_counters, opts) do | |||
Pleroma.User.sync_follow_counter() | |||
schedule_next(opts) | |||
{:noreply, opts} | |||
end | |||
defp schedule_next(opts) do | |||
Process.send_after(self(), :sync_follow_counters, opts[:interval]) | |||
end | |||
end |
@@ -994,6 +994,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do | |||
avatar: avatar, | |||
name: data["name"], | |||
follower_address: data["followers"], | |||
following_address: data["following"], | |||
bio: data["summary"] | |||
} | |||
@@ -1087,6 +1087,10 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do | |||
PleromaJobQueue.enqueue(:transmogrifier, __MODULE__, [:user_upgrade, user]) | |||
end | |||
if Pleroma.Config.get([:instance, :external_user_synchronization]) do | |||
update_following_followers_counters(user) | |||
end | |||
{:ok, user} | |||
else | |||
%User{} = user -> {:ok, user} | |||
@@ -1119,4 +1123,27 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do | |||
data | |||
|> maybe_fix_user_url | |||
end | |||
def update_following_followers_counters(user) do | |||
info = %{} | |||
following = fetch_counter(user.following_address) | |||
info = if following, do: Map.put(info, :following_count, following), else: info | |||
followers = fetch_counter(user.follower_address) | |||
info = if followers, do: Map.put(info, :follower_count, followers), else: info | |||
User.set_info_cache(user, info) | |||
end | |||
defp fetch_counter(url) do | |||
with {:ok, %{body: body, status: code}} when code in 200..299 <- | |||
Pleroma.HTTP.get( | |||
url, | |||
[{:Accept, "application/activity+json"}] | |||
), | |||
{:ok, data} <- Jason.decode(body) do | |||
data["totalItems"] | |||
end | |||
end | |||
end |
@@ -0,0 +1,9 @@ | |||
defmodule Pleroma.Repo.Migrations.AddFollowingAddressToUser do | |||
use Ecto.Migration | |||
def change do | |||
alter table(:users) do | |||
add(:following_address, :string, unique: true) | |||
end | |||
end | |||
end |
@@ -0,0 +1,8 @@ | |||
defmodule Pleroma.Repo.Migrations.AddFollowingAddressIndexToUser do | |||
use Ecto.Migration | |||
@disable_ddl_transaction true | |||
def change do | |||
create(index(:users, [:following_address], concurrently: true)) | |||
end | |||
end |
@@ -0,0 +1,20 @@ | |||
defmodule Pleroma.Repo.Migrations.AddFollowingAddressFromSourceData do | |||
use Ecto.Migration | |||
import Ecto.Query | |||
alias Pleroma.User | |||
def change do | |||
query = | |||
User.external_users_query() | |||
|> select([u], struct(u, [:id, :ap_id, :info])) | |||
Pleroma.Repo.stream(query) | |||
|> Enum.each(fn | |||
%{info: %{source_data: source_data}} = user -> | |||
Ecto.Changeset.cast(user, %{following_address: source_data["following"]}, [ | |||
:following_address | |||
]) | |||
|> Pleroma.Repo.update() | |||
end) | |||
end | |||
end |
@@ -38,6 +38,7 @@ defmodule Pleroma.Factory do | |||
user | |||
| ap_id: User.ap_id(user), | |||
follower_address: User.ap_followers(user), | |||
following_address: User.ap_following(user), | |||
following: [User.ap_id(user)] | |||
} | |||
end | |||
@@ -3,7 +3,7 @@ | |||
# SPDX-License-Identifier: AGPL-3.0-only | |||
defmodule Mix.Tasks.Pleroma.RobotsTxtTest do | |||
use ExUnit.Case, async: true | |||
use ExUnit.Case | |||
alias Mix.Tasks.Pleroma.RobotsTxt | |||
test "creates new dir" do | |||
@@ -1,104 +0,0 @@ | |||
# Pleroma: A lightweight social networking server | |||
# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/> | |||
# SPDX-License-Identifier: AGPL-3.0-only | |||
defmodule Pleroma.User.SynchronizationTest do | |||
use Pleroma.DataCase | |||
import Pleroma.Factory | |||
alias Pleroma.User | |||
alias Pleroma.User.Synchronization | |||
setup do | |||
Tesla.Mock.mock(fn env -> apply(HttpRequestMock, :request, [env]) end) | |||
:ok | |||
end | |||
test "update following/followers counters" do | |||
user1 = | |||
insert(:user, | |||
local: false, | |||
ap_id: "http://localhost:4001/users/masto_closed" | |||
) | |||
user2 = insert(:user, local: false, ap_id: "http://localhost:4001/users/fuser2") | |||
users = User.external_users() | |||
assert length(users) == 2 | |||
{user, %{}} = Synchronization.call(users, %{}) | |||
assert user == List.last(users) | |||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user1) | |||
assert followers == 437 | |||
assert following == 152 | |||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user2) | |||
assert followers == 527 | |||
assert following == 267 | |||
end | |||
test "don't check host if errors exist" do | |||
user1 = insert(:user, local: false, ap_id: "http://domain-with-errors:4001/users/fuser1") | |||
user2 = insert(:user, local: false, ap_id: "http://domain-with-errors:4001/users/fuser2") | |||
users = User.external_users() | |||
assert length(users) == 2 | |||
{user, %{"domain-with-errors" => 2}} = | |||
Synchronization.call(users, %{"domain-with-errors" => 2}, max_retries: 2) | |||
assert user == List.last(users) | |||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user1) | |||
assert followers == 0 | |||
assert following == 0 | |||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user2) | |||
assert followers == 0 | |||
assert following == 0 | |||
end | |||
test "don't check host if errors appeared" do | |||
user1 = insert(:user, local: false, ap_id: "http://domain-with-errors:4001/users/fuser1") | |||
user2 = insert(:user, local: false, ap_id: "http://domain-with-errors:4001/users/fuser2") | |||
users = User.external_users() | |||
assert length(users) == 2 | |||
{user, %{"domain-with-errors" => 2}} = Synchronization.call(users, %{}, max_retries: 2) | |||
assert user == List.last(users) | |||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user1) | |||
assert followers == 0 | |||
assert following == 0 | |||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user2) | |||
assert followers == 0 | |||
assert following == 0 | |||
end | |||
test "other users after error appeared" do | |||
user1 = insert(:user, local: false, ap_id: "http://domain-with-errors:4001/users/fuser1") | |||
user2 = insert(:user, local: false, ap_id: "http://localhost:4001/users/fuser2") | |||
users = User.external_users() | |||
assert length(users) == 2 | |||
{user, %{"domain-with-errors" => 2}} = Synchronization.call(users, %{}, max_retries: 2) | |||
assert user == List.last(users) | |||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user1) | |||
assert followers == 0 | |||
assert following == 0 | |||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user2) | |||
assert followers == 527 | |||
assert following == 267 | |||
end | |||
end |
@@ -1,49 +0,0 @@ | |||
# Pleroma: A lightweight social networking server | |||
# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/> | |||
# SPDX-License-Identifier: AGPL-3.0-only | |||
defmodule Pleroma.User.SynchronizationWorkerTest do | |||
use Pleroma.DataCase | |||
import Pleroma.Factory | |||
setup do | |||
Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end) | |||
config = Pleroma.Config.get([:instance, :external_user_synchronization]) | |||
for_update = [enabled: true, interval: 1000] | |||
Pleroma.Config.put([:instance, :external_user_synchronization], for_update) | |||
on_exit(fn -> | |||
Pleroma.Config.put([:instance, :external_user_synchronization], config) | |||
end) | |||
:ok | |||
end | |||
test "sync follow counters" do | |||
user1 = | |||
insert(:user, | |||
local: false, | |||
ap_id: "http://localhost:4001/users/masto_closed" | |||
) | |||
user2 = insert(:user, local: false, ap_id: "http://localhost:4001/users/fuser2") | |||
{:ok, _} = Pleroma.User.SynchronizationWorker.start_link() | |||
:timer.sleep(1500) | |||
%{follower_count: followers, following_count: following} = | |||
Pleroma.User.get_cached_user_info(user1) | |||
assert followers == 437 | |||
assert following == 152 | |||
%{follower_count: followers, following_count: following} = | |||
Pleroma.User.get_cached_user_info(user2) | |||
assert followers == 527 | |||
assert following == 267 | |||
end | |||
end |
@@ -54,6 +54,14 @@ defmodule Pleroma.UserTest do | |||
assert expected_followers_collection == User.ap_followers(user) | |||
end | |||
test "ap_following returns the following collection for the user" do | |||
user = UserBuilder.build() | |||
expected_followers_collection = "#{User.ap_id(user)}/following" | |||
assert expected_followers_collection == User.ap_following(user) | |||
end | |||
test "returns all pending follow requests" do | |||
unlocked = insert(:user) | |||
locked = insert(:user, %{info: %{locked: true}}) | |||
@@ -1240,52 +1248,6 @@ defmodule Pleroma.UserTest do | |||
assert User.external_users(max_id: fdb_user2.id, limit: 1) == [] | |||
end | |||
test "sync_follow_counters/1", %{user1: user1, user2: user2} do | |||
{:ok, _pid} = Agent.start_link(fn -> %{} end, name: :domain_errors) | |||
:ok = User.sync_follow_counters() | |||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user1) | |||
assert followers == 437 | |||
assert following == 152 | |||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user2) | |||
assert followers == 527 | |||
assert following == 267 | |||
Agent.stop(:domain_errors) | |||
end | |||
test "sync_follow_counters/1 in separate batches", %{user1: user1, user2: user2} do | |||
{:ok, _pid} = Agent.start_link(fn -> %{} end, name: :domain_errors) | |||
:ok = User.sync_follow_counters(limit: 1) | |||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user1) | |||
assert followers == 437 | |||
assert following == 152 | |||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user2) | |||
assert followers == 527 | |||
assert following == 267 | |||
Agent.stop(:domain_errors) | |||
end | |||
test "perform/1 with :sync_follow_counters", %{user1: user1, user2: user2} do | |||
:ok = User.perform(:sync_follow_counters) | |||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user1) | |||
assert followers == 437 | |||
assert following == 152 | |||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user2) | |||
assert followers == 527 | |||
assert following == 267 | |||
end | |||
end | |||
describe "set_info_cache/2" do | |||
@@ -1121,6 +1121,7 @@ defmodule Pleroma.Web.ActivityPub.TransmogrifierTest do | |||
assert user.info.ap_enabled | |||
assert user.info.note_count == 1 | |||
assert user.follower_address == "https://niu.moe/users/rye/followers" | |||
assert user.following_address == "https://niu.moe/users/rye/following" | |||
user = User.get_cached_by_id(user.id) | |||
assert user.info.note_count == 1 | |||
@@ -1358,4 +1359,32 @@ defmodule Pleroma.Web.ActivityPub.TransmogrifierTest do | |||
refute recipient.follower_address in fixed_object["to"] | |||
end | |||
end | |||
test "update_following_followers_counters/1" do | |||
user1 = | |||
insert(:user, | |||
local: false, | |||
follower_address: "http://localhost:4001/users/masto_closed/followers", | |||
following_address: "http://localhost:4001/users/masto_closed/following" | |||
) | |||
user2 = | |||
insert(:user, | |||
local: false, | |||
follower_address: "http://localhost:4001/users/fuser2/followers", | |||
following_address: "http://localhost:4001/users/fuser2/following" | |||
) | |||
Transmogrifier.update_following_followers_counters(user1) | |||
Transmogrifier.update_following_followers_counters(user2) | |||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user1) | |||
assert followers == 437 | |||
assert following == 152 | |||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user2) | |||
assert followers == 527 | |||
assert following == 267 | |||
end | |||
end |