@@ -42,23 +42,9 @@ defmodule Pleroma.Application do | |||||
hackney_pool_children() ++ | hackney_pool_children() ++ | ||||
[ | [ | ||||
Pleroma.Web.Federator.RetryQueue, | Pleroma.Web.Federator.RetryQueue, | ||||
Pleroma.Stats, | |||||
%{ | |||||
id: :web_push_init, | |||||
start: {Task, :start_link, [&Pleroma.Web.Push.init/0]}, | |||||
restart: :temporary | |||||
}, | |||||
%{ | |||||
id: :federator_init, | |||||
start: {Task, :start_link, [&Pleroma.Web.Federator.init/0]}, | |||||
restart: :temporary | |||||
}, | |||||
%{ | |||||
id: :internal_fetch_init, | |||||
start: {Task, :start_link, [&Pleroma.Web.ActivityPub.InternalFetchActor.init/0]}, | |||||
restart: :temporary | |||||
} | |||||
Pleroma.Stats | |||||
] ++ | ] ++ | ||||
task_children(@env) ++ | |||||
oauth_cleanup_child(oauth_cleanup_enabled?()) ++ | oauth_cleanup_child(oauth_cleanup_enabled?()) ++ | ||||
streamer_child(@env) ++ | streamer_child(@env) ++ | ||||
chat_child(@env, chat_enabled?()) ++ | chat_child(@env, chat_enabled?()) ++ | ||||
@@ -165,16 +151,38 @@ defmodule Pleroma.Application do | |||||
end | end | ||||
end | end | ||||
defp after_supervisor_start do | |||||
with digest_config <- Application.get_env(:pleroma, :email_notifications)[:digest], | |||||
true <- digest_config[:active] do | |||||
PleromaJobQueue.schedule( | |||||
digest_config[:schedule], | |||||
:digest_emails, | |||||
Pleroma.DigestEmailWorker | |||||
) | |||||
end | |||||
defp task_children(:test) do | |||||
[ | |||||
%{ | |||||
id: :web_push_init, | |||||
start: {Task, :start_link, [&Pleroma.Web.Push.init/0]}, | |||||
restart: :temporary | |||||
}, | |||||
%{ | |||||
id: :federator_init, | |||||
start: {Task, :start_link, [&Pleroma.Web.Federator.init/0]}, | |||||
restart: :temporary | |||||
} | |||||
] | |||||
end | |||||
:ok | |||||
defp task_children(_) do | |||||
[ | |||||
%{ | |||||
id: :web_push_init, | |||||
start: {Task, :start_link, [&Pleroma.Web.Push.init/0]}, | |||||
restart: :temporary | |||||
}, | |||||
%{ | |||||
id: :federator_init, | |||||
start: {Task, :start_link, [&Pleroma.Web.Federator.init/0]}, | |||||
restart: :temporary | |||||
}, | |||||
%{ | |||||
id: :internal_fetch_init, | |||||
start: {Task, :start_link, [&Pleroma.Web.ActivityPub.InternalFetchActor.init/0]}, | |||||
restart: :temporary | |||||
} | |||||
] | |||||
end | end | ||||
end | end |
@@ -4,16 +4,18 @@ defmodule Pleroma.Web.Streamer.State do | |||||
alias Pleroma.Web.Streamer.StreamerSocket | alias Pleroma.Web.Streamer.StreamerSocket | ||||
@env Mix.env() | |||||
def start_link(_) do | def start_link(_) do | ||||
GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__) | GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__) | ||||
end | end | ||||
def add_socket(topic, socket) do | def add_socket(topic, socket) do | ||||
GenServer.call(__MODULE__, {:add, socket, topic}) | |||||
GenServer.call(__MODULE__, {:add, topic, socket}) | |||||
end | end | ||||
def remove_socket(topic, socket) do | def remove_socket(topic, socket) do | ||||
GenServer.call(__MODULE__, {:remove, socket, topic}) | |||||
do_remove_socket(@env, topic, socket) | |||||
end | end | ||||
def get_sockets do | def get_sockets do | ||||
@@ -29,7 +31,7 @@ defmodule Pleroma.Web.Streamer.State do | |||||
{:reply, state, state} | {:reply, state, state} | ||||
end | end | ||||
def handle_call({:add, socket, topic}, _from, %{sockets: sockets} = state) do | |||||
def handle_call({:add, topic, socket}, _from, %{sockets: sockets} = state) do | |||||
internal_topic = internal_topic(topic, socket) | internal_topic = internal_topic(topic, socket) | ||||
stream_socket = StreamerSocket.from_socket(socket) | stream_socket = StreamerSocket.from_socket(socket) | ||||
@@ -44,7 +46,7 @@ defmodule Pleroma.Web.Streamer.State do | |||||
{:reply, state, state} | {:reply, state, state} | ||||
end | end | ||||
def handle_call({:remove, socket, topic}, _from, %{sockets: sockets} = state) do | |||||
def handle_call({:remove, topic, socket}, _from, %{sockets: sockets} = state) do | |||||
internal_topic = internal_topic(topic, socket) | internal_topic = internal_topic(topic, socket) | ||||
stream_socket = StreamerSocket.from_socket(socket) | stream_socket = StreamerSocket.from_socket(socket) | ||||
@@ -57,6 +59,14 @@ defmodule Pleroma.Web.Streamer.State do | |||||
{:reply, state, state} | {:reply, state, state} | ||||
end | end | ||||
defp do_remove_socket(:test, _, _) do | |||||
:ok | |||||
end | |||||
defp do_remove_socket(_env, topic, socket) do | |||||
GenServer.call(__MODULE__, {:remove, topic, socket}) | |||||
end | |||||
defp internal_topic(topic, socket) | defp internal_topic(topic, socket) | ||||
when topic in ~w[user user:notification direct] do | when topic in ~w[user user:notification direct] do | ||||
"#{topic}:#{socket.assigns[:user].id}" | "#{topic}:#{socket.assigns[:user].id}" | ||||
@@ -17,6 +17,11 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do | |||||
|> Map.put(:path, "/api/v1/streaming") | |> Map.put(:path, "/api/v1/streaming") | ||||
|> URI.to_string() | |> URI.to_string() | ||||
setup_all do | |||||
start_supervised(Pleroma.Web.Streamer.supervisor()) | |||||
:ok | |||||
end | |||||
def start_socket(qs \\ nil, headers \\ []) do | def start_socket(qs \\ nil, headers \\ []) do | ||||
path = | path = | ||||
case qs do | case qs do | ||||
@@ -28,23 +33,27 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do | |||||
end | end | ||||
test "refuses invalid requests" do | test "refuses invalid requests" do | ||||
assert {:error, {400, _}} = start_socket() | |||||
assert {:error, {404, _}} = start_socket("?stream=ncjdk") | |||||
capture_log(fn -> | |||||
assert {:error, {400, _}} = start_socket() | |||||
assert {:error, {404, _}} = start_socket("?stream=ncjdk") | |||||
Process.sleep(30) | |||||
end) | |||||
end | end | ||||
test "requires authentication and a valid token for protected streams" do | test "requires authentication and a valid token for protected streams" do | ||||
assert {:error, {403, _}} = start_socket("?stream=user&access_token=aaaaaaaaaaaa") | |||||
assert {:error, {403, _}} = start_socket("?stream=user") | |||||
capture_log(fn -> | |||||
assert {:error, {403, _}} = start_socket("?stream=user&access_token=aaaaaaaaaaaa") | |||||
assert {:error, {403, _}} = start_socket("?stream=user") | |||||
Process.sleep(30) | |||||
end) | |||||
end | end | ||||
@tag needs_streamer: true | |||||
test "allows public streams without authentication" do | test "allows public streams without authentication" do | ||||
assert {:ok, _} = start_socket("?stream=public") | assert {:ok, _} = start_socket("?stream=public") | ||||
assert {:ok, _} = start_socket("?stream=public:local") | assert {:ok, _} = start_socket("?stream=public:local") | ||||
assert {:ok, _} = start_socket("?stream=hashtag&tag=lain") | assert {:ok, _} = start_socket("?stream=hashtag&tag=lain") | ||||
end | end | ||||
@tag needs_streamer: true | |||||
test "receives well formatted events" do | test "receives well formatted events" do | ||||
user = insert(:user) | user = insert(:user) | ||||
{:ok, _} = start_socket("?stream=public") | {:ok, _} = start_socket("?stream=public") | ||||
@@ -89,24 +98,33 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do | |||||
assert {:ok, _} = start_socket("?stream=user&access_token=#{state.token.token}") | assert {:ok, _} = start_socket("?stream=user&access_token=#{state.token.token}") | ||||
end | end | ||||
@tag needs_streamer: true | |||||
test "accepts the 'user' stream", %{token: token} = _state do | test "accepts the 'user' stream", %{token: token} = _state do | ||||
assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}") | assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}") | ||||
assert {:error, {403, "Forbidden"}} = start_socket("?stream=user") | |||||
assert capture_log(fn -> | |||||
assert {:error, {403, "Forbidden"}} = start_socket("?stream=user") | |||||
Process.sleep(30) | |||||
end) =~ ":badarg" | |||||
end | end | ||||
@tag needs_streamer: true | |||||
test "accepts the 'user:notification' stream", %{token: token} = _state do | test "accepts the 'user:notification' stream", %{token: token} = _state do | ||||
assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}") | assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}") | ||||
assert {:error, {403, "Forbidden"}} = start_socket("?stream=user:notification") | |||||
assert capture_log(fn -> | |||||
assert {:error, {403, "Forbidden"}} = start_socket("?stream=user:notification") | |||||
Process.sleep(30) | |||||
end) =~ ":badarg" | |||||
end | end | ||||
@tag needs_streamer: true | |||||
test "accepts valid token on Sec-WebSocket-Protocol header", %{token: token} do | test "accepts valid token on Sec-WebSocket-Protocol header", %{token: token} do | ||||
assert {:ok, _} = start_socket("?stream=user", [{"Sec-WebSocket-Protocol", token.token}]) | assert {:ok, _} = start_socket("?stream=user", [{"Sec-WebSocket-Protocol", token.token}]) | ||||
assert {:error, {403, "Forbidden"}} = | |||||
start_socket("?stream=user", [{"Sec-WebSocket-Protocol", "I am a friend"}]) | |||||
assert capture_log(fn -> | |||||
assert {:error, {403, "Forbidden"}} = | |||||
start_socket("?stream=user", [{"Sec-WebSocket-Protocol", "I am a friend"}]) | |||||
Process.sleep(30) | |||||
end) =~ ":badarg" | |||||
end | end | ||||
end | end | ||||
end | end |