diff --git a/config/config.exs b/config/config.exs index 372852a7b..f58dfb1af 100644 --- a/config/config.exs +++ b/config/config.exs @@ -599,11 +599,12 @@ config :pleroma, Oban, search_indexing: [limit: 10, paused: true], slow: 5 ], - plugins: [{Oban.Plugins.Pruner, max_age: 900}], + plugins: [{Oban.Plugins.Pruner, max_age: 900, exclude: [Pleroma.Workers.ReachabilityWorker]}], crontab: [ {"0 0 * * 0", Pleroma.Workers.Cron.DigestEmailsWorker}, {"0 0 * * *", Pleroma.Workers.Cron.NewUsersDigestWorker}, - {"*/10 * * * *", Pleroma.Workers.Cron.AppCleanupWorker} + {"*/10 * * * *", Pleroma.Workers.Cron.AppCleanupWorker}, + {"0 2 * * *", Pleroma.Workers.Cron.ReachabilityPruner} ] config :pleroma, Pleroma.Formatter, diff --git a/lib/pleroma/workers/cron/reachability_pruner.ex b/lib/pleroma/workers/cron/reachability_pruner.ex new file mode 100644 index 000000000..6eb671e0e --- /dev/null +++ b/lib/pleroma/workers/cron/reachability_pruner.ex @@ -0,0 +1,26 @@ +defmodule Pleroma.Workers.Cron.ReachabilityPruner do + use Oban.Worker, queue: :background, max_attempts: 1 + + import Ecto.Query + require Logger + + @reachability_worker "Elixir.Pleroma.Workers.ReachabilityWorker" + @prune_days 6 + + @impl true + def perform(_job) do + cutoff = DateTime.utc_now() |> DateTime.add(-@prune_days * 24 * 60 * 60, :second) + + {count, _} = + from(j in Oban.Job, + where: j.worker == @reachability_worker and j.inserted_at < ^cutoff + ) + |> Pleroma.Repo.delete_all() + + if count > 0 do + Logger.debug(fn -> "Pruned #{count} old ReachabilityWorker jobs." end) + end + + :ok + end +end diff --git a/lib/pleroma/workers/reachability_worker.ex b/lib/pleroma/workers/reachability_worker.ex index d9f764322..ba6928dee 100644 --- a/lib/pleroma/workers/reachability_worker.ex +++ b/lib/pleroma/workers/reachability_worker.ex @@ -5,17 +5,31 @@ defmodule Pleroma.Workers.ReachabilityWorker do use Oban.Worker, queue: :background, - max_attempts: 3, - unique: [period: :infinity, states: [:available, :scheduled]] + max_attempts: 1, + unique: [period: :infinity, states: [:available, :scheduled], keys: [:domain]] alias Pleroma.HTTP alias Pleroma.Instances @impl true - def perform(%Oban.Job{args: %{"domain" => domain}}) do + def perform(%Oban.Job{args: %{"domain" => domain, "phase" => phase, "attempt" => attempt}}) do + case check_reachability(domain) do + :ok -> + Instances.set_reachable("https://#{domain}") + :ok + + {:error, _} = error -> + handle_failed_attempt(domain, phase, attempt) + error + end + end + + @impl true + def timeout(_job), do: :timer.seconds(5) + + defp check_reachability(domain) do case HTTP.get("https://#{domain}/") do {:ok, %{status: status}} when status in 200..299 -> - Instances.set_reachable("https://#{domain}") :ok {:ok, %{status: _status}} -> @@ -26,6 +40,51 @@ defmodule Pleroma.Workers.ReachabilityWorker do end end - @impl true - def timeout(_job), do: :timer.seconds(5) + defp handle_failed_attempt(_domain, "final", _attempt), do: :ok + + defp handle_failed_attempt(domain, phase, attempt) do + {interval_minutes, max_attempts, next_phase} = get_phase_config(phase) + + if attempt >= max_attempts do + # Move to next phase + schedule_next_phase(domain, next_phase) + else + # Retry same phase with incremented attempt + schedule_retry(domain, phase, attempt + 1, interval_minutes) + end + end + + defp get_phase_config("phase_1min"), do: {1, 4, "phase_15min"} + defp get_phase_config("phase_15min"), do: {15, 4, "phase_1hour"} + defp get_phase_config("phase_1hour"), do: {60, 4, "phase_8hour"} + defp get_phase_config("phase_8hour"), do: {480, 4, "phase_24hour"} + defp get_phase_config("phase_24hour"), do: {1440, 4, "final"} + defp get_phase_config("final"), do: {nil, 0, nil} + + defp schedule_next_phase(_domain, "final"), do: :ok + + defp schedule_next_phase(domain, next_phase) do + {interval_minutes, _max_attempts, _next_phase} = get_phase_config(next_phase) + scheduled_at = DateTime.add(DateTime.utc_now(), interval_minutes * 60, :second) + + %{ + "domain" => domain, + "phase" => next_phase, + "attempt" => 1 + } + |> new(scheduled_at: scheduled_at, replace: true) + |> Oban.insert() + end + + def schedule_retry(domain, phase, attempt, interval_minutes) do + scheduled_at = DateTime.add(DateTime.utc_now(), interval_minutes * 60, :second) + + %{ + "domain" => domain, + "phase" => phase, + "attempt" => attempt + } + |> new(scheduled_at: scheduled_at, replace: true) + |> Oban.insert() + end end diff --git a/test/pleroma/workers/cron/schedule_reachability_worker_test.exs b/test/pleroma/workers/cron/schedule_reachability_worker_test.exs deleted file mode 100644 index 310c2e61a..000000000 --- a/test/pleroma/workers/cron/schedule_reachability_worker_test.exs +++ /dev/null @@ -1,52 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2022 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Workers.Cron.ScheduleReachabilityWorkerTest do - use Pleroma.DataCase, async: true - use Oban.Testing, repo: Pleroma.Repo - - alias Pleroma.Instances - alias Pleroma.Workers.Cron.ScheduleReachabilityWorker - - describe "perform/1" do - test "schedules reachability checks for unreachable servers" do - # Mark some servers as unreachable - Instances.set_unreachable("https://example.com") - Instances.set_unreachable("https://test.com") - Instances.set_unreachable("https://another.com") - - # Verify they are marked as unreachable - refute Instances.reachable?("https://example.com") - refute Instances.reachable?("https://test.com") - refute Instances.reachable?("https://another.com") - - # Run the worker - assert :ok = ScheduleReachabilityWorker.perform(%Oban.Job{}) - - # Verify ReachabilityWorker jobs were scheduled for each server - # Note: domains in get_unreachable/0 are without the https:// prefix - assert_enqueued( - worker: Pleroma.Workers.ReachabilityWorker, - args: %{"domain" => "example.com"} - ) - - assert_enqueued( - worker: Pleroma.Workers.ReachabilityWorker, - args: %{"domain" => "test.com"} - ) - - assert_enqueued( - worker: Pleroma.Workers.ReachabilityWorker, - args: %{"domain" => "another.com"} - ) - end - - test "handles empty list of unreachable servers" do - # Ensure no servers are marked as unreachable - assert [] = Instances.get_unreachable() - assert :ok = ScheduleReachabilityWorker.perform(%Oban.Job{}) - refute_enqueued(worker: Pleroma.Workers.ReachabilityWorker) - end - end -end diff --git a/test/pleroma/workers/reachability_worker_test.exs b/test/pleroma/workers/reachability_worker_test.exs new file mode 100644 index 000000000..32c39e869 --- /dev/null +++ b/test/pleroma/workers/reachability_worker_test.exs @@ -0,0 +1,202 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.ReachabilityWorkerTest do + use Pleroma.DataCase, async: true + use Oban.Testing, repo: Pleroma.Repo + + import Mock + + alias Pleroma.Tests.ObanHelpers + alias Pleroma.Workers.ReachabilityWorker + + setup do + ObanHelpers.wipe_all() + :ok + end + + describe "progressive backoff phases" do + test "starts with phase_1min and progresses through phases on failure" do + domain = "example.com" + + with_mocks([ + {Pleroma.HTTP, [], [get: fn _ -> {:error, :timeout} end]}, + {Pleroma.Instances, [], [set_reachable: fn _ -> :ok end]} + ]) do + # Start with phase_1min + job = %Oban.Job{ + args: %{"domain" => domain, "phase" => "phase_1min", "attempt" => 1} + } + + # First attempt fails + assert {:error, :timeout} = ReachabilityWorker.perform(job) + + # Should schedule retry for phase_1min (attempt 2) + retry_jobs = all_enqueued(worker: ReachabilityWorker) + assert length(retry_jobs) == 1 + [retry_job] = retry_jobs + assert retry_job.args["phase"] == "phase_1min" + assert retry_job.args["attempt"] == 2 + + # Clear jobs and simulate second attempt failure + ObanHelpers.wipe_all() + + retry_job = %Oban.Job{ + args: %{"domain" => domain, "phase" => "phase_1min", "attempt" => 2} + } + + assert {:error, :timeout} = ReachabilityWorker.perform(retry_job) + + # Should schedule retry for phase_1min (attempt 3) + retry_jobs = all_enqueued(worker: ReachabilityWorker) + assert length(retry_jobs) == 1 + [retry_job] = retry_jobs + assert retry_job.args["phase"] == "phase_1min" + assert retry_job.args["attempt"] == 3 + + # Clear jobs and simulate third attempt failure (final attempt for phase_1min) + ObanHelpers.wipe_all() + + retry_job = %Oban.Job{ + args: %{"domain" => domain, "phase" => "phase_1min", "attempt" => 3} + } + + assert {:error, :timeout} = ReachabilityWorker.perform(retry_job) + + # Should schedule retry for phase_1min (attempt 4) + retry_jobs = all_enqueued(worker: ReachabilityWorker) + assert length(retry_jobs) == 1 + [retry_job] = retry_jobs + assert retry_job.args["phase"] == "phase_1min" + assert retry_job.args["attempt"] == 4 + + # Clear jobs and simulate fourth attempt failure (final attempt for phase_1min) + ObanHelpers.wipe_all() + + retry_job = %Oban.Job{ + args: %{"domain" => domain, "phase" => "phase_1min", "attempt" => 4} + } + + assert {:error, :timeout} = ReachabilityWorker.perform(retry_job) + + # Should schedule next phase (phase_15min) + next_phase_jobs = all_enqueued(worker: ReachabilityWorker) + assert length(next_phase_jobs) == 1 + [next_phase_job] = next_phase_jobs + assert next_phase_job.args["phase"] == "phase_15min" + assert next_phase_job.args["attempt"] == 1 + end + end + + test "progresses through all phases correctly" do + domain = "example.com" + + with_mocks([ + {Pleroma.HTTP, [], [get: fn _ -> {:error, :timeout} end]}, + {Pleroma.Instances, [], [set_reachable: fn _ -> :ok end]} + ]) do + # Simulate all phases failing + phases = ["phase_1min", "phase_15min", "phase_1hour", "phase_8hour", "phase_24hour"] + + Enum.each(phases, fn phase -> + {_interval, max_attempts, next_phase} = get_phase_config(phase) + + # Simulate all attempts failing for this phase + Enum.each(1..max_attempts, fn attempt -> + job = %Oban.Job{args: %{"domain" => domain, "phase" => phase, "attempt" => attempt}} + assert {:error, :timeout} = ReachabilityWorker.perform(job) + + if attempt < max_attempts do + # Should schedule retry for same phase + retry_jobs = all_enqueued(worker: ReachabilityWorker) + assert length(retry_jobs) == 1 + [retry_job] = retry_jobs + assert retry_job.args["phase"] == phase + assert retry_job.args["attempt"] == attempt + 1 + ObanHelpers.wipe_all() + else + # Should schedule next phase (except for final phase) + if next_phase != "final" do + next_phase_jobs = all_enqueued(worker: ReachabilityWorker) + assert length(next_phase_jobs) == 1 + [next_phase_job] = next_phase_jobs + assert next_phase_job.args["phase"] == next_phase + assert next_phase_job.args["attempt"] == 1 + ObanHelpers.wipe_all() + else + # Final phase - no more jobs should be scheduled + next_phase_jobs = all_enqueued(worker: ReachabilityWorker) + assert length(next_phase_jobs) == 0 + end + end + end) + end) + end + end + + test "succeeds and stops progression when instance becomes reachable" do + domain = "example.com" + + with_mocks([ + {Pleroma.HTTP, [], [get: fn _ -> {:ok, %{status: 200}} end]}, + {Pleroma.Instances, [], [set_reachable: fn _ -> :ok end]} + ]) do + job = %Oban.Job{args: %{"domain" => domain, "phase" => "phase_1hour", "attempt" => 2}} + + # Should succeed and not schedule any more jobs + assert :ok = ReachabilityWorker.perform(job) + + # Verify set_reachable was called + assert_called(Pleroma.Instances.set_reachable("https://#{domain}")) + + # No more jobs should be scheduled + next_jobs = all_enqueued(worker: ReachabilityWorker) + assert length(next_jobs) == 0 + end + end + + test "enforces uniqueness per domain using Oban's conflict detection" do + domain = "example.com" + + # Insert first job for the domain + job1 = + %{ + "domain" => domain, + "phase" => "phase_1min", + "attempt" => 1 + } + |> ReachabilityWorker.new() + |> Oban.insert() + + assert {:ok, _} = job1 + + # Try to insert a second job for the same domain with different phase/attempt + job2 = + %{ + "domain" => domain, + "phase" => "phase_15min", + "attempt" => 1 + } + |> ReachabilityWorker.new() + |> Oban.insert() + + # Should fail due to uniqueness constraint (conflict) + assert {:ok, %Oban.Job{conflict?: true}} = job2 + + # Verify only one job exists for this domain + jobs = all_enqueued(worker: ReachabilityWorker) + assert length(jobs) == 1 + [existing_job] = jobs + assert existing_job.args["domain"] == domain + assert existing_job.args["phase"] == "phase_1min" + end + end + + defp get_phase_config("phase_1min"), do: {1, 4, "phase_15min"} + defp get_phase_config("phase_15min"), do: {15, 4, "phase_1hour"} + defp get_phase_config("phase_1hour"), do: {60, 4, "phase_8hour"} + defp get_phase_config("phase_8hour"), do: {480, 4, "phase_24hour"} + defp get_phase_config("phase_24hour"), do: {1440, 4, "final"} + defp get_phase_config("final"), do: {nil, 0, nil} +end