- fix for gun worker termination in some circumstances - pool for http clients (ex_aws, tzdata) - default pool timeouts for gun - gun retries on gun_down messages - s3 upload timeout if streaming enabledmessage-debug-mode
@@ -740,19 +740,23 @@ config :pleroma, :connections_pool, | |||
config :pleroma, :pools, | |||
federation: [ | |||
size: 50, | |||
max_waiting: 10 | |||
max_waiting: 10, | |||
timeout: 10_000 | |||
], | |||
media: [ | |||
size: 50, | |||
max_waiting: 10 | |||
max_waiting: 10, | |||
timeout: 10_000 | |||
], | |||
upload: [ | |||
size: 25, | |||
max_waiting: 5 | |||
max_waiting: 5, | |||
timeout: 15_000 | |||
], | |||
default: [ | |||
size: 10, | |||
max_waiting: 2 | |||
max_waiting: 2, | |||
timeout: 5_000 | |||
] | |||
config :pleroma, :hackney_pools, | |||
@@ -83,17 +83,25 @@ defmodule Pleroma.Gun.ConnectionPool.Worker do | |||
end) | |||
{ref, state} = pop_in(state.client_monitors[client_pid]) | |||
Process.demonitor(ref) | |||
timer = | |||
if used_by == [] do | |||
max_idle = Pleroma.Config.get([:connections_pool, :max_idle_time], 30_000) | |||
Process.send_after(self(), :idle_close, max_idle) | |||
# DOWN message can receive right after `remove_client` call and cause worker to terminate | |||
state = | |||
if is_nil(ref) do | |||
state | |||
else | |||
nil | |||
Process.demonitor(ref) | |||
timer = | |||
if used_by == [] do | |||
max_idle = Pleroma.Config.get([:connections_pool, :max_idle_time], 30_000) | |||
Process.send_after(self(), :idle_close, max_idle) | |||
else | |||
nil | |||
end | |||
%{state | timer: timer} | |||
end | |||
{:reply, :ok, %{state | timer: timer}, :hibernate} | |||
{:reply, :ok, state, :hibernate} | |||
end | |||
@impl true | |||
@@ -103,16 +111,21 @@ defmodule Pleroma.Gun.ConnectionPool.Worker do | |||
{:stop, :normal, state} | |||
end | |||
@impl true | |||
def handle_info({:gun_up, _pid, _protocol}, state) do | |||
{:noreply, state, :hibernate} | |||
end | |||
# Gracefully shutdown if the connection got closed without any streams left | |||
@impl true | |||
def handle_info({:gun_down, _pid, _protocol, _reason, []}, state) do | |||
{:stop, :normal, state} | |||
end | |||
# Otherwise, shutdown with an error | |||
# Otherwise, wait for retry | |||
@impl true | |||
def handle_info({:gun_down, _pid, _protocol, _reason, _killed_streams} = down_message, state) do | |||
{:stop, {:error, down_message}, state} | |||
def handle_info({:gun_down, _pid, _protocol, _reason, _killed_streams}, state) do | |||
{:noreply, state, :hibernate} | |||
end | |||
@impl true | |||
@@ -10,6 +10,7 @@ defmodule Pleroma.HTTP.AdapterHelper do | |||
@type proxy_type() :: :socks4 | :socks5 | |||
@type host() :: charlist() | :inet.ip_address() | |||
@type pool() :: :federation | :upload | :media | :default | |||
alias Pleroma.Config | |||
alias Pleroma.HTTP.AdapterHelper | |||
@@ -44,14 +45,13 @@ defmodule Pleroma.HTTP.AdapterHelper do | |||
@spec options(URI.t(), keyword()) :: keyword() | |||
def options(%URI{} = uri, opts \\ []) do | |||
@defaults | |||
|> put_timeout() | |||
|> Keyword.merge(opts) | |||
|> put_timeout() | |||
|> adapter_helper().options(uri) | |||
end | |||
# For Hackney, this is the time a connection can stay idle in the pool. | |||
# For Gun, this is the timeout to receive a message from Gun. | |||
defp put_timeout(opts) do | |||
@spec pool_timeout(pool()) :: non_neg_integer() | |||
def pool_timeout(pool) do | |||
{config_key, default} = | |||
if adapter() == Tesla.Adapter.Gun do | |||
{:pools, Config.get([:pools, :default, :timeout], 5_000)} | |||
@@ -59,9 +59,13 @@ defmodule Pleroma.HTTP.AdapterHelper do | |||
{:hackney_pools, 10_000} | |||
end | |||
timeout = Config.get([config_key, opts[:pool], :timeout], default) | |||
Config.get([config_key, pool, :timeout], default) | |||
end | |||
Keyword.merge(opts, timeout: timeout) | |||
# For Hackney, this is the time a connection can stay idle in the pool. | |||
# For Gun, this is the timeout to receive a message from Gun. | |||
defp put_timeout(opts) do | |||
Keyword.put_new(opts, :timeout, pool_timeout(opts[:pool])) | |||
end | |||
def get_conn(uri, opts), do: adapter_helper().get_conn(uri, opts) | |||
@@ -5,6 +5,7 @@ | |||
defmodule Pleroma.HTTP.AdapterHelper.Gun do | |||
@behaviour Pleroma.HTTP.AdapterHelper | |||
alias Pleroma.Config | |||
alias Pleroma.Gun.ConnectionPool | |||
alias Pleroma.HTTP.AdapterHelper | |||
@@ -14,7 +15,7 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do | |||
connect_timeout: 5_000, | |||
domain_lookup_timeout: 5_000, | |||
tls_handshake_timeout: 5_000, | |||
retry: 0, | |||
retry: 1, | |||
retry_timeout: 1000, | |||
await_up_timeout: 5_000 | |||
] | |||
@@ -22,10 +23,11 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do | |||
@spec options(keyword(), URI.t()) :: keyword() | |||
def options(incoming_opts \\ [], %URI{} = uri) do | |||
proxy = | |||
Pleroma.Config.get([:http, :proxy_url]) | |||
[:http, :proxy_url] | |||
|> Config.get() | |||
|> AdapterHelper.format_proxy() | |||
config_opts = Pleroma.Config.get([:http, :adapter], []) | |||
config_opts = Config.get([:http, :adapter], []) | |||
@defaults | |||
|> Keyword.merge(config_opts) | |||
@@ -37,8 +39,7 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do | |||
defp add_scheme_opts(opts, %{scheme: "http"}), do: opts | |||
defp add_scheme_opts(opts, %{scheme: "https"}) do | |||
opts | |||
|> Keyword.put(:certificates_verification, true) | |||
Keyword.put(opts, :certificates_verification, true) | |||
end | |||
@spec get_conn(URI.t(), keyword()) :: {:ok, keyword()} | {:error, atom()} | |||
@@ -51,11 +52,11 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do | |||
@prefix Pleroma.Gun.ConnectionPool | |||
def limiter_setup do | |||
wait = Pleroma.Config.get([:connections_pool, :connection_acquisition_wait]) | |||
retries = Pleroma.Config.get([:connections_pool, :connection_acquisition_retries]) | |||
wait = Config.get([:connections_pool, :connection_acquisition_wait]) | |||
retries = Config.get([:connections_pool, :connection_acquisition_retries]) | |||
:pools | |||
|> Pleroma.Config.get([]) | |||
|> Config.get([]) | |||
|> Enum.each(fn {name, opts} -> | |||
max_running = Keyword.get(opts, :size, 50) | |||
max_waiting = Keyword.get(opts, :max_waiting, 10) | |||
@@ -69,7 +70,6 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do | |||
case result do | |||
:ok -> :ok | |||
{:error, :existing} -> :ok | |||
e -> raise e | |||
end | |||
end) | |||
@@ -11,6 +11,8 @@ defmodule Pleroma.HTTP.ExAws do | |||
@impl true | |||
def request(method, url, body \\ "", headers \\ [], http_opts \\ []) do | |||
http_opts = Keyword.put(http_opts, :adapter, pool: :upload) | |||
case HTTP.request(method, url, body, headers, http_opts) do | |||
{:ok, env} -> | |||
{:ok, %{status_code: env.status, headers: env.headers, body: env.body}} | |||
@@ -11,6 +11,8 @@ defmodule Pleroma.HTTP.Tzdata do | |||
@impl true | |||
def get(url, headers, options) do | |||
options = Keyword.put(options, :adapter, pool: :upload) | |||
with {:ok, %Tesla.Env{} = env} <- HTTP.get(url, headers, options) do | |||
{:ok, {env.status, env.headers, env.body}} | |||
end | |||
@@ -18,6 +20,8 @@ defmodule Pleroma.HTTP.Tzdata do | |||
@impl true | |||
def head(url, headers, options) do | |||
options = Keyword.put(options, :adapter, pool: :upload) | |||
with {:ok, %Tesla.Env{} = env} <- HTTP.head(url, headers, options) do | |||
{:ok, {env.status, env.headers}} | |||
end | |||
@@ -46,12 +46,19 @@ defmodule Pleroma.Uploaders.S3 do | |||
op = | |||
if streaming do | |||
upload.tempfile | |||
|> ExAws.S3.Upload.stream_file() | |||
|> ExAws.S3.upload(bucket, s3_name, [ | |||
{:acl, :public_read}, | |||
{:content_type, upload.content_type} | |||
]) | |||
op = | |||
upload.tempfile | |||
|> ExAws.S3.Upload.stream_file() | |||
|> ExAws.S3.upload(bucket, s3_name, [ | |||
{:acl, :public_read}, | |||
{:content_type, upload.content_type} | |||
]) | |||
# set s3 upload timeout to respect :upload pool timeout | |||
# timeout should be slightly larger, so s3 can retry upload on fail | |||
timeout = Pleroma.HTTP.AdapterHelper.pool_timeout(:upload) + 1_000 | |||
opts = Keyword.put(op.opts, :timeout, timeout) | |||
Map.put(op, :opts, opts) | |||
else | |||
{:ok, file_data} = File.read(upload.tempfile) | |||