From 7a3a88a13ef526fba18bb6aeadc93f5da934dc5b Mon Sep 17 00:00:00 2001 From: lain Date: Wed, 22 Apr 2020 17:21:13 +0200 Subject: [PATCH] Streamer: Stream boosts to the boosting user. --- lib/pleroma/user.ex | 4 +++- lib/pleroma/web/streamer/worker.ex | 18 ------------------ test/user_test.exs | 12 ++++++++++++ test/web/streamer/streamer_test.exs | 36 ++++++++++++++++++++++++++++++++++++ 4 files changed, 51 insertions(+), 19 deletions(-) diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index bef4679cb..477237756 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -1180,7 +1180,9 @@ defmodule Pleroma.User do end @spec get_recipients_from_activity(Activity.t()) :: [User.t()] - def get_recipients_from_activity(%Activity{recipients: to}) do + def get_recipients_from_activity(%Activity{recipients: to, actor: actor}) do + to = [actor | to] + User.Query.build(%{recipients_from_activity: to, local: true, deactivated: false}) |> Repo.all() end diff --git a/lib/pleroma/web/streamer/worker.ex b/lib/pleroma/web/streamer/worker.ex index abfed21c8..f6160fa4d 100644 --- a/lib/pleroma/web/streamer/worker.ex +++ b/lib/pleroma/web/streamer/worker.ex @@ -158,24 +158,6 @@ defmodule Pleroma.Web.Streamer.Worker do should_send?(user, activity) end - def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do - Enum.each(topics[topic] || [], fn %StreamerSocket{ - transport_pid: transport_pid, - user: socket_user - } -> - # Get the current user so we have up-to-date blocks etc. - if socket_user do - user = User.get_cached_by_ap_id(socket_user.ap_id) - - if should_send?(user, item) do - send(transport_pid, {:text, StreamerView.render("update.json", item, user)}) - end - else - send(transport_pid, {:text, StreamerView.render("update.json", item)}) - end - end) - end - def push_to_socket(topics, topic, %Participation{} = participation) do Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} -> send(transport_pid, {:text, StreamerView.render("conversation.json", participation)}) diff --git a/test/user_test.exs b/test/user_test.exs index 65e118d6d..cd4041673 100644 --- a/test/user_test.exs +++ b/test/user_test.exs @@ -987,6 +987,18 @@ defmodule Pleroma.UserTest do end describe "get_recipients_from_activity" do + test "works for announces" do + actor = insert(:user) + user = insert(:user, local: true) + + {:ok, activity} = CommonAPI.post(actor, %{"status" => "hello"}) + {:ok, announce, _} = CommonAPI.repeat(activity.id, user) + + recipients = User.get_recipients_from_activity(announce) + + assert user in recipients + end + test "get recipients" do actor = insert(:user) user = insert(:user, local: true) diff --git a/test/web/streamer/streamer_test.exs b/test/web/streamer/streamer_test.exs index eb082b79f..8b8d8af6c 100644 --- a/test/web/streamer/streamer_test.exs +++ b/test/web/streamer/streamer_test.exs @@ -28,6 +28,42 @@ defmodule Pleroma.Web.StreamerTest do {:ok, %{user: user, notify: notify}} end + test "it streams the user's post in the 'user' stream", %{user: user} do + task = + Task.async(fn -> + assert_receive {:text, _}, @streamer_timeout + end) + + Streamer.add_socket( + "user", + %{transport_pid: task.pid, assigns: %{user: user}} + ) + + {:ok, activity} = CommonAPI.post(user, %{"status" => "hey"}) + + Streamer.stream("user", activity) + Task.await(task) + end + + test "it streams boosts of the user in the 'user' stream", %{user: user} do + task = + Task.async(fn -> + assert_receive {:text, _}, @streamer_timeout + end) + + Streamer.add_socket( + "user", + %{transport_pid: task.pid, assigns: %{user: user}} + ) + + other_user = insert(:user) + {:ok, activity} = CommonAPI.post(other_user, %{"status" => "hey"}) + {:ok, announce, _} = CommonAPI.repeat(activity.id, user) + + Streamer.stream("user", announce) + Task.await(task) + end + test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do task = Task.async(fn ->