pleroma/lib/pleroma/repo.ex
rinpatch 5c0e1039ce Chunk the notification type backfill migration
Long-term we want that migration to be done entirely in SQL,
but for now this is a hotfix to not cause OOMs on large databases.

This is using a homegrown version of `Repo.stream`, it's worse in
terms of performance than the upstream since it doesn't use the same
prepared query for chunk queries, but unlike the upstream it supports
preloads.
2020-06-16 23:53:13 +03:00

114 lines
3.1 KiB
Elixir

# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Repo do
use Ecto.Repo,
otp_app: :pleroma,
adapter: Ecto.Adapters.Postgres,
migration_timestamps: [type: :naive_datetime_usec]
import Ecto.Query
require Logger
defmodule Instrumenter do
use Prometheus.EctoInstrumenter
end
@doc """
Dynamically loads the repository url from the
DATABASE_URL environment variable.
"""
def init(_, opts) do
{:ok, Keyword.put(opts, :url, System.get_env("DATABASE_URL"))}
end
@doc "find resource based on prepared query"
@spec find_resource(Ecto.Query.t()) :: {:ok, struct()} | {:error, :not_found}
def find_resource(%Ecto.Query{} = query) do
case __MODULE__.one(query) do
nil -> {:error, :not_found}
resource -> {:ok, resource}
end
end
def find_resource(_query), do: {:error, :not_found}
@doc """
Gets association from cache or loads if need
## Examples
iex> Repo.get_assoc(token, :user)
%User{}
"""
@spec get_assoc(struct(), atom()) :: {:ok, struct()} | {:error, :not_found}
def get_assoc(resource, association) do
case __MODULE__.preload(resource, association) do
%{^association => assoc} when not is_nil(assoc) -> {:ok, assoc}
_ -> {:error, :not_found}
end
end
def check_migrations_applied!() do
unless Pleroma.Config.get(
[:i_am_aware_this_may_cause_data_loss, :disable_migration_check],
false
) do
Ecto.Migrator.with_repo(__MODULE__, fn repo ->
down_migrations =
Ecto.Migrator.migrations(repo)
|> Enum.reject(fn
{:up, _, _} -> true
{:down, _, _} -> false
end)
if length(down_migrations) > 0 do
down_migrations_text =
Enum.map(down_migrations, fn {:down, id, name} -> "- #{name} (#{id})\n" end)
Logger.error(
"The following migrations were not applied:\n#{down_migrations_text}If you want to start Pleroma anyway, set\nconfig :pleroma, :i_am_aware_this_may_cause_data_loss, disable_migration_check: true"
)
raise Pleroma.Repo.UnappliedMigrationsError
end
end)
else
:ok
end
end
def chunk_stream(query, chunk_size) do
# We don't actually need start and end funcitons of resource streaming,
# but it seems to be the only way to not fetch records one-by-one and
# have individual records be the elements of the stream, instead of
# lists of records
Stream.resource(
fn -> 0 end,
fn
last_id ->
query
|> order_by(asc: :id)
|> where([r], r.id > ^last_id)
|> limit(^chunk_size)
|> all()
|> case do
[] ->
{:halt, last_id}
records ->
last_id = List.last(records).id
{records, last_id}
end
end,
fn _ -> :ok end
)
end
end
defmodule Pleroma.Repo.UnappliedMigrationsError do
defexception message: "Unapplied Migrations detected"
end