Reachability refactor

The result of Oban jobs determine the reachability status.

Publisher jobs will cancel themselves at execution time if the target server is now unreachable.

Receiving activities does not immediately mark a server as reachable, but creates a ReachabilityWorker job to validate.

A Cron will execute daily to test all unreachable servers.
This commit is contained in:
Mark Felder 2025-06-05 16:38:40 -07:00
commit 3d422ef325
24 changed files with 341 additions and 312 deletions

View file

@ -0,0 +1,33 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.Cron.ScheduleReachabilityWorker do
use Oban.Worker,
queue: :background,
max_attempts: 2
alias Pleroma.Instances
alias Pleroma.Repo
@impl true
def perform(_job) do
unreachable_servers = Instances.get_unreachable()
jobs =
unreachable_servers
|> Enum.map(fn {domain, _} ->
Pleroma.Workers.ReachabilityWorker.new(%{"domain" => domain})
end)
case Repo.transaction(fn ->
Enum.each(jobs, &Oban.insert/1)
end) do
{:ok, _} ->
:ok
{:error, reason} ->
{:error, reason}
end
end
end

View file

@ -4,6 +4,7 @@
defmodule Pleroma.Workers.PublisherWorker do
alias Pleroma.Activity
alias Pleroma.Instances
alias Pleroma.Web.Federator
use Oban.Worker, queue: :federator_outgoing, max_attempts: 5
@ -14,9 +15,30 @@ defmodule Pleroma.Workers.PublisherWorker do
Federator.perform(:publish, activity)
end
def perform(%Job{args: %{"op" => "publish_one", "params" => params}}) do
def perform(%Job{args: %{"op" => "publish_one", "params" => params}} = job) do
params = Map.new(params, fn {k, v} -> {String.to_atom(k), v} end)
Federator.perform(:publish_one, params)
# Cancel / skip the job if this server believed to be unreachable now
if not Instances.reachable?(params.inbox) do
{:cancel, :unreachable}
else
case Federator.perform(:publish_one, params) do
{:ok, _} ->
:ok
{:error, _} = error ->
# Only mark as unreachable on final failure
if job.attempt == job.max_attempts do
Instances.set_unreachable(params.inbox)
end
error
error ->
# Unexpected error, may have been client side
error
end
end
end
@impl true

View file

@ -0,0 +1,31 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.ReachabilityWorker do
use Oban.Worker,
queue: :background,
max_attempts: 3,
unique: [period: :infinity, states: [:available, :scheduled]]
alias Pleroma.HTTP
alias Pleroma.Instances
@impl true
def perform(%Oban.Job{args: %{"domain" => domain}}) do
case HTTP.get("https://#{domain}/.well-known/nodeinfo") do
{:ok, %{status: status}} when status in 200..299 ->
Instances.set_reachable("https://#{domain}")
:ok
{:ok, %{status: _status}} ->
{:error, :unreachable}
{:error, _} = error ->
error
end
end
@impl true
def timeout(_job), do: :timer.seconds(5)
end

View file

@ -3,6 +3,7 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.ReceiverWorker do
alias Pleroma.Instances
alias Pleroma.Signature
alias Pleroma.User
alias Pleroma.Web.Federator
@ -37,6 +38,11 @@ defmodule Pleroma.Workers.ReceiverWorker do
{:ok, _public_key} <- Signature.refetch_public_key(conn_data),
{:signature, true} <- {:signature, Signature.validate_signature(conn_data)},
{:ok, res} <- Federator.perform(:incoming_ap_doc, params) do
unless Instances.reachable?(params["actor"]) do
domain = URI.parse(params["actor"]).host
Oban.insert(Pleroma.Workers.ReachabilityWorker.new(%{"domain" => domain}))
end
{:ok, res}
else
e -> process_errors(e)
@ -45,6 +51,11 @@ defmodule Pleroma.Workers.ReceiverWorker do
def perform(%Job{args: %{"op" => "incoming_ap_doc", "params" => params}}) do
with {:ok, res} <- Federator.perform(:incoming_ap_doc, params) do
unless Instances.reachable?(params["actor"]) do
domain = URI.parse(params["actor"]).host
Oban.insert(Pleroma.Workers.ReachabilityWorker.new(%{"domain" => domain}))
end
{:ok, res}
else
e -> process_errors(e)

View file

@ -3,6 +3,7 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.RemoteFetcherWorker do
alias Pleroma.Instances
alias Pleroma.Object.Fetcher
use Oban.Worker, queue: :background, unique: [period: :infinity]
@ -11,6 +12,15 @@ defmodule Pleroma.Workers.RemoteFetcherWorker do
def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do
case Fetcher.fetch_object_from_id(id, depth: args["depth"]) do
{:ok, _object} ->
# Mark the server as reachable since we successfully fetched an object
case URI.parse(id) do
%URI{host: host} when not is_nil(host) ->
Instances.set_reachable("https://#{host}")
_ ->
:ok
end
:ok
{:allowed_depth, false} ->