opening connection improvements

This commit is contained in:
Alexander Strizhakov 2020-04-23 15:52:13 +03:00
parent 7fb95ddb31
commit c012eb9f8a
No known key found for this signature in database
GPG Key ID: 022896A53AEF1381
5 changed files with 436 additions and 386 deletions

View File

@ -11,23 +11,25 @@ defmodule Pleroma.Gun.Conn do
require Logger require Logger
@type gun_state :: :up | :down @type gun_state :: :init | :up | :down
@type conn_state :: :active | :idle @type conn_state :: :init | :active | :idle
@type t :: %__MODULE__{ @type t :: %__MODULE__{
conn: pid(), conn: pid(),
gun_state: gun_state(), gun_state: gun_state(),
conn_state: conn_state(), conn_state: conn_state(),
used_by: [pid()], used_by: [GenServer.from()],
awaited_by: [GenServer.from()],
last_reference: pos_integer(), last_reference: pos_integer(),
crf: float(), crf: float(),
retries: pos_integer() retries: pos_integer()
} }
defstruct conn: nil, defstruct conn: nil,
gun_state: :open, gun_state: :init,
conn_state: :idle, conn_state: :init,
used_by: [], used_by: [],
awaited_by: [],
last_reference: 0, last_reference: 0,
crf: 1, crf: 1,
retries: 0 retries: 0
@ -51,22 +53,19 @@ defmodule Pleroma.Gun.Conn do
max_connections = pool_opts[:max_connections] || 250 max_connections = pool_opts[:max_connections] || 250
conn_pid = with {:ok, conn_pid} <- try_open(name, uri, opts, max_connections) do
if Connections.count(name) < max_connections do
do_open(uri, opts)
else
close_least_used_and_do_open(name, uri, opts)
end
if is_pid(conn_pid) do
conn = %Pleroma.Gun.Conn{
conn: conn_pid,
gun_state: :up,
last_reference: :os.system_time(:second)
}
:ok = Gun.set_owner(conn_pid, Process.whereis(name)) :ok = Gun.set_owner(conn_pid, Process.whereis(name))
Connections.add_conn(name, key, conn) Connections.update_conn(name, key, conn_pid)
else
_error -> Connections.remove_conn(name, key)
end
end
defp try_open(name, uri, opts, max_connections) do
if Connections.count(name) < max_connections do
do_open(uri, opts)
else
close_least_used_and_do_open(name, uri, opts)
end end
end end
@ -104,7 +103,7 @@ defmodule Pleroma.Gun.Conn do
{:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]), {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]),
stream <- Gun.connect(conn, connect_opts), stream <- Gun.connect(conn, connect_opts),
{:response, :fin, 200, _} <- Gun.await(conn, stream) do {:response, :fin, 200, _} <- Gun.await(conn, stream) do
conn {:ok, conn}
else else
error -> error ->
Logger.warn( Logger.warn(
@ -140,7 +139,7 @@ defmodule Pleroma.Gun.Conn do
with {:ok, conn} <- Gun.open(proxy_host, proxy_port, opts), with {:ok, conn} <- Gun.open(proxy_host, proxy_port, opts),
{:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do
conn {:ok, conn}
else else
error -> error ->
Logger.warn( Logger.warn(
@ -158,7 +157,7 @@ defmodule Pleroma.Gun.Conn do
with {:ok, conn} <- Gun.open(host, port, opts), with {:ok, conn} <- Gun.open(host, port, opts),
{:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do
conn {:ok, conn}
else else
error -> error ->
Logger.warn( Logger.warn(

View File

@ -73,7 +73,6 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do
defp checkin_conn(uri, opts) do defp checkin_conn(uri, opts) do
case Connections.checkin(uri, :gun_connections) do case Connections.checkin(uri, :gun_connections) do
nil -> nil ->
Task.start(Pleroma.Gun.Conn, :open, [uri, :gun_connections, opts])
opts opts
conn when is_pid(conn) -> conn when is_pid(conn) ->

View File

@ -7,11 +7,12 @@ defmodule Pleroma.Pool.Connections do
alias Pleroma.Config alias Pleroma.Config
alias Pleroma.Gun alias Pleroma.Gun
alias Pleroma.Gun.Conn
require Logger require Logger
@type domain :: String.t() @type domain :: String.t()
@type conn :: Pleroma.Gun.Conn.t() @type conn :: Conn.t()
@type seconds :: pos_integer() @type seconds :: pos_integer()
@type t :: %__MODULE__{ @type t :: %__MODULE__{
@ -33,13 +34,11 @@ defmodule Pleroma.Pool.Connections do
end end
@spec checkin(String.t() | URI.t(), atom()) :: pid() | nil @spec checkin(String.t() | URI.t(), atom()) :: pid() | nil
def checkin(url, name) def checkin(url, name, opts \\ [])
def checkin(url, name) when is_binary(url), do: checkin(URI.parse(url), name) def checkin(url, name, opts) when is_binary(url), do: checkin(URI.parse(url), name, opts)
def checkin(%URI{} = uri, name) do def checkin(%URI{} = uri, name, opts) do
timeout = Config.get([:connections_pool, :checkin_timeout], 250) GenServer.call(name, {:checkin, uri, opts, name})
GenServer.call(name, {:checkin, uri}, timeout)
end end
@spec alive?(atom()) :: boolean() @spec alive?(atom()) :: boolean()
@ -71,21 +70,55 @@ defmodule Pleroma.Pool.Connections do
GenServer.cast(name, {:checkout, conn, pid}) GenServer.cast(name, {:checkout, conn, pid})
end end
@spec add_conn(atom(), String.t(), Pleroma.Gun.Conn.t()) :: :ok @spec add_conn(atom(), String.t(), Conn.t()) :: :ok
def add_conn(name, key, conn) do def add_conn(name, key, conn) do
GenServer.cast(name, {:add_conn, key, conn}) GenServer.cast(name, {:add_conn, key, conn})
end end
@spec update_conn(atom(), String.t(), pid()) :: :ok
def update_conn(name, key, conn_pid) do
GenServer.cast(name, {:update_conn, key, conn_pid})
end
@spec remove_conn(atom(), String.t()) :: :ok @spec remove_conn(atom(), String.t()) :: :ok
def remove_conn(name, key) do def remove_conn(name, key) do
GenServer.cast(name, {:remove_conn, key}) GenServer.cast(name, {:remove_conn, key})
end end
@spec refresh(atom()) :: :ok
def refresh(name) do
GenServer.call(name, :refresh)
end
@impl true @impl true
def handle_cast({:add_conn, key, conn}, state) do def handle_cast({:add_conn, key, conn}, state) do
state = put_in(state.conns[key], conn) {:noreply, put_in(state.conns[key], conn)}
end
@impl true
def handle_cast({:update_conn, key, conn_pid}, state) do
conn = state.conns[key]
Process.monitor(conn_pid)
conn =
Enum.reduce(conn.awaited_by, conn, fn waiting, conn ->
GenServer.reply(waiting, conn_pid)
time = :os.system_time(:second)
last_reference = time - conn.last_reference
crf = crf(last_reference, 100, conn.crf)
%{
conn
| last_reference: time,
crf: crf,
conn_state: :active,
used_by: [waiting | conn.used_by]
}
end)
state = put_in(state.conns[key], %{conn | conn: conn_pid, gun_state: :up, awaited_by: []})
Process.monitor(conn.conn)
{:noreply, state} {:noreply, state}
end end
@ -113,12 +146,14 @@ defmodule Pleroma.Pool.Connections do
@impl true @impl true
def handle_cast({:remove_conn, key}, state) do def handle_cast({:remove_conn, key}, state) do
conn = state.conns[key]
Enum.each(conn.awaited_by, fn waiting -> GenServer.reply(waiting, nil) end)
state = put_in(state.conns, Map.delete(state.conns, key)) state = put_in(state.conns, Map.delete(state.conns, key))
{:noreply, state} {:noreply, state}
end end
@impl true @impl true
def handle_call({:checkin, uri}, from, state) do def handle_call({:checkin, uri, opts, name}, from, state) do
key = "#{uri.scheme}:#{uri.host}:#{uri.port}" key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
case state.conns[key] do case state.conns[key] do
@ -141,8 +176,18 @@ defmodule Pleroma.Pool.Connections do
%{gun_state: :down} -> %{gun_state: :down} ->
{:reply, nil, state} {:reply, nil, state}
%{gun_state: :init} = conn ->
state = put_in(state.conns[key], %{conn | awaited_by: [from | conn.awaited_by]})
{:noreply, state}
nil -> nil ->
{:reply, nil, state} state =
put_in(state.conns[key], %Conn{
awaited_by: [from]
})
Task.start(Conn, :open, [uri, name, opts])
{:noreply, state}
end end
end end
@ -150,6 +195,11 @@ defmodule Pleroma.Pool.Connections do
def handle_call(:state, _from, state), do: {:reply, state, state} def handle_call(:state, _from, state), do: {:reply, state, state}
@impl true @impl true
def handle_call(:refresh, _from, state) do
{:reply, :ok, put_in(state.conns, %{})}
end
@impl true
def handle_call(:count, _from, state) do def handle_call(:count, _from, state) do
{:reply, Enum.count(state.conns), state} {:reply, Enum.count(state.conns), state}
end end

View File

@ -3,28 +3,27 @@
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.HTTP.AdapterHelper.GunTest do defmodule Pleroma.HTTP.AdapterHelper.GunTest do
use ExUnit.Case, async: true use ExUnit.Case
use Pleroma.Tests.Helpers use Pleroma.Tests.Helpers
import Mox import Mox
alias Pleroma.Config alias Pleroma.Config
alias Pleroma.Gun.Conn
alias Pleroma.HTTP.AdapterHelper.Gun alias Pleroma.HTTP.AdapterHelper.Gun
alias Pleroma.Pool.Connections alias Pleroma.Pool.Connections
setup :verify_on_exit! setup :verify_on_exit!
setup :set_mox_global
defp gun_mock(_) do
gun_mock()
:ok
end
defp gun_mock do defp gun_mock do
Pleroma.GunMock Pleroma.GunMock
|> stub(:open, fn _, _, _ -> Task.start_link(fn -> Process.sleep(1000) end) end) |> stub(:open, fn _, _, _ ->
Task.start_link(fn -> Process.sleep(1000) end)
end)
|> stub(:await_up, fn _, _ -> {:ok, :http} end) |> stub(:await_up, fn _, _ -> {:ok, :http} end)
|> stub(:set_owner, fn _, _ -> :ok end) |> stub(:set_owner, fn _, _ -> :ok end)
:ok
end end
describe "options/1" do describe "options/1" do
@ -62,25 +61,6 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do
assert opts[:certificates_verification] assert opts[:certificates_verification]
end end
test "get conn on next request" do
gun_mock()
level = Application.get_env(:logger, :level)
Logger.configure(level: :debug)
on_exit(fn -> Logger.configure(level: level) end)
uri = URI.parse("http://some-domain2.com")
opts = Gun.options(uri)
assert opts[:conn] == nil
assert opts[:close_conn] == nil
Process.sleep(50)
opts = Gun.options(uri)
assert is_pid(opts[:conn])
assert opts[:close_conn] == false
end
test "merges with defaul http adapter config" do test "merges with defaul http adapter config" do
defaults = Gun.options([receive_conn: false], URI.parse("https://example.com")) defaults = Gun.options([receive_conn: false], URI.parse("https://example.com"))
assert Keyword.has_key?(defaults, :a) assert Keyword.has_key?(defaults, :a)
@ -91,8 +71,6 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do
gun_mock() gun_mock()
uri = URI.parse("https://some-domain.com") uri = URI.parse("https://some-domain.com")
:ok = Conn.open(uri, :gun_connections)
opts = Gun.options(uri) opts = Gun.options(uri)
assert opts[:certificates_verification] assert opts[:certificates_verification]
@ -134,11 +112,10 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do
end end
describe "options/1 with receive_conn parameter" do describe "options/1 with receive_conn parameter" do
setup :gun_mock setup do: gun_mock()
test "receive conn by default" do test "receive conn by default" do
uri = URI.parse("http://another-domain.com") uri = URI.parse("http://another-domain.com")
:ok = Conn.open(uri, :gun_connections)
received_opts = Gun.options(uri) received_opts = Gun.options(uri)
assert received_opts[:close_conn] == false assert received_opts[:close_conn] == false
@ -147,7 +124,6 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do
test "don't receive conn if receive_conn is false" do test "don't receive conn if receive_conn is false" do
uri = URI.parse("http://another-domain.com") uri = URI.parse("http://another-domain.com")
:ok = Conn.open(uri, :gun_connections)
opts = [receive_conn: false] opts = [receive_conn: false]
received_opts = Gun.options(opts, uri) received_opts = Gun.options(opts, uri)
@ -157,50 +133,51 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do
end end
describe "after_request/1" do describe "after_request/1" do
setup :gun_mock setup do: gun_mock()
test "body_as not chunks" do test "body_as not chunks" do
uri = URI.parse("http://some-domain.com") uri = URI.parse("http://some-domain-5.com")
:ok = Conn.open(uri, :gun_connections)
opts = Gun.options(uri) opts = Gun.options(uri)
:ok = Gun.after_request(opts) :ok = Gun.after_request(opts)
conn = opts[:conn] conn = opts[:conn]
assert %Connections{ assert match?(
conns: %{ %Connections{
"http:some-domain.com:80" => %Pleroma.Gun.Conn{ conns: %{
conn: ^conn, "http:some-domain-5.com:80" => %Pleroma.Gun.Conn{
conn_state: :idle, conn: ^conn,
used_by: [] conn_state: :idle,
used_by: []
}
} }
} },
} = Connections.get_state(:gun_connections) Connections.get_state(:gun_connections)
)
end end
test "body_as chunks" do test "body_as chunks" do
uri = URI.parse("http://some-domain.com") uri = URI.parse("http://some-domain-6.com")
:ok = Conn.open(uri, :gun_connections)
opts = Gun.options([body_as: :chunks], uri) opts = Gun.options([body_as: :chunks], uri)
:ok = Gun.after_request(opts) :ok = Gun.after_request(opts)
conn = opts[:conn] conn = opts[:conn]
self = self() self = self()
assert %Connections{ assert match?(
conns: %{ %Connections{
"http:some-domain.com:80" => %Pleroma.Gun.Conn{ conns: %{
conn: ^conn, "http:some-domain-6.com:80" => %Pleroma.Gun.Conn{
conn_state: :active, conn: ^conn,
used_by: [{^self, _}] conn_state: :active,
used_by: [{^self, _}]
}
} }
} },
} = Connections.get_state(:gun_connections) Connections.get_state(:gun_connections)
)
end end
test "with no connection" do test "with no connection" do
uri = URI.parse("http://uniq-domain.com") uri = URI.parse("http://uniq-domain.com")
:ok = Conn.open(uri, :gun_connections)
opts = Gun.options([body_as: :chunks], uri) opts = Gun.options([body_as: :chunks], uri)
conn = opts[:conn] conn = opts[:conn]
opts = Keyword.delete(opts, :conn) opts = Keyword.delete(opts, :conn)
@ -221,7 +198,6 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do
test "with ipv4" do test "with ipv4" do
uri = URI.parse("http://127.0.0.1") uri = URI.parse("http://127.0.0.1")
:ok = Conn.open(uri, :gun_connections)
opts = Gun.options(uri) opts = Gun.options(uri)
:ok = Gun.after_request(opts) :ok = Gun.after_request(opts)
conn = opts[:conn] conn = opts[:conn]
@ -239,7 +215,6 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do
test "with ipv6" do test "with ipv6" do
uri = URI.parse("http://[2a03:2880:f10c:83:face:b00c:0:25de]") uri = URI.parse("http://[2a03:2880:f10c:83:face:b00c:0:25de]")
:ok = Conn.open(uri, :gun_connections)
opts = Gun.options(uri) opts = Gun.options(uri)
:ok = Gun.after_request(opts) :ok = Gun.after_request(opts)
conn = opts[:conn] conn = opts[:conn]

View File

@ -3,7 +3,7 @@
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Pool.ConnectionsTest do defmodule Pleroma.Pool.ConnectionsTest do
use ExUnit.Case, async: true use ExUnit.Case
use Pleroma.Tests.Helpers use Pleroma.Tests.Helpers
import ExUnit.CaptureLog import ExUnit.CaptureLog
@ -14,11 +14,19 @@ defmodule Pleroma.Pool.ConnectionsTest do
alias Pleroma.Pool.Connections alias Pleroma.Pool.Connections
setup :verify_on_exit! setup :verify_on_exit!
setup :set_mox_global
setup_all do setup_all do
{:ok, pid} = Agent.start_link(fn -> %{} end, name: :gun_state)
on_exit(fn ->
if Process.alive?(pid), do: Agent.stop(pid)
end)
end
setup do
name = :test_connections name = :test_connections
{:ok, pid} = Connections.start_link({name, [checkin_timeout: 150]}) {:ok, pid} = Connections.start_link({name, []})
{:ok, _} = Registry.start_link(keys: :unique, name: Pleroma.GunMock)
on_exit(fn -> on_exit(fn ->
if Process.alive?(pid), do: GenServer.stop(name) if Process.alive?(pid), do: GenServer.stop(name)
@ -53,28 +61,27 @@ defmodule Pleroma.Pool.ConnectionsTest do
_ -> "http" _ -> "http"
end end
Registry.register(GunMock, pid, %{ info = %{
origin_scheme: scheme, origin_scheme: scheme,
origin_host: host, origin_host: host,
origin_port: port origin_port: port
}) }
Agent.update(:gun_state, &Map.put(&1, pid, %{info: info, ref: nil}))
{:ok, pid} {:ok, pid}
end end
defp info(pid) do defp info(pid), do: Agent.get(:gun_state, & &1[pid][:info])
[{_, info}] = Registry.lookup(GunMock, pid)
info
end
defp connect(pid, _) do defp connect(pid, _) do
ref = make_ref() ref = make_ref()
Registry.register(GunMock, ref, pid) Agent.update(:gun_state, &put_in(&1[pid][:ref], ref))
ref ref
end end
defp await(pid, ref) do defp await(pid, _ref) do
[{_, ^pid}] = Registry.lookup(GunMock, ref) Agent.get(:gun_state, & &1[pid][:ref])
{:response, :fin, 200, []} {:response, :fin, 200, []}
end end
@ -94,18 +101,6 @@ defmodule Pleroma.Pool.ConnectionsTest do
open_mock() open_mock()
url = "http://some-domain.com" url = "http://some-domain.com"
key = "http:some-domain.com:80" key = "http:some-domain.com:80"
refute Connections.checkin(url, name)
:ok = Conn.open(url, name)
%Connections{
conns: %{
^key => %Conn{
gun_state: :up,
used_by: [],
conn_state: :idle
}
}
} = Connections.get_state(name)
conn = Connections.checkin(url, name) conn = Connections.checkin(url, name)
assert is_pid(conn) assert is_pid(conn)
@ -113,65 +108,74 @@ defmodule Pleroma.Pool.ConnectionsTest do
self = self() self = self()
%Connections{ assert match?(
conns: %{ %Connections{
^key => %Conn{ conns: %{
conn: ^conn, ^key => %Conn{
gun_state: :up, conn: ^conn,
used_by: [{^self, _}], gun_state: :up,
conn_state: :active used_by: [{^self, _}],
} conn_state: :active
} }
} = Connections.get_state(name) }
},
Connections.get_state(name)
)
reused_conn = Connections.checkin(url, name) reused_conn = Connections.checkin(url, name)
assert conn == reused_conn assert conn == reused_conn
%Connections{ assert match?(
conns: %{ %Connections{
^key => %Conn{ conns: %{
conn: ^conn, ^key => %Conn{
gun_state: :up, conn: ^conn,
used_by: [{^self, _}, {^self, _}], gun_state: :up,
conn_state: :active used_by: [{^self, _}, {^self, _}],
} conn_state: :active
} }
} = Connections.get_state(name) }
},
Connections.get_state(name)
)
:ok = Connections.checkout(conn, self, name) :ok = Connections.checkout(conn, self, name)
%Connections{ assert match?(
conns: %{ %Connections{
^key => %Conn{ conns: %{
conn: ^conn, ^key => %Conn{
gun_state: :up, conn: ^conn,
used_by: [{^self, _}], gun_state: :up,
conn_state: :active used_by: [{^self, _}],
} conn_state: :active
} }
} = Connections.get_state(name) }
},
Connections.get_state(name)
)
:ok = Connections.checkout(conn, self, name) :ok = Connections.checkout(conn, self, name)
%Connections{ assert match?(
conns: %{ %Connections{
^key => %Conn{ conns: %{
conn: ^conn, ^key => %Conn{
gun_state: :up, conn: ^conn,
used_by: [], gun_state: :up,
conn_state: :idle used_by: [],
} conn_state: :idle
} }
} = Connections.get_state(name) }
},
Connections.get_state(name)
)
end end
test "reuse connection for idna domains", %{name: name} do test "reuse connection for idna domains", %{name: name} do
open_mock() open_mock()
url = "http://ですsome-domain.com" url = "http://ですsome-domain.com"
refute Connections.checkin(url, name)
:ok = Conn.open(url, name)
conn = Connections.checkin(url, name) conn = Connections.checkin(url, name)
assert is_pid(conn) assert is_pid(conn)
@ -179,16 +183,19 @@ defmodule Pleroma.Pool.ConnectionsTest do
self = self() self = self()
%Connections{ assert match?(
conns: %{ %Connections{
"http:ですsome-domain.com:80" => %Conn{ conns: %{
conn: ^conn, "http:ですsome-domain.com:80" => %Conn{
gun_state: :up, conn: ^conn,
used_by: [{^self, _}], gun_state: :up,
conn_state: :active used_by: [{^self, _}],
} conn_state: :active
} }
} = Connections.get_state(name) }
},
Connections.get_state(name)
)
reused_conn = Connections.checkin(url, name) reused_conn = Connections.checkin(url, name)
@ -199,26 +206,25 @@ defmodule Pleroma.Pool.ConnectionsTest do
open_mock() open_mock()
url = "http://127.0.0.1" url = "http://127.0.0.1"
refute Connections.checkin(url, name)
:ok = Conn.open(url, name)
conn = Connections.checkin(url, name) conn = Connections.checkin(url, name)
assert is_pid(conn) assert is_pid(conn)
assert Process.alive?(conn) assert Process.alive?(conn)
self = self() self = self()
%Connections{ assert match?(
conns: %{ %Connections{
"http:127.0.0.1:80" => %Conn{ conns: %{
conn: ^conn, "http:127.0.0.1:80" => %Conn{
gun_state: :up, conn: ^conn,
used_by: [{^self, _}], gun_state: :up,
conn_state: :active used_by: [{^self, _}],
} conn_state: :active
} }
} = Connections.get_state(name) }
},
Connections.get_state(name)
)
reused_conn = Connections.checkin(url, name) reused_conn = Connections.checkin(url, name)
@ -227,42 +233,44 @@ defmodule Pleroma.Pool.ConnectionsTest do
:ok = Connections.checkout(conn, self, name) :ok = Connections.checkout(conn, self, name)
:ok = Connections.checkout(reused_conn, self, name) :ok = Connections.checkout(reused_conn, self, name)
%Connections{ assert match?(
conns: %{ %Connections{
"http:127.0.0.1:80" => %Conn{ conns: %{
conn: ^conn, "http:127.0.0.1:80" => %Conn{
gun_state: :up, conn: ^conn,
used_by: [], gun_state: :up,
conn_state: :idle used_by: [],
} conn_state: :idle
} }
} = Connections.get_state(name) }
},
Connections.get_state(name)
)
end end
test "reuse for ipv6", %{name: name} do test "reuse for ipv6", %{name: name} do
open_mock() open_mock()
url = "http://[2a03:2880:f10c:83:face:b00c:0:25de]" url = "http://[2a03:2880:f10c:83:face:b00c:0:25de]"
refute Connections.checkin(url, name)
:ok = Conn.open(url, name)
conn = Connections.checkin(url, name) conn = Connections.checkin(url, name)
assert is_pid(conn) assert is_pid(conn)
assert Process.alive?(conn) assert Process.alive?(conn)
self = self() self = self()
%Connections{ assert match?(
conns: %{ %Connections{
"http:2a03:2880:f10c:83:face:b00c:0:25de:80" => %Conn{ conns: %{
conn: ^conn, "http:2a03:2880:f10c:83:face:b00c:0:25de:80" => %Conn{
gun_state: :up, conn: ^conn,
used_by: [{^self, _}], gun_state: :up,
conn_state: :active used_by: [{^self, _}],
} conn_state: :active
} }
} = Connections.get_state(name) }
},
Connections.get_state(name)
)
reused_conn = Connections.checkin(url, name) reused_conn = Connections.checkin(url, name)
@ -276,21 +284,23 @@ defmodule Pleroma.Pool.ConnectionsTest do
self = self() self = self()
url = "http://127.0.0.1" url = "http://127.0.0.1"
:ok = Conn.open(url, name)
conn = Connections.checkin(url, name) conn = Connections.checkin(url, name)
send(name, {:gun_down, conn, nil, nil, nil}) send(name, {:gun_down, conn, nil, nil, nil})
send(name, {:gun_up, conn, nil}) send(name, {:gun_up, conn, nil})
%Connections{ assert match?(
conns: %{ %Connections{
"http:127.0.0.1:80" => %Conn{ conns: %{
conn: ^conn, "http:127.0.0.1:80" => %Conn{
gun_state: :up, conn: ^conn,
used_by: [{^self, _}], gun_state: :up,
conn_state: :active used_by: [{^self, _}],
} conn_state: :active
} }
} = Connections.get_state(name) }
},
Connections.get_state(name)
)
end end
test "up and down ipv6", %{name: name} do test "up and down ipv6", %{name: name} do
@ -301,21 +311,23 @@ defmodule Pleroma.Pool.ConnectionsTest do
|> allow(self, name) |> allow(self, name)
url = "http://[2a03:2880:f10c:83:face:b00c:0:25de]" url = "http://[2a03:2880:f10c:83:face:b00c:0:25de]"
:ok = Conn.open(url, name)
conn = Connections.checkin(url, name) conn = Connections.checkin(url, name)
send(name, {:gun_down, conn, nil, nil, nil}) send(name, {:gun_down, conn, nil, nil, nil})
send(name, {:gun_up, conn, nil}) send(name, {:gun_up, conn, nil})
%Connections{ assert match?(
conns: %{ %Connections{
"http:2a03:2880:f10c:83:face:b00c:0:25de:80" => %Conn{ conns: %{
conn: ^conn, "http:2a03:2880:f10c:83:face:b00c:0:25de:80" => %Conn{
gun_state: :up, conn: ^conn,
used_by: [{^self, _}], gun_state: :up,
conn_state: :active used_by: [{^self, _}],
} conn_state: :active
} }
} = Connections.get_state(name) }
},
Connections.get_state(name)
)
end end
test "reuses connection based on protocol", %{name: name} do test "reuses connection based on protocol", %{name: name} do
@ -325,14 +337,10 @@ defmodule Pleroma.Pool.ConnectionsTest do
https_url = "https://some-domain.com" https_url = "https://some-domain.com"
https_key = "https:some-domain.com:443" https_key = "https:some-domain.com:443"
refute Connections.checkin(http_url, name)
:ok = Conn.open(http_url, name)
conn = Connections.checkin(http_url, name) conn = Connections.checkin(http_url, name)
assert is_pid(conn) assert is_pid(conn)
assert Process.alive?(conn) assert Process.alive?(conn)
refute Connections.checkin(https_url, name)
:ok = Conn.open(https_url, name)
https_conn = Connections.checkin(https_url, name) https_conn = Connections.checkin(https_url, name)
refute conn == https_conn refute conn == https_conn
@ -343,18 +351,21 @@ defmodule Pleroma.Pool.ConnectionsTest do
assert reused_https == https_conn assert reused_https == https_conn
%Connections{ assert match?(
conns: %{ %Connections{
^http_key => %Conn{ conns: %{
conn: ^conn, ^http_key => %Conn{
gun_state: :up conn: ^conn,
}, gun_state: :up
^https_key => %Conn{ },
conn: ^https_conn, ^https_key => %Conn{
gun_state: :up conn: ^https_conn,
} gun_state: :up
} }
} = Connections.get_state(name) }
},
Connections.get_state(name)
)
end end
test "connection can't get up", %{name: name} do test "connection can't get up", %{name: name} do
@ -362,7 +373,6 @@ defmodule Pleroma.Pool.ConnectionsTest do
url = "http://gun-not-up.com" url = "http://gun-not-up.com"
assert capture_log(fn -> assert capture_log(fn ->
refute Conn.open(url, name)
refute Connections.checkin(url, name) refute Connections.checkin(url, name)
end) =~ end) =~
"Opening connection to http://gun-not-up.com failed with error {:error, :timeout}" "Opening connection to http://gun-not-up.com failed with error {:error, :timeout}"
@ -377,33 +387,38 @@ defmodule Pleroma.Pool.ConnectionsTest do
url = "http://gun-down-and-up.com" url = "http://gun-down-and-up.com"
key = "http:gun-down-and-up.com:80" key = "http:gun-down-and-up.com:80"
:ok = Conn.open(url, name)
conn = Connections.checkin(url, name) conn = Connections.checkin(url, name)
assert is_pid(conn) assert is_pid(conn)
assert Process.alive?(conn) assert Process.alive?(conn)
%Connections{ assert match?(
conns: %{ %Connections{
^key => %Conn{ conns: %{
conn: ^conn, ^key => %Conn{
gun_state: :up, conn: ^conn,
used_by: [{^self, _}] gun_state: :up,
} used_by: [{^self, _}]
} }
} = Connections.get_state(name) }
},
Connections.get_state(name)
)
send(name, {:gun_down, conn, :http, nil, nil}) send(name, {:gun_down, conn, :http, nil, nil})
%Connections{ assert match?(
conns: %{ %Connections{
^key => %Conn{ conns: %{
conn: ^conn, ^key => %Conn{
gun_state: :down, conn: ^conn,
used_by: [{^self, _}] gun_state: :down,
} used_by: [{^self, _}]
} }
} = Connections.get_state(name) }
},
Connections.get_state(name)
)
send(name, {:gun_up, conn, :http}) send(name, {:gun_up, conn, :http})
@ -413,21 +428,23 @@ defmodule Pleroma.Pool.ConnectionsTest do
assert is_pid(conn2) assert is_pid(conn2)
assert Process.alive?(conn2) assert Process.alive?(conn2)
%Connections{ assert match?(
conns: %{ %Connections{
^key => %Conn{ conns: %{
conn: _, ^key => %Conn{
gun_state: :up, conn: _,
used_by: [{^self, _}, {^self, _}] gun_state: :up,
} used_by: [{^self, _}, {^self, _}]
} }
} = Connections.get_state(name) }
},
Connections.get_state(name)
)
end end
test "async processes get same conn for same domain", %{name: name} do test "async processes get same conn for same domain", %{name: name} do
open_mock() open_mock()
url = "http://some-domain.com" url = "http://some-domain.com"
:ok = Conn.open(url, name)
tasks = tasks =
for _ <- 1..5 do for _ <- 1..5 do
@ -445,15 +462,8 @@ defmodule Pleroma.Pool.ConnectionsTest do
conns = for {:ok, value} <- results, do: value conns = for {:ok, value} <- results, do: value
%Connections{ state = Connections.get_state(name)
conns: %{ %{conn: conn} = Map.get(state.conns, "http:some-domain.com:80")
"http:some-domain.com:80" => %Conn{
conn: conn,
gun_state: :up
}
}
} = Connections.get_state(name)
assert Enum.all?(conns, fn res -> res == conn end) assert Enum.all?(conns, fn res -> res == conn end)
end end
@ -462,8 +472,6 @@ defmodule Pleroma.Pool.ConnectionsTest do
self = self() self = self()
http_url = "http://some-domain.com" http_url = "http://some-domain.com"
https_url = "https://some-domain.com" https_url = "https://some-domain.com"
:ok = Conn.open(https_url, name)
:ok = Conn.open(http_url, name)
conn1 = Connections.checkin(https_url, name) conn1 = Connections.checkin(https_url, name)
@ -474,41 +482,44 @@ defmodule Pleroma.Pool.ConnectionsTest do
http_key = "http:some-domain.com:80" http_key = "http:some-domain.com:80"
%Connections{ assert match?(
conns: %{ %Connections{
^http_key => %Conn{ conns: %{
conn: ^conn2, ^http_key => %Conn{
gun_state: :up, conn: ^conn2,
conn_state: :active, gun_state: :up,
used_by: [{^self, _}, {^self, _}, {^self, _}, {^self, _}] conn_state: :active
}, },
"https:some-domain.com:443" => %Conn{ "https:some-domain.com:443" => %Conn{
conn: ^conn1, conn: ^conn1,
gun_state: :up, gun_state: :up,
conn_state: :active, conn_state: :active
used_by: [{^self, _}] }
} }
} },
} = Connections.get_state(name) Connections.get_state(name)
)
:ok = Connections.checkout(conn1, self, name) :ok = Connections.checkout(conn1, self, name)
another_url = "http://another-domain.com" another_url = "http://another-domain.com"
:ok = Conn.open(another_url, name)
conn = Connections.checkin(another_url, name) conn = Connections.checkin(another_url, name)
%Connections{ assert match?(
conns: %{ %Connections{
"http:another-domain.com:80" => %Conn{ conns: %{
conn: ^conn, "http:another-domain.com:80" => %Conn{
gun_state: :up conn: ^conn,
}, gun_state: :up
^http_key => %Conn{ },
conn: _, ^http_key => %Conn{
gun_state: :up conn: _,
} gun_state: :up
} }
} = Connections.get_state(name) }
},
Connections.get_state(name)
)
end end
describe "with proxy" do describe "with proxy" do
@ -518,18 +529,19 @@ defmodule Pleroma.Pool.ConnectionsTest do
url = "http://proxy-string.com" url = "http://proxy-string.com"
key = "http:proxy-string.com:80" key = "http:proxy-string.com:80"
:ok = Conn.open(url, name, proxy: {{127, 0, 0, 1}, 8123}) conn = Connections.checkin(url, name, proxy: {{127, 0, 0, 1}, 8123})
conn = Connections.checkin(url, name) assert match?(
%Connections{
%Connections{ conns: %{
conns: %{ ^key => %Conn{
^key => %Conn{ conn: ^conn,
conn: ^conn, gun_state: :up
gun_state: :up }
} }
} },
} = Connections.get_state(name) Connections.get_state(name)
)
reused_conn = Connections.checkin(url, name) reused_conn = Connections.checkin(url, name)
@ -541,17 +553,19 @@ defmodule Pleroma.Pool.ConnectionsTest do
|> connect_mock() |> connect_mock()
url = "http://proxy-tuple-atom.com" url = "http://proxy-tuple-atom.com"
:ok = Conn.open(url, name, proxy: {'localhost', 9050}) conn = Connections.checkin(url, name, proxy: {'localhost', 9050})
conn = Connections.checkin(url, name)
%Connections{ assert match?(
conns: %{ %Connections{
"http:proxy-tuple-atom.com:80" => %Conn{ conns: %{
conn: ^conn, "http:proxy-tuple-atom.com:80" => %Conn{
gun_state: :up conn: ^conn,
} gun_state: :up
} }
} = Connections.get_state(name) }
},
Connections.get_state(name)
)
reused_conn = Connections.checkin(url, name) reused_conn = Connections.checkin(url, name)
@ -564,17 +578,19 @@ defmodule Pleroma.Pool.ConnectionsTest do
url = "https://proxy-string.com" url = "https://proxy-string.com"
:ok = Conn.open(url, name, proxy: {{127, 0, 0, 1}, 8123}) conn = Connections.checkin(url, name, proxy: {{127, 0, 0, 1}, 8123})
conn = Connections.checkin(url, name)
%Connections{ assert match?(
conns: %{ %Connections{
"https:proxy-string.com:443" => %Conn{ conns: %{
conn: ^conn, "https:proxy-string.com:443" => %Conn{
gun_state: :up conn: ^conn,
} gun_state: :up
} }
} = Connections.get_state(name) }
},
Connections.get_state(name)
)
reused_conn = Connections.checkin(url, name) reused_conn = Connections.checkin(url, name)
@ -586,17 +602,19 @@ defmodule Pleroma.Pool.ConnectionsTest do
|> connect_mock() |> connect_mock()
url = "https://proxy-tuple-atom.com" url = "https://proxy-tuple-atom.com"
:ok = Conn.open(url, name, proxy: {'localhost', 9050}) conn = Connections.checkin(url, name, proxy: {'localhost', 9050})
conn = Connections.checkin(url, name)
%Connections{ assert match?(
conns: %{ %Connections{
"https:proxy-tuple-atom.com:443" => %Conn{ conns: %{
conn: ^conn, "https:proxy-tuple-atom.com:443" => %Conn{
gun_state: :up conn: ^conn,
} gun_state: :up
} }
} = Connections.get_state(name) }
},
Connections.get_state(name)
)
reused_conn = Connections.checkin(url, name) reused_conn = Connections.checkin(url, name)
@ -608,18 +626,19 @@ defmodule Pleroma.Pool.ConnectionsTest do
url = "http://proxy-socks.com" url = "http://proxy-socks.com"
:ok = Conn.open(url, name, proxy: {:socks5, 'localhost', 1234}) conn = Connections.checkin(url, name, proxy: {:socks5, 'localhost', 1234})
conn = Connections.checkin(url, name) assert match?(
%Connections{
%Connections{ conns: %{
conns: %{ "http:proxy-socks.com:80" => %Conn{
"http:proxy-socks.com:80" => %Conn{ conn: ^conn,
conn: ^conn, gun_state: :up
gun_state: :up }
} }
} },
} = Connections.get_state(name) Connections.get_state(name)
)
reused_conn = Connections.checkin(url, name) reused_conn = Connections.checkin(url, name)
@ -630,18 +649,19 @@ defmodule Pleroma.Pool.ConnectionsTest do
open_mock() open_mock()
url = "https://proxy-socks.com" url = "https://proxy-socks.com"
:ok = Conn.open(url, name, proxy: {:socks4, 'localhost', 1234}) conn = Connections.checkin(url, name, proxy: {:socks4, 'localhost', 1234})
conn = Connections.checkin(url, name) assert match?(
%Connections{
%Connections{ conns: %{
conns: %{ "https:proxy-socks.com:443" => %Conn{
"https:proxy-socks.com:443" => %Conn{ conn: ^conn,
conn: ^conn, gun_state: :up
gun_state: :up }
} }
} },
} = Connections.get_state(name) Connections.get_state(name)
)
reused_conn = Connections.checkin(url, name) reused_conn = Connections.checkin(url, name)
@ -697,6 +717,10 @@ defmodule Pleroma.Pool.ConnectionsTest do
end end
describe "get_unused_conns/1" do describe "get_unused_conns/1" do
setup %{name: name} do
Connections.refresh(name)
end
test "crf is equalent, sorting by reference", %{name: name} do test "crf is equalent, sorting by reference", %{name: name} do
Connections.add_conn(name, "1", %Conn{ Connections.add_conn(name, "1", %Conn{
conn_state: :idle, conn_state: :idle,
@ -794,11 +818,14 @@ defmodule Pleroma.Pool.ConnectionsTest do
|> Process.whereis() |> Process.whereis()
|> send({:close_idle_conns, 15}) |> send({:close_idle_conns, 15})
assert %Connections{ assert match?(
conns: %{ %Connections{
"3" => %Conn{}, conns: %{
"2" => %Conn{} "3" => %Conn{},
} "2" => %Conn{}
} = Connections.get_state(name) }
},
Connections.get_state(name)
)
end end
end end