Fork of Pleroma with site-specific changes and feature branches https://git.pleroma.social/pleroma/pleroma
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

345 lines
10KB

  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Web.Streamer do
  5. require Logger
  6. alias Pleroma.Activity
  7. alias Pleroma.Chat.MessageReference
  8. alias Pleroma.Config
  9. alias Pleroma.Conversation.Participation
  10. alias Pleroma.Notification
  11. alias Pleroma.Object
  12. alias Pleroma.User
  13. alias Pleroma.Web.ActivityPub.ActivityPub
  14. alias Pleroma.Web.ActivityPub.Visibility
  15. alias Pleroma.Web.CommonAPI
  16. alias Pleroma.Web.OAuth.Token
  17. alias Pleroma.Web.Plugs.OAuthScopesPlug
  18. alias Pleroma.Web.StreamerView
  19. @mix_env Mix.env()
  20. @registry Pleroma.Web.StreamerRegistry
  @doc "Returns the name of the `Registry` in which streaming processes are registered."
  def registry do
    @registry
  end
  22. @public_streams ["public", "public:local", "public:media", "public:local:media"]
  23. @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
  @doc """
  Expands and authorizes a stream, and registers the process for streaming.

  The stream is first resolved with `get_topic/4`. On success the resulting topic
  is handed to `add_socket/2`; any other result of `get_topic/4` (an error tuple)
  is returned unchanged and nothing is registered.
  """
  @spec get_topic_and_add_socket(
          stream :: String.t(),
          User.t() | nil,
          Token.t() | nil,
          map() | nil
        ) ::
          {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
  def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
    with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do
      add_socket(topic, user)
    end
  end
  @doc """
  Expands and authorizes a stream.

  Returns `{:ok, topic}` with the fully expanded topic name when the stream is
  allowed, `{:error, :unauthorized}` when the user/token combination may not
  access it, and `{:error, :bad_topic}` when the stream (or its parameters) is
  not recognized.
  """
  @spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, map() | nil) ::
          {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
  def get_topic(stream, user, oauth_token, params \\ %{})

  # Allow all public streams.
  def get_topic(stream, _user, _oauth_token, _params) when stream in @public_streams do
    {:ok, stream}
  end

  # Allow all hashtag streams.
  # The guard keeps non-binary values (e.g. a list produced by `tag[]=...`) from
  # raising in `<>`; such requests fall through to the remaining clauses instead.
  def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params)
      when is_binary(tag) do
    {:ok, "hashtag:" <> tag}
  end

  # Allow remote instance streams (same binary guard as for hashtags).
  def get_topic("public:remote", _user, _oauth_token, %{"instance" => instance} = _params)
      when is_binary(instance) do
    {:ok, "public:remote:" <> instance}
  end

  def get_topic("public:remote:media", _user, _oauth_token, %{"instance" => instance} = _params)
      when is_binary(instance) do
    {:ok, "public:remote:media:" <> instance}
  end
  # Expand user streams.
  # Both patterns bind `user_id`, so this clause only matches when the token
  # belongs to the given user.
  def get_topic(
        stream,
        %User{id: user_id},
        %Token{user_id: user_id} = oauth_token,
        _params
      )
      when stream in @user_streams do
    # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
    required_scopes =
      case stream do
        "user:notification" -> ["read:notifications"]
        _other_stream -> ["read:statuses"]
      end

    case OAuthScopesPlug.filter_descendants(required_scopes, oauth_token.scopes) do
      [] -> {:error, :unauthorized}
      _granted -> {:ok, "#{stream}:#{user_id}"}
    end
  end

  # Any other user/token combination is rejected for user streams.
  def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams,
    do: {:error, :unauthorized}
  # List streams.
  # Requires a token owned by the user, a matching read scope, and a list that
  # `Pleroma.List.get/2` returns for this user.
  def get_topic(
        "list",
        %User{id: user_id} = user,
        %Token{user_id: user_id} = oauth_token,
        %{"list" => id}
      ) do
    if OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] do
      {:error, :unauthorized}
    else
      case Pleroma.List.get(id, user) do
        missing when missing in [nil, false] -> {:error, :bad_topic}
        _list -> {:ok, "list:#{id}"}
      end
    end
  end

  # Every other attempt to open a list stream is rejected.
  def get_topic("list", _user, _oauth_token, _params), do: {:error, :unauthorized}
  # Anything not matched by an earlier clause is an unknown topic.
  def get_topic(_stream, _user, _oauth_token, _params), do: {:error, :bad_topic}
  @doc """
  Registers the process for streaming on `topic`.

  The value stored in the registry is `true` when `user` is truthy and `nil`
  otherwise. Nothing is registered when `should_env_send?/0` returns false.
  Always returns `{:ok, topic}`. Use `get_topic/4` to get the full authorized topic.
  """
  def add_socket(topic, user) do
    if should_env_send?() do
      Registry.register(@registry, topic, if(user, do: true))
    end

    {:ok, topic}
  end
  @doc """
  Unregisters the process from `topic`.

  Does nothing (and returns `nil`) when `should_env_send?/0` returns false.
  """
  def remove_socket(topic) do
    if should_env_send?() do
      Registry.unregister(@registry, topic)
    end
  end
  @doc """
  Streams every item to every topic.

  `topics` and `items` may each be a single value or a list. One process is
  spawned per topic/item pair; the list of spawned pids is returned. Returns
  `nil` without spawning anything when `should_env_send?/0` returns false.
  """
  def stream(topics, items) do
    if should_env_send?() do
      item_list = List.wrap(items)

      topics
      |> List.wrap()
      |> Enum.flat_map(fn topic ->
        Enum.map(item_list, fn item ->
          spawn(fn -> do_stream(topic, item) end)
        end)
      end)
    end
  end
  @doc """
  Returns `true` when `item` should be hidden from `user`, `false` otherwise.

  An activity passes (returns `false`) only if every check below succeeds, in order:

    * its actor is neither blocked nor muted by the user;
    * it is not an Announce from an actor whose reblogs the user muted;
    * when streamed as an `:activity`, it is not an Announce of the user's own object;
    * the actor of the normalized object is neither blocked nor muted;
    * none of its recipients is blocked or muted;
    * neither actor's host matches the user's domain blocks;
    * thread containment allows it and the thread is not muted.

  Notifications are checked through their activity with `streamed_type` set to
  `:notification`.
  """
  def filtered_by_user?(user, item, streamed_type \\ :activity)

  def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do
    %{block: blocked, mute: muted, reblog_mute: reblog_muted} =
      User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])

    hidden_recipients = MapSet.new(blocked ++ muted)
    item_recipients = MapSet.new(item.recipients)
    blocked_domains = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)

    # Fall back to the activity itself when no object can be resolved locally.
    parent = Object.normalize(item, fetch: false) || item
    announce? = item.data["type"] == "Announce"

    # The order of the clauses is significant: cheap relationship checks come first,
    # thread containment and thread mutes are only consulted when everything else passed.
    with true <- item.actor not in blocked and item.actor not in muted,
         true <- not announce? or item.actor not in reblog_muted,
         false <-
           streamed_type == :activity and announce? and parent.data["actor"] == user.ap_id,
         true <-
           parent.data["actor"] not in blocked and parent.data["actor"] not in muted,
         true <- MapSet.disjoint?(item_recipients, hidden_recipients),
         %{host: actor_host} <- URI.parse(item.actor),
         %{host: parent_actor_host} <- URI.parse(parent.data["actor"]),
         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(blocked_domains, actor_host),
         false <-
           Pleroma.Web.ActivityPub.MRF.subdomain_match?(blocked_domains, parent_actor_host),
         true <- thread_containment(item, user),
         false <- CommonAPI.thread_muted?(user, parent) do
      false
    else
      _failed_check -> true
    end
  end

  def filtered_by_user?(%User{} = user, %Notification{activity: activity}, _streamed_type) do
    filtered_by_user?(user, activity, :notification)
  end
  # Direct messages: push the item to the "direct:<id>" topic of every recipient.
  defp do_stream("direct", item) do
    item
    |> User.get_recipients_from_activity()
    |> Enum.map(fn %{id: recipient_id} -> "direct:#{recipient_id}" end)
    |> Enum.each(fn user_topic ->
      Logger.debug("Trying to push direct message to #{user_topic}\n\n")
      push_to_socket(user_topic, item)
    end)
  end
  # Follow relationship updates: rendered once and sent as text to the follower's
  # "user:<id>" topic.
  defp do_stream("follow_relationship", item) do
    payload = StreamerView.render("follow_relationships_update.json", item)
    user_topic = "user:#{item.follower.id}"

    Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n")

    Registry.dispatch(@registry, user_topic, fn subscribers ->
      for {pid, _auth} <- subscribers do
        send(pid, {:text, payload})
      end
    end)
  end
  # Conversation participations go to the "direct:<user_id>" topic of their owner.
  defp do_stream("participation", participation) do
    owner_topic = "direct:#{participation.user_id}"

    Logger.debug("Trying to push a conversation participation to #{owner_topic}\n\n")

    push_to_socket(owner_topic, participation)
  end
  # List timelines: push the item to the topic of every list it belongs to.
  defp do_stream("list", item) do
    public? = Visibility.is_public?(item)
    candidate_lists = Pleroma.List.get_lists_from_activity(item)

    # filter the recipient list if the activity is not public, see #270.
    recipient_lists =
      if public? == true do
        candidate_lists
      else
        Enum.filter(candidate_lists, fn list ->
          owner = User.get_cached_by_id(list.user_id)
          Visibility.visible_for_user?(item, owner)
        end)
      end

    recipient_lists
    |> Enum.map(fn %{id: list_id} -> "list:#{list_id}" end)
    |> Enum.each(fn list_topic ->
      Logger.debug("Trying to push message to #{list_topic}\n\n")
      push_to_socket(list_topic, item)
    end)
  end
  # Notifications: every subscriber of "<topic>:<user_id>" is asked to render the
  # notification itself via a `:render_with_user` message.
  defp do_stream(topic, %Notification{} = item)
       when topic in ["user", "user:notification"] do
    user_topic = "#{topic}:#{item.user_id}"

    Registry.dispatch(@registry, user_topic, fn subscribers ->
      for {pid, _auth} <- subscribers do
        send(pid, {:render_with_user, StreamerView, "notification.json", item})
      end
    end)
  end
  # Chat message updates: rendered once and sent as text to "<topic>:<user.id>".
  defp do_stream(topic, {user, %MessageReference{} = cm_ref})
       when topic in ["user", "user:pleroma_chat"] do
    user_topic = "#{topic}:#{user.id}"
    payload = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})

    Registry.dispatch(@registry, user_topic, fn subscribers ->
      for {pid, _auth} <- subscribers do
        send(pid, {:text, payload})
      end
    end)
  end
  # Home timeline: push the item to the "user:<id>" topic of every recipient.
  defp do_stream("user", item) do
    Logger.debug("Trying to push to users")

    item
    |> User.get_recipients_from_activity()
    |> Enum.map(fn %{id: recipient_id} -> "user:#{recipient_id}" end)
    |> Enum.each(fn user_topic -> push_to_socket(user_topic, item) end)
  end
  # Fallback: the topic needs no expansion, push the item to it as-is.
  defp do_stream(topic, item) do
    Logger.debug("Trying to push to #{topic}")
    Logger.debug("Pushing item to #{topic}")

    push_to_socket(topic, item)
  end
  # Conversation participations are rendered once and sent as text to every subscriber.
  defp push_to_socket(topic, %Participation{} = participation) do
    broadcast_text(topic, StreamerView.render("conversation.json", participation))
  end

  # Deletes that carry the id of the deleted activity become a "delete" event.
  defp push_to_socket(topic, %Activity{
         data: %{"type" => "Delete", "deleted_activity_id" => deleted_id}
       }) do
    payload = Jason.encode!(%{event: "delete", payload: to_string(deleted_id)})
    broadcast_text(topic, payload)
  end

  # Deletes without a deleted activity id are not streamed.
  defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop

  # Everything else: subscribers registered with a truthy value render the item
  # themselves, all others receive the render produced here.
  defp push_to_socket(topic, item) do
    anonymous_payload = StreamerView.render("update.json", item)

    Registry.dispatch(@registry, topic, fn subscribers ->
      for {pid, authenticated?} <- subscribers do
        message =
          if authenticated? do
            {:render_with_user, StreamerView, "update.json", item}
          else
            {:text, anonymous_payload}
          end

        send(pid, message)
      end
    end)
  end

  # Sends an already rendered text payload to every process registered under `topic`.
  defp broadcast_text(topic, text) do
    Registry.dispatch(@registry, topic, fn subscribers ->
      Enum.each(subscribers, fn {pid, _auth} -> send(pid, {:text, text}) end)
    end)
  end
  # Thread containment is skipped (treated as passed) when either the user or the
  # instance configuration opts out; otherwise the decision is delegated to
  # `ActivityPub.contain_activity/2`.
  defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true

  defp thread_containment(activity, user) do
    case Config.get([:instance, :skip_thread_containment]) do
      off when off in [nil, false] -> ActivityPub.contain_activity(activity, user)
      _on -> true
    end
  end
  # The implementation of should_env_send?/0 is selected at compile time:
  # - in the test environment it returns true only while the registry process is alive;
  # - in the benchmark environment it always returns false;
  # - in any other environment it always returns true.
  case @mix_env do
    :test ->
      def should_env_send? do
        registry_pid = Process.whereis(@registry)

        if registry_pid do
          Process.alive?(registry_pid)
        else
          false
        end
      end

    :benchmark ->
      def should_env_send?, do: false

    _other_env ->
      def should_env_send?, do: true
  end
  283. end