open conn in separate task
This commit is contained in:
parent
d44f9e3b6c
commit
8efae966b1
12 changed files with 384 additions and 304 deletions
|
|
@ -20,7 +20,6 @@ defmodule Pleroma.Pool.Connections do
|
|||
defstruct conns: %{}, opts: []
|
||||
|
||||
alias Pleroma.Gun.API
|
||||
alias Pleroma.Gun.Conn
|
||||
|
||||
@spec start_link({atom(), keyword()}) :: {:ok, pid()}
|
||||
def start_link({name, opts}) do
|
||||
|
|
@ -44,23 +43,6 @@ defmodule Pleroma.Pool.Connections do
|
|||
)
|
||||
end
|
||||
|
||||
@spec open_conn(String.t() | URI.t(), atom(), keyword()) :: :ok
|
||||
def open_conn(url, name, opts \\ [])
|
||||
def open_conn(url, name, opts) when is_binary(url), do: open_conn(URI.parse(url), name, opts)
|
||||
|
||||
def open_conn(%URI{} = uri, name, opts) do
|
||||
pool_opts = Config.get([:connections_pool], [])
|
||||
|
||||
opts =
|
||||
opts
|
||||
|> Enum.into(%{})
|
||||
|> Map.put_new(:retry, pool_opts[:retry] || 0)
|
||||
|> Map.put_new(:retry_timeout, pool_opts[:retry_timeout] || 100)
|
||||
|> Map.put_new(:await_up_timeout, pool_opts[:await_up_timeout] || 5_000)
|
||||
|
||||
GenServer.cast(name, {:open_conn, %{opts: opts, uri: uri}})
|
||||
end
|
||||
|
||||
@spec alive?(atom()) :: boolean()
|
||||
def alive?(name) do
|
||||
pid = Process.whereis(name)
|
||||
|
|
@ -72,23 +54,37 @@ defmodule Pleroma.Pool.Connections do
|
|||
GenServer.call(name, :state)
|
||||
end
|
||||
|
||||
@spec count(atom()) :: pos_integer()
|
||||
def count(name) do
|
||||
GenServer.call(name, :count)
|
||||
end
|
||||
|
||||
@spec get_unused_conns(atom()) :: [{domain(), conn()}]
|
||||
def get_unused_conns(name) do
|
||||
GenServer.call(name, :unused_conns)
|
||||
end
|
||||
|
||||
@spec checkout(pid(), pid(), atom()) :: :ok
|
||||
def checkout(conn, pid, name) do
|
||||
GenServer.cast(name, {:checkout, conn, pid})
|
||||
end
|
||||
|
||||
@spec add_conn(atom(), String.t(), Pleroma.Gun.Conn.t()) :: :ok
|
||||
def add_conn(name, key, conn) do
|
||||
GenServer.cast(name, {:add_conn, key, conn})
|
||||
end
|
||||
|
||||
@spec remove_conn(atom(), String.t()) :: :ok
|
||||
def remove_conn(name, key) do
|
||||
GenServer.cast(name, {:remove_conn, key})
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast({:open_conn, %{opts: opts, uri: uri}}, state) do
|
||||
Logger.debug("opening new #{compose_uri(uri)}")
|
||||
max_connections = state.opts[:max_connections]
|
||||
def handle_cast({:add_conn, key, conn}, state) do
|
||||
state = put_in(state.conns[key], conn)
|
||||
|
||||
key = compose_key(uri)
|
||||
|
||||
if Enum.count(state.conns) < max_connections do
|
||||
open_conn(key, uri, state, opts)
|
||||
else
|
||||
try_to_open_conn(key, uri, state, opts)
|
||||
end
|
||||
Process.monitor(conn.conn)
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
|
|
@ -120,14 +116,20 @@ defmodule Pleroma.Pool.Connections do
|
|||
{:noreply, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast({:remove_conn, key}, state) do
|
||||
state = put_in(state.conns, Map.delete(state.conns, key))
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:checkin, uri}, from, state) do
|
||||
Logger.debug("checkin #{compose_uri(uri)}")
|
||||
key = compose_key(uri)
|
||||
key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
|
||||
Logger.debug("checkin #{key}")
|
||||
|
||||
case state.conns[key] do
|
||||
%{conn: conn, gun_state: gun_state} = current_conn when gun_state == :up ->
|
||||
Logger.debug("reusing conn #{compose_uri(uri)}")
|
||||
Logger.debug("reusing conn #{key}")
|
||||
|
||||
with time <- :os.system_time(:second),
|
||||
last_reference <- time - current_conn.last_reference,
|
||||
|
|
@ -154,12 +156,31 @@ defmodule Pleroma.Pool.Connections do
|
|||
@impl true
|
||||
def handle_call(:state, _from, state), do: {:reply, state, state}
|
||||
|
||||
@impl true
|
||||
def handle_call(:count, _from, state) do
|
||||
{:reply, Enum.count(state.conns), state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call(:unused_conns, _from, state) do
|
||||
unused_conns =
|
||||
state.conns
|
||||
|> Enum.filter(fn {_k, v} ->
|
||||
v.conn_state == :idle and v.used_by == []
|
||||
end)
|
||||
|> Enum.sort(fn {_x_k, x}, {_y_k, y} ->
|
||||
x.crf <= y.crf and x.last_reference <= y.last_reference
|
||||
end)
|
||||
|
||||
{:reply, unused_conns, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_info({:gun_up, conn_pid, _protocol}, state) do
|
||||
state =
|
||||
with true <- Process.alive?(conn_pid),
|
||||
conn_key when is_binary(conn_key) <- compose_key_gun_info(conn_pid),
|
||||
with conn_key when is_binary(conn_key) <- compose_key_gun_info(conn_pid),
|
||||
{key, conn} <- find_conn(state.conns, conn_pid, conn_key),
|
||||
{true, key} <- {Process.alive?(conn_pid), key},
|
||||
time <- :os.system_time(:second),
|
||||
last_reference <- time - conn.last_reference,
|
||||
current_crf <- crf(last_reference, 100, conn.crf) do
|
||||
|
|
@ -176,14 +197,16 @@ defmodule Pleroma.Pool.Connections do
|
|||
Logger.debug(":gun.info caused error")
|
||||
state
|
||||
|
||||
false ->
|
||||
{false, key} ->
|
||||
Logger.debug(":gun_up message for closed conn #{inspect(conn_pid)}")
|
||||
state
|
||||
|
||||
put_in(
|
||||
state.conns,
|
||||
Map.delete(state.conns, key)
|
||||
)
|
||||
|
||||
nil ->
|
||||
Logger.debug(
|
||||
":gun_up message for alive conn #{inspect(conn_pid)}, but deleted from state"
|
||||
)
|
||||
Logger.debug(":gun_up message for conn which is not found in state")
|
||||
|
||||
:ok = API.close(conn_pid)
|
||||
|
||||
|
|
@ -198,8 +221,8 @@ defmodule Pleroma.Pool.Connections do
|
|||
retries = Config.get([:connections_pool, :retry], 0)
|
||||
# we can't get info on this pid, because pid is dead
|
||||
state =
|
||||
with true <- Process.alive?(conn_pid),
|
||||
{key, conn} <- find_conn(state.conns, conn_pid) do
|
||||
with {key, conn} <- find_conn(state.conns, conn_pid),
|
||||
{true, key} <- {Process.alive?(conn_pid), key} do
|
||||
if conn.retries == retries do
|
||||
Logger.debug("closing conn if retries is eq #{inspect(conn_pid)}")
|
||||
:ok = API.close(conn.conn)
|
||||
|
|
@ -216,15 +239,17 @@ defmodule Pleroma.Pool.Connections do
|
|||
})
|
||||
end
|
||||
else
|
||||
false ->
|
||||
{false, key} ->
|
||||
# gun can send gun_down for closed conn, maybe connection is not closed yet
|
||||
Logger.debug(":gun_down message for closed conn #{inspect(conn_pid)}")
|
||||
state
|
||||
|
||||
put_in(
|
||||
state.conns,
|
||||
Map.delete(state.conns, key)
|
||||
)
|
||||
|
||||
nil ->
|
||||
Logger.debug(
|
||||
":gun_down message for alive conn #{inspect(conn_pid)}, but deleted from state"
|
||||
)
|
||||
Logger.debug(":gun_down message for conn which is not found in state")
|
||||
|
||||
:ok = API.close(conn_pid)
|
||||
|
||||
|
|
@ -234,7 +259,29 @@ defmodule Pleroma.Pool.Connections do
|
|||
{:noreply, state}
|
||||
end
|
||||
|
||||
defp compose_key(%URI{scheme: scheme, host: host, port: port}), do: "#{scheme}:#{host}:#{port}"
|
||||
@impl true
|
||||
def handle_info({:DOWN, _ref, :process, conn_pid, reason}, state) do
|
||||
Logger.debug("received DOWM message for #{inspect(conn_pid)} reason -> #{inspect(reason)}")
|
||||
|
||||
state =
|
||||
with {key, conn} <- find_conn(state.conns, conn_pid) do
|
||||
Enum.each(conn.used_by, fn {pid, _ref} ->
|
||||
Process.exit(pid, reason)
|
||||
end)
|
||||
|
||||
put_in(
|
||||
state.conns,
|
||||
Map.delete(state.conns, key)
|
||||
)
|
||||
else
|
||||
nil ->
|
||||
Logger.debug(":DOWN message for conn which is not found in state")
|
||||
|
||||
state
|
||||
end
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
defp compose_key_gun_info(pid) do
|
||||
try do
|
||||
|
|
@ -265,153 +312,11 @@ defmodule Pleroma.Pool.Connections do
|
|||
end)
|
||||
end
|
||||
|
||||
defp open_conn(key, uri, state, %{proxy: {proxy_host, proxy_port}} = opts) do
|
||||
connect_opts =
|
||||
uri
|
||||
|> destination_opts()
|
||||
|> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
|
||||
|
||||
with open_opts <- Map.delete(opts, :tls_opts),
|
||||
{:ok, conn} <- API.open(proxy_host, proxy_port, open_opts),
|
||||
{:ok, _} <- API.await_up(conn),
|
||||
stream <- API.connect(conn, connect_opts),
|
||||
{:response, :fin, 200, _} <- API.await(conn, stream),
|
||||
state <-
|
||||
put_in(state.conns[key], %Conn{
|
||||
conn: conn,
|
||||
gun_state: :up,
|
||||
conn_state: :active,
|
||||
last_reference: :os.system_time(:second)
|
||||
}) do
|
||||
{:noreply, state}
|
||||
else
|
||||
error ->
|
||||
Logger.warn(
|
||||
"Received error on opening connection with http proxy #{uri.scheme}://#{
|
||||
compose_uri(uri)
|
||||
}: #{inspect(error)}"
|
||||
)
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
end
|
||||
|
||||
defp open_conn(key, uri, state, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do
|
||||
version =
|
||||
proxy_type
|
||||
|> to_string()
|
||||
|> String.last()
|
||||
|> case do
|
||||
"4" -> 4
|
||||
_ -> 5
|
||||
end
|
||||
|
||||
socks_opts =
|
||||
uri
|
||||
|> destination_opts()
|
||||
|> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
|
||||
|> Map.put(:version, version)
|
||||
|
||||
opts =
|
||||
opts
|
||||
|> Map.put(:protocols, [:socks])
|
||||
|> Map.put(:socks_opts, socks_opts)
|
||||
|
||||
with {:ok, conn} <- API.open(proxy_host, proxy_port, opts),
|
||||
{:ok, _} <- API.await_up(conn),
|
||||
state <-
|
||||
put_in(state.conns[key], %Conn{
|
||||
conn: conn,
|
||||
gun_state: :up,
|
||||
conn_state: :active,
|
||||
last_reference: :os.system_time(:second)
|
||||
}) do
|
||||
{:noreply, state}
|
||||
else
|
||||
error ->
|
||||
Logger.warn(
|
||||
"Received error on opening connection with socks proxy #{uri.scheme}://#{
|
||||
compose_uri(uri)
|
||||
}: #{inspect(error)}"
|
||||
)
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
end
|
||||
|
||||
defp open_conn(key, %URI{host: host, port: port} = uri, state, opts) do
|
||||
Logger.debug("opening conn #{compose_uri(uri)}")
|
||||
{_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host)
|
||||
|
||||
with {:ok, conn} <- API.open(host, port, opts),
|
||||
{:ok, _} <- API.await_up(conn),
|
||||
state <-
|
||||
put_in(state.conns[key], %Conn{
|
||||
conn: conn,
|
||||
gun_state: :up,
|
||||
conn_state: :active,
|
||||
last_reference: :os.system_time(:second)
|
||||
}) do
|
||||
Logger.debug("new conn opened #{compose_uri(uri)}")
|
||||
Logger.debug("replying to the call #{compose_uri(uri)}")
|
||||
{:noreply, state}
|
||||
else
|
||||
error ->
|
||||
Logger.warn(
|
||||
"Received error on opening connection #{uri.scheme}://#{compose_uri(uri)}: #{
|
||||
inspect(error)
|
||||
}"
|
||||
)
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
end
|
||||
|
||||
defp destination_opts(%URI{host: host, port: port}) do
|
||||
{_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host)
|
||||
%{host: host, port: port}
|
||||
end
|
||||
|
||||
defp add_http2_opts(opts, "https", tls_opts) do
|
||||
Map.merge(opts, %{protocols: [:http2], transport: :tls, tls_opts: tls_opts})
|
||||
end
|
||||
|
||||
defp add_http2_opts(opts, _, _), do: opts
|
||||
|
||||
@spec get_unused_conns(map()) :: [{domain(), conn()}]
|
||||
def get_unused_conns(conns) do
|
||||
conns
|
||||
|> Enum.filter(fn {_k, v} ->
|
||||
v.conn_state == :idle and v.used_by == []
|
||||
end)
|
||||
|> Enum.sort(fn {_x_k, x}, {_y_k, y} ->
|
||||
x.crf <= y.crf and x.last_reference <= y.last_reference
|
||||
end)
|
||||
end
|
||||
|
||||
defp try_to_open_conn(key, uri, state, opts) do
|
||||
Logger.debug("try to open conn #{compose_uri(uri)}")
|
||||
|
||||
with [{close_key, least_used} | _conns] <- get_unused_conns(state.conns),
|
||||
:ok <- API.close(least_used.conn),
|
||||
state <-
|
||||
put_in(
|
||||
state.conns,
|
||||
Map.delete(state.conns, close_key)
|
||||
) do
|
||||
Logger.debug(
|
||||
"least used conn found and closed #{inspect(least_used.conn)} #{compose_uri(uri)}"
|
||||
)
|
||||
|
||||
open_conn(key, uri, state, opts)
|
||||
else
|
||||
[] -> {:noreply, state}
|
||||
end
|
||||
end
|
||||
|
||||
def crf(current, steps, crf) do
|
||||
1 + :math.pow(0.5, current / steps) * crf
|
||||
end
|
||||
|
||||
def compose_uri(%URI{} = uri), do: "#{uri.host}#{uri.path}"
|
||||
def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do
|
||||
"#{scheme}://#{host}#{path}"
|
||||
end
|
||||
end
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue