Fork of Pleroma with site-specific changes and feature branches https://git.pleroma.social/pleroma/pleroma
Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.

1694 řádky
50KB

  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Web.ActivityPub.ActivityPub do
  5. alias Pleroma.Activity
  6. alias Pleroma.Activity.Ir.Topics
  7. alias Pleroma.Config
  8. alias Pleroma.Constants
  9. alias Pleroma.Conversation
  10. alias Pleroma.Conversation.Participation
  11. alias Pleroma.Filter
  12. alias Pleroma.Hashtag
  13. alias Pleroma.Maps
  14. alias Pleroma.Notification
  15. alias Pleroma.Object
  16. alias Pleroma.Object.Containment
  17. alias Pleroma.Object.Fetcher
  18. alias Pleroma.Pagination
  19. alias Pleroma.Repo
  20. alias Pleroma.Upload
  21. alias Pleroma.User
  22. alias Pleroma.Web.ActivityPub.MRF
  23. alias Pleroma.Web.ActivityPub.Transmogrifier
  24. alias Pleroma.Web.Streamer
  25. alias Pleroma.Web.WebFinger
  26. alias Pleroma.Workers.BackgroundWorker
  27. import Ecto.Query
  28. import Pleroma.Web.ActivityPub.Utils
  29. import Pleroma.Web.ActivityPub.Visibility
  30. require Logger
  31. require Pleroma.Constants
  32. @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
  33. @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
  # Collects the addressing of an activity map.
  #
  # Returns `{recipients, to, cc}`. `recipients` is the concatenation of the
  # "to", "cc" and "bcc" lists (each defaulting to `[]` when the key is absent).
  # For "Create" activities the actor is appended and duplicates are removed;
  # for every other type the plain concatenation is returned.
  defp get_recipients(%{"type" => "Create"} = data) do
    to = Map.get(data, "to", [])
    cc = Map.get(data, "cc", [])
    bcc = Map.get(data, "bcc", [])

    # `List.wrap/1` turns a missing actor into `[]`. The previous `[]` default
    # was wrapped in another list, which put an empty list among the recipients.
    actor = List.wrap(Map.get(data, "actor"))

    recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
    {recipients, to, cc}
  end

  defp get_recipients(data) do
    to = Map.get(data, "to", [])
    cc = Map.get(data, "cc", [])
    bcc = Map.get(data, "bcc", [])
    recipients = Enum.concat([to, cc, bcc])
    {recipients, to, cc}
  end
  # A missing actor passes the check. For a binary AP id the check passes only
  # when `User.get_cached_by_ap_id/1` yields a user with `is_active: true`.
  defp check_actor_is_active(nil), do: true

  defp check_actor_is_active(ap_id) when is_binary(ap_id) do
    match?(%User{is_active: true}, User.get_cached_by_ap_id(ap_id))
  end
  # Returns true when the embedded object's "content" is no longer than the
  # configured `[:instance, :remote_limit]`. Data without a non-nil content
  # always passes.
  defp check_remote_limit(%{"object" => %{"content" => text}}) when text != nil do
    max_length = Config.get([:instance, :remote_limit])
    String.length(text) <= max_length
  end

  defp check_remote_limit(_no_content), do: true
  @doc """
  Calls `User.increase_note_count/1` for `actor` when `object` is public.

  Returns `{:ok, actor}` unchanged when the object is not public.
  """
  def increase_note_count_if_public(actor, object) do
    if is_public?(object) do
      User.increase_note_count(actor)
    else
      {:ok, actor}
    end
  end
  @doc """
  Calls `User.decrease_note_count/1` for `actor` when `object` is public.

  Returns `{:ok, actor}` unchanged when the object is not public.
  """
  def decrease_note_count_if_public(actor, object) do
    if is_public?(object) do
      User.decrease_note_count(actor)
    else
      {:ok, actor}
    end
  end
  # For a "Create" whose object carries "inReplyTo", bumps the replies count of
  # the replied-to object, but only when the new object is public (otherwise the
  # `if` yields nil). Any other data is a no-op.
  defp increase_replies_count_if_reply(%{
         "type" => "Create",
         "object" => %{"inReplyTo" => parent_ap_id} = note
       }) do
    if is_public?(note), do: Object.increase_replies_count(parent_ap_id)
  end

  defp increase_replies_count_if_reply(_other), do: :noop
  @object_types ~w[ChatMessage Question Answer Audio Video Event Article]

  @doc """
  Persists pipeline data and returns `{:ok, stored, meta}`.

  Data whose "type" is one of `@object_types` is stored through
  `Object.create/1`. Everything else is inserted as an `Activity`; this path
  requires a `:local` entry in `meta` and schedules an expiration when the
  activity carries one. Non-matching results of the individual steps are
  returned unchanged.
  """
  @impl true
  def persist(%{"type" => type} = object, meta) when type in @object_types do
    case Object.create(object) do
      {:ok, created} -> {:ok, created, meta}
      other -> other
    end
  end

  @impl true
  def persist(object, meta) do
    is_local = Keyword.fetch!(meta, :local)
    {recipients, _to, _cc} = get_recipients(object)

    new_activity = %Activity{
      data: object,
      local: is_local,
      recipients: recipients,
      actor: object["actor"]
    }

    # TODO: add tests for expired activities, when Note type will be supported in new pipeline
    with {:ok, activity} <- Repo.insert(new_activity),
         {:ok, _} <- maybe_create_activity_expiration(activity) do
      {:ok, activity, meta}
    end
  end
  @doc """
  Runs an activity map through the insertion checks and stores it.

  When `Activity.normalize/1` already finds an activity for `map`, that
  activity is returned. Otherwise the map receives its defaults and must pass,
  in order: the actor check (skipped when `bypass_actor_check` is truthy), the
  remote content limit, `MRF.filter/1` and the containment check. Then the
  embedded object and the activity are inserted and a rich media fetch is
  started in a task.

  With `fake` set, the chain stops before the insert steps and an unsaved
  `%Activity{}` with the id `"pleroma:fakeid"` is returned.

  Failure values: `{:error, false}` (actor check), `{:error, :remote_limit}`,
  `{:error, {:reject, _}}`, `{:containment, _}` and any `{:error, _}` produced
  by one of the steps.
  """
  @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
  def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
    with nil <- Activity.normalize(map),
         prepared <- lazy_put_activity_defaults(map, fake),
         {:actor_check, true} <-
           {:actor_check, bypass_actor_check || check_actor_is_active(prepared["actor"])},
         {:remote_limit_pass, true} <- {:remote_limit_pass, check_remote_limit(prepared)},
         {:ok, filtered} <- MRF.filter(prepared),
         {recipients, _to, _cc} = get_recipients(filtered),
         {:fake, false, filtered, recipients} <- {:fake, fake, filtered, recipients},
         {:containment, :ok} <- {:containment, Containment.contain_child(filtered)},
         {:ok, stored_data, object} <- insert_full_object(filtered),
         {:ok, activity} <- insert_activity_with_expiration(stored_data, local, recipients) do
      # Splice in the child object if we have one.
      activity = Maps.put_if_present(activity, :object, object)

      ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
        Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
      end)

      {:ok, activity}
    else
      # An activity for this data already exists.
      %Activity{} = existing ->
        {:ok, existing}

      {:fake, true, data, fake_recipients} ->
        fake_activity = %Activity{
          id: "pleroma:fakeid",
          data: data,
          local: local,
          actor: data["actor"],
          recipients: fake_recipients
        }

        Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(fake_activity)
        {:ok, fake_activity}

      {:actor_check, _} ->
        {:error, false}

      {:remote_limit_pass, _} ->
        {:error, :remote_limit}

      {:containment, _} = failure ->
        failure

      {:reject, _} = rejection ->
        {:error, rejection}

      {:error, _} = failure ->
        failure
    end
  end
  # Inserts an `%Activity{}` built from the arguments and, on success, hands it
  # to `maybe_create_activity_expiration/1`. A failed insert is returned as is.
  defp insert_activity_with_expiration(data, local, recipients) do
    %Activity{data: data, local: local, actor: data["actor"], recipients: recipients}
    |> Repo.insert()
    |> case do
      {:ok, activity} -> maybe_create_activity_expiration(activity)
      failure -> failure
    end
  end
  @doc """
  Creates notifications for `activity`, creates or bumps its conversation and
  then streams out the activity followed by the conversation's participations.
  """
  def notify_and_stream(activity) do
    Notification.create_notifications(activity)

    participations =
      activity
      |> create_or_bump_conversation(activity.actor)
      |> get_participations()

    stream_out(activity)
    stream_out_participations(participations)
  end
  # Enqueues a `PurgeExpiredActivity` job when the activity data holds a
  # `%DateTime{}` under "expires_at". Returns `{:ok, activity}` on success or
  # when there is nothing to schedule; a failed enqueue is returned as is.
  defp maybe_create_activity_expiration(
         %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
       ) do
    job_args = %{activity_id: activity.id, expires_at: expires_at}

    case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
      {:ok, _job} -> {:ok, activity}
      failure -> failure
    end
  end

  defp maybe_create_activity_expiration(activity), do: {:ok, activity}
  # Creates or bumps the conversation for `activity` and marks it as read for
  # the acting user. Returns `{:ok, conversation}`; when either lookup does not
  # match, the non-matching value is returned instead.
  defp create_or_bump_conversation(activity, actor) do
    with {:ok, conv} <- Conversation.create_or_bump_for(activity),
         %User{} = acting_user <- User.get_cached_by_ap_id(actor) do
      Participation.mark_as_read(acting_user, conv)
      {:ok, conv}
    end
  end
  # Returns the freshly preloaded participations of a conversation, or `[]`
  # for anything that is not `{:ok, conversation}`.
  defp get_participations({:ok, conversation}) do
    reloaded = Repo.preload(conversation, :participations, force: true)
    Map.get(reloaded, :participations)
  end

  defp get_participations(_no_conversation), do: []
  @doc """
  Preloads `:user` on the given participations and passes them to
  `Streamer.stream/2` under the `"participation"` topic.
  """
  def stream_out_participations(participations) do
    Streamer.stream("participation", Repo.preload(participations, :user))
  end
  @doc """
  Streams the participations of the conversation that belongs to the object's
  "context".

  Streaming only happens when a conversation exists for the context and a
  latest direct activity id is found for `user` in it. Arguments of any other
  shape yield `:noop`.
  """
  @impl true
  def stream_out_participations(%Object{data: %{"context" => context}}, user) do
    case Conversation.get_for_ap_id(context) do
      %Conversation{} = found ->
        conversation = Repo.preload(found, :participations)
        lookup_opts = %{user: user, blocking_user: user}

        if fetch_latest_direct_activity_id_for_context(conversation.ap_id, lookup_opts) do
          stream_out_participations(conversation.participations)
        end

      other ->
        other
    end
  end

  @impl true
  def stream_out_participations(_, _), do: :noop
  @doc """
  Streams "Create", "Announce" and "Delete" activities to the topics returned
  by `Topics.get_activity_topics/1`. Everything else yields `:noop`.
  """
  @impl true
  def stream_out(%Activity{data: %{"type" => type}} = activity)
      when type in ["Create", "Announce", "Delete"] do
    topics = Topics.get_activity_topics(activity)
    Streamer.stream(topics, activity)
  end

  @impl true
  def stream_out(_activity), do: :noop
  @doc """
  Runs `do_create/2` inside `Repo.transaction/1`.

  A committed transaction yields the inner result; any other transaction
  outcome is returned unchanged.
  """
  @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
  def create(params, fake \\ false) do
    case Repo.transaction(fn -> do_create(params, fake) end) do
      {:ok, result} -> result
      not_committed -> not_committed
    end
  end
  # Builds the "Create" data from `params`, inserts it and runs the follow-up
  # steps: replies count, note count, notifications/streaming and federation.
  #
  # Fake activities return right after the insert. When the `:env` config is
  # `:benchmark`, the steps after the replies count are skipped. An
  # `{:error, reason}` from any step rolls the surrounding transaction back.
  defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
    extra = params[:additional] || %{}
    # only accept false as false value
    local = params[:local] != false
    quick_insert? = Config.get([:env]) == :benchmark

    create_data =
      %{to: to, actor: actor, published: params[:published], context: context, object: object}
      |> make_create_data(extra)

    with {:ok, inserted} <- insert(create_data, local, fake),
         {:fake, false, inserted} <- {:fake, fake, inserted},
         _ <- increase_replies_count_if_reply(create_data),
         {:quick_insert, false, inserted} <- {:quick_insert, quick_insert?, inserted},
         {:ok, _actor} <- increase_note_count_if_public(actor, inserted),
         _ <- notify_and_stream(inserted),
         :ok <- maybe_federate(inserted) do
      {:ok, inserted}
    else
      {:fake, true, fake_activity} ->
        {:ok, fake_activity}

      {:quick_insert, true, quick_activity} ->
        {:ok, quick_activity}

      {:error, reason} ->
        Repo.rollback(reason)
    end
  end
  @doc """
  Builds listen data from `params` via `make_listen_data/2`, inserts it, then
  notifies, streams and federates the resulting activity.

  `params[:local]` counts as local unless it is exactly `false`. A step that
  does not match returns its value unchanged.
  """
  @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
  def listen(%{to: to, actor: actor, context: context, object: object} = params) do
    extra = params[:additional] || %{}
    # only accept false as false value
    local = params[:local] != false

    listen_data =
      %{to: to, actor: actor, published: params[:published], context: context, object: object}
      |> make_listen_data(extra)

    with {:ok, inserted} <- insert(listen_data, local),
         _ <- notify_and_stream(inserted),
         :ok <- maybe_federate(inserted) do
      {:ok, inserted}
    end
  end
  @doc """
  Runs `do_unfollow/4` inside `Repo.transaction/1`.

  A committed transaction yields the inner result (which may be `nil`); any
  other transaction outcome is returned unchanged.
  """
  @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
          {:ok, Activity.t()} | nil | {:error, any()}
  def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
    transaction = Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end)

    case transaction do
      {:ok, result} -> result
      not_committed -> not_committed
    end
  end
  # Marks the latest follow between the two users as "cancelled", inserts the
  # unfollow activity, then notifies, streams and federates it.
  #
  # Returns nil when no follow activity is found; an `{:error, reason}` from
  # any step rolls the surrounding transaction back.
  defp do_unfollow(follower, followed, activity_id, local) do
    with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
         {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
         undo_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
         {:ok, undo_activity} <- insert(undo_data, local),
         _ <- notify_and_stream(undo_activity),
         :ok <- maybe_federate(undo_activity) do
      {:ok, undo_activity}
    else
      nil -> nil
      {:error, reason} -> Repo.rollback(reason)
    end
  end
  @doc """
  Runs `do_flag/1` inside `Repo.transaction/1`.

  A committed transaction yields the inner result; any other transaction
  outcome is returned unchanged.
  """
  @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
  def flag(params) do
    case Repo.transaction(fn -> do_flag(params) end) do
      {:ok, result} -> result
      not_committed -> not_committed
    end
  end
  # Inserts a report ("flag") activity, streams it, federates a stripped copy
  # and e-mails the report to superusers.
  #
  # "to" is always emptied; "cc" holds the reported account only when
  # forwarding is on (`params[:forward]` is anything but `false`). Superusers
  # whose `ap_id` equals `actor` or who have no e-mail address are skipped.
  # An `{:error, reason}` from any step rolls the surrounding transaction back.
  defp do_flag(
         %{
           actor: actor,
           context: _context,
           account: account,
           statuses: statuses,
           content: content
         } = params
       ) do
    # only accept false as false value
    local = params[:local] != false
    forward = params[:forward] != false

    cc = if forward, do: [account.ap_id], else: []
    additional = Map.merge(params[:additional] || %{}, %{"to" => [], "cc" => cc})

    flag_data = make_flag_data(params, additional)

    with {:ok, activity} <- insert(flag_data, local),
         {:ok, stripped_activity} <- strip_report_status_data(activity),
         _ <- notify_and_stream(activity),
         :ok <- maybe_federate(stripped_activity) do
      User.all_superusers()
      |> Enum.filter(fn user -> user.ap_id != actor and not is_nil(user.email) end)
      |> Enum.each(fn superuser ->
        superuser
        |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
        |> Pleroma.Emails.Mailer.deliver_async()
      end)

      {:ok, activity}
    else
      {:error, reason} -> Repo.rollback(reason)
    end
  end
  @doc """
  Inserts a "Move" activity from `origin` to `target`.

  The move is only accepted when `origin.ap_id` is listed in
  `target.also_known_as`; otherwise an `{:error, message}` tuple is returned.
  After the insert the activity is streamed and federated, and a
  `"move_following"` background job is enqueued for both user ids.
  """
  @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
  def move(%User{} = origin, %User{} = target, local \\ true) do
    move_data = %{
      "type" => "Move",
      "actor" => origin.ap_id,
      "object" => origin.ap_id,
      "target" => target.ap_id
    }

    job_args = %{"origin_id" => origin.id, "target_id" => target.id}

    with true <- origin.ap_id in target.also_known_as,
         {:ok, activity} <- insert(move_data, local),
         _ <- notify_and_stream(activity) do
      maybe_federate(activity)
      BackgroundWorker.enqueue("move_following", job_args)
      {:ok, activity}
    else
      false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
      err -> err
    end
  end
  @doc """
  Builds the query for the "Create" activities of a `context`, newest first.

  Recipients are the public address plus, when `opts[:user]` is set, that
  user's own AP id and `User.following/1` entries. The remaining filters are
  driven by `opts`.
  """
  def fetch_activities_for_context_query(context, opts) do
    public = [Constants.as_public()]
    reader = opts[:user]

    recipients =
      if reader do
        [reader.ap_id | User.following(reader)] ++ public
      else
        public
      end

    from(activity in Activity)
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)
    |> restrict_blocked(opts)
    |> restrict_recipients(recipients, reader)
    |> restrict_filtered(opts)
    |> where(
      [activity],
      fragment(
        "?->>'type' = ? and ?->>'context' = ?",
        activity.data,
        "Create",
        activity.data,
        ^context
      )
    )
    |> exclude_poll_votes(opts)
    |> exclude_id(opts)
    |> order_by([activity], desc: activity.id)
  end
  @doc """
  Returns all activities matched by `fetch_activities_for_context_query/2`.
  """
  @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
  def fetch_activities_for_context(context, opts \\ %{}) do
    Repo.all(fetch_activities_for_context_query(context, opts))
  end
  @doc """
  Returns the id of the newest activity with "direct" visibility in `context`,
  or `nil` when there is none.

  `skip_preload: true` is used unless `opts` overrides it.
  """
  @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
          FlakeId.Ecto.CompatType.t() | nil
  def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
    query_opts = Map.merge(%{skip_preload: true}, opts)

    latest_direct_id_query =
      context
      |> fetch_activities_for_context_query(query_opts)
      |> restrict_visibility(%{visibility: "direct"})
      |> limit(1)
      |> select([a], a.id)

    Repo.one(latest_direct_id_query)
  end
  # Paginates `query` with `:skip_extra_order` forced on.
  #
  # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
  # and extra sorting on "activities.id DESC NULLS LAST" would worsen the query plan
  defp fetch_paginated_optimized(query, opts, pagination) do
    Pagination.fetch_paginated(query, Map.put(opts, :skip_extra_order, true), pagination)
  end
  @doc """
  Fetches a page of activities addressed to `recipients` or to the list
  memberships of `opts[:user]`.

  The paginated result is reversed and then passed through
  `maybe_update_cc/3`.
  """
  def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
    list_memberships = Pleroma.List.memberships(opts[:user])

    page =
      (recipients ++ list_memberships)
      |> fetch_activities_query(opts)
      |> fetch_paginated_optimized(opts, pagination)

    page
    |> Enum.reverse()
    |> maybe_update_cc(list_memberships, opts[:user])
  end
  @doc """
  Fetches a page of activities addressed to the public address.

  The `:user` option is dropped before querying. Unlisted activities are only
  filtered out when `opts` carries `restrict_unlisted: true`.
  """
  @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
  def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
    anonymous_opts = Map.delete(opts, :user)

    [Constants.as_public()]
    |> fetch_activities_query(anonymous_opts)
    |> restrict_unlisted(anonymous_opts)
    |> fetch_paginated_optimized(anonymous_opts, pagination)
  end
  @doc """
  Same as `fetch_public_or_unlisted_activities/2` with `restrict_unlisted: true`
  forced into the options.
  """
  @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
  def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
    fetch_public_or_unlisted_activities(Map.put(opts, :restrict_unlisted, true), pagination)
  end
  @valid_visibilities ~w[direct unlisted public private]

  # Narrows `query` to activities whose computed visibility matches the
  # `:visibility` option, which may be a single value or a list of values.
  #
  # Every clause returns a query. An invalid visibility is logged and the query
  # is returned unrestricted, and a nil or absent option is a no-op; both match
  # the behaviour of `exclude_visibility/2`. Previously the invalid branches
  # returned the result of `Logger.error/1`, which broke any pipeline the
  # function was used in.
  defp restrict_visibility(query, %{visibility: visibility})
       when is_list(visibility) do
    if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
      from(
        a in query,
        where:
          fragment(
            "activity_visibility(?, ?, ?) = ANY (?)",
            a.actor,
            a.recipients,
            a.data,
            ^visibility
          )
      )
    else
      # `inspect/1` keeps the log call from raising on values that cannot be
      # interpolated into a string.
      Logger.error("Could not restrict visibility to #{inspect(visibility)}")
      query
    end
  end

  defp restrict_visibility(query, %{visibility: visibility})
       when visibility in @valid_visibilities do
    from(
      a in query,
      where:
        fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
    )
  end

  defp restrict_visibility(query, %{visibility: visibility})
       when visibility not in [nil | @valid_visibilities] do
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end

  defp restrict_visibility(query, _visibility), do: query
  # Removes activities whose computed visibility matches the
  # `:exclude_visibilities` option (a single value or a list). Invalid values
  # are logged and the query is returned untouched; a nil or absent option is
  # a no-op.
  defp exclude_visibility(query, %{exclude_visibilities: visibility})
       when is_list(visibility) do
    if Enum.any?(visibility, &(&1 not in @valid_visibilities)) do
      Logger.error("Could not exclude visibility to #{visibility}")
      query
    else
      where(
        query,
        [a],
        not fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
      )
    end
  end

  defp exclude_visibility(query, %{exclude_visibilities: visibility})
       when visibility in @valid_visibilities do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ?",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  end

  defp exclude_visibility(query, %{exclude_visibilities: visibility})
       when visibility not in [nil | @valid_visibilities] do
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end

  defp exclude_visibility(query, _visibility), do: query
  # Applies the `thread_visibility` SQL check for the reading user's AP id.
  # Skipped when the third argument or the user has
  # `skip_thread_containment: true`, or when the options hold no `%User{}`.
  defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

  defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
    do: query

  defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
    where(query, [a], fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data))
  end

  defp restrict_thread_visibility(query, _opts, _config), do: query
  @doc """
  Fetches activities authored by `user` as seen by `reading_user`.

  `:user` and `:actor_id` are set in `params`; the recipients come from
  `user_activities_recipients/1`. The result of `fetch_activities/2` is
  reversed once more before it is returned.
  """
  def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
    params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

    recipients =
      user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

    recipients
    |> fetch_activities(params)
    |> Enum.reverse()
  end
  @doc """
  Fetches the "Create" and "Announce" activities of `user` as seen by
  `reading_user`, reversing the order returned by the underlying fetch.

  With `total: true` in `params` the fetch result is treated as a keyword list
  and only its `:items` entry is reversed.
  """
  def fetch_user_activities(user, reading_user, params \\ %{})

  def fetch_user_activities(user, reading_user, %{total: true} = params) do
    page = fetch_activities_for_user(user, reading_user, params)
    reversed_items = Enum.reverse(page[:items])
    Keyword.put(page, :items, reversed_items)
  end

  def fetch_user_activities(user, reading_user, params) do
    Enum.reverse(fetch_activities_for_user(user, reading_user, params))
  end
  # Fetches the "Create"/"Announce" activities authored by `user` for
  # `reading_user`. Block and mute filtering for the reader is only enabled
  # when the reader does not block `user`. The pagination type comes from
  # `params[:pagination_type]` and defaults to `:keyset`.
  defp fetch_activities_for_user(user, reading_user, params) do
    base_params =
      Map.merge(params, %{
        type: ["Create", "Announce"],
        user: reading_user,
        actor_id: user.ap_id,
        pinned_object_ids: Map.keys(user.pinned_objects)
      })

    params =
      if User.blocks?(reading_user, user) do
        base_params
      else
        Map.merge(base_params, %{blocking_user: reading_user, muting_user: reading_user})
      end

    pagination_type = Map.get(params, :pagination_type) || :keyset

    recipients =
      user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

    fetch_activities(recipients, params, pagination_type)
  end
  @doc """
  Fetches "Create" and "Announce" activities visible to `reading_user` with
  offset pagination, reversing the order returned by the underlying fetch.

  With `total: true` in `params` the fetch result is treated as a keyword list
  and only its `:items` entry is reversed.
  """
  def fetch_statuses(reading_user, %{total: true} = params) do
    page = fetch_activities_for_reading_user(reading_user, params)
    reversed_items = Enum.reverse(page[:items])
    Keyword.put(page, :items, reversed_items)
  end

  def fetch_statuses(reading_user, params) do
    Enum.reverse(fetch_activities_for_reading_user(reading_user, params))
  end
  # Fetches "Create"/"Announce" activities for the recipients derived from
  # `reading_user`, always using `:offset` pagination.
  defp fetch_activities_for_reading_user(reading_user, params) do
    params = Map.put(params, :type, ["Create", "Announce"])

    recipients =
      user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

    fetch_activities(recipients, params, :offset)
  end
  # Recipient list used when reading a user's activities: empty in godmode,
  # otherwise the public address followed by the reader's own AP id and
  # `User.following/1` entries (when a reader is given).
  defp user_activities_recipients(%{godmode: true}), do: []

  defp user_activities_recipients(%{reading_user: reading_user}) do
    personal =
      if reading_user do
        [reading_user.ap_id | User.following(reading_user)]
      else
        []
      end

    [Constants.as_public() | personal]
  end
  # Drops "Announce" activities whose object was authored by the
  # `:announce_filtering_user`. Reads the joined object, so it raises when
  # combined with `skip_preload: true`.
  defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
    raise "Can't use the child object without preloading!"
  end

  defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
    where(
      query,
      [activity, object],
      fragment(
        "?->>'type' != ? or ?->>'actor' != ?",
        activity.data,
        "Announce",
        object.data,
        ^actor
      )
    )
  end

  defp restrict_announce_object_actor(query, _opts), do: query
  # Keeps only activities with an id greater than `:since_id`; an empty string
  # or a missing option leaves the query untouched.
  defp restrict_since(query, %{since_id: ""}), do: query

  defp restrict_since(query, %{since_id: since_id}) do
    where(query, [activity], activity.id > ^since_id)
  end

  defp restrict_since(query, _opts), do: query
  # Requires the object's embedded "tag" data to contain all `:tag_all`
  # entries. A single binary tag is delegated to `restrict_embedded_tag_any/2`.
  # Raises when combined with `skip_preload: true`.
  defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
    raise_on_missing_preload()
  end

  defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
    where(query, [_activity, object], fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all))
  end

  defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
    restrict_embedded_tag_any(query, %{tag: tag})
  end

  defp restrict_embedded_tag_all(query, _opts), do: query
  # Requires the object's embedded "tag" data to contain at least one of the
  # `:tag` entries; a single binary tag is wrapped in a list first. Raises when
  # combined with `skip_preload: true`.
  defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
    raise_on_missing_preload()
  end

  defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
    where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any))
  end

  defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
    restrict_embedded_tag_any(query, %{tag: [tag]})
  end

  defp restrict_embedded_tag_any(query, _opts), do: query
  # Rejects objects whose embedded "tag" data contains any of the
  # `:tag_reject` entries; a single binary tag is wrapped in a list first.
  # Raises when combined with `skip_preload: true`.
  defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
    raise_on_missing_preload()
  end

  defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
    where(
      query,
      [_activity, object],
      fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
    )
  end

  defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
       when is_binary(tag_reject) do
    restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
  end

  defp restrict_embedded_tag_reject_any(query, _opts), do: query
  # Query selecting the distinct object ids linked (through
  # "hashtags_objects") to any hashtag named in `tags`.
  defp object_ids_query_for_tags(tags) do
    from(hto in "hashtags_objects",
      join: ht in Pleroma.Hashtag,
      on: hto.hashtag_id == ht.id,
      where: ht.name in ^tags,
      select: hto.object_id,
      distinct: true
    )
  end
  # Requires the object to be linked to every hashtag in `:tag_all`.
  #
  # A one-element list is delegated to `restrict_hashtag_any/2`; a binary is
  # wrapped in a list first. Raises when combined with `skip_preload: true`.
  defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
    raise_on_missing_preload()
  end

  defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
    restrict_hashtag_any(query, %{tag: single_tag})
  end

  defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
    where(
      query,
      [_activity, object],
      fragment(
        """
        (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
        ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
        AND hashtags_objects.object_id = ?) @> ?
        """,
        ^tags,
        object.id,
        ^tags
      )
    )
  end

  defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
    restrict_hashtag_all(query, %{tag_all: [tag]})
  end

  defp restrict_hashtag_all(query, _opts), do: query
  # Requires the object to be linked to at least one hashtag in `:tag`.
  #
  # The hashtag ids are looked up with a separate `Repo.all/1` call, then the
  # query is joined against "hashtags_objects" and made distinct and ordered by
  # descending object id. A binary tag is wrapped in a list first. Raises when
  # combined with `skip_preload: true`.
  defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
    raise_on_missing_preload()
  end

  defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
    hashtag_ids =
      Hashtag
      |> where([ht], ht.name in ^tags)
      |> select([ht], ht.id)
      |> Repo.all()

    # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
    from(
      [_activity, object] in query,
      join: hto in "hashtags_objects",
      on: hto.object_id == object.id,
      where: hto.hashtag_id in ^hashtag_ids,
      distinct: [desc: object.id],
      order_by: [desc: object.id]
    )
  end

  defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
    restrict_hashtag_any(query, %{tag: [tag]})
  end

  defp restrict_hashtag_any(query, _opts), do: query
  # Rejects objects linked to any hashtag in `:tag_reject`, using the subquery
  # from `object_ids_query_for_tags/1`. A binary tag is wrapped in a list
  # first. Raises when combined with `skip_preload: true`.
  defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
    raise_on_missing_preload()
  end

  defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
    where(
      query,
      [_activity, object],
      object.id not in subquery(object_ids_query_for_tags(tags_reject))
    )
  end

  defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
    restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
  end

  defp restrict_hashtag_reject_any(query, _opts), do: query
  # Raises the shared error used by filters that read the joined object while
  # preloading is skipped.
  defp raise_on_missing_preload do
    raise RuntimeError, "Can't use the child object without preloading!"
  end
  # Keeps activities whose recipients overlap with `recipients`. With a user
  # given, activities authored by that user are added through `or_where`.
  # An empty recipient list leaves the query untouched.
  defp restrict_recipients(query, [], _user), do: query

  defp restrict_recipients(query, recipients, nil) do
    where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
  end

  defp restrict_recipients(query, recipients, user) do
    query
    |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
    |> or_where([activity], activity.actor == ^user.ap_id)
  end
  # With `local_only: true`, keeps only activities flagged as local.
  defp restrict_local(query, %{local_only: true}) do
    where(query, [activity], activity.local == true)
  end

  defp restrict_local(query, _opts), do: query
  # With `remote: true`, keeps only activities that are not flagged as local.
  defp restrict_remote(query, %{remote: true}) do
    where(query, [activity], activity.local == false)
  end

  defp restrict_remote(query, _opts), do: query
  # Keeps only activities whose actor equals the `:actor_id` option.
  defp restrict_actor(query, %{actor_id: actor_id}) do
    where(query, [activity], activity.actor == ^actor_id)
  end

  defp restrict_actor(query, _opts), do: query
  # Filters on the activity's "type": a binary is compared for equality, any
  # other `:type` value is passed to an `ANY(?)` comparison.
  defp restrict_type(query, %{type: type}) when is_binary(type) do
    where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
  end

  defp restrict_type(query, %{type: types}) do
    where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
  end

  defp restrict_type(query, _opts), do: query
  # Keeps only activities whose "state" equals the `:state` option.
  defp restrict_state(query, %{state: state}) do
    where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
  end

  defp restrict_state(query, _opts), do: query
  # Keeps only activities whose joined object lists the given AP id under
  # "likes".
  defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
    where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
  end

  defp restrict_favorited_by(query, _opts), do: query
  # With `only_media: true`, keeps "Create" activities whose object has a
  # non-empty "attachment". Raises when `:only_media` is combined with
  # `skip_preload: true`.
  defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
    raise "Can't use the child object without preloading!"
  end

  defp restrict_media(query, %{only_media: true}) do
    query
    |> where([activity, _object], fragment("(?)->>'type' = ?", activity.data, "Create"))
    |> where([_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
  end

  defp restrict_media(query, _opts), do: query
  # Filters replies according to the options:
  #
  #   * `exclude_replies: true` keeps only objects without "inReplyTo";
  #   * `reply_visibility: "self"` keeps non-replies and activities that list
  #     the filtering user among their recipients;
  #   * `reply_visibility: "following"` applies the rules spelled out in the
  #     SQL comments of that clause.
  #
  # Any other options leave the query untouched.
  defp restrict_replies(query, %{exclude_replies: true}) do
    where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
  end

  defp restrict_replies(query, %{
         reply_filtering_user: %User{} = user,
         reply_visibility: "self"
       }) do
    where(
      query,
      [activity, object],
      fragment(
        "?->>'inReplyTo' is null OR ? = ANY(?)",
        object.data,
        ^user.ap_id,
        activity.recipients
      )
    )
  end

  defp restrict_replies(query, %{
         reply_filtering_user: %User{} = user,
         reply_visibility: "following"
       }) do
    self_and_friends = [user.ap_id | User.get_cached_user_friends_ap_ids(user)]

    where(
      query,
      [activity, object],
      fragment(
        """
        ?->>'type' != 'Create' -- This isn't a Create
        OR ?->>'inReplyTo' is null -- this isn't a reply
        OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
        -- unless they are the author (because authors
        -- are also part of the recipients). This leads
        -- to a bug that self-replies by friends won't
        -- show up.
        OR ? = ? -- The actor is us
        """,
        activity.data,
        object.data,
        ^self_and_friends,
        activity.recipients,
        activity.actor,
        activity.actor,
        ^user.ap_id
      )
    )
  end

  defp restrict_replies(query, _opts), do: query
  # With `exclude_reblogs: true`, drops activities of type "Announce".
  defp restrict_reblogs(query, %{exclude_reblogs: true}) do
    where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
  end

  defp restrict_reblogs(query, _opts), do: query
  # Hides activities from, or addressed ("to") to, users muted by
  # `:muting_user`; the muting user's own activities stay visible.
  #
  # The muted AP ids come from `opts[:muted_users_ap_ids]` when given,
  # otherwise from `User.muted_users_ap_ids/1`. Unless `:skip_preload` is set,
  # rows with a `:thread_mute` binding entry are dropped too; that binding is
  # expected to be present on `query` already. `with_muted: true` disables the
  # filter.
  defp restrict_muted(query, %{with_muted: true}), do: query

  defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
    mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

    query =
      from([activity] in query,
        where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
        where:
          fragment(
            "not (?->'to' \\?| ?) or ? = ?",
            activity.data,
            ^mutes,
            activity.actor,
            ^user.ap_id
          )
      )

    # `if … else` replaces the discouraged `unless … else` form; the branches
    # are swapped so the result is the same.
    if opts[:skip_preload] do
      query
    else
      from([thread_mute: tm] in query, where: is_nil(tm.user_id))
    end
  end

  defp restrict_muted(query, _), do: query
  # Applies the block list of `blocking_user`:
  #   * activities authored by blocked accounts,
  #   * activities addressed to blocked accounts (unless authored by the user),
  #   * activities whose recipients live on blocked domains,
  #   * Announces addressed ("to") to blocked accounts,
  #   * activities / objects whose actor is on a blocked domain, unless that
  #     actor is in the user's friends list.
  # The object join is added when the query does not already carry it.
  defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
    blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
    domain_blocks = user.domain_blocks || []
    following_ap_ids = User.get_friends_ap_ids(user)

    with_object =
      case has_named_binding?(query, :object) do
        true -> query
        false -> Activity.with_joined_object(query)
      end

    with_object
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
    |> where(
      [activity],
      fragment(
        "((not (? && ?)) or ? = ?)",
        activity.recipients,
        ^blocked_ap_ids,
        activity.actor,
        ^user.ap_id
      )
    )
    |> where(
      [activity],
      fragment(
        "recipients_contain_blocked_domains(?, ?) = false",
        activity.recipients,
        ^domain_blocks
      )
    )
    |> where(
      [activity],
      fragment(
        "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
        activity.data,
        activity.data,
        ^blocked_ap_ids
      )
    )
    |> where(
      [activity],
      fragment(
        "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
        activity.actor,
        ^domain_blocks,
        activity.actor,
        ^following_ap_ids
      )
    )
    |> where(
      [object: o],
      fragment(
        "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
        o.data,
        ^domain_blocks,
        o.data,
        ^following_ap_ids
      )
    )
  end

  defp restrict_blocked(query, _opts), do: query
  # With `restrict_unlisted: true`, removes activities whose "cc" contains the
  # public address (a missing "cc" is coalesced to an empty JSON value).
  defp restrict_unlisted(query, %{restrict_unlisted: true}) do
    where(
      query,
      [activity],
      fragment(
        "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
        activity.data,
        ^[Constants.as_public()]
      )
    )
  end

  defp restrict_unlisted(query, _opts), do: query
  # With `pinned: true`, keeps only Create activities whose object id (embedded
  # object or plain reference) is listed in `pinned_object_ids`.
  defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
    where(
      query,
      [activity, object: o],
      fragment(
        "(?)->>'type' = 'Create' and coalesce((?)->'object'->>'id', (?)->>'object') = any (?)",
        activity.data,
        activity.data,
        activity.data,
        ^ids
      )
    )
  end

  defp restrict_pinned(query, _opts), do: query
  # Removes Announce activities made by accounts whose reblogs `muting_user`
  # has muted. A preloaded id list in `reblog_muted_users_ap_ids` is preferred.
  defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
    muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

    where(
      query,
      [activity],
      fragment(
        "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
        activity.data,
        activity.actor,
        ^muted_reblogs
      )
    )
  end

  defp restrict_muted_reblogs(query, _opts), do: query
  # Keeps only activities whose actor URL has `instance` as its third
  # '/'-separated segment (the authority part of the URL).
  defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
    where(
      query,
      [activity],
      fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
    )
  end

  defp restrict_instance(query, _opts), do: query
  # Hides objects whose content matches the regex composed from the user's
  # filters; the user's own activities are never hidden. When no regex is
  # composed (nil), the query is returned unchanged. `blocking_user` is
  # accepted as an alternative to `user`.
  defp restrict_filtered(query, %{user: %User{} = user}) do
    do_restrict_filtered(query, Filter.compose_regex(user), user)
  end

  defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
    restrict_filtered(query, %{user: user})
  end

  defp restrict_filtered(query, _opts), do: query

  # nil means there is nothing to filter on.
  defp do_restrict_filtered(query, nil, _user), do: query

  defp do_restrict_filtered(query, regex, user) do
    where(
      query,
      [activity, object],
      fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
        activity.actor == ^user.ap_id
    )
  end
  # Drops activities whose object type is "Answer" unless
  # `include_poll_votes: true`. Only possible when the query has the named
  # :object binding; otherwise the query is returned as is.
  defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

  defp exclude_poll_votes(query, _opts) do
    case has_named_binding?(query, :object) do
      true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
      false -> query
    end
  end
  # Drops activities whose object type is "ChatMessage" unless
  # `include_chat_messages: true`. Only possible when the query has the named
  # :object binding; otherwise the query is returned as is.
  defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

  defp exclude_chat_messages(query, _opts) do
    case has_named_binding?(query, :object) do
      true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "ChatMessage"))
      false -> query
    end
  end
  # Removes activities authored by users flagged invisible, unless
  # `invisible_actors: true`. Note that this runs a query right away to
  # collect the invisible users' AP ids.
  defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

  defp exclude_invisible_actors(query, _opts) do
    invisible_users =
      %{invisible: true, select: [:ap_id]}
      |> User.Query.build()
      |> Repo.all()

    invisible_ap_ids = Enum.map(invisible_users, fn %{ap_id: id} -> id end)

    where(query, [activity], activity.actor not in ^invisible_ap_ids)
  end
  # Leaves out the single activity whose id equals `exclude_id` (binary ids only).
  defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
    where(query, [activity], activity.id != ^id)
  end

  defp exclude_id(query, _opts), do: query
  # Preloads the activity's object unless `skip_preload: true`.
  defp maybe_preload_objects(query, %{skip_preload: true}), do: query
  defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
  # Preloads the bookmark of `opts[:user]` unless `skip_preload: true`.
  defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

  defp maybe_preload_bookmarks(query, opts) do
    Activity.with_preloaded_bookmark(query, opts[:user])
  end
  # Preloads report notes only when `preload_report_notes: true` is given.
  defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
    Activity.with_preloaded_report_notes(query)
  end

  defp maybe_preload_report_notes(query, _opts), do: query
  # Sets the thread-muted field for the muting user (falling back to
  # `opts[:user]`) unless `skip_preload: true`.
  defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

  defp maybe_set_thread_muted_field(query, opts) do
    Activity.with_set_thread_muted_field(query, opts[:muting_user] || opts[:user])
  end
  # Orders by id when `order` is :desc or :asc; any other value adds no ordering.
  defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
  defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
  defp maybe_order(query, _opts), do: query
  # Normalizes the hashtag options (:tag, :tag_all, :tag_reject) through
  # Hashtag.normalize_name/1. A single name is normalized in place; a list is
  # normalized element-wise and de-duplicated. Other values are left alone.
  defp normalize_fetch_activities_query_opts(opts) do
    Enum.reduce([:tag, :tag_all, :tag_reject], opts, &normalize_tag_opt/2)
  end

  # Normalizes one hashtag option inside `opts`.
  defp normalize_tag_opt(key, opts) do
    case opts[key] do
      name when is_bitstring(name) ->
        Map.put(opts, key, Hashtag.normalize_name(name))

      names when is_list(names) ->
        unique_names = Enum.uniq(Enum.map(names, &Hashtag.normalize_name/1))
        Map.put(opts, key, unique_names)

      _other ->
        opts
    end
  end
  # Looks up the muting user's outgoing relationships once (mutes, reblog
  # mutes and, when the blocking user is that same user, blocks) and returns
  # three option maps — for restrict_blocked, restrict_muted and
  # restrict_muted_reblogs — each carrying its preloaded AP id list. Values
  # already present in `opts` take precedence over the preloaded ones.
  defp fetch_activities_query_ap_ids_ops(opts) do
    source_user = opts[:muting_user]
    mute_kinds = if source_user, do: [:mute, :reblog_mute], else: []

    relationship_kinds =
      if opts[:blocking_user] && opts[:blocking_user] == source_user,
        do: [:block | mute_kinds],
        else: mute_kinds

    preloaded = User.outgoing_relationships_ap_ids(source_user, relationship_kinds)

    {
      Map.put_new(opts, :blocked_users_ap_ids, preloaded[:block]),
      Map.put_new(opts, :muted_users_ap_ids, preloaded[:mute]),
      Map.put_new(opts, :reblog_muted_users_ap_ids, preloaded[:reblog_mute])
    }
  end
  @doc """
  Builds the activity listing query for `recipients`, applying every
  restriction selected through `opts`.

  `opts` is a map. Hashtag options are normalized first and the block / mute
  AP id lists are preloaded once and passed to the matching restrictions.
  Depending on the `:improved_hashtag_timeline` feature flag, hashtag
  filtering uses either the hashtag-based or the embedded-tag restrictions.

  Returns the composed query; running / paginating it is left to the caller.
  """
  def fetch_activities_query(recipients, opts \\ %{}) do
    opts = normalize_fetch_activities_query_opts(opts)

    {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
      fetch_activities_query_ap_ids_ops(opts)

    config = %{
      skip_thread_containment: Config.get([:instance, :skip_thread_containment])
    }

    # restrict_filtered/2 is applied exactly once: a second application only
    # duplicated the same WHERE clause and recomputed the filter regex.
    query =
      Activity
      |> maybe_preload_objects(opts)
      |> maybe_preload_bookmarks(opts)
      |> maybe_preload_report_notes(opts)
      |> maybe_set_thread_muted_field(opts)
      |> maybe_order(opts)
      |> restrict_recipients(recipients, opts[:user])
      |> restrict_replies(opts)
      |> restrict_since(opts)
      |> restrict_local(opts)
      |> restrict_remote(opts)
      |> restrict_actor(opts)
      |> restrict_type(opts)
      |> restrict_state(opts)
      |> restrict_favorited_by(opts)
      |> restrict_blocked(restrict_blocked_opts)
      |> restrict_muted(restrict_muted_opts)
      |> restrict_filtered(opts)
      |> restrict_media(opts)
      |> restrict_visibility(opts)
      |> restrict_thread_visibility(opts, config)
      |> restrict_reblogs(opts)
      |> restrict_pinned(opts)
      |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
      |> restrict_instance(opts)
      |> restrict_announce_object_actor(opts)
      |> Activity.restrict_deactivated_users()
      |> exclude_poll_votes(opts)
      |> exclude_chat_messages(opts)
      |> exclude_invisible_actors(opts)
      |> exclude_visibility(opts)

    if Config.feature_enabled?(:improved_hashtag_timeline) do
      query
      |> restrict_hashtag_any(opts)
      |> restrict_hashtag_all(opts)
      |> restrict_hashtag_reject_any(opts)
    else
      query
      |> restrict_embedded_tag_any(opts)
      |> restrict_embedded_tag_all(opts)
      |> restrict_embedded_tag_reject_any(opts)
    end
  end
  @doc """
  Fetches the activities favourited by `user`, ordered by the id of the Like
  activity (descending), i.e. by when each one was added to the favourites.

  The Like id is exposed as `pagination_id` on every returned activity and
  the paginator is told to skip its own ordering.
  """
  @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
  def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
    favourites_query =
      user.ap_id
      |> Activity.Queries.by_actor()
      |> Activity.Queries.by_type("Like")
      |> Activity.with_joined_object()
      |> Object.with_joined_activity()
      |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
      |> order_by([like, _, _], desc_nulls_last: like.id)

    Pagination.fetch_paginated(
      favourites_query,
      Map.put(params, :skip_order, true),
      pagination
    )
  end
  # For every activity whose "bcc" contains one of the user's list
  # memberships, prepends the user's AP id to the activity's "cc". Activities
  # without a non-empty "bcc" are returned untouched, and nothing happens
  # when there are no list memberships or no user.
  defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
    Enum.map(activities, fn
      %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
        if Enum.any?(bcc, &(&1 in list_memberships)) do
          # "cc" may be absent (nil); List.wrap/1 keeps the result a proper
          # list instead of the improper `[ap_id | nil]`.
          update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
        else
          activity
        end

      activity ->
        activity
    end)
  end

  defp maybe_update_cc(activities, _list_memberships, _user), do: activities
  # Keeps activities addressed to any of `recipients`, plus public activities
  # (public address among the recipients) addressed to any of
  # `recipients_with_public`.
  defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
    where(
      query,
      [activity],
      fragment("? && ?", activity.recipients, ^recipients) or
        (fragment("? && ?", activity.recipients, ^recipients_with_public) and
           ^Constants.as_public() in activity.recipients)
    )
  end
  @doc """
  Fetches one page of activities bounded by two recipient lists.

  Builds the regular activity query without a recipient restriction, narrows
  it to `recipients` / `recipients_with_public`, paginates it with `opts` and
  `pagination`, and returns the page in reversed order.
  """
  def fetch_activities_bounded(
        recipients,
        recipients_with_public,
        opts \\ %{},
        pagination \\ :keyset
      ) do
    bounded_query =
      []
      |> fetch_activities_query(opts)
      |> fetch_activities_bounded_query(recipients, recipients_with_public)

    page = Pagination.fetch_paginated(bounded_query, opts, pagination)
    Enum.reverse(page)
  end
  @doc """
  Stores `file` through `Upload.store/2` and inserts an object built from the
  returned data, adding `opts[:actor]` under "actor" when present.

  Any result of `Upload.store/2` other than `{:ok, data}` is returned as is.
  """
  @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
  def upload(file, opts \\ []) do
    case Upload.store(file, opts) do
      {:ok, data} ->
        object_data = Maps.put_if_present(data, "actor", opts[:actor])
        Repo.insert(%Object{data: object_data})

      failure ->
        failure
    end
  end
  # Extracts a URL string from an actor's "url" value, which may be a plain
  # string, a link map with a binary "href", or a list of such values (only
  # the first element is considered). Anything else yields nil.
  @spec get_actor_url(any()) :: binary() | nil
  defp get_actor_url(url) when is_binary(url), do: url
  defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
  defp get_actor_url([first | _rest]), do: get_actor_url(first)
  defp get_actor_url(_other), do: nil
  # Turns an image value carrying a "url" into the internal Image shape with a
  # single href entry. For a list only the first element is used; anything
  # else (including an empty list) yields nil.
  defp normalize_image(%{"url" => url}) do
    %{"type" => "Image", "url" => [%{"href" => url}]}
  end

  defp normalize_image([first | _rest]), do: normalize_image(first)
  defp normalize_image(_other), do: nil
  # Converts a remote actor object (string-keyed map) into the attribute map
  # used to build a remote user.
  #
  # The actor document comes from another server, so the profile fields and
  # custom emoji are collected defensively: "attachment" / "tag" may be
  # missing, null or a single object instead of a list, and entries that do
  # not have the expected shape are skipped instead of raising.
  #
  # Note that the featured collection of the actor is fetched as part of
  # this conversion.
  defp object_to_user_data(data) do
    # Profile fields: PropertyValue attachments reduced to name/value.
    fields =
      for %{"type" => "PropertyValue"} = field <- List.wrap(Map.get(data, "attachment")) do
        Map.take(field, ["name", "value"])
      end

    # Custom emoji: shortcode (surrounding colons trimmed) => icon URL.
    emojis =
      for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name} <-
            List.wrap(Map.get(data, "tag")),
          is_binary(name),
          into: %{} do
        {String.trim(name, ":"), url}
      end

    is_locked = data["manuallyApprovesFollowers"] || false
    capabilities = data["capabilities"] || %{}
    accepts_chat_messages = capabilities["acceptsChatMessages"]

    # Everything below reads the fixed-up actor object.
    data = Transmogrifier.maybe_fix_user_object(data)

    is_discoverable = data["discoverable"] || false
    invisible = data["invisible"] || false
    actor_type = data["type"] || "Person"

    featured_address = data["featured"]
    {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)

    public_key =
      case data do
        %{"publicKey" => %{"publicKeyPem" => pem}} when is_binary(pem) -> pem
        _ -> nil
      end

    shared_inbox =
      case data do
        %{"endpoints" => %{"sharedInbox" => inbox}} when is_binary(inbox) -> inbox
        _ -> nil
      end

    # nickname can be nil because of virtual actors
    nickname =
      if data["preferredUsername"] do
        "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}"
      end

    %{
      ap_id: data["id"],
      uri: get_actor_url(data["url"]),
      ap_enabled: true,
      banner: normalize_image(data["image"]),
      fields: fields,
      emoji: emojis,
      is_locked: is_locked,
      is_discoverable: is_discoverable,
      invisible: invisible,
      avatar: normalize_image(data["icon"]),
      name: data["name"],
      follower_address: data["followers"],
      following_address: data["following"],
      featured_address: featured_address,
      bio: data["summary"] || "",
      actor_type: actor_type,
      also_known_as: Map.get(data, "alsoKnownAs", []),
      public_key: public_key,
      inbox: data["inbox"],
      shared_inbox: shared_inbox,
      accepts_chat_messages: accepts_chat_messages,
      pinned_objects: pinned_objects,
      nickname: nickname
    }
  end
  @doc """
  Fetches the remote following / follower collections of `user` and derives
  follow statistics from them.

  On success returns `{:ok, map}` with `:hide_follows`, `:hide_followers`,
  `:following_count` and `:follower_count` (non-integer `totalItems` count
  as 0). Any failing step is returned as an `{:error, _}` tuple.
  """
  def fetch_follow_information_for_user(user) do
    with {:ok, following} <-
           Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
         {:ok, hide_follows} <- collection_private(following),
         {:ok, followers} <-
           Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
         {:ok, hide_followers} <- collection_private(followers) do
      stats = %{
        hide_follows: hide_follows,
        hide_followers: hide_followers,
        following_count: normalize_counter(following["totalItems"]),
        follower_count: normalize_counter(followers["totalItems"])
      }

      {:ok, stats}
    else
      {:error, _reason} = error -> error
      other -> {:error, other}
    end
  end
  # Collection counters are trusted only when they are integers; any other
  # value is counted as 0.
  defp normalize_counter(counter) do
    if is_integer(counter), do: counter, else: 0
  end
  @doc """
  Merges freshly fetched follow statistics into `user_data[:info]` when
  external user synchronization is enabled, the user type is "Person" or
  "Service", and both collection addresses are present.

  In every other case `user_data` is returned unchanged; unexpected failures
  are logged as errors.
  """
  # NOTE(review): this checks `user_data[:type]`, while object_to_user_data/1
  # in this module stores the actor type under `:actor_type` and sets no
  # `:type` key — confirm which key callers actually provide.
  def maybe_update_follow_information(user_data) do
    with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
         {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
         {_, true} <-
           {:collections_available,
            !!(user_data[:following_address] && user_data[:follower_address])},
         {:ok, fetched_info} <- fetch_follow_information_for_user(user_data) do
      merged_info = Map.merge(user_data[:info] || %{}, fetched_info)
      Map.put(user_data, :info, merged_info)
    else
      # A precondition is not met: silently keep the data as it is.
      {check, false} when check in [:user_type_check, :collections_available, :enabled] ->
        user_data

      e ->
        Logger.error(
          "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
        )

        user_data
    end
  end
  # Decides whether a remote collection is private.
  #   * an embedded first page of a (Ordered)CollectionPage type -> public
  #   * a "first" reference that fetches to such a page          -> public
  #   * a "first" reference answered with HTTP 401 / 403         -> private
  #   * other fetch results are returned as `{:error, _}`
  #   * no "first" at all                                        -> private
  defp collection_private(%{"first" => %{"type" => type}})
       when type in ["CollectionPage", "OrderedCollectionPage"],
       do: {:ok, false}

  defp collection_private(%{"first" => first}) do
    case Fetcher.fetch_and_contain_remote_object_from_id(first) do
      {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
        {:ok, false}

      {:error, {:ok, %{status: code}}} when code in [401, 403] ->
        {:ok, true}

      {:error, _reason} = error ->
        error

      other ->
        {:error, other}
    end
  end

  defp collection_private(_data), do: {:ok, true}
  @doc """
  Runs the actor object through `MRF.filter/1` and converts the filtered
  object into user attributes.

  Returns `{:ok, user_data}`; any other filter result `r` becomes `{:error, r}`.
  """
  def user_data_from_user_object(data) do
    case MRF.filter(data) do
      {:ok, filtered} -> {:ok, object_to_user_data(filtered)}
      rejection -> {:error, rejection}
    end
  end
  @doc """
  Fetches the actor at `ap_id` and prepares the attributes for a remote user,
  including the optional follow information update.

  Returns `{:ok, user_data}` or `{:error, reason}`. The log level of a
  failure depends on the reason: deleted objects are logged at debug level,
  rejections at info level, everything else as an error.
  """
  def fetch_and_prepare_user_from_ap_id(ap_id) do
    with {:ok, object} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
         {:ok, user_data} <- user_data_from_user_object(object) do
      {:ok, maybe_update_follow_information(user_data)}
    else
      {:error, e} ->
        case e do
          # If this has been deleted, only log a debug and not an error
          "Object has been deleted" ->
            Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")

          {:reject, reason} ->
            Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")

          _ ->
            Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
        end

        {:error, e}
    end
  end
  @doc """
  Frees the nickname in `data` when another user already holds it.

  If a user with the same nickname but a different AP id exists, that old
  user is renamed to `"<id>.<nickname>"`. If the AP ids are equal, only an
  info message is logged. Without a binary nickname or a matching user,
  `nil` is returned.
  """
  def maybe_handle_clashing_nickname(data) do
    nickname = data[:nickname]
    new_ap_id = data[:ap_id]
    old_user = if is_binary(nickname), do: User.get_by_nickname(nickname)

    case old_user do
      %User{ap_id: old_ap_id} when new_ap_id == old_ap_id ->
        Logger.info(
          "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
        )

      %User{} ->
        Logger.info(
          "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
        )

        old_user
        |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
        |> User.update_and_set_cache()

      _ ->
        nil
    end
  end
  @doc """
  Builds the pinned-objects map (`object AP id => timestamp`) from a featured
  collection document.

  Items are read from `"orderedItems"`, falling back to `"items"`. Each item
  may be an object map with an `"id"` or a bare AP id string; other entries
  are ignored. Collection pages referenced through `"first"` are not
  followed here.

  Anything that is not an `OrderedCollection` / `Collection` yields an empty
  map, so a malformed featured collection cannot abort processing of the
  actor it belongs to.
  """
  def pin_data_from_featured_collection(%{"type" => type} = collection)
      when type in ["OrderedCollection", "Collection"] do
    (collection["orderedItems"] || collection["items"])
    |> List.wrap()
    |> Enum.flat_map(fn
      %{"id" => object_ap_id} -> [object_ap_id]
      object_ap_id when is_binary(object_ap_id) -> [object_ap_id]
      _other -> []
    end)
    |> Map.new(fn object_ap_id -> {object_ap_id, NaiveDateTime.utc_now()} end)
  end

  def pin_data_from_featured_collection(_data), do: %{}
  @doc """
  Fetches the featured collection at `ap_id` and converts it into pin data.

  Always returns `{:ok, pins}`: a `nil` address or a failed fetch (which is
  logged as an error) both produce an empty map.
  """
  def fetch_and_prepare_featured_from_ap_id(nil), do: {:ok, %{}}

  def fetch_and_prepare_featured_from_ap_id(ap_id) do
    case Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
      {:ok, data} ->
        {:ok, pin_data_from_featured_collection(data)}

      e ->
        Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
        {:ok, %{}}
    end
  end
  @doc """
  Makes sure every pinned object of the given user data is available.

  Each pin is looked up in the object cache first and fetched by id
  otherwise. Returns `:ok` when all pins are available, `:error` when at
  least one is not, and `nil` for `nil` input.
  """
  def pinned_fetch_task(nil), do: nil

  def pinned_fetch_task(%{pinned_objects: pins}) do
    all_available? =
      Enum.all?(pins, fn {ap_id, _pinned_at} ->
        Object.get_cached_by_ap_id(ap_id) ||
          match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
      end)

    if all_available?, do: :ok, else: :error
  end
  @doc """
  Creates or refreshes the local record of the remote user at `ap_id`.

  A cached user that is not AP-enabled is upgraded through the
  Transmogrifier. Otherwise the actor is fetched; its pinned objects are
  fetched in a separate task, and the user is either updated (when cached)
  or inserted after resolving a possible nickname clash. A failed fetch is
  returned unchanged.
  """
  def make_user_from_ap_id(ap_id) do
    existing_user = User.get_cached_by_ap_id(ap_id)

    cond do
      existing_user && !User.ap_enabled?(existing_user) ->
        Transmogrifier.upgrade_user_from_ap_id(ap_id)

      true ->
        case fetch_and_prepare_user_from_ap_id(ap_id) do
          {:ok, data} ->
            {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)

            if existing_user do
              existing_user
              |> User.remote_user_changeset(data)
              |> User.update_and_set_cache()
            else
              maybe_handle_clashing_nickname(data)

              data
              |> User.remote_user_changeset()
              |> Repo.insert()
              |> User.set_cache()
            end

          failure ->
            failure
        end
    end
  end
  @doc """
  Resolves `nickname` through WebFinger and creates / refreshes the user from
  the AP id found there.

  Returns `{:error, "No AP id in WebFinger"}` when the lookup fails or
  yields no AP id.
  """
  def make_user_from_nickname(nickname) do
    case WebFinger.finger(nickname) do
      {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) ->
        make_user_from_ap_id(ap_id)

      _no_ap_id ->
        {:error, "No AP id in WebFinger"}
    end
  end
  # Filters out broken threads: delegates to entire_thread_visible_for_user?/2
  # for the given activity and user.
  defp contain_broken_threads(%Activity{} = activity, %User{} = user),
    do: entire_thread_visible_for_user?(activity, user)
  # Post-processing hook for a single activity; currently this only applies
  # the broken-thread containment check.
  def contain_activity(%Activity{} = activity, %User{} = user),
    do: contain_broken_threads(activity, user)
  @doc """
  Returns the query for Create activities with "direct" visibility, ordered
  by ascending activity id.
  """
  def fetch_direct_messages_query do
    direct_creates =
      Activity
      |> restrict_type(%{type: "Create"})
      |> restrict_visibility(%{visibility: "direct"})

    order_by(direct_creates, [activity], asc: activity.id)
  end
  1419. end