Fork of Pleroma with site-specific changes and feature branches https://git.pleroma.social/pleroma/pleroma
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

525 lines
16KB

  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Web.ActivityPub.SideEffects do
  5. @moduledoc """
  6. This module looks at an inserted object and executes the side effects that it
  7. implies. For example, a `Like` activity will increase the like count on the
  8. liked object, a `Follow` activity will add the user to the follower
  9. collection, and so on.
  10. """
  11. alias Pleroma.Activity
  12. alias Pleroma.Activity.Ir.Topics
  13. alias Pleroma.Chat
  14. alias Pleroma.Chat.MessageReference
  15. alias Pleroma.FollowingRelationship
  16. alias Pleroma.Notification
  17. alias Pleroma.Object
  18. alias Pleroma.Repo
  19. alias Pleroma.User
  20. alias Pleroma.Web.ActivityPub.ActivityPub
  21. alias Pleroma.Web.ActivityPub.Builder
  22. alias Pleroma.Web.ActivityPub.Pipeline
  23. alias Pleroma.Web.ActivityPub.Utils
  24. alias Pleroma.Web.Push
  25. alias Pleroma.Web.Streamer
  26. require Logger
  27. @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
  28. @ap_streamer Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
  29. @logger Pleroma.Config.get([:side_effects, :logger], Logger)
  30. @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
  31. @impl true
  32. def handle(object, meta \\ [])
  33. # Task this handles
  34. # - Follows
  35. # - Sends a notification
  36. @impl true
  37. def handle(
  38. %{
  39. data: %{
  40. "actor" => actor,
  41. "type" => "Accept",
  42. "object" => follow_activity_id
  43. }
  44. } = object,
  45. meta
  46. ) do
  47. with %Activity{actor: follower_id} = follow_activity <-
  48. Activity.get_by_ap_id(follow_activity_id),
  49. %User{} = followed <- User.get_cached_by_ap_id(actor),
  50. %User{} = follower <- User.get_cached_by_ap_id(follower_id),
  51. {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
  52. {:ok, _follower, followed} <-
  53. FollowingRelationship.update(follower, followed, :follow_accept) do
  54. Notification.update_notification_type(followed, follow_activity)
  55. end
  56. {:ok, object, meta}
  57. end
  58. # Task this handles
  59. # - Rejects all existing follow activities for this person
  60. # - Updates the follow state
  61. # - Dismisses notification
  62. @impl true
  63. def handle(
  64. %{
  65. data: %{
  66. "actor" => actor,
  67. "type" => "Reject",
  68. "object" => follow_activity_id
  69. }
  70. } = object,
  71. meta
  72. ) do
  73. with %Activity{actor: follower_id} = follow_activity <-
  74. Activity.get_by_ap_id(follow_activity_id),
  75. %User{} = followed <- User.get_cached_by_ap_id(actor),
  76. %User{} = follower <- User.get_cached_by_ap_id(follower_id),
  77. {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
  78. FollowingRelationship.update(follower, followed, :follow_reject)
  79. Notification.dismiss(follow_activity)
  80. end
  81. {:ok, object, meta}
  82. end
  83. # Tasks this handle
  84. # - Follows if possible
  85. # - Sends a notification
  86. # - Generates accept or reject if appropriate
  87. @impl true
  88. def handle(
  89. %{
  90. data: %{
  91. "id" => follow_id,
  92. "type" => "Follow",
  93. "object" => followed_user,
  94. "actor" => following_user
  95. }
  96. } = object,
  97. meta
  98. ) do
  99. with %User{} = follower <- User.get_cached_by_ap_id(following_user),
  100. %User{} = followed <- User.get_cached_by_ap_id(followed_user),
  101. {_, {:ok, _, _}, _, _} <-
  102. {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
  103. if followed.local && !followed.is_locked do
  104. {:ok, accept_data, _} = Builder.accept(followed, object)
  105. {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
  106. end
  107. else
  108. {:following, {:error, _}, _follower, followed} ->
  109. {:ok, reject_data, _} = Builder.reject(followed, object)
  110. {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
  111. _ ->
  112. nil
  113. end
  114. {:ok, notifications} = Notification.create_notifications(object, do_send: false)
  115. meta =
  116. meta
  117. |> add_notifications(notifications)
  118. updated_object = Activity.get_by_ap_id(follow_id)
  119. {:ok, updated_object, meta}
  120. end
  121. # Tasks this handles:
  122. # - Unfollow and block
  123. @impl true
  124. def handle(
  125. %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
  126. object,
  127. meta
  128. ) do
  129. with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
  130. %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
  131. User.block(blocker, blocked)
  132. end
  133. {:ok, object, meta}
  134. end
  135. # Tasks this handles:
  136. # - Update the user
  137. #
  138. # For a local user, we also get a changeset with the full information, so we
  139. # can update non-federating, non-activitypub settings as well.
  140. @impl true
  141. def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
  142. if changeset = Keyword.get(meta, :user_update_changeset) do
  143. changeset
  144. |> User.update_and_set_cache()
  145. else
  146. {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
  147. User.get_by_ap_id(updated_object["id"])
  148. |> User.remote_user_changeset(new_user_data)
  149. |> User.update_and_set_cache()
  150. end
  151. {:ok, object, meta}
  152. end
  153. # Tasks this handles:
  154. # - Add like to object
  155. # - Set up notification
  156. @impl true
  157. def handle(%{data: %{"type" => "Like"}} = object, meta) do
  158. liked_object = Object.get_by_ap_id(object.data["object"])
  159. Utils.add_like_to_object(object, liked_object)
  160. Notification.create_notifications(object)
  161. {:ok, object, meta}
  162. end
  163. # Tasks this handles
  164. # - Actually create object
  165. # - Rollback if we couldn't create it
  166. # - Increase the user note count
  167. # - Increase the reply count
  168. # - Increase replies count
  169. # - Set up ActivityExpiration
  170. # - Set up notifications
  171. @impl true
  172. def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  173. with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
  174. %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
  175. {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
  176. {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
  177. if in_reply_to = object.data["inReplyTo"] && object.data["type"] != "Answer" do
  178. Object.increase_replies_count(in_reply_to)
  179. end
  180. ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
  181. Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
  182. end)
  183. meta =
  184. meta
  185. |> add_notifications(notifications)
  186. {:ok, activity, meta}
  187. else
  188. e -> Repo.rollback(e)
  189. end
  190. end
  191. # Tasks this handles:
  192. # - Add announce to object
  193. # - Set up notification
  194. # - Stream out the announce
  195. @impl true
  196. def handle(%{data: %{"type" => "Announce"}} = object, meta) do
  197. announced_object = Object.get_by_ap_id(object.data["object"])
  198. user = User.get_cached_by_ap_id(object.data["actor"])
  199. Utils.add_announce_to_object(object, announced_object)
  200. if !User.is_internal_user?(user) do
  201. Notification.create_notifications(object)
  202. object
  203. |> Topics.get_activity_topics()
  204. |> Streamer.stream(object)
  205. end
  206. {:ok, object, meta}
  207. end
  208. @impl true
  209. def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
  210. with undone_object <- Activity.get_by_ap_id(undone_object),
  211. :ok <- handle_undoing(undone_object) do
  212. {:ok, object, meta}
  213. end
  214. end
  215. # Tasks this handles:
  216. # - Add reaction to object
  217. # - Set up notification
  218. @impl true
  219. def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
  220. reacted_object = Object.get_by_ap_id(object.data["object"])
  221. Utils.add_emoji_reaction_to_object(object, reacted_object)
  222. Notification.create_notifications(object)
  223. {:ok, object, meta}
  224. end
  225. # Tasks this handles:
  226. # - Delete and unpins the create activity
  227. # - Replace object with Tombstone
  228. # - Set up notification
  229. # - Reduce the user note count
  230. # - Reduce the reply count
  231. # - Stream out the activity
  232. @impl true
  233. def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
  234. deleted_object =
  235. Object.normalize(deleted_object, fetch: false) ||
  236. User.get_cached_by_ap_id(deleted_object)
  237. result =
  238. case deleted_object do
  239. %Object{} ->
  240. with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
  241. {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
  242. %User{} = user <- User.get_cached_by_ap_id(actor) do
  243. User.remove_pinned_object_id(user, deleted_object.data["id"])
  244. {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
  245. if in_reply_to = deleted_object.data["inReplyTo"] do
  246. Object.decrease_replies_count(in_reply_to)
  247. end
  248. MessageReference.delete_for_object(deleted_object)
  249. @ap_streamer.stream_out(object)
  250. @ap_streamer.stream_out_participations(deleted_object, user)
  251. :ok
  252. else
  253. {:actor, _} ->
  254. @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
  255. :no_object_actor
  256. end
  257. %User{} ->
  258. with {:ok, _} <- User.delete(deleted_object) do
  259. :ok
  260. end
  261. end
  262. if result == :ok do
  263. Notification.create_notifications(object)
  264. {:ok, object, meta}
  265. else
  266. {:error, result}
  267. end
  268. end
  269. # Tasks this handles:
  270. # - adds pin to user
  271. # - removes expiration job for pinned activity, if was set for expiration
  272. @impl true
  273. def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
  274. with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
  275. {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
  276. # if pinned activity was scheduled for deletion, we remove job
  277. if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
  278. Oban.cancel_job(expiration.id)
  279. end
  280. {:ok, object, meta}
  281. else
  282. nil ->
  283. {:error, :user_not_found}
  284. {:error, changeset} ->
  285. if changeset.errors[:pinned_objects] do
  286. {:error, :pinned_statuses_limit_reached}
  287. else
  288. changeset.errors
  289. end
  290. end
  291. end
  292. # Tasks this handles:
  293. # - removes pin from user
  294. # - removes corresponding Add activity
  295. # - if activity had expiration, recreates activity expiration job
  296. @impl true
  297. def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
  298. with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
  299. {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
  300. data["object"]
  301. |> Activity.add_by_params_query(user.ap_id, user.featured_address)
  302. |> Repo.delete_all()
  303. # if pinned activity was scheduled for deletion, we reschedule it for deletion
  304. if meta[:expires_at] do
  305. # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
  306. {:ok, expires_at} =
  307. Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
  308. Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
  309. activity_id: meta[:activity_id],
  310. expires_at: expires_at
  311. })
  312. end
  313. {:ok, object, meta}
  314. else
  315. nil -> {:error, :user_not_found}
  316. error -> error
  317. end
  318. end
  319. # Nothing to do
  320. @impl true
  321. def handle(object, meta) do
  322. {:ok, object, meta}
  323. end
  324. def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
  325. with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
  326. actor = User.get_cached_by_ap_id(object.data["actor"])
  327. recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
  328. streamables =
  329. [[actor, recipient], [recipient, actor]]
  330. |> Enum.uniq()
  331. |> Enum.map(fn [user, other_user] ->
  332. if user.local do
  333. {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
  334. {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
  335. @cachex.put(
  336. :chat_message_id_idempotency_key_cache,
  337. cm_ref.id,
  338. meta[:idempotency_key]
  339. )
  340. {
  341. ["user", "user:pleroma_chat"],
  342. {user, %{cm_ref | chat: chat, object: object}}
  343. }
  344. end
  345. end)
  346. |> Enum.filter(& &1)
  347. meta =
  348. meta
  349. |> add_streamables(streamables)
  350. {:ok, object, meta}
  351. end
  352. end
  353. def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
  354. with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
  355. Object.increase_vote_count(
  356. object.data["inReplyTo"],
  357. object.data["name"],
  358. object.data["actor"]
  359. )
  360. {:ok, object, meta}
  361. end
  362. end
  363. def handle_object_creation(%{"type" => objtype} = object, meta)
  364. when objtype in ~w[Audio Video Question Event Article] do
  365. with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
  366. {:ok, object, meta}
  367. end
  368. end
  369. # Nothing to do
  370. def handle_object_creation(object, meta) do
  371. {:ok, object, meta}
  372. end
  373. defp undo_like(nil, object), do: delete_object(object)
  374. defp undo_like(%Object{} = liked_object, object) do
  375. with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
  376. delete_object(object)
  377. end
  378. end
  379. def handle_undoing(%{data: %{"type" => "Like"}} = object) do
  380. object.data["object"]
  381. |> Object.get_by_ap_id()
  382. |> undo_like(object)
  383. end
  384. def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
  385. with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
  386. {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
  387. {:ok, _} <- Repo.delete(object) do
  388. :ok
  389. end
  390. end
  391. def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
  392. with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
  393. {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
  394. {:ok, _} <- Repo.delete(object) do
  395. :ok
  396. end
  397. end
  398. def handle_undoing(
  399. %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
  400. ) do
  401. with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
  402. %User{} = blocked <- User.get_cached_by_ap_id(blocked),
  403. {:ok, _} <- User.unblock(blocker, blocked),
  404. {:ok, _} <- Repo.delete(object) do
  405. :ok
  406. end
  407. end
  408. def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
  409. @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
  410. defp delete_object(object) do
  411. with {:ok, _} <- Repo.delete(object), do: :ok
  412. end
  413. defp send_notifications(meta) do
  414. Keyword.get(meta, :notifications, [])
  415. |> Enum.each(fn notification ->
  416. Streamer.stream(["user", "user:notification"], notification)
  417. Push.send(notification)
  418. end)
  419. meta
  420. end
  421. defp send_streamables(meta) do
  422. Keyword.get(meta, :streamables, [])
  423. |> Enum.each(fn {topics, items} ->
  424. Streamer.stream(topics, items)
  425. end)
  426. meta
  427. end
  428. defp add_streamables(meta, streamables) do
  429. existing = Keyword.get(meta, :streamables, [])
  430. meta
  431. |> Keyword.put(:streamables, streamables ++ existing)
  432. end
  433. defp add_notifications(meta, notifications) do
  434. existing = Keyword.get(meta, :notifications, [])
  435. meta
  436. |> Keyword.put(:notifications, notifications ++ existing)
  437. end
  438. @impl true
  439. def handle_after_transaction(meta) do
  440. meta
  441. |> send_notifications()
  442. |> send_streamables()
  443. end
  444. end