Merge branch 'improved-reachability' into 'develop'

Reachability refactor

See merge request pleroma/pleroma!4366
This commit is contained in:
feld 2025-07-29 21:13:48 +00:00
commit ece089abab
28 changed files with 733 additions and 319 deletions

View file

@ -15,25 +15,7 @@ defmodule Pleroma.Instances do
defdelegate set_unreachable(url_or_host, unreachable_since \\ nil), to: Instance
defdelegate get_consistently_unreachable, to: Instance
def set_consistently_unreachable(url_or_host),
do: set_unreachable(url_or_host, reachability_datetime_threshold())
def reachability_datetime_threshold do
federation_reachability_timeout_days =
Pleroma.Config.get([:instance, :federation_reachability_timeout_days], 0)
if federation_reachability_timeout_days > 0 do
NaiveDateTime.add(
NaiveDateTime.utc_now(),
-federation_reachability_timeout_days * 24 * 3600,
:second
)
else
~N[0000-01-01 00:00:00]
end
end
defdelegate get_unreachable, to: Instance
def host(url_or_host) when is_binary(url_or_host) do
if url_or_host =~ ~r/^http/i do
@ -42,4 +24,21 @@ defmodule Pleroma.Instances do
url_or_host
end
end
@doc "Schedules reachability checks for all unreachable instances"
def check_all_unreachable do
get_unreachable()
|> Enum.each(fn {domain, _} ->
Pleroma.Workers.ReachabilityWorker.new(%{"domain" => domain})
|> Oban.insert()
end)
end
@doc "Deletes all users and activities for unreachable instances"
def delete_all_unreachable do
get_unreachable()
|> Enum.each(fn {domain, _} ->
Instance.delete(domain)
end)
end
end

View file

@ -50,7 +50,7 @@ defmodule Pleroma.Instances.Instance do
|> cast(params, [:software_name, :software_version, :software_repository])
end
def filter_reachable([]), do: %{}
def filter_reachable([]), do: []
def filter_reachable(urls_or_hosts) when is_list(urls_or_hosts) do
hosts =
@ -67,19 +67,15 @@ defmodule Pleroma.Instances.Instance do
)
|> Map.new(& &1)
reachability_datetime_threshold = Instances.reachability_datetime_threshold()
for entry <- Enum.filter(urls_or_hosts, &is_binary/1) do
host = host(entry)
unreachable_since = unreachable_since_by_host[host]
if !unreachable_since ||
NaiveDateTime.compare(unreachable_since, reachability_datetime_threshold) == :gt do
{entry, unreachable_since}
if is_nil(unreachable_since) do
entry
end
end
|> Enum.filter(& &1)
|> Map.new(& &1)
end
def reachable?(url_or_host) when is_binary(url_or_host) do
@ -87,7 +83,7 @@ defmodule Pleroma.Instances.Instance do
from(i in Instance,
where:
i.host == ^host(url_or_host) and
i.unreachable_since <= ^Instances.reachability_datetime_threshold(),
not is_nil(i.unreachable_since),
select: true
)
)
@ -96,9 +92,16 @@ defmodule Pleroma.Instances.Instance do
def reachable?(url_or_host) when is_binary(url_or_host), do: true
def set_reachable(url_or_host) when is_binary(url_or_host) do
%Instance{host: host(url_or_host)}
|> changeset(%{unreachable_since: nil})
|> Repo.insert(on_conflict: {:replace, [:unreachable_since]}, conflict_target: :host)
host = host(url_or_host)
result =
%Instance{host: host}
|> changeset(%{unreachable_since: nil})
|> Repo.insert(on_conflict: {:replace, [:unreachable_since]}, conflict_target: :host)
Pleroma.Workers.ReachabilityWorker.delete_jobs_for_host(host)
result
end
def set_reachable(_), do: {:error, nil}
@ -131,11 +134,9 @@ defmodule Pleroma.Instances.Instance do
def set_unreachable(_, _), do: {:error, nil}
def get_consistently_unreachable do
reachability_datetime_threshold = Instances.reachability_datetime_threshold()
def get_unreachable do
from(i in Instance,
where: ^reachability_datetime_threshold > i.unreachable_since,
where: not is_nil(i.unreachable_since),
order_by: i.unreachable_since,
select: {i.host, i.unreachable_since}
)
@ -295,8 +296,14 @@ defmodule Pleroma.Instances.Instance do
Deletes all users from an instance in a background task, thus also deleting
all of those users' activities and notifications.
"""
def delete_users_and_activities(host) when is_binary(host) do
def delete(host) when is_binary(host) do
DeleteWorker.new(%{"op" => "delete_instance", "host" => host})
|> Oban.insert()
end
@doc "Schedules reachability check for instance"
def check_unreachable(domain) when is_binary(domain) do
Pleroma.Workers.ReachabilityWorker.new(%{"domain" => domain})
|> Oban.insert()
end
end

View file

@ -4,7 +4,6 @@
defmodule Pleroma.Object.Fetcher do
alias Pleroma.HTTP
alias Pleroma.Instances
alias Pleroma.Maps
alias Pleroma.Object
alias Pleroma.Object.Containment
@ -150,10 +149,6 @@ defmodule Pleroma.Object.Fetcher do
{:ok, body} <- get_object(id),
{:ok, data} <- safe_json_decode(body),
:ok <- Containment.contain_origin_from_id(id, data) do
if not Instances.reachable?(id) do
Instances.set_reachable(id)
end
{:ok, data}
else
{:scheme, _} ->

View file

@ -53,7 +53,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
)
plug(:log_inbox_metadata when action in [:inbox])
plug(:set_requester_reachable when action in [:inbox])
plug(:relay_active? when action in [:relay])
defp relay_active?(conn, _) do
@ -520,15 +519,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
|> json(dgettext("errors", "error"))
end
defp set_requester_reachable(%Plug.Conn{} = conn, _) do
with actor <- conn.params["actor"],
true <- is_binary(actor) do
Pleroma.Instances.set_reachable(actor)
end
conn
end
defp log_inbox_metadata(%{params: %{"actor" => actor, "type" => type}} = conn, _) do
Logger.metadata(actor: actor, type: type)
conn

View file

@ -161,17 +161,9 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
{"digest", p.digest}
]
) do
if not is_nil(p.unreachable_since) do
Instances.set_reachable(p.inbox)
end
result
else
{_post_result, %{status: code} = response} = e ->
if is_nil(p.unreachable_since) do
Instances.set_unreachable(p.inbox)
end
Logger.metadata(activity: p.activity_id, inbox: p.inbox, status: code)
Logger.error("Publisher failed to inbox #{p.inbox} with status #{code}")
@ -192,10 +184,6 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
connection_pool_snooze()
e ->
if is_nil(p.unreachable_since) do
Instances.set_unreachable(p.inbox)
end
Logger.metadata(activity: p.activity_id, inbox: p.inbox)
Logger.error("Publisher failed to inbox #{p.inbox} #{inspect(e)}")
{:error, e}
@ -307,7 +295,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
[priority_recipients, recipients] = recipients(actor, activity)
inboxes =
[priority_inboxes, other_inboxes] =
[priority_recipients, recipients]
|> Enum.map(fn recipients ->
recipients
@ -320,8 +308,8 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
end)
Repo.checkout(fn ->
Enum.each(inboxes, fn inboxes ->
Enum.each(inboxes, fn {inbox, unreachable_since} ->
Enum.each([priority_inboxes, other_inboxes], fn inboxes ->
Enum.each(inboxes, fn inbox ->
%User{ap_id: ap_id} = Enum.find(recipients, fn actor -> actor.inbox == inbox end)
# Get all the recipients on the same host and add them to cc. Otherwise, a remote
@ -331,8 +319,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
__MODULE__.enqueue_one(%{
inbox: inbox,
cc: cc,
activity_id: activity.id,
unreachable_since: unreachable_since
activity_id: activity.id
})
end)
end)
@ -365,12 +352,11 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
|> Enum.each(fn {inboxes, priority} ->
inboxes
|> Instances.filter_reachable()
|> Enum.each(fn {inbox, unreachable_since} ->
|> Enum.each(fn inbox ->
__MODULE__.enqueue_one(
%{
inbox: inbox,
activity_id: activity.id,
unreachable_since: unreachable_since
activity_id: activity.id
},
priority: priority
)

View file

@ -49,7 +49,7 @@ defmodule Pleroma.Web.AdminAPI.InstanceController do
end
def delete(conn, %{"instance" => instance}) do
with {:ok, _job} <- Instance.delete_users_and_activities(instance) do
with {:ok, _job} <- Instance.delete(instance) do
json(conn, instance)
end
end

View file

@ -13,7 +13,7 @@ defmodule Pleroma.Web.PleromaAPI.InstancesController do
def show(conn, _params) do
unreachable =
Instances.get_consistently_unreachable()
Instances.get_unreachable()
|> Map.new(fn {host, date} -> {host, to_string(date)} end)
json(conn, %{"unreachable" => unreachable})

View file

@ -14,6 +14,7 @@ defmodule Pleroma.Workers.DeleteWorker do
end
def perform(%Job{args: %{"op" => "delete_instance", "host" => host}}) do
# Schedule the per-user deletion jobs
Pleroma.Repo.transaction(fn ->
User.Query.build(%{nickname: "@#{host}"})
|> Pleroma.Repo.all()
@ -22,6 +23,17 @@ defmodule Pleroma.Workers.DeleteWorker do
|> __MODULE__.new()
|> Oban.insert()
end)
# Delete the instance from the Instances table
case Pleroma.Repo.get_by(Pleroma.Instances.Instance, host: host) do
nil -> :ok
instance -> Pleroma.Repo.delete(instance)
end
# Delete any pending ReachabilityWorker jobs for this domain
Pleroma.Workers.ReachabilityWorker.delete_jobs_for_host(host)
:ok
end)
end

View file

@ -4,9 +4,10 @@
defmodule Pleroma.Workers.PublisherWorker do
alias Pleroma.Activity
alias Pleroma.Instances
alias Pleroma.Web.Federator
use Oban.Worker, queue: :federator_outgoing, max_attempts: 5
use Oban.Worker, queue: :federator_outgoing, max_attempts: 13
@impl true
def perform(%Job{args: %{"op" => "publish", "activity_id" => activity_id}}) do
@ -14,9 +15,30 @@ defmodule Pleroma.Workers.PublisherWorker do
Federator.perform(:publish, activity)
end
def perform(%Job{args: %{"op" => "publish_one", "params" => params}}) do
def perform(%Job{args: %{"op" => "publish_one", "params" => params}} = job) do
params = Map.new(params, fn {k, v} -> {String.to_atom(k), v} end)
Federator.perform(:publish_one, params)
# Cancel / skip the job if this server believed to be unreachable now
if not Instances.reachable?(params.inbox) do
{:cancel, :unreachable}
else
case Federator.perform(:publish_one, params) do
{:ok, _} ->
:ok
{:error, _} = error ->
# Only mark as unreachable on final failure
if job.attempt == job.max_attempts do
Instances.set_unreachable(params.inbox)
end
error
error ->
# Unexpected error, may have been client side
error
end
end
end
@impl true

View file

@ -0,0 +1,116 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.ReachabilityWorker do
use Oban.Worker,
queue: :background,
max_attempts: 1,
unique: [period: :infinity, states: [:available, :scheduled], keys: [:domain]]
alias Pleroma.HTTP
alias Pleroma.Instances
import Ecto.Query
@impl true
def perform(%Oban.Job{args: %{"domain" => domain, "phase" => phase, "attempt" => attempt}}) do
case check_reachability(domain) do
:ok ->
Instances.set_reachable("https://#{domain}")
:ok
{:error, _} = error ->
handle_failed_attempt(domain, phase, attempt)
error
end
end
# New jobs enter here and are immediately re-scheduled for the first phase
@impl true
def perform(%Oban.Job{args: %{"domain" => domain}}) do
scheduled_at = DateTime.add(DateTime.utc_now(), 60, :second)
%{
"domain" => domain,
"phase" => "phase_1min",
"attempt" => 1
}
|> new(scheduled_at: scheduled_at, replace: true)
|> Oban.insert()
:ok
end
@impl true
def timeout(_job), do: :timer.seconds(5)
@doc "Deletes scheduled jobs to check reachability for specified instance"
def delete_jobs_for_host(host) do
Oban.Job
|> where(worker: "Pleroma.Workers.ReachabilityWorker")
|> where([j], j.args["domain"] == ^host)
|> Oban.delete_all_jobs()
end
defp check_reachability(domain) do
case HTTP.get("https://#{domain}/") do
{:ok, %{status: status}} when status in 200..299 ->
:ok
{:ok, %{status: _status}} ->
{:error, :unreachable}
{:error, _} = error ->
error
end
end
defp handle_failed_attempt(_domain, "final", _attempt), do: :ok
defp handle_failed_attempt(domain, phase, attempt) do
{interval_minutes, max_attempts, next_phase} = get_phase_config(phase)
if attempt >= max_attempts do
# Move to next phase
schedule_next_phase(domain, next_phase)
else
# Retry same phase with incremented attempt
schedule_retry(domain, phase, attempt + 1, interval_minutes)
end
end
defp get_phase_config("phase_1min"), do: {1, 4, "phase_15min"}
defp get_phase_config("phase_15min"), do: {15, 4, "phase_1hour"}
defp get_phase_config("phase_1hour"), do: {60, 4, "phase_8hour"}
defp get_phase_config("phase_8hour"), do: {480, 4, "phase_24hour"}
defp get_phase_config("phase_24hour"), do: {1440, 4, "final"}
defp get_phase_config("final"), do: {nil, 0, nil}
defp schedule_next_phase(_domain, "final"), do: :ok
defp schedule_next_phase(domain, next_phase) do
{interval_minutes, _max_attempts, _next_phase} = get_phase_config(next_phase)
scheduled_at = DateTime.add(DateTime.utc_now(), interval_minutes * 60, :second)
%{
"domain" => domain,
"phase" => next_phase,
"attempt" => 1
}
|> new(scheduled_at: scheduled_at, replace: true)
|> Oban.insert()
end
def schedule_retry(domain, phase, attempt, interval_minutes) do
scheduled_at = DateTime.add(DateTime.utc_now(), interval_minutes * 60, :second)
%{
"domain" => domain,
"phase" => phase,
"attempt" => attempt
}
|> new(scheduled_at: scheduled_at, replace: true)
|> Oban.insert()
end
end

View file

@ -3,6 +3,7 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.ReceiverWorker do
alias Pleroma.Instances
alias Pleroma.Signature
alias Pleroma.User
alias Pleroma.Web.Federator
@ -37,6 +38,11 @@ defmodule Pleroma.Workers.ReceiverWorker do
{:ok, _public_key} <- Signature.refetch_public_key(conn_data),
{:signature, true} <- {:signature, Signature.validate_signature(conn_data)},
{:ok, res} <- Federator.perform(:incoming_ap_doc, params) do
unless Instances.reachable?(params["actor"]) do
domain = URI.parse(params["actor"]).host
Oban.insert(Pleroma.Workers.ReachabilityWorker.new(%{"domain" => domain}))
end
{:ok, res}
else
e -> process_errors(e)
@ -45,6 +51,11 @@ defmodule Pleroma.Workers.ReceiverWorker do
def perform(%Job{args: %{"op" => "incoming_ap_doc", "params" => params}}) do
with {:ok, res} <- Federator.perform(:incoming_ap_doc, params) do
unless Instances.reachable?(params["actor"]) do
domain = URI.parse(params["actor"]).host
Oban.insert(Pleroma.Workers.ReachabilityWorker.new(%{"domain" => domain}))
end
{:ok, res}
else
e -> process_errors(e)

View file

@ -3,6 +3,7 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.RemoteFetcherWorker do
alias Pleroma.Instances
alias Pleroma.Object.Fetcher
use Oban.Worker, queue: :background, unique: [period: :infinity]
@ -11,6 +12,11 @@ defmodule Pleroma.Workers.RemoteFetcherWorker do
def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do
case Fetcher.fetch_object_from_id(id, depth: args["depth"]) do
{:ok, _object} ->
unless Instances.reachable?(id) do
# Mark the server as reachable since we successfully fetched an object
Instances.set_reachable(id)
end
:ok
{:allowed_depth, false} ->