Merge branch 'issue/1383' into 'develop'
[#1383] Switch periodic jobs from quantum to oban See merge request pleroma/pleroma!2015
This commit is contained in:
commit
94e5ca1105
34 changed files with 558 additions and 386 deletions
|
|
@ -42,12 +42,9 @@ defmodule Pleroma.Application do
|
|||
children =
|
||||
[
|
||||
Pleroma.Repo,
|
||||
Pleroma.Scheduler,
|
||||
Pleroma.Config.TransferTask,
|
||||
Pleroma.Emoji,
|
||||
Pleroma.Captcha,
|
||||
Pleroma.Daemons.ScheduledActivityDaemon,
|
||||
Pleroma.Daemons.ActivityExpirationDaemon,
|
||||
Pleroma.Plugs.RateLimiter.Supervisor
|
||||
] ++
|
||||
cachex_children() ++
|
||||
|
|
@ -58,7 +55,6 @@ defmodule Pleroma.Application do
|
|||
{Oban, Pleroma.Config.get(Oban)}
|
||||
] ++
|
||||
task_children(@env) ++
|
||||
oauth_cleanup_child(oauth_cleanup_enabled?()) ++
|
||||
streamer_child(@env) ++
|
||||
chat_child(@env, chat_enabled?()) ++
|
||||
[
|
||||
|
|
@ -160,20 +156,12 @@ defmodule Pleroma.Application do
|
|||
|
||||
defp chat_enabled?, do: Pleroma.Config.get([:chat, :enabled])
|
||||
|
||||
defp oauth_cleanup_enabled?,
|
||||
do: Pleroma.Config.get([:oauth2, :clean_expired_tokens], false)
|
||||
|
||||
defp streamer_child(:test), do: []
|
||||
|
||||
defp streamer_child(_) do
|
||||
[Pleroma.Web.Streamer.supervisor()]
|
||||
end
|
||||
|
||||
defp oauth_cleanup_child(true),
|
||||
do: [Pleroma.Web.OAuth.Token.CleanWorker]
|
||||
|
||||
defp oauth_cleanup_child(_), do: []
|
||||
|
||||
defp chat_child(_env, true) do
|
||||
[Pleroma.Web.ChatChannel.ChatChannelState]
|
||||
end
|
||||
|
|
|
|||
|
|
@ -1,66 +0,0 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Daemons.ActivityExpirationDaemon do
|
||||
alias Pleroma.Activity
|
||||
alias Pleroma.ActivityExpiration
|
||||
alias Pleroma.Config
|
||||
alias Pleroma.Repo
|
||||
alias Pleroma.User
|
||||
alias Pleroma.Web.CommonAPI
|
||||
|
||||
require Logger
|
||||
use GenServer
|
||||
import Ecto.Query
|
||||
|
||||
@schedule_interval :timer.minutes(1)
|
||||
|
||||
def start_link(_) do
|
||||
GenServer.start_link(__MODULE__, nil)
|
||||
end
|
||||
|
||||
@impl true
|
||||
def init(_) do
|
||||
if Config.get([ActivityExpiration, :enabled]) do
|
||||
schedule_next()
|
||||
{:ok, nil}
|
||||
else
|
||||
:ignore
|
||||
end
|
||||
end
|
||||
|
||||
def perform(:execute, expiration_id) do
|
||||
try do
|
||||
expiration =
|
||||
ActivityExpiration
|
||||
|> where([e], e.id == ^expiration_id)
|
||||
|> Repo.one!()
|
||||
|
||||
activity = Activity.get_by_id_with_object(expiration.activity_id)
|
||||
user = User.get_by_ap_id(activity.object.data["actor"])
|
||||
CommonAPI.delete(activity.id, user)
|
||||
rescue
|
||||
error ->
|
||||
Logger.error("#{__MODULE__} Couldn't delete expired activity: #{inspect(error)}")
|
||||
end
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_info(:perform, state) do
|
||||
ActivityExpiration.due_expirations(@schedule_interval)
|
||||
|> Enum.each(fn expiration ->
|
||||
Pleroma.Workers.ActivityExpirationWorker.enqueue(
|
||||
"activity_expiration",
|
||||
%{"activity_expiration_id" => expiration.id}
|
||||
)
|
||||
end)
|
||||
|
||||
schedule_next()
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
defp schedule_next do
|
||||
Process.send_after(self(), :perform, @schedule_interval)
|
||||
end
|
||||
end
|
||||
|
|
@ -1,42 +0,0 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Daemons.DigestEmailDaemon do
|
||||
alias Pleroma.Repo
|
||||
alias Pleroma.Workers.DigestEmailsWorker
|
||||
|
||||
import Ecto.Query
|
||||
|
||||
def perform do
|
||||
config = Pleroma.Config.get([:email_notifications, :digest])
|
||||
negative_interval = -Map.fetch!(config, :interval)
|
||||
inactivity_threshold = Map.fetch!(config, :inactivity_threshold)
|
||||
inactive_users_query = Pleroma.User.list_inactive_users_query(inactivity_threshold)
|
||||
|
||||
now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)
|
||||
|
||||
from(u in inactive_users_query,
|
||||
where: fragment(~s(? ->'digest' @> 'true'), u.email_notifications),
|
||||
where: u.last_digest_emailed_at < datetime_add(^now, ^negative_interval, "day"),
|
||||
select: u
|
||||
)
|
||||
|> Repo.all()
|
||||
|> Enum.each(fn user ->
|
||||
DigestEmailsWorker.enqueue("digest_email", %{"user_id" => user.id})
|
||||
end)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Send digest email to the given user.
|
||||
Updates `last_digest_emailed_at` field for the user and returns the updated user.
|
||||
"""
|
||||
@spec perform(Pleroma.User.t()) :: Pleroma.User.t()
|
||||
def perform(user) do
|
||||
with %Swoosh.Email{} = email <- Pleroma.Emails.UserEmail.digest_email(user) do
|
||||
Pleroma.Emails.Mailer.deliver_async(email)
|
||||
end
|
||||
|
||||
Pleroma.User.touch_last_digest_emailed_at(user)
|
||||
end
|
||||
end
|
||||
|
|
@ -1,62 +0,0 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Daemons.ScheduledActivityDaemon do
|
||||
@moduledoc """
|
||||
Sends scheduled activities to the job queue.
|
||||
"""
|
||||
|
||||
alias Pleroma.Config
|
||||
alias Pleroma.ScheduledActivity
|
||||
alias Pleroma.User
|
||||
alias Pleroma.Web.CommonAPI
|
||||
|
||||
use GenServer
|
||||
require Logger
|
||||
|
||||
@schedule_interval :timer.minutes(1)
|
||||
|
||||
def start_link(_) do
|
||||
GenServer.start_link(__MODULE__, nil)
|
||||
end
|
||||
|
||||
def init(_) do
|
||||
if Config.get([ScheduledActivity, :enabled]) do
|
||||
schedule_next()
|
||||
{:ok, nil}
|
||||
else
|
||||
:ignore
|
||||
end
|
||||
end
|
||||
|
||||
def perform(:execute, scheduled_activity_id) do
|
||||
try do
|
||||
{:ok, scheduled_activity} = ScheduledActivity.delete(scheduled_activity_id)
|
||||
%User{} = user = User.get_cached_by_id(scheduled_activity.user_id)
|
||||
{:ok, _result} = CommonAPI.post(user, scheduled_activity.params)
|
||||
rescue
|
||||
error ->
|
||||
Logger.error(
|
||||
"#{__MODULE__} Couldn't create a status from the scheduled activity: #{inspect(error)}"
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
def handle_info(:perform, state) do
|
||||
ScheduledActivity.due_activities(@schedule_interval)
|
||||
|> Enum.each(fn scheduled_activity ->
|
||||
Pleroma.Workers.ScheduledActivityWorker.enqueue(
|
||||
"execute",
|
||||
%{"activity_id" => scheduled_activity.id}
|
||||
)
|
||||
end)
|
||||
|
||||
schedule_next()
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
defp schedule_next do
|
||||
Process.send_after(self(), :perform, @schedule_interval)
|
||||
end
|
||||
end
|
||||
|
|
@ -5,15 +5,19 @@
|
|||
defmodule Pleroma.ScheduledActivity do
|
||||
use Ecto.Schema
|
||||
|
||||
alias Ecto.Multi
|
||||
alias Pleroma.Config
|
||||
alias Pleroma.Repo
|
||||
alias Pleroma.ScheduledActivity
|
||||
alias Pleroma.User
|
||||
alias Pleroma.Web.CommonAPI.Utils
|
||||
alias Pleroma.Workers.ScheduledActivityWorker
|
||||
|
||||
import Ecto.Query
|
||||
import Ecto.Changeset
|
||||
|
||||
@type t :: %__MODULE__{}
|
||||
|
||||
@min_offset :timer.minutes(5)
|
||||
|
||||
schema "scheduled_activities" do
|
||||
|
|
@ -105,16 +109,32 @@ defmodule Pleroma.ScheduledActivity do
|
|||
end
|
||||
|
||||
def new(%User{} = user, attrs) do
|
||||
%ScheduledActivity{user_id: user.id}
|
||||
|> changeset(attrs)
|
||||
changeset(%ScheduledActivity{user_id: user.id}, attrs)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Creates ScheduledActivity and add to queue to perform at scheduled_at date
|
||||
"""
|
||||
@spec create(User.t(), map()) :: {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
|
||||
def create(%User{} = user, attrs) do
|
||||
user
|
||||
|> new(attrs)
|
||||
|> Repo.insert()
|
||||
Multi.new()
|
||||
|> Multi.insert(:scheduled_activity, new(user, attrs))
|
||||
|> maybe_add_jobs(Config.get([ScheduledActivity, :enabled]))
|
||||
|> Repo.transaction()
|
||||
|> transaction_response
|
||||
end
|
||||
|
||||
defp maybe_add_jobs(multi, true) do
|
||||
multi
|
||||
|> Multi.run(:scheduled_activity_job, fn _repo, %{scheduled_activity: activity} ->
|
||||
%{activity_id: activity.id}
|
||||
|> ScheduledActivityWorker.new(scheduled_at: activity.scheduled_at)
|
||||
|> Oban.insert()
|
||||
end)
|
||||
end
|
||||
|
||||
defp maybe_add_jobs(multi, _), do: multi
|
||||
|
||||
def get(%User{} = user, scheduled_activity_id) do
|
||||
ScheduledActivity
|
||||
|> where(user_id: ^user.id)
|
||||
|
|
@ -122,25 +142,43 @@ defmodule Pleroma.ScheduledActivity do
|
|||
|> Repo.one()
|
||||
end
|
||||
|
||||
def update(%ScheduledActivity{} = scheduled_activity, attrs) do
|
||||
scheduled_activity
|
||||
|> update_changeset(attrs)
|
||||
|> Repo.update()
|
||||
@spec update(ScheduledActivity.t(), map()) ::
|
||||
{:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
|
||||
def update(%ScheduledActivity{id: id} = scheduled_activity, attrs) do
|
||||
with {:error, %Ecto.Changeset{valid?: true} = changeset} <-
|
||||
{:error, update_changeset(scheduled_activity, attrs)} do
|
||||
Multi.new()
|
||||
|> Multi.update(:scheduled_activity, changeset)
|
||||
|> Multi.update_all(:scheduled_job, job_query(id),
|
||||
set: [scheduled_at: get_field(changeset, :scheduled_at)]
|
||||
)
|
||||
|> Repo.transaction()
|
||||
|> transaction_response
|
||||
end
|
||||
end
|
||||
|
||||
def delete(%ScheduledActivity{} = scheduled_activity) do
|
||||
scheduled_activity
|
||||
|> Repo.delete()
|
||||
@doc "Deletes a ScheduledActivity and linked jobs."
|
||||
@spec delete(ScheduledActivity.t() | binary() | integer) ::
|
||||
{:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
|
||||
def delete(%ScheduledActivity{id: id} = scheduled_activity) do
|
||||
Multi.new()
|
||||
|> Multi.delete(:scheduled_activity, scheduled_activity, stale_error_field: :id)
|
||||
|> Multi.delete_all(:jobs, job_query(id))
|
||||
|> Repo.transaction()
|
||||
|> transaction_response
|
||||
end
|
||||
|
||||
def delete(id) when is_binary(id) or is_integer(id) do
|
||||
ScheduledActivity
|
||||
|> where(id: ^id)
|
||||
|> select([sa], sa)
|
||||
|> Repo.delete_all()
|
||||
|> case do
|
||||
{1, [scheduled_activity]} -> {:ok, scheduled_activity}
|
||||
_ -> :error
|
||||
delete(%__MODULE__{id: id})
|
||||
end
|
||||
|
||||
defp transaction_response(result) do
|
||||
case result do
|
||||
{:ok, %{scheduled_activity: scheduled_activity}} ->
|
||||
{:ok, scheduled_activity}
|
||||
|
||||
{:error, _, changeset, _} ->
|
||||
{:error, changeset}
|
||||
end
|
||||
end
|
||||
|
||||
|
|
@ -158,4 +196,11 @@ defmodule Pleroma.ScheduledActivity do
|
|||
|> where([sa], sa.scheduled_at < ^naive_datetime)
|
||||
|> Repo.all()
|
||||
end
|
||||
|
||||
def job_query(scheduled_activity_id) do
|
||||
from(j in Oban.Job,
|
||||
where: j.queue == "scheduled_activities",
|
||||
where: fragment("args ->> 'activity_id' = ?::text", ^to_string(scheduled_activity_id))
|
||||
)
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -1,7 +0,0 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Scheduler do
|
||||
use Quantum.Scheduler, otp_app: :pleroma
|
||||
end
|
||||
|
|
@ -9,22 +9,43 @@ defmodule Pleroma.Stats do
|
|||
|
||||
use GenServer
|
||||
|
||||
@interval 1000 * 60 * 60
|
||||
@init_state %{
|
||||
peers: [],
|
||||
stats: %{
|
||||
domain_count: 0,
|
||||
status_count: 0,
|
||||
user_count: 0
|
||||
}
|
||||
}
|
||||
|
||||
def start_link(_) do
|
||||
GenServer.start_link(__MODULE__, initial_data(), name: __MODULE__)
|
||||
GenServer.start_link(
|
||||
__MODULE__,
|
||||
@init_state,
|
||||
name: __MODULE__
|
||||
)
|
||||
end
|
||||
|
||||
@doc "Performs update stats"
|
||||
def force_update do
|
||||
GenServer.call(__MODULE__, :force_update)
|
||||
end
|
||||
|
||||
@doc "Performs collect stats"
|
||||
def do_collect do
|
||||
GenServer.cast(__MODULE__, :run_update)
|
||||
end
|
||||
|
||||
@doc "Returns stats data"
|
||||
@spec get_stats() :: %{domain_count: integer(), status_count: integer(), user_count: integer()}
|
||||
def get_stats do
|
||||
%{stats: stats} = GenServer.call(__MODULE__, :get_state)
|
||||
|
||||
stats
|
||||
end
|
||||
|
||||
@doc "Returns list peers"
|
||||
@spec get_peers() :: list(String.t())
|
||||
def get_peers do
|
||||
%{peers: peers} = GenServer.call(__MODULE__, :get_state)
|
||||
|
||||
|
|
@ -32,7 +53,6 @@ defmodule Pleroma.Stats do
|
|||
end
|
||||
|
||||
def init(args) do
|
||||
Process.send(self(), :run_update, [])
|
||||
{:ok, args}
|
||||
end
|
||||
|
||||
|
|
@ -45,17 +65,12 @@ defmodule Pleroma.Stats do
|
|||
{:reply, state, state}
|
||||
end
|
||||
|
||||
def handle_info(:run_update, _state) do
|
||||
def handle_cast(:run_update, _state) do
|
||||
new_stats = get_stat_data()
|
||||
|
||||
Process.send_after(self(), :run_update, @interval)
|
||||
{:noreply, new_stats}
|
||||
end
|
||||
|
||||
defp initial_data do
|
||||
%{peers: [], stats: %{}}
|
||||
end
|
||||
|
||||
defp get_stat_data do
|
||||
peers =
|
||||
from(
|
||||
|
|
@ -74,7 +89,11 @@ defmodule Pleroma.Stats do
|
|||
|
||||
%{
|
||||
peers: peers,
|
||||
stats: %{domain_count: domain_count, status_count: status_count, user_count: user_count}
|
||||
stats: %{
|
||||
domain_count: domain_count,
|
||||
status_count: status_count,
|
||||
user_count: user_count
|
||||
}
|
||||
}
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -124,15 +124,18 @@ defmodule Pleroma.Web.MastodonAPI.StatusController do
|
|||
) do
|
||||
params = Map.put(params, "in_reply_to_status_id", params["in_reply_to_id"])
|
||||
|
||||
if ScheduledActivity.far_enough?(scheduled_at) do
|
||||
with {:ok, scheduled_activity} <-
|
||||
ScheduledActivity.create(user, %{"params" => params, "scheduled_at" => scheduled_at}) do
|
||||
conn
|
||||
|> put_view(ScheduledActivityView)
|
||||
|> render("show.json", scheduled_activity: scheduled_activity)
|
||||
end
|
||||
with {:far_enough, true} <- {:far_enough, ScheduledActivity.far_enough?(scheduled_at)},
|
||||
attrs <- %{"params" => params, "scheduled_at" => scheduled_at},
|
||||
{:ok, scheduled_activity} <- ScheduledActivity.create(user, attrs) do
|
||||
conn
|
||||
|> put_view(ScheduledActivityView)
|
||||
|> render("show.json", scheduled_activity: scheduled_activity)
|
||||
else
|
||||
create(conn, Map.drop(params, ["scheduled_at"]))
|
||||
{:far_enough, _} ->
|
||||
create(conn, Map.drop(params, ["scheduled_at"]))
|
||||
|
||||
error ->
|
||||
error
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
|||
|
|
@ -1,34 +0,0 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Web.OAuth.Token.CleanWorker do
|
||||
@moduledoc """
|
||||
The module represents functions to clean an expired oauth tokens.
|
||||
"""
|
||||
use GenServer
|
||||
|
||||
@ten_seconds 10_000
|
||||
@one_day 86_400_000
|
||||
|
||||
alias Pleroma.Web.OAuth.Token
|
||||
alias Pleroma.Workers.BackgroundWorker
|
||||
|
||||
def start_link(_), do: GenServer.start_link(__MODULE__, %{})
|
||||
|
||||
def init(_) do
|
||||
Process.send_after(self(), :perform, @ten_seconds)
|
||||
{:ok, nil}
|
||||
end
|
||||
|
||||
@doc false
|
||||
def handle_info(:perform, state) do
|
||||
BackgroundWorker.enqueue("clean_expired_tokens", %{})
|
||||
interval = Pleroma.Config.get([:oauth2, :clean_expired_tokens_interval], @one_day)
|
||||
|
||||
Process.send_after(self(), :perform, interval)
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
def perform(:clean), do: Token.delete_expired_tokens()
|
||||
end
|
||||
|
|
@ -1,18 +0,0 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Workers.ActivityExpirationWorker do
|
||||
use Pleroma.Workers.WorkerHelper, queue: "activity_expiration"
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(
|
||||
%{
|
||||
"op" => "activity_expiration",
|
||||
"activity_expiration_id" => activity_expiration_id
|
||||
},
|
||||
_job
|
||||
) do
|
||||
Pleroma.Daemons.ActivityExpirationDaemon.perform(:execute, activity_expiration_id)
|
||||
end
|
||||
end
|
||||
|
|
@ -6,7 +6,6 @@ defmodule Pleroma.Workers.BackgroundWorker do
|
|||
alias Pleroma.Activity
|
||||
alias Pleroma.User
|
||||
alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
|
||||
alias Pleroma.Web.OAuth.Token.CleanWorker
|
||||
|
||||
use Pleroma.Workers.WorkerHelper, queue: "background"
|
||||
|
||||
|
|
@ -55,10 +54,6 @@ defmodule Pleroma.Workers.BackgroundWorker do
|
|||
User.perform(:follow_import, follower, followed_identifiers)
|
||||
end
|
||||
|
||||
def perform(%{"op" => "clean_expired_tokens"}, _job) do
|
||||
CleanWorker.perform(:clean)
|
||||
end
|
||||
|
||||
def perform(%{"op" => "media_proxy_preload", "message" => message}, _job) do
|
||||
MediaProxyWarmingPolicy.perform(:preload, message)
|
||||
end
|
||||
|
|
|
|||
21
lib/pleroma/workers/cron/clear_oauth_token_worker.ex
Normal file
21
lib/pleroma/workers/cron/clear_oauth_token_worker.ex
Normal file
|
|
@ -0,0 +1,21 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Workers.Cron.ClearOauthTokenWorker do
|
||||
@moduledoc """
|
||||
The worker to cleanup expired oAuth tokens.
|
||||
"""
|
||||
|
||||
use Oban.Worker, queue: "background"
|
||||
|
||||
alias Pleroma.Config
|
||||
alias Pleroma.Web.OAuth.Token
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(_opts, _job) do
|
||||
if Config.get([:oauth2, :clean_expired_tokens], false) do
|
||||
Token.delete_expired_tokens()
|
||||
end
|
||||
end
|
||||
end
|
||||
58
lib/pleroma/workers/cron/digest_emails_worker.ex
Normal file
58
lib/pleroma/workers/cron/digest_emails_worker.ex
Normal file
|
|
@ -0,0 +1,58 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Workers.Cron.DigestEmailsWorker do
|
||||
@moduledoc """
|
||||
The worker to send digest emails.
|
||||
"""
|
||||
|
||||
use Oban.Worker, queue: "digest_emails"
|
||||
|
||||
alias Pleroma.Config
|
||||
alias Pleroma.Emails
|
||||
alias Pleroma.Repo
|
||||
alias Pleroma.User
|
||||
|
||||
import Ecto.Query
|
||||
|
||||
require Logger
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(_opts, _job) do
|
||||
config = Config.get([:email_notifications, :digest])
|
||||
|
||||
if config[:active] do
|
||||
negative_interval = -Map.fetch!(config, :interval)
|
||||
inactivity_threshold = Map.fetch!(config, :inactivity_threshold)
|
||||
inactive_users_query = User.list_inactive_users_query(inactivity_threshold)
|
||||
|
||||
now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)
|
||||
|
||||
from(u in inactive_users_query,
|
||||
where: fragment(~s(? ->'digest' @> 'true'), u.email_notifications),
|
||||
where: u.last_digest_emailed_at < datetime_add(^now, ^negative_interval, "day"),
|
||||
select: u
|
||||
)
|
||||
|> Repo.all()
|
||||
|> send_emails
|
||||
end
|
||||
end
|
||||
|
||||
def send_emails(users) do
|
||||
Enum.each(users, &send_email/1)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Send digest email to the given user.
|
||||
Updates `last_digest_emailed_at` field for the user and returns the updated user.
|
||||
"""
|
||||
@spec send_email(User.t()) :: User.t()
|
||||
def send_email(user) do
|
||||
with %Swoosh.Email{} = email <- Emails.UserEmail.digest_email(user) do
|
||||
Emails.Mailer.deliver_async(email)
|
||||
end
|
||||
|
||||
User.touch_last_digest_emailed_at(user)
|
||||
end
|
||||
end
|
||||
46
lib/pleroma/workers/cron/purge_expired_activities_worker.ex
Normal file
46
lib/pleroma/workers/cron/purge_expired_activities_worker.ex
Normal file
|
|
@ -0,0 +1,46 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker do
|
||||
@moduledoc """
|
||||
The worker to purge expired activities.
|
||||
"""
|
||||
|
||||
use Oban.Worker, queue: "activity_expiration"
|
||||
|
||||
alias Pleroma.Activity
|
||||
alias Pleroma.ActivityExpiration
|
||||
alias Pleroma.Config
|
||||
alias Pleroma.User
|
||||
alias Pleroma.Web.CommonAPI
|
||||
|
||||
require Logger
|
||||
|
||||
@interval :timer.minutes(1)
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(_opts, _job) do
|
||||
if Config.get([ActivityExpiration, :enabled]) do
|
||||
Enum.each(ActivityExpiration.due_expirations(@interval), &delete_activity/1)
|
||||
end
|
||||
end
|
||||
|
||||
def delete_activity(%ActivityExpiration{activity_id: activity_id}) do
|
||||
with {:activity, %Activity{} = activity} <-
|
||||
{:activity, Activity.get_by_id_with_object(activity_id)},
|
||||
{:user, %User{} = user} <- {:user, User.get_by_ap_id(activity.object.data["actor"])} do
|
||||
CommonAPI.delete(activity.id, user)
|
||||
else
|
||||
{:activity, _} ->
|
||||
Logger.error(
|
||||
"#{__MODULE__} Couldn't delete expired activity: not found activity ##{activity_id}"
|
||||
)
|
||||
|
||||
{:user, _} ->
|
||||
Logger.error(
|
||||
"#{__MODULE__} Couldn't delete expired activity: not found actorof ##{activity_id}"
|
||||
)
|
||||
end
|
||||
end
|
||||
end
|
||||
16
lib/pleroma/workers/cron/stats_worker.ex
Normal file
16
lib/pleroma/workers/cron/stats_worker.ex
Normal file
|
|
@ -0,0 +1,16 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Workers.Cron.StatsWorker do
|
||||
@moduledoc """
|
||||
The worker to update peers statistics.
|
||||
"""
|
||||
|
||||
use Oban.Worker, queue: "background"
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(_opts, _job) do
|
||||
Pleroma.Stats.do_collect()
|
||||
end
|
||||
end
|
||||
|
|
@ -1,16 +0,0 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Workers.DigestEmailsWorker do
|
||||
alias Pleroma.User
|
||||
|
||||
use Pleroma.Workers.WorkerHelper, queue: "digest_emails"
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do
|
||||
user_id
|
||||
|> User.get_cached_by_id()
|
||||
|> Pleroma.Daemons.DigestEmailDaemon.perform()
|
||||
end
|
||||
end
|
||||
|
|
@ -3,10 +3,42 @@
|
|||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Workers.ScheduledActivityWorker do
|
||||
@moduledoc """
|
||||
The worker to post scheduled activity.
|
||||
"""
|
||||
|
||||
use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities"
|
||||
|
||||
alias Pleroma.Config
|
||||
alias Pleroma.ScheduledActivity
|
||||
alias Pleroma.User
|
||||
alias Pleroma.Web.CommonAPI
|
||||
|
||||
require Logger
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(%{"op" => "execute", "activity_id" => activity_id}, _job) do
|
||||
Pleroma.Daemons.ScheduledActivityDaemon.perform(:execute, activity_id)
|
||||
def perform(%{"activity_id" => activity_id}, _job) do
|
||||
if Config.get([ScheduledActivity, :enabled]) do
|
||||
case Pleroma.Repo.get(ScheduledActivity, activity_id) do
|
||||
%ScheduledActivity{} = scheduled_activity ->
|
||||
post_activity(scheduled_activity)
|
||||
|
||||
_ ->
|
||||
Logger.error("#{__MODULE__} Couldn't find scheduled activity: #{activity_id}")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
defp post_activity(%ScheduledActivity{user_id: user_id, params: params} = scheduled_activity) do
|
||||
with {:delete, {:ok, _}} <- {:delete, ScheduledActivity.delete(scheduled_activity)},
|
||||
{:user, %User{} = user} <- {:user, User.get_cached_by_id(user_id)},
|
||||
{:post, {:ok, _}} <- {:post, CommonAPI.post(user, params)} do
|
||||
:ok
|
||||
else
|
||||
error ->
|
||||
Logger.error(
|
||||
"#{__MODULE__} Couldn't create a status from the scheduled activity: #{inspect(error)}"
|
||||
)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue