From 77dca7c3e59053505abb4fa757b2d97e227fa4f4 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Fri, 27 Jun 2025 16:35:10 -0700 Subject: [PATCH] Refactor ReachabilityWorker to use a 5-phase reachability testing approach It will check reachability for an instance deemed unreachable at the following intervals: 4 attempts, once a minute 4 attempts, once every 15 minutes 4 attempts, once every 60 minutes 4 attempts, once every 8 hours 4 attempts, once every 24 hours This should be effective and respectful of the resources of instances on the fediverse. We have the Oban Pruner plugin enabled to keep the Oban Jobs table from growing indefinitely. It prunes every 15 minutes, but this will interfere with our ability to enforce uniqueness on the ReachabilityWorker jobs for a time period longer than 15 minutes. The solution is to exclude the ReachabilityWorker from the pruning operation and instead schedule a custom job that will prune the table for us once a day. The ReachabilityPruner cron task will clean up the history of the ReachabilityWorker jobs older than 6 days. --- config/config.exs | 5 +- .../workers/cron/reachability_pruner.ex | 26 +++ lib/pleroma/workers/reachability_worker.ex | 71 +++++- .../schedule_reachability_worker_test.exs | 52 ----- .../workers/reachability_worker_test.exs | 202 ++++++++++++++++++ 5 files changed, 296 insertions(+), 60 deletions(-) create mode 100644 lib/pleroma/workers/cron/reachability_pruner.ex delete mode 100644 test/pleroma/workers/cron/schedule_reachability_worker_test.exs create mode 100644 test/pleroma/workers/reachability_worker_test.exs diff --git a/config/config.exs b/config/config.exs index 372852a7b..f58dfb1af 100644 --- a/config/config.exs +++ b/config/config.exs @@ -599,11 +599,12 @@ config :pleroma, Oban, search_indexing: [limit: 10, paused: true], slow: 5 ], - plugins: [{Oban.Plugins.Pruner, max_age: 900}], + plugins: [{Oban.Plugins.Pruner, max_age: 900, exclude: [Pleroma.Workers.ReachabilityWorker]}], crontab: [ {"0 0 * * 0", Pleroma.Workers.Cron.DigestEmailsWorker}, {"0 0 * * *", Pleroma.Workers.Cron.NewUsersDigestWorker}, - {"*/10 * * * *", Pleroma.Workers.Cron.AppCleanupWorker} + {"*/10 * * * *", Pleroma.Workers.Cron.AppCleanupWorker}, + {"0 2 * * *", Pleroma.Workers.Cron.ReachabilityPruner} ] config :pleroma, Pleroma.Formatter, diff --git a/lib/pleroma/workers/cron/reachability_pruner.ex b/lib/pleroma/workers/cron/reachability_pruner.ex new file mode 100644 index 000000000..6eb671e0e --- /dev/null +++ b/lib/pleroma/workers/cron/reachability_pruner.ex @@ -0,0 +1,26 @@ +defmodule Pleroma.Workers.Cron.ReachabilityPruner do + use Oban.Worker, queue: :background, max_attempts: 1 + + import Ecto.Query + require Logger + + @reachability_worker "Elixir.Pleroma.Workers.ReachabilityWorker" + @prune_days 6 + + @impl true + def perform(_job) do + cutoff = DateTime.utc_now() |> DateTime.add(-@prune_days * 24 * 60 * 60, :second) + + {count, _} = + from(j in Oban.Job, + where: j.worker == @reachability_worker and j.inserted_at < ^cutoff + ) + |> Pleroma.Repo.delete_all() + + if count > 0 do + Logger.debug(fn -> "Pruned #{count} old ReachabilityWorker jobs." end) + end + + :ok + end +end diff --git a/lib/pleroma/workers/reachability_worker.ex b/lib/pleroma/workers/reachability_worker.ex index d9f764322..ba6928dee 100644 --- a/lib/pleroma/workers/reachability_worker.ex +++ b/lib/pleroma/workers/reachability_worker.ex @@ -5,17 +5,31 @@ defmodule Pleroma.Workers.ReachabilityWorker do use Oban.Worker, queue: :background, - max_attempts: 3, - unique: [period: :infinity, states: [:available, :scheduled]] + max_attempts: 1, + unique: [period: :infinity, states: [:available, :scheduled], keys: [:domain]] alias Pleroma.HTTP alias Pleroma.Instances @impl true - def perform(%Oban.Job{args: %{"domain" => domain}}) do + def perform(%Oban.Job{args: %{"domain" => domain, "phase" => phase, "attempt" => attempt}}) do + case check_reachability(domain) do + :ok -> + Instances.set_reachable("https://#{domain}") + :ok + + {:error, _} = error -> + handle_failed_attempt(domain, phase, attempt) + error + end + end + + @impl true + def timeout(_job), do: :timer.seconds(5) + + defp check_reachability(domain) do case HTTP.get("https://#{domain}/") do {:ok, %{status: status}} when status in 200..299 -> - Instances.set_reachable("https://#{domain}") :ok {:ok, %{status: _status}} -> @@ -26,6 +40,51 @@ defmodule Pleroma.Workers.ReachabilityWorker do end end - @impl true - def timeout(_job), do: :timer.seconds(5) + defp handle_failed_attempt(_domain, "final", _attempt), do: :ok + + defp handle_failed_attempt(domain, phase, attempt) do + {interval_minutes, max_attempts, next_phase} = get_phase_config(phase) + + if attempt >= max_attempts do + # Move to next phase + schedule_next_phase(domain, next_phase) + else + # Retry same phase with incremented attempt + schedule_retry(domain, phase, attempt + 1, interval_minutes) + end + end + + defp get_phase_config("phase_1min"), do: {1, 4, "phase_15min"} + defp get_phase_config("phase_15min"), do: {15, 4, "phase_1hour"} + defp get_phase_config("phase_1hour"), do: {60, 4, "phase_8hour"} + defp get_phase_config("phase_8hour"), do: {480, 4, "phase_24hour"} + defp get_phase_config("phase_24hour"), do: {1440, 4, "final"} + defp get_phase_config("final"), do: {nil, 0, nil} + + defp schedule_next_phase(_domain, "final"), do: :ok + + defp schedule_next_phase(domain, next_phase) do + {interval_minutes, _max_attempts, _next_phase} = get_phase_config(next_phase) + scheduled_at = DateTime.add(DateTime.utc_now(), interval_minutes * 60, :second) + + %{ + "domain" => domain, + "phase" => next_phase, + "attempt" => 1 + } + |> new(scheduled_at: scheduled_at, replace: true) + |> Oban.insert() + end + + def schedule_retry(domain, phase, attempt, interval_minutes) do + scheduled_at = DateTime.add(DateTime.utc_now(), interval_minutes * 60, :second) + + %{ + "domain" => domain, + "phase" => phase, + "attempt" => attempt + } + |> new(scheduled_at: scheduled_at, replace: true) + |> Oban.insert() + end end diff --git a/test/pleroma/workers/cron/schedule_reachability_worker_test.exs b/test/pleroma/workers/cron/schedule_reachability_worker_test.exs deleted file mode 100644 index 310c2e61a..000000000 --- a/test/pleroma/workers/cron/schedule_reachability_worker_test.exs +++ /dev/null @@ -1,52 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2022 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Workers.Cron.ScheduleReachabilityWorkerTest do - use Pleroma.DataCase, async: true - use Oban.Testing, repo: Pleroma.Repo - - alias Pleroma.Instances - alias Pleroma.Workers.Cron.ScheduleReachabilityWorker - - describe "perform/1" do - test "schedules reachability checks for unreachable servers" do - # Mark some servers as unreachable - Instances.set_unreachable("https://example.com") - Instances.set_unreachable("https://test.com") - Instances.set_unreachable("https://another.com") - - # Verify they are marked as unreachable - refute Instances.reachable?("https://example.com") - refute Instances.reachable?("https://test.com") - refute Instances.reachable?("https://another.com") - - # Run the worker - assert :ok = ScheduleReachabilityWorker.perform(%Oban.Job{}) - - # Verify ReachabilityWorker jobs were scheduled for each server - # Note: domains in get_unreachable/0 are without the https:// prefix - assert_enqueued( - worker: Pleroma.Workers.ReachabilityWorker, - args: %{"domain" => "example.com"} - ) - - assert_enqueued( - worker: Pleroma.Workers.ReachabilityWorker, - args: %{"domain" => "test.com"} - ) - - assert_enqueued( - worker: Pleroma.Workers.ReachabilityWorker, - args: %{"domain" => "another.com"} - ) - end - - test "handles empty list of unreachable servers" do - # Ensure no servers are marked as unreachable - assert [] = Instances.get_unreachable() - assert :ok = ScheduleReachabilityWorker.perform(%Oban.Job{}) - refute_enqueued(worker: Pleroma.Workers.ReachabilityWorker) - end - end -end diff --git a/test/pleroma/workers/reachability_worker_test.exs b/test/pleroma/workers/reachability_worker_test.exs new file mode 100644 index 000000000..32c39e869 --- /dev/null +++ b/test/pleroma/workers/reachability_worker_test.exs @@ -0,0 +1,202 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.ReachabilityWorkerTest do + use Pleroma.DataCase, async: true + use Oban.Testing, repo: Pleroma.Repo + + import Mock + + alias Pleroma.Tests.ObanHelpers + alias Pleroma.Workers.ReachabilityWorker + + setup do + ObanHelpers.wipe_all() + :ok + end + + describe "progressive backoff phases" do + test "starts with phase_1min and progresses through phases on failure" do + domain = "example.com" + + with_mocks([ + {Pleroma.HTTP, [], [get: fn _ -> {:error, :timeout} end]}, + {Pleroma.Instances, [], [set_reachable: fn _ -> :ok end]} + ]) do + # Start with phase_1min + job = %Oban.Job{ + args: %{"domain" => domain, "phase" => "phase_1min", "attempt" => 1} + } + + # First attempt fails + assert {:error, :timeout} = ReachabilityWorker.perform(job) + + # Should schedule retry for phase_1min (attempt 2) + retry_jobs = all_enqueued(worker: ReachabilityWorker) + assert length(retry_jobs) == 1 + [retry_job] = retry_jobs + assert retry_job.args["phase"] == "phase_1min" + assert retry_job.args["attempt"] == 2 + + # Clear jobs and simulate second attempt failure + ObanHelpers.wipe_all() + + retry_job = %Oban.Job{ + args: %{"domain" => domain, "phase" => "phase_1min", "attempt" => 2} + } + + assert {:error, :timeout} = ReachabilityWorker.perform(retry_job) + + # Should schedule retry for phase_1min (attempt 3) + retry_jobs = all_enqueued(worker: ReachabilityWorker) + assert length(retry_jobs) == 1 + [retry_job] = retry_jobs + assert retry_job.args["phase"] == "phase_1min" + assert retry_job.args["attempt"] == 3 + + # Clear jobs and simulate third attempt failure (final attempt for phase_1min) + ObanHelpers.wipe_all() + + retry_job = %Oban.Job{ + args: %{"domain" => domain, "phase" => "phase_1min", "attempt" => 3} + } + + assert {:error, :timeout} = ReachabilityWorker.perform(retry_job) + + # Should schedule retry for phase_1min (attempt 4) + retry_jobs = all_enqueued(worker: ReachabilityWorker) + assert length(retry_jobs) == 1 + [retry_job] = retry_jobs + assert retry_job.args["phase"] == "phase_1min" + assert retry_job.args["attempt"] == 4 + + # Clear jobs and simulate fourth attempt failure (final attempt for phase_1min) + ObanHelpers.wipe_all() + + retry_job = %Oban.Job{ + args: %{"domain" => domain, "phase" => "phase_1min", "attempt" => 4} + } + + assert {:error, :timeout} = ReachabilityWorker.perform(retry_job) + + # Should schedule next phase (phase_15min) + next_phase_jobs = all_enqueued(worker: ReachabilityWorker) + assert length(next_phase_jobs) == 1 + [next_phase_job] = next_phase_jobs + assert next_phase_job.args["phase"] == "phase_15min" + assert next_phase_job.args["attempt"] == 1 + end + end + + test "progresses through all phases correctly" do + domain = "example.com" + + with_mocks([ + {Pleroma.HTTP, [], [get: fn _ -> {:error, :timeout} end]}, + {Pleroma.Instances, [], [set_reachable: fn _ -> :ok end]} + ]) do + # Simulate all phases failing + phases = ["phase_1min", "phase_15min", "phase_1hour", "phase_8hour", "phase_24hour"] + + Enum.each(phases, fn phase -> + {_interval, max_attempts, next_phase} = get_phase_config(phase) + + # Simulate all attempts failing for this phase + Enum.each(1..max_attempts, fn attempt -> + job = %Oban.Job{args: %{"domain" => domain, "phase" => phase, "attempt" => attempt}} + assert {:error, :timeout} = ReachabilityWorker.perform(job) + + if attempt < max_attempts do + # Should schedule retry for same phase + retry_jobs = all_enqueued(worker: ReachabilityWorker) + assert length(retry_jobs) == 1 + [retry_job] = retry_jobs + assert retry_job.args["phase"] == phase + assert retry_job.args["attempt"] == attempt + 1 + ObanHelpers.wipe_all() + else + # Should schedule next phase (except for final phase) + if next_phase != "final" do + next_phase_jobs = all_enqueued(worker: ReachabilityWorker) + assert length(next_phase_jobs) == 1 + [next_phase_job] = next_phase_jobs + assert next_phase_job.args["phase"] == next_phase + assert next_phase_job.args["attempt"] == 1 + ObanHelpers.wipe_all() + else + # Final phase - no more jobs should be scheduled + next_phase_jobs = all_enqueued(worker: ReachabilityWorker) + assert length(next_phase_jobs) == 0 + end + end + end) + end) + end + end + + test "succeeds and stops progression when instance becomes reachable" do + domain = "example.com" + + with_mocks([ + {Pleroma.HTTP, [], [get: fn _ -> {:ok, %{status: 200}} end]}, + {Pleroma.Instances, [], [set_reachable: fn _ -> :ok end]} + ]) do + job = %Oban.Job{args: %{"domain" => domain, "phase" => "phase_1hour", "attempt" => 2}} + + # Should succeed and not schedule any more jobs + assert :ok = ReachabilityWorker.perform(job) + + # Verify set_reachable was called + assert_called(Pleroma.Instances.set_reachable("https://#{domain}")) + + # No more jobs should be scheduled + next_jobs = all_enqueued(worker: ReachabilityWorker) + assert length(next_jobs) == 0 + end + end + + test "enforces uniqueness per domain using Oban's conflict detection" do + domain = "example.com" + + # Insert first job for the domain + job1 = + %{ + "domain" => domain, + "phase" => "phase_1min", + "attempt" => 1 + } + |> ReachabilityWorker.new() + |> Oban.insert() + + assert {:ok, _} = job1 + + # Try to insert a second job for the same domain with different phase/attempt + job2 = + %{ + "domain" => domain, + "phase" => "phase_15min", + "attempt" => 1 + } + |> ReachabilityWorker.new() + |> Oban.insert() + + # Should fail due to uniqueness constraint (conflict) + assert {:ok, %Oban.Job{conflict?: true}} = job2 + + # Verify only one job exists for this domain + jobs = all_enqueued(worker: ReachabilityWorker) + assert length(jobs) == 1 + [existing_job] = jobs + assert existing_job.args["domain"] == domain + assert existing_job.args["phase"] == "phase_1min" + end + end + + defp get_phase_config("phase_1min"), do: {1, 4, "phase_15min"} + defp get_phase_config("phase_15min"), do: {15, 4, "phase_1hour"} + defp get_phase_config("phase_1hour"), do: {60, 4, "phase_8hour"} + defp get_phase_config("phase_8hour"), do: {480, 4, "phase_24hour"} + defp get_phase_config("phase_24hour"), do: {1440, 4, "final"} + defp get_phase_config("final"), do: {nil, 0, nil} +end