added tesla client for reverse proxy
This commit is contained in:
parent
b383d85b9b
commit
4e9d9209c3
@ -10,11 +10,7 @@ defmodule Pleroma.HTTP.Connection do
|
||||
@options [
|
||||
connect_timeout: 10_000,
|
||||
protocols: [:http],
|
||||
timeout: 20_000,
|
||||
recv_timeout: 20_000,
|
||||
follow_redirect: true,
|
||||
force_redirect: true,
|
||||
pool: :federation
|
||||
timeout: 20_000
|
||||
]
|
||||
@adapter Application.get_env(:tesla, :adapter)
|
||||
|
||||
|
@ -3,9 +3,14 @@
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.ReverseProxy.Client do
|
||||
@callback request(atom(), String.t(), [tuple()], String.t(), list()) ::
|
||||
{:ok, pos_integer(), [tuple()], reference() | map()}
|
||||
| {:ok, pos_integer(), [tuple()]}
|
||||
@type status :: pos_integer()
|
||||
@type header_name :: String.t()
|
||||
@type header_value :: String.t()
|
||||
@type headers :: [{header_name(), header_value()}]
|
||||
|
||||
@callback request(atom(), String.t(), headers(), String.t(), list()) ::
|
||||
{:ok, status(), headers(), reference() | map()}
|
||||
| {:ok, status(), headers()}
|
||||
| {:ok, reference()}
|
||||
| {:error, term()}
|
||||
|
||||
@ -14,8 +19,8 @@ defmodule Pleroma.ReverseProxy.Client do
|
||||
|
||||
@callback close(reference() | pid() | map()) :: :ok
|
||||
|
||||
def request(method, url, headers, "", opts \\ []) do
|
||||
client().request(method, url, headers, "", opts)
|
||||
def request(method, url, headers, body \\ "", opts \\ []) do
|
||||
client().request(method, url, headers, body, opts)
|
||||
end
|
||||
|
||||
def stream_body(ref), do: client().stream_body(ref)
|
||||
@ -23,6 +28,6 @@ defmodule Pleroma.ReverseProxy.Client do
|
||||
def close(ref), do: client().close(ref)
|
||||
|
||||
defp client do
|
||||
Pleroma.Config.get([Pleroma.ReverseProxy.Client], :hackney)
|
||||
Pleroma.Config.get([Pleroma.ReverseProxy.Client], Pleroma.ReverseProxy.Client.Hackney)
|
||||
end
|
||||
end
|
||||
|
17
lib/pleroma/reverse_proxy/client/hackney.ex
Normal file
17
lib/pleroma/reverse_proxy/client/hackney.ex
Normal file
@ -0,0 +1,17 @@
|
||||
defmodule Pleroma.ReverseProxy.Client.Hackney do
|
||||
@behaviour Pleroma.ReverseProxy.Client
|
||||
|
||||
def request(method, url, headers, body, opts \\ []) do
|
||||
:hackney.request(method, url, headers, body, opts)
|
||||
end
|
||||
|
||||
def stream_body(ref) do
|
||||
case :hackney.stream_body(ref) do
|
||||
:done -> :done
|
||||
{:ok, data} -> {:ok, data, ref}
|
||||
{:error, error} -> {:error, error}
|
||||
end
|
||||
end
|
||||
|
||||
def close(ref), do: :hackney.close(ref)
|
||||
end
|
48
lib/pleroma/reverse_proxy/client/tesla.ex
Normal file
48
lib/pleroma/reverse_proxy/client/tesla.ex
Normal file
@ -0,0 +1,48 @@
|
||||
defmodule Pleroma.ReverseProxy.Client.Tesla do
|
||||
@behaviour Pleroma.ReverseProxy.Client
|
||||
|
||||
@adapters [Tesla.Adapter.Gun]
|
||||
alias Pleroma.HTTP
|
||||
|
||||
def request(method, url, headers, body, opts \\ []) do
|
||||
adapter_opts =
|
||||
Keyword.get(opts, :adapter, [])
|
||||
|> Keyword.put(:chunks_response, true)
|
||||
|
||||
with {:ok, response} <-
|
||||
HTTP.request(method, url, body, headers, Keyword.put(opts, :adapter, adapter_opts)) do
|
||||
{:ok, response.status, response.headers, response.body}
|
||||
else
|
||||
{:error, error} -> {:error, error}
|
||||
end
|
||||
end
|
||||
|
||||
def stream_body(%{fin: true}), do: :done
|
||||
|
||||
def stream_body(client) do
|
||||
case read_chunk!(client) do
|
||||
{:fin, body} -> {:ok, body, Map.put(client, :fin, true)}
|
||||
{:nofin, part} -> {:ok, part, client}
|
||||
end
|
||||
end
|
||||
|
||||
defp read_chunk!(client) do
|
||||
adapter = Application.get_env(:tesla, :adapter)
|
||||
|
||||
unless adapter in @adapters do
|
||||
raise "#{adapter} doesn't support reading body in chunks"
|
||||
end
|
||||
|
||||
adapter.read_chunk(client)
|
||||
end
|
||||
|
||||
def close(client) do
|
||||
adapter = Application.get_env(:tesla, :adapter)
|
||||
|
||||
unless adapter in @adapters do
|
||||
raise "#{adapter} doesn't support closing connection"
|
||||
end
|
||||
|
||||
adapter.close(client)
|
||||
end
|
||||
end
|
@ -61,7 +61,7 @@ defmodule Pleroma.ReverseProxy do
|
||||
* `http`: options for [hackney](https://github.com/benoitc/hackney).
|
||||
|
||||
"""
|
||||
@default_hackney_options [pool: :media]
|
||||
@default_options [pool: :media]
|
||||
|
||||
@inline_content_types [
|
||||
"image/gif",
|
||||
@ -93,9 +93,9 @@ defmodule Pleroma.ReverseProxy do
|
||||
def call(_conn, _url, _opts \\ [])
|
||||
|
||||
def call(conn = %{method: method}, url, opts) when method in @methods do
|
||||
hackney_opts =
|
||||
client_opts =
|
||||
Pleroma.HTTP.Connection.options([])
|
||||
|> Keyword.merge(@default_hackney_options)
|
||||
|> Keyword.merge(@default_options)
|
||||
|> Keyword.merge(Keyword.get(opts, :http, []))
|
||||
|> HTTP.process_request_options()
|
||||
|
||||
@ -108,7 +108,7 @@ defmodule Pleroma.ReverseProxy do
|
||||
opts
|
||||
end
|
||||
|
||||
with {:ok, code, headers, client} <- request(method, url, req_headers, hackney_opts),
|
||||
with {:ok, code, headers, client} <- request(method, url, req_headers, client_opts),
|
||||
:ok <-
|
||||
header_length_constraint(
|
||||
headers,
|
||||
@ -201,7 +201,7 @@ defmodule Pleroma.ReverseProxy do
|
||||
duration,
|
||||
Keyword.get(opts, :max_read_duration, @max_read_duration)
|
||||
),
|
||||
{:ok, data} <- client().stream_body(client),
|
||||
{:ok, data, client} <- client().stream_body(client),
|
||||
{:ok, duration} <- increase_read_duration(duration),
|
||||
sent_so_far = sent_so_far + byte_size(data),
|
||||
:ok <-
|
||||
|
2
mix.exs
2
mix.exs
@ -113,7 +113,7 @@ defmodule Pleroma.Mixfile do
|
||||
{:poison, "~> 3.0", override: true},
|
||||
{:tesla,
|
||||
github: "alex-strizhakov/tesla",
|
||||
ref: "9ad792fb630bdfc2266ed13b830c28b6552fb3f9",
|
||||
ref: "beb8927358dfaa66ecd458df607befde12dd56e0",
|
||||
override: true},
|
||||
{:cowlib, "~> 2.6.0", override: true},
|
||||
{:gun, "~> 1.3"},
|
||||
|
2
mix.lock
2
mix.lock
@ -85,7 +85,7 @@
|
||||
"swoosh": {:hex, :swoosh, "0.23.2", "7dda95ff0bf54a2298328d6899c74dae1223777b43563ccebebb4b5d2b61df38", [:mix], [{:cowboy, "~> 1.0.1 or ~> 1.1 or ~> 2.4", [hex: :cowboy, repo: "hexpm", optional: true]}, {:gen_smtp, "~> 0.13", [hex: :gen_smtp, repo: "hexpm", optional: true]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mail, "~> 0.2", [hex: :mail, repo: "hexpm", optional: true]}, {:mime, "~> 1.1", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_cowboy, ">= 1.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}], "hexpm"},
|
||||
"syslog": {:git, "https://github.com/Vagabond/erlang-syslog.git", "4a6c6f2c996483e86c1320e9553f91d337bcb6aa", [tag: "1.0.5"]},
|
||||
"telemetry": {:hex, :telemetry, "0.4.0", "8339bee3fa8b91cb84d14c2935f8ecf399ccd87301ad6da6b71c09553834b2ab", [:rebar3], [], "hexpm"},
|
||||
"tesla": {:git, "https://github.com/alex-strizhakov/tesla.git", "9ad792fb630bdfc2266ed13b830c28b6552fb3f9", [ref: "9ad792fb630bdfc2266ed13b830c28b6552fb3f9"]},
|
||||
"tesla": {:git, "https://github.com/alex-strizhakov/tesla.git", "beb8927358dfaa66ecd458df607befde12dd56e0", [ref: "beb8927358dfaa66ecd458df607befde12dd56e0"]},
|
||||
"timex": {:hex, :timex, "3.6.1", "efdf56d0e67a6b956cc57774353b0329c8ab7726766a11547e529357ffdc1d56", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:gettext, "~> 0.10", [hex: :gettext, repo: "hexpm", optional: false]}, {:tzdata, "~> 0.1.8 or ~> 0.5 or ~> 1.0.0", [hex: :tzdata, repo: "hexpm", optional: false]}], "hexpm"},
|
||||
"trailing_format_plug": {:hex, :trailing_format_plug, "0.0.7", "64b877f912cf7273bed03379936df39894149e35137ac9509117e59866e10e45", [:mix], [{:plug, "> 0.12.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"},
|
||||
"tzdata": {:hex, :tzdata, "0.5.21", "8cbf3607fcce69636c672d5be2bbb08687fe26639a62bdcc283d267277db7cf0", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"},
|
||||
|
@ -29,11 +29,11 @@ defmodule Pleroma.ReverseProxyTest do
|
||||
{"content-length", byte_size(json) |> to_string()}
|
||||
], %{url: url}}
|
||||
end)
|
||||
|> expect(:stream_body, invokes, fn %{url: url} ->
|
||||
|> expect(:stream_body, invokes, fn %{url: url} = client ->
|
||||
case Registry.lookup(Pleroma.ReverseProxy.ClientMock, url) do
|
||||
[{_, 0}] ->
|
||||
Registry.update_value(Pleroma.ReverseProxy.ClientMock, url, &(&1 + 1))
|
||||
{:ok, json}
|
||||
{:ok, json, client}
|
||||
|
||||
[{_, 1}] ->
|
||||
Registry.unregister(Pleroma.ReverseProxy.ClientMock, url)
|
||||
@ -66,6 +66,38 @@ defmodule Pleroma.ReverseProxyTest do
|
||||
assert conn.halted
|
||||
end
|
||||
|
||||
defp stream_mock(invokes, with_close? \\ false) do
|
||||
ClientMock
|
||||
|> expect(:request, fn :get, "/stream-bytes/" <> length, _, _, _ ->
|
||||
Registry.register(Pleroma.ReverseProxy.ClientMock, "/stream-bytes/" <> length, 0)
|
||||
|
||||
{:ok, 200, [{"content-type", "application/octet-stream"}],
|
||||
%{url: "/stream-bytes/" <> length}}
|
||||
end)
|
||||
|> expect(:stream_body, invokes, fn %{url: "/stream-bytes/" <> length} = client ->
|
||||
max = String.to_integer(length)
|
||||
|
||||
case Registry.lookup(Pleroma.ReverseProxy.ClientMock, "/stream-bytes/" <> length) do
|
||||
[{_, current}] when current < max ->
|
||||
Registry.update_value(
|
||||
Pleroma.ReverseProxy.ClientMock,
|
||||
"/stream-bytes/" <> length,
|
||||
&(&1 + 10)
|
||||
)
|
||||
|
||||
{:ok, "0123456789", client}
|
||||
|
||||
[{_, ^max}] ->
|
||||
Registry.unregister(Pleroma.ReverseProxy.ClientMock, "/stream-bytes/" <> length)
|
||||
:done
|
||||
end
|
||||
end)
|
||||
|
||||
if with_close? do
|
||||
expect(ClientMock, :close, fn _ -> :ok end)
|
||||
end
|
||||
end
|
||||
|
||||
describe "max_body " do
|
||||
test "length returns error if content-length more than option", %{conn: conn} do
|
||||
user_agent_mock("hackney/1.15.1", 0)
|
||||
@ -179,12 +211,12 @@ defmodule Pleroma.ReverseProxyTest do
|
||||
Registry.register(Pleroma.ReverseProxy.ClientMock, "/headers", 0)
|
||||
{:ok, 200, [{"content-type", "application/json"}], %{url: "/headers", headers: headers}}
|
||||
end)
|
||||
|> expect(:stream_body, 2, fn %{url: url, headers: headers} ->
|
||||
|> expect(:stream_body, 2, fn %{url: url, headers: headers} = client ->
|
||||
case Registry.lookup(Pleroma.ReverseProxy.ClientMock, url) do
|
||||
[{_, 0}] ->
|
||||
Registry.update_value(Pleroma.ReverseProxy.ClientMock, url, &(&1 + 1))
|
||||
headers = for {k, v} <- headers, into: %{}, do: {String.capitalize(k), v}
|
||||
{:ok, Jason.encode!(%{headers: headers})}
|
||||
{:ok, Jason.encode!(%{headers: headers}), client}
|
||||
|
||||
[{_, 1}] ->
|
||||
Registry.unregister(Pleroma.ReverseProxy.ClientMock, url)
|
||||
@ -261,11 +293,11 @@ defmodule Pleroma.ReverseProxyTest do
|
||||
|
||||
{:ok, 200, headers, %{url: "/disposition"}}
|
||||
end)
|
||||
|> expect(:stream_body, 2, fn %{url: "/disposition"} ->
|
||||
|> expect(:stream_body, 2, fn %{url: "/disposition"} = client ->
|
||||
case Registry.lookup(Pleroma.ReverseProxy.ClientMock, "/disposition") do
|
||||
[{_, 0}] ->
|
||||
Registry.update_value(Pleroma.ReverseProxy.ClientMock, "/disposition", &(&1 + 1))
|
||||
{:ok, ""}
|
||||
{:ok, "", client}
|
||||
|
||||
[{_, 1}] ->
|
||||
Registry.unregister(Pleroma.ReverseProxy.ClientMock, "/disposition")
|
||||
|
Loading…
Reference in New Issue
Block a user