Browse Source

Connection Pool: register workers using :via

chores/our-libs-hex-releases
rinpatch 4 years ago
parent
commit
0ffde499b8
3 changed files with 14 additions and 14 deletions
  1. +5
    -3
      lib/pleroma/gun/connection_pool.ex
  2. +8
    -9
      lib/pleroma/gun/connection_pool/worker.ex
  3. +1
    -2
      lib/pleroma/gun/connection_pool/worker_supervisor.ex

+ 5
- 3
lib/pleroma/gun/connection_pool.ex View File

@@ -15,7 +15,7 @@ defmodule Pleroma.Gun.ConnectionPool do


case Registry.lookup(@registry, key) do case Registry.lookup(@registry, key) do
# The key has already been registered, but connection is not up yet # The key has already been registered, but connection is not up yet
[{worker_pid, {nil, _used_by, _crf, _last_reference}}] ->
[{worker_pid, nil}] ->
get_gun_pid_from_worker(worker_pid) get_gun_pid_from_worker(worker_pid)


[{worker_pid, {gun_pid, _used_by, _crf, _last_reference}}] -> [{worker_pid, {gun_pid, _used_by, _crf, _last_reference}}] ->
@@ -26,13 +26,13 @@ defmodule Pleroma.Gun.ConnectionPool do
# :gun.set_owner fails in :connected state for whatevever reason, # :gun.set_owner fails in :connected state for whatevever reason,
# so we open the connection in the process directly and send it's pid back # so we open the connection in the process directly and send it's pid back
# We trust gun to handle timeouts by itself # We trust gun to handle timeouts by itself
case WorkerSupervisor.start_worker([uri, key, opts, self()]) do
case WorkerSupervisor.start_worker([key, uri, opts, self()]) do
{:ok, _worker_pid} -> {:ok, _worker_pid} ->
receive do receive do
{:conn_pid, pid} -> {:ok, pid} {:conn_pid, pid} -> {:ok, pid}
end end


{:error, {:error, {:already_registered, worker_pid}}} ->
{:error, {:already_started, worker_pid}} ->
get_gun_pid_from_worker(worker_pid) get_gun_pid_from_worker(worker_pid)


err -> err ->
@@ -56,6 +56,8 @@ defmodule Pleroma.Gun.ConnectionPool do
end end


def release_conn(conn_pid) do def release_conn(conn_pid) do
# :ets.fun2ms(fn {_, {worker_pid, {gun_pid, _, _, _}}} when gun_pid == conn_pid ->
# worker_pid end)
query_result = query_result =
Registry.select(@registry, [ Registry.select(@registry, [
{{:_, :"$1", {:"$2", :_, :_, :_}}, [{:==, :"$2", conn_pid}], [:"$1"]} {{:_, :"$1", {:"$2", :_, :_, :_}}, [{:==, :"$2", conn_pid}], [:"$1"]}


+ 8
- 9
lib/pleroma/gun/connection_pool/worker.ex View File

@@ -4,20 +4,19 @@ defmodule Pleroma.Gun.ConnectionPool.Worker do


@registry Pleroma.Gun.ConnectionPool @registry Pleroma.Gun.ConnectionPool


def start_link(opts) do
GenServer.start_link(__MODULE__, opts)
def start_link([key | _] = opts) do
GenServer.start_link(__MODULE__, opts, name: {:via, Registry, {@registry, key}})
end end


@impl true @impl true
def init([uri, key, opts, client_pid]) do
time = :os.system_time(:second)
# Register before opening connection to prevent race conditions
with {:ok, _owner} <- Registry.register(@registry, key, {nil, [client_pid], 1, time}),
{:ok, conn_pid} <- Gun.Conn.open(uri, opts),
def init([key, uri, opts, client_pid]) do
with {:ok, conn_pid} <- Gun.Conn.open(uri, opts),
Process.link(conn_pid) do Process.link(conn_pid) do
time = :os.system_time(:second)

{_, _} = {_, _} =
Registry.update_value(@registry, key, fn {_, used_by, crf, last_reference} ->
{conn_pid, used_by, crf, last_reference}
Registry.update_value(@registry, key, fn _ ->
{conn_pid, [client_pid], 1, time}
end) end)


send(client_pid, {:conn_pid, conn_pid}) send(client_pid, {:conn_pid, conn_pid})


+ 1
- 2
lib/pleroma/gun/connection_pool/worker_supervisor.ex View File

@@ -1,5 +1,5 @@
defmodule Pleroma.Gun.ConnectionPool.WorkerSupervisor do defmodule Pleroma.Gun.ConnectionPool.WorkerSupervisor do
@doc "Supervisor for pool workers. Does not do anything except enforce max connection limit"
@moduledoc "Supervisor for pool workers. Does not do anything except enforce max connection limit"


use DynamicSupervisor use DynamicSupervisor


@@ -35,7 +35,6 @@ defmodule Pleroma.Gun.ConnectionPool.WorkerSupervisor do
pid = pid =
spawn(fn -> spawn(fn ->
{:ok, _pid} = Registry.register(@registry, @enforcer_key, nil) {:ok, _pid} = Registry.register(@registry, @enforcer_key, nil)

max_connections = Pleroma.Config.get([:connections_pool, :max_connections]) max_connections = Pleroma.Config.get([:connections_pool, :max_connections])


reclaim_max = reclaim_max =


Loading…
Cancel
Save