Fork of Pleroma with site-specific changes and feature branches https://git.pleroma.social/pleroma/pleroma
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

261 lines
7.5KB

  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Object.Fetcher do
  5. alias Pleroma.HTTP
  6. alias Pleroma.Maps
  7. alias Pleroma.Object
  8. alias Pleroma.Object.Containment
  9. alias Pleroma.Repo
  10. alias Pleroma.Signature
  11. alias Pleroma.Web.ActivityPub.InternalFetchActor
  12. alias Pleroma.Web.ActivityPub.ObjectValidator
  13. alias Pleroma.Web.ActivityPub.Transmogrifier
  14. alias Pleroma.Web.Federator
  15. require Logger
  16. require Pleroma.Constants
  17. defp touch_changeset(changeset) do
  18. updated_at =
  19. NaiveDateTime.utc_now()
  20. |> NaiveDateTime.truncate(:second)
  21. Ecto.Changeset.put_change(changeset, :updated_at, updated_at)
  22. end
  23. defp maybe_reinject_internal_fields(%{data: %{} = old_data}, new_data) do
  24. internal_fields = Map.take(old_data, Pleroma.Constants.object_internal_fields())
  25. Map.merge(new_data, internal_fields)
  26. end
  27. defp maybe_reinject_internal_fields(_, new_data), do: new_data
  28. @spec reinject_object(struct(), map()) :: {:ok, Object.t()} | {:error, any()}
  29. defp reinject_object(%Object{data: %{"type" => "Question"}} = object, new_data) do
  30. Logger.debug("Reinjecting object #{new_data["id"]}")
  31. with data <- maybe_reinject_internal_fields(object, new_data),
  32. {:ok, data, _} <- ObjectValidator.validate(data, %{}),
  33. changeset <- Object.change(object, %{data: data}),
  34. changeset <- touch_changeset(changeset),
  35. {:ok, object} <- Repo.insert_or_update(changeset),
  36. {:ok, object} <- Object.set_cache(object) do
  37. {:ok, object}
  38. else
  39. e ->
  40. Logger.error("Error while processing object: #{inspect(e)}")
  41. {:error, e}
  42. end
  43. end
  44. defp reinject_object(%Object{} = object, new_data) do
  45. Logger.debug("Reinjecting object #{new_data["id"]}")
  46. with new_data <- Transmogrifier.fix_object(new_data),
  47. data <- maybe_reinject_internal_fields(object, new_data),
  48. changeset <- Object.change(object, %{data: data}),
  49. changeset <- touch_changeset(changeset),
  50. {:ok, object} <- Repo.insert_or_update(changeset),
  51. {:ok, object} <- Object.set_cache(object) do
  52. {:ok, object}
  53. else
  54. e ->
  55. Logger.error("Error while processing object: #{inspect(e)}")
  56. {:error, e}
  57. end
  58. end
  59. def refetch_object(%Object{data: %{"id" => id}} = object) do
  60. with {:local, false} <- {:local, Object.local?(object)},
  61. {:ok, new_data} <- fetch_and_contain_remote_object_from_id(id),
  62. {:ok, object} <- reinject_object(object, new_data) do
  63. {:ok, object}
  64. else
  65. {:local, true} -> {:ok, object}
  66. e -> {:error, e}
  67. end
  68. end
  69. # Note: will create a Create activity, which we need internally at the moment.
  70. def fetch_object_from_id(id, options \\ []) do
  71. with {_, nil} <- {:fetch_object, Object.get_cached_by_ap_id(id)},
  72. {_, true} <- {:allowed_depth, Federator.allowed_thread_distance?(options[:depth])},
  73. {_, {:ok, data}} <- {:fetch, fetch_and_contain_remote_object_from_id(id)},
  74. {_, nil} <- {:normalize, Object.normalize(data, fetch: false)},
  75. params <- prepare_activity_params(data),
  76. {_, :ok} <- {:containment, Containment.contain_origin(id, params)},
  77. {_, {:ok, activity}} <-
  78. {:transmogrifier, Transmogrifier.handle_incoming(params, options)},
  79. {_, _data, %Object{} = object} <-
  80. {:object, data, Object.normalize(activity, fetch: false)} do
  81. {:ok, object}
  82. else
  83. {:allowed_depth, false} ->
  84. {:error, "Max thread distance exceeded."}
  85. {:containment, _} ->
  86. {:error, "Object containment failed."}
  87. {:transmogrifier, {:error, {:reject, e}}} ->
  88. {:reject, e}
  89. {:transmogrifier, {:reject, e}} ->
  90. {:reject, e}
  91. {:transmogrifier, _} = e ->
  92. {:error, e}
  93. {:object, data, nil} ->
  94. reinject_object(%Object{}, data)
  95. {:normalize, object = %Object{}} ->
  96. {:ok, object}
  97. {:fetch_object, %Object{} = object} ->
  98. {:ok, object}
  99. {:fetch, {:error, error}} ->
  100. {:error, error}
  101. e ->
  102. e
  103. end
  104. end
  105. defp prepare_activity_params(data) do
  106. %{
  107. "type" => "Create",
  108. # Should we seriously keep this attributedTo thing?
  109. "actor" => data["actor"] || data["attributedTo"],
  110. "object" => data
  111. }
  112. |> Maps.put_if_present("to", data["to"])
  113. |> Maps.put_if_present("cc", data["cc"])
  114. |> Maps.put_if_present("bto", data["bto"])
  115. |> Maps.put_if_present("bcc", data["bcc"])
  116. end
  117. def fetch_object_from_id!(id, options \\ []) do
  118. with {:ok, object} <- fetch_object_from_id(id, options) do
  119. object
  120. else
  121. {:error, %Tesla.Mock.Error{}} ->
  122. nil
  123. {:error, "Object has been deleted"} ->
  124. nil
  125. {:reject, reason} ->
  126. Logger.info("Rejected #{id} while fetching: #{inspect(reason)}")
  127. nil
  128. e ->
  129. Logger.error("Error while fetching #{id}: #{inspect(e)}")
  130. nil
  131. end
  132. end
  133. defp make_signature(id, date) do
  134. uri = URI.parse(id)
  135. signature =
  136. InternalFetchActor.get_actor()
  137. |> Signature.sign(%{
  138. "(request-target)": "get #{uri.path}",
  139. host: uri.host,
  140. date: date
  141. })
  142. {"signature", signature}
  143. end
  144. defp sign_fetch(headers, id, date) do
  145. if Pleroma.Config.get([:activitypub, :sign_object_fetches]) do
  146. [make_signature(id, date) | headers]
  147. else
  148. headers
  149. end
  150. end
  151. defp maybe_date_fetch(headers, date) do
  152. if Pleroma.Config.get([:activitypub, :sign_object_fetches]) do
  153. [{"date", date} | headers]
  154. else
  155. headers
  156. end
  157. end
  158. def fetch_and_contain_remote_object_from_id(id)
  159. def fetch_and_contain_remote_object_from_id(%{"id" => id}),
  160. do: fetch_and_contain_remote_object_from_id(id)
  161. def fetch_and_contain_remote_object_from_id(id) when is_binary(id) do
  162. Logger.debug("Fetching object #{id} via AP")
  163. with {:scheme, true} <- {:scheme, String.starts_with?(id, "http")},
  164. {:ok, body} <- get_object(id),
  165. {:ok, data} <- safe_json_decode(body),
  166. :ok <- Containment.contain_origin_from_id(id, data) do
  167. {:ok, data}
  168. else
  169. {:scheme, _} ->
  170. {:error, "Unsupported URI scheme"}
  171. {:error, e} ->
  172. {:error, e}
  173. e ->
  174. {:error, e}
  175. end
  176. end
  177. def fetch_and_contain_remote_object_from_id(_id),
  178. do: {:error, "id must be a string"}
  179. defp get_object(id) do
  180. date = Pleroma.Signature.signed_date()
  181. headers =
  182. [{"accept", "application/activity+json"}]
  183. |> maybe_date_fetch(date)
  184. |> sign_fetch(id, date)
  185. case HTTP.get(id, headers) do
  186. {:ok, %{body: body, status: code, headers: headers}} when code in 200..299 ->
  187. case List.keyfind(headers, "content-type", 0) do
  188. {_, content_type} ->
  189. case Plug.Conn.Utils.media_type(content_type) do
  190. {:ok, "application", "activity+json", _} ->
  191. {:ok, body}
  192. {:ok, "application", "ld+json",
  193. %{"profile" => "https://www.w3.org/ns/activitystreams"}} ->
  194. {:ok, body}
  195. _ ->
  196. {:error, {:content_type, content_type}}
  197. end
  198. _ ->
  199. {:error, {:content_type, nil}}
  200. end
  201. {:ok, %{status: code}} when code in [404, 410] ->
  202. {:error, "Object has been deleted"}
  203. {:error, e} ->
  204. {:error, e}
  205. e ->
  206. {:error, e}
  207. end
  208. end
  209. defp safe_json_decode(nil), do: {:ok, nil}
  210. defp safe_json_decode(json), do: Jason.decode(json)
  211. end