Refactor ReachabilityWorker to use a 5-phase reachability testing approach

It will check reachability for an instance deemed unreachable at the following intervals:

4 attempts, once a minute
4 attempts, once every 15 minutes
4 attempts, once every 60 minutes
4 attempts, once every 8 hours
4 attempts, once every 24 hours

This should be effective and respectful of the resources of instances on the fediverse.

We have the Oban Pruner plugin enabled to keep the Oban Jobs table from growing indefinitely. It prunes every 15 minutes, but this will interfere with our ability to enforce uniqueness on the ReachabilityWorker jobs for a time period longer than 15 minutes. The solution is to exclude the ReachabilityWorker from the pruning operation and instead schedule a custom job that will prune the table for us once a day. The ReachabilityPruner cron task will clean up the history of the ReachabilityWorker jobs older than 6 days.
This commit is contained in:
Mark Felder 2025-06-27 16:35:10 -07:00
commit 77dca7c3e5
5 changed files with 296 additions and 60 deletions

View file

@ -599,11 +599,12 @@ config :pleroma, Oban,
search_indexing: [limit: 10, paused: true],
slow: 5
],
plugins: [{Oban.Plugins.Pruner, max_age: 900}],
plugins: [{Oban.Plugins.Pruner, max_age: 900, exclude: [Pleroma.Workers.ReachabilityWorker]}],
crontab: [
{"0 0 * * 0", Pleroma.Workers.Cron.DigestEmailsWorker},
{"0 0 * * *", Pleroma.Workers.Cron.NewUsersDigestWorker},
{"*/10 * * * *", Pleroma.Workers.Cron.AppCleanupWorker}
{"*/10 * * * *", Pleroma.Workers.Cron.AppCleanupWorker},
{"0 2 * * *", Pleroma.Workers.Cron.ReachabilityPruner}
]
config :pleroma, Pleroma.Formatter,

View file

@ -0,0 +1,26 @@
defmodule Pleroma.Workers.Cron.ReachabilityPruner do
use Oban.Worker, queue: :background, max_attempts: 1
import Ecto.Query
require Logger
@reachability_worker "Elixir.Pleroma.Workers.ReachabilityWorker"
@prune_days 6
@impl true
def perform(_job) do
cutoff = DateTime.utc_now() |> DateTime.add(-@prune_days * 24 * 60 * 60, :second)
{count, _} =
from(j in Oban.Job,
where: j.worker == @reachability_worker and j.inserted_at < ^cutoff
)
|> Pleroma.Repo.delete_all()
if count > 0 do
Logger.debug(fn -> "Pruned #{count} old ReachabilityWorker jobs." end)
end
:ok
end
end

View file

@ -5,17 +5,31 @@
defmodule Pleroma.Workers.ReachabilityWorker do
use Oban.Worker,
queue: :background,
max_attempts: 3,
unique: [period: :infinity, states: [:available, :scheduled]]
max_attempts: 1,
unique: [period: :infinity, states: [:available, :scheduled], keys: [:domain]]
alias Pleroma.HTTP
alias Pleroma.Instances
@impl true
def perform(%Oban.Job{args: %{"domain" => domain}}) do
def perform(%Oban.Job{args: %{"domain" => domain, "phase" => phase, "attempt" => attempt}}) do
case check_reachability(domain) do
:ok ->
Instances.set_reachable("https://#{domain}")
:ok
{:error, _} = error ->
handle_failed_attempt(domain, phase, attempt)
error
end
end
@impl true
def timeout(_job), do: :timer.seconds(5)
defp check_reachability(domain) do
case HTTP.get("https://#{domain}/") do
{:ok, %{status: status}} when status in 200..299 ->
Instances.set_reachable("https://#{domain}")
:ok
{:ok, %{status: _status}} ->
@ -26,6 +40,51 @@ defmodule Pleroma.Workers.ReachabilityWorker do
end
end
@impl true
def timeout(_job), do: :timer.seconds(5)
defp handle_failed_attempt(_domain, "final", _attempt), do: :ok
defp handle_failed_attempt(domain, phase, attempt) do
{interval_minutes, max_attempts, next_phase} = get_phase_config(phase)
if attempt >= max_attempts do
# Move to next phase
schedule_next_phase(domain, next_phase)
else
# Retry same phase with incremented attempt
schedule_retry(domain, phase, attempt + 1, interval_minutes)
end
end
defp get_phase_config("phase_1min"), do: {1, 4, "phase_15min"}
defp get_phase_config("phase_15min"), do: {15, 4, "phase_1hour"}
defp get_phase_config("phase_1hour"), do: {60, 4, "phase_8hour"}
defp get_phase_config("phase_8hour"), do: {480, 4, "phase_24hour"}
defp get_phase_config("phase_24hour"), do: {1440, 4, "final"}
defp get_phase_config("final"), do: {nil, 0, nil}
defp schedule_next_phase(_domain, "final"), do: :ok
defp schedule_next_phase(domain, next_phase) do
{interval_minutes, _max_attempts, _next_phase} = get_phase_config(next_phase)
scheduled_at = DateTime.add(DateTime.utc_now(), interval_minutes * 60, :second)
%{
"domain" => domain,
"phase" => next_phase,
"attempt" => 1
}
|> new(scheduled_at: scheduled_at, replace: true)
|> Oban.insert()
end
def schedule_retry(domain, phase, attempt, interval_minutes) do
scheduled_at = DateTime.add(DateTime.utc_now(), interval_minutes * 60, :second)
%{
"domain" => domain,
"phase" => phase,
"attempt" => attempt
}
|> new(scheduled_at: scheduled_at, replace: true)
|> Oban.insert()
end
end

View file

@ -1,52 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.Cron.ScheduleReachabilityWorkerTest do
use Pleroma.DataCase, async: true
use Oban.Testing, repo: Pleroma.Repo
alias Pleroma.Instances
alias Pleroma.Workers.Cron.ScheduleReachabilityWorker
describe "perform/1" do
test "schedules reachability checks for unreachable servers" do
# Mark some servers as unreachable
Instances.set_unreachable("https://example.com")
Instances.set_unreachable("https://test.com")
Instances.set_unreachable("https://another.com")
# Verify they are marked as unreachable
refute Instances.reachable?("https://example.com")
refute Instances.reachable?("https://test.com")
refute Instances.reachable?("https://another.com")
# Run the worker
assert :ok = ScheduleReachabilityWorker.perform(%Oban.Job{})
# Verify ReachabilityWorker jobs were scheduled for each server
# Note: domains in get_unreachable/0 are without the https:// prefix
assert_enqueued(
worker: Pleroma.Workers.ReachabilityWorker,
args: %{"domain" => "example.com"}
)
assert_enqueued(
worker: Pleroma.Workers.ReachabilityWorker,
args: %{"domain" => "test.com"}
)
assert_enqueued(
worker: Pleroma.Workers.ReachabilityWorker,
args: %{"domain" => "another.com"}
)
end
test "handles empty list of unreachable servers" do
# Ensure no servers are marked as unreachable
assert [] = Instances.get_unreachable()
assert :ok = ScheduleReachabilityWorker.perform(%Oban.Job{})
refute_enqueued(worker: Pleroma.Workers.ReachabilityWorker)
end
end
end

View file

@ -0,0 +1,202 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.ReachabilityWorkerTest do
use Pleroma.DataCase, async: true
use Oban.Testing, repo: Pleroma.Repo
import Mock
alias Pleroma.Tests.ObanHelpers
alias Pleroma.Workers.ReachabilityWorker
setup do
ObanHelpers.wipe_all()
:ok
end
describe "progressive backoff phases" do
test "starts with phase_1min and progresses through phases on failure" do
domain = "example.com"
with_mocks([
{Pleroma.HTTP, [], [get: fn _ -> {:error, :timeout} end]},
{Pleroma.Instances, [], [set_reachable: fn _ -> :ok end]}
]) do
# Start with phase_1min
job = %Oban.Job{
args: %{"domain" => domain, "phase" => "phase_1min", "attempt" => 1}
}
# First attempt fails
assert {:error, :timeout} = ReachabilityWorker.perform(job)
# Should schedule retry for phase_1min (attempt 2)
retry_jobs = all_enqueued(worker: ReachabilityWorker)
assert length(retry_jobs) == 1
[retry_job] = retry_jobs
assert retry_job.args["phase"] == "phase_1min"
assert retry_job.args["attempt"] == 2
# Clear jobs and simulate second attempt failure
ObanHelpers.wipe_all()
retry_job = %Oban.Job{
args: %{"domain" => domain, "phase" => "phase_1min", "attempt" => 2}
}
assert {:error, :timeout} = ReachabilityWorker.perform(retry_job)
# Should schedule retry for phase_1min (attempt 3)
retry_jobs = all_enqueued(worker: ReachabilityWorker)
assert length(retry_jobs) == 1
[retry_job] = retry_jobs
assert retry_job.args["phase"] == "phase_1min"
assert retry_job.args["attempt"] == 3
# Clear jobs and simulate third attempt failure (final attempt for phase_1min)
ObanHelpers.wipe_all()
retry_job = %Oban.Job{
args: %{"domain" => domain, "phase" => "phase_1min", "attempt" => 3}
}
assert {:error, :timeout} = ReachabilityWorker.perform(retry_job)
# Should schedule retry for phase_1min (attempt 4)
retry_jobs = all_enqueued(worker: ReachabilityWorker)
assert length(retry_jobs) == 1
[retry_job] = retry_jobs
assert retry_job.args["phase"] == "phase_1min"
assert retry_job.args["attempt"] == 4
# Clear jobs and simulate fourth attempt failure (final attempt for phase_1min)
ObanHelpers.wipe_all()
retry_job = %Oban.Job{
args: %{"domain" => domain, "phase" => "phase_1min", "attempt" => 4}
}
assert {:error, :timeout} = ReachabilityWorker.perform(retry_job)
# Should schedule next phase (phase_15min)
next_phase_jobs = all_enqueued(worker: ReachabilityWorker)
assert length(next_phase_jobs) == 1
[next_phase_job] = next_phase_jobs
assert next_phase_job.args["phase"] == "phase_15min"
assert next_phase_job.args["attempt"] == 1
end
end
test "progresses through all phases correctly" do
domain = "example.com"
with_mocks([
{Pleroma.HTTP, [], [get: fn _ -> {:error, :timeout} end]},
{Pleroma.Instances, [], [set_reachable: fn _ -> :ok end]}
]) do
# Simulate all phases failing
phases = ["phase_1min", "phase_15min", "phase_1hour", "phase_8hour", "phase_24hour"]
Enum.each(phases, fn phase ->
{_interval, max_attempts, next_phase} = get_phase_config(phase)
# Simulate all attempts failing for this phase
Enum.each(1..max_attempts, fn attempt ->
job = %Oban.Job{args: %{"domain" => domain, "phase" => phase, "attempt" => attempt}}
assert {:error, :timeout} = ReachabilityWorker.perform(job)
if attempt < max_attempts do
# Should schedule retry for same phase
retry_jobs = all_enqueued(worker: ReachabilityWorker)
assert length(retry_jobs) == 1
[retry_job] = retry_jobs
assert retry_job.args["phase"] == phase
assert retry_job.args["attempt"] == attempt + 1
ObanHelpers.wipe_all()
else
# Should schedule next phase (except for final phase)
if next_phase != "final" do
next_phase_jobs = all_enqueued(worker: ReachabilityWorker)
assert length(next_phase_jobs) == 1
[next_phase_job] = next_phase_jobs
assert next_phase_job.args["phase"] == next_phase
assert next_phase_job.args["attempt"] == 1
ObanHelpers.wipe_all()
else
# Final phase - no more jobs should be scheduled
next_phase_jobs = all_enqueued(worker: ReachabilityWorker)
assert length(next_phase_jobs) == 0
end
end
end)
end)
end
end
test "succeeds and stops progression when instance becomes reachable" do
domain = "example.com"
with_mocks([
{Pleroma.HTTP, [], [get: fn _ -> {:ok, %{status: 200}} end]},
{Pleroma.Instances, [], [set_reachable: fn _ -> :ok end]}
]) do
job = %Oban.Job{args: %{"domain" => domain, "phase" => "phase_1hour", "attempt" => 2}}
# Should succeed and not schedule any more jobs
assert :ok = ReachabilityWorker.perform(job)
# Verify set_reachable was called
assert_called(Pleroma.Instances.set_reachable("https://#{domain}"))
# No more jobs should be scheduled
next_jobs = all_enqueued(worker: ReachabilityWorker)
assert length(next_jobs) == 0
end
end
test "enforces uniqueness per domain using Oban's conflict detection" do
domain = "example.com"
# Insert first job for the domain
job1 =
%{
"domain" => domain,
"phase" => "phase_1min",
"attempt" => 1
}
|> ReachabilityWorker.new()
|> Oban.insert()
assert {:ok, _} = job1
# Try to insert a second job for the same domain with different phase/attempt
job2 =
%{
"domain" => domain,
"phase" => "phase_15min",
"attempt" => 1
}
|> ReachabilityWorker.new()
|> Oban.insert()
# Should fail due to uniqueness constraint (conflict)
assert {:ok, %Oban.Job{conflict?: true}} = job2
# Verify only one job exists for this domain
jobs = all_enqueued(worker: ReachabilityWorker)
assert length(jobs) == 1
[existing_job] = jobs
assert existing_job.args["domain"] == domain
assert existing_job.args["phase"] == "phase_1min"
end
end
defp get_phase_config("phase_1min"), do: {1, 4, "phase_15min"}
defp get_phase_config("phase_15min"), do: {15, 4, "phase_1hour"}
defp get_phase_config("phase_1hour"), do: {60, 4, "phase_8hour"}
defp get_phase_config("phase_8hour"), do: {480, 4, "phase_24hour"}
defp get_phase_config("phase_24hour"), do: {1440, 4, "final"}
defp get_phase_config("final"), do: {nil, 0, nil}
end