Merge branch 'develop' into feature/database-compaction

rinpatch 2019-04-17 12:22:32 +03:00
commit 627e5a0a49
1271 changed files with 42114 additions and 70683 deletions


@@ -1,55 +1,85 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Federator do
use GenServer
alias Pleroma.User
alias Pleroma.Activity
alias Pleroma.Object.Containment
alias Pleroma.Web.{WebFinger, Websub}
alias Pleroma.Web.Federator.RetryQueue
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Relay
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.Federator.RetryQueue
alias Pleroma.Web.OStatus
alias Pleroma.Object.Containment
alias Pleroma.Web.Salmon
alias Pleroma.Web.WebFinger
alias Pleroma.Web.Websub
require Logger
@websub Application.get_env(:pleroma, :websub)
@ostatus Application.get_env(:pleroma, :ostatus)
@httpoison Application.get_env(:pleroma, :httpoison)
@max_jobs 20
def init(args) do
{:ok, args}
def init do
# 1 minute
Process.sleep(1000 * 60)
refresh_subscriptions()
end
def start_link do
spawn(fn ->
# 1 minute
Process.sleep(1000 * 60 * 1)
enqueue(:refresh_subscriptions, nil)
end)
# Client API
GenServer.start_link(
__MODULE__,
%{
in: {:sets.new(), []},
out: {:sets.new(), []}
},
name: __MODULE__
)
def incoming_doc(doc) do
PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
end
def handle(:refresh_subscriptions, _) do
def incoming_ap_doc(params) do
PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
end
def publish(activity, priority \\ 1) do
PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority)
end
def publish_single_ap(params) do
PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_single_ap, params])
end
def publish_single_websub(websub) do
PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_single_websub, websub])
end
def verify_websub(websub) do
PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub])
end
def request_subscription(sub) do
PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub])
end
def refresh_subscriptions do
PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions])
end
def publish_single_salmon(params) do
PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_single_salmon, params])
end
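# The functions above are thin client wrappers: each one only drops a job into the
# named PleromaJobQueue queue. A hedged sketch of the round trip, assuming the queue
# worker eventually applies this module's matching perform/1,2 clause with the
# argument list captured at enqueue time:
#
#   Federator.publish(activity)
#   # ... later, inside the :federator_outgoing queue, roughly:
#   apply(Pleroma.Web.Federator, :perform, [:publish, activity])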
# Job Worker Callbacks
def perform(:refresh_subscriptions) do
Logger.debug("Federator running refresh subscriptions")
Websub.refresh_subscriptions()
spawn(fn ->
# 6 hours
Process.sleep(1000 * 60 * 60 * 6)
enqueue(:refresh_subscriptions, nil)
refresh_subscriptions()
end)
end
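# Self-rescheduling: each run spawns a process that sleeps for 6 hours and then
# enqueues the next :refresh_subscriptions job, so after the initial one-minute
# startup delay the subscriptions are refreshed roughly four times a day.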
def handle(:request_subscription, websub) do
def perform(:request_subscription, websub) do
Logger.debug("Refreshing #{websub.topic}")
with {:ok, websub} <- Websub.request_subscription(websub) do
@@ -59,13 +89,13 @@ defmodule Pleroma.Web.Federator do
end
end
def handle(:publish, activity) do
def perform(:publish, activity) do
Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
with actor when not is_nil(actor) <- User.get_cached_by_ap_id(activity.data["actor"]) do
{:ok, actor} = WebFinger.ensure_keys_present(actor)
if ActivityPub.is_public?(activity) do
if Visibility.is_public?(activity) do
if OStatus.is_representable?(activity) do
Logger.info(fn -> "Sending #{activity.data["id"]} out via WebSub" end)
Websub.publish(Pleroma.Web.OStatus.feed_path(actor), actor, activity)
@@ -85,7 +115,7 @@ defmodule Pleroma.Web.Federator do
end
end
def handle(:verify_websub, websub) do
def perform(:verify_websub, websub) do
Logger.debug(fn ->
"Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
end)
@@ -93,12 +123,12 @@ defmodule Pleroma.Web.Federator do
@websub.verify(websub)
end
def handle(:incoming_doc, doc) do
def perform(:incoming_doc, doc) do
Logger.info("Got document, trying to parse")
@ostatus.handle_incoming(doc)
end
def handle(:incoming_ap_doc, params) do
def perform(:incoming_ap_doc, params) do
Logger.info("Handling incoming AP activity")
params = Utils.normalize_params(params)
@@ -123,7 +153,11 @@ defmodule Pleroma.Web.Federator do
end
end
def handle(:publish_single_ap, params) do
def perform(:publish_single_salmon, params) do
Salmon.send_to_user(params)
end
def perform(:publish_single_ap, params) do
case ActivityPub.publish_one(params) do
{:ok, _} ->
:ok
@@ -133,9 +167,9 @@ defmodule Pleroma.Web.Federator do
end
end
def handle(
def perform(
:publish_single_websub,
%{xml: xml, topic: topic, callback: callback, secret: secret} = params
%{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params
) do
case Websub.publish_one(params) do
{:ok, _} ->
@@ -146,75 +180,11 @@ defmodule Pleroma.Web.Federator do
end
end
def handle(type, _) do
def perform(type, _) do
Logger.debug(fn -> "Unknown task: #{type}" end)
{:error, "Don't know what to do with this"}
end
if Mix.env() == :test do
def enqueue(type, payload, priority \\ 1) do
if Pleroma.Config.get([:instance, :federating]) do
handle(type, payload)
end
end
else
def enqueue(type, payload, priority \\ 1) do
if Pleroma.Config.get([:instance, :federating]) do
GenServer.cast(__MODULE__, {:enqueue, type, payload, priority})
end
end
end
def maybe_start_job(running_jobs, queue) do
if :sets.size(running_jobs) < @max_jobs && queue != [] do
{{type, payload}, queue} = queue_pop(queue)
{:ok, pid} = Task.start(fn -> handle(type, payload) end)
mref = Process.monitor(pid)
{:sets.add_element(mref, running_jobs), queue}
else
{running_jobs, queue}
end
end
def handle_cast({:enqueue, type, payload, _priority}, state)
when type in [:incoming_doc, :incoming_ap_doc] do
%{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
i_queue = enqueue_sorted(i_queue, {type, payload}, 1)
{i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue)
{:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
end
def handle_cast({:enqueue, type, payload, _priority}, state) do
%{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
o_queue = enqueue_sorted(o_queue, {type, payload}, 1)
{o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue)
{:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
end
def handle_cast(m, state) do
IO.inspect("Unknown: #{inspect(m)}, #{inspect(state)}")
{:noreply, state}
end
def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
%{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
i_running_jobs = :sets.del_element(ref, i_running_jobs)
o_running_jobs = :sets.del_element(ref, o_running_jobs)
{i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue)
{o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue)
{:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
end
def enqueue_sorted(queue, element, priority) do
[%{item: element, priority: priority} | queue]
|> Enum.sort_by(fn %{priority: priority} -> priority end)
end
def queue_pop([%{item: element} | queue]) do
{element, queue}
end
def ap_enabled_actor(id) do
user = User.get_by_ap_id(id)


@@ -1,47 +1,171 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Federator.RetryQueue do
use GenServer
alias Pleroma.Web.{WebFinger, Websub}
alias Pleroma.Web.ActivityPub.ActivityPub
require Logger
@websub Application.get_env(:pleroma, :websub)
@ostatus Application.get_env(:pleroma, :ostatus)
@httpoison Application.get_env(:pleroma, :httpoison)
@instance Application.get_env(:pleroma, :instance)
# initial timeout, 30 seconds
@initial_timeout 30_000
@max_retries 5
def init(args) do
{:ok, args}
queue_table = :ets.new(:pleroma_retry_queue, [:bag, :protected])
{:ok, %{args | queue_table: queue_table, running_jobs: :sets.new()}}
end
def start_link() do
GenServer.start_link(__MODULE__, %{delivered: 0, dropped: 0}, name: __MODULE__)
def start_link do
enabled =
if Mix.env() == :test, do: true, else: Pleroma.Config.get([__MODULE__, :enabled], false)
if enabled do
Logger.info("Starting retry queue")
linkres =
GenServer.start_link(
__MODULE__,
%{delivered: 0, dropped: 0, queue_table: nil, running_jobs: nil},
name: __MODULE__
)
maybe_kickoff_timer()
linkres
else
Logger.info("Retry queue disabled")
:ignore
end
end
def enqueue(data, transport, retries \\ 0) do
GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
end
def get_stats do
GenServer.call(__MODULE__, :get_stats)
end
def reset_stats do
GenServer.call(__MODULE__, :reset_stats)
end
def get_retry_params(retries) do
if retries > @max_retries do
if retries > Pleroma.Config.get([__MODULE__, :max_retries]) do
{:drop, "Max retries reached"}
else
{:retry, growth_function(retries)}
end
end
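# Worked example (hedged; assumes max_retries is configured to 5, matching the old
# @max_retries attribute above):
#
#   get_retry_params(2) #=> {:retry, growth_function(2)}, a future unix timestamp
#   get_retry_params(6) #=> {:drop, "Max retries reached"}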
def handle_cast({:maybe_enqueue, data, transport, retries}, %{dropped: drop_count} = state) do
def get_retry_timer_interval do
Pleroma.Config.get([:retry_queue, :interval], 1000)
end
defp ets_count_expires(table, current_time) do
:ets.select_count(
table,
[
{
{:"$1", :"$2"},
[{:"=<", :"$1", {:const, current_time}}],
[true]
}
]
)
end
defp ets_pop_n_expired(table, current_time, desired) do
{popped, _continuation} =
:ets.select(
table,
[
{
{:"$1", :"$2"},
[{:"=<", :"$1", {:const, current_time}}],
[:"$_"]
}
],
desired
)
popped
|> Enum.each(fn e ->
:ets.delete_object(table, e)
end)
popped
end
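# A minimal sketch of the two ETS helpers above, assuming entries are
# {eligible_at_unix, job} tuples as inserted by handle_cast({:maybe_enqueue, ...}):
#
#   table = :ets.new(:example_retry_queue, [:bag, :protected])
#   :ets.insert(table, {90, {:send, :job_a, Websub, 1}})
#   :ets.insert(table, {110, {:send, :job_b, Websub, 1}})
#   ets_count_expires(table, 100)      #=> 1, only the entry with key <= 100 matches
#   ets_pop_n_expired(table, 100, 10)  #=> [{90, {:send, :job_a, Websub, 1}}], then deleted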
def maybe_start_job(running_jobs, queue_table) do
# we don't want to hit the ets or the DateTime more times than we have to
# could optimize slightly further by not using the count, and instead grabbing
# up to N objects early...
current_time = DateTime.to_unix(DateTime.utc_now())
n_running_jobs = :sets.size(running_jobs)
if n_running_jobs < Pleroma.Config.get([__MODULE__, :max_jobs]) do
n_ready_jobs = ets_count_expires(queue_table, current_time)
if n_ready_jobs > 0 do
# figure out how many we could start
available_job_slots = Pleroma.Config.get([__MODULE__, :max_jobs]) - n_running_jobs
start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
else
running_jobs
end
else
running_jobs
end
end
defp start_n_jobs(running_jobs, _queue_table, _current_time, 0) do
running_jobs
end
defp start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
when available_job_slots > 0 do
candidates = ets_pop_n_expired(queue_table, current_time, available_job_slots)
candidates
|> List.foldl(running_jobs, fn {_, e}, rj ->
{:ok, pid} = Task.start(fn -> worker(e) end)
mref = Process.monitor(pid)
:sets.add_element(mref, rj)
end)
end
def worker({:send, data, transport, retries}) do
case transport.publish_one(data) do
{:ok, _} ->
GenServer.cast(__MODULE__, :inc_delivered)
:delivered
{:error, _reason} ->
enqueue(data, transport, retries)
:retry
end
end
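# Any module exposing publish_one/1 and returning {:ok, _} | {:error, _} fits the
# transport contract used here; Websub.publish_one/1 and ActivityPub.publish_one/1
# are both used this way elsewhere in this diff. A hypothetical transport for
# illustration only (the module name and params shape below are made up):
#
#   defmodule MyApp.NullTransport do
#     def publish_one(%{inbox: _inbox} = _params), do: {:error, :unreachable}
#   end
#
#   RetryQueue.enqueue(%{inbox: "https://example.com/inbox"}, MyApp.NullTransport)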
def handle_call(:get_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
{:reply, %{delivered: delivery_count, dropped: drop_count}, state}
end
def handle_call(:reset_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
{:reply, %{delivered: delivery_count, dropped: drop_count},
%{state | delivered: 0, dropped: 0}}
end
def handle_cast(:reset_stats, state) do
{:noreply, %{state | delivered: 0, dropped: 0}}
end
def handle_cast(
{:maybe_enqueue, data, transport, retries},
%{dropped: drop_count, queue_table: queue_table, running_jobs: running_jobs} = state
) do
case get_retry_params(retries) do
{:retry, timeout} ->
Process.send_after(
__MODULE__,
{:send, data, transport, retries},
growth_function(retries)
)
{:noreply, state}
:ets.insert(queue_table, {timeout, {:send, data, transport, retries}})
running_jobs = maybe_start_job(running_jobs, queue_table)
{:noreply, %{state | running_jobs: running_jobs}}
{:drop, message} ->
Logger.debug(message)
@@ -49,23 +173,65 @@ defmodule Pleroma.Web.Federator.RetryQueue do
end
end
def handle_cast(:kickoff_timer, state) do
retry_interval = get_retry_timer_interval()
Process.send_after(__MODULE__, :retry_timer_run, retry_interval)
{:noreply, state}
end
def handle_cast(:inc_delivered, %{delivered: delivery_count} = state) do
{:noreply, %{state | delivered: delivery_count + 1}}
end
def handle_cast(:inc_dropped, %{dropped: drop_count} = state) do
{:noreply, %{state | dropped: drop_count + 1}}
end
def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
case transport.publish_one(data) do
{:ok, _} ->
{:noreply, %{state | delivered: delivery_count + 1}}
{:error, reason} ->
{:error, _reason} ->
enqueue(data, transport, retries)
{:noreply, state}
end
end
def handle_info(
:retry_timer_run,
%{queue_table: queue_table, running_jobs: running_jobs} = state
) do
maybe_kickoff_timer()
running_jobs = maybe_start_job(running_jobs, queue_table)
{:noreply, %{state | running_jobs: running_jobs}}
end
def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
%{running_jobs: running_jobs, queue_table: queue_table} = state
running_jobs = :sets.del_element(ref, running_jobs)
running_jobs = maybe_start_job(running_jobs, queue_table)
{:noreply, %{state | running_jobs: running_jobs}}
end
def handle_info(unknown, state) do
Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
{:noreply, state}
end
defp growth_function(retries) do
round(@initial_timeout * :math.pow(retries, 3))
if Mix.env() == :test do
defp growth_function(_retries) do
_shutit = Pleroma.Config.get([__MODULE__, :initial_timeout])
DateTime.to_unix(DateTime.utc_now()) - 1
end
else
defp growth_function(retries) do
round(Pleroma.Config.get([__MODULE__, :initial_timeout]) * :math.pow(retries, 3)) +
DateTime.to_unix(DateTime.utc_now())
end
end
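# Cubic backoff, worked through (hedged; assumes initial_timeout is configured to
# 30_000, matching the old @initial_timeout attribute):
#
#   retries = 1 -> now + 30_000 * 1  = now + 30_000
#   retries = 2 -> now + 30_000 * 8  = now + 240_000
#   retries = 3 -> now + 30_000 * 27 = now + 810_000
#
# In :test the function instead returns a timestamp one second in the past, so a
# re-enqueued job becomes eligible to run immediately.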
defp maybe_kickoff_timer do
GenServer.cast(__MODULE__, :kickoff_timer)
end
end