added gun connections genserver
This commit is contained in:
parent
c51aa48e60
commit
2caf9ad954
@ -85,6 +85,8 @@ config :joken, default_signer: "yU8uHKq+yyAkZ11Hx//jcdacWc8yQ1bxAAGrplzB0Zwwjkp3
|
||||
|
||||
config :pleroma, Pleroma.ReverseProxy.Client, Pleroma.ReverseProxy.ClientMock
|
||||
|
||||
config :pleroma, Pleroma.Gun.API, Pleroma.Gun.API.Mock
|
||||
|
||||
try do
|
||||
import_config "test.secret.exs"
|
||||
rescue
|
||||
|
15
lib/pleroma/gun/api/api.ex
Normal file
15
lib/pleroma/gun/api/api.ex
Normal file
@ -0,0 +1,15 @@
|
||||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Gun.API do
|
||||
@callback open(charlist(), pos_integer(), map()) :: {:ok, pid()}
|
||||
|
||||
def open(host, port, opts) do
|
||||
api().open(host, port, opts)
|
||||
end
|
||||
|
||||
defp api do
|
||||
Pleroma.Config.get([Pleroma.Gun.API], :gun)
|
||||
end
|
||||
end
|
40
lib/pleroma/gun/api/mock.ex
Normal file
40
lib/pleroma/gun/api/mock.ex
Normal file
@ -0,0 +1,40 @@
|
||||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Gun.API.Mock do
|
||||
@behaviour Pleroma.Gun.API
|
||||
@impl Pleroma.Gun.API
|
||||
def open('some-domain.com', 80, %{genserver_pid: genserver_pid}) do
|
||||
{:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
|
||||
send(genserver_pid, {:gun_up, conn_pid, :http})
|
||||
{:ok, conn_pid}
|
||||
end
|
||||
|
||||
def open('some-domain.com', 443, %{genserver_pid: genserver_pid}) do
|
||||
{:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
|
||||
send(genserver_pid, {:gun_up, conn_pid, :https})
|
||||
{:ok, conn_pid}
|
||||
end
|
||||
|
||||
@impl Pleroma.Gun.API
|
||||
def open('gun_down.com', _port, %{genserver_pid: genserver_pid}) do
|
||||
{:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
|
||||
send(genserver_pid, {:gun_down, conn_pid, :http, nil, nil, nil})
|
||||
{:ok, conn_pid}
|
||||
end
|
||||
|
||||
@impl Pleroma.Gun.API
|
||||
def open('gun_down_and_up.com', _port, %{genserver_pid: genserver_pid}) do
|
||||
{:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
|
||||
send(genserver_pid, {:gun_down, conn_pid, :http, nil, nil, nil})
|
||||
|
||||
{:ok, _} =
|
||||
Task.start_link(fn ->
|
||||
Process.sleep(500)
|
||||
send(genserver_pid, {:gun_up, conn_pid, :http})
|
||||
end)
|
||||
|
||||
{:ok, conn_pid}
|
||||
end
|
||||
end
|
17
lib/pleroma/gun/conn.ex
Normal file
17
lib/pleroma/gun/conn.ex
Normal file
@ -0,0 +1,17 @@
|
||||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Gun.Conn do
|
||||
@moduledoc """
|
||||
Struct for gun connection data
|
||||
"""
|
||||
@type t :: %__MODULE__{
|
||||
conn: pid(),
|
||||
state: atom(),
|
||||
waiting_pids: [pid()],
|
||||
protocol: atom()
|
||||
}
|
||||
|
||||
defstruct conn: nil, state: :open, waiting_pids: [], protocol: :http
|
||||
end
|
104
lib/pleroma/gun/connections.ex
Normal file
104
lib/pleroma/gun/connections.ex
Normal file
@ -0,0 +1,104 @@
|
||||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Gun.Connections do
|
||||
use GenServer
|
||||
|
||||
@type domain :: String.t()
|
||||
@type conn :: Gun.Conn.t()
|
||||
@type t :: %__MODULE__{
|
||||
conns: %{domain() => conn()}
|
||||
}
|
||||
|
||||
defstruct conns: %{}
|
||||
|
||||
def start_link(name \\ __MODULE__) do
|
||||
if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Gun do
|
||||
GenServer.start_link(__MODULE__, [], name: name)
|
||||
else
|
||||
:ignore
|
||||
end
|
||||
end
|
||||
|
||||
@impl true
|
||||
def init(_) do
|
||||
{:ok, %__MODULE__{conns: %{}}}
|
||||
end
|
||||
|
||||
@spec get_conn(atom(), String.t(), keyword()) :: pid()
|
||||
def get_conn(name \\ __MODULE__, url, opts \\ []) do
|
||||
opts = Enum.into(opts, %{})
|
||||
uri = URI.parse(url)
|
||||
|
||||
opts = if uri.scheme == "https", do: Map.put(opts, :transport, :tls), else: opts
|
||||
|
||||
GenServer.call(
|
||||
name,
|
||||
{:conn, %{opts: opts, uri: uri}}
|
||||
)
|
||||
end
|
||||
|
||||
@spec get_state(atom()) :: t()
|
||||
def get_state(name \\ __MODULE__) do
|
||||
GenServer.call(name, {:state})
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:conn, %{opts: opts, uri: uri}}, from, state) do
|
||||
key = compose_key(uri)
|
||||
|
||||
case state.conns[key] do
|
||||
%{conn: conn, state: conn_state} when conn_state == :up ->
|
||||
{:reply, conn, state}
|
||||
|
||||
%{state: conn_state, waiting_pids: pids} when conn_state in [:open, :down] ->
|
||||
state = put_in(state.conns[key].waiting_pids, [from | pids])
|
||||
{:noreply, state}
|
||||
|
||||
nil ->
|
||||
{:ok, conn} = Pleroma.Gun.API.open(to_charlist(uri.host), uri.port, opts)
|
||||
|
||||
state =
|
||||
put_in(state.conns[key], %Pleroma.Gun.Conn{
|
||||
conn: conn,
|
||||
waiting_pids: [from],
|
||||
protocol: String.to_atom(uri.scheme)
|
||||
})
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:state}, _from, state), do: {:reply, state, state}
|
||||
|
||||
@impl true
|
||||
def handle_info({:gun_up, conn_pid, protocol}, state) do
|
||||
{key, conn} = find_conn(state.conns, conn_pid, protocol)
|
||||
|
||||
# Send to all waiting processes connection pid
|
||||
Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, conn_pid) end)
|
||||
|
||||
# Update state of the current connection and set waiting_pids to empty list
|
||||
state = put_in(state.conns[key], %{conn | state: :up, waiting_pids: []})
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
# Do we need to do something with killed & unprocessed references?
|
||||
def handle_info({:gun_down, conn_pid, protocol, _reason, _killed, _unprocessed}, state) do
|
||||
{key, conn} = find_conn(state.conns, conn_pid, protocol)
|
||||
|
||||
# We don't want to block requests to GenServer if gun send down message, return nil, so we can make some retries, while connection is not up
|
||||
Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, nil) end)
|
||||
|
||||
state = put_in(state.conns[key].state, :down)
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
defp compose_key(uri), do: uri.host <> ":" <> to_string(uri.port)
|
||||
|
||||
defp find_conn(conns, conn_pid, protocol),
|
||||
do: Enum.find(conns, fn {_, conn} -> conn.conn == conn_pid and conn.protocol == protocol end)
|
||||
end
|
@ -9,7 +9,6 @@ defmodule Pleroma.HTTP.Connection do
|
||||
|
||||
@options [
|
||||
connect_timeout: 10_000,
|
||||
protocols: [:http],
|
||||
timeout: 20_000
|
||||
]
|
||||
|
||||
|
@ -28,6 +28,6 @@ defmodule Pleroma.ReverseProxy.Client do
|
||||
def close(ref), do: client().close(ref)
|
||||
|
||||
defp client do
|
||||
Pleroma.Config.get([Pleroma.ReverseProxy.Client], Pleroma.ReverseProxy.Client.Hackney)
|
||||
Pleroma.Config.get([Pleroma.ReverseProxy.Client], Pleroma.ReverseProxy.Client.Tesla)
|
||||
end
|
||||
end
|
||||
|
@ -1,3 +1,7 @@
|
||||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-onl
|
||||
|
||||
defmodule Pleroma.ReverseProxy.Client.Tesla do
|
||||
@behaviour Pleroma.ReverseProxy.Client
|
||||
|
||||
@ -6,7 +10,7 @@ defmodule Pleroma.ReverseProxy.Client.Tesla do
|
||||
def request(method, url, headers, body, opts \\ []) do
|
||||
adapter_opts =
|
||||
Keyword.get(opts, :adapter, [])
|
||||
|> Keyword.put(:chunks_response, true)
|
||||
|> Keyword.put(:body_as, :chunks)
|
||||
|
||||
with {:ok, response} <-
|
||||
Pleroma.HTTP.request(
|
||||
@ -44,13 +48,13 @@ defmodule Pleroma.ReverseProxy.Client.Tesla do
|
||||
adapter.read_chunk(pid, stream, opts)
|
||||
end
|
||||
|
||||
def close(client) do
|
||||
def close(pid) do
|
||||
adapter = Application.get_env(:tesla, :adapter)
|
||||
|
||||
unless adapter in @adapters do
|
||||
raise "#{adapter} doesn't support closing connection"
|
||||
end
|
||||
|
||||
adapter.close(client)
|
||||
adapter.close(pid)
|
||||
end
|
||||
end
|
||||
|
2
mix.exs
2
mix.exs
@ -113,7 +113,7 @@ defmodule Pleroma.Mixfile do
|
||||
{:poison, "~> 3.0", override: true},
|
||||
{:tesla,
|
||||
github: "alex-strizhakov/tesla",
|
||||
ref: "beb8927358dfaa66ecd458df607befde12dd56e0",
|
||||
ref: "c29a7fd030fa6decbf7091152f563fe322e2b589",
|
||||
override: true},
|
||||
{:cowlib, "~> 2.6.0", override: true},
|
||||
{:gun, "~> 1.3"},
|
||||
|
2
mix.lock
2
mix.lock
@ -85,7 +85,7 @@
|
||||
"swoosh": {:hex, :swoosh, "0.23.2", "7dda95ff0bf54a2298328d6899c74dae1223777b43563ccebebb4b5d2b61df38", [:mix], [{:cowboy, "~> 1.0.1 or ~> 1.1 or ~> 2.4", [hex: :cowboy, repo: "hexpm", optional: true]}, {:gen_smtp, "~> 0.13", [hex: :gen_smtp, repo: "hexpm", optional: true]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mail, "~> 0.2", [hex: :mail, repo: "hexpm", optional: true]}, {:mime, "~> 1.1", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_cowboy, ">= 1.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}], "hexpm"},
|
||||
"syslog": {:git, "https://github.com/Vagabond/erlang-syslog.git", "4a6c6f2c996483e86c1320e9553f91d337bcb6aa", [tag: "1.0.5"]},
|
||||
"telemetry": {:hex, :telemetry, "0.4.0", "8339bee3fa8b91cb84d14c2935f8ecf399ccd87301ad6da6b71c09553834b2ab", [:rebar3], [], "hexpm"},
|
||||
"tesla": {:git, "https://github.com/alex-strizhakov/tesla.git", "beb8927358dfaa66ecd458df607befde12dd56e0", [ref: "beb8927358dfaa66ecd458df607befde12dd56e0"]},
|
||||
"tesla": {:git, "https://github.com/alex-strizhakov/tesla.git", "c29a7fd030fa6decbf7091152f563fe322e2b589", [ref: "c29a7fd030fa6decbf7091152f563fe322e2b589"]},
|
||||
"timex": {:hex, :timex, "3.6.1", "efdf56d0e67a6b956cc57774353b0329c8ab7726766a11547e529357ffdc1d56", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:gettext, "~> 0.10", [hex: :gettext, repo: "hexpm", optional: false]}, {:tzdata, "~> 0.1.8 or ~> 0.5 or ~> 1.0.0", [hex: :tzdata, repo: "hexpm", optional: false]}], "hexpm"},
|
||||
"trailing_format_plug": {:hex, :trailing_format_plug, "0.0.7", "64b877f912cf7273bed03379936df39894149e35137ac9509117e59866e10e45", [:mix], [{:plug, "> 0.12.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"},
|
||||
"tzdata": {:hex, :tzdata, "0.5.21", "8cbf3607fcce69636c672d5be2bbb08687fe26639a62bdcc283d267277db7cf0", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"},
|
||||
|
172
test/gun/connections_test.exs
Normal file
172
test/gun/connections_test.exs
Normal file
@ -0,0 +1,172 @@
|
||||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Gun.ConnectionsTest do
|
||||
use ExUnit.Case, async: true
|
||||
alias Pleroma.Gun.{Connections, Conn, API}
|
||||
|
||||
setup do
|
||||
name = :test_gun_connections
|
||||
{:ok, pid} = Connections.start_link(name)
|
||||
|
||||
{:ok, name: name, pid: pid}
|
||||
end
|
||||
|
||||
test "opens connection and reuse it on next request", %{name: name, pid: pid} do
|
||||
conn = Connections.get_conn(name, "http://some-domain.com", genserver_pid: pid)
|
||||
|
||||
assert is_pid(conn)
|
||||
assert Process.alive?(conn)
|
||||
|
||||
reused_conn = Connections.get_conn(name, "http://some-domain.com", genserver_pid: pid)
|
||||
|
||||
assert conn == reused_conn
|
||||
|
||||
%Connections{
|
||||
conns: %{
|
||||
"some-domain.com:80" => %Conn{
|
||||
conn: ^conn,
|
||||
state: :up,
|
||||
waiting_pids: []
|
||||
}
|
||||
}
|
||||
} = Connections.get_state(name)
|
||||
end
|
||||
|
||||
test "reuses connection based on protocol", %{name: name, pid: pid} do
|
||||
conn = Connections.get_conn(name, "http://some-domain.com", genserver_pid: pid)
|
||||
assert is_pid(conn)
|
||||
assert Process.alive?(conn)
|
||||
|
||||
https_conn = Connections.get_conn(name, "https://some-domain.com", genserver_pid: pid)
|
||||
|
||||
refute conn == https_conn
|
||||
|
||||
reused_https = Connections.get_conn(name, "https://some-domain.com", genserver_pid: pid)
|
||||
|
||||
refute conn == reused_https
|
||||
|
||||
assert reused_https == https_conn
|
||||
|
||||
%Connections{
|
||||
conns: %{
|
||||
"some-domain.com:80" => %Conn{
|
||||
conn: ^conn,
|
||||
state: :up,
|
||||
waiting_pids: []
|
||||
},
|
||||
"some-domain.com:443" => %Conn{
|
||||
conn: ^https_conn,
|
||||
state: :up,
|
||||
waiting_pids: []
|
||||
}
|
||||
}
|
||||
} = Connections.get_state(name)
|
||||
end
|
||||
|
||||
test "process gun_down message", %{name: name, pid: pid} do
|
||||
conn = Connections.get_conn(name, "http://gun_down.com", genserver_pid: pid)
|
||||
|
||||
refute conn
|
||||
|
||||
%Connections{
|
||||
conns: %{
|
||||
"gun_down.com:80" => %Conn{
|
||||
conn: _,
|
||||
state: :down,
|
||||
waiting_pids: _
|
||||
}
|
||||
}
|
||||
} = Connections.get_state(name)
|
||||
end
|
||||
|
||||
test "process gun_down message and then gun_up", %{name: name, pid: pid} do
|
||||
conn = Connections.get_conn(name, "http://gun_down_and_up.com", genserver_pid: pid)
|
||||
|
||||
refute conn
|
||||
|
||||
%Connections{
|
||||
conns: %{
|
||||
"gun_down_and_up.com:80" => %Conn{
|
||||
conn: _,
|
||||
state: :down,
|
||||
waiting_pids: _
|
||||
}
|
||||
}
|
||||
} = Connections.get_state(name)
|
||||
|
||||
conn = Connections.get_conn(name, "http://gun_down_and_up.com", genserver_pid: pid)
|
||||
|
||||
assert is_pid(conn)
|
||||
assert Process.alive?(conn)
|
||||
|
||||
%Connections{
|
||||
conns: %{
|
||||
"gun_down_and_up.com:80" => %Conn{
|
||||
conn: _,
|
||||
state: :up,
|
||||
waiting_pids: []
|
||||
}
|
||||
}
|
||||
} = Connections.get_state(name)
|
||||
end
|
||||
|
||||
test "async processes get same conn for same domain", %{name: name, pid: pid} do
|
||||
tasks =
|
||||
for _ <- 1..5 do
|
||||
Task.async(fn ->
|
||||
Connections.get_conn(name, "http://some-domain.com", genserver_pid: pid)
|
||||
end)
|
||||
end
|
||||
|
||||
tasks_with_results = Task.yield_many(tasks)
|
||||
|
||||
results =
|
||||
Enum.map(tasks_with_results, fn {task, res} ->
|
||||
res || Task.shutdown(task, :brutal_kill)
|
||||
end)
|
||||
|
||||
conns = for {:ok, value} <- results, do: value
|
||||
|
||||
%Connections{
|
||||
conns: %{
|
||||
"some-domain.com:80" => %Conn{
|
||||
conn: conn,
|
||||
state: :up,
|
||||
waiting_pids: []
|
||||
}
|
||||
}
|
||||
} = Connections.get_state(name)
|
||||
|
||||
assert Enum.all?(conns, fn res -> res == conn end)
|
||||
end
|
||||
|
||||
describe "integration test" do
|
||||
@describetag :integration
|
||||
|
||||
test "opens connection and reuse it on next request", %{name: name} do
|
||||
api = Pleroma.Config.get([API])
|
||||
Pleroma.Config.put([API], :gun)
|
||||
on_exit(fn -> Pleroma.Config.put([API], api) end)
|
||||
conn = Connections.get_conn(name, "http://httpbin.org")
|
||||
|
||||
assert is_pid(conn)
|
||||
assert Process.alive?(conn)
|
||||
|
||||
reused_conn = Connections.get_conn(name, "http://httpbin.org")
|
||||
|
||||
assert conn == reused_conn
|
||||
|
||||
%Connections{
|
||||
conns: %{
|
||||
"httpbin.org:80" => %Conn{
|
||||
conn: ^conn,
|
||||
state: :up,
|
||||
waiting_pids: []
|
||||
}
|
||||
}
|
||||
} = Connections.get_state(name)
|
||||
end
|
||||
end
|
||||
end
|
@ -332,5 +332,38 @@ defmodule Pleroma.ReverseProxyTest do
|
||||
|
||||
describe "integration tests" do
|
||||
@describetag :integration
|
||||
|
||||
test "with hackney client", %{conn: conn} do
|
||||
client = Pleroma.Config.get([Pleroma.ReverseProxy.Client])
|
||||
Pleroma.Config.put([Pleroma.ReverseProxy.Client], Pleroma.ReverseProxy.Client.Hackney)
|
||||
|
||||
on_exit(fn ->
|
||||
Pleroma.Config.put([Pleroma.ReverseProxy.Client], client)
|
||||
end)
|
||||
|
||||
conn = ReverseProxy.call(conn, "http://httpbin.org/stream-bytes/10")
|
||||
|
||||
assert byte_size(conn.resp_body) == 10
|
||||
assert conn.state == :chunked
|
||||
assert conn.status == 200
|
||||
end
|
||||
|
||||
test "with tesla client with gun adapter", %{conn: conn} do
|
||||
client = Pleroma.Config.get([Pleroma.ReverseProxy.Client])
|
||||
Pleroma.Config.put([Pleroma.ReverseProxy.Client], Pleroma.ReverseProxy.Client.Tesla)
|
||||
adapter = Application.get_env(:tesla, :adapter)
|
||||
Application.put_env(:tesla, :adapter, Tesla.Adapter.Gun)
|
||||
|
||||
conn = ReverseProxy.call(conn, "http://httpbin.org/stream-bytes/10")
|
||||
|
||||
assert byte_size(conn.resp_body) == 10
|
||||
assert conn.state == :chunked
|
||||
assert conn.status == 200
|
||||
|
||||
on_exit(fn ->
|
||||
Pleroma.Config.put([Pleroma.ReverseProxy.Client], client)
|
||||
Application.put_env(:tesla, :adapter, adapter)
|
||||
end)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
Loading…
Reference in New Issue
Block a user