Remove WorkerHelper

This commit is contained in:
Mark Felder 2024-06-28 11:47:31 -04:00
commit 52e9bec156
41 changed files with 200 additions and 211 deletions

View file

@ -8,9 +8,9 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do
alias Pleroma.Object
alias Pleroma.Repo
use Pleroma.Workers.WorkerHelper, queue: "slow"
use Oban.Worker, queue: :slow
@impl Oban.Worker
@impl true
def perform(%Job{
args: %{
"op" => "cleanup_attachments",
@ -31,7 +31,7 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do
def perform(%Job{args: %{"op" => "cleanup_attachments", "object" => _object}}), do: {:ok, :skip}
@impl Oban.Worker
@impl true
def timeout(_job), do: :timer.seconds(900)
defp do_clean({object_ids, attachment_urls}) do

View file

@ -5,9 +5,9 @@
defmodule Pleroma.Workers.BackgroundWorker do
alias Pleroma.User
use Pleroma.Workers.WorkerHelper, queue: "background"
use Oban.Worker, queue: :background
@impl Oban.Worker
@impl true
def perform(%Job{args: %{"op" => "user_activation", "user_id" => user_id, "status" => status}}) do
user = User.get_cached_by_id(user_id)
@ -39,6 +39,6 @@ defmodule Pleroma.Workers.BackgroundWorker do
User.perform(:verify_fields_links, user)
end
@impl Oban.Worker
def timeout(_job), do: :timer.seconds(15)
@impl true
def timeout(_job), do: :timer.seconds(900)
end

View file

@ -9,7 +9,7 @@ defmodule Pleroma.Workers.BackupWorker do
alias Pleroma.Config.Getting, as: Config
alias Pleroma.User.Backup
@impl Oban.Worker
@impl true
def perform(%Job{
args: %{"op" => "process", "backup_id" => backup_id}
}) do
@ -32,7 +32,7 @@ defmodule Pleroma.Workers.BackupWorker do
end
end
@impl Oban.Worker
@impl true
def timeout(_job), do: Config.get([Backup, :timeout], :timer.minutes(30))
defp has_email?(user) do

View file

@ -18,7 +18,7 @@ defmodule Pleroma.Workers.Cron.DigestEmailsWorker do
require Logger
@impl Oban.Worker
@impl true
def perform(_job) do
config = Config.get([:email_notifications, :digest])
@ -59,6 +59,6 @@ defmodule Pleroma.Workers.Cron.DigestEmailsWorker do
User.touch_last_digest_emailed_at(user)
end
@impl Oban.Worker
@impl true
def timeout(_job), do: :timer.seconds(5)
end

View file

@ -9,9 +9,9 @@ defmodule Pleroma.Workers.Cron.NewUsersDigestWorker do
import Ecto.Query
use Pleroma.Workers.WorkerHelper, queue: "background"
use Oban.Worker, queue: :background
@impl Oban.Worker
@impl true
def perform(_job) do
if Pleroma.Config.get([Pleroma.Emails.NewUsersDigestEmail, :enabled]) do
today = NaiveDateTime.utc_now() |> Timex.beginning_of_day()
@ -61,6 +61,6 @@ defmodule Pleroma.Workers.Cron.NewUsersDigestWorker do
:ok
end
@impl Oban.Worker
@impl true
def timeout(_job), do: :timer.seconds(5)
end

View file

@ -6,10 +6,9 @@ defmodule Pleroma.Workers.DeleteWorker do
alias Pleroma.Instances.Instance
alias Pleroma.User
use Pleroma.Workers.WorkerHelper, queue: "slow"
@impl Oban.Worker
use Oban.Worker, queue: :slow
@impl true
def perform(%Job{args: %{"op" => "delete_user", "user_id" => user_id}}) do
user = User.get_cached_by_id(user_id)
User.perform(:delete, user)
@ -19,6 +18,6 @@ defmodule Pleroma.Workers.DeleteWorker do
Instance.perform(:delete_instance, host)
end
@impl Oban.Worker
@impl true
def timeout(_job), do: :timer.seconds(900)
end

View file

@ -3,9 +3,9 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.MailerWorker do
use Pleroma.Workers.WorkerHelper, queue: "background"
use Oban.Worker, queue: :background
@impl Oban.Worker
@impl true
def perform(%Job{args: %{"op" => "email", "encoded_email" => encoded_email, "config" => config}}) do
encoded_email
|> Base.decode64!()
@ -13,6 +13,6 @@ defmodule Pleroma.Workers.MailerWorker do
|> Pleroma.Emails.Mailer.deliver(config)
end
@impl Oban.Worker
@impl true
def timeout(_job), do: :timer.seconds(5)
end

View file

@ -3,9 +3,9 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.MuteExpireWorker do
use Pleroma.Workers.WorkerHelper, queue: "background"
use Oban.Worker, queue: :background
@impl Oban.Worker
@impl true
def perform(%Job{args: %{"op" => "unmute_user", "muter_id" => muter_id, "mutee_id" => mutee_id}}) do
Pleroma.User.unmute(muter_id, mutee_id)
:ok
@ -18,6 +18,6 @@ defmodule Pleroma.Workers.MuteExpireWorker do
:ok
end
@impl Oban.Worker
@impl true
def timeout(_job), do: :timer.seconds(5)
end

View file

@ -6,13 +6,13 @@ defmodule Pleroma.Workers.PollWorker do
@moduledoc """
Generates notifications when a poll ends.
"""
use Pleroma.Workers.WorkerHelper, queue: "background"
use Oban.Worker, queue: :background
alias Pleroma.Activity
alias Pleroma.Notification
alias Pleroma.Object
@impl Oban.Worker
@impl true
def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do
with %Activity{} = activity <- find_poll_activity(activity_id),
{:ok, notifications} <- Notification.create_poll_notifications(activity) do
@ -23,7 +23,7 @@ defmodule Pleroma.Workers.PollWorker do
end
end
@impl Oban.Worker
@impl true
def timeout(_job), do: :timer.seconds(5)
defp find_poll_activity(activity_id) do

View file

@ -6,13 +6,9 @@ defmodule Pleroma.Workers.PublisherWorker do
alias Pleroma.Activity
alias Pleroma.Web.Federator
use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
use Oban.Worker, queue: :federator_outgoing, max_attempts: 5
def backoff(%Job{attempt: attempt}) when is_integer(attempt) do
Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5)
end
@impl Oban.Worker
@impl true
def perform(%Job{args: %{"op" => "publish", "activity_id" => activity_id}}) do
activity = Activity.get_by_id(activity_id)
Federator.perform(:publish, activity)
@ -23,6 +19,18 @@ defmodule Pleroma.Workers.PublisherWorker do
Federator.perform(:publish_one, params)
end
@impl Oban.Worker
@impl true
def timeout(_job), do: :timer.seconds(10)
@base_backoff 15
@pow 5
@impl true
def backoff(%Job{attempt: attempt}) when is_integer(attempt) do
backoff =
:math.pow(attempt, @pow) +
@base_backoff +
:rand.uniform(2 * @base_backoff) * attempt
trunc(backoff)
end
end

View file

@ -13,16 +13,13 @@ defmodule Pleroma.Workers.PurgeExpiredActivity do
alias Pleroma.Activity
@spec enqueue(map()) ::
@spec enqueue(map(), list()) ::
{:ok, Oban.Job.t()}
| {:error, :expired_activities_disabled}
| {:error, :expiration_too_close}
def enqueue(args) do
def enqueue(params, worker_args) do
with true <- enabled?() do
{scheduled_at, args} = Map.pop(args, :expires_at)
args
|> new(scheduled_at: scheduled_at)
new(params, worker_args)
|> Oban.insert()
end
end
@ -35,7 +32,7 @@ defmodule Pleroma.Workers.PurgeExpiredActivity do
end
end
@impl Oban.Worker
@impl true
def timeout(_job), do: :timer.seconds(5)
defp enabled? do

View file

@ -31,7 +31,7 @@ defmodule Pleroma.Workers.PurgeExpiredFilter do
|> Repo.delete()
end
@impl Oban.Worker
@impl true
def timeout(_job), do: :timer.seconds(5)
@spec get_expiration(pos_integer()) :: Job.t() | nil

View file

@ -9,16 +9,6 @@ defmodule Pleroma.Workers.PurgeExpiredToken do
use Oban.Worker, queue: :background, max_attempts: 1
@spec enqueue(%{token_id: integer(), valid_until: DateTime.t(), mod: module()}) ::
{:ok, Oban.Job.t()} | {:error, Ecto.Changeset.t()}
def enqueue(args) do
{scheduled_at, args} = Map.pop(args, :valid_until)
args
|> __MODULE__.new(scheduled_at: scheduled_at)
|> Oban.insert()
end
@impl true
def perform(%Oban.Job{args: %{"token_id" => id, "mod" => module}}) do
module
@ -27,6 +17,6 @@ defmodule Pleroma.Workers.PurgeExpiredToken do
|> Pleroma.Repo.delete()
end
@impl Oban.Worker
@impl true
def timeout(_job), do: :timer.seconds(5)
end

View file

@ -7,9 +7,9 @@ defmodule Pleroma.Workers.ReceiverWorker do
alias Pleroma.User
alias Pleroma.Web.Federator
use Pleroma.Workers.WorkerHelper, queue: "federator_incoming"
use Oban.Worker, queue: :federator_incoming, max_attempts: 5
@impl Oban.Worker
@impl true
def perform(%Job{
args: %{
@ -51,7 +51,7 @@ defmodule Pleroma.Workers.ReceiverWorker do
end
end
@impl Oban.Worker
@impl true
def timeout(%_{args: %{"timeout" => timeout}}), do: timeout
def timeout(_job), do: :timer.seconds(5)

View file

@ -5,9 +5,9 @@
defmodule Pleroma.Workers.RemoteFetcherWorker do
alias Pleroma.Object.Fetcher
use Pleroma.Workers.WorkerHelper, queue: "background"
use Oban.Worker, queue: :background
@impl Oban.Worker
@impl true
def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do
case Fetcher.fetch_object_from_id(id, depth: args["depth"]) do
{:ok, _object} ->
@ -30,6 +30,6 @@ defmodule Pleroma.Workers.RemoteFetcherWorker do
end
end
@impl Oban.Worker
@impl true
def timeout(_job), do: :timer.seconds(15)
end

View file

@ -9,7 +9,7 @@ defmodule Pleroma.Workers.RichMediaWorker do
use Oban.Worker, queue: :background, max_attempts: 3, unique: [period: 300]
@impl Oban.Worker
@impl true
def perform(%Job{args: %{"op" => "expire", "url" => url} = _args}) do
Card.delete(url)
end
@ -33,7 +33,7 @@ defmodule Pleroma.Workers.RichMediaWorker do
# a slow/infinite data stream and insert a negative cache entry for the URL
# We pad it by 2 seconds to be certain a slow connection is detected and we
# can inject a negative cache entry for the URL
@impl Oban.Worker
@impl true
def timeout(_job) do
Config.get!([:rich_media, :timeout]) + :timer.seconds(2)
end

View file

@ -7,7 +7,7 @@ defmodule Pleroma.Workers.ScheduledActivityWorker do
The worker to post scheduled activity.
"""
use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
use Oban.Worker, queue: :federator_outgoing, max_attempts: 5
alias Pleroma.Repo
alias Pleroma.ScheduledActivity
@ -15,7 +15,7 @@ defmodule Pleroma.Workers.ScheduledActivityWorker do
require Logger
@impl Oban.Worker
@impl true
def perform(%Job{args: %{"activity_id" => activity_id}}) do
with %ScheduledActivity{} = scheduled_activity <- find_scheduled_activity(activity_id),
%User{} = user <- find_user(scheduled_activity.user_id) do
@ -37,7 +37,7 @@ defmodule Pleroma.Workers.ScheduledActivityWorker do
end
end
@impl Oban.Worker
@impl true
def timeout(_job), do: :timer.seconds(5)
defp find_scheduled_activity(id) do

View file

@ -1,7 +1,7 @@
defmodule Pleroma.Workers.SearchIndexingWorker do
use Pleroma.Workers.WorkerHelper, queue: "search_indexing"
use Oban.Worker, queue: :search_indexing, max_attempts: 2
@impl Oban.Worker
@impl true
alias Pleroma.Config.Getting, as: Config
@ -21,6 +21,6 @@ defmodule Pleroma.Workers.SearchIndexingWorker do
search_module.remove_from_index(object)
end
@impl Oban.Worker
@impl true
def timeout(_job), do: :timer.seconds(5)
end

View file

@ -12,6 +12,6 @@ defmodule Pleroma.Workers.UserRefreshWorker do
User.fetch_by_ap_id(ap_id)
end
@impl Oban.Worker
@impl true
def timeout(_job), do: :timer.seconds(15)
end

View file

@ -7,9 +7,9 @@ defmodule Pleroma.Workers.WebPusherWorker do
alias Pleroma.Repo
alias Pleroma.Web.Push.Impl
use Pleroma.Workers.WorkerHelper, queue: "web_push"
use Oban.Worker, queue: :web_push
@impl Oban.Worker
@impl true
def perform(%Job{args: %{"op" => "web_push", "notification_id" => notification_id}}) do
notification =
Notification
@ -20,6 +20,6 @@ defmodule Pleroma.Workers.WebPusherWorker do
|> Enum.each(&Impl.deliver(&1))
end
@impl Oban.Worker
@impl true
def timeout(_job), do: :timer.seconds(5)
end

View file

@ -1,48 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.WorkerHelper do
alias Pleroma.Config
alias Pleroma.Workers.WorkerHelper
def worker_args(queue) do
case Config.get([:workers, :retries, queue]) do
nil -> []
max_attempts -> [max_attempts: max_attempts]
end
end
def sidekiq_backoff(attempt, pow \\ 4, base_backoff \\ 15) do
backoff =
:math.pow(attempt, pow) +
base_backoff +
:rand.uniform(2 * base_backoff) * attempt
trunc(backoff)
end
defmacro __using__(opts) do
caller_module = __CALLER__.module
queue = Keyword.fetch!(opts, :queue)
quote do
# Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker,
queue: unquote(queue),
max_attempts: 1
alias Oban.Job
def enqueue(op, params, worker_args \\ []) do
params = Map.merge(%{"op" => op}, params)
queue_atom = String.to_atom(unquote(queue))
worker_args = worker_args ++ WorkerHelper.worker_args(queue_atom)
unquote(caller_module)
|> apply(:new, [params, worker_args])
|> Oban.insert()
end
end
end
end