Streamer refactoring See merge request pleroma/pleroma!1653object-id-column
@@ -43,3 +43,7 @@ docs/generated_config.md | |||
# Code test coverage | |||
/cover | |||
/Elixir.*.coverdata | |||
.idea | |||
pleroma.iml | |||
@@ -331,6 +331,10 @@ config :pleroma, :activitypub, | |||
follow_handshake_timeout: 500, | |||
sign_object_fetches: true | |||
config :pleroma, :streamer, | |||
workers: 3, | |||
overflow_workers: 2 | |||
config :pleroma, :user, deny_follow_blocked: true | |||
config :pleroma, :mrf_normalize_markup, scrub_policy: Pleroma.HTML.Scrubber.Default | |||
@@ -0,0 +1,63 @@ | |||
# Pleroma: A lightweight social networking server | |||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> | |||
# SPDX-License-Identifier: AGPL-3.0-only | |||
defmodule Pleroma.Activity.Ir.Topics do | |||
alias Pleroma.Object | |||
alias Pleroma.Web.ActivityPub.Visibility | |||
def get_activity_topics(activity) do | |||
activity | |||
|> Object.normalize() | |||
|> generate_topics(activity) | |||
|> List.flatten() | |||
end | |||
defp generate_topics(%{data: %{"type" => "Answer"}}, _) do | |||
[] | |||
end | |||
defp generate_topics(object, activity) do | |||
["user", "list"] ++ visibility_tags(object, activity) | |||
end | |||
defp visibility_tags(object, activity) do | |||
case Visibility.get_visibility(activity) do | |||
"public" -> | |||
if activity.local do | |||
["public", "public:local"] | |||
else | |||
["public"] | |||
end | |||
|> item_creation_tags(object, activity) | |||
"direct" -> | |||
["direct"] | |||
_ -> | |||
[] | |||
end | |||
end | |||
defp item_creation_tags(tags, %{data: %{"type" => "Create"}} = object, activity) do | |||
tags ++ hashtags_to_topics(object) ++ attachment_topics(object, activity) | |||
end | |||
defp item_creation_tags(tags, _, _) do | |||
tags | |||
end | |||
defp hashtags_to_topics(%{data: %{"tag" => tags}}) do | |||
tags | |||
|> Enum.filter(&is_bitstring(&1)) | |||
|> Enum.map(fn tag -> "hashtag:" <> tag end) | |||
end | |||
defp hashtags_to_topics(_), do: [] | |||
defp attachment_topics(%{data: %{"attachment" => []}}, _act), do: [] | |||
defp attachment_topics(_object, %{local: true}), do: ["public:media", "public:local:media"] | |||
defp attachment_topics(_object, _act), do: ["public:media"] | |||
end |
@@ -141,7 +141,7 @@ defmodule Pleroma.Application do | |||
defp streamer_child(:test), do: [] | |||
defp streamer_child(_) do | |||
[Pleroma.Web.Streamer] | |||
[Pleroma.Web.Streamer.supervisor()] | |||
end | |||
defp oauth_cleanup_child(true), | |||
@@ -210,8 +210,10 @@ defmodule Pleroma.Notification do | |||
unless skip?(activity, user) do | |||
notification = %Notification{user_id: user.id, activity: activity} | |||
{:ok, notification} = Repo.insert(notification) | |||
Streamer.stream("user", notification) | |||
Streamer.stream("user:notification", notification) | |||
["user", "user:notification"] | |||
|> Streamer.stream(notification) | |||
Push.send(notification) | |||
notification | |||
end | |||
@@ -4,6 +4,7 @@ | |||
defmodule Pleroma.Web.ActivityPub.ActivityPub do | |||
alias Pleroma.Activity | |||
alias Pleroma.Activity.Ir.Topics | |||
alias Pleroma.Config | |||
alias Pleroma.Conversation | |||
alias Pleroma.Notification | |||
@@ -16,6 +17,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do | |||
alias Pleroma.User | |||
alias Pleroma.Web.ActivityPub.MRF | |||
alias Pleroma.Web.ActivityPub.Transmogrifier | |||
alias Pleroma.Web.Streamer | |||
alias Pleroma.Web.WebFinger | |||
alias Pleroma.Workers.BackgroundWorker | |||
@@ -187,9 +189,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do | |||
participations | |||
|> Repo.preload(:user) | |||
Enum.each(participations, fn participation -> | |||
Pleroma.Web.Streamer.stream("participation", participation) | |||
end) | |||
Streamer.stream("participation", participations) | |||
end | |||
def stream_out_participations(%Object{data: %{"context" => context}}, user) do | |||
@@ -208,41 +208,15 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do | |||
def stream_out_participations(_, _), do: :noop | |||
def stream_out(activity) do | |||
if activity.data["type"] in ["Create", "Announce", "Delete"] do | |||
object = Object.normalize(activity) | |||
# Do not stream out poll replies | |||
unless object.data["type"] == "Answer" do | |||
Pleroma.Web.Streamer.stream("user", activity) | |||
Pleroma.Web.Streamer.stream("list", activity) | |||
if get_visibility(activity) == "public" do | |||
Pleroma.Web.Streamer.stream("public", activity) | |||
if activity.local do | |||
Pleroma.Web.Streamer.stream("public:local", activity) | |||
end | |||
if activity.data["type"] in ["Create"] do | |||
object.data | |||
|> Map.get("tag", []) | |||
|> Enum.filter(fn tag -> is_bitstring(tag) end) | |||
|> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end) | |||
if object.data["attachment"] != [] do | |||
Pleroma.Web.Streamer.stream("public:media", activity) | |||
if activity.local do | |||
Pleroma.Web.Streamer.stream("public:local:media", activity) | |||
end | |||
end | |||
end | |||
else | |||
if get_visibility(activity) == "direct", | |||
do: Pleroma.Web.Streamer.stream("direct", activity) | |||
end | |||
end | |||
end | |||
def stream_out(%Activity{data: %{"type" => data_type}} = activity) | |||
when data_type in ["Create", "Announce", "Delete"] do | |||
activity | |||
|> Topics.get_activity_topics() | |||
|> Streamer.stream(activity) | |||
end | |||
def stream_out(_activity) do | |||
:noop | |||
end | |||
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do | |||
@@ -8,6 +8,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do | |||
alias Pleroma.Repo | |||
alias Pleroma.User | |||
alias Pleroma.Web.OAuth.Token | |||
alias Pleroma.Web.Streamer | |||
@behaviour :cowboy_websocket | |||
@@ -24,7 +25,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do | |||
] | |||
@anonymous_streams ["public", "public:local", "hashtag"] | |||
# Handled by periodic keepalive in Pleroma.Web.Streamer. | |||
# Handled by periodic keepalive in Pleroma.Web.Streamer.Ping. | |||
@timeout :infinity | |||
def init(%{qs: qs} = req, state) do | |||
@@ -65,7 +66,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do | |||
}, topic #{state.topic}" | |||
) | |||
Pleroma.Web.Streamer.add_socket(state.topic, streamer_socket(state)) | |||
Streamer.add_socket(state.topic, streamer_socket(state)) | |||
{:ok, state} | |||
end | |||
@@ -80,7 +81,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do | |||
}, topic #{state.topic || "?"}: #{inspect(reason)}" | |||
) | |||
Pleroma.Web.Streamer.remove_socket(state.topic, streamer_socket(state)) | |||
Streamer.remove_socket(state.topic, streamer_socket(state)) | |||
:ok | |||
end | |||
@@ -1,318 +0,0 @@ | |||
# Pleroma: A lightweight social networking server | |||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> | |||
# SPDX-License-Identifier: AGPL-3.0-only | |||
defmodule Pleroma.Web.Streamer do | |||
use GenServer | |||
require Logger | |||
alias Pleroma.Activity | |||
alias Pleroma.Config | |||
alias Pleroma.Conversation.Participation | |||
alias Pleroma.Notification | |||
alias Pleroma.Object | |||
alias Pleroma.User | |||
alias Pleroma.Web.ActivityPub.ActivityPub | |||
alias Pleroma.Web.ActivityPub.Visibility | |||
alias Pleroma.Web.CommonAPI | |||
alias Pleroma.Web.MastodonAPI.NotificationView | |||
@keepalive_interval :timer.seconds(30) | |||
def start_link(_) do | |||
GenServer.start_link(__MODULE__, %{}, name: __MODULE__) | |||
end | |||
def add_socket(topic, socket) do | |||
GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic}) | |||
end | |||
def remove_socket(topic, socket) do | |||
GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic}) | |||
end | |||
def stream(topic, item) do | |||
GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item}) | |||
end | |||
def init(args) do | |||
Process.send_after(self(), %{action: :ping}, @keepalive_interval) | |||
{:ok, args} | |||
end | |||
def handle_info(%{action: :ping}, topics) do | |||
topics | |||
|> Map.values() | |||
|> List.flatten() | |||
|> Enum.each(fn socket -> | |||
Logger.debug("Sending keepalive ping") | |||
send(socket.transport_pid, {:text, ""}) | |||
end) | |||
Process.send_after(self(), %{action: :ping}, @keepalive_interval) | |||
{:noreply, topics} | |||
end | |||
def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do | |||
recipient_topics = | |||
User.get_recipients_from_activity(item) | |||
|> Enum.map(fn %{id: id} -> "direct:#{id}" end) | |||
Enum.each(recipient_topics || [], fn user_topic -> | |||
Logger.debug("Trying to push direct message to #{user_topic}\n\n") | |||
push_to_socket(topics, user_topic, item) | |||
end) | |||
{:noreply, topics} | |||
end | |||
def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do | |||
user_topic = "direct:#{participation.user_id}" | |||
Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n") | |||
push_to_socket(topics, user_topic, participation) | |||
{:noreply, topics} | |||
end | |||
def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do | |||
# filter the recipient list if the activity is not public, see #270. | |||
recipient_lists = | |||
case Visibility.is_public?(item) do | |||
true -> | |||
Pleroma.List.get_lists_from_activity(item) | |||
_ -> | |||
Pleroma.List.get_lists_from_activity(item) | |||
|> Enum.filter(fn list -> | |||
owner = User.get_cached_by_id(list.user_id) | |||
Visibility.visible_for_user?(item, owner) | |||
end) | |||
end | |||
recipient_topics = | |||
recipient_lists | |||
|> Enum.map(fn %{id: id} -> "list:#{id}" end) | |||
Enum.each(recipient_topics || [], fn list_topic -> | |||
Logger.debug("Trying to push message to #{list_topic}\n\n") | |||
push_to_socket(topics, list_topic, item) | |||
end) | |||
{:noreply, topics} | |||
end | |||
def handle_cast( | |||
%{action: :stream, topic: topic, item: %Notification{} = item}, | |||
topics | |||
) | |||
when topic in ["user", "user:notification"] do | |||
topics | |||
|> Map.get("#{topic}:#{item.user_id}", []) | |||
|> Enum.each(fn socket -> | |||
with %User{} = user <- User.get_cached_by_ap_id(socket.assigns[:user].ap_id), | |||
true <- should_send?(user, item) do | |||
send( | |||
socket.transport_pid, | |||
{:text, represent_notification(socket.assigns[:user], item)} | |||
) | |||
end | |||
end) | |||
{:noreply, topics} | |||
end | |||
def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do | |||
Logger.debug("Trying to push to users") | |||
recipient_topics = | |||
User.get_recipients_from_activity(item) | |||
|> Enum.map(fn %{id: id} -> "user:#{id}" end) | |||
Enum.each(recipient_topics, fn topic -> | |||
push_to_socket(topics, topic, item) | |||
end) | |||
{:noreply, topics} | |||
end | |||
def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do | |||
Logger.debug("Trying to push to #{topic}") | |||
Logger.debug("Pushing item to #{topic}") | |||
push_to_socket(topics, topic, item) | |||
{:noreply, topics} | |||
end | |||
def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do | |||
topic = internal_topic(topic, socket) | |||
sockets_for_topic = sockets[topic] || [] | |||
sockets_for_topic = Enum.uniq([socket | sockets_for_topic]) | |||
sockets = Map.put(sockets, topic, sockets_for_topic) | |||
Logger.debug("Got new conn for #{topic}") | |||
{:noreply, sockets} | |||
end | |||
def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do | |||
topic = internal_topic(topic, socket) | |||
sockets_for_topic = sockets[topic] || [] | |||
sockets_for_topic = List.delete(sockets_for_topic, socket) | |||
sockets = Map.put(sockets, topic, sockets_for_topic) | |||
Logger.debug("Removed conn for #{topic}") | |||
{:noreply, sockets} | |||
end | |||
def handle_cast(m, state) do | |||
Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}") | |||
{:noreply, state} | |||
end | |||
defp represent_update(%Activity{} = activity, %User{} = user) do | |||
%{ | |||
event: "update", | |||
payload: | |||
Pleroma.Web.MastodonAPI.StatusView.render( | |||
"status.json", | |||
activity: activity, | |||
for: user | |||
) | |||
|> Jason.encode!() | |||
} | |||
|> Jason.encode!() | |||
end | |||
defp represent_update(%Activity{} = activity) do | |||
%{ | |||
event: "update", | |||
payload: | |||
Pleroma.Web.MastodonAPI.StatusView.render( | |||
"status.json", | |||
activity: activity | |||
) | |||
|> Jason.encode!() | |||
} | |||
|> Jason.encode!() | |||
end | |||
def represent_conversation(%Participation{} = participation) do | |||
%{ | |||
event: "conversation", | |||
payload: | |||
Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{ | |||
participation: participation, | |||
for: participation.user | |||
}) | |||
|> Jason.encode!() | |||
} | |||
|> Jason.encode!() | |||
end | |||
@spec represent_notification(User.t(), Notification.t()) :: binary() | |||
defp represent_notification(%User{} = user, %Notification{} = notify) do | |||
%{ | |||
event: "notification", | |||
payload: | |||
NotificationView.render( | |||
"show.json", | |||
%{notification: notify, for: user} | |||
) | |||
|> Jason.encode!() | |||
} | |||
|> Jason.encode!() | |||
end | |||
defp should_send?(%User{} = user, %Activity{} = item) do | |||
blocks = user.info.blocks || [] | |||
mutes = user.info.mutes || [] | |||
reblog_mutes = user.info.muted_reblogs || [] | |||
domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks) | |||
with parent when not is_nil(parent) <- Object.normalize(item), | |||
true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)), | |||
true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)), | |||
%{host: item_host} <- URI.parse(item.actor), | |||
%{host: parent_host} <- URI.parse(parent.data["actor"]), | |||
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host), | |||
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host), | |||
true <- thread_containment(item, user), | |||
false <- CommonAPI.thread_muted?(user, item) do | |||
true | |||
else | |||
_ -> false | |||
end | |||
end | |||
defp should_send?(%User{} = user, %Notification{activity: activity}) do | |||
should_send?(user, activity) | |||
end | |||
def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do | |||
Enum.each(topics[topic] || [], fn socket -> | |||
# Get the current user so we have up-to-date blocks etc. | |||
if socket.assigns[:user] do | |||
user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id) | |||
if should_send?(user, item) do | |||
send(socket.transport_pid, {:text, represent_update(item, user)}) | |||
end | |||
else | |||
send(socket.transport_pid, {:text, represent_update(item)}) | |||
end | |||
end) | |||
end | |||
def push_to_socket(topics, topic, %Participation{} = participation) do | |||
Enum.each(topics[topic] || [], fn socket -> | |||
send(socket.transport_pid, {:text, represent_conversation(participation)}) | |||
end) | |||
end | |||
def push_to_socket(topics, topic, %Activity{ | |||
data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id} | |||
}) do | |||
Enum.each(topics[topic] || [], fn socket -> | |||
send( | |||
socket.transport_pid, | |||
{:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()} | |||
) | |||
end) | |||
end | |||
def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop | |||
def push_to_socket(topics, topic, item) do | |||
Enum.each(topics[topic] || [], fn socket -> | |||
# Get the current user so we have up-to-date blocks etc. | |||
if socket.assigns[:user] do | |||
user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id) | |||
blocks = user.info.blocks || [] | |||
mutes = user.info.mutes || [] | |||
with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)), | |||
true <- thread_containment(item, user) do | |||
send(socket.transport_pid, {:text, represent_update(item, user)}) | |||
end | |||
else | |||
send(socket.transport_pid, {:text, represent_update(item)}) | |||
end | |||
end) | |||
end | |||
defp internal_topic(topic, socket) when topic in ~w[user user:notification direct] do | |||
"#{topic}:#{socket.assigns[:user].id}" | |||
end | |||
defp internal_topic(topic, _), do: topic | |||
@spec thread_containment(Activity.t(), User.t()) :: boolean() | |||
defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true | |||
defp thread_containment(activity, user) do | |||
if Config.get([:instance, :skip_thread_containment]) do | |||
true | |||
else | |||
ActivityPub.contain_activity(activity, user) | |||
end | |||
end | |||
end |
@@ -0,0 +1,33 @@ | |||
defmodule Pleroma.Web.Streamer.Ping do | |||
use GenServer | |||
require Logger | |||
alias Pleroma.Web.Streamer.State | |||
alias Pleroma.Web.Streamer.StreamerSocket | |||
@keepalive_interval :timer.seconds(30) | |||
def start_link(opts) do | |||
ping_interval = Keyword.get(opts, :ping_interval, @keepalive_interval) | |||
GenServer.start_link(__MODULE__, %{ping_interval: ping_interval}, name: __MODULE__) | |||
end | |||
def init(%{ping_interval: ping_interval} = args) do | |||
Process.send_after(self(), :ping, ping_interval) | |||
{:ok, args} | |||
end | |||
def handle_info(:ping, %{ping_interval: ping_interval} = state) do | |||
State.get_sockets() | |||
|> Map.values() | |||
|> List.flatten() | |||
|> Enum.each(fn %StreamerSocket{transport_pid: transport_pid} -> | |||
Logger.debug("Sending keepalive ping") | |||
send(transport_pid, {:text, ""}) | |||
end) | |||
Process.send_after(self(), :ping, ping_interval) | |||
{:noreply, state} | |||
end | |||
end |
@@ -0,0 +1,68 @@ | |||
defmodule Pleroma.Web.Streamer.State do | |||
use GenServer | |||
require Logger | |||
alias Pleroma.Web.Streamer.StreamerSocket | |||
def start_link(_) do | |||
GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__) | |||
end | |||
def add_socket(topic, socket) do | |||
GenServer.call(__MODULE__, {:add, socket, topic}) | |||
end | |||
def remove_socket(topic, socket) do | |||
GenServer.call(__MODULE__, {:remove, socket, topic}) | |||
end | |||
def get_sockets do | |||
%{sockets: stream_sockets} = GenServer.call(__MODULE__, :get_state) | |||
stream_sockets | |||
end | |||
def init(init_arg) do | |||
{:ok, init_arg} | |||
end | |||
def handle_call(:get_state, _from, state) do | |||
{:reply, state, state} | |||
end | |||
def handle_call({:add, socket, topic}, _from, %{sockets: sockets} = state) do | |||
internal_topic = internal_topic(topic, socket) | |||
stream_socket = StreamerSocket.from_socket(socket) | |||
sockets_for_topic = | |||
sockets | |||
|> Map.get(internal_topic, []) | |||
|> List.insert_at(0, stream_socket) | |||
|> Enum.uniq() | |||
state = put_in(state, [:sockets, internal_topic], sockets_for_topic) | |||
Logger.debug("Got new conn for #{topic}") | |||
{:reply, state, state} | |||
end | |||
def handle_call({:remove, socket, topic}, _from, %{sockets: sockets} = state) do | |||
internal_topic = internal_topic(topic, socket) | |||
stream_socket = StreamerSocket.from_socket(socket) | |||
sockets_for_topic = | |||
sockets | |||
|> Map.get(internal_topic, []) | |||
|> List.delete(stream_socket) | |||
state = Kernel.put_in(state, [:sockets, internal_topic], sockets_for_topic) | |||
{:reply, state, state} | |||
end | |||
defp internal_topic(topic, socket) | |||
when topic in ~w[user user:notification direct] do | |||
"#{topic}:#{socket.assigns[:user].id}" | |||
end | |||
defp internal_topic(topic, _) do | |||
topic | |||
end | |||
end |
@@ -0,0 +1,55 @@ | |||
# Pleroma: A lightweight social networking server | |||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> | |||
# SPDX-License-Identifier: AGPL-3.0-only | |||
defmodule Pleroma.Web.Streamer do | |||
alias Pleroma.Web.Streamer.State | |||
alias Pleroma.Web.Streamer.Worker | |||
@timeout 60_000 | |||
@mix_env Mix.env() | |||
def add_socket(topic, socket) do | |||
State.add_socket(topic, socket) | |||
end | |||
def remove_socket(topic, socket) do | |||
State.remove_socket(topic, socket) | |||
end | |||
def get_sockets do | |||
State.get_sockets() | |||
end | |||
def stream(topics, items) do | |||
if should_send?() do | |||
Task.async(fn -> | |||
:poolboy.transaction( | |||
:streamer_worker, | |||
&Worker.stream(&1, topics, items), | |||
@timeout | |||
) | |||
end) | |||
end | |||
end | |||
def supervisor, do: Pleroma.Web.Streamer.Supervisor | |||
defp should_send? do | |||
handle_should_send(@mix_env) | |||
end | |||
defp handle_should_send(:test) do | |||
case Process.whereis(:streamer_worker) do | |||
nil -> | |||
false | |||
pid -> | |||
Process.alive?(pid) | |||
end | |||
end | |||
defp handle_should_send(_) do | |||
true | |||
end | |||
end |
@@ -0,0 +1,31 @@ | |||
defmodule Pleroma.Web.Streamer.StreamerSocket do | |||
defstruct transport_pid: nil, user: nil | |||
alias Pleroma.User | |||
alias Pleroma.Web.Streamer.StreamerSocket | |||
def from_socket(%{ | |||
transport_pid: transport_pid, | |||
assigns: %{user: nil} | |||
}) do | |||
%StreamerSocket{ | |||
transport_pid: transport_pid | |||
} | |||
end | |||
def from_socket(%{ | |||
transport_pid: transport_pid, | |||
assigns: %{user: %User{} = user} | |||
}) do | |||
%StreamerSocket{ | |||
transport_pid: transport_pid, | |||
user: user | |||
} | |||
end | |||
def from_socket(%{transport_pid: transport_pid}) do | |||
%StreamerSocket{ | |||
transport_pid: transport_pid | |||
} | |||
end | |||
end |
@@ -0,0 +1,33 @@ | |||
defmodule Pleroma.Web.Streamer.Supervisor do | |||
use Supervisor | |||
def start_link(opts) do | |||
Supervisor.start_link(__MODULE__, opts, name: __MODULE__) | |||
end | |||
def init(args) do | |||
children = [ | |||
{Pleroma.Web.Streamer.State, args}, | |||
{Pleroma.Web.Streamer.Ping, args}, | |||
:poolboy.child_spec(:streamer_worker, poolboy_config()) | |||
] | |||
opts = [strategy: :one_for_one, name: Pleroma.Web.Streamer.Supervisor] | |||
Supervisor.init(children, opts) | |||
end | |||
defp poolboy_config do | |||
opts = | |||
Pleroma.Config.get(:streamer, | |||
workers: 3, | |||
overflow_workers: 2 | |||
) | |||
[ | |||
{:name, {:local, :streamer_worker}}, | |||
{:worker_module, Pleroma.Web.Streamer.Worker}, | |||
{:size, opts[:workers]}, | |||
{:max_overflow, opts[:overflow_workers]} | |||
] | |||
end | |||
end |
@@ -0,0 +1,220 @@ | |||
defmodule Pleroma.Web.Streamer.Worker do | |||
use GenServer | |||
require Logger | |||
alias Pleroma.Activity | |||
alias Pleroma.Config | |||
alias Pleroma.Conversation.Participation | |||
alias Pleroma.Notification | |||
alias Pleroma.Object | |||
alias Pleroma.User | |||
alias Pleroma.Web.ActivityPub.ActivityPub | |||
alias Pleroma.Web.ActivityPub.Visibility | |||
alias Pleroma.Web.CommonAPI | |||
alias Pleroma.Web.Streamer.State | |||
alias Pleroma.Web.Streamer.StreamerSocket | |||
alias Pleroma.Web.StreamerView | |||
def start_link(_) do | |||
GenServer.start_link(__MODULE__, %{}, []) | |||
end | |||
def init(init_arg) do | |||
{:ok, init_arg} | |||
end | |||
def stream(pid, topics, items) do | |||
GenServer.call(pid, {:stream, topics, items}) | |||
end | |||
def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do | |||
Enum.each(topics, fn t -> | |||
do_stream(%{topic: t, item: item}) | |||
end) | |||
{:reply, state, state} | |||
end | |||
def handle_call({:stream, topic, items}, _from, state) when is_list(items) do | |||
Enum.each(items, fn i -> | |||
do_stream(%{topic: topic, item: i}) | |||
end) | |||
{:reply, state, state} | |||
end | |||
def handle_call({:stream, topic, item}, _from, state) do | |||
do_stream(%{topic: topic, item: item}) | |||
{:reply, state, state} | |||
end | |||
defp do_stream(%{topic: "direct", item: item}) do | |||
recipient_topics = | |||
User.get_recipients_from_activity(item) | |||
|> Enum.map(fn %{id: id} -> "direct:#{id}" end) | |||
Enum.each(recipient_topics, fn user_topic -> | |||
Logger.debug("Trying to push direct message to #{user_topic}\n\n") | |||
push_to_socket(State.get_sockets(), user_topic, item) | |||
end) | |||
end | |||
defp do_stream(%{topic: "participation", item: participation}) do | |||
user_topic = "direct:#{participation.user_id}" | |||
Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n") | |||
push_to_socket(State.get_sockets(), user_topic, participation) | |||
end | |||
defp do_stream(%{topic: "list", item: item}) do | |||
# filter the recipient list if the activity is not public, see #270. | |||
recipient_lists = | |||
case Visibility.is_public?(item) do | |||
true -> | |||
Pleroma.List.get_lists_from_activity(item) | |||
_ -> | |||
Pleroma.List.get_lists_from_activity(item) | |||
|> Enum.filter(fn list -> | |||
owner = User.get_cached_by_id(list.user_id) | |||
Visibility.visible_for_user?(item, owner) | |||
end) | |||
end | |||
recipient_topics = | |||
recipient_lists | |||
|> Enum.map(fn %{id: id} -> "list:#{id}" end) | |||
Enum.each(recipient_topics, fn list_topic -> | |||
Logger.debug("Trying to push message to #{list_topic}\n\n") | |||
push_to_socket(State.get_sockets(), list_topic, item) | |||
end) | |||
end | |||
defp do_stream(%{topic: topic, item: %Notification{} = item}) | |||
when topic in ["user", "user:notification"] do | |||
State.get_sockets() | |||
|> Map.get("#{topic}:#{item.user_id}", []) | |||
|> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} -> | |||
with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id), | |||
true <- should_send?(user, item) do | |||
send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)}) | |||
end | |||
end) | |||
end | |||
defp do_stream(%{topic: "user", item: item}) do | |||
Logger.debug("Trying to push to users") | |||
recipient_topics = | |||
User.get_recipients_from_activity(item) | |||
|> Enum.map(fn %{id: id} -> "user:#{id}" end) | |||
Enum.each(recipient_topics, fn topic -> | |||
push_to_socket(State.get_sockets(), topic, item) | |||
end) | |||
end | |||
defp do_stream(%{topic: topic, item: item}) do | |||
Logger.debug("Trying to push to #{topic}") | |||
Logger.debug("Pushing item to #{topic}") | |||
push_to_socket(State.get_sockets(), topic, item) | |||
end | |||
defp should_send?(%User{} = user, %Activity{} = item) do | |||
blocks = user.info.blocks || [] | |||
mutes = user.info.mutes || [] | |||
reblog_mutes = user.info.muted_reblogs || [] | |||
domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks) | |||
with parent when not is_nil(parent) <- Object.normalize(item), | |||
true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)), | |||
true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)), | |||
%{host: item_host} <- URI.parse(item.actor), | |||
%{host: parent_host} <- URI.parse(parent.data["actor"]), | |||
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host), | |||
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host), | |||
true <- thread_containment(item, user), | |||
false <- CommonAPI.thread_muted?(user, item) do | |||
true | |||
else | |||
_ -> false | |||
end | |||
end | |||
defp should_send?(%User{} = user, %Notification{activity: activity}) do | |||
should_send?(user, activity) | |||
end | |||
def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do | |||
Enum.each(topics[topic] || [], fn %StreamerSocket{ | |||
transport_pid: transport_pid, | |||
user: socket_user | |||
} -> | |||
# Get the current user so we have up-to-date blocks etc. | |||
if socket_user do | |||
user = User.get_cached_by_ap_id(socket_user.ap_id) | |||
if should_send?(user, item) do | |||
send(transport_pid, {:text, StreamerView.render("update.json", item, user)}) | |||
end | |||
else | |||
send(transport_pid, {:text, StreamerView.render("update.json", item)}) | |||
end | |||
end) | |||
end | |||
def push_to_socket(topics, topic, %Participation{} = participation) do | |||
Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} -> | |||
send(transport_pid, {:text, StreamerView.render("conversation.json", participation)}) | |||
end) | |||
end | |||
def push_to_socket(topics, topic, %Activity{ | |||
data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id} | |||
}) do | |||
Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} -> | |||
send( | |||
transport_pid, | |||
{:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()} | |||
) | |||
end) | |||
end | |||
def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop | |||
def push_to_socket(topics, topic, item) do | |||
Enum.each(topics[topic] || [], fn %StreamerSocket{ | |||
transport_pid: transport_pid, | |||
user: socket_user | |||
} -> | |||
# Get the current user so we have up-to-date blocks etc. | |||
if socket_user do | |||
user = User.get_cached_by_ap_id(socket_user.ap_id) | |||
blocks = user.info.blocks || [] | |||
mutes = user.info.mutes || [] | |||
with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)), | |||
true <- thread_containment(item, user) do | |||
send(transport_pid, {:text, StreamerView.render("update.json", item, user)}) | |||
end | |||
else | |||
send(transport_pid, {:text, StreamerView.render("update.json", item)}) | |||
end | |||
end) | |||
end | |||
@spec thread_containment(Activity.t(), User.t()) :: boolean() | |||
defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true | |||
defp thread_containment(activity, user) do | |||
if Config.get([:instance, :skip_thread_containment]) do | |||
true | |||
else | |||
ActivityPub.contain_activity(activity, user) | |||
end | |||
end | |||
end |
@@ -0,0 +1,66 @@ | |||
# Pleroma: A lightweight social networking server | |||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> | |||
# SPDX-License-Identifier: AGPL-3.0-only | |||
defmodule Pleroma.Web.StreamerView do | |||
use Pleroma.Web, :view | |||
alias Pleroma.Activity | |||
alias Pleroma.Conversation.Participation | |||
alias Pleroma.Notification | |||
alias Pleroma.User | |||
alias Pleroma.Web.MastodonAPI.NotificationView | |||
def render("update.json", %Activity{} = activity, %User{} = user) do | |||
%{ | |||
event: "update", | |||
payload: | |||
Pleroma.Web.MastodonAPI.StatusView.render( | |||
"status.json", | |||
activity: activity, | |||
for: user | |||
) | |||
|> Jason.encode!() | |||
} | |||
|> Jason.encode!() | |||
end | |||
def render("notification.json", %User{} = user, %Notification{} = notify) do | |||
%{ | |||
event: "notification", | |||
payload: | |||
NotificationView.render( | |||
"show.json", | |||
%{notification: notify, for: user} | |||
) | |||
|> Jason.encode!() | |||
} | |||
|> Jason.encode!() | |||
end | |||
def render("update.json", %Activity{} = activity) do | |||
%{ | |||
event: "update", | |||
payload: | |||
Pleroma.Web.MastodonAPI.StatusView.render( | |||
"status.json", | |||
activity: activity | |||
) | |||
|> Jason.encode!() | |||
} | |||
|> Jason.encode!() | |||
end | |||
def render("conversation.json", %Participation{} = participation) do | |||
%{ | |||
event: "conversation", | |||
payload: | |||
Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{ | |||
participation: participation, | |||
for: participation.user | |||
}) | |||
|> Jason.encode!() | |||
} | |||
|> Jason.encode!() | |||
end | |||
end |
@@ -144,6 +144,7 @@ defmodule Pleroma.Mixfile do | |||
git: "https://git.pleroma.social/pleroma/http_signatures.git", | |||
ref: "293d77bb6f4a67ac8bde1428735c3b42f22cbb30"}, | |||
{:telemetry, "~> 0.3"}, | |||
{:poolboy, "~> 1.5"}, | |||
{:prometheus_ex, "~> 3.0"}, | |||
{:prometheus_plugs, "~> 1.1"}, | |||
{:prometheus_phoenix, "~> 1.3"}, | |||
@@ -73,6 +73,7 @@ | |||
"plug_crypto": {:hex, :plug_crypto, "1.0.0", "18e49317d3fa343f24620ed22795ec29d4a5e602d52d1513ccea0b07d8ea7d4d", [:mix], [], "hexpm"}, | |||
"plug_static_index_html": {:hex, :plug_static_index_html, "1.0.0", "840123d4d3975585133485ea86af73cb2600afd7f2a976f9f5fd8b3808e636a0", [:mix], [{:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"}, | |||
"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"}, | |||
"poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm"}, | |||
"postgrex": {:hex, :postgrex, "0.14.3", "5754dee2fdf6e9e508cbf49ab138df964278700b764177e8f3871e658b345a1e", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm"}, | |||
"prometheus": {:hex, :prometheus, "4.4.1", "1e96073b3ed7788053768fea779cbc896ddc3bdd9ba60687f2ad50b252ac87d6", [:mix, :rebar3], [], "hexpm"}, | |||
"prometheus_ecto": {:hex, :prometheus_ecto, "1.4.1", "6c768ea9654de871e5b32fab2eac348467b3021604ebebbcbd8bcbe806a65ed5", [:mix], [{:ecto, "~> 2.0 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:prometheus_ex, "~> 1.1 or ~> 2.0 or ~> 3.0", [hex: :prometheus_ex, repo: "hexpm", optional: false]}], "hexpm"}, | |||
@@ -0,0 +1,141 @@ | |||
defmodule Pleroma.Activity.Ir.TopicsTest do | |||
use Pleroma.DataCase | |||
alias Pleroma.Activity | |||
alias Pleroma.Activity.Ir.Topics | |||
alias Pleroma.Object | |||
require Pleroma.Constants | |||
describe "poll answer" do | |||
test "produce no topics" do | |||
activity = %Activity{object: %Object{data: %{"type" => "Answer"}}} | |||
assert [] == Topics.get_activity_topics(activity) | |||
end | |||
end | |||
describe "non poll answer" do | |||
test "always add user and list topics" do | |||
activity = %Activity{object: %Object{data: %{"type" => "FooBar"}}} | |||
topics = Topics.get_activity_topics(activity) | |||
assert Enum.member?(topics, "user") | |||
assert Enum.member?(topics, "list") | |||
end | |||
end | |||
describe "public visibility" do | |||
setup do | |||
activity = %Activity{ | |||
object: %Object{data: %{"type" => "Note"}}, | |||
data: %{"to" => [Pleroma.Constants.as_public()]} | |||
} | |||
{:ok, activity: activity} | |||
end | |||
test "produces public topic", %{activity: activity} do | |||
topics = Topics.get_activity_topics(activity) | |||
assert Enum.member?(topics, "public") | |||
end | |||
test "local action produces public:local topic", %{activity: activity} do | |||
activity = %{activity | local: true} | |||
topics = Topics.get_activity_topics(activity) | |||
assert Enum.member?(topics, "public:local") | |||
end | |||
test "non-local action does not produce public:local topic", %{activity: activity} do | |||
activity = %{activity | local: false} | |||
topics = Topics.get_activity_topics(activity) | |||
refute Enum.member?(topics, "public:local") | |||
end | |||
end | |||
describe "public visibility create events" do | |||
setup do | |||
activity = %Activity{ | |||
object: %Object{data: %{"type" => "Create", "attachment" => []}}, | |||
data: %{"to" => [Pleroma.Constants.as_public()]} | |||
} | |||
{:ok, activity: activity} | |||
end | |||
test "with no attachments doesn't produce public:media topics", %{activity: activity} do | |||
topics = Topics.get_activity_topics(activity) | |||
refute Enum.member?(topics, "public:media") | |||
refute Enum.member?(topics, "public:local:media") | |||
end | |||
test "converts tags to hash tags", %{activity: %{object: %{data: data} = object} = activity} do | |||
tagged_data = Map.put(data, "tag", ["foo", "bar"]) | |||
activity = %{activity | object: %{object | data: tagged_data}} | |||
topics = Topics.get_activity_topics(activity) | |||
assert Enum.member?(topics, "hashtag:foo") | |||
assert Enum.member?(topics, "hashtag:bar") | |||
end | |||
test "only converts strinngs to hash tags", %{ | |||
activity: %{object: %{data: data} = object} = activity | |||
} do | |||
tagged_data = Map.put(data, "tag", [2]) | |||
activity = %{activity | object: %{object | data: tagged_data}} | |||
topics = Topics.get_activity_topics(activity) | |||
refute Enum.member?(topics, "hashtag:2") | |||
end | |||
end | |||
describe "public visibility create events with attachments" do | |||
setup do | |||
activity = %Activity{ | |||
object: %Object{data: %{"type" => "Create", "attachment" => ["foo"]}}, | |||
data: %{"to" => [Pleroma.Constants.as_public()]} | |||
} | |||
{:ok, activity: activity} | |||
end | |||
test "produce public:media topics", %{activity: activity} do | |||
topics = Topics.get_activity_topics(activity) | |||
assert Enum.member?(topics, "public:media") | |||
end | |||
test "local produces public:local:media topics", %{activity: activity} do | |||
topics = Topics.get_activity_topics(activity) | |||
assert Enum.member?(topics, "public:local:media") | |||
end | |||
test "non-local doesn't produce public:local:media topics", %{activity: activity} do | |||
activity = %{activity | local: false} | |||
topics = Topics.get_activity_topics(activity) | |||
refute Enum.member?(topics, "public:local:media") | |||
end | |||
end | |||
describe "non-public visibility" do | |||
test "produces direct topic" do | |||
activity = %Activity{object: %Object{data: %{"type" => "Note"}}, data: %{"to" => []}} | |||
topics = Topics.get_activity_topics(activity) | |||
assert Enum.member?(topics, "direct") | |||
refute Enum.member?(topics, "public") | |||
refute Enum.member?(topics, "public:local") | |||
refute Enum.member?(topics, "public:media") | |||
refute Enum.member?(topics, "public:local:media") | |||
end | |||
end | |||
end |
@@ -11,7 +11,6 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do | |||
alias Pleroma.Integration.WebsocketClient | |||
alias Pleroma.Web.CommonAPI | |||
alias Pleroma.Web.OAuth | |||
alias Pleroma.Web.Streamer | |||
@path Pleroma.Web.Endpoint.url() | |||
|> URI.parse() | |||
@@ -19,16 +18,6 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do | |||
|> Map.put(:path, "/api/v1/streaming") | |||
|> URI.to_string() | |||
setup do | |||
GenServer.start(Streamer, %{}, name: Streamer) | |||
on_exit(fn -> | |||
if pid = Process.whereis(Streamer) do | |||
Process.exit(pid, :kill) | |||
end | |||
end) | |||
end | |||
def start_socket(qs \\ nil, headers \\ []) do | |||
path = | |||
case qs do | |||
@@ -53,12 +42,14 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do | |||
end) | |||
end | |||
@tag needs_streamer: true | |||
test "allows public streams without authentication" do | |||
assert {:ok, _} = start_socket("?stream=public") | |||
assert {:ok, _} = start_socket("?stream=public:local") | |||
assert {:ok, _} = start_socket("?stream=hashtag&tag=lain") | |||
end | |||
@tag needs_streamer: true | |||
test "receives well formatted events" do | |||
user = insert(:user) | |||
{:ok, _} = start_socket("?stream=public") | |||
@@ -103,6 +94,7 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do | |||
assert {:ok, _} = start_socket("?stream=user&access_token=#{state.token.token}") | |||
end | |||
@tag needs_streamer: true | |||
test "accepts the 'user' stream", %{token: token} = _state do | |||
assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}") | |||
@@ -111,6 +103,7 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do | |||
end) =~ ":badarg" | |||
end | |||
@tag needs_streamer: true | |||
test "accepts the 'user:notification' stream", %{token: token} = _state do | |||
assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}") | |||
@@ -119,6 +112,7 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do | |||
end) =~ ":badarg" | |||
end | |||
@tag needs_streamer: true | |||
test "accepts valid token on Sec-WebSocket-Protocol header", %{token: token} do | |||
assert {:ok, _} = start_socket("?stream=user", [{"Sec-WebSocket-Protocol", token.token}]) | |||
@@ -69,16 +69,7 @@ defmodule Pleroma.NotificationTest do | |||
end | |||
describe "create_notification" do | |||
setup do | |||
GenServer.start(Streamer, %{}, name: Streamer) | |||
on_exit(fn -> | |||
if pid = Process.whereis(Streamer) do | |||
Process.exit(pid, :kill) | |||
end | |||
end) | |||
end | |||
@tag needs_streamer: true | |||
test "it creates a notification for user and send to the 'user' and the 'user:notification' stream" do | |||
user = insert(:user) | |||
task = Task.async(fn -> assert_receive {:text, _}, 4_000 end) | |||
@@ -40,6 +40,10 @@ defmodule Pleroma.Web.ConnCase do | |||
Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, {:shared, self()}) | |||
end | |||
if tags[:needs_streamer] do | |||
start_supervised(Pleroma.Web.Streamer.supervisor()) | |||
end | |||
{:ok, conn: Phoenix.ConnTest.build_conn()} | |||
end | |||
end |
@@ -39,6 +39,10 @@ defmodule Pleroma.DataCase do | |||
Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, {:shared, self()}) | |||
end | |||
if tags[:needs_streamer] do | |||
start_supervised(Pleroma.Web.Streamer.supervisor()) | |||
end | |||
:ok | |||
end | |||
@@ -38,9 +38,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubTest do | |||
stream: fn _, _ -> nil end do | |||
ActivityPub.stream_out_participations(conversation.participations) | |||
Enum.each(participations, fn participation -> | |||
assert called(Pleroma.Web.Streamer.stream("participation", participation)) | |||
end) | |||
assert called(Pleroma.Web.Streamer.stream("participation", participations)) | |||
end | |||
end | |||
end | |||
@@ -0,0 +1,36 @@ | |||
# Pleroma: A lightweight social networking server | |||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> | |||
# SPDX-License-Identifier: AGPL-3.0-only | |||
defmodule Pleroma.Web.PingTest do | |||
use Pleroma.DataCase | |||
import Pleroma.Factory | |||
alias Pleroma.Web.Streamer | |||
setup do | |||
start_supervised({Streamer.supervisor(), [ping_interval: 30]}) | |||
:ok | |||
end | |||
describe "sockets" do | |||
setup do | |||
user = insert(:user) | |||
{:ok, %{user: user}} | |||
end | |||
test "it sends pings", %{user: user} do | |||
task = | |||
Task.async(fn -> | |||
assert_receive {:text, received_event}, 40 | |||
assert_receive {:text, received_event}, 40 | |||
assert_receive {:text, received_event}, 40 | |||
end) | |||
Streamer.add_socket("public", %{transport_pid: task.pid, assigns: %{user: user}}) | |||
Task.await(task) | |||
end | |||
end | |||
end |
@@ -0,0 +1,54 @@ | |||
# Pleroma: A lightweight social networking server | |||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> | |||
# SPDX-License-Identifier: AGPL-3.0-only | |||
defmodule Pleroma.Web.StateTest do | |||
use Pleroma.DataCase | |||
import Pleroma.Factory | |||
alias Pleroma.Web.Streamer | |||
alias Pleroma.Web.Streamer.StreamerSocket | |||
@moduletag needs_streamer: true | |||
describe "sockets" do | |||
setup do | |||
user = insert(:user) | |||
user2 = insert(:user) | |||
{:ok, %{user: user, user2: user2}} | |||
end | |||
test "it can add a socket", %{user: user} do | |||
Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}}) | |||
assert(%{"public" => [%StreamerSocket{transport_pid: 1}]} = Streamer.get_sockets()) | |||
end | |||
test "it can add multiple sockets per user", %{user: user} do | |||
Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}}) | |||
Streamer.add_socket("public", %{transport_pid: 2, assigns: %{user: user}}) | |||
assert( | |||
%{ | |||
"public" => [ | |||
%StreamerSocket{transport_pid: 2}, | |||
%StreamerSocket{transport_pid: 1} | |||
] | |||
} = Streamer.get_sockets() | |||
) | |||
end | |||
test "it will not add a duplicate socket", %{user: user} do | |||
Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}}) | |||
Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}}) | |||
assert( | |||
%{ | |||
"activity" => [ | |||
%StreamerSocket{transport_pid: 1} | |||
] | |||
} = Streamer.get_sockets() | |||
) | |||
end | |||
end | |||
end |
@@ -5,24 +5,20 @@ | |||
defmodule Pleroma.Web.StreamerTest do | |||
use Pleroma.DataCase | |||
import Pleroma.Factory | |||
alias Pleroma.List | |||
alias Pleroma.User | |||
alias Pleroma.Web.CommonAPI | |||
alias Pleroma.Web.Streamer | |||
import Pleroma.Factory | |||
alias Pleroma.Web.Streamer.StreamerSocket | |||
alias Pleroma.Web.Streamer.Worker | |||
@moduletag needs_streamer: true | |||
clear_config_all([:instance, :skip_thread_containment]) | |||
describe "user streams" do | |||
setup do | |||
GenServer.start(Streamer, %{}, name: Streamer) | |||
on_exit(fn -> | |||
if pid = Process.whereis(Streamer) do | |||
Process.exit(pid, :kill) | |||
end | |||
end) | |||
user = insert(:user) | |||
notify = insert(:notification, user: user, activity: build(:note_activity)) | |||
{:ok, %{user: user, notify: notify}} | |||
@@ -125,11 +121,9 @@ defmodule Pleroma.Web.StreamerTest do | |||
assert_receive {:text, _}, 4_000 | |||
end) | |||
fake_socket = %{ | |||
fake_socket = %StreamerSocket{ | |||
transport_pid: task.pid, | |||
assigns: %{ | |||
user: user | |||
} | |||
user: user | |||
} | |||
{:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"}) | |||
@@ -138,7 +132,7 @@ defmodule Pleroma.Web.StreamerTest do | |||
"public" => [fake_socket] | |||
} | |||
Streamer.push_to_socket(topics, "public", activity) | |||
Worker.push_to_socket(topics, "public", activity) | |||
Task.await(task) | |||
@@ -155,11 +149,9 @@ defmodule Pleroma.Web.StreamerTest do | |||
assert received_event == expected_event | |||
end) | |||
fake_socket = %{ | |||
fake_socket = %StreamerSocket{ | |||
transport_pid: task.pid, | |||
assigns: %{ | |||
user: user | |||
} | |||
user: user | |||
} | |||
{:ok, activity} = CommonAPI.delete(activity.id, other_user) | |||
@@ -168,7 +160,7 @@ defmodule Pleroma.Web.StreamerTest do | |||
"public" => [fake_socket] | |||
} | |||
Streamer.push_to_socket(topics, "public", activity) | |||
Worker.push_to_socket(topics, "public", activity) | |||
Task.await(task) | |||
end | |||
@@ -189,9 +181,9 @@ defmodule Pleroma.Web.StreamerTest do | |||
) | |||
task = Task.async(fn -> refute_receive {:text, _}, 1_000 end) | |||
fake_socket = %{transport_pid: task.pid, assigns: %{user: user}} | |||
fake_socket = %StreamerSocket{transport_pid: task.pid, user: user} | |||
topics = %{"public" => [fake_socket]} | |||
Streamer.push_to_socket(topics, "public", activity) | |||
Worker.push_to_socket(topics, "public", activity) | |||
Task.await(task) | |||
end | |||
@@ -211,9 +203,9 @@ defmodule Pleroma.Web.StreamerTest do | |||
) | |||
task = Task.async(fn -> assert_receive {:text, _}, 1_000 end) | |||
fake_socket = %{transport_pid: task.pid, assigns: %{user: user}} | |||
fake_socket = %StreamerSocket{transport_pid: task.pid, user: user} | |||
topics = %{"public" => [fake_socket]} | |||
Streamer.push_to_socket(topics, "public", activity) | |||
Worker.push_to_socket(topics, "public", activity) | |||
Task.await(task) | |||
end | |||
@@ -233,9 +225,9 @@ defmodule Pleroma.Web.StreamerTest do | |||
) | |||
task = Task.async(fn -> assert_receive {:text, _}, 1_000 end) | |||
fake_socket = %{transport_pid: task.pid, assigns: %{user: user}} | |||
fake_socket = %StreamerSocket{transport_pid: task.pid, user: user} | |||
topics = %{"public" => [fake_socket]} | |||
Streamer.push_to_socket(topics, "public", activity) | |||
Worker.push_to_socket(topics, "public", activity) | |||
Task.await(task) | |||
end | |||
@@ -251,11 +243,9 @@ defmodule Pleroma.Web.StreamerTest do | |||
refute_receive {:text, _}, 1_000 | |||
end) | |||
fake_socket = %{ | |||
fake_socket = %StreamerSocket{ | |||
transport_pid: task.pid, | |||
assigns: %{ | |||
user: user | |||
} | |||
user: user | |||
} | |||
{:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"}) | |||
@@ -264,7 +254,7 @@ defmodule Pleroma.Web.StreamerTest do | |||
"public" => [fake_socket] | |||
} | |||
Streamer.push_to_socket(topics, "public", activity) | |||
Worker.push_to_socket(topics, "public", activity) | |||
Task.await(task) | |||
end | |||
@@ -284,11 +274,9 @@ defmodule Pleroma.Web.StreamerTest do | |||
refute_receive {:text, _}, 1_000 | |||
end) | |||
fake_socket = %{ | |||
fake_socket = %StreamerSocket{ | |||
transport_pid: task.pid, | |||
assigns: %{ | |||
user: user_a | |||
} | |||
user: user_a | |||
} | |||
{:ok, activity} = | |||
@@ -301,7 +289,7 @@ defmodule Pleroma.Web.StreamerTest do | |||
"list:#{list.id}" => [fake_socket] | |||
} | |||
Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics) | |||
Worker.handle_call({:stream, "list", activity}, self(), topics) | |||
Task.await(task) | |||
end | |||
@@ -318,11 +306,9 @@ defmodule Pleroma.Web.StreamerTest do | |||
refute_receive {:text, _}, 1_000 | |||
end) | |||
fake_socket = %{ | |||
fake_socket = %StreamerSocket{ | |||
transport_pid: task.pid, | |||
assigns: %{ | |||
user: user_a | |||
} | |||
user: user_a | |||
} | |||
{:ok, activity} = | |||
@@ -335,12 +321,12 @@ defmodule Pleroma.Web.StreamerTest do | |||
"list:#{list.id}" => [fake_socket] | |||
} | |||
Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics) | |||
Worker.handle_call({:stream, "list", activity}, self(), topics) | |||
Task.await(task) | |||
end | |||
test "it send wanted private posts to list" do | |||
test "it sends wanted private posts to list" do | |||
user_a = insert(:user) | |||
user_b = insert(:user) | |||
@@ -354,11 +340,9 @@ defmodule Pleroma.Web.StreamerTest do | |||
assert_receive {:text, _}, 1_000 | |||
end) | |||
fake_socket = %{ | |||
fake_socket = %StreamerSocket{ | |||
transport_pid: task.pid, | |||
assigns: %{ | |||
user: user_a | |||
} | |||
user: user_a | |||
} | |||
{:ok, activity} = | |||
@@ -367,11 +351,12 @@ defmodule Pleroma.Web.StreamerTest do | |||
"visibility" => "private" | |||
}) | |||
topics = %{ | |||
"list:#{list.id}" => [fake_socket] | |||
} | |||
Streamer.add_socket( | |||
"list:#{list.id}", | |||
fake_socket | |||
) | |||
Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics) | |||
Worker.handle_call({:stream, "list", activity}, self(), %{}) | |||
Task.await(task) | |||
end | |||
@@ -387,11 +372,9 @@ defmodule Pleroma.Web.StreamerTest do | |||
refute_receive {:text, _}, 1_000 | |||
end) | |||
fake_socket = %{ | |||
fake_socket = %StreamerSocket{ | |||
transport_pid: task.pid, | |||
assigns: %{ | |||
user: user1 | |||
} | |||
user: user1 | |||
} | |||
{:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"}) | |||
@@ -401,7 +384,7 @@ defmodule Pleroma.Web.StreamerTest do | |||
"public" => [fake_socket] | |||
} | |||
Streamer.push_to_socket(topics, "public", announce_activity) | |||
Worker.push_to_socket(topics, "public", announce_activity) | |||
Task.await(task) | |||
end | |||
@@ -417,6 +400,8 @@ defmodule Pleroma.Web.StreamerTest do | |||
task = Task.async(fn -> refute_receive {:text, _}, 4_000 end) | |||
Process.sleep(4000) | |||
Streamer.add_socket( | |||
"user", | |||
%{transport_pid: task.pid, assigns: %{user: user2}} | |||
@@ -428,14 +413,6 @@ defmodule Pleroma.Web.StreamerTest do | |||
describe "direct streams" do | |||
setup do | |||
GenServer.start(Streamer, %{}, name: Streamer) | |||
on_exit(fn -> | |||
if pid = Process.whereis(Streamer) do | |||
Process.exit(pid, :kill) | |||
end | |||
end) | |||
:ok | |||
end | |||
@@ -480,6 +457,8 @@ defmodule Pleroma.Web.StreamerTest do | |||
refute_receive {:text, _}, 4_000 | |||
end) | |||
Process.sleep(1000) | |||
Streamer.add_socket( | |||
"direct", | |||
%{transport_pid: task.pid, assigns: %{user: user}} | |||
@@ -521,6 +500,8 @@ defmodule Pleroma.Web.StreamerTest do | |||
assert last_status["id"] == to_string(create_activity.id) | |||
end) | |||
Process.sleep(1000) | |||
Streamer.add_socket( | |||
"direct", | |||
%{transport_pid: task.pid, assigns: %{user: user}} |