From c012eb9f8a5f95df6136192ec27f794a9df4826f Mon Sep 17 00:00:00 2001 From: Alexander Strizhakov Date: Thu, 23 Apr 2020 15:52:13 +0300 Subject: [PATCH] opening connection improvements --- lib/pleroma/gun/conn.ex | 45 ++- lib/pleroma/http/adapter_helper/gun.ex | 1 - lib/pleroma/pool/connections.ex | 74 +++- test/http/adapter_helper/gun_test.exs | 91 ++--- test/pool/connections_test.exs | 611 +++++++++++++++++---------------- 5 files changed, 436 insertions(+), 386 deletions(-) diff --git a/lib/pleroma/gun/conn.ex b/lib/pleroma/gun/conn.ex index a0f110b42..4c2f5b1bf 100644 --- a/lib/pleroma/gun/conn.ex +++ b/lib/pleroma/gun/conn.ex @@ -11,23 +11,25 @@ defmodule Pleroma.Gun.Conn do require Logger - @type gun_state :: :up | :down - @type conn_state :: :active | :idle + @type gun_state :: :init | :up | :down + @type conn_state :: :init | :active | :idle @type t :: %__MODULE__{ conn: pid(), gun_state: gun_state(), conn_state: conn_state(), - used_by: [pid()], + used_by: [GenServer.from()], + awaited_by: [GenServer.from()], last_reference: pos_integer(), crf: float(), retries: pos_integer() } defstruct conn: nil, - gun_state: :open, - conn_state: :idle, + gun_state: :init, + conn_state: :init, used_by: [], + awaited_by: [], last_reference: 0, crf: 1, retries: 0 @@ -51,22 +53,19 @@ defmodule Pleroma.Gun.Conn do max_connections = pool_opts[:max_connections] || 250 - conn_pid = - if Connections.count(name) < max_connections do - do_open(uri, opts) - else - close_least_used_and_do_open(name, uri, opts) - end - - if is_pid(conn_pid) do - conn = %Pleroma.Gun.Conn{ - conn: conn_pid, - gun_state: :up, - last_reference: :os.system_time(:second) - } - + with {:ok, conn_pid} <- try_open(name, uri, opts, max_connections) do :ok = Gun.set_owner(conn_pid, Process.whereis(name)) - Connections.add_conn(name, key, conn) + Connections.update_conn(name, key, conn_pid) + else + _error -> Connections.remove_conn(name, key) + end + end + + defp try_open(name, uri, opts, max_connections) do + if Connections.count(name) < max_connections do + do_open(uri, opts) + else + close_least_used_and_do_open(name, uri, opts) end end @@ -104,7 +103,7 @@ defmodule Pleroma.Gun.Conn do {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]), stream <- Gun.connect(conn, connect_opts), {:response, :fin, 200, _} <- Gun.await(conn, stream) do - conn + {:ok, conn} else error -> Logger.warn( @@ -140,7 +139,7 @@ defmodule Pleroma.Gun.Conn do with {:ok, conn} <- Gun.open(proxy_host, proxy_port, opts), {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do - conn + {:ok, conn} else error -> Logger.warn( @@ -158,7 +157,7 @@ defmodule Pleroma.Gun.Conn do with {:ok, conn} <- Gun.open(host, port, opts), {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do - conn + {:ok, conn} else error -> Logger.warn( diff --git a/lib/pleroma/http/adapter_helper/gun.ex b/lib/pleroma/http/adapter_helper/gun.ex index 18cf64bcc..bdf2bcc06 100644 --- a/lib/pleroma/http/adapter_helper/gun.ex +++ b/lib/pleroma/http/adapter_helper/gun.ex @@ -73,7 +73,6 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do defp checkin_conn(uri, opts) do case Connections.checkin(uri, :gun_connections) do nil -> - Task.start(Pleroma.Gun.Conn, :open, [uri, :gun_connections, opts]) opts conn when is_pid(conn) -> diff --git a/lib/pleroma/pool/connections.ex b/lib/pleroma/pool/connections.ex index b9618d3c7..28d37760e 100644 --- a/lib/pleroma/pool/connections.ex +++ b/lib/pleroma/pool/connections.ex @@ -7,11 +7,12 @@ defmodule Pleroma.Pool.Connections do alias Pleroma.Config alias Pleroma.Gun + alias Pleroma.Gun.Conn require Logger @type domain :: String.t() - @type conn :: Pleroma.Gun.Conn.t() + @type conn :: Conn.t() @type seconds :: pos_integer() @type t :: %__MODULE__{ @@ -33,13 +34,11 @@ defmodule Pleroma.Pool.Connections do end @spec checkin(String.t() | URI.t(), atom()) :: pid() | nil - def checkin(url, name) - def checkin(url, name) when is_binary(url), do: checkin(URI.parse(url), name) + def checkin(url, name, opts \\ []) + def checkin(url, name, opts) when is_binary(url), do: checkin(URI.parse(url), name, opts) - def checkin(%URI{} = uri, name) do - timeout = Config.get([:connections_pool, :checkin_timeout], 250) - - GenServer.call(name, {:checkin, uri}, timeout) + def checkin(%URI{} = uri, name, opts) do + GenServer.call(name, {:checkin, uri, opts, name}) end @spec alive?(atom()) :: boolean() @@ -71,21 +70,55 @@ defmodule Pleroma.Pool.Connections do GenServer.cast(name, {:checkout, conn, pid}) end - @spec add_conn(atom(), String.t(), Pleroma.Gun.Conn.t()) :: :ok + @spec add_conn(atom(), String.t(), Conn.t()) :: :ok def add_conn(name, key, conn) do GenServer.cast(name, {:add_conn, key, conn}) end + @spec update_conn(atom(), String.t(), pid()) :: :ok + def update_conn(name, key, conn_pid) do + GenServer.cast(name, {:update_conn, key, conn_pid}) + end + @spec remove_conn(atom(), String.t()) :: :ok def remove_conn(name, key) do GenServer.cast(name, {:remove_conn, key}) end + @spec refresh(atom()) :: :ok + def refresh(name) do + GenServer.call(name, :refresh) + end + @impl true def handle_cast({:add_conn, key, conn}, state) do - state = put_in(state.conns[key], conn) + {:noreply, put_in(state.conns[key], conn)} + end + + @impl true + def handle_cast({:update_conn, key, conn_pid}, state) do + conn = state.conns[key] + + Process.monitor(conn_pid) + + conn = + Enum.reduce(conn.awaited_by, conn, fn waiting, conn -> + GenServer.reply(waiting, conn_pid) + time = :os.system_time(:second) + last_reference = time - conn.last_reference + crf = crf(last_reference, 100, conn.crf) + + %{ + conn + | last_reference: time, + crf: crf, + conn_state: :active, + used_by: [waiting | conn.used_by] + } + end) + + state = put_in(state.conns[key], %{conn | conn: conn_pid, gun_state: :up, awaited_by: []}) - Process.monitor(conn.conn) {:noreply, state} end @@ -113,12 +146,14 @@ defmodule Pleroma.Pool.Connections do @impl true def handle_cast({:remove_conn, key}, state) do + conn = state.conns[key] + Enum.each(conn.awaited_by, fn waiting -> GenServer.reply(waiting, nil) end) state = put_in(state.conns, Map.delete(state.conns, key)) {:noreply, state} end @impl true - def handle_call({:checkin, uri}, from, state) do + def handle_call({:checkin, uri, opts, name}, from, state) do key = "#{uri.scheme}:#{uri.host}:#{uri.port}" case state.conns[key] do @@ -141,8 +176,18 @@ defmodule Pleroma.Pool.Connections do %{gun_state: :down} -> {:reply, nil, state} + %{gun_state: :init} = conn -> + state = put_in(state.conns[key], %{conn | awaited_by: [from | conn.awaited_by]}) + {:noreply, state} + nil -> - {:reply, nil, state} + state = + put_in(state.conns[key], %Conn{ + awaited_by: [from] + }) + + Task.start(Conn, :open, [uri, name, opts]) + {:noreply, state} end end @@ -150,6 +195,11 @@ defmodule Pleroma.Pool.Connections do def handle_call(:state, _from, state), do: {:reply, state, state} @impl true + def handle_call(:refresh, _from, state) do + {:reply, :ok, put_in(state.conns, %{})} + end + + @impl true def handle_call(:count, _from, state) do {:reply, Enum.count(state.conns), state} end diff --git a/test/http/adapter_helper/gun_test.exs b/test/http/adapter_helper/gun_test.exs index 2e961826e..95a8cf814 100644 --- a/test/http/adapter_helper/gun_test.exs +++ b/test/http/adapter_helper/gun_test.exs @@ -3,28 +3,27 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.HTTP.AdapterHelper.GunTest do - use ExUnit.Case, async: true + use ExUnit.Case use Pleroma.Tests.Helpers import Mox alias Pleroma.Config - alias Pleroma.Gun.Conn alias Pleroma.HTTP.AdapterHelper.Gun alias Pleroma.Pool.Connections setup :verify_on_exit! - - defp gun_mock(_) do - gun_mock() - :ok - end + setup :set_mox_global defp gun_mock do Pleroma.GunMock - |> stub(:open, fn _, _, _ -> Task.start_link(fn -> Process.sleep(1000) end) end) + |> stub(:open, fn _, _, _ -> + Task.start_link(fn -> Process.sleep(1000) end) + end) |> stub(:await_up, fn _, _ -> {:ok, :http} end) |> stub(:set_owner, fn _, _ -> :ok end) + + :ok end describe "options/1" do @@ -62,25 +61,6 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do assert opts[:certificates_verification] end - test "get conn on next request" do - gun_mock() - level = Application.get_env(:logger, :level) - Logger.configure(level: :debug) - on_exit(fn -> Logger.configure(level: level) end) - uri = URI.parse("http://some-domain2.com") - - opts = Gun.options(uri) - - assert opts[:conn] == nil - assert opts[:close_conn] == nil - - Process.sleep(50) - opts = Gun.options(uri) - - assert is_pid(opts[:conn]) - assert opts[:close_conn] == false - end - test "merges with defaul http adapter config" do defaults = Gun.options([receive_conn: false], URI.parse("https://example.com")) assert Keyword.has_key?(defaults, :a) @@ -91,8 +71,6 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do gun_mock() uri = URI.parse("https://some-domain.com") - :ok = Conn.open(uri, :gun_connections) - opts = Gun.options(uri) assert opts[:certificates_verification] @@ -134,11 +112,10 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do end describe "options/1 with receive_conn parameter" do - setup :gun_mock + setup do: gun_mock() test "receive conn by default" do uri = URI.parse("http://another-domain.com") - :ok = Conn.open(uri, :gun_connections) received_opts = Gun.options(uri) assert received_opts[:close_conn] == false @@ -147,7 +124,6 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do test "don't receive conn if receive_conn is false" do uri = URI.parse("http://another-domain.com") - :ok = Conn.open(uri, :gun_connections) opts = [receive_conn: false] received_opts = Gun.options(opts, uri) @@ -157,50 +133,51 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do end describe "after_request/1" do - setup :gun_mock + setup do: gun_mock() test "body_as not chunks" do - uri = URI.parse("http://some-domain.com") - :ok = Conn.open(uri, :gun_connections) + uri = URI.parse("http://some-domain-5.com") opts = Gun.options(uri) :ok = Gun.after_request(opts) conn = opts[:conn] - assert %Connections{ - conns: %{ - "http:some-domain.com:80" => %Pleroma.Gun.Conn{ - conn: ^conn, - conn_state: :idle, - used_by: [] + assert match?( + %Connections{ + conns: %{ + "http:some-domain-5.com:80" => %Pleroma.Gun.Conn{ + conn: ^conn, + conn_state: :idle, + used_by: [] + } } - } - } = Connections.get_state(:gun_connections) + }, + Connections.get_state(:gun_connections) + ) end test "body_as chunks" do - uri = URI.parse("http://some-domain.com") - :ok = Conn.open(uri, :gun_connections) + uri = URI.parse("http://some-domain-6.com") opts = Gun.options([body_as: :chunks], uri) :ok = Gun.after_request(opts) conn = opts[:conn] self = self() - assert %Connections{ - conns: %{ - "http:some-domain.com:80" => %Pleroma.Gun.Conn{ - conn: ^conn, - conn_state: :active, - used_by: [{^self, _}] + assert match?( + %Connections{ + conns: %{ + "http:some-domain-6.com:80" => %Pleroma.Gun.Conn{ + conn: ^conn, + conn_state: :active, + used_by: [{^self, _}] + } } - } - } = Connections.get_state(:gun_connections) + }, + Connections.get_state(:gun_connections) + ) end test "with no connection" do uri = URI.parse("http://uniq-domain.com") - - :ok = Conn.open(uri, :gun_connections) - opts = Gun.options([body_as: :chunks], uri) conn = opts[:conn] opts = Keyword.delete(opts, :conn) @@ -221,7 +198,6 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do test "with ipv4" do uri = URI.parse("http://127.0.0.1") - :ok = Conn.open(uri, :gun_connections) opts = Gun.options(uri) :ok = Gun.after_request(opts) conn = opts[:conn] @@ -239,7 +215,6 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do test "with ipv6" do uri = URI.parse("http://[2a03:2880:f10c:83:face:b00c:0:25de]") - :ok = Conn.open(uri, :gun_connections) opts = Gun.options(uri) :ok = Gun.after_request(opts) conn = opts[:conn] diff --git a/test/pool/connections_test.exs b/test/pool/connections_test.exs index a7afb2190..ddc91fbb1 100644 --- a/test/pool/connections_test.exs +++ b/test/pool/connections_test.exs @@ -3,7 +3,7 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Pool.ConnectionsTest do - use ExUnit.Case, async: true + use ExUnit.Case use Pleroma.Tests.Helpers import ExUnit.CaptureLog @@ -14,11 +14,19 @@ defmodule Pleroma.Pool.ConnectionsTest do alias Pleroma.Pool.Connections setup :verify_on_exit! + setup :set_mox_global setup_all do + {:ok, pid} = Agent.start_link(fn -> %{} end, name: :gun_state) + + on_exit(fn -> + if Process.alive?(pid), do: Agent.stop(pid) + end) + end + + setup do name = :test_connections - {:ok, pid} = Connections.start_link({name, [checkin_timeout: 150]}) - {:ok, _} = Registry.start_link(keys: :unique, name: Pleroma.GunMock) + {:ok, pid} = Connections.start_link({name, []}) on_exit(fn -> if Process.alive?(pid), do: GenServer.stop(name) @@ -53,28 +61,27 @@ defmodule Pleroma.Pool.ConnectionsTest do _ -> "http" end - Registry.register(GunMock, pid, %{ + info = %{ origin_scheme: scheme, origin_host: host, origin_port: port - }) + } + + Agent.update(:gun_state, &Map.put(&1, pid, %{info: info, ref: nil})) {:ok, pid} end - defp info(pid) do - [{_, info}] = Registry.lookup(GunMock, pid) - info - end + defp info(pid), do: Agent.get(:gun_state, & &1[pid][:info]) defp connect(pid, _) do ref = make_ref() - Registry.register(GunMock, ref, pid) + Agent.update(:gun_state, &put_in(&1[pid][:ref], ref)) ref end - defp await(pid, ref) do - [{_, ^pid}] = Registry.lookup(GunMock, ref) + defp await(pid, _ref) do + Agent.get(:gun_state, & &1[pid][:ref]) {:response, :fin, 200, []} end @@ -94,18 +101,6 @@ defmodule Pleroma.Pool.ConnectionsTest do open_mock() url = "http://some-domain.com" key = "http:some-domain.com:80" - refute Connections.checkin(url, name) - :ok = Conn.open(url, name) - - %Connections{ - conns: %{ - ^key => %Conn{ - gun_state: :up, - used_by: [], - conn_state: :idle - } - } - } = Connections.get_state(name) conn = Connections.checkin(url, name) assert is_pid(conn) @@ -113,65 +108,74 @@ defmodule Pleroma.Pool.ConnectionsTest do self = self() - %Connections{ - conns: %{ - ^key => %Conn{ - conn: ^conn, - gun_state: :up, - used_by: [{^self, _}], - conn_state: :active - } - } - } = Connections.get_state(name) + assert match?( + %Connections{ + conns: %{ + ^key => %Conn{ + conn: ^conn, + gun_state: :up, + used_by: [{^self, _}], + conn_state: :active + } + } + }, + Connections.get_state(name) + ) reused_conn = Connections.checkin(url, name) assert conn == reused_conn - %Connections{ - conns: %{ - ^key => %Conn{ - conn: ^conn, - gun_state: :up, - used_by: [{^self, _}, {^self, _}], - conn_state: :active - } - } - } = Connections.get_state(name) + assert match?( + %Connections{ + conns: %{ + ^key => %Conn{ + conn: ^conn, + gun_state: :up, + used_by: [{^self, _}, {^self, _}], + conn_state: :active + } + } + }, + Connections.get_state(name) + ) :ok = Connections.checkout(conn, self, name) - %Connections{ - conns: %{ - ^key => %Conn{ - conn: ^conn, - gun_state: :up, - used_by: [{^self, _}], - conn_state: :active - } - } - } = Connections.get_state(name) + assert match?( + %Connections{ + conns: %{ + ^key => %Conn{ + conn: ^conn, + gun_state: :up, + used_by: [{^self, _}], + conn_state: :active + } + } + }, + Connections.get_state(name) + ) :ok = Connections.checkout(conn, self, name) - %Connections{ - conns: %{ - ^key => %Conn{ - conn: ^conn, - gun_state: :up, - used_by: [], - conn_state: :idle - } - } - } = Connections.get_state(name) + assert match?( + %Connections{ + conns: %{ + ^key => %Conn{ + conn: ^conn, + gun_state: :up, + used_by: [], + conn_state: :idle + } + } + }, + Connections.get_state(name) + ) end test "reuse connection for idna domains", %{name: name} do open_mock() url = "http://ですsome-domain.com" - refute Connections.checkin(url, name) - - :ok = Conn.open(url, name) conn = Connections.checkin(url, name) assert is_pid(conn) @@ -179,16 +183,19 @@ defmodule Pleroma.Pool.ConnectionsTest do self = self() - %Connections{ - conns: %{ - "http:ですsome-domain.com:80" => %Conn{ - conn: ^conn, - gun_state: :up, - used_by: [{^self, _}], - conn_state: :active - } - } - } = Connections.get_state(name) + assert match?( + %Connections{ + conns: %{ + "http:ですsome-domain.com:80" => %Conn{ + conn: ^conn, + gun_state: :up, + used_by: [{^self, _}], + conn_state: :active + } + } + }, + Connections.get_state(name) + ) reused_conn = Connections.checkin(url, name) @@ -199,26 +206,25 @@ defmodule Pleroma.Pool.ConnectionsTest do open_mock() url = "http://127.0.0.1" - refute Connections.checkin(url, name) - - :ok = Conn.open(url, name) - conn = Connections.checkin(url, name) assert is_pid(conn) assert Process.alive?(conn) self = self() - %Connections{ - conns: %{ - "http:127.0.0.1:80" => %Conn{ - conn: ^conn, - gun_state: :up, - used_by: [{^self, _}], - conn_state: :active - } - } - } = Connections.get_state(name) + assert match?( + %Connections{ + conns: %{ + "http:127.0.0.1:80" => %Conn{ + conn: ^conn, + gun_state: :up, + used_by: [{^self, _}], + conn_state: :active + } + } + }, + Connections.get_state(name) + ) reused_conn = Connections.checkin(url, name) @@ -227,42 +233,44 @@ defmodule Pleroma.Pool.ConnectionsTest do :ok = Connections.checkout(conn, self, name) :ok = Connections.checkout(reused_conn, self, name) - %Connections{ - conns: %{ - "http:127.0.0.1:80" => %Conn{ - conn: ^conn, - gun_state: :up, - used_by: [], - conn_state: :idle - } - } - } = Connections.get_state(name) + assert match?( + %Connections{ + conns: %{ + "http:127.0.0.1:80" => %Conn{ + conn: ^conn, + gun_state: :up, + used_by: [], + conn_state: :idle + } + } + }, + Connections.get_state(name) + ) end test "reuse for ipv6", %{name: name} do open_mock() url = "http://[2a03:2880:f10c:83:face:b00c:0:25de]" - refute Connections.checkin(url, name) - - :ok = Conn.open(url, name) - conn = Connections.checkin(url, name) assert is_pid(conn) assert Process.alive?(conn) self = self() - %Connections{ - conns: %{ - "http:2a03:2880:f10c:83:face:b00c:0:25de:80" => %Conn{ - conn: ^conn, - gun_state: :up, - used_by: [{^self, _}], - conn_state: :active - } - } - } = Connections.get_state(name) + assert match?( + %Connections{ + conns: %{ + "http:2a03:2880:f10c:83:face:b00c:0:25de:80" => %Conn{ + conn: ^conn, + gun_state: :up, + used_by: [{^self, _}], + conn_state: :active + } + } + }, + Connections.get_state(name) + ) reused_conn = Connections.checkin(url, name) @@ -276,21 +284,23 @@ defmodule Pleroma.Pool.ConnectionsTest do self = self() url = "http://127.0.0.1" - :ok = Conn.open(url, name) conn = Connections.checkin(url, name) send(name, {:gun_down, conn, nil, nil, nil}) send(name, {:gun_up, conn, nil}) - %Connections{ - conns: %{ - "http:127.0.0.1:80" => %Conn{ - conn: ^conn, - gun_state: :up, - used_by: [{^self, _}], - conn_state: :active - } - } - } = Connections.get_state(name) + assert match?( + %Connections{ + conns: %{ + "http:127.0.0.1:80" => %Conn{ + conn: ^conn, + gun_state: :up, + used_by: [{^self, _}], + conn_state: :active + } + } + }, + Connections.get_state(name) + ) end test "up and down ipv6", %{name: name} do @@ -301,21 +311,23 @@ defmodule Pleroma.Pool.ConnectionsTest do |> allow(self, name) url = "http://[2a03:2880:f10c:83:face:b00c:0:25de]" - :ok = Conn.open(url, name) conn = Connections.checkin(url, name) send(name, {:gun_down, conn, nil, nil, nil}) send(name, {:gun_up, conn, nil}) - %Connections{ - conns: %{ - "http:2a03:2880:f10c:83:face:b00c:0:25de:80" => %Conn{ - conn: ^conn, - gun_state: :up, - used_by: [{^self, _}], - conn_state: :active - } - } - } = Connections.get_state(name) + assert match?( + %Connections{ + conns: %{ + "http:2a03:2880:f10c:83:face:b00c:0:25de:80" => %Conn{ + conn: ^conn, + gun_state: :up, + used_by: [{^self, _}], + conn_state: :active + } + } + }, + Connections.get_state(name) + ) end test "reuses connection based on protocol", %{name: name} do @@ -325,14 +337,10 @@ defmodule Pleroma.Pool.ConnectionsTest do https_url = "https://some-domain.com" https_key = "https:some-domain.com:443" - refute Connections.checkin(http_url, name) - :ok = Conn.open(http_url, name) conn = Connections.checkin(http_url, name) assert is_pid(conn) assert Process.alive?(conn) - refute Connections.checkin(https_url, name) - :ok = Conn.open(https_url, name) https_conn = Connections.checkin(https_url, name) refute conn == https_conn @@ -343,18 +351,21 @@ defmodule Pleroma.Pool.ConnectionsTest do assert reused_https == https_conn - %Connections{ - conns: %{ - ^http_key => %Conn{ - conn: ^conn, - gun_state: :up - }, - ^https_key => %Conn{ - conn: ^https_conn, - gun_state: :up - } - } - } = Connections.get_state(name) + assert match?( + %Connections{ + conns: %{ + ^http_key => %Conn{ + conn: ^conn, + gun_state: :up + }, + ^https_key => %Conn{ + conn: ^https_conn, + gun_state: :up + } + } + }, + Connections.get_state(name) + ) end test "connection can't get up", %{name: name} do @@ -362,7 +373,6 @@ defmodule Pleroma.Pool.ConnectionsTest do url = "http://gun-not-up.com" assert capture_log(fn -> - refute Conn.open(url, name) refute Connections.checkin(url, name) end) =~ "Opening connection to http://gun-not-up.com failed with error {:error, :timeout}" @@ -377,33 +387,38 @@ defmodule Pleroma.Pool.ConnectionsTest do url = "http://gun-down-and-up.com" key = "http:gun-down-and-up.com:80" - :ok = Conn.open(url, name) conn = Connections.checkin(url, name) assert is_pid(conn) assert Process.alive?(conn) - %Connections{ - conns: %{ - ^key => %Conn{ - conn: ^conn, - gun_state: :up, - used_by: [{^self, _}] - } - } - } = Connections.get_state(name) + assert match?( + %Connections{ + conns: %{ + ^key => %Conn{ + conn: ^conn, + gun_state: :up, + used_by: [{^self, _}] + } + } + }, + Connections.get_state(name) + ) send(name, {:gun_down, conn, :http, nil, nil}) - %Connections{ - conns: %{ - ^key => %Conn{ - conn: ^conn, - gun_state: :down, - used_by: [{^self, _}] - } - } - } = Connections.get_state(name) + assert match?( + %Connections{ + conns: %{ + ^key => %Conn{ + conn: ^conn, + gun_state: :down, + used_by: [{^self, _}] + } + } + }, + Connections.get_state(name) + ) send(name, {:gun_up, conn, :http}) @@ -413,21 +428,23 @@ defmodule Pleroma.Pool.ConnectionsTest do assert is_pid(conn2) assert Process.alive?(conn2) - %Connections{ - conns: %{ - ^key => %Conn{ - conn: _, - gun_state: :up, - used_by: [{^self, _}, {^self, _}] - } - } - } = Connections.get_state(name) + assert match?( + %Connections{ + conns: %{ + ^key => %Conn{ + conn: _, + gun_state: :up, + used_by: [{^self, _}, {^self, _}] + } + } + }, + Connections.get_state(name) + ) end test "async processes get same conn for same domain", %{name: name} do open_mock() url = "http://some-domain.com" - :ok = Conn.open(url, name) tasks = for _ <- 1..5 do @@ -445,15 +462,8 @@ defmodule Pleroma.Pool.ConnectionsTest do conns = for {:ok, value} <- results, do: value - %Connections{ - conns: %{ - "http:some-domain.com:80" => %Conn{ - conn: conn, - gun_state: :up - } - } - } = Connections.get_state(name) - + state = Connections.get_state(name) + %{conn: conn} = Map.get(state.conns, "http:some-domain.com:80") assert Enum.all?(conns, fn res -> res == conn end) end @@ -462,8 +472,6 @@ defmodule Pleroma.Pool.ConnectionsTest do self = self() http_url = "http://some-domain.com" https_url = "https://some-domain.com" - :ok = Conn.open(https_url, name) - :ok = Conn.open(http_url, name) conn1 = Connections.checkin(https_url, name) @@ -474,41 +482,44 @@ defmodule Pleroma.Pool.ConnectionsTest do http_key = "http:some-domain.com:80" - %Connections{ - conns: %{ - ^http_key => %Conn{ - conn: ^conn2, - gun_state: :up, - conn_state: :active, - used_by: [{^self, _}, {^self, _}, {^self, _}, {^self, _}] - }, - "https:some-domain.com:443" => %Conn{ - conn: ^conn1, - gun_state: :up, - conn_state: :active, - used_by: [{^self, _}] - } - } - } = Connections.get_state(name) + assert match?( + %Connections{ + conns: %{ + ^http_key => %Conn{ + conn: ^conn2, + gun_state: :up, + conn_state: :active + }, + "https:some-domain.com:443" => %Conn{ + conn: ^conn1, + gun_state: :up, + conn_state: :active + } + } + }, + Connections.get_state(name) + ) :ok = Connections.checkout(conn1, self, name) another_url = "http://another-domain.com" - :ok = Conn.open(another_url, name) conn = Connections.checkin(another_url, name) - %Connections{ - conns: %{ - "http:another-domain.com:80" => %Conn{ - conn: ^conn, - gun_state: :up - }, - ^http_key => %Conn{ - conn: _, - gun_state: :up - } - } - } = Connections.get_state(name) + assert match?( + %Connections{ + conns: %{ + "http:another-domain.com:80" => %Conn{ + conn: ^conn, + gun_state: :up + }, + ^http_key => %Conn{ + conn: _, + gun_state: :up + } + } + }, + Connections.get_state(name) + ) end describe "with proxy" do @@ -518,18 +529,19 @@ defmodule Pleroma.Pool.ConnectionsTest do url = "http://proxy-string.com" key = "http:proxy-string.com:80" - :ok = Conn.open(url, name, proxy: {{127, 0, 0, 1}, 8123}) + conn = Connections.checkin(url, name, proxy: {{127, 0, 0, 1}, 8123}) - conn = Connections.checkin(url, name) - - %Connections{ - conns: %{ - ^key => %Conn{ - conn: ^conn, - gun_state: :up - } - } - } = Connections.get_state(name) + assert match?( + %Connections{ + conns: %{ + ^key => %Conn{ + conn: ^conn, + gun_state: :up + } + } + }, + Connections.get_state(name) + ) reused_conn = Connections.checkin(url, name) @@ -541,17 +553,19 @@ defmodule Pleroma.Pool.ConnectionsTest do |> connect_mock() url = "http://proxy-tuple-atom.com" - :ok = Conn.open(url, name, proxy: {'localhost', 9050}) - conn = Connections.checkin(url, name) + conn = Connections.checkin(url, name, proxy: {'localhost', 9050}) - %Connections{ - conns: %{ - "http:proxy-tuple-atom.com:80" => %Conn{ - conn: ^conn, - gun_state: :up - } - } - } = Connections.get_state(name) + assert match?( + %Connections{ + conns: %{ + "http:proxy-tuple-atom.com:80" => %Conn{ + conn: ^conn, + gun_state: :up + } + } + }, + Connections.get_state(name) + ) reused_conn = Connections.checkin(url, name) @@ -564,17 +578,19 @@ defmodule Pleroma.Pool.ConnectionsTest do url = "https://proxy-string.com" - :ok = Conn.open(url, name, proxy: {{127, 0, 0, 1}, 8123}) - conn = Connections.checkin(url, name) + conn = Connections.checkin(url, name, proxy: {{127, 0, 0, 1}, 8123}) - %Connections{ - conns: %{ - "https:proxy-string.com:443" => %Conn{ - conn: ^conn, - gun_state: :up - } - } - } = Connections.get_state(name) + assert match?( + %Connections{ + conns: %{ + "https:proxy-string.com:443" => %Conn{ + conn: ^conn, + gun_state: :up + } + } + }, + Connections.get_state(name) + ) reused_conn = Connections.checkin(url, name) @@ -586,17 +602,19 @@ defmodule Pleroma.Pool.ConnectionsTest do |> connect_mock() url = "https://proxy-tuple-atom.com" - :ok = Conn.open(url, name, proxy: {'localhost', 9050}) - conn = Connections.checkin(url, name) + conn = Connections.checkin(url, name, proxy: {'localhost', 9050}) - %Connections{ - conns: %{ - "https:proxy-tuple-atom.com:443" => %Conn{ - conn: ^conn, - gun_state: :up - } - } - } = Connections.get_state(name) + assert match?( + %Connections{ + conns: %{ + "https:proxy-tuple-atom.com:443" => %Conn{ + conn: ^conn, + gun_state: :up + } + } + }, + Connections.get_state(name) + ) reused_conn = Connections.checkin(url, name) @@ -608,18 +626,19 @@ defmodule Pleroma.Pool.ConnectionsTest do url = "http://proxy-socks.com" - :ok = Conn.open(url, name, proxy: {:socks5, 'localhost', 1234}) + conn = Connections.checkin(url, name, proxy: {:socks5, 'localhost', 1234}) - conn = Connections.checkin(url, name) - - %Connections{ - conns: %{ - "http:proxy-socks.com:80" => %Conn{ - conn: ^conn, - gun_state: :up - } - } - } = Connections.get_state(name) + assert match?( + %Connections{ + conns: %{ + "http:proxy-socks.com:80" => %Conn{ + conn: ^conn, + gun_state: :up + } + } + }, + Connections.get_state(name) + ) reused_conn = Connections.checkin(url, name) @@ -630,18 +649,19 @@ defmodule Pleroma.Pool.ConnectionsTest do open_mock() url = "https://proxy-socks.com" - :ok = Conn.open(url, name, proxy: {:socks4, 'localhost', 1234}) + conn = Connections.checkin(url, name, proxy: {:socks4, 'localhost', 1234}) - conn = Connections.checkin(url, name) - - %Connections{ - conns: %{ - "https:proxy-socks.com:443" => %Conn{ - conn: ^conn, - gun_state: :up - } - } - } = Connections.get_state(name) + assert match?( + %Connections{ + conns: %{ + "https:proxy-socks.com:443" => %Conn{ + conn: ^conn, + gun_state: :up + } + } + }, + Connections.get_state(name) + ) reused_conn = Connections.checkin(url, name) @@ -697,6 +717,10 @@ defmodule Pleroma.Pool.ConnectionsTest do end describe "get_unused_conns/1" do + setup %{name: name} do + Connections.refresh(name) + end + test "crf is equalent, sorting by reference", %{name: name} do Connections.add_conn(name, "1", %Conn{ conn_state: :idle, @@ -794,11 +818,14 @@ defmodule Pleroma.Pool.ConnectionsTest do |> Process.whereis() |> send({:close_idle_conns, 15}) - assert %Connections{ - conns: %{ - "3" => %Conn{}, - "2" => %Conn{} - } - } = Connections.get_state(name) + assert match?( + %Connections{ + conns: %{ + "3" => %Conn{}, + "2" => %Conn{} + } + }, + Connections.get_state(name) + ) end end