@@ -42,7 +42,9 @@ defmodule Pleroma.Application do | |||
setup_instrumenters() | |||
load_custom_modules() | |||
if adapter() == Tesla.Adapter.Gun do | |||
adapter = Application.get_env(:tesla, :adapter) | |||
if adapter == Tesla.Adapter.Gun do | |||
if version = Pleroma.OTPVersion.version() do | |||
[major, minor] = | |||
version | |||
@@ -74,7 +76,7 @@ defmodule Pleroma.Application do | |||
Pleroma.Plugs.RateLimiter.Supervisor | |||
] ++ | |||
cachex_children() ++ | |||
http_pools_children(Config.get(:env)) ++ | |||
http_children(adapter, @env) ++ | |||
[ | |||
Pleroma.Stats, | |||
Pleroma.JobQueueMonitor, | |||
@@ -206,15 +208,13 @@ defmodule Pleroma.Application do | |||
end | |||
# start hackney and gun pools in tests | |||
defp http_pools_children(:test) do | |||
defp http_children(_, :test) do | |||
hackney_options = Config.get([:hackney_pools, :federation]) | |||
hackney_pool = :hackney_pool.child_spec(:federation, hackney_options) | |||
[hackney_pool, Pleroma.Pool.Supervisor] | |||
end | |||
defp http_pools_children(_), do: http_pools(adapter()) | |||
defp http_pools(Tesla.Adapter.Hackney) do | |||
defp http_children(Tesla.Adapter.Hackney, _) do | |||
pools = [:federation, :media] | |||
pools = | |||
@@ -230,9 +230,7 @@ defmodule Pleroma.Application do | |||
end | |||
end | |||
defp http_pools(Tesla.Adapter.Gun), do: [Pleroma.Pool.Supervisor] | |||
defp http_pools(_), do: [] | |||
defp http_children(Tesla.Adapter.Gun, _), do: [Pleroma.Pool.Supervisor] | |||
defp adapter, do: Application.get_env(:tesla, :adapter) | |||
defp http_children(_, _), do: [] | |||
end |
@@ -5,6 +5,7 @@ | |||
defmodule Pleroma.Config.TransferTask do | |||
use Task | |||
alias Pleroma.Config | |||
alias Pleroma.ConfigDB | |||
alias Pleroma.Repo | |||
@@ -36,36 +37,31 @@ defmodule Pleroma.Config.TransferTask do | |||
def start_link(_) do | |||
load_and_update_env() | |||
if Pleroma.Config.get(:env) == :test, do: Ecto.Adapters.SQL.Sandbox.checkin(Repo) | |||
if Config.get(:env) == :test, do: Ecto.Adapters.SQL.Sandbox.checkin(Repo) | |||
:ignore | |||
end | |||
@spec load_and_update_env([ConfigDB.t()]) :: :ok | false | |||
def load_and_update_env(deleted \\ [], restart_pleroma? \\ true) do | |||
with {_, true} <- {:configurable, Pleroma.Config.get(:configurable_from_database)} do | |||
@spec load_and_update_env([ConfigDB.t()], boolean()) :: :ok | |||
def load_and_update_env(deleted_settings \\ [], restart_pleroma? \\ true) do | |||
with {_, true} <- {:configurable, Config.get(:configurable_from_database)} do | |||
# We need to restart applications for loaded settings take effect | |||
in_db = Repo.all(ConfigDB) | |||
with_deleted = in_db ++ deleted | |||
# TODO: some problem with prometheus after restart! | |||
reject = [nil, :prometheus] | |||
reject_for_restart = | |||
reject_restart = | |||
if restart_pleroma? do | |||
reject | |||
[nil, :prometheus] | |||
else | |||
[:pleroma | reject] | |||
[:pleroma, nil, :prometheus] | |||
end | |||
started_applications = Application.started_applications() | |||
with_deleted | |||
|> Enum.map(&merge_and_update(&1)) | |||
(Repo.all(ConfigDB) ++ deleted_settings) | |||
|> Enum.map(&merge_and_update/1) | |||
|> Enum.uniq() | |||
|> Enum.reject(&(&1 in reject_for_restart)) | |||
|> Enum.reject(&(&1 in reject_restart)) | |||
|> maybe_set_pleroma_last() | |||
|> Enum.each(&restart(started_applications, &1, Pleroma.Config.get(:env))) | |||
|> Enum.each(&restart(started_applications, &1, Config.get(:env))) | |||
:ok | |||
else | |||
@@ -108,18 +104,14 @@ defmodule Pleroma.Config.TransferTask do | |||
key = ConfigDB.from_string(setting.key) | |||
group = ConfigDB.from_string(setting.group) | |||
default = Pleroma.Config.Holder.config(group, key) | |||
default = Config.Holder.config(group, key) | |||
value = ConfigDB.from_binary(setting.value) | |||
merged_value = | |||
if Ecto.get_meta(setting, :state) == :deleted do | |||
default | |||
else | |||
if can_be_merged?(default, value) do | |||
ConfigDB.merge_group(group, key, default, value) | |||
else | |||
value | |||
end | |||
cond do | |||
Ecto.get_meta(setting, :state) == :deleted -> default | |||
can_be_merged?(default, value) -> ConfigDB.merge_group(group, key, default, value) | |||
true -> value | |||
end | |||
:ok = update_env(group, key, merged_value) | |||
@@ -49,8 +49,6 @@ defmodule Pleroma.Gun.Conn do | |||
key = "#{uri.scheme}:#{uri.host}:#{uri.port}" | |||
Logger.debug("opening new connection #{Connections.compose_uri_log(uri)}") | |||
conn_pid = | |||
if Connections.count(name) < opts[:max_connection] do | |||
do_open(uri, opts) | |||
@@ -109,9 +107,9 @@ defmodule Pleroma.Gun.Conn do | |||
else | |||
error -> | |||
Logger.warn( | |||
"Received error on opening connection with http proxy #{ | |||
Connections.compose_uri_log(uri) | |||
} #{inspect(error)}" | |||
"Opening proxied connection to #{compose_uri_log(uri)} failed with error #{ | |||
inspect(error) | |||
}" | |||
) | |||
error | |||
@@ -145,9 +143,9 @@ defmodule Pleroma.Gun.Conn do | |||
else | |||
error -> | |||
Logger.warn( | |||
"Received error on opening connection with socks proxy #{ | |||
Connections.compose_uri_log(uri) | |||
} #{inspect(error)}" | |||
"Opening socks proxied connection to #{compose_uri_log(uri)} failed with error #{ | |||
inspect(error) | |||
}" | |||
) | |||
error | |||
@@ -163,9 +161,7 @@ defmodule Pleroma.Gun.Conn do | |||
else | |||
error -> | |||
Logger.warn( | |||
"Received error on opening connection #{Connections.compose_uri_log(uri)} #{ | |||
inspect(error) | |||
}" | |||
"Opening connection to #{compose_uri_log(uri)} failed with error #{inspect(error)}" | |||
) | |||
error | |||
@@ -184,16 +180,17 @@ defmodule Pleroma.Gun.Conn do | |||
defp add_http2_opts(opts, _, _), do: opts | |||
defp close_least_used_and_do_open(name, uri, opts) do | |||
Logger.debug("try to open conn #{Connections.compose_uri_log(uri)}") | |||
with [{close_key, least_used} | _conns] <- | |||
Connections.get_unused_conns(name), | |||
:ok <- Gun.close(least_used.conn) do | |||
Connections.remove_conn(name, close_key) | |||
with [{key, conn} | _conns] <- Connections.get_unused_conns(name), | |||
:ok <- Gun.close(conn.conn) do | |||
Connections.remove_conn(name, key) | |||
do_open(uri, opts) | |||
else | |||
[] -> {:error, :pool_overflowed} | |||
end | |||
end | |||
def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do | |||
"#{scheme}://#{host}#{path}" | |||
end | |||
end |
@@ -7,7 +7,7 @@ defmodule Pleroma.HTTP.AdapterHelper do | |||
@type proxy :: | |||
{Connection.host(), pos_integer()} | |||
| {Connection.proxy_type(), pos_integer()} | |||
| {Connection.proxy_type(), Connection.host(), pos_integer()} | |||
@callback options(keyword(), URI.t()) :: keyword() | |||
@callback after_request(keyword()) :: :ok | |||
@@ -20,8 +20,8 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do | |||
] | |||
@spec options(keyword(), URI.t()) :: keyword() | |||
def options(connection_opts \\ [], %URI{} = uri) do | |||
formatted_proxy = | |||
def options(incoming_opts \\ [], %URI{} = uri) do | |||
proxy = | |||
Pleroma.Config.get([:http, :proxy_url], nil) | |||
|> AdapterHelper.format_proxy() | |||
@@ -30,8 +30,8 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do | |||
@defaults | |||
|> Keyword.merge(config_opts) | |||
|> add_scheme_opts(uri) | |||
|> AdapterHelper.maybe_add_proxy(formatted_proxy) | |||
|> maybe_get_conn(uri, connection_opts) | |||
|> AdapterHelper.maybe_add_proxy(proxy) | |||
|> maybe_get_conn(uri, incoming_opts) | |||
end | |||
@spec after_request(keyword()) :: :ok | |||
@@ -43,44 +43,35 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do | |||
:ok | |||
end | |||
defp add_scheme_opts(opts, %URI{scheme: "http"}), do: opts | |||
defp add_scheme_opts(opts, %{scheme: "http"}), do: opts | |||
defp add_scheme_opts(opts, %URI{scheme: "https"}) do | |||
defp add_scheme_opts(opts, %{scheme: "https"}) do | |||
opts | |||
|> Keyword.put(:certificates_verification, true) | |||
|> Keyword.put(:transport, :tls) | |||
|> Keyword.put(:tls_opts, log_level: :warning) | |||
end | |||
defp maybe_get_conn(adapter_opts, uri, connection_opts) do | |||
defp maybe_get_conn(adapter_opts, uri, incoming_opts) do | |||
{receive_conn?, opts} = | |||
adapter_opts | |||
|> Keyword.merge(connection_opts) | |||
|> Keyword.merge(incoming_opts) | |||
|> Keyword.pop(:receive_conn, true) | |||
if Connections.alive?(:gun_connections) and receive_conn? do | |||
try_to_get_conn(uri, opts) | |||
checkin_conn(uri, opts) | |||
else | |||
opts | |||
end | |||
end | |||
defp try_to_get_conn(uri, opts) do | |||
defp checkin_conn(uri, opts) do | |||
case Connections.checkin(uri, :gun_connections) do | |||
nil -> | |||
Logger.debug( | |||
"Gun connections pool checkin was not successful. Trying to open conn for next request." | |||
) | |||
Task.start(fn -> Pleroma.Gun.Conn.open(uri, :gun_connections, opts) end) | |||
Task.start(Pleroma.Gun.Conn, :open, [uri, :gun_connections, opts]) | |||
opts | |||
conn when is_pid(conn) -> | |||
Logger.debug("received conn #{inspect(conn)} #{Connections.compose_uri_log(uri)}") | |||
opts | |||
|> Keyword.put(:conn, conn) | |||
|> Keyword.put(:close_conn, false) | |||
Keyword.merge(opts, conn: conn, close_conn: false) | |||
end | |||
end | |||
end |
@@ -71,15 +71,15 @@ defmodule Pleroma.HTTP.Connection do | |||
{:ok, parse_host(host), port} | |||
else | |||
{_, _} -> | |||
Logger.warn("parsing port in proxy fail #{inspect(proxy)}") | |||
Logger.warn("Parsing port failed #{inspect(proxy)}") | |||
{:error, :invalid_proxy_port} | |||
:error -> | |||
Logger.warn("parsing port in proxy fail #{inspect(proxy)}") | |||
Logger.warn("Parsing port failed #{inspect(proxy)}") | |||
{:error, :invalid_proxy_port} | |||
_ -> | |||
Logger.warn("parsing proxy fail #{inspect(proxy)}") | |||
Logger.warn("Parsing proxy failed #{inspect(proxy)}") | |||
{:error, :invalid_proxy} | |||
end | |||
end | |||
@@ -89,7 +89,7 @@ defmodule Pleroma.HTTP.Connection do | |||
{:ok, type, parse_host(host), port} | |||
else | |||
_ -> | |||
Logger.warn("parsing proxy fail #{inspect(proxy)}") | |||
Logger.warn("Parsing proxy failed #{inspect(proxy)}") | |||
{:error, :invalid_proxy} | |||
end | |||
end | |||
@@ -56,10 +56,9 @@ defmodule Pleroma.HTTP do | |||
{:ok, Env.t()} | {:error, any()} | |||
def request(method, url, body, headers, options) when is_binary(url) do | |||
uri = URI.parse(url) | |||
received_adapter_opts = Keyword.get(options, :adapter, []) | |||
adapter_opts = Connection.options(uri, received_adapter_opts) | |||
adapter_opts = Connection.options(uri, options[:adapter] || []) | |||
options = put_in(options[:adapter], adapter_opts) | |||
params = Keyword.get(options, :params, []) | |||
params = options[:params] || [] | |||
request = build_request(method, headers, options, url, body, params) | |||
adapter = Application.get_env(:tesla, :adapter) | |||
@@ -87,18 +87,11 @@ defmodule Pleroma.Pool.Connections do | |||
@impl true | |||
def handle_cast({:checkout, conn_pid, pid}, state) do | |||
Logger.debug("checkout #{inspect(conn_pid)}") | |||
state = | |||
with true <- Process.alive?(conn_pid), | |||
{key, conn} <- find_conn(state.conns, conn_pid), | |||
used_by <- List.keydelete(conn.used_by, pid, 0) do | |||
conn_state = | |||
if used_by == [] do | |||
:idle | |||
else | |||
conn.conn_state | |||
end | |||
conn_state = if used_by == [], do: :idle, else: conn.conn_state | |||
put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by}) | |||
else | |||
@@ -123,26 +116,23 @@ defmodule Pleroma.Pool.Connections do | |||
@impl true | |||
def handle_call({:checkin, uri}, from, state) do | |||
key = "#{uri.scheme}:#{uri.host}:#{uri.port}" | |||
Logger.debug("checkin #{key}") | |||
case state.conns[key] do | |||
%{conn: conn, gun_state: :up} = current_conn -> | |||
Logger.debug("reusing conn #{key}") | |||
%{conn: pid, gun_state: :up} = conn -> | |||
time = :os.system_time(:second) | |||
last_reference = time - current_conn.last_reference | |||
current_crf = crf(last_reference, 100, current_conn.crf) | |||
last_reference = time - conn.last_reference | |||
crf = crf(last_reference, 100, conn.crf) | |||
state = | |||
put_in(state.conns[key], %{ | |||
current_conn | |||
conn | |||
| last_reference: time, | |||
crf: current_crf, | |||
crf: crf, | |||
conn_state: :active, | |||
used_by: [from | current_conn.used_by] | |||
used_by: [from | conn.used_by] | |||
}) | |||
{:reply, conn, state} | |||
{:reply, pid, state} | |||
%{gun_state: :down} -> | |||
{:reply, nil, state} | |||
@@ -164,50 +154,48 @@ defmodule Pleroma.Pool.Connections do | |||
def handle_call(:unused_conns, _from, state) do | |||
unused_conns = | |||
state.conns | |||
|> Enum.filter(fn {_k, v} -> | |||
v.conn_state == :idle and v.used_by == [] | |||
end) | |||
|> Enum.sort(fn {_x_k, x}, {_y_k, y} -> | |||
x.crf <= y.crf and x.last_reference <= y.last_reference | |||
end) | |||
|> Enum.filter(&filter_conns/1) | |||
|> Enum.sort(&sort_conns/2) | |||
{:reply, unused_conns, state} | |||
end | |||
defp filter_conns({_, %{conn_state: :idle, used_by: []}}), do: true | |||
defp filter_conns(_), do: false | |||
defp sort_conns({_, c1}, {_, c2}) do | |||
c1.crf <= c2.crf and c1.last_reference <= c2.last_reference | |||
end | |||
@impl true | |||
def handle_info({:gun_up, conn_pid, _protocol}, state) do | |||
%{origin_host: host, origin_scheme: scheme, origin_port: port} = Gun.info(conn_pid) | |||
host = | |||
case :inet.ntoa(host) do | |||
{:error, :einval} -> host | |||
ip -> ip | |||
end | |||
key = "#{scheme}:#{host}:#{port}" | |||
state = | |||
with conn_key when is_binary(conn_key) <- compose_key_gun_info(conn_pid), | |||
{key, conn} <- find_conn(state.conns, conn_pid, conn_key), | |||
with {_key, conn} <- find_conn(state.conns, conn_pid, key), | |||
{true, key} <- {Process.alive?(conn_pid), key} do | |||
time = :os.system_time(:second) | |||
last_reference = time - conn.last_reference | |||
current_crf = crf(last_reference, 100, conn.crf) | |||
put_in(state.conns[key], %{ | |||
conn | |||
| gun_state: :up, | |||
last_reference: time, | |||
crf: current_crf, | |||
conn_state: :active, | |||
retries: 0 | |||
}) | |||
else | |||
:error_gun_info -> | |||
Logger.debug(":gun.info caused error") | |||
state | |||
{false, key} -> | |||
Logger.debug(":gun_up message for closed conn #{inspect(conn_pid)}") | |||
put_in( | |||
state.conns, | |||
Map.delete(state.conns, key) | |||
) | |||
nil -> | |||
Logger.debug(":gun_up message for conn which is not found in state") | |||
:ok = Gun.close(conn_pid) | |||
state | |||
@@ -224,7 +212,6 @@ defmodule Pleroma.Pool.Connections do | |||
with {key, conn} <- find_conn(state.conns, conn_pid), | |||
{true, key} <- {Process.alive?(conn_pid), key} do | |||
if conn.retries == retries do | |||
Logger.debug("closing conn if retries is eq #{inspect(conn_pid)}") | |||
:ok = Gun.close(conn.conn) | |||
put_in( | |||
@@ -240,18 +227,13 @@ defmodule Pleroma.Pool.Connections do | |||
end | |||
else | |||
{false, key} -> | |||
# gun can send gun_down for closed conn, maybe connection is not closed yet | |||
Logger.debug(":gun_down message for closed conn #{inspect(conn_pid)}") | |||
put_in( | |||
state.conns, | |||
Map.delete(state.conns, key) | |||
) | |||
nil -> | |||
Logger.debug(":gun_down message for conn which is not found in state") | |||
:ok = Gun.close(conn_pid) | |||
Logger.debug(":gun_down for conn which isn't found in state") | |||
state | |||
end | |||
@@ -275,7 +257,7 @@ defmodule Pleroma.Pool.Connections do | |||
) | |||
else | |||
nil -> | |||
Logger.debug(":DOWN message for conn which is not found in state") | |||
Logger.debug(":DOWN for conn which isn't found in state") | |||
state | |||
end | |||
@@ -283,18 +265,6 @@ defmodule Pleroma.Pool.Connections do | |||
{:noreply, state} | |||
end | |||
defp compose_key_gun_info(pid) do | |||
%{origin_host: origin_host, origin_scheme: scheme, origin_port: port} = Gun.info(pid) | |||
host = | |||
case :inet.ntoa(origin_host) do | |||
{:error, :einval} -> origin_host | |||
ip -> ip | |||
end | |||
"#{scheme}:#{host}:#{port}" | |||
end | |||
defp find_conn(conns, conn_pid) do | |||
Enum.find(conns, fn {_key, conn} -> | |||
conn.conn == conn_pid | |||
@@ -310,8 +280,4 @@ defmodule Pleroma.Pool.Connections do | |||
def crf(current, steps, crf) do | |||
1 + :math.pow(0.5, current / steps) * crf | |||
end | |||
def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do | |||
"#{scheme}://#{host}#{path}" | |||
end | |||
end |
@@ -6,7 +6,6 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do | |||
use ExUnit.Case, async: true | |||
use Pleroma.Tests.Helpers | |||
import ExUnit.CaptureLog | |||
import Mox | |||
alias Pleroma.Config | |||
@@ -63,7 +62,6 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do | |||
opts = Gun.options([receive_conn: false], uri) | |||
assert opts[:certificates_verification] | |||
assert opts[:transport] == :tls | |||
end | |||
test "get conn on next request" do | |||
@@ -73,14 +71,12 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do | |||
on_exit(fn -> Logger.configure(level: level) end) | |||
uri = URI.parse("http://some-domain2.com") | |||
assert capture_log(fn -> | |||
opts = Gun.options(uri) | |||
opts = Gun.options(uri) | |||
assert opts[:conn] == nil | |||
assert opts[:close_conn] == nil | |||
end) =~ | |||
"Gun connections pool checkin was not successful. Trying to open conn for next request." | |||
assert opts[:conn] == nil | |||
assert opts[:close_conn] == nil | |||
Process.sleep(50) | |||
opts = Gun.options(uri) | |||
assert is_pid(opts[:conn]) | |||
@@ -355,7 +355,7 @@ defmodule Pleroma.Pool.ConnectionsTest do | |||
refute Conn.open(url, name) | |||
refute Connections.checkin(url, name) | |||
end) =~ | |||
"Received error on opening connection http://gun-not-up.com {:error, :timeout}" | |||
"Opening connection to http://gun-not-up.com failed with error {:error, :timeout}" | |||
end | |||
test "process gun_down message and then gun_up", %{name: name} do | |||