Fork of Pleroma with site-specific changes and feature branches https://git.pleroma.social/pleroma/pleroma
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

516 lines
13KB

  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Web.StreamerTest do
  5. use Pleroma.DataCase
  6. import Pleroma.Factory
  7. alias Pleroma.List
  8. alias Pleroma.User
  9. alias Pleroma.Web.CommonAPI
  10. alias Pleroma.Web.Streamer
  11. alias Pleroma.Web.Streamer.StreamerSocket
  12. alias Pleroma.Web.Streamer.Worker
  13. @moduletag needs_streamer: true
  14. clear_config_all([:instance, :skip_thread_containment])
  # Exercises the "user" and "user:notification" topics through Streamer.add_socket/2
  # and Streamer.stream/2. In every test a Task plays the receiving end: its pid is
  # registered as the socket's transport_pid, and the task asserts (or refutes) that a
  # {:text, _} message reaches its mailbox.
  describe "user streams" do
    setup do
      recipient = insert(:user)
      note_activity = build(:note_activity)
      notification = insert(:notification, user: recipient, activity: note_activity)

      %{user: recipient, notify: notification}
    end

    test "it sends notify to in the 'user' stream", %{user: recipient, notify: notification} do
      listener = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
      socket = %{transport_pid: listener.pid, assigns: %{user: recipient}}

      Streamer.add_socket("user", socket)
      Streamer.stream("user", notification)

      Task.await(listener)
    end

    test "it sends notify to in the 'user:notification' stream",
         %{user: recipient, notify: notification} do
      listener = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
      socket = %{transport_pid: listener.pid, assigns: %{user: recipient}}

      Streamer.add_socket("user:notification", socket)
      Streamer.stream("user:notification", notification)

      Task.await(listener)
    end

    test "it doesn't send notify to the 'user:notification' stream when a user is blocked",
         %{user: recipient} do
      blocked = insert(:user)
      {:ok, blocker} = User.block(recipient, blocked)

      listener = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
      socket = %{transport_pid: listener.pid, assigns: %{user: blocker}}
      Streamer.add_socket("user:notification", socket)

      # The blocked account favorites the blocker's post; nothing may be delivered.
      {:ok, status} = CommonAPI.post(blocker, %{"status" => ":("})
      {:ok, favorite, _} = CommonAPI.favorite(status.id, blocked)
      Streamer.stream("user:notification", favorite)

      Task.await(listener)
    end

    test "it doesn't send notify to the 'user:notification' stream when a thread is muted",
         %{user: recipient} do
      favoriter = insert(:user)

      listener = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
      socket = %{transport_pid: listener.pid, assigns: %{user: recipient}}
      Streamer.add_socket("user:notification", socket)

      # The recipient mutes their own thread before the other account favorites it.
      {:ok, status} = CommonAPI.post(recipient, %{"status" => "super hot take"})
      {:ok, muted_status} = CommonAPI.add_mute(recipient, status)
      {:ok, favorite, _} = CommonAPI.favorite(muted_status.id, favoriter)
      Streamer.stream("user:notification", favorite)

      Task.await(listener)
    end

    test "it doesn't send notify to the 'user:notification' stream' when a domain is blocked",
         %{user: recipient} do
      remote_user = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})

      listener = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
      socket = %{transport_pid: listener.pid, assigns: %{user: recipient}}
      Streamer.add_socket("user:notification", socket)

      # Note that the domain block is applied after the socket has been registered.
      {:ok, blocker} = User.block_domain(recipient, "hecking-lewd-place.com")
      {:ok, status} = CommonAPI.post(blocker, %{"status" => "super hot take"})
      {:ok, favorite, _} = CommonAPI.favorite(status.id, remote_user)
      Streamer.stream("user:notification", favorite)

      Task.await(listener)
    end
  end
  # Pushes a freshly created post, and then the result of deleting it, to a fake socket
  # subscribed to "public". The second push must arrive as a JSON "delete" event whose
  # payload is the id of the original post.
  test "it sends to public" do
    viewer = insert(:user)
    author = insert(:user)

    create_listener = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
    create_socket = %StreamerSocket{transport_pid: create_listener.pid, user: viewer}

    {:ok, post} = CommonAPI.post(author, %{"status" => "Test"})
    Worker.push_to_socket(%{"public" => [create_socket]}, "public", post)
    Task.await(create_listener)

    delete_listener =
      Task.async(fn ->
        expected_event = Jason.encode!(%{"event" => "delete", "payload" => post.id})

        assert_receive {:text, received_event}, 4_000
        assert received_event == expected_event
      end)

    delete_socket = %StreamerSocket{transport_pid: delete_listener.pid, user: viewer}

    {:ok, deletion} = CommonAPI.delete(post.id, author)
    Worker.push_to_socket(%{"public" => [delete_socket]}, "public", deletion)
    Task.await(delete_listener)
  end
  # Each test sets [:instance, :skip_thread_containment], then pushes an activity whose
  # note is addressed only to "TEST-FFF" to a fake "public" socket owned by a user that
  # follows the author. Delivery is asserted or refuted within a 1-second window.
  describe "thread_containment" do
    test "it doesn't send to user if recipients invalid and thread containment is enabled" do
      Pleroma.Config.put([:instance, :skip_thread_containment], false)

      author = insert(:user)
      follower = insert(:user, following: [author.ap_id])
      note = insert(:note, user: author, data: %{"to" => ["TEST-FFF"]})
      activity = insert(:note_activity, note: note)

      listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
      socket = %StreamerSocket{transport_pid: listener.pid, user: follower}

      Worker.push_to_socket(%{"public" => [socket]}, "public", activity)
      Task.await(listener)
    end

    test "it sends message if recipients invalid and thread containment is disabled" do
      Pleroma.Config.put([:instance, :skip_thread_containment], true)

      author = insert(:user)
      follower = insert(:user, following: [author.ap_id])
      note = insert(:note, user: author, data: %{"to" => ["TEST-FFF"]})
      activity = insert(:note_activity, note: note)

      listener = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
      socket = %StreamerSocket{transport_pid: listener.pid, user: follower}

      Worker.push_to_socket(%{"public" => [socket]}, "public", activity)
      Task.await(listener)
    end

    test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do
      Pleroma.Config.put([:instance, :skip_thread_containment], false)

      author = insert(:user)

      # The per-user flag is set on the receiving user while the instance flag is false.
      follower =
        insert(:user, following: [author.ap_id], info: %{skip_thread_containment: true})

      note = insert(:note, user: author, data: %{"to" => ["TEST-FFF"]})
      activity = insert(:note_activity, note: note)

      listener = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
      socket = %StreamerSocket{transport_pid: listener.pid, user: follower}

      Worker.push_to_socket(%{"public" => [socket]}, "public", activity)
      Task.await(listener)
    end
  end
  # A post by a blocked account, pushed on "public", must not reach the blocker's socket.
  test "it doesn't send to blocked users" do
    viewer = insert(:user)
    blocked_user = insert(:user)
    {:ok, blocker} = User.block(viewer, blocked_user)

    listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
    socket = %StreamerSocket{transport_pid: listener.pid, user: blocker}

    {:ok, post} = CommonAPI.post(blocked_user, %{"status" => "Test"})

    Worker.push_to_socket(%{"public" => [socket]}, "public", post)
    Task.await(listener)
  end
  # A direct message from a list member to a third user must not be delivered on the
  # list owner's "list:<id>" topic.
  test "it doesn't send unwanted DMs to list" do
    owner = insert(:user)
    member = insert(:user)
    mentioned = insert(:user)

    {:ok, owner} = User.follow(owner, member)
    {:ok, list} = List.create("Test", owner)
    {:ok, list} = List.follow(list, member)

    listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
    socket = %StreamerSocket{transport_pid: listener.pid, user: owner}

    dm_params = %{"status" => "@#{mentioned.nickname} Test", "visibility" => "direct"}
    {:ok, dm} = CommonAPI.post(member, dm_params)

    topics = %{"list:#{list.id}" => [socket]}
    Worker.handle_call({:stream, "list", dm}, self(), topics)

    Task.await(listener)
  end
  # The list owner does not follow the member here, so the member's private post must
  # not be delivered on the "list:<id>" topic.
  test "it doesn't send unwanted private posts to list" do
    owner = insert(:user)
    member = insert(:user)

    {:ok, list} = List.create("Test", owner)
    {:ok, list} = List.follow(list, member)

    listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
    socket = %StreamerSocket{transport_pid: listener.pid, user: owner}

    private_params = %{"status" => "Test", "visibility" => "private"}
    {:ok, private_post} = CommonAPI.post(member, private_params)

    topics = %{"list:#{list.id}" => [socket]}
    Worker.handle_call({:stream, "list", private_post}, self(), topics)

    Task.await(listener)
  end
  # The list owner follows the member, so the member's private post is expected on the
  # "list:<id>" topic. The socket is registered through Streamer.add_socket/2 and the
  # worker callback is invoked with an empty topics map.
  test "it sends wanted private posts to list" do
    owner = insert(:user)
    member = insert(:user)

    {:ok, owner} = User.follow(owner, member)
    {:ok, list} = List.create("Test", owner)
    {:ok, list} = List.follow(list, member)

    listener = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
    socket = %StreamerSocket{transport_pid: listener.pid, user: owner}

    private_params = %{"status" => "Test", "visibility" => "private"}
    {:ok, private_post} = CommonAPI.post(member, private_params)

    Streamer.add_socket("list:#{list.id}", socket)
    Worker.handle_call({:stream, "list", private_post}, self(), %{})

    Task.await(listener)
  end
  # After the viewer hides reblogs from an account, that account's repeat of a third
  # user's post must not reach the viewer's "public" socket.
  test "it doesn't send muted reblogs" do
    viewer = insert(:user)
    reblogger = insert(:user)
    author = insert(:user)

    CommonAPI.hide_reblogs(viewer, reblogger)

    listener = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
    socket = %StreamerSocket{transport_pid: listener.pid, user: viewer}

    {:ok, original} = CommonAPI.post(author, %{"status" => "I'm kawen"})
    {:ok, announce, _} = CommonAPI.repeat(original.id, reblogger)

    Worker.push_to_socket(%{"public" => [socket]}, "public", announce)
    Task.await(listener)
  end
  # Verifies that an activity from a thread the receiving user has muted is not pushed
  # to that user's "user" stream.
  test "it doesn't send posts from muted threads" do
    user = insert(:user)
    user2 = insert(:user)
    {:ok, user2, user, _activity} = CommonAPI.follow(user2, user)

    {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
    {:ok, activity} = CommonAPI.add_mute(user2, activity)

    # The refute window starts the moment the task is spawned, so the socket must be
    # registered and the activity streamed immediately afterwards. Do not sleep in
    # between: a delay as long as the refute timeout lets the window expire before
    # anything is streamed, and the test then passes without checking anything.
    task = Task.async(fn -> refute_receive {:text, _}, 4_000 end)

    Streamer.add_socket("user", %{transport_pid: task.pid, assigns: %{user: user2}})
    Streamer.stream("user", activity)

    Task.await(task)
  end
  # Events delivered on the "direct" topic when direct messages are posted and deleted.
  # In each test a Task acts as the receiving end; its pid is registered as the socket's
  # transport_pid via Streamer.add_socket/2.
  #
  # The listener tasks in the two deletion tests chain two 4-second receive windows, so
  # they can legitimately run for up to 8 seconds. Those tests therefore await with an
  # explicit 10-second timeout instead of Task.await/1's 5-second default, which would
  # otherwise exit the test process before the task is done on a slow run.
  describe "direct streams" do
    test "it sends conversation update to the 'direct' stream" do
      user = insert(:user)
      another_user = insert(:user)

      task = Task.async(fn -> assert_receive {:text, _received_event}, 4_000 end)

      Streamer.add_socket("direct", %{transport_pid: task.pid, assigns: %{user: user}})

      {:ok, _create_activity} =
        CommonAPI.post(another_user, %{
          "status" => "hey @#{user.nickname}",
          "visibility" => "direct"
        })

      Task.await(task)
    end

    test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted" do
      user = insert(:user)
      another_user = insert(:user)

      {:ok, create_activity} =
        CommonAPI.post(another_user, %{
          "status" => "hi @#{user.nickname}",
          "visibility" => "direct"
        })

      task =
        Task.async(fn ->
          # Only the "delete" event is expected; nothing may follow it.
          assert_receive {:text, received_event}, 4_000
          assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)

          refute_receive {:text, _}, 4_000
        end)

      # NOTE(review): presumably gives events triggered by the post above time to
      # drain before the socket is registered - confirm.
      Process.sleep(1000)

      Streamer.add_socket("direct", %{transport_pid: task.pid, assigns: %{user: user}})

      {:ok, _} = CommonAPI.delete(create_activity.id, another_user)

      Task.await(task, 10_000)
    end

    test "it sends conversation update to the 'direct' stream when a message is deleted" do
      user = insert(:user)
      another_user = insert(:user)

      {:ok, create_activity} =
        CommonAPI.post(another_user, %{
          "status" => "hi @#{user.nickname}",
          "visibility" => "direct"
        })

      {:ok, create_activity2} =
        CommonAPI.post(another_user, %{
          "status" => "hi @#{user.nickname}",
          "in_reply_to_status_id" => create_activity.id,
          "visibility" => "direct"
        })

      task =
        Task.async(fn ->
          assert_receive {:text, delete_event}, 4_000
          assert %{"event" => "delete", "payload" => _} = Jason.decode!(delete_event)

          assert_receive {:text, conversation_event}, 4_000

          assert %{"event" => "conversation", "payload" => received_payload} =
                   Jason.decode!(conversation_event)

          # The payload is itself a JSON string; after the reply is deleted the first
          # message must be reported as the conversation's last status.
          assert %{"last_status" => last_status} = Jason.decode!(received_payload)
          assert last_status["id"] == to_string(create_activity.id)
        end)

      # NOTE(review): presumably gives events triggered by the posts above time to
      # drain before the socket is registered - confirm.
      Process.sleep(1000)

      Streamer.add_socket("direct", %{transport_pid: task.pid, assigns: %{user: user}})

      {:ok, _} = CommonAPI.delete(create_activity2.id, another_user)

      Task.await(task, 10_000)
    end
  end
  394. end