Fork of Pleroma with site-specific changes and feature branches https://git.pleroma.social/pleroma/pleroma
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

249 lines
7.1KB

  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Mix.Tasks.Pleroma.Database do
  5. alias Pleroma.Conversation
  6. alias Pleroma.Maintenance
  7. alias Pleroma.Object
  8. alias Pleroma.Repo
  9. alias Pleroma.User
  10. require Logger
  11. require Pleroma.Constants
  12. import Ecto.Query
  13. import Mix.Pleroma
  14. use Mix.Task
  15. @shortdoc "A collection of database related tasks"
  16. @moduledoc File.read!("docs/administration/CLI_tasks/database.md")
  # `remove_embedded_objects [--vacuum]`
  #
  # Rewrites every activity whose `data.object` carries an `id` so that
  # `data.object` holds only that id. When `--vacuum` is passed,
  # `Maintenance.vacuum("full")` is called afterwards.
  # Any unknown or positional argument fails the option match and raises.
  def run(["remove_embedded_objects" | args]) do
    {options, [], []} = OptionParser.parse(args, strict: [vacuum: :boolean])

    start_pleroma()
    Logger.info("Removing embedded objects")

    sql =
      "update activities set data = safe_jsonb_set(data, '{object}'::text[], data->'object'->'id') where data->'object'->>'id' is not null;"

    # The statement touches the whole table, so no query timeout is applied.
    Repo.query!(sql, [], timeout: :infinity)

    if options[:vacuum], do: Maintenance.vacuum("full")
  end
  # `bump_all_conversations`
  #
  # Calls `start_pleroma/0`, then delegates to
  # `Conversation.bump_for_all_activities/0` and returns its result.
  def run(["bump_all_conversations"]) do
    start_pleroma()

    Conversation.bump_for_all_activities()
  end
  # `update_users_following_followers_counts`
  #
  # Streams every user row and passes each one to
  # `User.update_follower_count/1`. `Repo.stream/1` must run inside a
  # transaction, hence the wrapping `Repo.transaction/2` (without a timeout).
  def run(["update_users_following_followers_counts"]) do
    start_pleroma()

    refresh_all = fn ->
      User
      |> select([u], u)
      |> Repo.stream()
      |> Enum.each(&User.update_follower_count/1)
    end

    Repo.transaction(refresh_all, timeout: :infinity)
  end
  # `prune_objects [--vacuum]`
  #
  # Deletes objects that satisfy all of the following:
  #   * the public collection appears in their `to` or `cc`,
  #   * they were inserted more than `[:instance, :remote_post_retention_days]`
  #     days ago,
  #   * the host part of their `actor` differs from this instance's endpoint host.
  #
  # When `--vacuum` is passed, `Maintenance.vacuum("full")` is called afterwards.
  def run(["prune_objects" | args]) do
    {options, [], []} = OptionParser.parse(args, strict: [vacuum: :boolean])

    start_pleroma()

    retention_days = Pleroma.Config.get([:instance, :remote_post_retention_days])
    Logger.info("Pruning objects older than #{retention_days} days")

    # 86_400 seconds per day; the cutoff lies `retention_days` in the past.
    cutoff = NaiveDateTime.add(NaiveDateTime.utc_now(), -(retention_days * 86_400))
    public = Pleroma.Constants.as_public()
    local_host = Pleroma.Web.Endpoint.host()

    Object
    |> where([o], fragment("?->'to' \\? ? OR ?->'cc' \\? ?", o.data, ^public, o.data, ^public))
    |> where([o], o.inserted_at < ^cutoff)
    |> where([o], fragment("split_part(?->>'actor', '/', 3) != ?", o.data, ^local_host))
    |> Repo.delete_all(timeout: :infinity)

    if options[:vacuum], do: Maintenance.vacuum("full")
  end
  # `fix_likes_collections`
  #
  # Walks all objects that have a `likes` key in batches of 100. For every
  # object whose `likes` value decodes to a JSON object (a map), `likes` is
  # reset to an empty JSON array.
  def run(["fix_likes_collections"]) do
    start_pleroma()

    objects_with_likes =
      from(object in Object,
        where: fragment("(?)->>'likes' is not null", object.data),
        select: %{id: object.id, likes: fragment("(?)->>'likes'", object.data)}
      )

    objects_with_likes
    |> Pleroma.Repo.chunk_stream(100, :batches)
    |> Stream.each(fn batch ->
      # Only rows whose `likes` is a map need fixing.
      ids = for %{id: id, likes: likes} <- batch, is_map(Jason.decode!(likes)), do: id

      from(object in Object,
        where: object.id in ^ids,
        update: [
          set: [data: fragment("safe_jsonb_set(?, '{likes}', '[]'::jsonb, true)", object.data)]
        ]
      )
      |> Repo.update_all([], timeout: :infinity)
    end)
    |> Stream.run()
  end
  # `vacuum <arg>`
  #
  # Calls `start_pleroma/0` and forwards the single command-line argument,
  # unchanged, to `Maintenance.vacuum/1`.
  def run(["vacuum", vacuum_arg]) do
    start_pleroma()

    Maintenance.vacuum(vacuum_arg)
  end
  # `ensure_expiration`
  #
  # Enqueues a `Pleroma.Workers.PurgeExpiredActivity` job for every local
  # `Create` activity whose object is a `Note`. The expiry time is the
  # activity's `inserted_at` (interpreted as UTC) shifted forward by
  # `[:mrf_activity_expiration, :days]` days (365 when unset).
  def run(["ensure_expiration"]) do
    start_pleroma()

    days = Pleroma.Config.get([:mrf_activity_expiration, :days], 365)

    local_note_creates =
      Pleroma.Activity
      |> join(:inner, [a], o in Object,
        on:
          fragment(
            "(?->>'id') = COALESCE((?)->'object'->> 'id', (?)->>'object')",
            o.data,
            a.data,
            a.data
          )
      )
      |> where(local: true)
      |> where([a], fragment("(? ->> 'type'::text) = 'Create'", a.data))
      |> where([_a, o], fragment("?->>'type' = 'Note'", o.data))

    local_note_creates
    |> Pleroma.Repo.chunk_stream(100, :batches)
    # Flatten the batches lazily so each activity is handled one by one.
    |> Stream.concat()
    |> Enum.each(fn activity ->
      inserted_at_utc = DateTime.from_naive!(activity.inserted_at, "Etc/UTC")

      Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
        activity_id: activity.id,
        expires_at: Timex.shift(inserted_at_utc, days: days)
      })
    end)
  end
  # `set_text_search_config <tsconfig>`
  #
  # Sets `default_text_search_config` of the current database to `tsconfig`
  # and then refreshes the full-text-search support for objects:
  #
  #   * with `[:database, :rum_enabled]` set, the `objects_fts_update()` trigger
  #     function is (re)created and every object row is touched via
  #     `updated_at`;
  #   * otherwise the `objects_fts` GIN index is dropped and recreated with
  #     the new configuration.
  #
  # If Postgres answers the `ALTER DATABASE` with any messages, they are
  # printed as an error and nothing else is changed.
  def run(["set_text_search_config", tsconfig]) do
    start_pleroma()

    %{rows: [[tsc]]} = Ecto.Adapters.SQL.query!(Pleroma.Repo, "SHOW default_text_search_config;")
    shell_info("Current default_text_search_config: #{tsc}")

    %{rows: [[db]]} = Ecto.Adapters.SQL.query!(Pleroma.Repo, "SELECT current_database();")
    shell_info("Update default_text_search_config: #{tsconfig}")

    # Neither identifiers nor values in these DDL statements can be sent as
    # bind parameters, so they are quoted by hand. Quoting the database name
    # keeps names with hyphens, uppercase letters, etc. working; doubling
    # single quotes keeps `tsconfig` inside its string literal.
    quoted_db = "\"" <> String.replace(db, "\"", "\"\"") <> "\""
    quoted_tsconfig = "'" <> String.replace(tsconfig, "'", "''") <> "'"

    %{messages: msg} =
      Ecto.Adapters.SQL.query!(
        Pleroma.Repo,
        "ALTER DATABASE #{quoted_db} SET default_text_search_config = #{quoted_tsconfig};"
      )

    # A non-existent config does not raise an exception; it only yields messages.
    if msg != [] do
      shell_info("Error: #{inspect(msg, pretty: true)}")
    else
      rum_enabled = Pleroma.Config.get([:database, :rum_enabled])
      shell_info("Recreate index, RUM: #{rum_enabled}")

      # Note: the SQL below needs to be kept up to date with the latest GIN or
      # RUM index definition.
      if rum_enabled do
        Ecto.Adapters.SQL.query!(
          Pleroma.Repo,
          "CREATE OR REPLACE FUNCTION objects_fts_update() RETURNS trigger AS $$ BEGIN
          new.fts_content := to_tsvector(new.data->>'content');
          RETURN new;
          END
          $$ LANGUAGE plpgsql"
        )

        shell_info("Refresh RUM index")
        Ecto.Adapters.SQL.query!(Pleroma.Repo, "UPDATE objects SET updated_at = NOW();")
      else
        Ecto.Adapters.SQL.query!(Pleroma.Repo, "DROP INDEX IF EXISTS objects_fts;")

        Ecto.Adapters.SQL.query!(
          Pleroma.Repo,
          "CREATE INDEX objects_fts ON objects USING gin(to_tsvector(#{quoted_tsconfig}, data->>'content')); "
        )
      end

      shell_info("Done.")
    end
  end
  # `rollback <version>`
  #
  # Reverses one specific migration while leaving later migrations applied.
  # WARNING: this can cause unrecoverable data loss — proceed at your own
  # responsibility. The user must confirm with `Yn`, `Y` or `y`; any other
  # answer (the default is `n`) makes the task a no-op.
  # Based on https://stackoverflow.com/a/53825840
  def run(["rollback", version]) do
    answer =
      shell_prompt(
        "SEVERE WARNING: this operation may result in unrecoverable data loss. Continue?",
        "n"
      )

    if answer in ["Yn", "Y", "y"] do
      {_, result, _} =
        Ecto.Migrator.with_repo(Pleroma.Repo, fn repo ->
          version = String.to_integer(version)
          file_pattern = ~r/^#{version}_.*\.exs/
          migrations_dir = Ecto.Migrator.migrations_path(repo)

          # Each step is tagged so the `else` branch can tell which one failed.
          with {_, file} when is_binary(file) <-
                 {:find, Enum.find(File.ls!(migrations_dir), &Regex.match?(file_pattern, &1))},
               {_, [{mod, _} | _]} <-
                 {:compile, Code.compile_file(Path.join(migrations_dir, file))},
               {_, :ok} <- {:rollback, Ecto.Migrator.down(repo, version, mod)} do
            {:ok, "Reversed migration: #{file}"}
          else
            {:find, _} -> {:error, "No migration found with version prefix: #{version}"}
            {:compile, e} -> {:error, "Problem compiling migration module: #{inspect(e)}"}
            {:rollback, e} -> {:error, "Problem reversing migration: #{inspect(e)}"}
          end
        end)

      shell_info(inspect(result))
    end
  end
  209. end