168 lines
4.8 KiB
Elixir
168 lines
4.8 KiB
Elixir
# Pleroma: A lightweight social networking server
|
|
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
|
# SPDX-License-Identifier: AGPL-3.0-only
|
|
|
|
defmodule Pleroma.Gun.Connections do
|
|
use GenServer
|
|
|
|
# Key under which a connection is stored in the pool; compose_key/1 builds it
# as "scheme:host:port".
@type domain :: String.t()

# A single pooled gun connection together with its bookkeeping data.
@type conn :: Pleroma.Gun.Conn.t()

# State of the pool server: the known connections indexed by their key and
# the options the server was started with.
@type t :: %__MODULE__{conns: %{domain() => conn()}, opts: keyword()}

defstruct conns: %{}, opts: []
|
|
|
|
# Starts the pool server registered under `name` with `opts` as its options.
# The server is only started when Tesla is configured to use the Gun adapter;
# with any other adapter `:ignore` is returned.
@spec start_link({atom(), keyword()}) :: {:ok, pid()} | :ignore
def start_link({name, opts}) do
  case Application.get_env(:tesla, :adapter) do
    Tesla.Adapter.Gun -> GenServer.start_link(__MODULE__, opts, name: name)
    _other_adapter -> :ignore
  end
end
|
|
|
|
# The pool starts without any connection; `opts` are kept in the state as given.
@impl true
def init(opts) do
  initial_state = struct!(__MODULE__, conns: %{}, opts: opts)
  {:ok, initial_state}
end
|
|
|
|
# Asks the pool server registered as `name` for a gun connection to the host of `url`.
#
# `opts` is turned into a map and sent to the server together with the parsed URI.
# For https URLs on a port other than 443 the `:transport` option is forced to `:tls`
# (NOTE(review): presumably gun only selects TLS by itself on port 443 - confirm).
#
# Waits for the server's reply (subject to the default `GenServer.call/2` timeout).
# The reply is the connection pid, or `nil` when the server answered a waiting
# caller after a `:gun_down` message - hence `nil` is part of the spec.
@spec get_conn(String.t(), keyword(), atom()) :: pid() | nil
def get_conn(url, opts \\ [], name \\ :default) do
  opts = Map.new(opts)
  uri = URI.parse(url)

  opts =
    case uri do
      %URI{scheme: "https", port: port} when port != 443 -> Map.put(opts, :transport, :tls)
      _ -> opts
    end

  GenServer.call(name, {:conn, %{opts: opts, uri: uri}})
end
|
|
|
|
# TODO: only for testing, add this parameter to the config
#
# Fetches a connection through get_conn/3 and retries while the pool answers `nil`.
# Returns the connection, or `nil` once 3 attempts have been made.
@spec try_to_get_gun_conn(String.t(), keyword(), atom()) :: nil | pid()
def try_to_get_gun_conn(url, opts \\ [], name \\ :default),
  do: try_to_get_gun_conn(url, opts, name, 0)

# `acc` is the number of attempts already made; it starts at 0.
@spec try_to_get_gun_conn(String.t(), keyword(), atom(), non_neg_integer()) :: nil | pid()
def try_to_get_gun_conn(_url, _opts, _name, 3), do: nil

def try_to_get_gun_conn(url, opts, name, acc) do
  case Pleroma.Gun.Connections.get_conn(url, opts, name) do
    # Retry through the 4-arity clause so `opts`, `name` and the attempt counter
    # are carried over. Going through the defaulted head would pass the counter
    # as `opts` and restart counting at 0, so the retry limit was never reached.
    nil -> try_to_get_gun_conn(url, opts, name, acc + 1)
    conn -> conn
  end
end
|
|
|
|
# Tells whether a process is registered under `name` and is still alive.
@spec alive?(atom()) :: boolean()
def alive?(name \\ :default) do
  case Process.whereis(name) do
    nil -> false
    pid -> Process.alive?(pid)
  end
end
|
|
|
|
# Returns the current state of the pool server registered as `name`.
@spec get_state(atom()) :: t()
def get_state(name \\ :default), do: GenServer.call(name, {:state})
|
|
|
|
# Serves a connection request for `uri`:
#
#   * the connection is `:up` - reply with its pid right away and bump `used`;
#   * the connection is `:open` or `:down` - park the caller in `waiting_pids`,
#     it is answered later from the `:gun_up` / `:gun_down` handlers;
#   * there is no connection for this key - open one, closing the least used
#     connection first when the pool already holds `:max_connections` entries.
@impl true
def handle_call({:conn, %{opts: opts, uri: uri}}, from, state) do
  key = compose_key(uri)

  case state.conns[key] do
    %{conn: conn, state: :up, used: used} ->
      {:reply, conn, put_in(state.conns[key].used, used + 1)}

    %{state: conn_state, waiting_pids: pids} when conn_state in [:open, :down] ->
      {:noreply, put_in(state.conns[key].waiting_pids, [from | pids])}

    nil ->
      # When :max_connections is not configured the limit is `nil`; any integer
      # sorts below an atom in term order, so the pool then never evicts.
      state =
        if map_size(state.conns) < state.opts[:max_connections],
          do: state,
          else: close_least_used(state)

      open_conn(key, uri, from, state, opts)
  end
end

# Hands out the whole server state; backs get_state/1.
def handle_call({:state}, _from, state), do: {:reply, state, state}

# Closes the connection with the smallest `used` counter and removes it from the pool.
# An empty pool is returned untouched, so there is nothing to crash on.
defp close_least_used(%{conns: conns} = state) when map_size(conns) == 0, do: state

defp close_least_used(%{conns: conns} = state) do
  {close_key, least_used} = Enum.min_by(conns, fn {_key, conn} -> conn.used end)

  :ok = Pleroma.Gun.API.close(least_used.conn)

  # Callers still parked on the evicted connection would never receive a reply
  # once its entry is gone; answer them with nil (same convention as :gun_down)
  # so they can retry instead of running into the call timeout.
  Enum.each(least_used.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, nil) end)

  %{state | conns: Map.delete(conns, close_key)}
end
|
|
|
|
# gun reports that the connection `conn_pid` is established.
# Every caller parked on this connection receives the pid, then the entry is
# marked `:up`, its waiting list is emptied and `used` grows by the number of
# callers that were just served.
@impl true
def handle_info({:gun_up, conn_pid, _protocol}, state) do
  {key, %{waiting_pids: waiting} = conn} =
    find_conn(state.conns, conn_pid, compose_key_gun_info(conn_pid))

  for caller <- waiting, do: GenServer.reply(caller, conn_pid)

  ready_conn = %{conn | state: :up, waiting_pids: [], used: conn.used + length(waiting)}

  {:noreply, put_in(state.conns[key], ready_conn)}
end
|
|
|
|
# gun reports that the connection `conn_pid` went down.
# Do we need to do something with killed & unprocessed references?
@impl true
def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed, _unprocessed}, state) do
  conn_key = compose_key_gun_info(conn_pid)
  {key, conn} = find_conn(state.conns, conn_pid, conn_key)

  # We don't want to keep callers blocked while the connection is down: answer
  # them with nil so they can make some retries until the connection is up again.
  Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, nil) end)

  # The callers above have been answered, so drop them from the waiting list.
  # Otherwise the next :gun_up would reply to them a second time and count
  # them in `used` although they never received the connection.
  state = put_in(state.conns[key], %{conn | state: :down, waiting_pids: []})

  {:noreply, state}
end
|
|
|
|
# Pool key for a parsed URI: "scheme:host:port".
defp compose_key(uri), do: Enum.join([uri.scheme, uri.host, uri.port], ":")
|
|
|
|
# Pool key for a gun connection process, built from the origin scheme, host and
# port that Pleroma.Gun.API.info/1 reports for it; same "scheme:host:port"
# layout as compose_key/1.
defp compose_key_gun_info(pid) do
  info = Pleroma.Gun.API.info(pid)

  Enum.map_join([:origin_scheme, :origin_host, :origin_port], ":", fn field ->
    Map.fetch!(info, field)
  end)
end
|
|
|
|
# Looks up the pool entry stored under `conn_key` and returns `{conn_key, conn}`
# when that entry belongs to the gun process `conn_pid`; returns `nil` otherwise.
#
# `conns` is a map keyed by the connection key, so the entry is fetched directly
# instead of scanning every connection.
defp find_conn(conns, conn_pid, conn_key) do
  case Map.fetch(conns, conn_key) do
    {:ok, %{conn: ^conn_pid} = conn} -> {conn_key, conn}
    _ -> nil
  end
end
|
|
|
|
# Opens a gun connection to the host and port of `uri` and stores it in the pool
# under `key`, with `from` as its first waiting caller. No reply is sent here;
# the caller is answered once gun reports the connection up or down.
defp open_conn(key, uri, from, state, opts) do
  host = to_charlist(uri.host)
  {:ok, conn} = Pleroma.Gun.API.open(host, uri.port, opts)

  new_entry = %Pleroma.Gun.Conn{conn: conn, waiting_pids: [from]}

  {:noreply, put_in(state.conns[key], new_entry)}
end
|
|
end
|