Browse Source

Streamer refactoring

object-id-column
Steven Fuchs kaniini 4 years ago
parent
commit
aab264db82
26 changed files with 889 additions and 448 deletions
  1. +4
    -0
      .gitignore
  2. +4
    -0
      config/config.exs
  3. +63
    -0
      lib/pleroma/activity/ir/topics.ex
  4. +1
    -1
      lib/pleroma/application.ex
  5. +4
    -2
      lib/pleroma/notification.ex
  6. +12
    -38
      lib/pleroma/web/activity_pub/activity_pub.ex
  7. +4
    -3
      lib/pleroma/web/mastodon_api/websocket_handler.ex
  8. +0
    -318
      lib/pleroma/web/streamer.ex
  9. +33
    -0
      lib/pleroma/web/streamer/ping.ex
  10. +68
    -0
      lib/pleroma/web/streamer/state.ex
  11. +55
    -0
      lib/pleroma/web/streamer/streamer.ex
  12. +31
    -0
      lib/pleroma/web/streamer/streamer_socket.ex
  13. +33
    -0
      lib/pleroma/web/streamer/supervisor.ex
  14. +220
    -0
      lib/pleroma/web/streamer/worker.ex
  15. +66
    -0
      lib/pleroma/web/views/streamer_view.ex
  16. +1
    -0
      mix.exs
  17. +1
    -0
      mix.lock
  18. +141
    -0
      test/activity/ir/topics_test.exs
  19. +5
    -11
      test/integration/mastodon_websocket_test.exs
  20. +1
    -10
      test/notification_test.exs
  21. +4
    -0
      test/support/conn_case.ex
  22. +4
    -0
      test/support/data_case.ex
  23. +1
    -3
      test/web/activity_pub/activity_pub_test.exs
  24. +36
    -0
      test/web/streamer/ping_test.exs
  25. +54
    -0
      test/web/streamer/state_test.exs
  26. +43
    -62
      test/web/streamer/streamer_test.exs

+ 4
- 0
.gitignore View File

@@ -43,3 +43,7 @@ docs/generated_config.md
# Code test coverage
/cover
/Elixir.*.coverdata

.idea
pleroma.iml


+ 4
- 0
config/config.exs View File

@@ -331,6 +331,10 @@ config :pleroma, :activitypub,
follow_handshake_timeout: 500,
sign_object_fetches: true

config :pleroma, :streamer,
workers: 3,
overflow_workers: 2

config :pleroma, :user, deny_follow_blocked: true

config :pleroma, :mrf_normalize_markup, scrub_policy: Pleroma.HTML.Scrubber.Default


+ 63
- 0
lib/pleroma/activity/ir/topics.ex View File

@@ -0,0 +1,63 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Activity.Ir.Topics do
alias Pleroma.Object
alias Pleroma.Web.ActivityPub.Visibility

def get_activity_topics(activity) do
activity
|> Object.normalize()
|> generate_topics(activity)
|> List.flatten()
end

defp generate_topics(%{data: %{"type" => "Answer"}}, _) do
[]
end

defp generate_topics(object, activity) do
["user", "list"] ++ visibility_tags(object, activity)
end

defp visibility_tags(object, activity) do
case Visibility.get_visibility(activity) do
"public" ->
if activity.local do
["public", "public:local"]
else
["public"]
end
|> item_creation_tags(object, activity)

"direct" ->
["direct"]

_ ->
[]
end
end

defp item_creation_tags(tags, %{data: %{"type" => "Create"}} = object, activity) do
tags ++ hashtags_to_topics(object) ++ attachment_topics(object, activity)
end

defp item_creation_tags(tags, _, _) do
tags
end

defp hashtags_to_topics(%{data: %{"tag" => tags}}) do
tags
|> Enum.filter(&is_bitstring(&1))
|> Enum.map(fn tag -> "hashtag:" <> tag end)
end

defp hashtags_to_topics(_), do: []

defp attachment_topics(%{data: %{"attachment" => []}}, _act), do: []

defp attachment_topics(_object, %{local: true}), do: ["public:media", "public:local:media"]

defp attachment_topics(_object, _act), do: ["public:media"]
end

+ 1
- 1
lib/pleroma/application.ex View File

@@ -141,7 +141,7 @@ defmodule Pleroma.Application do
defp streamer_child(:test), do: []

defp streamer_child(_) do
[Pleroma.Web.Streamer]
[Pleroma.Web.Streamer.supervisor()]
end

defp oauth_cleanup_child(true),


+ 4
- 2
lib/pleroma/notification.ex View File

@@ -210,8 +210,10 @@ defmodule Pleroma.Notification do
unless skip?(activity, user) do
notification = %Notification{user_id: user.id, activity: activity}
{:ok, notification} = Repo.insert(notification)
Streamer.stream("user", notification)
Streamer.stream("user:notification", notification)

["user", "user:notification"]
|> Streamer.stream(notification)

Push.send(notification)
notification
end


+ 12
- 38
lib/pleroma/web/activity_pub/activity_pub.ex View File

@@ -4,6 +4,7 @@

defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.Activity
alias Pleroma.Activity.Ir.Topics
alias Pleroma.Config
alias Pleroma.Conversation
alias Pleroma.Notification
@@ -16,6 +17,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.User
alias Pleroma.Web.ActivityPub.MRF
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.Streamer
alias Pleroma.Web.WebFinger
alias Pleroma.Workers.BackgroundWorker

@@ -187,9 +189,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
participations
|> Repo.preload(:user)

Enum.each(participations, fn participation ->
Pleroma.Web.Streamer.stream("participation", participation)
end)
Streamer.stream("participation", participations)
end

def stream_out_participations(%Object{data: %{"context" => context}}, user) do
@@ -208,41 +208,15 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do

def stream_out_participations(_, _), do: :noop

def stream_out(activity) do
if activity.data["type"] in ["Create", "Announce", "Delete"] do
object = Object.normalize(activity)
# Do not stream out poll replies
unless object.data["type"] == "Answer" do
Pleroma.Web.Streamer.stream("user", activity)
Pleroma.Web.Streamer.stream("list", activity)

if get_visibility(activity) == "public" do
Pleroma.Web.Streamer.stream("public", activity)

if activity.local do
Pleroma.Web.Streamer.stream("public:local", activity)
end

if activity.data["type"] in ["Create"] do
object.data
|> Map.get("tag", [])
|> Enum.filter(fn tag -> is_bitstring(tag) end)
|> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)

if object.data["attachment"] != [] do
Pleroma.Web.Streamer.stream("public:media", activity)

if activity.local do
Pleroma.Web.Streamer.stream("public:local:media", activity)
end
end
end
else
if get_visibility(activity) == "direct",
do: Pleroma.Web.Streamer.stream("direct", activity)
end
end
end
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
when data_type in ["Create", "Announce", "Delete"] do
activity
|> Topics.get_activity_topics()
|> Streamer.stream(activity)
end

def stream_out(_activity) do
:noop
end

def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do


+ 4
- 3
lib/pleroma/web/mastodon_api/websocket_handler.ex View File

@@ -8,6 +8,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Web.OAuth.Token
alias Pleroma.Web.Streamer

@behaviour :cowboy_websocket

@@ -24,7 +25,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
]
@anonymous_streams ["public", "public:local", "hashtag"]

# Handled by periodic keepalive in Pleroma.Web.Streamer.
# Handled by periodic keepalive in Pleroma.Web.Streamer.Ping.
@timeout :infinity

def init(%{qs: qs} = req, state) do
@@ -65,7 +66,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
}, topic #{state.topic}"
)

Pleroma.Web.Streamer.add_socket(state.topic, streamer_socket(state))
Streamer.add_socket(state.topic, streamer_socket(state))
{:ok, state}
end

@@ -80,7 +81,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
}, topic #{state.topic || "?"}: #{inspect(reason)}"
)

Pleroma.Web.Streamer.remove_socket(state.topic, streamer_socket(state))
Streamer.remove_socket(state.topic, streamer_socket(state))
:ok
end



+ 0
- 318
lib/pleroma/web/streamer.ex View File

@@ -1,318 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Web.Streamer do
use GenServer
require Logger
alias Pleroma.Activity
alias Pleroma.Config
alias Pleroma.Conversation.Participation
alias Pleroma.Notification
alias Pleroma.Object
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.MastodonAPI.NotificationView

@keepalive_interval :timer.seconds(30)

def start_link(_) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end

def add_socket(topic, socket) do
GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
end

def remove_socket(topic, socket) do
GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
end

def stream(topic, item) do
GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
end

def init(args) do
Process.send_after(self(), %{action: :ping}, @keepalive_interval)

{:ok, args}
end

def handle_info(%{action: :ping}, topics) do
topics
|> Map.values()
|> List.flatten()
|> Enum.each(fn socket ->
Logger.debug("Sending keepalive ping")
send(socket.transport_pid, {:text, ""})
end)

Process.send_after(self(), %{action: :ping}, @keepalive_interval)

{:noreply, topics}
end

def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do
recipient_topics =
User.get_recipients_from_activity(item)
|> Enum.map(fn %{id: id} -> "direct:#{id}" end)

Enum.each(recipient_topics || [], fn user_topic ->
Logger.debug("Trying to push direct message to #{user_topic}\n\n")
push_to_socket(topics, user_topic, item)
end)

{:noreply, topics}
end

def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do
user_topic = "direct:#{participation.user_id}"
Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")

push_to_socket(topics, user_topic, participation)

{:noreply, topics}
end

def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
# filter the recipient list if the activity is not public, see #270.
recipient_lists =
case Visibility.is_public?(item) do
true ->
Pleroma.List.get_lists_from_activity(item)

_ ->
Pleroma.List.get_lists_from_activity(item)
|> Enum.filter(fn list ->
owner = User.get_cached_by_id(list.user_id)

Visibility.visible_for_user?(item, owner)
end)
end

recipient_topics =
recipient_lists
|> Enum.map(fn %{id: id} -> "list:#{id}" end)

Enum.each(recipient_topics || [], fn list_topic ->
Logger.debug("Trying to push message to #{list_topic}\n\n")
push_to_socket(topics, list_topic, item)
end)

{:noreply, topics}
end

def handle_cast(
%{action: :stream, topic: topic, item: %Notification{} = item},
topics
)
when topic in ["user", "user:notification"] do
topics
|> Map.get("#{topic}:#{item.user_id}", [])
|> Enum.each(fn socket ->
with %User{} = user <- User.get_cached_by_ap_id(socket.assigns[:user].ap_id),
true <- should_send?(user, item) do
send(
socket.transport_pid,
{:text, represent_notification(socket.assigns[:user], item)}
)
end
end)

{:noreply, topics}
end

def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
Logger.debug("Trying to push to users")

recipient_topics =
User.get_recipients_from_activity(item)
|> Enum.map(fn %{id: id} -> "user:#{id}" end)

Enum.each(recipient_topics, fn topic ->
push_to_socket(topics, topic, item)
end)

{:noreply, topics}
end

def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
Logger.debug("Trying to push to #{topic}")
Logger.debug("Pushing item to #{topic}")
push_to_socket(topics, topic, item)
{:noreply, topics}
end

def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
topic = internal_topic(topic, socket)
sockets_for_topic = sockets[topic] || []
sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
sockets = Map.put(sockets, topic, sockets_for_topic)
Logger.debug("Got new conn for #{topic}")
{:noreply, sockets}
end

def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
topic = internal_topic(topic, socket)
sockets_for_topic = sockets[topic] || []
sockets_for_topic = List.delete(sockets_for_topic, socket)
sockets = Map.put(sockets, topic, sockets_for_topic)
Logger.debug("Removed conn for #{topic}")
{:noreply, sockets}
end

def handle_cast(m, state) do
Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}")
{:noreply, state}
end

defp represent_update(%Activity{} = activity, %User{} = user) do
%{
event: "update",
payload:
Pleroma.Web.MastodonAPI.StatusView.render(
"status.json",
activity: activity,
for: user
)
|> Jason.encode!()
}
|> Jason.encode!()
end

defp represent_update(%Activity{} = activity) do
%{
event: "update",
payload:
Pleroma.Web.MastodonAPI.StatusView.render(
"status.json",
activity: activity
)
|> Jason.encode!()
}
|> Jason.encode!()
end

def represent_conversation(%Participation{} = participation) do
%{
event: "conversation",
payload:
Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
participation: participation,
for: participation.user
})
|> Jason.encode!()
}
|> Jason.encode!()
end

@spec represent_notification(User.t(), Notification.t()) :: binary()
defp represent_notification(%User{} = user, %Notification{} = notify) do
%{
event: "notification",
payload:
NotificationView.render(
"show.json",
%{notification: notify, for: user}
)
|> Jason.encode!()
}
|> Jason.encode!()
end

defp should_send?(%User{} = user, %Activity{} = item) do
blocks = user.info.blocks || []
mutes = user.info.mutes || []
reblog_mutes = user.info.muted_reblogs || []
domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)

with parent when not is_nil(parent) <- Object.normalize(item),
true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
%{host: item_host} <- URI.parse(item.actor),
%{host: parent_host} <- URI.parse(parent.data["actor"]),
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
true <- thread_containment(item, user),
false <- CommonAPI.thread_muted?(user, item) do
true
else
_ -> false
end
end

defp should_send?(%User{} = user, %Notification{activity: activity}) do
should_send?(user, activity)
end

def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
Enum.each(topics[topic] || [], fn socket ->
# Get the current user so we have up-to-date blocks etc.
if socket.assigns[:user] do
user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)

if should_send?(user, item) do
send(socket.transport_pid, {:text, represent_update(item, user)})
end
else
send(socket.transport_pid, {:text, represent_update(item)})
end
end)
end

def push_to_socket(topics, topic, %Participation{} = participation) do
Enum.each(topics[topic] || [], fn socket ->
send(socket.transport_pid, {:text, represent_conversation(participation)})
end)
end

def push_to_socket(topics, topic, %Activity{
data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
}) do
Enum.each(topics[topic] || [], fn socket ->
send(
socket.transport_pid,
{:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
)
end)
end

def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop

def push_to_socket(topics, topic, item) do
Enum.each(topics[topic] || [], fn socket ->
# Get the current user so we have up-to-date blocks etc.
if socket.assigns[:user] do
user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
blocks = user.info.blocks || []
mutes = user.info.mutes || []

with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
true <- thread_containment(item, user) do
send(socket.transport_pid, {:text, represent_update(item, user)})
end
else
send(socket.transport_pid, {:text, represent_update(item)})
end
end)
end

defp internal_topic(topic, socket) when topic in ~w[user user:notification direct] do
"#{topic}:#{socket.assigns[:user].id}"
end

defp internal_topic(topic, _), do: topic

@spec thread_containment(Activity.t(), User.t()) :: boolean()
defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true

defp thread_containment(activity, user) do
if Config.get([:instance, :skip_thread_containment]) do
true
else
ActivityPub.contain_activity(activity, user)
end
end
end

+ 33
- 0
lib/pleroma/web/streamer/ping.ex View File

@@ -0,0 +1,33 @@
defmodule Pleroma.Web.Streamer.Ping do
use GenServer
require Logger

alias Pleroma.Web.Streamer.State
alias Pleroma.Web.Streamer.StreamerSocket

@keepalive_interval :timer.seconds(30)

def start_link(opts) do
ping_interval = Keyword.get(opts, :ping_interval, @keepalive_interval)
GenServer.start_link(__MODULE__, %{ping_interval: ping_interval}, name: __MODULE__)
end

def init(%{ping_interval: ping_interval} = args) do
Process.send_after(self(), :ping, ping_interval)
{:ok, args}
end

def handle_info(:ping, %{ping_interval: ping_interval} = state) do
State.get_sockets()
|> Map.values()
|> List.flatten()
|> Enum.each(fn %StreamerSocket{transport_pid: transport_pid} ->
Logger.debug("Sending keepalive ping")
send(transport_pid, {:text, ""})
end)

Process.send_after(self(), :ping, ping_interval)

{:noreply, state}
end
end

+ 68
- 0
lib/pleroma/web/streamer/state.ex View File

@@ -0,0 +1,68 @@
defmodule Pleroma.Web.Streamer.State do
use GenServer
require Logger

alias Pleroma.Web.Streamer.StreamerSocket

def start_link(_) do
GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__)
end

def add_socket(topic, socket) do
GenServer.call(__MODULE__, {:add, socket, topic})
end

def remove_socket(topic, socket) do
GenServer.call(__MODULE__, {:remove, socket, topic})
end

def get_sockets do
%{sockets: stream_sockets} = GenServer.call(__MODULE__, :get_state)
stream_sockets
end

def init(init_arg) do
{:ok, init_arg}
end

def handle_call(:get_state, _from, state) do
{:reply, state, state}
end

def handle_call({:add, socket, topic}, _from, %{sockets: sockets} = state) do
internal_topic = internal_topic(topic, socket)
stream_socket = StreamerSocket.from_socket(socket)

sockets_for_topic =
sockets
|> Map.get(internal_topic, [])
|> List.insert_at(0, stream_socket)
|> Enum.uniq()

state = put_in(state, [:sockets, internal_topic], sockets_for_topic)
Logger.debug("Got new conn for #{topic}")
{:reply, state, state}
end

def handle_call({:remove, socket, topic}, _from, %{sockets: sockets} = state) do
internal_topic = internal_topic(topic, socket)
stream_socket = StreamerSocket.from_socket(socket)

sockets_for_topic =
sockets
|> Map.get(internal_topic, [])
|> List.delete(stream_socket)

state = Kernel.put_in(state, [:sockets, internal_topic], sockets_for_topic)
{:reply, state, state}
end

defp internal_topic(topic, socket)
when topic in ~w[user user:notification direct] do
"#{topic}:#{socket.assigns[:user].id}"
end

defp internal_topic(topic, _) do
topic
end
end

+ 55
- 0
lib/pleroma/web/streamer/streamer.ex View File

@@ -0,0 +1,55 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Web.Streamer do
alias Pleroma.Web.Streamer.State
alias Pleroma.Web.Streamer.Worker

@timeout 60_000
@mix_env Mix.env()

def add_socket(topic, socket) do
State.add_socket(topic, socket)
end

def remove_socket(topic, socket) do
State.remove_socket(topic, socket)
end

def get_sockets do
State.get_sockets()
end

def stream(topics, items) do
if should_send?() do
Task.async(fn ->
:poolboy.transaction(
:streamer_worker,
&Worker.stream(&1, topics, items),
@timeout
)
end)
end
end

def supervisor, do: Pleroma.Web.Streamer.Supervisor

defp should_send? do
handle_should_send(@mix_env)
end

defp handle_should_send(:test) do
case Process.whereis(:streamer_worker) do
nil ->
false

pid ->
Process.alive?(pid)
end
end

defp handle_should_send(_) do
true
end
end

+ 31
- 0
lib/pleroma/web/streamer/streamer_socket.ex View File

@@ -0,0 +1,31 @@
defmodule Pleroma.Web.Streamer.StreamerSocket do
defstruct transport_pid: nil, user: nil

alias Pleroma.User
alias Pleroma.Web.Streamer.StreamerSocket

def from_socket(%{
transport_pid: transport_pid,
assigns: %{user: nil}
}) do
%StreamerSocket{
transport_pid: transport_pid
}
end

def from_socket(%{
transport_pid: transport_pid,
assigns: %{user: %User{} = user}
}) do
%StreamerSocket{
transport_pid: transport_pid,
user: user
}
end

def from_socket(%{transport_pid: transport_pid}) do
%StreamerSocket{
transport_pid: transport_pid
}
end
end

+ 33
- 0
lib/pleroma/web/streamer/supervisor.ex View File

@@ -0,0 +1,33 @@
defmodule Pleroma.Web.Streamer.Supervisor do
use Supervisor

def start_link(opts) do
Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
end

def init(args) do
children = [
{Pleroma.Web.Streamer.State, args},
{Pleroma.Web.Streamer.Ping, args},
:poolboy.child_spec(:streamer_worker, poolboy_config())
]

opts = [strategy: :one_for_one, name: Pleroma.Web.Streamer.Supervisor]
Supervisor.init(children, opts)
end

defp poolboy_config do
opts =
Pleroma.Config.get(:streamer,
workers: 3,
overflow_workers: 2
)

[
{:name, {:local, :streamer_worker}},
{:worker_module, Pleroma.Web.Streamer.Worker},
{:size, opts[:workers]},
{:max_overflow, opts[:overflow_workers]}
]
end
end

+ 220
- 0
lib/pleroma/web/streamer/worker.ex View File

@@ -0,0 +1,220 @@
defmodule Pleroma.Web.Streamer.Worker do
use GenServer

require Logger

alias Pleroma.Activity
alias Pleroma.Config
alias Pleroma.Conversation.Participation
alias Pleroma.Notification
alias Pleroma.Object
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.Streamer.State
alias Pleroma.Web.Streamer.StreamerSocket
alias Pleroma.Web.StreamerView

def start_link(_) do
GenServer.start_link(__MODULE__, %{}, [])
end

def init(init_arg) do
{:ok, init_arg}
end

def stream(pid, topics, items) do
GenServer.call(pid, {:stream, topics, items})
end

def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
Enum.each(topics, fn t ->
do_stream(%{topic: t, item: item})
end)

{:reply, state, state}
end

def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
Enum.each(items, fn i ->
do_stream(%{topic: topic, item: i})
end)

{:reply, state, state}
end

def handle_call({:stream, topic, item}, _from, state) do
do_stream(%{topic: topic, item: item})

{:reply, state, state}
end

defp do_stream(%{topic: "direct", item: item}) do
recipient_topics =
User.get_recipients_from_activity(item)
|> Enum.map(fn %{id: id} -> "direct:#{id}" end)

Enum.each(recipient_topics, fn user_topic ->
Logger.debug("Trying to push direct message to #{user_topic}\n\n")
push_to_socket(State.get_sockets(), user_topic, item)
end)
end

defp do_stream(%{topic: "participation", item: participation}) do
user_topic = "direct:#{participation.user_id}"
Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")

push_to_socket(State.get_sockets(), user_topic, participation)
end

defp do_stream(%{topic: "list", item: item}) do
# filter the recipient list if the activity is not public, see #270.
recipient_lists =
case Visibility.is_public?(item) do
true ->
Pleroma.List.get_lists_from_activity(item)

_ ->
Pleroma.List.get_lists_from_activity(item)
|> Enum.filter(fn list ->
owner = User.get_cached_by_id(list.user_id)

Visibility.visible_for_user?(item, owner)
end)
end

recipient_topics =
recipient_lists
|> Enum.map(fn %{id: id} -> "list:#{id}" end)

Enum.each(recipient_topics, fn list_topic ->
Logger.debug("Trying to push message to #{list_topic}\n\n")
push_to_socket(State.get_sockets(), list_topic, item)
end)
end

defp do_stream(%{topic: topic, item: %Notification{} = item})
when topic in ["user", "user:notification"] do
State.get_sockets()
|> Map.get("#{topic}:#{item.user_id}", [])
|> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
true <- should_send?(user, item) do
send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
end
end)
end

defp do_stream(%{topic: "user", item: item}) do
Logger.debug("Trying to push to users")

recipient_topics =
User.get_recipients_from_activity(item)
|> Enum.map(fn %{id: id} -> "user:#{id}" end)

Enum.each(recipient_topics, fn topic ->
push_to_socket(State.get_sockets(), topic, item)
end)
end

defp do_stream(%{topic: topic, item: item}) do
Logger.debug("Trying to push to #{topic}")
Logger.debug("Pushing item to #{topic}")
push_to_socket(State.get_sockets(), topic, item)
end

defp should_send?(%User{} = user, %Activity{} = item) do
blocks = user.info.blocks || []
mutes = user.info.mutes || []
reblog_mutes = user.info.muted_reblogs || []
domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)

with parent when not is_nil(parent) <- Object.normalize(item),
true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
%{host: item_host} <- URI.parse(item.actor),
%{host: parent_host} <- URI.parse(parent.data["actor"]),
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
true <- thread_containment(item, user),
false <- CommonAPI.thread_muted?(user, item) do
true
else
_ -> false
end
end

defp should_send?(%User{} = user, %Notification{activity: activity}) do
should_send?(user, activity)
end

def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
Enum.each(topics[topic] || [], fn %StreamerSocket{
transport_pid: transport_pid,
user: socket_user
} ->
# Get the current user so we have up-to-date blocks etc.
if socket_user do
user = User.get_cached_by_ap_id(socket_user.ap_id)

if should_send?(user, item) do
send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
end
else
send(transport_pid, {:text, StreamerView.render("update.json", item)})
end
end)
end

def push_to_socket(topics, topic, %Participation{} = participation) do
Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
end)
end

def push_to_socket(topics, topic, %Activity{
data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
}) do
Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
send(
transport_pid,
{:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
)
end)
end

def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop

def push_to_socket(topics, topic, item) do
Enum.each(topics[topic] || [], fn %StreamerSocket{
transport_pid: transport_pid,
user: socket_user
} ->
# Get the current user so we have up-to-date blocks etc.
if socket_user do
user = User.get_cached_by_ap_id(socket_user.ap_id)
blocks = user.info.blocks || []
mutes = user.info.mutes || []

with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
true <- thread_containment(item, user) do
send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
end
else
send(transport_pid, {:text, StreamerView.render("update.json", item)})
end
end)
end

@spec thread_containment(Activity.t(), User.t()) :: boolean()
defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true

defp thread_containment(activity, user) do
if Config.get([:instance, :skip_thread_containment]) do
true
else
ActivityPub.contain_activity(activity, user)
end
end
end

+ 66
- 0
lib/pleroma/web/views/streamer_view.ex View File

@@ -0,0 +1,66 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Web.StreamerView do
use Pleroma.Web, :view

alias Pleroma.Activity
alias Pleroma.Conversation.Participation
alias Pleroma.Notification
alias Pleroma.User
alias Pleroma.Web.MastodonAPI.NotificationView

def render("update.json", %Activity{} = activity, %User{} = user) do
%{
event: "update",
payload:
Pleroma.Web.MastodonAPI.StatusView.render(
"status.json",
activity: activity,
for: user
)
|> Jason.encode!()
}
|> Jason.encode!()
end

def render("notification.json", %User{} = user, %Notification{} = notify) do
%{
event: "notification",
payload:
NotificationView.render(
"show.json",
%{notification: notify, for: user}
)
|> Jason.encode!()
}
|> Jason.encode!()
end

def render("update.json", %Activity{} = activity) do
%{
event: "update",
payload:
Pleroma.Web.MastodonAPI.StatusView.render(
"status.json",
activity: activity
)
|> Jason.encode!()
}
|> Jason.encode!()
end

def render("conversation.json", %Participation{} = participation) do
%{
event: "conversation",
payload:
Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
participation: participation,
for: participation.user
})
|> Jason.encode!()
}
|> Jason.encode!()
end
end

+ 1
- 0
mix.exs View File

@@ -144,6 +144,7 @@ defmodule Pleroma.Mixfile do
git: "https://git.pleroma.social/pleroma/http_signatures.git",
ref: "293d77bb6f4a67ac8bde1428735c3b42f22cbb30"},
{:telemetry, "~> 0.3"},
{:poolboy, "~> 1.5"},
{:prometheus_ex, "~> 3.0"},
{:prometheus_plugs, "~> 1.1"},
{:prometheus_phoenix, "~> 1.3"},


+ 1
- 0
mix.lock View File

@@ -73,6 +73,7 @@
"plug_crypto": {:hex, :plug_crypto, "1.0.0", "18e49317d3fa343f24620ed22795ec29d4a5e602d52d1513ccea0b07d8ea7d4d", [:mix], [], "hexpm"},
"plug_static_index_html": {:hex, :plug_static_index_html, "1.0.0", "840123d4d3975585133485ea86af73cb2600afd7f2a976f9f5fd8b3808e636a0", [:mix], [{:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"},
"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"},
"poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm"},
"postgrex": {:hex, :postgrex, "0.14.3", "5754dee2fdf6e9e508cbf49ab138df964278700b764177e8f3871e658b345a1e", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm"},
"prometheus": {:hex, :prometheus, "4.4.1", "1e96073b3ed7788053768fea779cbc896ddc3bdd9ba60687f2ad50b252ac87d6", [:mix, :rebar3], [], "hexpm"},
"prometheus_ecto": {:hex, :prometheus_ecto, "1.4.1", "6c768ea9654de871e5b32fab2eac348467b3021604ebebbcbd8bcbe806a65ed5", [:mix], [{:ecto, "~> 2.0 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:prometheus_ex, "~> 1.1 or ~> 2.0 or ~> 3.0", [hex: :prometheus_ex, repo: "hexpm", optional: false]}], "hexpm"},


+ 141
- 0
test/activity/ir/topics_test.exs View File

@@ -0,0 +1,141 @@
defmodule Pleroma.Activity.Ir.TopicsTest do
use Pleroma.DataCase

alias Pleroma.Activity
alias Pleroma.Activity.Ir.Topics
alias Pleroma.Object

require Pleroma.Constants

describe "poll answer" do
test "produce no topics" do
activity = %Activity{object: %Object{data: %{"type" => "Answer"}}}

assert [] == Topics.get_activity_topics(activity)
end
end

describe "non poll answer" do
test "always add user and list topics" do
activity = %Activity{object: %Object{data: %{"type" => "FooBar"}}}
topics = Topics.get_activity_topics(activity)

assert Enum.member?(topics, "user")
assert Enum.member?(topics, "list")
end
end

describe "public visibility" do
setup do
activity = %Activity{
object: %Object{data: %{"type" => "Note"}},
data: %{"to" => [Pleroma.Constants.as_public()]}
}

{:ok, activity: activity}
end

test "produces public topic", %{activity: activity} do
topics = Topics.get_activity_topics(activity)

assert Enum.member?(topics, "public")
end

test "local action produces public:local topic", %{activity: activity} do
activity = %{activity | local: true}
topics = Topics.get_activity_topics(activity)

assert Enum.member?(topics, "public:local")
end

test "non-local action does not produce public:local topic", %{activity: activity} do
activity = %{activity | local: false}
topics = Topics.get_activity_topics(activity)

refute Enum.member?(topics, "public:local")
end
end

describe "public visibility create events" do
setup do
activity = %Activity{
object: %Object{data: %{"type" => "Create", "attachment" => []}},
data: %{"to" => [Pleroma.Constants.as_public()]}
}

{:ok, activity: activity}
end

test "with no attachments doesn't produce public:media topics", %{activity: activity} do
topics = Topics.get_activity_topics(activity)

refute Enum.member?(topics, "public:media")
refute Enum.member?(topics, "public:local:media")
end

test "converts tags to hash tags", %{activity: %{object: %{data: data} = object} = activity} do
tagged_data = Map.put(data, "tag", ["foo", "bar"])
activity = %{activity | object: %{object | data: tagged_data}}

topics = Topics.get_activity_topics(activity)

assert Enum.member?(topics, "hashtag:foo")
assert Enum.member?(topics, "hashtag:bar")
end

test "only converts strinngs to hash tags", %{
activity: %{object: %{data: data} = object} = activity
} do
tagged_data = Map.put(data, "tag", [2])
activity = %{activity | object: %{object | data: tagged_data}}

topics = Topics.get_activity_topics(activity)

refute Enum.member?(topics, "hashtag:2")
end
end

describe "public visibility create events with attachments" do
setup do
activity = %Activity{
object: %Object{data: %{"type" => "Create", "attachment" => ["foo"]}},
data: %{"to" => [Pleroma.Constants.as_public()]}
}

{:ok, activity: activity}
end

test "produce public:media topics", %{activity: activity} do
topics = Topics.get_activity_topics(activity)

assert Enum.member?(topics, "public:media")
end

test "local produces public:local:media topics", %{activity: activity} do
topics = Topics.get_activity_topics(activity)

assert Enum.member?(topics, "public:local:media")
end

test "non-local doesn't produce public:local:media topics", %{activity: activity} do
activity = %{activity | local: false}

topics = Topics.get_activity_topics(activity)

refute Enum.member?(topics, "public:local:media")
end
end

describe "non-public visibility" do
test "produces direct topic" do
activity = %Activity{object: %Object{data: %{"type" => "Note"}}, data: %{"to" => []}}
topics = Topics.get_activity_topics(activity)

assert Enum.member?(topics, "direct")
refute Enum.member?(topics, "public")
refute Enum.member?(topics, "public:local")
refute Enum.member?(topics, "public:media")
refute Enum.member?(topics, "public:local:media")
end
end
end

+ 5
- 11
test/integration/mastodon_websocket_test.exs View File

@@ -11,7 +11,6 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
alias Pleroma.Integration.WebsocketClient
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.OAuth
alias Pleroma.Web.Streamer

@path Pleroma.Web.Endpoint.url()
|> URI.parse()
@@ -19,16 +18,6 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
|> Map.put(:path, "/api/v1/streaming")
|> URI.to_string()

setup do
GenServer.start(Streamer, %{}, name: Streamer)

on_exit(fn ->
if pid = Process.whereis(Streamer) do
Process.exit(pid, :kill)
end
end)
end

def start_socket(qs \\ nil, headers \\ []) do
path =
case qs do
@@ -53,12 +42,14 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
end)
end

@tag needs_streamer: true
test "allows public streams without authentication" do
assert {:ok, _} = start_socket("?stream=public")
assert {:ok, _} = start_socket("?stream=public:local")
assert {:ok, _} = start_socket("?stream=hashtag&tag=lain")
end

@tag needs_streamer: true
test "receives well formatted events" do
user = insert(:user)
{:ok, _} = start_socket("?stream=public")
@@ -103,6 +94,7 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
assert {:ok, _} = start_socket("?stream=user&access_token=#{state.token.token}")
end

@tag needs_streamer: true
test "accepts the 'user' stream", %{token: token} = _state do
assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")

@@ -111,6 +103,7 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
end) =~ ":badarg"
end

@tag needs_streamer: true
test "accepts the 'user:notification' stream", %{token: token} = _state do
assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}")

@@ -119,6 +112,7 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
end) =~ ":badarg"
end

@tag needs_streamer: true
test "accepts valid token on Sec-WebSocket-Protocol header", %{token: token} do
assert {:ok, _} = start_socket("?stream=user", [{"Sec-WebSocket-Protocol", token.token}])



+ 1
- 10
test/notification_test.exs View File

@@ -69,16 +69,7 @@ defmodule Pleroma.NotificationTest do
end

describe "create_notification" do
setup do
GenServer.start(Streamer, %{}, name: Streamer)

on_exit(fn ->
if pid = Process.whereis(Streamer) do
Process.exit(pid, :kill)
end
end)
end

@tag needs_streamer: true
test "it creates a notification for user and send to the 'user' and the 'user:notification' stream" do
user = insert(:user)
task = Task.async(fn -> assert_receive {:text, _}, 4_000 end)


+ 4
- 0
test/support/conn_case.ex View File

@@ -40,6 +40,10 @@ defmodule Pleroma.Web.ConnCase do
Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, {:shared, self()})
end

if tags[:needs_streamer] do
start_supervised(Pleroma.Web.Streamer.supervisor())
end

{:ok, conn: Phoenix.ConnTest.build_conn()}
end
end

+ 4
- 0
test/support/data_case.ex View File

@@ -39,6 +39,10 @@ defmodule Pleroma.DataCase do
Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, {:shared, self()})
end

if tags[:needs_streamer] do
start_supervised(Pleroma.Web.Streamer.supervisor())
end

:ok
end



+ 1
- 3
test/web/activity_pub/activity_pub_test.exs View File

@@ -38,9 +38,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubTest do
stream: fn _, _ -> nil end do
ActivityPub.stream_out_participations(conversation.participations)

Enum.each(participations, fn participation ->
assert called(Pleroma.Web.Streamer.stream("participation", participation))
end)
assert called(Pleroma.Web.Streamer.stream("participation", participations))
end
end
end


+ 36
- 0
test/web/streamer/ping_test.exs View File

@@ -0,0 +1,36 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Web.PingTest do
use Pleroma.DataCase

import Pleroma.Factory
alias Pleroma.Web.Streamer

setup do
start_supervised({Streamer.supervisor(), [ping_interval: 30]})

:ok
end

describe "sockets" do
setup do
user = insert(:user)
{:ok, %{user: user}}
end

test "it sends pings", %{user: user} do
task =
Task.async(fn ->
assert_receive {:text, received_event}, 40
assert_receive {:text, received_event}, 40
assert_receive {:text, received_event}, 40
end)

Streamer.add_socket("public", %{transport_pid: task.pid, assigns: %{user: user}})

Task.await(task)
end
end
end

+ 54
- 0
test/web/streamer/state_test.exs View File

@@ -0,0 +1,54 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Web.StateTest do
use Pleroma.DataCase

import Pleroma.Factory
alias Pleroma.Web.Streamer
alias Pleroma.Web.Streamer.StreamerSocket

@moduletag needs_streamer: true

describe "sockets" do
setup do
user = insert(:user)
user2 = insert(:user)
{:ok, %{user: user, user2: user2}}
end

test "it can add a socket", %{user: user} do
Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}})

assert(%{"public" => [%StreamerSocket{transport_pid: 1}]} = Streamer.get_sockets())
end

test "it can add multiple sockets per user", %{user: user} do
Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}})
Streamer.add_socket("public", %{transport_pid: 2, assigns: %{user: user}})

assert(
%{
"public" => [
%StreamerSocket{transport_pid: 2},
%StreamerSocket{transport_pid: 1}
]
} = Streamer.get_sockets()
)
end

test "it will not add a duplicate socket", %{user: user} do
Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}})
Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}})

assert(
%{
"activity" => [
%StreamerSocket{transport_pid: 1}
]
} = Streamer.get_sockets()
)
end
end
end

test/web/streamer_test.exs → test/web/streamer/streamer_test.exs View File

@@ -5,24 +5,20 @@
defmodule Pleroma.Web.StreamerTest do
use Pleroma.DataCase

import Pleroma.Factory

alias Pleroma.List
alias Pleroma.User
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.Streamer
import Pleroma.Factory
alias Pleroma.Web.Streamer.StreamerSocket
alias Pleroma.Web.Streamer.Worker

@moduletag needs_streamer: true
clear_config_all([:instance, :skip_thread_containment])

describe "user streams" do
setup do
GenServer.start(Streamer, %{}, name: Streamer)

on_exit(fn ->
if pid = Process.whereis(Streamer) do
Process.exit(pid, :kill)
end
end)

user = insert(:user)
notify = insert(:notification, user: user, activity: build(:note_activity))
{:ok, %{user: user, notify: notify}}
@@ -125,11 +121,9 @@ defmodule Pleroma.Web.StreamerTest do
assert_receive {:text, _}, 4_000
end)

fake_socket = %{
fake_socket = %StreamerSocket{
transport_pid: task.pid,
assigns: %{
user: user
}
user: user
}

{:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"})
@@ -138,7 +132,7 @@ defmodule Pleroma.Web.StreamerTest do
"public" => [fake_socket]
}

Streamer.push_to_socket(topics, "public", activity)
Worker.push_to_socket(topics, "public", activity)

Task.await(task)

@@ -155,11 +149,9 @@ defmodule Pleroma.Web.StreamerTest do
assert received_event == expected_event
end)

fake_socket = %{
fake_socket = %StreamerSocket{
transport_pid: task.pid,
assigns: %{
user: user
}
user: user
}

{:ok, activity} = CommonAPI.delete(activity.id, other_user)
@@ -168,7 +160,7 @@ defmodule Pleroma.Web.StreamerTest do
"public" => [fake_socket]
}

Streamer.push_to_socket(topics, "public", activity)
Worker.push_to_socket(topics, "public", activity)

Task.await(task)
end
@@ -189,9 +181,9 @@ defmodule Pleroma.Web.StreamerTest do
)

task = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
fake_socket = %{transport_pid: task.pid, assigns: %{user: user}}
fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
topics = %{"public" => [fake_socket]}
Streamer.push_to_socket(topics, "public", activity)
Worker.push_to_socket(topics, "public", activity)

Task.await(task)
end
@@ -211,9 +203,9 @@ defmodule Pleroma.Web.StreamerTest do
)

task = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
fake_socket = %{transport_pid: task.pid, assigns: %{user: user}}
fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
topics = %{"public" => [fake_socket]}
Streamer.push_to_socket(topics, "public", activity)
Worker.push_to_socket(topics, "public", activity)

Task.await(task)
end
@@ -233,9 +225,9 @@ defmodule Pleroma.Web.StreamerTest do
)

task = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
fake_socket = %{transport_pid: task.pid, assigns: %{user: user}}
fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
topics = %{"public" => [fake_socket]}
Streamer.push_to_socket(topics, "public", activity)
Worker.push_to_socket(topics, "public", activity)

Task.await(task)
end
@@ -251,11 +243,9 @@ defmodule Pleroma.Web.StreamerTest do
refute_receive {:text, _}, 1_000
end)

fake_socket = %{
fake_socket = %StreamerSocket{
transport_pid: task.pid,
assigns: %{
user: user
}
user: user
}

{:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"})
@@ -264,7 +254,7 @@ defmodule Pleroma.Web.StreamerTest do
"public" => [fake_socket]
}

Streamer.push_to_socket(topics, "public", activity)
Worker.push_to_socket(topics, "public", activity)

Task.await(task)
end
@@ -284,11 +274,9 @@ defmodule Pleroma.Web.StreamerTest do
refute_receive {:text, _}, 1_000
end)

fake_socket = %{
fake_socket = %StreamerSocket{
transport_pid: task.pid,
assigns: %{
user: user_a
}
user: user_a
}

{:ok, activity} =
@@ -301,7 +289,7 @@ defmodule Pleroma.Web.StreamerTest do
"list:#{list.id}" => [fake_socket]
}

Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)
Worker.handle_call({:stream, "list", activity}, self(), topics)

Task.await(task)
end
@@ -318,11 +306,9 @@ defmodule Pleroma.Web.StreamerTest do
refute_receive {:text, _}, 1_000
end)

fake_socket = %{
fake_socket = %StreamerSocket{
transport_pid: task.pid,
assigns: %{
user: user_a
}
user: user_a
}

{:ok, activity} =
@@ -335,12 +321,12 @@ defmodule Pleroma.Web.StreamerTest do
"list:#{list.id}" => [fake_socket]
}

Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)
Worker.handle_call({:stream, "list", activity}, self(), topics)

Task.await(task)
end

test "it send wanted private posts to list" do
test "it sends wanted private posts to list" do
user_a = insert(:user)
user_b = insert(:user)

@@ -354,11 +340,9 @@ defmodule Pleroma.Web.StreamerTest do
assert_receive {:text, _}, 1_000
end)

fake_socket = %{
fake_socket = %StreamerSocket{
transport_pid: task.pid,
assigns: %{
user: user_a
}
user: user_a
}

{:ok, activity} =
@@ -367,11 +351,12 @@ defmodule Pleroma.Web.StreamerTest do
"visibility" => "private"
})

topics = %{
"list:#{list.id}" => [fake_socket]
}
Streamer.add_socket(
"list:#{list.id}",
fake_socket
)

Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)
Worker.handle_call({:stream, "list", activity}, self(), %{})

Task.await(task)
end
@@ -387,11 +372,9 @@ defmodule Pleroma.Web.StreamerTest do
refute_receive {:text, _}, 1_000
end)

fake_socket = %{
fake_socket = %StreamerSocket{
transport_pid: task.pid,
assigns: %{
user: user1
}
user: user1
}

{:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"})
@@ -401,7 +384,7 @@ defmodule Pleroma.Web.StreamerTest do
"public" => [fake_socket]
}

Streamer.push_to_socket(topics, "public", announce_activity)
Worker.push_to_socket(topics, "public", announce_activity)

Task.await(task)
end
@@ -417,6 +400,8 @@ defmodule Pleroma.Web.StreamerTest do

task = Task.async(fn -> refute_receive {:text, _}, 4_000 end)

Process.sleep(4000)

Streamer.add_socket(
"user",
%{transport_pid: task.pid, assigns: %{user: user2}}
@@ -428,14 +413,6 @@ defmodule Pleroma.Web.StreamerTest do

describe "direct streams" do
setup do
GenServer.start(Streamer, %{}, name: Streamer)

on_exit(fn ->
if pid = Process.whereis(Streamer) do
Process.exit(pid, :kill)
end
end)

:ok
end

@@ -480,6 +457,8 @@ defmodule Pleroma.Web.StreamerTest do
refute_receive {:text, _}, 4_000
end)

Process.sleep(1000)

Streamer.add_socket(
"direct",
%{transport_pid: task.pid, assigns: %{user: user}}
@@ -521,6 +500,8 @@ defmodule Pleroma.Web.StreamerTest do
assert last_status["id"] == to_string(create_activity.id)
end)

Process.sleep(1000)

Streamer.add_socket(
"direct",
%{transport_pid: task.pid, assigns: %{user: user}}

Loading…
Cancel
Save