Merge remote-tracking branch 'origin/develop' into link-verification
This commit is contained in:
commit
b7c625db0f
609 changed files with 6056 additions and 2607 deletions
|
|
@ -28,7 +28,7 @@ defmodule Pleroma.Workers.BackgroundWorker do
|
|||
def perform(%Job{args: %{"op" => op, "user_id" => user_id, "identifiers" => identifiers}})
|
||||
when op in ["blocks_import", "follow_import", "mutes_import"] do
|
||||
user = User.get_cached_by_id(user_id)
|
||||
{:ok, User.Import.perform(String.to_atom(op), user, identifiers)}
|
||||
{:ok, User.Import.perform(String.to_existing_atom(op), user, identifiers)}
|
||||
end
|
||||
|
||||
def perform(%Job{
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ defmodule Pleroma.Workers.Cron.DigestEmailsWorker do
|
|||
The worker to send digest emails.
|
||||
"""
|
||||
|
||||
use Oban.Worker, queue: "digest_emails"
|
||||
use Oban.Worker, queue: "mailer"
|
||||
|
||||
alias Pleroma.Config
|
||||
alias Pleroma.Emails
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ defmodule Pleroma.Workers.Cron.NewUsersDigestWorker do
|
|||
|
||||
import Ecto.Query
|
||||
|
||||
use Pleroma.Workers.WorkerHelper, queue: "new_users_digest"
|
||||
use Pleroma.Workers.WorkerHelper, queue: "mailer"
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(_job) do
|
||||
|
|
|
|||
|
|
@ -18,9 +18,9 @@ defmodule Pleroma.Workers.PublisherWorker do
|
|||
Federator.perform(:publish, activity)
|
||||
end
|
||||
|
||||
def perform(%Job{args: %{"op" => "publish_one", "module" => module_name, "params" => params}}) do
|
||||
def perform(%Job{args: %{"op" => "publish_one", "params" => params}}) do
|
||||
params = Map.new(params, fn {k, v} -> {String.to_atom(k), v} end)
|
||||
Federator.perform(:publish_one, String.to_atom(module_name), params)
|
||||
Federator.perform(:publish_one, params)
|
||||
end
|
||||
|
||||
@impl Oban.Worker
|
||||
|
|
|
|||
|
|
@ -3,24 +3,56 @@
|
|||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Workers.ReceiverWorker do
|
||||
alias Pleroma.Signature
|
||||
alias Pleroma.User
|
||||
alias Pleroma.Web.Federator
|
||||
|
||||
use Pleroma.Workers.WorkerHelper, queue: "federator_incoming"
|
||||
|
||||
@impl Oban.Worker
|
||||
|
||||
def perform(%Job{
|
||||
args: %{"op" => "incoming_ap_doc", "req_headers" => req_headers, "params" => params}
|
||||
}) do
|
||||
# Oban's serialization converts our tuple headers to lists.
|
||||
# Revert it for the signature validation.
|
||||
req_headers = Enum.into(req_headers, [], &List.to_tuple(&1))
|
||||
|
||||
conn_data = %{params: params, req_headers: req_headers}
|
||||
|
||||
with {:ok, %User{} = _actor} <- User.get_or_fetch_by_ap_id(conn_data.params["actor"]),
|
||||
{:ok, _public_key} <- Signature.refetch_public_key(conn_data),
|
||||
{:signature, true} <- {:signature, HTTPSignatures.validate_conn(conn_data)},
|
||||
{:ok, res} <- Federator.perform(:incoming_ap_doc, params) do
|
||||
{:ok, res}
|
||||
else
|
||||
e -> process_errors(e)
|
||||
end
|
||||
end
|
||||
|
||||
def perform(%Job{args: %{"op" => "incoming_ap_doc", "params" => params}}) do
|
||||
with {:ok, res} <- Federator.perform(:incoming_ap_doc, params) do
|
||||
{:ok, res}
|
||||
else
|
||||
e -> process_errors(e)
|
||||
end
|
||||
end
|
||||
|
||||
@impl Oban.Worker
|
||||
def timeout(%_{args: %{"timeout" => timeout}}), do: timeout
|
||||
|
||||
def timeout(_job), do: :timer.seconds(5)
|
||||
|
||||
defp process_errors(errors) do
|
||||
case errors do
|
||||
{:error, :origin_containment_failed} -> {:cancel, :origin_containment_failed}
|
||||
{:error, :already_present} -> {:cancel, :already_present}
|
||||
{:error, {:validate_object, reason}} -> {:cancel, reason}
|
||||
{:error, {:error, {:validate, reason}}} -> {:cancel, reason}
|
||||
{:error, {:reject, reason}} -> {:cancel, reason}
|
||||
{:signature, false} -> {:cancel, :invalid_signature}
|
||||
{:error, {:error, reason = "Object has been deleted"}} -> {:cancel, reason}
|
||||
e -> e
|
||||
end
|
||||
end
|
||||
|
||||
@impl Oban.Worker
|
||||
def timeout(_job), do: :timer.seconds(5)
|
||||
end
|
||||
|
|
|
|||
|
|
@ -9,7 +9,25 @@ defmodule Pleroma.Workers.RemoteFetcherWorker do
|
|||
|
||||
@impl Oban.Worker
|
||||
def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do
|
||||
{:ok, _object} = Fetcher.fetch_object_from_id(id, depth: args["depth"])
|
||||
case Fetcher.fetch_object_from_id(id, depth: args["depth"]) do
|
||||
{:ok, _object} ->
|
||||
:ok
|
||||
|
||||
{:error, :forbidden} ->
|
||||
{:discard, :forbidden}
|
||||
|
||||
{:error, :not_found} ->
|
||||
{:discard, :not_found}
|
||||
|
||||
{:error, :allowed_depth} ->
|
||||
{:discard, :allowed_depth}
|
||||
|
||||
{:error, _} = e ->
|
||||
e
|
||||
|
||||
e ->
|
||||
{:error, e}
|
||||
end
|
||||
end
|
||||
|
||||
@impl Oban.Worker
|
||||
|
|
|
|||
23
lib/pleroma/workers/search_indexing_worker.ex
Normal file
23
lib/pleroma/workers/search_indexing_worker.ex
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
defmodule Pleroma.Workers.SearchIndexingWorker do
|
||||
use Pleroma.Workers.WorkerHelper, queue: "search_indexing"
|
||||
|
||||
@impl Oban.Worker
|
||||
|
||||
alias Pleroma.Config.Getting, as: Config
|
||||
|
||||
def perform(%Job{args: %{"op" => "add_to_index", "activity" => activity_id}}) do
|
||||
activity = Pleroma.Activity.get_by_id_with_object(activity_id)
|
||||
|
||||
search_module = Config.get([Pleroma.Search, :module])
|
||||
|
||||
search_module.add_to_index(activity)
|
||||
end
|
||||
|
||||
def perform(%Job{args: %{"op" => "remove_from_index", "object" => object_id}}) do
|
||||
object = Pleroma.Object.get_by_id(object_id)
|
||||
|
||||
search_module = Config.get([Pleroma.Search, :module])
|
||||
|
||||
search_module.remove_from_index(object)
|
||||
end
|
||||
end
|
||||
Loading…
Add table
Add a link
Reference in a new issue