migrate to oban 2.0-rc1

This commit is contained in:
Maksim Pechnikov 2020-06-23 15:09:01 +03:00
commit a8d967762e
28 changed files with 72 additions and 69 deletions

View file

@ -80,7 +80,7 @@ defmodule Pleroma.Application do
[
Pleroma.Stats,
Pleroma.JobQueueMonitor,
{Oban, Config.get(Oban)}
{Oban, oban_config()}
] ++
task_children(@env) ++
streamer_child(@env) ++
@ -138,6 +138,18 @@ defmodule Pleroma.Application do
Pleroma.Web.Endpoint.Instrumenter.setup()
end
defp oban_config do
config = Config.get(Oban)
if Code.ensure_loaded?(IEx) and IEx.started?() do
config
|> Keyword.put(:crontab, false)
|> Keyword.put(:queues, false)
else
config
end
end
defp cachex_children do
[
build_cachex("used_captcha", ttl_interval: seconds_valid_interval()),

View file

@ -11,13 +11,12 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do
use Pleroma.Workers.WorkerHelper, queue: "attachments_cleanup"
@impl Oban.Worker
def perform(
%{
def perform(%Job{
args: %{
"op" => "cleanup_attachments",
"object" => %{"data" => %{"attachment" => [_ | _] = attachments, "actor" => actor}}
},
_job
) do
}
}) do
attachments
|> Enum.flat_map(fn item -> Enum.map(item["url"], & &1["href"]) end)
|> fetch_objects
@ -28,7 +27,7 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do
{:ok, :success}
end
def perform(%{"op" => "cleanup_attachments", "object" => _object}, _job), do: {:ok, :skip}
def perform(%Job{args: %{"op" => "cleanup_attachments", "object" => _object}}), do: {:ok, :skip}
defp do_clean({object_ids, attachment_urls}) do
uploader = Pleroma.Config.get([Pleroma.Upload, :uploader])

View file

@ -11,59 +11,59 @@ defmodule Pleroma.Workers.BackgroundWorker do
@impl Oban.Worker
def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}, _job) do
def perform(%Job{args: %{"op" => "deactivate_user", "user_id" => user_id, "status" => status}}) do
user = User.get_cached_by_id(user_id)
User.perform(:deactivate_async, user, status)
end
def perform(%{"op" => "delete_user", "user_id" => user_id}, _job) do
def perform(%Job{args: %{"op" => "delete_user", "user_id" => user_id}}) do
user = User.get_cached_by_id(user_id)
User.perform(:delete, user)
end
def perform(%{"op" => "force_password_reset", "user_id" => user_id}, _job) do
def perform(%Job{args: %{"op" => "force_password_reset", "user_id" => user_id}}) do
user = User.get_cached_by_id(user_id)
User.perform(:force_password_reset, user)
end
def perform(
%{
def perform(%Job{
args: %{
"op" => "blocks_import",
"blocker_id" => blocker_id,
"blocked_identifiers" => blocked_identifiers
},
_job
) do
}
}) do
blocker = User.get_cached_by_id(blocker_id)
{:ok, User.perform(:blocks_import, blocker, blocked_identifiers)}
end
def perform(
%{
def perform(%Job{
args: %{
"op" => "follow_import",
"follower_id" => follower_id,
"followed_identifiers" => followed_identifiers
},
_job
) do
}
}) do
follower = User.get_cached_by_id(follower_id)
{:ok, User.perform(:follow_import, follower, followed_identifiers)}
end
def perform(%{"op" => "media_proxy_preload", "message" => message}, _job) do
def perform(%Job{args: %{"op" => "media_proxy_preload", "message" => message}}) do
MediaProxyWarmingPolicy.perform(:preload, message)
end
def perform(%{"op" => "media_proxy_prefetch", "url" => url}, _job) do
def perform(%Job{args: %{"op" => "media_proxy_prefetch", "url" => url}}) do
MediaProxyWarmingPolicy.perform(:prefetch, url)
end
def perform(%{"op" => "fetch_data_for_activity", "activity_id" => activity_id}, _job) do
def perform(%Job{args: %{"op" => "fetch_data_for_activity", "activity_id" => activity_id}}) do
activity = Activity.get_by_id(activity_id)
Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity)
end
def perform(%{"op" => "move_following", "origin_id" => origin_id, "target_id" => target_id}, _) do
def perform(%Job{
args: %{"op" => "move_following", "origin_id" => origin_id, "target_id" => target_id}
}) do
origin = User.get_cached_by_id(origin_id)
target = User.get_cached_by_id(target_id)

View file

@ -13,7 +13,7 @@ defmodule Pleroma.Workers.Cron.ClearOauthTokenWorker do
alias Pleroma.Web.OAuth.Token
@impl Oban.Worker
def perform(_opts, _job) do
def perform(_job) do
if Config.get([:oauth2, :clean_expired_tokens], false) do
Token.delete_expired_tokens()
else

View file

@ -19,7 +19,7 @@ defmodule Pleroma.Workers.Cron.DigestEmailsWorker do
require Logger
@impl Oban.Worker
def perform(_opts, _job) do
def perform(_job) do
config = Config.get([:email_notifications, :digest])
if config[:active] do

View file

@ -12,7 +12,7 @@ defmodule Pleroma.Workers.Cron.NewUsersDigestWorker do
use Pleroma.Workers.WorkerHelper, queue: "new_users_digest"
@impl Oban.Worker
def perform(_args, _job) do
def perform(_job) do
if Pleroma.Config.get([Pleroma.Emails.NewUsersDigestEmail, :enabled]) do
today = NaiveDateTime.utc_now() |> Timex.beginning_of_day()

View file

@ -20,7 +20,7 @@ defmodule Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker do
@interval :timer.minutes(1)
@impl Oban.Worker
def perform(_opts, _job) do
def perform(_job) do
if Config.get([ActivityExpiration, :enabled]) do
Enum.each(ActivityExpiration.due_expirations(@interval), &delete_activity/1)
else

View file

@ -10,7 +10,7 @@ defmodule Pleroma.Workers.Cron.StatsWorker do
use Oban.Worker, queue: "background"
@impl Oban.Worker
def perform(_opts, _job) do
def perform(_job) do
Pleroma.Stats.do_collect()
end
end

View file

@ -6,7 +6,7 @@ defmodule Pleroma.Workers.MailerWorker do
use Pleroma.Workers.WorkerHelper, queue: "mailer"
@impl Oban.Worker
def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do
def perform(%Job{args: %{"op" => "email", "encoded_email" => encoded_email, "config" => config}}) do
encoded_email
|> Base.decode64!()
|> :erlang.binary_to_term()

View file

@ -8,17 +8,17 @@ defmodule Pleroma.Workers.PublisherWorker do
use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
def backoff(attempt) when is_integer(attempt) do
def backoff(%Job{attempt: attempt}) when is_integer(attempt) do
Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5)
end
@impl Oban.Worker
def perform(%{"op" => "publish", "activity_id" => activity_id}, _job) do
def perform(%Job{args: %{"op" => "publish", "activity_id" => activity_id}}) do
activity = Activity.get_by_id(activity_id)
Federator.perform(:publish, activity)
end
def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}, _job) do
def perform(%Job{args: %{"op" => "publish_one", "module" => module_name, "params" => params}}) do
params = Map.new(params, fn {k, v} -> {String.to_atom(k), v} end)
Federator.perform(:publish_one, String.to_atom(module_name), params)
end

View file

@ -8,7 +8,7 @@ defmodule Pleroma.Workers.ReceiverWorker do
use Pleroma.Workers.WorkerHelper, queue: "federator_incoming"
@impl Oban.Worker
def perform(%{"op" => "incoming_ap_doc", "params" => params}, _job) do
def perform(%Job{args: %{"op" => "incoming_ap_doc", "params" => params}}) do
Federator.perform(:incoming_ap_doc, params)
end
end

View file

@ -8,13 +8,7 @@ defmodule Pleroma.Workers.RemoteFetcherWorker do
use Pleroma.Workers.WorkerHelper, queue: "remote_fetcher"
@impl Oban.Worker
def perform(
%{
"op" => "fetch_remote",
"id" => id
} = args,
_job
) do
def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do
{:ok, _object} = Fetcher.fetch_object_from_id(id, depth: args["depth"])
end
end

View file

@ -17,7 +17,7 @@ defmodule Pleroma.Workers.ScheduledActivityWorker do
require Logger
@impl Oban.Worker
def perform(%{"activity_id" => activity_id}, _job) do
def perform(%Job{args: %{"activity_id" => activity_id}}) do
if Config.get([ScheduledActivity, :enabled]) do
case Pleroma.Repo.get(ScheduledActivity, activity_id) do
%ScheduledActivity{} = scheduled_activity ->

View file

@ -8,7 +8,7 @@ defmodule Pleroma.Workers.TransmogrifierWorker do
use Pleroma.Workers.WorkerHelper, queue: "transmogrifier"
@impl Oban.Worker
def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do
def perform(%Job{args: %{"op" => "user_upgrade", "user_id" => user_id}}) do
user = User.get_cached_by_id(user_id)
Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user)
end

View file

@ -9,7 +9,7 @@ defmodule Pleroma.Workers.WebPusherWorker do
use Pleroma.Workers.WorkerHelper, queue: "web_push"
@impl Oban.Worker
def perform(%{"op" => "web_push", "notification_id" => notification_id}, _job) do
def perform(%Job{args: %{"op" => "web_push", "notification_id" => notification_id}}) do
notification =
Notification
|> Repo.get(notification_id)

View file

@ -32,6 +32,8 @@ defmodule Pleroma.Workers.WorkerHelper do
queue: unquote(queue),
max_attempts: 1
alias Oban.Job
def enqueue(op, params, worker_args \\ []) do
params = Map.merge(%{"op" => op}, params)
queue_atom = String.to_atom(unquote(queue))
@ -39,7 +41,7 @@ defmodule Pleroma.Workers.WorkerHelper do
unquote(caller_module)
|> apply(:new, [params, worker_args])
|> Pleroma.Repo.insert()
|> Oban.insert()
end
end
end