Fork of Pleroma with site-specific changes and feature branches https://git.pleroma.social/pleroma/pleroma
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1516 lines
44KB

  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Web.ActivityPub.ActivityPub do
  5. alias Pleroma.Activity
  6. alias Pleroma.Activity.Ir.Topics
  7. alias Pleroma.Config
  8. alias Pleroma.Constants
  9. alias Pleroma.Conversation
  10. alias Pleroma.Conversation.Participation
  11. alias Pleroma.Filter
  12. alias Pleroma.Maps
  13. alias Pleroma.Notification
  14. alias Pleroma.Object
  15. alias Pleroma.Object.Containment
  16. alias Pleroma.Object.Fetcher
  17. alias Pleroma.Pagination
  18. alias Pleroma.Repo
  19. alias Pleroma.Upload
  20. alias Pleroma.User
  21. alias Pleroma.Web.ActivityPub.MRF
  22. alias Pleroma.Web.ActivityPub.Transmogrifier
  23. alias Pleroma.Web.Streamer
  24. alias Pleroma.Web.WebFinger
  25. alias Pleroma.Workers.BackgroundWorker
  26. import Ecto.Query
  27. import Pleroma.Web.ActivityPub.Utils
  28. import Pleroma.Web.ActivityPub.Visibility
  29. require Logger
  30. require Pleroma.Constants
  31. @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
  32. @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
  33. defp get_recipients(%{"type" => "Create"} = data) do
  34. to = Map.get(data, "to", [])
  35. cc = Map.get(data, "cc", [])
  36. bcc = Map.get(data, "bcc", [])
  37. actor = Map.get(data, "actor", [])
  38. recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
  39. {recipients, to, cc}
  40. end
  41. defp get_recipients(data) do
  42. to = Map.get(data, "to", [])
  43. cc = Map.get(data, "cc", [])
  44. bcc = Map.get(data, "bcc", [])
  45. recipients = Enum.concat([to, cc, bcc])
  46. {recipients, to, cc}
  47. end
  48. defp check_actor_can_insert(%{"type" => "Delete"}), do: true
  49. defp check_actor_can_insert(%{"type" => "Undo"}), do: true
  50. defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
  51. case User.get_cached_by_ap_id(actor) do
  52. %User{is_active: true} -> true
  53. _ -> false
  54. end
  55. end
  56. defp check_actor_can_insert(_), do: true
  57. defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  58. limit = Config.get([:instance, :remote_limit])
  59. String.length(content) <= limit
  60. end
  61. defp check_remote_limit(_), do: true
  62. def increase_note_count_if_public(actor, object) do
  63. if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
  64. end
  65. def decrease_note_count_if_public(actor, object) do
  66. if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
  67. end
  68. defp increase_replies_count_if_reply(%{
  69. "object" => %{"inReplyTo" => reply_ap_id} = object,
  70. "type" => "Create"
  71. }) do
  72. if is_public?(object) do
  73. Object.increase_replies_count(reply_ap_id)
  74. end
  75. end
  76. defp increase_replies_count_if_reply(_create_data), do: :noop
  77. @object_types ~w[ChatMessage Question Answer Audio Video Event Article]
  78. @impl true
  79. def persist(%{"type" => type} = object, meta) when type in @object_types do
  80. with {:ok, object} <- Object.create(object) do
  81. {:ok, object, meta}
  82. end
  83. end
  84. @impl true
  85. def persist(object, meta) do
  86. with local <- Keyword.fetch!(meta, :local),
  87. {recipients, _, _} <- get_recipients(object),
  88. {:ok, activity} <-
  89. Repo.insert(%Activity{
  90. data: object,
  91. local: local,
  92. recipients: recipients,
  93. actor: object["actor"]
  94. }),
  95. # TODO: add tests for expired activities, when Note type will be supported in new pipeline
  96. {:ok, _} <- maybe_create_activity_expiration(activity) do
  97. {:ok, activity, meta}
  98. end
  99. end
  100. @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
  101. def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  102. with nil <- Activity.normalize(map),
  103. map <- lazy_put_activity_defaults(map, fake),
  104. {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
  105. {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
  106. {:ok, map} <- MRF.filter(map),
  107. {recipients, _, _} = get_recipients(map),
  108. {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
  109. {:containment, :ok} <- {:containment, Containment.contain_child(map)},
  110. {:ok, map, object} <- insert_full_object(map),
  111. {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
  112. # Splice in the child object if we have one.
  113. activity = Maps.put_if_present(activity, :object, object)
  114. ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
  115. Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
  116. end)
  117. {:ok, activity}
  118. else
  119. %Activity{} = activity ->
  120. {:ok, activity}
  121. {:actor_check, _} ->
  122. {:error, false}
  123. {:containment, _} = error ->
  124. error
  125. {:error, _} = error ->
  126. error
  127. {:fake, true, map, recipients} ->
  128. activity = %Activity{
  129. data: map,
  130. local: local,
  131. actor: map["actor"],
  132. recipients: recipients,
  133. id: "pleroma:fakeid"
  134. }
  135. Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
  136. {:ok, activity}
  137. {:remote_limit_pass, _} ->
  138. {:error, :remote_limit}
  139. {:reject, _} = e ->
  140. {:error, e}
  141. end
  142. end
  143. defp insert_activity_with_expiration(data, local, recipients) do
  144. struct = %Activity{
  145. data: data,
  146. local: local,
  147. actor: data["actor"],
  148. recipients: recipients
  149. }
  150. with {:ok, activity} <- Repo.insert(struct) do
  151. maybe_create_activity_expiration(activity)
  152. end
  153. end
  154. def notify_and_stream(activity) do
  155. Notification.create_notifications(activity)
  156. conversation = create_or_bump_conversation(activity, activity.actor)
  157. participations = get_participations(conversation)
  158. stream_out(activity)
  159. stream_out_participations(participations)
  160. end
  161. defp maybe_create_activity_expiration(
  162. %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
  163. ) do
  164. with {:ok, _job} <-
  165. Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
  166. activity_id: activity.id,
  167. expires_at: expires_at
  168. }) do
  169. {:ok, activity}
  170. end
  171. end
  172. defp maybe_create_activity_expiration(activity), do: {:ok, activity}
  173. defp create_or_bump_conversation(activity, actor) do
  174. with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
  175. %User{} = user <- User.get_cached_by_ap_id(actor) do
  176. Participation.mark_as_read(user, conversation)
  177. {:ok, conversation}
  178. end
  179. end
  180. defp get_participations({:ok, conversation}) do
  181. conversation
  182. |> Repo.preload(:participations, force: true)
  183. |> Map.get(:participations)
  184. end
  185. defp get_participations(_), do: []
  186. def stream_out_participations(participations) do
  187. participations =
  188. participations
  189. |> Repo.preload(:user)
  190. Streamer.stream("participation", participations)
  191. end
  192. @impl true
  193. def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  194. with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
  195. conversation = Repo.preload(conversation, :participations)
  196. last_activity_id =
  197. fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
  198. user: user,
  199. blocking_user: user
  200. })
  201. if last_activity_id do
  202. stream_out_participations(conversation.participations)
  203. end
  204. end
  205. end
  206. @impl true
  207. def stream_out_participations(_, _), do: :noop
  208. @impl true
  209. def stream_out(%Activity{data: %{"type" => data_type}} = activity)
  210. when data_type in ["Create", "Announce", "Delete"] do
  211. activity
  212. |> Topics.get_activity_topics()
  213. |> Streamer.stream(activity)
  214. end
  215. @impl true
  216. def stream_out(_activity) do
  217. :noop
  218. end
  219. @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
  220. def create(params, fake \\ false) do
  221. with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
  222. result
  223. end
  224. end
  225. defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  226. additional = params[:additional] || %{}
  227. # only accept false as false value
  228. local = !(params[:local] == false)
  229. published = params[:published]
  230. quick_insert? = Config.get([:env]) == :benchmark
  231. create_data =
  232. make_create_data(
  233. %{to: to, actor: actor, published: published, context: context, object: object},
  234. additional
  235. )
  236. with {:ok, activity} <- insert(create_data, local, fake),
  237. {:fake, false, activity} <- {:fake, fake, activity},
  238. _ <- increase_replies_count_if_reply(create_data),
  239. {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
  240. {:ok, _actor} <- increase_note_count_if_public(actor, activity),
  241. _ <- notify_and_stream(activity),
  242. :ok <- maybe_federate(activity) do
  243. {:ok, activity}
  244. else
  245. {:quick_insert, true, activity} ->
  246. {:ok, activity}
  247. {:fake, true, activity} ->
  248. {:ok, activity}
  249. {:error, message} ->
  250. Repo.rollback(message)
  251. end
  252. end
  253. @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
  254. def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  255. additional = params[:additional] || %{}
  256. # only accept false as false value
  257. local = !(params[:local] == false)
  258. published = params[:published]
  259. listen_data =
  260. make_listen_data(
  261. %{to: to, actor: actor, published: published, context: context, object: object},
  262. additional
  263. )
  264. with {:ok, activity} <- insert(listen_data, local),
  265. _ <- notify_and_stream(activity),
  266. :ok <- maybe_federate(activity) do
  267. {:ok, activity}
  268. end
  269. end
  270. @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
  271. {:ok, Activity.t()} | nil | {:error, any()}
  272. def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  273. with {:ok, result} <-
  274. Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
  275. result
  276. end
  277. end
  278. defp do_unfollow(follower, followed, activity_id, local) do
  279. with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
  280. {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
  281. unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
  282. {:ok, activity} <- insert(unfollow_data, local),
  283. _ <- notify_and_stream(activity),
  284. :ok <- maybe_federate(activity) do
  285. {:ok, activity}
  286. else
  287. nil -> nil
  288. {:error, error} -> Repo.rollback(error)
  289. end
  290. end
  291. @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
  292. def flag(params) do
  293. with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
  294. result
  295. end
  296. end
  297. defp do_flag(
  298. %{
  299. actor: actor,
  300. context: _context,
  301. account: account,
  302. statuses: statuses,
  303. content: content
  304. } = params
  305. ) do
  306. # only accept false as false value
  307. local = !(params[:local] == false)
  308. forward = !(params[:forward] == false)
  309. additional = params[:additional] || %{}
  310. additional =
  311. if forward do
  312. Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
  313. else
  314. Map.merge(additional, %{"to" => [], "cc" => []})
  315. end
  316. with flag_data <- make_flag_data(params, additional),
  317. {:ok, activity} <- insert(flag_data, local),
  318. {:ok, stripped_activity} <- strip_report_status_data(activity),
  319. _ <- notify_and_stream(activity),
  320. :ok <-
  321. maybe_federate(stripped_activity) do
  322. User.all_superusers()
  323. |> Enum.filter(fn user -> user.ap_id != actor end)
  324. |> Enum.filter(fn user -> not is_nil(user.email) end)
  325. |> Enum.each(fn superuser ->
  326. superuser
  327. |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
  328. |> Pleroma.Emails.Mailer.deliver_async()
  329. end)
  330. {:ok, activity}
  331. else
  332. {:error, error} -> Repo.rollback(error)
  333. end
  334. end
  335. @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
  336. def move(%User{} = origin, %User{} = target, local \\ true) do
  337. params = %{
  338. "type" => "Move",
  339. "actor" => origin.ap_id,
  340. "object" => origin.ap_id,
  341. "target" => target.ap_id
  342. }
  343. with true <- origin.ap_id in target.also_known_as,
  344. {:ok, activity} <- insert(params, local),
  345. _ <- notify_and_stream(activity) do
  346. maybe_federate(activity)
  347. BackgroundWorker.enqueue("move_following", %{
  348. "origin_id" => origin.id,
  349. "target_id" => target.id
  350. })
  351. {:ok, activity}
  352. else
  353. false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
  354. err -> err
  355. end
  356. end
  357. def fetch_activities_for_context_query(context, opts) do
  358. public = [Constants.as_public()]
  359. recipients =
  360. if opts[:user],
  361. do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
  362. else: public
  363. from(activity in Activity)
  364. |> maybe_preload_objects(opts)
  365. |> maybe_preload_bookmarks(opts)
  366. |> maybe_set_thread_muted_field(opts)
  367. |> restrict_blocked(opts)
  368. |> restrict_recipients(recipients, opts[:user])
  369. |> restrict_filtered(opts)
  370. |> where(
  371. [activity],
  372. fragment(
  373. "?->>'type' = ? and ?->>'context' = ?",
  374. activity.data,
  375. "Create",
  376. activity.data,
  377. ^context
  378. )
  379. )
  380. |> exclude_poll_votes(opts)
  381. |> exclude_id(opts)
  382. |> order_by([activity], desc: activity.id)
  383. end
  384. @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
  385. def fetch_activities_for_context(context, opts \\ %{}) do
  386. context
  387. |> fetch_activities_for_context_query(opts)
  388. |> Repo.all()
  389. end
  390. @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
  391. FlakeId.Ecto.CompatType.t() | nil
  392. def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  393. context
  394. |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
  395. |> restrict_visibility(%{visibility: "direct"})
  396. |> limit(1)
  397. |> select([a], a.id)
  398. |> Repo.one()
  399. end
  400. @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
  401. def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  402. opts = Map.delete(opts, :user)
  403. [Constants.as_public()]
  404. |> fetch_activities_query(opts)
  405. |> restrict_unlisted(opts)
  406. |> Pagination.fetch_paginated(opts, pagination)
  407. end
  408. @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
  409. def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  410. opts
  411. |> Map.put(:restrict_unlisted, true)
  412. |> fetch_public_or_unlisted_activities(pagination)
  413. end
  414. @valid_visibilities ~w[direct unlisted public private]
  415. defp restrict_visibility(query, %{visibility: visibility})
  416. when is_list(visibility) do
  417. if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
  418. from(
  419. a in query,
  420. where:
  421. fragment(
  422. "activity_visibility(?, ?, ?) = ANY (?)",
  423. a.actor,
  424. a.recipients,
  425. a.data,
  426. ^visibility
  427. )
  428. )
  429. else
  430. Logger.error("Could not restrict visibility to #{visibility}")
  431. end
  432. end
  433. defp restrict_visibility(query, %{visibility: visibility})
  434. when visibility in @valid_visibilities do
  435. from(
  436. a in query,
  437. where:
  438. fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  439. )
  440. end
  441. defp restrict_visibility(_query, %{visibility: visibility})
  442. when visibility not in @valid_visibilities do
  443. Logger.error("Could not restrict visibility to #{visibility}")
  444. end
  445. defp restrict_visibility(query, _visibility), do: query
  446. defp exclude_visibility(query, %{exclude_visibilities: visibility})
  447. when is_list(visibility) do
  448. if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
  449. from(
  450. a in query,
  451. where:
  452. not fragment(
  453. "activity_visibility(?, ?, ?) = ANY (?)",
  454. a.actor,
  455. a.recipients,
  456. a.data,
  457. ^visibility
  458. )
  459. )
  460. else
  461. Logger.error("Could not exclude visibility to #{visibility}")
  462. query
  463. end
  464. end
  465. defp exclude_visibility(query, %{exclude_visibilities: visibility})
  466. when visibility in @valid_visibilities do
  467. from(
  468. a in query,
  469. where:
  470. not fragment(
  471. "activity_visibility(?, ?, ?) = ?",
  472. a.actor,
  473. a.recipients,
  474. a.data,
  475. ^visibility
  476. )
  477. )
  478. end
  479. defp exclude_visibility(query, %{exclude_visibilities: visibility})
  480. when visibility not in [nil | @valid_visibilities] do
  481. Logger.error("Could not exclude visibility to #{visibility}")
  482. query
  483. end
  484. defp exclude_visibility(query, _visibility), do: query
  485. defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
  486. do: query
  487. defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
  488. do: query
  489. defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
  490. from(
  491. a in query,
  492. where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data)
  493. )
  494. end
  495. defp restrict_thread_visibility(query, _, _), do: query
  496. def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  497. params =
  498. params
  499. |> Map.put(:user, reading_user)
  500. |> Map.put(:actor_id, user.ap_id)
  501. %{
  502. godmode: params[:godmode],
  503. reading_user: reading_user
  504. }
  505. |> user_activities_recipients()
  506. |> fetch_activities(params)
  507. |> Enum.reverse()
  508. end
  509. def fetch_user_activities(user, reading_user, params \\ %{})
  510. def fetch_user_activities(user, reading_user, %{total: true} = params) do
  511. result = fetch_activities_for_user(user, reading_user, params)
  512. Keyword.put(result, :items, Enum.reverse(result[:items]))
  513. end
  514. def fetch_user_activities(user, reading_user, params) do
  515. user
  516. |> fetch_activities_for_user(reading_user, params)
  517. |> Enum.reverse()
  518. end
  519. defp fetch_activities_for_user(user, reading_user, params) do
  520. params =
  521. params
  522. |> Map.put(:type, ["Create", "Announce"])
  523. |> Map.put(:user, reading_user)
  524. |> Map.put(:actor_id, user.ap_id)
  525. |> Map.put(:pinned_activity_ids, user.pinned_activities)
  526. params =
  527. if User.blocks?(reading_user, user) do
  528. params
  529. else
  530. params
  531. |> Map.put(:blocking_user, reading_user)
  532. |> Map.put(:muting_user, reading_user)
  533. end
  534. pagination_type = Map.get(params, :pagination_type) || :keyset
  535. %{
  536. godmode: params[:godmode],
  537. reading_user: reading_user
  538. }
  539. |> user_activities_recipients()
  540. |> fetch_activities(params, pagination_type)
  541. end
  542. def fetch_statuses(reading_user, %{total: true} = params) do
  543. result = fetch_activities_for_reading_user(reading_user, params)
  544. Keyword.put(result, :items, Enum.reverse(result[:items]))
  545. end
  546. def fetch_statuses(reading_user, params) do
  547. reading_user
  548. |> fetch_activities_for_reading_user(params)
  549. |> Enum.reverse()
  550. end
  551. defp fetch_activities_for_reading_user(reading_user, params) do
  552. params = Map.put(params, :type, ["Create", "Announce"])
  553. %{
  554. godmode: params[:godmode],
  555. reading_user: reading_user
  556. }
  557. |> user_activities_recipients()
  558. |> fetch_activities(params, :offset)
  559. end
  560. defp user_activities_recipients(%{godmode: true}), do: []
  561. defp user_activities_recipients(%{reading_user: reading_user}) do
  562. if reading_user do
  563. [Constants.as_public(), reading_user.ap_id | User.following(reading_user)]
  564. else
  565. [Constants.as_public()]
  566. end
  567. end
  568. defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  569. raise "Can't use the child object without preloading!"
  570. end
  571. defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  572. from(
  573. [activity, object] in query,
  574. where:
  575. fragment(
  576. "?->>'type' != ? or ?->>'actor' != ?",
  577. activity.data,
  578. "Announce",
  579. object.data,
  580. ^actor
  581. )
  582. )
  583. end
  584. defp restrict_announce_object_actor(query, _), do: query
  585. defp restrict_since(query, %{since_id: ""}), do: query
  586. defp restrict_since(query, %{since_id: since_id}) do
  587. from(activity in query, where: activity.id > ^since_id)
  588. end
  589. defp restrict_since(query, _), do: query
  590. defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  591. raise "Can't use the child object without preloading!"
  592. end
  593. defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do
  594. from(
  595. [_activity, object] in query,
  596. where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  597. )
  598. end
  599. defp restrict_tag_reject(query, _), do: query
  600. defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  601. raise "Can't use the child object without preloading!"
  602. end
  603. defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  604. from(
  605. [_activity, object] in query,
  606. where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  607. )
  608. end
  609. defp restrict_tag_all(query, _), do: query
  610. defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
  611. raise "Can't use the child object without preloading!"
  612. end
  613. defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
  614. from(
  615. [_activity, object] in query,
  616. where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
  617. )
  618. end
  619. defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
  620. from(
  621. [_activity, object] in query,
  622. where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
  623. )
  624. end
  625. defp restrict_tag(query, _), do: query
  626. defp restrict_recipients(query, [], _user), do: query
  627. defp restrict_recipients(query, recipients, nil) do
  628. from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
  629. end
  630. defp restrict_recipients(query, recipients, user) do
  631. from(
  632. activity in query,
  633. where: fragment("? && ?", ^recipients, activity.recipients),
  634. or_where: activity.actor == ^user.ap_id
  635. )
  636. end
  637. defp restrict_local(query, %{local_only: true}) do
  638. from(activity in query, where: activity.local == true)
  639. end
  640. defp restrict_local(query, _), do: query
  641. defp restrict_remote(query, %{remote: true}) do
  642. from(activity in query, where: activity.local == false)
  643. end
  644. defp restrict_remote(query, _), do: query
  645. defp restrict_actor(query, %{actor_id: actor_id}) do
  646. from(activity in query, where: activity.actor == ^actor_id)
  647. end
  648. defp restrict_actor(query, _), do: query
  649. defp restrict_type(query, %{type: type}) when is_binary(type) do
  650. from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
  651. end
  652. defp restrict_type(query, %{type: type}) do
  653. from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
  654. end
  655. defp restrict_type(query, _), do: query
  656. defp restrict_state(query, %{state: state}) do
  657. from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
  658. end
  659. defp restrict_state(query, _), do: query
  660. defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  661. from(
  662. [_activity, object] in query,
  663. where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
  664. )
  665. end
  666. defp restrict_favorited_by(query, _), do: query
  667. defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  668. raise "Can't use the child object without preloading!"
  669. end
  670. defp restrict_media(query, %{only_media: true}) do
  671. from(
  672. [activity, object] in query,
  673. where: fragment("(?)->>'type' = ?", activity.data, "Create"),
  674. where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
  675. )
  676. end
  677. defp restrict_media(query, _), do: query
  678. defp restrict_replies(query, %{exclude_replies: true}) do
  679. from(
  680. [_activity, object] in query,
  681. where: fragment("?->>'inReplyTo' is null", object.data)
  682. )
  683. end
  684. defp restrict_replies(query, %{
  685. reply_filtering_user: %User{} = user,
  686. reply_visibility: "self"
  687. }) do
  688. from(
  689. [activity, object] in query,
  690. where:
  691. fragment(
  692. "?->>'inReplyTo' is null OR ? = ANY(?)",
  693. object.data,
  694. ^user.ap_id,
  695. activity.recipients
  696. )
  697. )
  698. end
  699. defp restrict_replies(query, %{
  700. reply_filtering_user: %User{} = user,
  701. reply_visibility: "following"
  702. }) do
  703. from(
  704. [activity, object] in query,
  705. where:
  706. fragment(
  707. """
  708. ?->>'type' != 'Create' -- This isn't a Create
  709. OR ?->>'inReplyTo' is null -- this isn't a reply
  710. OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
  711. -- unless they are the author (because authors
  712. -- are also part of the recipients). This leads
  713. -- to a bug that self-replies by friends won't
  714. -- show up.
  715. OR ? = ? -- The actor is us
  716. """,
  717. activity.data,
  718. object.data,
  719. ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
  720. activity.recipients,
  721. activity.actor,
  722. activity.actor,
  723. ^user.ap_id
  724. )
  725. )
  726. end
  727. defp restrict_replies(query, _), do: query
  728. defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  729. from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
  730. end
  731. defp restrict_reblogs(query, _), do: query
  732. defp restrict_muted(query, %{with_muted: true}), do: query
  733. defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  734. mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
  735. query =
  736. from([activity] in query,
  737. where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
  738. where:
  739. fragment(
  740. "not (?->'to' \\?| ?) or ? = ?",
  741. activity.data,
  742. ^mutes,
  743. activity.actor,
  744. ^user.ap_id
  745. )
  746. )
  747. unless opts[:skip_preload] do
  748. from([thread_mute: tm] in query, where: is_nil(tm.user_id))
  749. else
  750. query
  751. end
  752. end
  753. defp restrict_muted(query, _), do: query
  754. defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  755. blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  756. domain_blocks = user.domain_blocks || []
  757. following_ap_ids = User.get_friends_ap_ids(user)
  758. query =
  759. if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
  760. from(
  761. [activity, object: o] in query,
  762. where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
  763. where:
  764. fragment(
  765. "((not (? && ?)) or ? = ?)",
  766. activity.recipients,
  767. ^blocked_ap_ids,
  768. activity.actor,
  769. ^user.ap_id
  770. ),
  771. where:
  772. fragment(
  773. "recipients_contain_blocked_domains(?, ?) = false",
  774. activity.recipients,
  775. ^domain_blocks
  776. ),
  777. where:
  778. fragment(
  779. "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
  780. activity.data,
  781. activity.data,
  782. ^blocked_ap_ids
  783. ),
  784. where:
  785. fragment(
  786. "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
  787. activity.actor,
  788. ^domain_blocks,
  789. activity.actor,
  790. ^following_ap_ids
  791. ),
  792. where:
  793. fragment(
  794. "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
  795. o.data,
  796. ^domain_blocks,
  797. o.data,
  798. ^following_ap_ids
  799. )
  800. )
  801. end
  802. defp restrict_blocked(query, _), do: query
  803. defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  804. from(
  805. activity in query,
  806. where:
  807. fragment(
  808. "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
  809. activity.data,
  810. ^[Constants.as_public()]
  811. )
  812. )
  813. end
  814. defp restrict_unlisted(query, _), do: query
  815. defp restrict_pinned(query, %{pinned: true, pinned_activity_ids: ids}) do
  816. from(activity in query, where: activity.id in ^ids)
  817. end
  818. defp restrict_pinned(query, _), do: query
  819. defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  820. muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
  821. from(
  822. activity in query,
  823. where:
  824. fragment(
  825. "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
  826. activity.data,
  827. activity.actor,
  828. ^muted_reblogs
  829. )
  830. )
  831. end
  832. defp restrict_muted_reblogs(query, _), do: query
  833. defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  834. from(
  835. activity in query,
  836. where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
  837. )
  838. end
  839. defp restrict_instance(query, _), do: query
  840. defp restrict_filtered(query, %{user: %User{} = user}) do
  841. case Filter.compose_regex(user) do
  842. nil ->
  843. query
  844. regex ->
  845. from([activity, object] in query,
  846. where:
  847. fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
  848. activity.actor == ^user.ap_id
  849. )
  850. end
  851. end
  852. defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
  853. restrict_filtered(query, %{user: user})
  854. end
  855. defp restrict_filtered(query, _), do: query
  856. defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
  857. defp exclude_poll_votes(query, _) do
  858. if has_named_binding?(query, :object) do
  859. from([activity, object: o] in query,
  860. where: fragment("not(?->>'type' = ?)", o.data, "Answer")
  861. )
  862. else
  863. query
  864. end
  865. end
  866. defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
  867. defp exclude_chat_messages(query, _) do
  868. if has_named_binding?(query, :object) do
  869. from([activity, object: o] in query,
  870. where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
  871. )
  872. else
  873. query
  874. end
  875. end
  876. defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
  877. defp exclude_invisible_actors(query, _opts) do
  878. invisible_ap_ids =
  879. User.Query.build(%{invisible: true, select: [:ap_id]})
  880. |> Repo.all()
  881. |> Enum.map(fn %{ap_id: ap_id} -> ap_id end)
  882. from([activity] in query, where: activity.actor not in ^invisible_ap_ids)
  883. end
  884. defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  885. from(activity in query, where: activity.id != ^id)
  886. end
  887. defp exclude_id(query, _), do: query
  888. defp maybe_preload_objects(query, %{skip_preload: true}), do: query
  889. defp maybe_preload_objects(query, _) do
  890. query
  891. |> Activity.with_preloaded_object()
  892. end
  893. defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
  894. defp maybe_preload_bookmarks(query, opts) do
  895. query
  896. |> Activity.with_preloaded_bookmark(opts[:user])
  897. end
  898. defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
  899. query
  900. |> Activity.with_preloaded_report_notes()
  901. end
  902. defp maybe_preload_report_notes(query, _), do: query
  903. defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
  904. defp maybe_set_thread_muted_field(query, opts) do
  905. query
  906. |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
  907. end
  908. defp maybe_order(query, %{order: :desc}) do
  909. query
  910. |> order_by(desc: :id)
  911. end
  912. defp maybe_order(query, %{order: :asc}) do
  913. query
  914. |> order_by(asc: :id)
  915. end
  916. defp maybe_order(query, _), do: query
  917. defp fetch_activities_query_ap_ids_ops(opts) do
  918. source_user = opts[:muting_user]
  919. ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
  920. ap_id_relationships =
  921. if opts[:blocking_user] && opts[:blocking_user] == source_user do
  922. [:block | ap_id_relationships]
  923. else
  924. ap_id_relationships
  925. end
  926. preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
  927. restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
  928. restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
  929. restrict_muted_reblogs_opts =
  930. Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
  931. {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
  932. end
  933. def fetch_activities_query(recipients, opts \\ %{}) do
  934. {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
  935. fetch_activities_query_ap_ids_ops(opts)
  936. config = %{
  937. skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  938. }
  939. Activity
  940. |> maybe_preload_objects(opts)
  941. |> maybe_preload_bookmarks(opts)
  942. |> maybe_preload_report_notes(opts)
  943. |> maybe_set_thread_muted_field(opts)
  944. |> maybe_order(opts)
  945. |> restrict_recipients(recipients, opts[:user])
  946. |> restrict_replies(opts)
  947. |> restrict_tag(opts)
  948. |> restrict_tag_reject(opts)
  949. |> restrict_tag_all(opts)
  950. |> restrict_since(opts)
  951. |> restrict_local(opts)
  952. |> restrict_remote(opts)
  953. |> restrict_actor(opts)
  954. |> restrict_type(opts)
  955. |> restrict_state(opts)
  956. |> restrict_favorited_by(opts)
  957. |> restrict_blocked(restrict_blocked_opts)
  958. |> restrict_muted(restrict_muted_opts)
  959. |> restrict_filtered(opts)
  960. |> restrict_media(opts)
  961. |> restrict_visibility(opts)
  962. |> restrict_thread_visibility(opts, config)
  963. |> restrict_reblogs(opts)
  964. |> restrict_pinned(opts)
  965. |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
  966. |> restrict_instance(opts)
  967. |> restrict_announce_object_actor(opts)
  968. |> restrict_filtered(opts)
  969. |> Activity.restrict_deactivated_users()
  970. |> exclude_poll_votes(opts)
  971. |> exclude_chat_messages(opts)
  972. |> exclude_invisible_actors(opts)
  973. |> exclude_visibility(opts)
  974. end
  975. def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  976. list_memberships = Pleroma.List.memberships(opts[:user])
  977. fetch_activities_query(recipients ++ list_memberships, opts)
  978. |> Pagination.fetch_paginated(opts, pagination)
  979. |> Enum.reverse()
  980. |> maybe_update_cc(list_memberships, opts[:user])
  981. end
  982. @doc """
  983. Fetch favorites activities of user with order by sort adds to favorites
  984. """
  985. @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
  986. def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  987. user.ap_id
  988. |> Activity.Queries.by_actor()
  989. |> Activity.Queries.by_type("Like")
  990. |> Activity.with_joined_object()
  991. |> Object.with_joined_activity()
  992. |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
  993. |> order_by([like, _, _], desc_nulls_last: like.id)
  994. |> Pagination.fetch_paginated(
  995. Map.merge(params, %{skip_order: true}),
  996. pagination
  997. )
  998. end
  999. defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  1000. Enum.map(activities, fn
  1001. %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
  1002. if Enum.any?(bcc, &(&1 in list_memberships)) do
  1003. update_in(activity.data["cc"], &[user_ap_id | &1])
  1004. else
  1005. activity
  1006. end
  1007. activity ->
  1008. activity
  1009. end)
  1010. end
  1011. defp maybe_update_cc(activities, _, _), do: activities
  1012. defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  1013. from(activity in query,
  1014. where:
  1015. fragment("? && ?", activity.recipients, ^recipients) or
  1016. (fragment("? && ?", activity.recipients, ^recipients_with_public) and
  1017. ^Constants.as_public() in activity.recipients)
  1018. )
  1019. end
  1020. def fetch_activities_bounded(
  1021. recipients,
  1022. recipients_with_public,
  1023. opts \\ %{},
  1024. pagination \\ :keyset
  1025. ) do
  1026. fetch_activities_query([], opts)
  1027. |> fetch_activities_bounded_query(recipients, recipients_with_public)
  1028. |> Pagination.fetch_paginated(opts, pagination)
  1029. |> Enum.reverse()
  1030. end
  1031. @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
  1032. def upload(file, opts \\ []) do
  1033. with {:ok, data} <- Upload.store(file, opts) do
  1034. obj_data = Maps.put_if_present(data, "actor", opts[:actor])
  1035. Repo.insert(%Object{data: obj_data})
  1036. end
  1037. end
  1038. @spec get_actor_url(any()) :: binary() | nil
  1039. defp get_actor_url(url) when is_binary(url), do: url
  1040. defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
  1041. defp get_actor_url(url) when is_list(url) do
  1042. url
  1043. |> List.first()
  1044. |> get_actor_url()
  1045. end
  1046. defp get_actor_url(_url), do: nil
  1047. defp object_to_user_data(data) do
  1048. avatar =
  1049. data["icon"]["url"] &&
  1050. %{
  1051. "type" => "Image",
  1052. "url" => [%{"href" => data["icon"]["url"]}]
  1053. }
  1054. banner =
  1055. data["image"]["url"] &&
  1056. %{
  1057. "type" => "Image",
  1058. "url" => [%{"href" => data["image"]["url"]}]
  1059. }
  1060. fields =
  1061. data
  1062. |> Map.get("attachment", [])
  1063. |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
  1064. |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
  1065. emojis =
  1066. data
  1067. |> Map.get("tag", [])
  1068. |> Enum.filter(fn
  1069. %{"type" => "Emoji"} -> true
  1070. _ -> false
  1071. end)
  1072. |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
  1073. {String.trim(name, ":"), url}
  1074. end)
  1075. is_locked = data["manuallyApprovesFollowers"] || false
  1076. capabilities = data["capabilities"] || %{}
  1077. accepts_chat_messages = capabilities["acceptsChatMessages"]
  1078. data = Transmogrifier.maybe_fix_user_object(data)
  1079. is_discoverable = data["discoverable"] || false
  1080. invisible = data["invisible"] || false
  1081. actor_type = data["type"] || "Person"
  1082. public_key =
  1083. if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
  1084. data["publicKey"]["publicKeyPem"]
  1085. else
  1086. nil
  1087. end
  1088. shared_inbox =
  1089. if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
  1090. data["endpoints"]["sharedInbox"]
  1091. else
  1092. nil
  1093. end
  1094. user_data = %{
  1095. ap_id: data["id"],
  1096. uri: get_actor_url(data["url"]),
  1097. ap_enabled: true,
  1098. banner: banner,
  1099. fields: fields,
  1100. emoji: emojis,
  1101. is_locked: is_locked,
  1102. is_discoverable: is_discoverable,
  1103. invisible: invisible,
  1104. avatar: avatar,
  1105. name: data["name"],
  1106. follower_address: data["followers"],
  1107. following_address: data["following"],
  1108. bio: data["summary"] || "",
  1109. actor_type: actor_type,
  1110. also_known_as: Map.get(data, "alsoKnownAs", []),
  1111. public_key: public_key,
  1112. inbox: data["inbox"],
  1113. shared_inbox: shared_inbox,
  1114. accepts_chat_messages: accepts_chat_messages
  1115. }
  1116. # nickname can be nil because of virtual actors
  1117. if data["preferredUsername"] do
  1118. Map.put(
  1119. user_data,
  1120. :nickname,
  1121. "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
  1122. )
  1123. else
  1124. Map.put(user_data, :nickname, nil)
  1125. end
  1126. end
  1127. def fetch_follow_information_for_user(user) do
  1128. with {:ok, following_data} <-
  1129. Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
  1130. {:ok, hide_follows} <- collection_private(following_data),
  1131. {:ok, followers_data} <-
  1132. Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
  1133. {:ok, hide_followers} <- collection_private(followers_data) do
  1134. {:ok,
  1135. %{
  1136. hide_follows: hide_follows,
  1137. follower_count: normalize_counter(followers_data["totalItems"]),
  1138. following_count: normalize_counter(following_data["totalItems"]),
  1139. hide_followers: hide_followers
  1140. }}
  1141. else
  1142. {:error, _} = e -> e
  1143. e -> {:error, e}
  1144. end
  1145. end
  1146. defp normalize_counter(counter) when is_integer(counter), do: counter
  1147. defp normalize_counter(_), do: 0
  1148. def maybe_update_follow_information(user_data) do
  1149. with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
  1150. {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
  1151. {_, true} <-
  1152. {:collections_available,
  1153. !!(user_data[:following_address] && user_data[:follower_address])},
  1154. {:ok, info} <-
  1155. fetch_follow_information_for_user(user_data) do
  1156. info = Map.merge(user_data[:info] || %{}, info)
  1157. user_data
  1158. |> Map.put(:info, info)
  1159. else
  1160. {:user_type_check, false} ->
  1161. user_data
  1162. {:collections_available, false} ->
  1163. user_data
  1164. {:enabled, false} ->
  1165. user_data
  1166. e ->
  1167. Logger.error(
  1168. "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
  1169. )
  1170. user_data
  1171. end
  1172. end
  1173. defp collection_private(%{"first" => %{"type" => type}})
  1174. when type in ["CollectionPage", "OrderedCollectionPage"],
  1175. do: {:ok, false}
  1176. defp collection_private(%{"first" => first}) do
  1177. with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
  1178. Fetcher.fetch_and_contain_remote_object_from_id(first) do
  1179. {:ok, false}
  1180. else
  1181. {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
  1182. {:error, _} = e -> e
  1183. e -> {:error, e}
  1184. end
  1185. end
  1186. defp collection_private(_data), do: {:ok, true}
  1187. def user_data_from_user_object(data) do
  1188. with {:ok, data} <- MRF.filter(data) do
  1189. {:ok, object_to_user_data(data)}
  1190. else
  1191. e -> {:error, e}
  1192. end
  1193. end
  1194. def fetch_and_prepare_user_from_ap_id(ap_id) do
  1195. with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
  1196. {:ok, data} <- user_data_from_user_object(data) do
  1197. {:ok, maybe_update_follow_information(data)}
  1198. else
  1199. # If this has been deleted, only log a debug and not an error
  1200. {:error, "Object has been deleted" = e} ->
  1201. Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
  1202. {:error, e}
  1203. {:error, {:reject, reason} = e} ->
  1204. Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
  1205. {:error, e}
  1206. {:error, e} ->
  1207. Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
  1208. {:error, e}
  1209. end
  1210. end
  1211. def maybe_handle_clashing_nickname(data) do
  1212. with nickname when is_binary(nickname) <- data[:nickname],
  1213. %User{} = old_user <- User.get_by_nickname(nickname),
  1214. {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
  1215. Logger.info(
  1216. "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{
  1217. data[:ap_id]
  1218. }, renaming."
  1219. )
  1220. old_user
  1221. |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
  1222. |> User.update_and_set_cache()
  1223. else
  1224. {:ap_id_comparison, true} ->
  1225. Logger.info(
  1226. "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
  1227. )
  1228. _ ->
  1229. nil
  1230. end
  1231. end
  1232. def make_user_from_ap_id(ap_id) do
  1233. user = User.get_cached_by_ap_id(ap_id)
  1234. if user && !User.ap_enabled?(user) do
  1235. Transmogrifier.upgrade_user_from_ap_id(ap_id)
  1236. else
  1237. with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
  1238. if user do
  1239. user
  1240. |> User.remote_user_changeset(data)
  1241. |> User.update_and_set_cache()
  1242. else
  1243. maybe_handle_clashing_nickname(data)
  1244. data
  1245. |> User.remote_user_changeset()
  1246. |> Repo.insert()
  1247. |> User.set_cache()
  1248. end
  1249. end
  1250. end
  1251. end
  1252. def make_user_from_nickname(nickname) do
  1253. with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
  1254. make_user_from_ap_id(ap_id)
  1255. else
  1256. _e -> {:error, "No AP id in WebFinger"}
  1257. end
  1258. end
  1259. # filter out broken threads
  1260. defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
  1261. entire_thread_visible_for_user?(activity, user)
  1262. end
  1263. # do post-processing on a specific activity
  1264. def contain_activity(%Activity{} = activity, %User{} = user) do
  1265. contain_broken_threads(activity, user)
  1266. end
  1267. def fetch_direct_messages_query do
  1268. Activity
  1269. |> restrict_type(%{type: "Create"})
  1270. |> restrict_visibility(%{visibility: "direct"})
  1271. |> order_by([activity], asc: activity.id)
  1272. end
  1273. end