@@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). | |||
## Unreleased | |||
### Added | |||
- Experimental websocket-based federation between Pleroma instances. | |||
### Changed | |||
- Renamed `:await_up_timeout` in `:connections_pool` namespace to `:connect_timeout`, old name is deprecated. | |||
@@ -130,6 +130,7 @@ config :pleroma, Pleroma.Web.Endpoint, | |||
dispatch: [ | |||
{:_, | |||
[ | |||
{"/api/fedsocket/v1", Pleroma.Web.FedSockets.IncomingHandler, []}, | |||
{"/api/v1/streaming", Pleroma.Web.MastodonAPI.WebsocketHandler, []}, | |||
{"/websocket", Phoenix.Endpoint.CowboyWebSocket, | |||
{Phoenix.Transports.WebSocket, | |||
@@ -148,6 +149,16 @@ config :pleroma, Pleroma.Web.Endpoint, | |||
"SameSite=Lax" | |||
] | |||
config :pleroma, :fed_sockets, | |||
enabled: false, | |||
connection_duration: :timer.hours(8), | |||
rejection_duration: :timer.minutes(15), | |||
fed_socket_fetches: [ | |||
default: 12_000, | |||
interval: 3_000, | |||
lazy: false | |||
] | |||
# Configures Elixir's Logger | |||
config :logger, :console, | |||
level: :debug, | |||
@@ -532,6 +543,7 @@ config :pleroma, Oban, | |||
token_expiration: 5, | |||
federator_incoming: 50, | |||
federator_outgoing: 50, | |||
ingestion_queue: 50, | |||
web_push: 50, | |||
mailer: 10, | |||
transmogrifier: 20, | |||
@@ -272,6 +272,19 @@ config :pleroma, :config_description, [ | |||
}, | |||
%{ | |||
group: :pleroma, | |||
key: :fed_sockets, | |||
type: :group, | |||
description: "Websocket based federation", | |||
children: [ | |||
%{ | |||
key: :enabled, | |||
type: :boolean, | |||
description: "Enable FedSockets" | |||
} | |||
] | |||
}, | |||
%{ | |||
group: :pleroma, | |||
key: Pleroma.Emails.Mailer, | |||
type: :group, | |||
description: "Mailer-related settings", | |||
@@ -19,6 +19,11 @@ config :logger, :console, | |||
level: :warn, | |||
format: "\n[$level] $message\n" | |||
config :pleroma, :fed_sockets, | |||
enabled: false, | |||
connection_duration: 5, | |||
rejection_duration: 5 | |||
config :pleroma, :auth, oauth_consumer_strategies: [] | |||
config :pleroma, Pleroma.Upload, | |||
@@ -225,6 +225,16 @@ Enables the worker which processes posts scheduled for deletion. Pinned posts ar | |||
* `enabled`: whether expired activities will be sent to the job queue to be deleted | |||
## FedSockets | |||
FedSockets is an experimental feature allowing for Pleroma backends to federate using a persistant websocket connection as opposed to making each federation a seperate http connection. This feature is currently off by default. It is configurable throught he following options. | |||
### :fedsockets | |||
* `enabled`: Enables FedSockets for this instance. `false` by default. | |||
* `connection_duration`: Time an idle websocket is kept open. | |||
* `rejection_duration`: Failures to connect via FedSockets will not be retried for this period of time. | |||
* `fed_socket_fetches` and `fed_socket_rejections`: Settings passed to `cachex` for the fetch registry, and rejection stacks. See `Pleroma.Web.FedSockets` for more details. | |||
## Frontends | |||
### :frontend_configurations | |||
@@ -99,7 +99,7 @@ defmodule Pleroma.Application do | |||
{Oban, Config.get(Oban)} | |||
] ++ | |||
task_children(@env) ++ | |||
streamer_child(@env) ++ | |||
dont_run_in_test(@env) ++ | |||
chat_child(@env, chat_enabled?()) ++ | |||
[ | |||
Pleroma.Web.Endpoint, | |||
@@ -188,16 +188,17 @@ defmodule Pleroma.Application do | |||
defp chat_enabled?, do: Config.get([:chat, :enabled]) | |||
defp streamer_child(env) when env in [:test, :benchmark], do: [] | |||
defp dont_run_in_test(env) when env in [:test, :benchmark], do: [] | |||
defp streamer_child(_) do | |||
defp dont_run_in_test(_) do | |||
[ | |||
{Registry, | |||
[ | |||
name: Pleroma.Web.Streamer.registry(), | |||
keys: :duplicate, | |||
partitions: System.schedulers_online() | |||
]} | |||
]}, | |||
Pleroma.Web.FedSockets.Supervisor | |||
] | |||
end | |||
@@ -12,6 +12,7 @@ defmodule Pleroma.Object.Fetcher do | |||
alias Pleroma.Web.ActivityPub.ObjectValidator | |||
alias Pleroma.Web.ActivityPub.Transmogrifier | |||
alias Pleroma.Web.Federator | |||
alias Pleroma.Web.FedSockets | |||
require Logger | |||
require Pleroma.Constants | |||
@@ -182,9 +183,47 @@ defmodule Pleroma.Object.Fetcher do | |||
end | |||
end | |||
def fetch_and_contain_remote_object_from_id(id) when is_binary(id) do | |||
def fetch_and_contain_remote_object_from_id(prm, opts \\ []) | |||
def fetch_and_contain_remote_object_from_id(%{"id" => id}, opts), | |||
do: fetch_and_contain_remote_object_from_id(id, opts) | |||
def fetch_and_contain_remote_object_from_id(id, opts) when is_binary(id) do | |||
Logger.debug("Fetching object #{id} via AP") | |||
with {:scheme, true} <- {:scheme, String.starts_with?(id, "http")}, | |||
{:ok, body} <- get_object(id, opts), | |||
{:ok, data} <- safe_json_decode(body), | |||
:ok <- Containment.contain_origin_from_id(id, data) do | |||
{:ok, data} | |||
else | |||
{:scheme, _} -> | |||
{:error, "Unsupported URI scheme"} | |||
{:error, e} -> | |||
{:error, e} | |||
e -> | |||
{:error, e} | |||
end | |||
end | |||
def fetch_and_contain_remote_object_from_id(_id, _opts), | |||
do: {:error, "id must be a string"} | |||
defp get_object(id, opts) do | |||
with false <- Keyword.get(opts, :force_http, false), | |||
{:ok, fedsocket} <- FedSockets.get_or_create_fed_socket(id) do | |||
Logger.debug("fetching via fedsocket - #{inspect(id)}") | |||
FedSockets.fetch(fedsocket, id) | |||
else | |||
_other -> | |||
Logger.debug("fetching via http - #{inspect(id)}") | |||
get_object_http(id) | |||
end | |||
end | |||
defp get_object_http(id) do | |||
date = Pleroma.Signature.signed_date() | |||
headers = | |||
@@ -192,20 +231,13 @@ defmodule Pleroma.Object.Fetcher do | |||
|> maybe_date_fetch(date) | |||
|> sign_fetch(id, date) | |||
Logger.debug("Fetch headers: #{inspect(headers)}") | |||
case HTTP.get(id, headers) do | |||
{:ok, %{body: body, status: code}} when code in 200..299 -> | |||
{:ok, body} | |||
with {:scheme, true} <- {:scheme, String.starts_with?(id, "http")}, | |||
{:ok, %{body: body, status: code}} when code in 200..299 <- HTTP.get(id, headers), | |||
{:ok, data} <- Jason.decode(body), | |||
:ok <- Containment.contain_origin_from_id(id, data) do | |||
{:ok, data} | |||
else | |||
{:ok, %{status: code}} when code in [404, 410] -> | |||
{:error, "Object has been deleted"} | |||
{:scheme, _} -> | |||
{:error, "Unsupported URI scheme"} | |||
{:error, e} -> | |||
{:error, e} | |||
@@ -214,8 +246,6 @@ defmodule Pleroma.Object.Fetcher do | |||
end | |||
end | |||
def fetch_and_contain_remote_object_from_id(%{"id" => id}), | |||
do: fetch_and_contain_remote_object_from_id(id) | |||
def fetch_and_contain_remote_object_from_id(_id), do: {:error, "id must be a string"} | |||
defp safe_json_decode(nil), do: {:ok, nil} | |||
defp safe_json_decode(json), do: Jason.decode(json) | |||
end |
@@ -39,7 +39,7 @@ defmodule Pleroma.Signature do | |||
def fetch_public_key(conn) do | |||
with %{"keyId" => kid} <- HTTPSignatures.signature_for_conn(conn), | |||
{:ok, actor_id} <- key_id_to_actor_id(kid), | |||
{:ok, public_key} <- User.get_public_key_for_ap_id(actor_id) do | |||
{:ok, public_key} <- User.get_public_key_for_ap_id(actor_id, force_http: true) do | |||
{:ok, public_key} | |||
else | |||
e -> | |||
@@ -50,8 +50,8 @@ defmodule Pleroma.Signature do | |||
def refetch_public_key(conn) do | |||
with %{"keyId" => kid} <- HTTPSignatures.signature_for_conn(conn), | |||
{:ok, actor_id} <- key_id_to_actor_id(kid), | |||
{:ok, _user} <- ActivityPub.make_user_from_ap_id(actor_id), | |||
{:ok, public_key} <- User.get_public_key_for_ap_id(actor_id) do | |||
{:ok, _user} <- ActivityPub.make_user_from_ap_id(actor_id, force_http: true), | |||
{:ok, public_key} <- User.get_public_key_for_ap_id(actor_id, force_http: true) do | |||
{:ok, public_key} | |||
else | |||
e -> | |||
@@ -1820,12 +1820,12 @@ defmodule Pleroma.User do | |||
def html_filter_policy(_), do: Config.get([:markup, :scrub_policy]) | |||
def fetch_by_ap_id(ap_id), do: ActivityPub.make_user_from_ap_id(ap_id) | |||
def fetch_by_ap_id(ap_id, opts \\ []), do: ActivityPub.make_user_from_ap_id(ap_id, opts) | |||
def get_or_fetch_by_ap_id(ap_id) do | |||
def get_or_fetch_by_ap_id(ap_id, opts \\ []) do | |||
cached_user = get_cached_by_ap_id(ap_id) | |||
maybe_fetched_user = needs_update?(cached_user) && fetch_by_ap_id(ap_id) | |||
maybe_fetched_user = needs_update?(cached_user) && fetch_by_ap_id(ap_id, opts) | |||
case {cached_user, maybe_fetched_user} do | |||
{_, {:ok, %User{} = user}} -> | |||
@@ -1898,8 +1898,8 @@ defmodule Pleroma.User do | |||
def public_key(_), do: {:error, "key not found"} | |||
def get_public_key_for_ap_id(ap_id) do | |||
with {:ok, %User{} = user} <- get_or_fetch_by_ap_id(ap_id), | |||
def get_public_key_for_ap_id(ap_id, opts \\ []) do | |||
with {:ok, %User{} = user} <- get_or_fetch_by_ap_id(ap_id, opts), | |||
{:ok, public_key} <- public_key(user) do | |||
{:ok, public_key} | |||
else | |||
@@ -1270,10 +1270,12 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do | |||
def fetch_follow_information_for_user(user) do | |||
with {:ok, following_data} <- | |||
Fetcher.fetch_and_contain_remote_object_from_id(user.following_address), | |||
Fetcher.fetch_and_contain_remote_object_from_id(user.following_address, | |||
force_http: true | |||
), | |||
{:ok, hide_follows} <- collection_private(following_data), | |||
{:ok, followers_data} <- | |||
Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address), | |||
Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address, force_http: true), | |||
{:ok, hide_followers} <- collection_private(followers_data) do | |||
{:ok, | |||
%{ | |||
@@ -1347,8 +1349,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do | |||
end | |||
end | |||
def fetch_and_prepare_user_from_ap_id(ap_id) do | |||
with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id), | |||
def fetch_and_prepare_user_from_ap_id(ap_id, opts \\ []) do | |||
with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id, opts), | |||
{:ok, data} <- user_data_from_user_object(data) do | |||
{:ok, maybe_update_follow_information(data)} | |||
else | |||
@@ -1390,13 +1392,13 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do | |||
end | |||
end | |||
def make_user_from_ap_id(ap_id) do | |||
def make_user_from_ap_id(ap_id, opts \\ []) do | |||
user = User.get_cached_by_ap_id(ap_id) | |||
if user && !User.ap_enabled?(user) do | |||
Transmogrifier.upgrade_user_from_ap_id(ap_id) | |||
else | |||
with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do | |||
with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, opts) do | |||
if user do | |||
user | |||
|> User.remote_user_changeset(data) | |||
@@ -13,6 +13,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do | |||
alias Pleroma.User | |||
alias Pleroma.Web.ActivityPub.Relay | |||
alias Pleroma.Web.ActivityPub.Transmogrifier | |||
alias Pleroma.Web.FedSockets | |||
require Pleroma.Constants | |||
@@ -50,15 +51,35 @@ defmodule Pleroma.Web.ActivityPub.Publisher do | |||
def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = params) do | |||
Logger.debug("Federating #{id} to #{inbox}") | |||
uri = URI.parse(inbox) | |||
case FedSockets.get_or_create_fed_socket(inbox) do | |||
{:ok, fedsocket} -> | |||
Logger.debug("publishing via fedsockets - #{inspect(inbox)}") | |||
FedSockets.publish(fedsocket, json) | |||
_ -> | |||
Logger.debug("publishing via http - #{inspect(inbox)}") | |||
http_publish(inbox, actor, json, params) | |||
end | |||
end | |||
def publish_one(%{actor_id: actor_id} = params) do | |||
actor = User.get_cached_by_id(actor_id) | |||
params | |||
|> Map.delete(:actor_id) | |||
|> Map.put(:actor, actor) | |||
|> publish_one() | |||
end | |||
defp http_publish(inbox, actor, json, params) do | |||
uri = %{path: path} = URI.parse(inbox) | |||
digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64()) | |||
date = Pleroma.Signature.signed_date() | |||
signature = | |||
Pleroma.Signature.sign(actor, %{ | |||
"(request-target)": "post #{uri.path}", | |||
"(request-target)": "post #{path}", | |||
host: signature_host(uri), | |||
"content-length": byte_size(json), | |||
digest: digest, | |||
@@ -89,15 +110,6 @@ defmodule Pleroma.Web.ActivityPub.Publisher do | |||
end | |||
end | |||
def publish_one(%{actor_id: actor_id} = params) do | |||
actor = User.get_cached_by_id(actor_id) | |||
params | |||
|> Map.delete(:actor_id) | |||
|> Map.put(:actor, actor) | |||
|> publish_one() | |||
end | |||
defp signature_host(%URI{port: port, scheme: scheme, host: host}) do | |||
if port == URI.default_port(scheme) do | |||
host | |||
@@ -1000,7 +1000,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do | |||
def upgrade_user_from_ap_id(ap_id) do | |||
with %User{local: false} = user <- User.get_cached_by_ap_id(ap_id), | |||
{:ok, data} <- ActivityPub.fetch_and_prepare_user_from_ap_id(ap_id), | |||
{:ok, data} <- ActivityPub.fetch_and_prepare_user_from_ap_id(ap_id, force_http: true), | |||
{:ok, user} <- update_user(user, data) do | |||
TransmogrifierWorker.enqueue("user_upgrade", %{"user_id" => user.id}) | |||
{:ok, user} | |||
@@ -0,0 +1,185 @@ | |||
# Pleroma: A lightweight social networking server | |||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> | |||
# SPDX-License-Identifier: AGPL-3.0-only | |||
defmodule Pleroma.Web.FedSockets.FedRegistry do | |||
@moduledoc """ | |||
The FedRegistry stores the active FedSockets for quick retrieval. | |||
The storage and retrieval portion of the FedRegistry is done in process through | |||
elixir's `Registry` module for speed and its ability to monitor for terminated processes. | |||
Dropped connections will be caught by `Registry` and deleted. Since the next | |||
message will initiate a new connection there is no reason to try and reconnect at that point. | |||
Normally outside modules should have no need to call or use the FedRegistry themselves. | |||
""" | |||
alias Pleroma.Web.FedSockets.FedSocket | |||
alias Pleroma.Web.FedSockets.SocketInfo | |||
require Logger | |||
@default_rejection_duration 15 * 60 * 1000 | |||
@rejections :fed_socket_rejections | |||
@doc """ | |||
Retrieves a FedSocket from the Registry given it's origin. | |||
The origin is expected to be a string identifying the endpoint "example.com" or "example2.com:8080" | |||
Will return: | |||
* {:ok, fed_socket} for working FedSockets | |||
* {:error, :rejected} for origins that have been tried and refused within the rejection duration interval | |||
* {:error, some_reason} usually :missing for unknown origins | |||
""" | |||
def get_fed_socket(origin) do | |||
case get_registry_data(origin) do | |||
{:error, reason} -> | |||
{:error, reason} | |||
{:ok, %{state: :connected} = socket_info} -> | |||
{:ok, socket_info} | |||
end | |||
end | |||
@doc """ | |||
Adds a connected FedSocket to the Registry. | |||
Always returns {:ok, fed_socket} | |||
""" | |||
def add_fed_socket(origin, pid \\ nil) do | |||
origin | |||
|> SocketInfo.build(pid) | |||
|> SocketInfo.connect() | |||
|> add_socket_info | |||
end | |||
defp add_socket_info(%{origin: origin, state: :connected} = socket_info) do | |||
case Registry.register(FedSockets.Registry, origin, socket_info) do | |||
{:ok, _owner} -> | |||
clear_prior_rejection(origin) | |||
Logger.debug("fedsocket added: #{inspect(origin)}") | |||
{:ok, socket_info} | |||
{:error, {:already_registered, _pid}} -> | |||
FedSocket.close(socket_info) | |||
existing_socket_info = Registry.lookup(FedSockets.Registry, origin) | |||
{:ok, existing_socket_info} | |||
_ -> | |||
{:error, :error_adding_socket} | |||
end | |||
end | |||
@doc """ | |||
Mark this origin as having rejected a connection attempt. | |||
This will keep it from getting additional connection attempts | |||
for a period of time specified in the config. | |||
Always returns {:ok, new_reg_data} | |||
""" | |||
def set_host_rejected(uri) do | |||
new_reg_data = | |||
uri | |||
|> SocketInfo.origin() | |||
|> get_or_create_registry_data() | |||
|> set_to_rejected() | |||
|> save_registry_data() | |||
{:ok, new_reg_data} | |||
end | |||
@doc """ | |||
Retrieves the FedRegistryData from the Registry given it's origin. | |||
The origin is expected to be a string identifying the endpoint "example.com" or "example2.com:8080" | |||
Will return: | |||
* {:ok, fed_registry_data} for known origins | |||
* {:error, :missing} for uniknown origins | |||
* {:error, :cache_error} indicating some low level runtime issues | |||
""" | |||
def get_registry_data(origin) do | |||
case Registry.lookup(FedSockets.Registry, origin) do | |||
[] -> | |||
if is_rejected?(origin) do | |||
Logger.debug("previously rejected fedsocket requested") | |||
{:error, :rejected} | |||
else | |||
{:error, :missing} | |||
end | |||
[{_pid, %{state: :connected} = socket_info}] -> | |||
{:ok, socket_info} | |||
_ -> | |||
{:error, :cache_error} | |||
end | |||
end | |||
@doc """ | |||
Retrieves a map of all sockets from the Registry. The keys are the origins and the values are the corresponding SocketInfo | |||
""" | |||
def list_all do | |||
(list_all_connected() ++ list_all_rejected()) | |||
|> Enum.into(%{}) | |||
end | |||
defp list_all_connected do | |||
FedSockets.Registry | |||
|> Registry.select([{{:"$1", :_, :"$3"}, [], [{{:"$1", :"$3"}}]}]) | |||
end | |||
defp list_all_rejected do | |||
{:ok, keys} = Cachex.keys(@rejections) | |||
{:ok, registry_data} = | |||
Cachex.execute(@rejections, fn worker -> | |||
Enum.map(keys, fn k -> {k, Cachex.get!(worker, k)} end) | |||
end) | |||
registry_data | |||
end | |||
defp clear_prior_rejection(origin), | |||
do: Cachex.del(@rejections, origin) | |||
defp is_rejected?(origin) do | |||
case Cachex.get(@rejections, origin) do | |||
{:ok, nil} -> | |||
false | |||
{:ok, _} -> | |||
true | |||
end | |||
end | |||
defp get_or_create_registry_data(origin) do | |||
case get_registry_data(origin) do | |||
{:error, :missing} -> | |||
%SocketInfo{origin: origin} | |||
{:ok, socket_info} -> | |||
socket_info | |||
end | |||
end | |||
defp save_registry_data(%SocketInfo{origin: origin, state: :connected} = socket_info) do | |||
{:ok, true} = Registry.update_value(FedSockets.Registry, origin, fn _ -> socket_info end) | |||
socket_info | |||
end | |||
defp save_registry_data(%SocketInfo{origin: origin, state: :rejected} = socket_info) do | |||
rejection_expiration = | |||
Pleroma.Config.get([:fed_sockets, :rejection_duration], @default_rejection_duration) | |||
{:ok, true} = Cachex.put(@rejections, origin, socket_info, ttl: rejection_expiration) | |||
socket_info | |||
end | |||
defp set_to_rejected(%SocketInfo{} = socket_info), | |||
do: %SocketInfo{socket_info | state: :rejected} | |||
end |
@@ -0,0 +1,137 @@ | |||
# Pleroma: A lightweight social networking server | |||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> | |||
# SPDX-License-Identifier: AGPL-3.0-only | |||
defmodule Pleroma.Web.FedSockets.FedSocket do | |||
@moduledoc """ | |||
The FedSocket module abstracts the actions to be taken taken on connections regardless of | |||
whether the connection started as inbound or outbound. | |||
Normally outside modules will have no need to call the FedSocket module directly. | |||
""" | |||
alias Pleroma.Object | |||
alias Pleroma.Object.Containment | |||
alias Pleroma.User | |||
alias Pleroma.Web.ActivityPub.ObjectView | |||
alias Pleroma.Web.ActivityPub.UserView | |||
alias Pleroma.Web.ActivityPub.Visibility | |||
alias Pleroma.Web.FedSockets.FetchRegistry | |||
alias Pleroma.Web.FedSockets.IngesterWorker | |||
alias Pleroma.Web.FedSockets.OutgoingHandler | |||
alias Pleroma.Web.FedSockets.SocketInfo | |||
require Logger | |||
@shake "61dd18f7-f1e6-49a4-939a-a749fcdc1103" | |||
def connect_to_host(uri) do | |||
case OutgoingHandler.start_link(uri) do | |||
{:ok, pid} -> | |||
{:ok, pid} | |||
error -> | |||
{:error, error} | |||
end | |||
end | |||
def close(%SocketInfo{pid: socket_pid}), | |||
do: Process.send(socket_pid, :close, []) | |||
def publish(%SocketInfo{pid: socket_pid}, json) do | |||
%{action: :publish, data: json} | |||
|> Jason.encode!() | |||
|> send_packet(socket_pid) | |||
end | |||
def fetch(%SocketInfo{pid: socket_pid}, id) do | |||
fetch_uuid = FetchRegistry.register_fetch(id) | |||
%{action: :fetch, data: id, uuid: fetch_uuid} | |||
|> Jason.encode!() | |||
|> send_packet(socket_pid) | |||
wait_for_fetch_to_return(fetch_uuid, 0) | |||
end | |||
def receive_package(%SocketInfo{} = fed_socket, json) do | |||
json | |||
|> Jason.decode!() | |||
|> process_package(fed_socket) | |||
end | |||
defp wait_for_fetch_to_return(uuid, cntr) do | |||
case FetchRegistry.check_fetch(uuid) do | |||
{:error, :waiting} -> | |||
Process.sleep(:math.pow(cntr, 3) |> Kernel.trunc()) | |||
wait_for_fetch_to_return(uuid, cntr + 1) | |||
{:error, :missing} -> | |||
Logger.error("FedSocket fetch timed out - #{inspect(uuid)}") | |||
{:error, :timeout} | |||
{:ok, _fr} -> | |||
FetchRegistry.pop_fetch(uuid) | |||
end | |||
end | |||
defp process_package(%{"action" => "publish", "data" => data}, %{origin: origin} = _fed_socket) do | |||
if Containment.contain_origin(origin, data) do | |||
IngesterWorker.enqueue("ingest", %{"object" => data}) | |||
end | |||
{:reply, %{"action" => "publish_reply", "status" => "processed"}} | |||
end | |||
defp process_package(%{"action" => "fetch_reply", "uuid" => uuid, "data" => data}, _fed_socket) do | |||
FetchRegistry.register_fetch_received(uuid, data) | |||
{:noreply, nil} | |||
end | |||
defp process_package(%{"action" => "fetch", "uuid" => uuid, "data" => ap_id}, _fed_socket) do | |||
{:ok, data} = render_fetched_data(ap_id, uuid) | |||
{:reply, data} | |||
end | |||
defp process_package(%{"action" => "publish_reply"}, _fed_socket) do | |||
{:noreply, nil} | |||
end | |||
defp process_package(other, _fed_socket) do | |||
Logger.warn("unknown json packages received #{inspect(other)}") | |||
{:noreply, nil} | |||
end | |||
defp render_fetched_data(ap_id, uuid) do | |||
{:ok, | |||
%{ | |||
"action" => "fetch_reply", | |||
"status" => "processed", | |||
"uuid" => uuid, | |||
"data" => represent_item(ap_id) | |||
}} | |||
end | |||
defp represent_item(ap_id) do | |||
case User.get_by_ap_id(ap_id) do | |||
nil -> | |||
object = Object.get_cached_by_ap_id(ap_id) | |||
if Visibility.is_public?(object) do | |||
Phoenix.View.render_to_string(ObjectView, "object.json", object: object) | |||
else | |||
nil | |||
end | |||
user -> | |||
Phoenix.View.render_to_string(UserView, "user.json", user: user) | |||
end | |||
end | |||
defp send_packet(data, socket_pid) do | |||
Process.send(socket_pid, {:send, data}, []) | |||
end | |||
def shake, do: @shake | |||
end |
@@ -0,0 +1,182 @@ | |||
# Pleroma: A lightweight social networking server | |||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> | |||
# SPDX-License-Identifier: AGPL-3.0-only | |||
defmodule Pleroma.Web.FedSockets do | |||
@moduledoc """ | |||
This documents the FedSockets framework. A framework for federating | |||
ActivityPub objects between servers via persistant WebSocket connections. | |||
FedSockets allow servers to authenticate on first contact and maintain that | |||
connection, eliminating the need to authenticate every time data needs to be shared. | |||
## Protocol | |||
FedSockets currently support 2 types of data transfer: | |||
* `publish` method which doesn't require a response | |||
* `fetch` method requires a response be sent | |||
### Publish | |||
The publish operation sends a json encoded map of the shape: | |||
%{action: :publish, data: json} | |||
and accepts (but does not require) a reply of form: | |||
%{"action" => "publish_reply"} | |||
The outgoing params represent | |||
* data: ActivityPub object encoded into json | |||
### Fetch | |||
The fetch operation sends a json encoded map of the shape: | |||
%{action: :fetch, data: id, uuid: fetch_uuid} | |||
and requires a reply of form: | |||
%{"action" => "fetch_reply", "uuid" => uuid, "data" => data} | |||
The outgoing params represent | |||
* id: an ActivityPub object URI | |||
* uuid: a unique uuid generated by the sender | |||
The reply params represent | |||
* data: an ActivityPub object encoded into json | |||
* uuid: the uuid sent along with the fetch request | |||
## Examples | |||
Clients of FedSocket transfers shouldn't need to use any of the functions outside of this module. | |||
A typical publish operation can be performed through the following code, and a fetch operation in a similar manner. | |||
case FedSockets.get_or_create_fed_socket(inbox) do | |||
{:ok, fedsocket} -> | |||
FedSockets.publish(fedsocket, json) | |||
_ -> | |||
alternative_publish(inbox, actor, json, params) | |||
end | |||
## Configuration | |||
FedSockets have the following config settings | |||
config :pleroma, :fed_sockets, | |||
enabled: true, | |||
ping_interval: :timer.seconds(15), | |||
connection_duration: :timer.hours(1), | |||
rejection_duration: :timer.hours(1), | |||
fed_socket_fetches: [ | |||
default: 12_000, | |||
interval: 3_000, | |||
lazy: false | |||
] | |||
* enabled - turn FedSockets on or off with this flag. Can be toggled at runtime. | |||
* connection_duration - How long a FedSocket can sit idle before it's culled. | |||
* rejection_duration - After failing to make a FedSocket connection a host will be excluded | |||
from further connections for this amount of time | |||
* fed_socket_fetches - Use these parameters to pass options to the Cachex queue backing the FetchRegistry | |||
* fed_socket_rejections - Use these parameters to pass options to the Cachex queue backing the FedRegistry | |||
Cachex options are | |||
* default: the minimum amount of time a fetch can wait before it times out. | |||
* interval: the interval between checks for timed out entries. This plus the default represent the maximum time allowed | |||
* lazy: leave at false for consistant and fast lookups, set to true for stricter timeout enforcement | |||
""" | |||
require Logger | |||
alias Pleroma.Web.FedSockets.FedRegistry | |||
alias Pleroma.Web.FedSockets.FedSocket | |||
alias Pleroma.Web.FedSockets.SocketInfo | |||
@doc """ | |||
returns a FedSocket for the given origin. Will reuse an existing one or create a new one. | |||
address is expected to be a fully formed URL such as: | |||
"http://www.example.com" or "http://www.example.com:8080" | |||
It can and usually does include additional path parameters, | |||
but these are ignored as the FedSockets are organized by host and port info alone. | |||
""" | |||
def get_or_create_fed_socket(address) do | |||
with {:cache, {:error, :missing}} <- {:cache, get_fed_socket(address)}, | |||
{:connect, {:ok, _pid}} <- {:connect, FedSocket.connect_to_host(address)}, | |||
{:cache, {:ok, fed_socket}} <- {:cache, get_fed_socket(address)} do | |||
Logger.debug("fedsocket created for - #{inspect(address)}") | |||
{:ok, fed_socket} | |||
else | |||
{:cache, {:ok, socket}} -> | |||
Logger.debug("fedsocket found in cache - #{inspect(address)}") | |||
{:ok, socket} | |||
{:connect, {:error, _host}} -> | |||
Logger.debug("set host rejected for - #{inspect(address)}") | |||
FedRegistry.set_host_rejected(address) | |||
{:error, :rejected} | |||
{_, {:error, :disabled}} -> | |||
{:error, :disabled} | |||
{_, {:error, reason}} -> | |||
Logger.warn("get_or_create_fed_socket error - #{inspect(reason)}") | |||
{:error, reason} | |||
end | |||
end | |||
@doc """ | |||
returns a FedSocket for the given origin. Will not create a new FedSocket if one does not exist. | |||
address is expected to be a fully formed URL such as: | |||
"http://www.example.com" or "http://www.example.com:8080" | |||
""" | |||
def get_fed_socket(address) do | |||
origin = SocketInfo.origin(address) | |||
with {:config, true} <- {:config, Pleroma.Config.get([:fed_sockets, :enabled], false)}, | |||
{:ok, socket} <- FedRegistry.get_fed_socket(origin) do | |||
{:ok, socket} | |||
else | |||
{:config, _} -> | |||
{:error, :disabled} | |||
{:error, :rejected} -> | |||
Logger.debug("FedSocket previously rejected - #{inspect(origin)}") | |||
{:error, :rejected} | |||
{:error, reason} -> | |||
{:error, reason} | |||
end | |||
end | |||
@doc """ | |||
Sends the supplied data via the publish protocol. | |||
It will not block waiting for a reply. | |||
Returns :ok but this is not an indication of a successful transfer. | |||
the data is expected to be JSON encoded binary data. | |||
""" | |||
def publish(%SocketInfo{} = fed_socket, json) do | |||
FedSocket.publish(fed_socket, json) | |||
end | |||
@doc """ | |||
Sends the supplied data via the fetch protocol. | |||
It will block waiting for a reply or timeout. | |||
Returns {:ok, object} where object is the requested object (or nil) | |||
{:error, :timeout} in the event the message was not responded to | |||
the id is expected to be the URI of an ActivityPub object. | |||
""" | |||
def fetch(%SocketInfo{} = fed_socket, id) do | |||
FedSocket.fetch(fed_socket, id) | |||
end | |||
@doc """ | |||
Disconnect all and restart FedSockets. | |||
This is mainly used in development and testing but could be useful in production. | |||
""" | |||
def reset do | |||
FedRegistry | |||
|> Process.whereis() | |||
|> Process.exit(:testing) | |||
end | |||
def uri_for_origin(origin), | |||
do: "ws://#{origin}/api/fedsocket/v1" | |||
end |
@@ -0,0 +1,151 @@ | |||
# Pleroma: A lightweight social networking server | |||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> | |||
# SPDX-License-Identifier: AGPL-3.0-only | |||
defmodule Pleroma.Web.FedSockets.FetchRegistry do | |||
@moduledoc """ | |||
The FetchRegistry acts as a broker for fetch requests and return values. | |||
This allows calling processes to block while waiting for a reply. | |||
It doesn't impose it's own process instead using `Cachex` to handle fetches in process, allowing | |||
multi threaded processes to avoid bottlenecking. | |||
Normally outside modules will have no need to call or use the FetchRegistry themselves. | |||
The `Cachex` parameters can be controlled from the config. Since exact timeout intervals | |||
aren't necessary the following settings are used by default: | |||
config :pleroma, :fed_sockets, | |||
fed_socket_fetches: [ | |||
default: 12_000, | |||
interval: 3_000, | |||
lazy: false | |||
] | |||
""" | |||
defmodule FetchRegistryData do | |||
defstruct uuid: nil, | |||
sent_json: nil, | |||
received_json: nil, | |||
sent_at: nil, | |||
received_at: nil | |||
end | |||
alias Ecto.UUID | |||
require Logger | |||
@fetches :fed_socket_fetches | |||
@doc """ | |||
Registers a json request wth the FetchRegistry and returns the identifying UUID. | |||
""" | |||
def register_fetch(json) do | |||
%FetchRegistryData{uuid: uuid} = | |||
json | |||
|> new_registry_data | |||
|> save_registry_data | |||
uuid | |||
end | |||
@doc """ | |||
Reports on the status of a Fetch given the identifying UUID. | |||
Will return | |||
* {:ok, fetched_object} if a fetch has completed | |||
* {:error, :waiting} if a fetch is still pending | |||
* {:error, other_error} usually :missing to indicate a fetch that has timed out | |||
""" | |||
def check_fetch(uuid) do | |||
case get_registry_data(uuid) do | |||
{:ok, %FetchRegistryData{received_at: nil}} -> | |||
{:error, :waiting} | |||
{:ok, %FetchRegistryData{} = reg_data} -> | |||
{:ok, reg_data} | |||
e -> | |||
e | |||
end | |||
end | |||
@doc """ | |||
Retrieves the response to a fetch given the identifying UUID. | |||
The completed fetch will be deleted from the FetchRegistry | |||
Will return | |||
* {:ok, fetched_object} if a fetch has completed | |||
* {:error, :waiting} if a fetch is still pending | |||
* {:error, other_error} usually :missing to indicate a fetch that has timed out | |||
""" | |||
def pop_fetch(uuid) do | |||
case check_fetch(uuid) do | |||
{:ok, %FetchRegistryData{received_json: received_json}} -> | |||
delete_registry_data(uuid) | |||
{:ok, received_json} | |||
e -> | |||
e | |||
end | |||
end | |||
@doc """ | |||
This is called to register a fetch has returned. | |||
It expects the result data along with the UUID that was sent in the request | |||
Will return the fetched object or :error | |||
""" | |||
def register_fetch_received(uuid, data) do | |||
case get_registry_data(uuid) do | |||
{:ok, %FetchRegistryData{received_at: nil} = reg_data} -> | |||
reg_data | |||
|> set_fetch_received(data) | |||
|> save_registry_data() | |||
{:ok, %FetchRegistryData{} = reg_data} -> | |||
Logger.warn("tried to add fetched data twice - #{uuid}") | |||
reg_data | |||
{:error, _} -> | |||
Logger.warn("Error adding fetch to registry - #{uuid}") | |||
:error | |||
end | |||
end | |||
defp new_registry_data(json) do | |||
%FetchRegistryData{ | |||
uuid: UUID.generate(), | |||
sent_json: json, | |||
sent_at: :erlang.monotonic_time(:millisecond) | |||
} | |||
end | |||
defp get_registry_data(origin) do | |||
case Cachex.get(@fetches, origin) do | |||
{:ok, nil} -> | |||
{:error, :missing} | |||
{:ok, reg_data} -> | |||
{:ok, reg_data} | |||
_ -> | |||
{:error, :cache_error} | |||
end | |||
end | |||
defp set_fetch_received(%FetchRegistryData{} = reg_data, data), | |||
do: %FetchRegistryData{ | |||
reg_data | |||
| received_at: :erlang.monotonic_time(:millisecond), | |||
received_json: data | |||
} | |||
defp save_registry_data(%FetchRegistryData{uuid: uuid} = reg_data) do | |||
{:ok, true} = Cachex.put(@fetches, uuid, reg_data) | |||
reg_data | |||
end | |||
defp delete_registry_data(origin), | |||
do: {:ok, true} = Cachex.del(@fetches, origin) | |||
end |
@@ -0,0 +1,88 @@ | |||
# Pleroma: A lightweight social networking server | |||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> | |||
# SPDX-License-Identifier: AGPL-3.0-only | |||
defmodule Pleroma.Web.FedSockets.IncomingHandler do | |||
require Logger | |||
alias Pleroma.Web.FedSockets.FedRegistry | |||
alias Pleroma.Web.FedSockets.FedSocket | |||
alias Pleroma.Web.FedSockets.SocketInfo | |||
import HTTPSignatures, only: [validate_conn: 1, split_signature: 1] | |||
@behaviour :cowboy_websocket | |||
def init(req, state) do | |||
shake = FedSocket.shake() | |||
with true <- Pleroma.Config.get([:fed_sockets, :enabled]), | |||
sec_protocol <- :cowboy_req.header("sec-websocket-protocol", req, nil), | |||
headers = %{"(request-target)" => ^shake} <- :cowboy_req.headers(req), | |||
true <- validate_conn(%{req_headers: headers}), | |||
%{"keyId" => origin} <- split_signature(headers["signature"]) do | |||
req = | |||
if is_nil(sec_protocol) do | |||
req | |||
else | |||
:cowboy_req.set_resp_header("sec-websocket-protocol", sec_protocol, req) | |||
end | |||
{:cowboy_websocket, req, %{origin: origin}, %{}} | |||
else | |||
_ -> | |||
{:ok, req, state} | |||
end | |||
end | |||
def websocket_init(%{origin: origin}) do | |||
case FedRegistry.add_fed_socket(origin) do | |||
{:ok, socket_info} -> | |||
{:ok, socket_info} | |||
e -> | |||
Logger.error("FedSocket websocket_init failed - #{inspect(e)}") | |||
{:error, inspect(e)} | |||
end | |||
end | |||
# Use the ping to check if the connection should be expired | |||
def websocket_handle(:ping, socket_info) do | |||
if SocketInfo.expired?(socket_info) do | |||
{:stop, socket_info} | |||
else | |||
{:ok, socket_info, :hibernate} | |||
end | |||
end | |||
def websocket_handle({:text, data}, socket_info) do | |||
socket_info = SocketInfo.touch(socket_info) | |||
case FedSocket.receive_package(socket_info, data) do | |||
{:noreply, _} -> | |||
{:ok, socket_info} | |||
{:reply, reply} -> | |||
{:reply, {:text, Jason.encode!(reply)}, socket_info} | |||
{:error, reason} -> | |||
Logger.error("incoming error - receive_package: #{inspect(reason)}") | |||
{:ok, socket_info} | |||
end | |||
end | |||
def websocket_info({:send, message}, socket_info) do | |||
socket_info = SocketInfo.touch(socket_info) | |||
{:reply, {:text, message}, socket_info} | |||
end | |||
def websocket_info(:close, state) do | |||
{:stop, state} | |||
end | |||
def websocket_info(message, state) do | |||
Logger.debug("#{__MODULE__} unknown message #{inspect(message)}") | |||
{:ok, state} | |||
end | |||
end |
@@ -0,0 +1,33 @@ | |||
# Pleroma: A lightweight social networking server | |||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> | |||
# SPDX-License-Identifier: AGPL-3.0-only | |||
defmodule Pleroma.Web.FedSockets.IngesterWorker do | |||
use Pleroma.Workers.WorkerHelper, queue: "ingestion_queue" | |||
require Logger | |||
alias Pleroma.Web.Federator | |||
@impl Oban.Worker | |||
def perform(%Job{args: %{"op" => "ingest", "object" => ingestee}}) do | |||
try do | |||
ingestee | |||
|> Jason.decode!() | |||
|> do_ingestion() | |||
rescue | |||
e -> | |||
Logger.error("IngesterWorker error - #{inspect(e)}") | |||
e | |||
end | |||
end | |||
defp do_ingestion(params) do | |||
case Federator.incoming_ap_doc(params) do | |||
{:error, reason} -> | |||
{:error, reason} | |||
{:ok, object} -> | |||
{:ok, object} | |||
end | |||
end | |||
end |
@@ -0,0 +1,146 @@ | |||
# Pleroma: A lightweight social networking server | |||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> | |||
# SPDX-License-Identifier: AGPL-3.0-only | |||
defmodule Pleroma.Web.FedSockets.OutgoingHandler do | |||
use GenServer | |||
require Logger | |||
alias Pleroma.Web.ActivityPub.InternalFetchActor | |||
alias Pleroma.Web.FedSockets | |||
alias Pleroma.Web.FedSockets.FedRegistry | |||
alias Pleroma.Web.FedSockets.FedSocket | |||
alias Pleroma.Web.FedSockets.SocketInfo | |||
def start_link(uri) do | |||
GenServer.start_link(__MODULE__, %{uri: uri}) | |||
end | |||
def init(%{uri: uri}) do | |||
case initiate_connection(uri) do | |||
{:ok, ws_origin, conn_pid} -> | |||
FedRegistry.add_fed_socket(ws_origin, conn_pid) | |||
{:error, reason} -> | |||
Logger.debug("Outgoing connection failed - #{inspect(reason)}") | |||
:ignore | |||
end | |||
end | |||
def handle_info({:gun_ws, conn_pid, _ref, {:text, data}}, socket_info) do | |||
socket_info = SocketInfo.touch(socket_info) | |||
case FedSocket.receive_package(socket_info, data) do | |||
{:noreply, _} -> | |||
{:noreply, socket_info} | |||
{:reply, reply} -> | |||
:gun.ws_send(conn_pid, {:text, Jason.encode!(reply)}) | |||
{:noreply, socket_info} | |||
{:error, reason} -> | |||
Logger.error("incoming error - receive_package: #{inspect(reason)}") | |||
{:noreply, socket_info} | |||
end | |||
end | |||
def handle_info(:close, state) do | |||
Logger.debug("Sending close frame !!!!!!!") | |||
{:close, state} | |||
end | |||
def handle_info({:gun_down, _pid, _prot, :closed, _}, state) do | |||
{:stop, :normal, state} | |||
end | |||
def handle_info({:send, data}, %{conn_pid: conn_pid} = socket_info) do | |||
socket_info = SocketInfo.touch(socket_info) | |||
:gun.ws_send(conn_pid, {:text, data}) | |||
{:noreply, socket_info} | |||
end | |||
def handle_info({:gun_ws, _, _, :pong}, state) do | |||
{:noreply, state, :hibernate} | |||
end | |||
def handle_info(msg, state) do | |||
Logger.debug("#{__MODULE__} unhandled event #{inspect(msg)}") | |||
{:noreply, state} | |||
end | |||
def terminate(reason, state) do | |||
Logger.debug( | |||
"#{__MODULE__} terminating outgoing connection for #{inspect(state)} for #{inspect(reason)}" | |||
) | |||
{:ok, state} | |||
end | |||
def initiate_connection(uri) do | |||
ws_uri = | |||
uri | |||
|> SocketInfo.origin() | |||
|> FedSockets.uri_for_origin() | |||
%{host: host, port: port, path: path} = URI.parse(ws_uri) | |||
with {:ok, conn_pid} <- :gun.open(to_charlist(host), port), | |||
{:ok, _} <- :gun.await_up(conn_pid), | |||
reference <- :gun.get(conn_pid, to_charlist(path)), | |||
{:response, :fin, 204, _} <- :gun.await(conn_pid, reference), | |||
headers <- build_headers(uri), | |||
ref <- :gun.ws_upgrade(conn_pid, to_charlist(path), headers, %{silence_pings: false}) do | |||
receive do | |||
{:gun_upgrade, ^conn_pid, ^ref, [<<"websocket">>], _} -> | |||
{:ok, ws_uri, conn_pid} | |||
after | |||
15_000 -> | |||
Logger.debug("Fedsocket timeout connecting to #{inspect(uri)}") | |||
{:error, :timeout} | |||
end | |||
else | |||
{:response, :nofin, 404, _} -> | |||
{:error, :fedsockets_not_supported} | |||
e -> | |||
Logger.debug("Fedsocket error connecting to #{inspect(uri)}") | |||
{:error, e} | |||
end | |||
end | |||
defp build_headers(uri) do | |||
host_for_sig = uri |> URI.parse() |> host_signature() | |||
shake = FedSocket.shake() | |||
digest = "SHA-256=" <> (:crypto.hash(:sha256, shake) |> Base.encode64()) | |||
date = Pleroma.Signature.signed_date() | |||
shake_size = byte_size(shake) | |||
signature_opts = %{ | |||
"(request-target)": shake, | |||
"content-length": to_charlist("#{shake_size}"), | |||
date: date, | |||
digest: digest, | |||
host: host_for_sig | |||
} | |||
signature = Pleroma.Signature.sign(InternalFetchActor.get_actor(), signature_opts) | |||
[ | |||
{'signature', to_charlist(signature)}, | |||
{'date', date}, | |||
{'digest', to_charlist(digest)}, | |||
{'content-length', to_charlist("#{shake_size}")}, | |||
{to_charlist("(request-target)"), to_charlist(shake)} | |||
] | |||
end | |||
defp host_signature(%{host: host, scheme: scheme, port: port}) do | |||
if port == URI.default_port(scheme) do | |||
host | |||
else | |||
"#{host}:#{port}" | |||
end | |||
end | |||
end |
@@ -0,0 +1,52 @@ | |||
# Pleroma: A lightweight social networking server | |||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> | |||
# SPDX-License-Identifier: AGPL-3.0-only | |||
defmodule Pleroma.Web.FedSockets.SocketInfo do | |||
defstruct origin: nil, | |||
pid: nil, | |||
conn_pid: nil, | |||
state: :default, | |||
connected_until: nil | |||
alias Pleroma.Web.FedSockets.SocketInfo | |||
@default_connection_duration 15 * 60 * 1000 | |||
def build(uri, conn_pid \\ nil) do | |||
uri | |||
|> build_origin() | |||
|> build_pids(conn_pid) | |||
|> touch() | |||
end | |||
def touch(%SocketInfo{} = socket_info), | |||
do: %{socket_info | connected_until: new_ttl()} | |||
def connect(%SocketInfo{} = socket_info), | |||
do: %{socket_info | state: :connected} | |||
def expired?(%{connected_until: connected_until}), | |||
do: connected_until < :erlang.monotonic_time(:millisecond) | |||
def origin(uri), | |||
do: build_origin(uri).origin | |||
defp build_pids(socket_info, conn_pid), | |||
do: struct(socket_info, pid: self(), conn_pid: conn_pid) | |||
defp build_origin(uri) when is_binary(uri), | |||
do: uri |> URI.parse() |> build_origin | |||
defp build_origin(%{host: host, port: nil, scheme: scheme}), | |||
do: build_origin(%{host: host, port: URI.default_port(scheme)}) | |||
defp build_origin(%{host: host, port: port}), | |||
do: %SocketInfo{origin: "#{host}:#{port}"} | |||
defp new_ttl do | |||
connection_duration = | |||
Pleroma.Config.get([:fed_sockets, :connection_duration], @default_connection_duration) | |||
:erlang.monotonic_time(:millisecond) + connection_duration | |||
end | |||
end |
@@ -0,0 +1,59 @@ | |||
# Pleroma: A lightweight social networking server | |||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> | |||
# SPDX-License-Identifier: AGPL-3.0-only | |||
defmodule Pleroma.Web.FedSockets.Supervisor do | |||
use Supervisor | |||
import Cachex.Spec | |||
def start_link(opts) do | |||
Supervisor.start_link(__MODULE__, opts, name: __MODULE__) | |||
end | |||
def init(args) do | |||
children = [ | |||
build_cache(:fed_socket_fetches, args), | |||
build_cache(:fed_socket_rejections, args), | |||
{Registry, keys: :unique, name: FedSockets.Registry, meta: [rejected: %{}]} | |||
] | |||
opts = [strategy: :one_for_all, name: Pleroma.Web.Streamer.Supervisor] | |||
Supervisor.init(children, opts) | |||
end | |||
defp build_cache(name, args) do | |||
opts = get_opts(name, args) | |||
%{ | |||
id: String.to_atom("#{name}_cache"), | |||
start: {Cachex, :start_link, [name, opts]}, | |||
type: :worker | |||
} | |||
end | |||
defp get_opts(cache_name, args) | |||
when cache_name in [:fed_socket_fetches, :fed_socket_rejections] do | |||
default = get_opts_or_config(args, cache_name, :default, 15_000) | |||
interval = get_opts_or_config(args, cache_name, :interval, 3_000) | |||
lazy = get_opts_or_config(args, cache_name, :lazy, false) | |||
[expiration: expiration(default: default, interval: interval, lazy: lazy)] | |||
end | |||
defp get_opts(name, args) do | |||
Keyword.get(args, name, []) | |||
end | |||
defp get_opts_or_config(args, name, key, default) do | |||
args | |||
|> Keyword.get(name, []) | |||
|> Keyword.get(key) | |||
|> case do | |||
nil -> | |||
Pleroma.Config.get([:fed_sockets, name, key], default) | |||
value -> | |||
value | |||
end | |||
end | |||
end |
@@ -118,5 +118,5 @@ | |||
"unicode_util_compat": {:hex, :unicode_util_compat, "0.4.1", "d869e4c68901dd9531385bb0c8c40444ebf624e60b6962d95952775cac5e90cd", [:rebar3], [], "hexpm", "1d1848c40487cdb0b30e8ed975e34e025860c02e419cb615d255849f3427439d"}, | |||
"unsafe": {:hex, :unsafe, "1.0.1", "a27e1874f72ee49312e0a9ec2e0b27924214a05e3ddac90e91727bc76f8613d8", [:mix], [], "hexpm", "6c7729a2d214806450d29766abc2afaa7a2cbecf415be64f36a6691afebb50e5"}, | |||
"web_push_encryption": {:hex, :web_push_encryption, "0.3.0", "598b5135e696fd1404dc8d0d7c0fa2c027244a4e5d5e5a98ba267f14fdeaabc8", [:mix], [{:httpoison, "~> 1.0", [hex: :httpoison, repo: "hexpm", optional: false]}, {:jose, "~> 1.8", [hex: :jose, repo: "hexpm", optional: false]}], "hexpm", "f10bdd1afe527ede694749fb77a2f22f146a51b054c7fa541c9fd920fba7c875"}, | |||
"websocket_client": {:git, "https://github.com/jeremyong/websocket_client.git", "9a6f65d05ebf2725d62fb19262b21f1805a59fbf", []}, | |||
"websocket_client": {:git, "https://github.com/jeremyong/websocket_client.git", "9a6f65d05ebf2725d62fb19262b21f1805a59fbf", []} | |||
} |
@@ -0,0 +1,124 @@ | |||
# Pleroma: A lightweight social networking server | |||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> | |||
# SPDX-License-Identifier: AGPL-3.0-only | |||
defmodule Pleroma.Web.FedSockets.FedRegistryTest do | |||
use ExUnit.Case | |||
alias Pleroma.Web.FedSockets | |||
alias Pleroma.Web.FedSockets.FedRegistry | |||
alias Pleroma.Web.FedSockets.SocketInfo | |||
@good_domain "http://good.domain" | |||
@good_domain_origin "good.domain:80" | |||
setup do | |||
start_supervised({Pleroma.Web.FedSockets.Supervisor, []}) | |||
build_test_socket(@good_domain) | |||
Process.sleep(10) | |||
:ok | |||
end | |||
describe "add_fed_socket/1 without conflicting sockets" do | |||
test "can be added" do | |||
Process.sleep(10) | |||
assert {:ok, %SocketInfo{origin: origin}} = FedRegistry.get_fed_socket(@good_domain_origin) | |||
assert origin == "good.domain:80" | |||
end | |||
test "multiple origins can be added" do | |||
build_test_socket("http://anothergood.domain") | |||
Process.sleep(10) | |||
assert {:ok, %SocketInfo{origin: origin_1}} = | |||
FedRegistry.get_fed_socket(@good_domain_origin) | |||
assert {:ok, %SocketInfo{origin: origin_2}} = | |||
FedRegistry.get_fed_socket("anothergood.domain:80") | |||
assert origin_1 == "good.domain:80" | |||
assert origin_2 == "anothergood.domain:80" | |||
assert FedRegistry.list_all() |> Enum.count() == 2 | |||
end | |||
end | |||
describe "add_fed_socket/1 when duplicate sockets conflict" do | |||
setup do | |||
build_test_socket(@good_domain) | |||
build_test_socket(@good_domain) | |||
Process.sleep(10) | |||
:ok | |||
end | |||
test "will be ignored" do | |||
assert {:ok, %SocketInfo{origin: origin, pid: pid_one}} = | |||
FedRegistry.get_fed_socket(@good_domain_origin) | |||
assert origin == "good.domain:80" | |||
assert FedRegistry.list_all() |> Enum.count() == 1 | |||
end | |||
test "the newer process will be closed" do | |||
pid_two = build_test_socket(@good_domain) | |||
assert {:ok, %SocketInfo{origin: origin, pid: pid_one}} = | |||
FedRegistry.get_fed_socket(@good_domain_origin) | |||
assert origin == "good.domain:80" | |||
Process.sleep(10) | |||
refute Process.alive?(pid_two) | |||
assert FedRegistry.list_all() |> Enum.count() == 1 | |||
end | |||
end | |||
describe "get_fed_socket/1" do | |||
test "returns missing for unknown hosts" do | |||
assert {:error, :missing} = FedRegistry.get_fed_socket("not_a_dmoain") | |||
end | |||
test "returns rejected for hosts previously rejected" do | |||
"rejected.domain:80" | |||
|> FedSockets.uri_for_origin() | |||
|> FedRegistry.set_host_rejected() | |||
assert {:error, :rejected} = FedRegistry.get_fed_socket("rejected.domain:80") | |||
end | |||
test "can retrieve a previously added SocketInfo" do | |||
build_test_socket(@good_domain) | |||
Process.sleep(10) | |||
assert {:ok, %SocketInfo{origin: origin}} = FedRegistry.get_fed_socket(@good_domain_origin) | |||
assert origin == "good.domain:80" | |||
end | |||
test "removes references to SocketInfos when the process crashes" do | |||
assert {:ok, %SocketInfo{origin: origin, pid: pid}} = | |||
FedRegistry.get_fed_socket(@good_domain_origin) | |||
assert origin == "good.domain:80" | |||
Process.exit(pid, :testing) | |||
Process.sleep(100) | |||
assert {:error, :missing} = FedRegistry.get_fed_socket(@good_domain_origin) | |||
end | |||
end | |||
def build_test_socket(uri) do | |||
Kernel.spawn(fn -> fed_socket_almost(uri) end) | |||
end | |||
def fed_socket_almost(origin) do | |||
FedRegistry.add_fed_socket(origin) | |||
receive do | |||
:close -> | |||
:ok | |||
after | |||
5_000 -> :timeout | |||
end | |||
end | |||
end |
@@ -0,0 +1,67 @@ | |||
# Pleroma: A lightweight social networking server | |||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> | |||
# SPDX-License-Identifier: AGPL-3.0-only | |||
defmodule Pleroma.Web.FedSockets.FetchRegistryTest do | |||
use ExUnit.Case | |||
alias Pleroma.Web.FedSockets.FetchRegistry | |||
alias Pleroma.Web.FedSockets.FetchRegistry.FetchRegistryData | |||
@json_message "hello" | |||
@json_reply "hello back" | |||
setup do | |||
start_supervised( | |||
{Pleroma.Web.FedSockets.Supervisor, | |||
[ | |||
ping_interval: 8, | |||
connection_duration: 15, | |||
rejection_duration: 5, | |||
fed_socket_fetches: [default: 10, interval: 10] | |||
]} | |||
) | |||
:ok | |||
end | |||
test "fetches can be stored" do | |||
uuid = FetchRegistry.register_fetch(@json_message) | |||
assert {:error, :waiting} = FetchRegistry.check_fetch(uuid) | |||
end | |||
test "fetches can return" do | |||
uuid = FetchRegistry.register_fetch(@json_message) | |||
task = Task.async(fn -> FetchRegistry.register_fetch_received(uuid, @json_reply) end) | |||
assert {:error, :waiting} = FetchRegistry.check_fetch(uuid) | |||
Task.await(task) | |||
assert {:ok, %FetchRegistryData{received_json: received_json}} = | |||
FetchRegistry.check_fetch(uuid) | |||
assert received_json == @json_reply | |||
end | |||
test "fetches are deleted once popped from stack" do | |||
uuid = FetchRegistry.register_fetch(@json_message) | |||
task = Task.async(fn -> FetchRegistry.register_fetch_received(uuid, @json_reply) end) | |||
Task.await(task) | |||
assert {:ok, %FetchRegistryData{received_json: received_json}} = | |||
FetchRegistry.check_fetch(uuid) | |||
assert received_json == @json_reply | |||
assert {:ok, @json_reply} = FetchRegistry.pop_fetch(uuid) | |||
assert {:error, :missing} = FetchRegistry.check_fetch(uuid) | |||
end | |||
test "fetches can time out" do | |||
uuid = FetchRegistry.register_fetch(@json_message) | |||
assert {:error, :waiting} = FetchRegistry.check_fetch(uuid) | |||
Process.sleep(500) | |||
assert {:error, :missing} = FetchRegistry.check_fetch(uuid) | |||
end | |||
end |
@@ -0,0 +1,118 @@ | |||
# Pleroma: A lightweight social networking server | |||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> | |||
# SPDX-License-Identifier: AGPL-3.0-only | |||
defmodule Pleroma.Web.FedSockets.SocketInfoTest do | |||
use ExUnit.Case | |||
alias Pleroma.Web.FedSockets | |||
alias Pleroma.Web.FedSockets.SocketInfo | |||
describe "uri_for_origin" do | |||
test "provides the fed_socket URL given the origin information" do | |||
endpoint = "example.com:4000" | |||
assert FedSockets.uri_for_origin(endpoint) =~ "ws://" | |||
assert FedSockets.uri_for_origin(endpoint) =~ endpoint | |||
end | |||
end | |||
describe "origin" do | |||
test "will provide the origin field given a url" do | |||
endpoint = "example.com:4000" | |||
assert SocketInfo.origin("ws://#{endpoint}") == endpoint | |||
assert SocketInfo.origin("http://#{endpoint}") == endpoint | |||
assert SocketInfo.origin("https://#{endpoint}") == endpoint | |||
end | |||
test "will proide the origin field given a uri" do | |||
endpoint = "example.com:4000" | |||
uri = URI.parse("http://#{endpoint}") | |||
assert SocketInfo.origin(uri) == endpoint | |||
end | |||
end | |||
describe "touch" do | |||
test "will update the TTL" do | |||
endpoint = "example.com:4000" | |||
socket = SocketInfo.build("ws://#{endpoint}") | |||
Process.sleep(2) | |||
touched_socket = SocketInfo.touch(socket) | |||
assert socket.connected_until < touched_socket.connected_until | |||
end | |||
end | |||
describe "expired?" do | |||
setup do | |||
start_supervised( | |||
{Pleroma.Web.FedSockets.Supervisor, | |||
[ | |||
ping_interval: 8, | |||
connection_duration: 5, | |||
rejection_duration: 5, | |||
fed_socket_rejections: [lazy: true] | |||
]} | |||
) | |||
:ok | |||
end | |||
test "tests if the TTL is exceeded" do | |||
endpoint = "example.com:4000" | |||
socket = SocketInfo.build("ws://#{endpoint}") | |||
refute SocketInfo.expired?(socket) | |||
Process.sleep(10) | |||
assert SocketInfo.expired?(socket) | |||
end | |||
end | |||
describe "creating outgoing connection records" do | |||
test "can be passed a string" do | |||
assert %{conn_pid: :pid, origin: _origin} = SocketInfo.build("example.com:4000", :pid) | |||
end | |||
test "can be passed a URI" do | |||
uri = URI.parse("http://example.com:4000") | |||
assert %{conn_pid: :pid, origin: origin} = SocketInfo.build(uri, :pid) | |||
assert origin =~ "example.com:4000" | |||
end | |||
test "will include the port number" do | |||
assert %{conn_pid: :pid, origin: origin} = SocketInfo.build("http://example.com:4000", :pid) | |||
assert origin =~ ":4000" | |||
end | |||
test "will provide the port if missing" do | |||
assert %{conn_pid: :pid, origin: "example.com:80"} = | |||
SocketInfo.build("http://example.com", :pid) | |||
assert %{conn_pid: :pid, origin: "example.com:443"} = | |||
SocketInfo.build("https://example.com", :pid) | |||
end | |||
end | |||
describe "creating incoming connection records" do | |||
test "can be passed a string" do | |||
assert %{pid: _, origin: _origin} = SocketInfo.build("example.com:4000") | |||
end | |||
test "can be passed a URI" do | |||
uri = URI.parse("example.com:4000") | |||
assert %{pid: _, origin: _origin} = SocketInfo.build(uri) | |||
end | |||
test "will include the port number" do | |||
assert %{pid: _, origin: origin} = SocketInfo.build("http://example.com:4000") | |||
assert origin =~ ":4000" | |||
end | |||
test "will provide the port if missing" do | |||
assert %{pid: _, origin: "example.com:80"} = SocketInfo.build("http://example.com") | |||
assert %{pid: _, origin: "example.com:443"} = SocketInfo.build("https://example.com") | |||
end | |||
end | |||
end |