expanding gun connections

closing least frequently used
separate pools with settings
This commit is contained in:
Alex S 2019-08-19 15:40:06 +03:00
parent 210e116dc0
commit 814159e668
8 changed files with 230 additions and 50 deletions

View File

@ -550,6 +550,24 @@ config :pleroma, :rate_limit,
password_reset: {1_800_000, 5}, password_reset: {1_800_000, 5},
account_confirmation_resend: {8_640_000, 5} account_confirmation_resend: {8_640_000, 5}
config :pleroma, :gun_pools,
federation: [
max_connections: 50,
timeout: 150_000
],
media: [
max_connections: 50,
timeout: 150_000
],
upload: [
max_connections: 25,
timeout: 300_000
],
default: [
max_connections: 10,
timout: 20_000
]
# Import environment specific config. This must remain at the bottom # Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above. # of this file so it overrides the configuration defined above.
import_config "#{Mix.env()}.exs" import_config "#{Mix.env()}.exs"

View File

@ -39,6 +39,7 @@ defmodule Pleroma.Application do
] ++ ] ++
cachex_children() ++ cachex_children() ++
hackney_pool_children() ++ hackney_pool_children() ++
gun_pools() ++
[ [
Pleroma.Web.Federator.RetryQueue, Pleroma.Web.Federator.RetryQueue,
Pleroma.Stats, Pleroma.Stats,
@ -163,6 +164,19 @@ defmodule Pleroma.Application do
end end
end end
defp gun_pools do
if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Gun do
for {pool_name, opts} <- Pleroma.Config.get([:gun_pools]) do
%{
id: :"gun_pool_#{pool_name}",
start: {Pleroma.Gun.Connections, :start_link, [{pool_name, opts}]}
}
end
else
[]
end
end
defp after_supervisor_start do defp after_supervisor_start do
with digest_config <- Application.get_env(:pleroma, :email_notifications)[:digest], with digest_config <- Application.get_env(:pleroma, :email_notifications)[:digest],
true <- digest_config[:active] do true <- digest_config[:active] do

View File

@ -4,11 +4,21 @@
defmodule Pleroma.Gun.API do defmodule Pleroma.Gun.API do
@callback open(charlist(), pos_integer(), map()) :: {:ok, pid()} @callback open(charlist(), pos_integer(), map()) :: {:ok, pid()}
@callback info(pid()) :: map()
@callback close(pid()) :: :ok
def open(host, port, opts) do def open(host, port, opts) do
api().open(host, port, opts) api().open(host, port, opts)
end end
def info(pid) do
api().info(pid)
end
def close(pid) do
api().close(pid)
end
defp api do defp api do
Pleroma.Config.get([Pleroma.Gun.API], Pleroma.Gun.API.Gun) Pleroma.Config.get([Pleroma.Gun.API], Pleroma.Gun.API.Gun)
end end

View File

@ -19,4 +19,10 @@ defmodule Pleroma.Gun.API.Gun do
def open(host, port, opts) do def open(host, port, opts) do
:gun.open(host, port, Map.take(opts, @gun_keys)) :gun.open(host, port, Map.take(opts, @gun_keys))
end end
@impl Pleroma.Gun.API
def info(pid), do: :gun.info(pid)
@impl Pleroma.Gun.API
def close(pid), do: :gun.close(pid)
end end

View File

@ -5,36 +5,75 @@
defmodule Pleroma.Gun.API.Mock do defmodule Pleroma.Gun.API.Mock do
@behaviour Pleroma.Gun.API @behaviour Pleroma.Gun.API
@impl Pleroma.Gun.API @impl Pleroma.Gun.API
def open('some-domain.com', 80, %{genserver_pid: genserver_pid}) do def open(domain, 80, %{genserver_pid: genserver_pid})
when domain in ['another-domain.com', 'some-domain.com'] do
{:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end) {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
Registry.register(Pleroma.Gun.API.Mock, conn_pid, %{
origin_scheme: "http",
origin_host: domain,
origin_port: 80
})
send(genserver_pid, {:gun_up, conn_pid, :http}) send(genserver_pid, {:gun_up, conn_pid, :http})
{:ok, conn_pid} {:ok, conn_pid}
end end
def open('some-domain.com', 443, %{genserver_pid: genserver_pid}) do def open('some-domain.com', 443, %{genserver_pid: genserver_pid}) do
{:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end) {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
Registry.register(Pleroma.Gun.API.Mock, conn_pid, %{
origin_scheme: "https",
origin_host: 'some-domain.com',
origin_port: 443
})
send(genserver_pid, {:gun_up, conn_pid, :http2}) send(genserver_pid, {:gun_up, conn_pid, :http2})
{:ok, conn_pid} {:ok, conn_pid}
end end
@impl Pleroma.Gun.API @impl Pleroma.Gun.API
def open('gun_down.com', _port, %{genserver_pid: genserver_pid}) do def open('gun_down.com', 80, %{genserver_pid: genserver_pid}) do
{:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end) {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
Registry.register(Pleroma.Gun.API.Mock, conn_pid, %{
origin_scheme: "http",
origin_host: 'gun_down.com',
origin_port: 80
})
send(genserver_pid, {:gun_down, conn_pid, :http, nil, nil, nil}) send(genserver_pid, {:gun_down, conn_pid, :http, nil, nil, nil})
{:ok, conn_pid} {:ok, conn_pid}
end end
@impl Pleroma.Gun.API @impl Pleroma.Gun.API
def open('gun_down_and_up.com', _port, %{genserver_pid: genserver_pid}) do def open('gun_down_and_up.com', 80, %{genserver_pid: genserver_pid}) do
{:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end) {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
Registry.register(Pleroma.Gun.API.Mock, conn_pid, %{
origin_scheme: "http",
origin_host: 'gun_down_and_up.com',
origin_port: 80
})
send(genserver_pid, {:gun_down, conn_pid, :http, nil, nil, nil}) send(genserver_pid, {:gun_down, conn_pid, :http, nil, nil, nil})
{:ok, _} = {:ok, _} =
Task.start_link(fn -> Task.start_link(fn ->
Process.sleep(500) Process.sleep(500)
send(genserver_pid, {:gun_up, conn_pid, :http}) send(genserver_pid, {:gun_up, conn_pid, :http})
end) end)
{:ok, conn_pid} {:ok, conn_pid}
end end
@impl Pleroma.Gun.API
def info(pid) do
[{_, info}] = Registry.lookup(Pleroma.Gun.API.Mock, pid)
info
end
@impl Pleroma.Gun.API
def close(_pid), do: :ok
end end

View File

@ -10,8 +10,8 @@ defmodule Pleroma.Gun.Conn do
conn: pid(), conn: pid(),
state: atom(), state: atom(),
waiting_pids: [pid()], waiting_pids: [pid()],
protocol: atom() used: pos_integer()
} }
defstruct conn: nil, state: :open, waiting_pids: [], protocol: :http defstruct conn: nil, state: :open, waiting_pids: [], used: 0
end end

View File

@ -6,29 +6,31 @@ defmodule Pleroma.Gun.Connections do
use GenServer use GenServer
@type domain :: String.t() @type domain :: String.t()
@type conn :: Gun.Conn.t() @type conn :: Pleroma.Gun.Conn.t()
@type t :: %__MODULE__{ @type t :: %__MODULE__{
conns: %{domain() => conn()} conns: %{domain() => conn()},
opts: keyword()
} }
defstruct conns: %{} defstruct conns: %{}, opts: []
def start_link(name \\ __MODULE__) do @spec start_link({atom(), keyword()}) :: {:ok, pid()} | :ignore
def start_link({name, opts}) do
if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Gun do if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Gun do
GenServer.start_link(__MODULE__, [], name: name) GenServer.start_link(__MODULE__, opts, name: name)
else else
:ignore :ignore
end end
end end
@impl true @impl true
def init(_) do def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}}
{:ok, %__MODULE__{conns: %{}}}
end
@spec get_conn(String.t(), keyword(), atom()) :: pid() @spec get_conn(String.t(), keyword(), atom()) :: pid()
def get_conn(url, opts \\ [], name \\ __MODULE__) do def get_conn(url, opts \\ [], name \\ :default) do
opts = Enum.into(opts, %{}) opts = Enum.into(opts, %{})
uri = URI.parse(url) uri = URI.parse(url)
opts = opts =
@ -58,13 +60,13 @@ defmodule Pleroma.Gun.Connections do
end end
@spec alive?(atom()) :: boolean() @spec alive?(atom()) :: boolean()
def alive?(name \\ __MODULE__) do def alive?(name \\ :default) do
pid = Process.whereis(name) pid = Process.whereis(name)
if pid, do: Process.alive?(pid), else: false if pid, do: Process.alive?(pid), else: false
end end
@spec get_state(atom()) :: t() @spec get_state(atom()) :: t()
def get_state(name \\ __MODULE__) do def get_state(name \\ :default) do
GenServer.call(name, {:state}) GenServer.call(name, {:state})
end end
@ -73,7 +75,8 @@ defmodule Pleroma.Gun.Connections do
key = compose_key(uri) key = compose_key(uri)
case state.conns[key] do case state.conns[key] do
%{conn: conn, state: conn_state} when conn_state == :up -> %{conn: conn, state: conn_state, used: used} when conn_state == :up ->
state = put_in(state.conns[key].used, used + 1)
{:reply, conn, state} {:reply, conn, state}
%{state: conn_state, waiting_pids: pids} when conn_state in [:open, :down] -> %{state: conn_state, waiting_pids: pids} when conn_state in [:open, :down] ->
@ -81,16 +84,23 @@ defmodule Pleroma.Gun.Connections do
{:noreply, state} {:noreply, state}
nil -> nil ->
{:ok, conn} = Pleroma.Gun.API.open(to_charlist(uri.host), uri.port, opts) max_connections = state.opts[:max_connections]
if Enum.count(state.conns) < max_connections do
open_conn(key, uri, from, state, opts)
else
[{close_key, least_used} | _conns] = Enum.sort_by(state.conns, fn {_k, v} -> v.used end)
:ok = Pleroma.Gun.API.close(least_used.conn)
state = state =
put_in(state.conns[key], %Pleroma.Gun.Conn{ put_in(
conn: conn, state.conns,
waiting_pids: [from], Map.delete(state.conns, close_key)
protocol: String.to_atom(uri.scheme) )
})
{:noreply, state} open_conn(key, uri, from, state, opts)
end
end end
end end
@ -99,20 +109,29 @@ defmodule Pleroma.Gun.Connections do
@impl true @impl true
def handle_info({:gun_up, conn_pid, _protocol}, state) do def handle_info({:gun_up, conn_pid, _protocol}, state) do
{key, conn} = find_conn(state.conns, conn_pid) conn_key = compose_key_gun_info(conn_pid)
{key, conn} = find_conn(state.conns, conn_pid, conn_key)
# Send to all waiting processes connection pid # Send to all waiting processes connection pid
Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, conn_pid) end) Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, conn_pid) end)
# Update state of the current connection and set waiting_pids to empty list # Update state of the current connection and set waiting_pids to empty list
state = put_in(state.conns[key], %{conn | state: :up, waiting_pids: []}) state =
put_in(state.conns[key], %{
conn
| state: :up,
waiting_pids: [],
used: conn.used + length(conn.waiting_pids)
})
{:noreply, state} {:noreply, state}
end end
@impl true @impl true
# Do we need to do something with killed & unprocessed references? # Do we need to do something with killed & unprocessed references?
def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed, _unprocessed}, state) do def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed, _unprocessed}, state) do
{key, conn} = find_conn(state.conns, conn_pid) conn_key = compose_key_gun_info(conn_pid)
{key, conn} = find_conn(state.conns, conn_pid, conn_key)
# We don't want to block requests to GenServer if gun send down message, return nil, so we can make some retries, while connection is not up # We don't want to block requests to GenServer if gun send down message, return nil, so we can make some retries, while connection is not up
Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, nil) end) Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, nil) end)
@ -121,12 +140,28 @@ defmodule Pleroma.Gun.Connections do
{:noreply, state} {:noreply, state}
end end
defp compose_key(uri), do: uri.host <> ":" <> to_string(uri.port) defp compose_key(uri), do: "#{uri.scheme}:#{uri.host}:#{uri.port}"
defp find_conn(conns, conn_pid) do defp compose_key_gun_info(pid) do
info = Pleroma.Gun.API.info(pid)
"#{info.origin_scheme}:#{info.origin_host}:#{info.origin_port}"
end
defp find_conn(conns, conn_pid, conn_key) do
Enum.find(conns, fn {key, conn} -> Enum.find(conns, fn {key, conn} ->
protocol = if String.ends_with?(key, ":443"), do: :https, else: :http key == conn_key and conn.conn == conn_pid
conn.conn == conn_pid and conn.protocol == protocol
end) end)
end end
defp open_conn(key, uri, from, state, opts) do
{:ok, conn} = Pleroma.Gun.API.open(to_charlist(uri.host), uri.port, opts)
state =
put_in(state.conns[key], %Pleroma.Gun.Conn{
conn: conn,
waiting_pids: [from]
})
{:noreply, state}
end
end end

View File

@ -6,12 +6,17 @@ defmodule Gun.ConnectionsTest do
use ExUnit.Case use ExUnit.Case
alias Pleroma.Gun.{Connections, Conn, API} alias Pleroma.Gun.{Connections, Conn, API}
setup_all do
{:ok, _} = Registry.start_link(keys: :unique, name: API.Mock)
:ok
end
setup do setup do
name = :test_gun_connections name = :test_gun_connections
adapter = Application.get_env(:tesla, :adapter) adapter = Application.get_env(:tesla, :adapter)
Application.put_env(:tesla, :adapter, Tesla.Adapter.Gun) Application.put_env(:tesla, :adapter, Tesla.Adapter.Gun)
on_exit(fn -> Application.put_env(:tesla, :adapter, adapter) end) on_exit(fn -> Application.put_env(:tesla, :adapter, adapter) end)
{:ok, pid} = Connections.start_link(name) {:ok, pid} = Connections.start_link({name, [max_connections: 2, timeout: 10]})
{:ok, name: name, pid: pid} {:ok, name: name, pid: pid}
end end
@ -37,10 +42,11 @@ defmodule Gun.ConnectionsTest do
%Connections{ %Connections{
conns: %{ conns: %{
"some-domain.com:80" => %Conn{ "http:some-domain.com:80" => %Conn{
conn: ^conn, conn: ^conn,
state: :up, state: :up,
waiting_pids: [] waiting_pids: [],
used: 2
} }
} }
} = Connections.get_state(name) } = Connections.get_state(name)
@ -58,10 +64,11 @@ defmodule Gun.ConnectionsTest do
%Connections{ %Connections{
conns: %{ conns: %{
"some-domain.com:80" => %Conn{ "http:some-domain.com:80" => %Conn{
conn: ^conn, conn: ^conn,
state: :up, state: :up,
waiting_pids: [] waiting_pids: [],
used: 2
} }
} }
} = Connections.get_state(name) } = Connections.get_state(name)
@ -84,12 +91,12 @@ defmodule Gun.ConnectionsTest do
%Connections{ %Connections{
conns: %{ conns: %{
"some-domain.com:80" => %Conn{ "http:some-domain.com:80" => %Conn{
conn: ^conn, conn: ^conn,
state: :up, state: :up,
waiting_pids: [] waiting_pids: []
}, },
"some-domain.com:443" => %Conn{ "https:some-domain.com:443" => %Conn{
conn: ^https_conn, conn: ^https_conn,
state: :up, state: :up,
waiting_pids: [] waiting_pids: []
@ -105,7 +112,7 @@ defmodule Gun.ConnectionsTest do
%Connections{ %Connections{
conns: %{ conns: %{
"gun_down.com:80" => %Conn{ "http:gun_down.com:80" => %Conn{
conn: _, conn: _,
state: :down, state: :down,
waiting_pids: _ waiting_pids: _
@ -121,10 +128,11 @@ defmodule Gun.ConnectionsTest do
%Connections{ %Connections{
conns: %{ conns: %{
"gun_down_and_up.com:80" => %Conn{ "http:gun_down_and_up.com:80" => %Conn{
conn: _, conn: _,
state: :down, state: :down,
waiting_pids: _ waiting_pids: _,
used: 0
} }
} }
} = Connections.get_state(name) } = Connections.get_state(name)
@ -136,10 +144,11 @@ defmodule Gun.ConnectionsTest do
%Connections{ %Connections{
conns: %{ conns: %{
"gun_down_and_up.com:80" => %Conn{ "http:gun_down_and_up.com:80" => %Conn{
conn: _, conn: _,
state: :up, state: :up,
waiting_pids: [] waiting_pids: [],
used: 2
} }
} }
} = Connections.get_state(name) } = Connections.get_state(name)
@ -164,10 +173,11 @@ defmodule Gun.ConnectionsTest do
%Connections{ %Connections{
conns: %{ conns: %{
"some-domain.com:80" => %Conn{ "http:some-domain.com:80" => %Conn{
conn: conn, conn: conn,
state: :up, state: :up,
waiting_pids: [] waiting_pids: [],
used: 5
} }
} }
} = Connections.get_state(name) } = Connections.get_state(name)
@ -175,6 +185,52 @@ defmodule Gun.ConnectionsTest do
assert Enum.all?(conns, fn res -> res == conn end) assert Enum.all?(conns, fn res -> res == conn end)
end end
test "remove frequently used", %{name: name, pid: pid} do
Connections.get_conn("https://some-domain.com", [genserver_pid: pid], name)
for _ <- 1..4 do
Connections.get_conn("http://some-domain.com", [genserver_pid: pid], name)
end
%Connections{
conns: %{
"http:some-domain.com:80" => %Conn{
conn: _,
state: :up,
waiting_pids: [],
used: 4
},
"https:some-domain.com:443" => %Conn{
conn: _,
state: :up,
waiting_pids: [],
used: 1
}
},
opts: [max_connections: 2, timeout: 10]
} = Connections.get_state(name)
conn = Connections.get_conn("http://another-domain.com", [genserver_pid: pid], name)
%Connections{
conns: %{
"http:another-domain.com:80" => %Conn{
conn: ^conn,
state: :up,
waiting_pids: [],
used: 1
},
"http:some-domain.com:80" => %Conn{
conn: _,
state: :up,
waiting_pids: [],
used: 4
}
},
opts: [max_connections: 2, timeout: 10]
} = Connections.get_state(name)
end
describe "integration test" do describe "integration test" do
@describetag :integration @describetag :integration
@ -193,10 +249,11 @@ defmodule Gun.ConnectionsTest do
%Connections{ %Connections{
conns: %{ conns: %{
"httpbin.org:80" => %Conn{ "http:httpbin.org:80" => %Conn{
conn: ^conn, conn: ^conn,
state: :up, state: :up,
waiting_pids: [] waiting_pids: [],
used: 2
} }
} }
} = Connections.get_state(name) } = Connections.get_state(name)
@ -217,10 +274,11 @@ defmodule Gun.ConnectionsTest do
%Connections{ %Connections{
conns: %{ conns: %{
"httpbin.org:443" => %Conn{ "https:httpbin.org:443" => %Conn{
conn: ^conn, conn: ^conn,
state: :up, state: :up,
waiting_pids: [] waiting_pids: [],
used: 2
} }
} }
} = Connections.get_state(name) } = Connections.get_state(name)