Merge branch 'develop' into issue/733
This commit is contained in:
commit
d75bc728e7
81 changed files with 4534 additions and 1014 deletions
42
lib/mix/tasks/pleroma/docs.ex
Normal file
42
lib/mix/tasks/pleroma/docs.ex
Normal file
|
|
@ -0,0 +1,42 @@
|
|||
defmodule Mix.Tasks.Pleroma.Docs do
|
||||
use Mix.Task
|
||||
import Mix.Pleroma
|
||||
|
||||
@shortdoc "Generates docs from descriptions.exs"
|
||||
@moduledoc """
|
||||
Generates docs from `descriptions.exs`.
|
||||
|
||||
Supports two formats: `markdown` and `json`.
|
||||
|
||||
## Generate Markdown docs
|
||||
|
||||
`mix pleroma.docs`
|
||||
|
||||
## Generate JSON docs
|
||||
|
||||
`mix pleroma.docs json`
|
||||
"""
|
||||
|
||||
def run(["json"]) do
|
||||
do_run(Pleroma.Docs.JSON)
|
||||
end
|
||||
|
||||
def run(_) do
|
||||
do_run(Pleroma.Docs.Markdown)
|
||||
end
|
||||
|
||||
defp do_run(implementation) do
|
||||
start_pleroma()
|
||||
|
||||
with {descriptions, _paths} <- Mix.Config.eval!("config/description.exs"),
|
||||
{:ok, file_path} <-
|
||||
Pleroma.Docs.Generator.process(
|
||||
implementation,
|
||||
descriptions[:pleroma][:config_description]
|
||||
) do
|
||||
type = if implementation == Pleroma.Docs.Markdown, do: "Markdown", else: "JSON"
|
||||
|
||||
Mix.shell().info([:green, "#{type} docs successfully generated to #{file_path}."])
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
@ -6,6 +6,7 @@ defmodule Pleroma.Activity do
|
|||
use Ecto.Schema
|
||||
|
||||
alias Pleroma.Activity
|
||||
alias Pleroma.Activity.Queries
|
||||
alias Pleroma.ActivityExpiration
|
||||
alias Pleroma.Bookmark
|
||||
alias Pleroma.Notification
|
||||
|
|
@ -65,8 +66,8 @@ defmodule Pleroma.Activity do
|
|||
timestamps()
|
||||
end
|
||||
|
||||
def with_joined_object(query) do
|
||||
join(query, :inner, [activity], o in Object,
|
||||
def with_joined_object(query, join_type \\ :inner) do
|
||||
join(query, join_type, [activity], o in Object,
|
||||
on:
|
||||
fragment(
|
||||
"(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
|
||||
|
|
@ -78,10 +79,10 @@ defmodule Pleroma.Activity do
|
|||
)
|
||||
end
|
||||
|
||||
def with_preloaded_object(query) do
|
||||
def with_preloaded_object(query, join_type \\ :inner) do
|
||||
query
|
||||
|> has_named_binding?(:object)
|
||||
|> if(do: query, else: with_joined_object(query))
|
||||
|> if(do: query, else: with_joined_object(query, join_type))
|
||||
|> preload([activity, object: object], object: object)
|
||||
end
|
||||
|
||||
|
|
@ -107,12 +108,9 @@ defmodule Pleroma.Activity do
|
|||
def with_set_thread_muted_field(query, _), do: query
|
||||
|
||||
def get_by_ap_id(ap_id) do
|
||||
Repo.one(
|
||||
from(
|
||||
activity in Activity,
|
||||
where: fragment("(?)->>'id' = ?", activity.data, ^to_string(ap_id))
|
||||
)
|
||||
)
|
||||
ap_id
|
||||
|> Queries.by_ap_id()
|
||||
|> Repo.one()
|
||||
end
|
||||
|
||||
def get_bookmark(%Activity{} = activity, %User{} = user) do
|
||||
|
|
@ -133,21 +131,10 @@ defmodule Pleroma.Activity do
|
|||
end
|
||||
|
||||
def get_by_ap_id_with_object(ap_id) do
|
||||
Repo.one(
|
||||
from(
|
||||
activity in Activity,
|
||||
where: fragment("(?)->>'id' = ?", activity.data, ^to_string(ap_id)),
|
||||
left_join: o in Object,
|
||||
on:
|
||||
fragment(
|
||||
"(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
|
||||
o.data,
|
||||
activity.data,
|
||||
activity.data
|
||||
),
|
||||
preload: [object: o]
|
||||
)
|
||||
)
|
||||
ap_id
|
||||
|> Queries.by_ap_id()
|
||||
|> with_preloaded_object(:left)
|
||||
|> Repo.one()
|
||||
end
|
||||
|
||||
def get_by_id(id) do
|
||||
|
|
@ -158,18 +145,9 @@ defmodule Pleroma.Activity do
|
|||
end
|
||||
|
||||
def get_by_id_with_object(id) do
|
||||
from(activity in Activity,
|
||||
where: activity.id == ^id,
|
||||
inner_join: o in Object,
|
||||
on:
|
||||
fragment(
|
||||
"(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
|
||||
o.data,
|
||||
activity.data,
|
||||
activity.data
|
||||
),
|
||||
preload: [object: o]
|
||||
)
|
||||
Activity
|
||||
|> where(id: ^id)
|
||||
|> with_preloaded_object()
|
||||
|> Repo.one()
|
||||
end
|
||||
|
||||
|
|
@ -180,51 +158,21 @@ defmodule Pleroma.Activity do
|
|||
|> Repo.all()
|
||||
end
|
||||
|
||||
def by_object_ap_id(ap_id) do
|
||||
from(
|
||||
activity in Activity,
|
||||
where:
|
||||
fragment(
|
||||
"coalesce((?)->'object'->>'id', (?)->>'object') = ?",
|
||||
activity.data,
|
||||
activity.data,
|
||||
^to_string(ap_id)
|
||||
)
|
||||
)
|
||||
@doc """
|
||||
Accepts `ap_id` or list of `ap_id`.
|
||||
Returns a query.
|
||||
"""
|
||||
@spec create_by_object_ap_id(String.t() | [String.t()]) :: Ecto.Queryable.t()
|
||||
def create_by_object_ap_id(ap_id) do
|
||||
ap_id
|
||||
|> Queries.by_object_id()
|
||||
|> Queries.by_type("Create")
|
||||
end
|
||||
|
||||
def create_by_object_ap_id(ap_ids) when is_list(ap_ids) do
|
||||
from(
|
||||
activity in Activity,
|
||||
where:
|
||||
fragment(
|
||||
"coalesce((?)->'object'->>'id', (?)->>'object') = ANY(?)",
|
||||
activity.data,
|
||||
activity.data,
|
||||
^ap_ids
|
||||
),
|
||||
where: fragment("(?)->>'type' = 'Create'", activity.data)
|
||||
)
|
||||
end
|
||||
|
||||
def create_by_object_ap_id(ap_id) when is_binary(ap_id) do
|
||||
from(
|
||||
activity in Activity,
|
||||
where:
|
||||
fragment(
|
||||
"coalesce((?)->'object'->>'id', (?)->>'object') = ?",
|
||||
activity.data,
|
||||
activity.data,
|
||||
^to_string(ap_id)
|
||||
),
|
||||
where: fragment("(?)->>'type' = 'Create'", activity.data)
|
||||
)
|
||||
end
|
||||
|
||||
def create_by_object_ap_id(_), do: nil
|
||||
|
||||
def get_all_create_by_object_ap_id(ap_id) do
|
||||
Repo.all(create_by_object_ap_id(ap_id))
|
||||
ap_id
|
||||
|> create_by_object_ap_id()
|
||||
|> Repo.all()
|
||||
end
|
||||
|
||||
def get_create_by_object_ap_id(ap_id) when is_binary(ap_id) do
|
||||
|
|
@ -235,54 +183,17 @@ defmodule Pleroma.Activity do
|
|||
|
||||
def get_create_by_object_ap_id(_), do: nil
|
||||
|
||||
def create_by_object_ap_id_with_object(ap_ids) when is_list(ap_ids) do
|
||||
from(
|
||||
activity in Activity,
|
||||
where:
|
||||
fragment(
|
||||
"coalesce((?)->'object'->>'id', (?)->>'object') = ANY(?)",
|
||||
activity.data,
|
||||
activity.data,
|
||||
^ap_ids
|
||||
),
|
||||
where: fragment("(?)->>'type' = 'Create'", activity.data),
|
||||
inner_join: o in Object,
|
||||
on:
|
||||
fragment(
|
||||
"(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
|
||||
o.data,
|
||||
activity.data,
|
||||
activity.data
|
||||
),
|
||||
preload: [object: o]
|
||||
)
|
||||
@doc """
|
||||
Accepts `ap_id` or list of `ap_id`.
|
||||
Returns a query.
|
||||
"""
|
||||
@spec create_by_object_ap_id_with_object(String.t() | [String.t()]) :: Ecto.Queryable.t()
|
||||
def create_by_object_ap_id_with_object(ap_id) do
|
||||
ap_id
|
||||
|> create_by_object_ap_id()
|
||||
|> with_preloaded_object()
|
||||
end
|
||||
|
||||
def create_by_object_ap_id_with_object(ap_id) when is_binary(ap_id) do
|
||||
from(
|
||||
activity in Activity,
|
||||
where:
|
||||
fragment(
|
||||
"coalesce((?)->'object'->>'id', (?)->>'object') = ?",
|
||||
activity.data,
|
||||
activity.data,
|
||||
^to_string(ap_id)
|
||||
),
|
||||
where: fragment("(?)->>'type' = 'Create'", activity.data),
|
||||
inner_join: o in Object,
|
||||
on:
|
||||
fragment(
|
||||
"(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
|
||||
o.data,
|
||||
activity.data,
|
||||
activity.data
|
||||
),
|
||||
preload: [object: o]
|
||||
)
|
||||
end
|
||||
|
||||
def create_by_object_ap_id_with_object(_), do: nil
|
||||
|
||||
def get_create_by_object_ap_id_with_object(ap_id) when is_binary(ap_id) do
|
||||
ap_id
|
||||
|> create_by_object_ap_id_with_object()
|
||||
|
|
@ -306,7 +217,8 @@ defmodule Pleroma.Activity do
|
|||
def normalize(_), do: nil
|
||||
|
||||
def delete_by_ap_id(id) when is_binary(id) do
|
||||
by_object_ap_id(id)
|
||||
id
|
||||
|> Queries.by_object_id()
|
||||
|> select([u], u)
|
||||
|> Repo.delete_all()
|
||||
|> elem(1)
|
||||
|
|
@ -350,31 +262,10 @@ defmodule Pleroma.Activity do
|
|||
end
|
||||
|
||||
def follow_requests_for_actor(%Pleroma.User{ap_id: ap_id}) do
|
||||
from(
|
||||
a in Activity,
|
||||
where:
|
||||
fragment(
|
||||
"? ->> 'type' = 'Follow'",
|
||||
a.data
|
||||
),
|
||||
where:
|
||||
fragment(
|
||||
"? ->> 'state' = 'pending'",
|
||||
a.data
|
||||
),
|
||||
where:
|
||||
fragment(
|
||||
"coalesce((?)->'object'->>'id', (?)->>'object') = ?",
|
||||
a.data,
|
||||
a.data,
|
||||
^ap_id
|
||||
)
|
||||
)
|
||||
end
|
||||
|
||||
@spec query_by_actor(actor()) :: Ecto.Query.t()
|
||||
def query_by_actor(actor) do
|
||||
from(a in Activity, where: a.actor == ^actor)
|
||||
ap_id
|
||||
|> Queries.by_object_id()
|
||||
|> Queries.by_type("Follow")
|
||||
|> where([a], fragment("? ->> 'state' = 'pending'", a.data))
|
||||
end
|
||||
|
||||
def restrict_deactivated_users(query) do
|
||||
|
|
|
|||
|
|
@ -13,6 +13,14 @@ defmodule Pleroma.Activity.Queries do
|
|||
|
||||
alias Pleroma.Activity
|
||||
|
||||
@spec by_ap_id(query, String.t()) :: query
|
||||
def by_ap_id(query \\ Activity, ap_id) do
|
||||
from(
|
||||
activity in query,
|
||||
where: fragment("(?)->>'id' = ?", activity.data, ^to_string(ap_id))
|
||||
)
|
||||
end
|
||||
|
||||
@spec by_actor(query, String.t()) :: query
|
||||
def by_actor(query \\ Activity, actor) do
|
||||
from(
|
||||
|
|
@ -21,8 +29,23 @@ defmodule Pleroma.Activity.Queries do
|
|||
)
|
||||
end
|
||||
|
||||
@spec by_object_id(query, String.t()) :: query
|
||||
def by_object_id(query \\ Activity, object_id) do
|
||||
@spec by_object_id(query, String.t() | [String.t()]) :: query
|
||||
def by_object_id(query \\ Activity, object_id)
|
||||
|
||||
def by_object_id(query, object_ids) when is_list(object_ids) do
|
||||
from(
|
||||
activity in query,
|
||||
where:
|
||||
fragment(
|
||||
"coalesce((?)->'object'->>'id', (?)->>'object') = ANY(?)",
|
||||
activity.data,
|
||||
activity.data,
|
||||
^object_ids
|
||||
)
|
||||
)
|
||||
end
|
||||
|
||||
def by_object_id(query, object_id) when is_binary(object_id) do
|
||||
from(activity in query,
|
||||
where:
|
||||
fragment(
|
||||
|
|
@ -41,9 +64,4 @@ defmodule Pleroma.Activity.Queries do
|
|||
where: fragment("(?)->>'type' = ?", activity.data, ^activity_type)
|
||||
)
|
||||
end
|
||||
|
||||
@spec limit(query, pos_integer()) :: query
|
||||
def limit(query \\ Activity, limit) do
|
||||
from(activity in query, limit: ^limit)
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -31,18 +31,19 @@ defmodule Pleroma.Application do
|
|||
children =
|
||||
[
|
||||
Pleroma.Repo,
|
||||
Pleroma.Scheduler,
|
||||
Pleroma.Config.TransferTask,
|
||||
Pleroma.Emoji,
|
||||
Pleroma.Captcha,
|
||||
Pleroma.FlakeId,
|
||||
Pleroma.ScheduledActivityWorker,
|
||||
Pleroma.ActivityExpirationWorker
|
||||
Pleroma.Daemons.ScheduledActivityDaemon,
|
||||
Pleroma.Daemons.ActivityExpirationDaemon
|
||||
] ++
|
||||
cachex_children() ++
|
||||
hackney_pool_children() ++
|
||||
[
|
||||
Pleroma.Web.Federator.RetryQueue,
|
||||
Pleroma.Stats,
|
||||
{Oban, Pleroma.Config.get(Oban)},
|
||||
%{
|
||||
id: :web_push_init,
|
||||
start: {Task, :start_link, [&Pleroma.Web.Push.init/0]},
|
||||
|
|
@ -70,9 +71,7 @@ defmodule Pleroma.Application do
|
|||
# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
|
||||
# for other strategies and supported options
|
||||
opts = [strategy: :one_for_one, name: Pleroma.Supervisor]
|
||||
result = Supervisor.start_link(children, opts)
|
||||
:ok = after_supervisor_start()
|
||||
result
|
||||
Supervisor.start_link(children, opts)
|
||||
end
|
||||
|
||||
defp setup_instrumenters do
|
||||
|
|
@ -164,17 +163,4 @@ defmodule Pleroma.Application do
|
|||
:hackney_pool.child_spec(pool, options)
|
||||
end
|
||||
end
|
||||
|
||||
defp after_supervisor_start do
|
||||
with digest_config <- Application.get_env(:pleroma, :email_notifications)[:digest],
|
||||
true <- digest_config[:active] do
|
||||
PleromaJobQueue.schedule(
|
||||
digest_config[:schedule],
|
||||
:digest_emails,
|
||||
Pleroma.DigestEmailWorker
|
||||
)
|
||||
end
|
||||
|
||||
:ok
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -2,13 +2,14 @@
|
|||
# Copyright © 2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.ActivityExpirationWorker do
|
||||
defmodule Pleroma.Daemons.ActivityExpirationDaemon do
|
||||
alias Pleroma.Activity
|
||||
alias Pleroma.ActivityExpiration
|
||||
alias Pleroma.Config
|
||||
alias Pleroma.Repo
|
||||
alias Pleroma.User
|
||||
alias Pleroma.Web.CommonAPI
|
||||
|
||||
require Logger
|
||||
use GenServer
|
||||
import Ecto.Query
|
||||
|
|
@ -49,7 +50,10 @@ defmodule Pleroma.ActivityExpirationWorker do
|
|||
def handle_info(:perform, state) do
|
||||
ActivityExpiration.due_expirations(@schedule_interval)
|
||||
|> Enum.each(fn expiration ->
|
||||
PleromaJobQueue.enqueue(:activity_expiration, __MODULE__, [:execute, expiration.id])
|
||||
Pleroma.Workers.ActivityExpirationWorker.enqueue(
|
||||
"activity_expiration",
|
||||
%{"activity_expiration_id" => expiration.id}
|
||||
)
|
||||
end)
|
||||
|
||||
schedule_next()
|
||||
|
|
@ -2,10 +2,11 @@
|
|||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.DigestEmailWorker do
|
||||
import Ecto.Query
|
||||
defmodule Pleroma.Daemons.DigestEmailDaemon do
|
||||
alias Pleroma.Repo
|
||||
alias Pleroma.Workers.DigestEmailsWorker
|
||||
|
||||
@queue_name :digest_emails
|
||||
import Ecto.Query
|
||||
|
||||
def perform do
|
||||
config = Pleroma.Config.get([:email_notifications, :digest])
|
||||
|
|
@ -20,8 +21,10 @@ defmodule Pleroma.DigestEmailWorker do
|
|||
where: u.last_digest_emailed_at < datetime_add(^now, ^negative_interval, "day"),
|
||||
select: u
|
||||
)
|
||||
|> Pleroma.Repo.all()
|
||||
|> Enum.each(&PleromaJobQueue.enqueue(@queue_name, __MODULE__, [&1]))
|
||||
|> Repo.all()
|
||||
|> Enum.each(fn user ->
|
||||
DigestEmailsWorker.enqueue("digest_email", %{"user_id" => user.id})
|
||||
end)
|
||||
end
|
||||
|
||||
@doc """
|
||||
|
|
@ -2,7 +2,7 @@
|
|||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.ScheduledActivityWorker do
|
||||
defmodule Pleroma.Daemons.ScheduledActivityDaemon do
|
||||
@moduledoc """
|
||||
Sends scheduled activities to the job queue.
|
||||
"""
|
||||
|
|
@ -11,6 +11,7 @@ defmodule Pleroma.ScheduledActivityWorker do
|
|||
alias Pleroma.ScheduledActivity
|
||||
alias Pleroma.User
|
||||
alias Pleroma.Web.CommonAPI
|
||||
|
||||
use GenServer
|
||||
require Logger
|
||||
|
||||
|
|
@ -45,7 +46,10 @@ defmodule Pleroma.ScheduledActivityWorker do
|
|||
def handle_info(:perform, state) do
|
||||
ScheduledActivity.due_activities(@schedule_interval)
|
||||
|> Enum.each(fn scheduled_activity ->
|
||||
PleromaJobQueue.enqueue(:scheduled_activities, __MODULE__, [:execute, scheduled_activity.id])
|
||||
Pleroma.Workers.ScheduledActivityWorker.enqueue(
|
||||
"execute",
|
||||
%{"activity_id" => scheduled_activity.id}
|
||||
)
|
||||
end)
|
||||
|
||||
schedule_next()
|
||||
73
lib/pleroma/docs/generator.ex
Normal file
73
lib/pleroma/docs/generator.ex
Normal file
|
|
@ -0,0 +1,73 @@
|
|||
defmodule Pleroma.Docs.Generator do
|
||||
@callback process(keyword()) :: {:ok, String.t()}
|
||||
|
||||
@spec process(module(), keyword()) :: {:ok, String.t()}
|
||||
def process(implementation, descriptions) do
|
||||
implementation.process(descriptions)
|
||||
end
|
||||
|
||||
@spec uploaders_list() :: [module()]
|
||||
def uploaders_list do
|
||||
{:ok, modules} = :application.get_key(:pleroma, :modules)
|
||||
|
||||
Enum.filter(modules, fn module ->
|
||||
name_as_list = Module.split(module)
|
||||
|
||||
List.starts_with?(name_as_list, ["Pleroma", "Uploaders"]) and
|
||||
List.last(name_as_list) != "Uploader"
|
||||
end)
|
||||
end
|
||||
|
||||
@spec filters_list() :: [module()]
|
||||
def filters_list do
|
||||
{:ok, modules} = :application.get_key(:pleroma, :modules)
|
||||
|
||||
Enum.filter(modules, fn module ->
|
||||
name_as_list = Module.split(module)
|
||||
|
||||
List.starts_with?(name_as_list, ["Pleroma", "Upload", "Filter"])
|
||||
end)
|
||||
end
|
||||
|
||||
@spec mrf_list() :: [module()]
|
||||
def mrf_list do
|
||||
{:ok, modules} = :application.get_key(:pleroma, :modules)
|
||||
|
||||
Enum.filter(modules, fn module ->
|
||||
name_as_list = Module.split(module)
|
||||
|
||||
List.starts_with?(name_as_list, ["Pleroma", "Web", "ActivityPub", "MRF"]) and
|
||||
length(name_as_list) > 4
|
||||
end)
|
||||
end
|
||||
|
||||
@spec richmedia_parsers() :: [module()]
|
||||
def richmedia_parsers do
|
||||
{:ok, modules} = :application.get_key(:pleroma, :modules)
|
||||
|
||||
Enum.filter(modules, fn module ->
|
||||
name_as_list = Module.split(module)
|
||||
|
||||
List.starts_with?(name_as_list, ["Pleroma", "Web", "RichMedia", "Parsers"]) and
|
||||
length(name_as_list) == 5
|
||||
end)
|
||||
end
|
||||
end
|
||||
|
||||
defimpl Jason.Encoder, for: Tuple do
|
||||
def encode(tuple, opts) do
|
||||
Jason.Encode.list(Tuple.to_list(tuple), opts)
|
||||
end
|
||||
end
|
||||
|
||||
defimpl Jason.Encoder, for: [Regex, Function] do
|
||||
def encode(term, opts) do
|
||||
Jason.Encode.string(inspect(term), opts)
|
||||
end
|
||||
end
|
||||
|
||||
defimpl String.Chars, for: Regex do
|
||||
def to_string(term) do
|
||||
inspect(term)
|
||||
end
|
||||
end
|
||||
20
lib/pleroma/docs/json.ex
Normal file
20
lib/pleroma/docs/json.ex
Normal file
|
|
@ -0,0 +1,20 @@
|
|||
defmodule Pleroma.Docs.JSON do
|
||||
@behaviour Pleroma.Docs.Generator
|
||||
|
||||
@spec process(keyword()) :: {:ok, String.t()}
|
||||
def process(descriptions) do
|
||||
config_path = "docs/generate_config.json"
|
||||
|
||||
with {:ok, file} <- File.open(config_path, [:write]),
|
||||
json <- generate_json(descriptions),
|
||||
:ok <- IO.write(file, json),
|
||||
:ok <- File.close(file) do
|
||||
{:ok, config_path}
|
||||
end
|
||||
end
|
||||
|
||||
@spec generate_json([keyword()]) :: String.t()
|
||||
def generate_json(descriptions) do
|
||||
Jason.encode!(descriptions)
|
||||
end
|
||||
end
|
||||
78
lib/pleroma/docs/markdown.ex
Normal file
78
lib/pleroma/docs/markdown.ex
Normal file
|
|
@ -0,0 +1,78 @@
|
|||
defmodule Pleroma.Docs.Markdown do
|
||||
@behaviour Pleroma.Docs.Generator
|
||||
|
||||
@spec process(keyword()) :: {:ok, String.t()}
|
||||
def process(descriptions) do
|
||||
config_path = "docs/generated_config.md"
|
||||
{:ok, file} = File.open(config_path, [:utf8, :write])
|
||||
IO.write(file, "# Generated configuration\n")
|
||||
IO.write(file, "Date of generation: #{Date.utc_today()}\n\n")
|
||||
|
||||
IO.write(
|
||||
file,
|
||||
"This file describe the configuration, it is recommended to edit the relevant `*.secret.exs` file instead of the others founds in the ``config`` directory.\n\n" <>
|
||||
"If you run Pleroma with ``MIX_ENV=prod`` the file is ``prod.secret.exs``, otherwise it is ``dev.secret.exs``.\n\n"
|
||||
)
|
||||
|
||||
for group <- descriptions do
|
||||
if is_nil(group[:key]) do
|
||||
IO.write(file, "## #{inspect(group[:group])}\n")
|
||||
else
|
||||
IO.write(file, "## #{inspect(group[:key])}\n")
|
||||
end
|
||||
|
||||
IO.write(file, "#{group[:description]}\n")
|
||||
|
||||
for child <- group[:children] do
|
||||
print_child_header(file, child)
|
||||
|
||||
print_suggestions(file, child[:suggestions])
|
||||
|
||||
if child[:children] do
|
||||
for subchild <- child[:children] do
|
||||
print_child_header(file, subchild)
|
||||
|
||||
print_suggestions(file, subchild[:suggestions])
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
IO.write(file, "\n")
|
||||
end
|
||||
|
||||
:ok = File.close(file)
|
||||
{:ok, config_path}
|
||||
end
|
||||
|
||||
defp print_suggestion(file, suggestion) when is_list(suggestion) do
|
||||
IO.write(file, " `#{inspect(suggestion)}`\n")
|
||||
end
|
||||
|
||||
defp print_suggestion(file, suggestion) when is_function(suggestion) do
|
||||
IO.write(file, " `#{inspect(suggestion.())}`\n")
|
||||
end
|
||||
|
||||
defp print_suggestion(file, suggestion, as_list \\ false) do
|
||||
list_mark = if as_list, do: "- ", else: ""
|
||||
IO.write(file, " #{list_mark}`#{inspect(suggestion)}`\n")
|
||||
end
|
||||
|
||||
defp print_suggestions(_file, nil), do: nil
|
||||
|
||||
defp print_suggestions(file, suggestions) do
|
||||
IO.write(file, "Suggestions:\n")
|
||||
|
||||
if length(suggestions) > 1 do
|
||||
for suggestion <- suggestions do
|
||||
print_suggestion(file, suggestion, true)
|
||||
end
|
||||
else
|
||||
print_suggestion(file, List.first(suggestions))
|
||||
end
|
||||
end
|
||||
|
||||
defp print_child_header(file, child) do
|
||||
IO.write(file, "- `#{inspect(child[:key])}` -`#{inspect(child[:type])}` \n")
|
||||
IO.write(file, "#{child[:description]} \n")
|
||||
end
|
||||
end
|
||||
|
|
@ -9,6 +9,7 @@ defmodule Pleroma.Emails.Mailer do
|
|||
The module contains functions to delivery email using Swoosh.Mailer.
|
||||
"""
|
||||
|
||||
alias Pleroma.Workers.MailerWorker
|
||||
alias Swoosh.DeliveryError
|
||||
|
||||
@otp_app :pleroma
|
||||
|
|
@ -19,7 +20,12 @@ defmodule Pleroma.Emails.Mailer do
|
|||
|
||||
@doc "add email to queue"
|
||||
def deliver_async(email, config \\ []) do
|
||||
PleromaJobQueue.enqueue(:mailer, __MODULE__, [:deliver_async, email, config])
|
||||
encoded_email =
|
||||
email
|
||||
|> :erlang.term_to_binary()
|
||||
|> Base.encode64()
|
||||
|
||||
MailerWorker.enqueue("email", %{"encoded_email" => encoded_email, "config" => config})
|
||||
end
|
||||
|
||||
@doc "callback to perform send email from queue"
|
||||
|
|
|
|||
|
|
@ -90,7 +90,7 @@ defmodule Pleroma.Instances.Instance do
|
|||
def set_unreachable(url_or_host, unreachable_since \\ nil)
|
||||
|
||||
def set_unreachable(url_or_host, unreachable_since) when is_binary(url_or_host) do
|
||||
unreachable_since = unreachable_since || DateTime.utc_now()
|
||||
unreachable_since = parse_datetime(unreachable_since) || NaiveDateTime.utc_now()
|
||||
host = host(url_or_host)
|
||||
existing_record = Repo.get_by(Instance, %{host: host})
|
||||
|
||||
|
|
@ -114,4 +114,10 @@ defmodule Pleroma.Instances.Instance do
|
|||
end
|
||||
|
||||
def set_unreachable(_, _), do: {:error, nil}
|
||||
|
||||
defp parse_datetime(datetime) when is_binary(datetime) do
|
||||
NaiveDateTime.from_iso8601(datetime)
|
||||
end
|
||||
|
||||
defp parse_datetime(datetime), do: datetime
|
||||
end
|
||||
|
|
|
|||
7
lib/pleroma/scheduler.ex
Normal file
7
lib/pleroma/scheduler.ex
Normal file
|
|
@ -0,0 +1,7 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Scheduler do
|
||||
use Quantum.Scheduler, otp_app: :pleroma
|
||||
end
|
||||
|
|
@ -27,6 +27,7 @@ defmodule Pleroma.User do
|
|||
alias Pleroma.Web.OStatus
|
||||
alias Pleroma.Web.RelMe
|
||||
alias Pleroma.Web.Websub
|
||||
alias Pleroma.Workers.BackgroundWorker
|
||||
|
||||
require Logger
|
||||
|
||||
|
|
@ -174,11 +175,25 @@ defmodule Pleroma.User do
|
|||
|> Repo.aggregate(:count, :id)
|
||||
end
|
||||
|
||||
defp truncate_if_exists(params, key, max_length) do
|
||||
if Map.has_key?(params, key) and is_binary(params[key]) do
|
||||
{value, _chopped} = String.split_at(params[key], max_length)
|
||||
Map.put(params, key, value)
|
||||
else
|
||||
params
|
||||
end
|
||||
end
|
||||
|
||||
def remote_user_creation(params) do
|
||||
bio_limit = Pleroma.Config.get([:instance, :user_bio_length], 5000)
|
||||
name_limit = Pleroma.Config.get([:instance, :user_name_length], 100)
|
||||
|
||||
params = Map.put(params, :info, params[:info] || %{})
|
||||
params =
|
||||
params
|
||||
|> Map.put(:info, params[:info] || %{})
|
||||
|> truncate_if_exists(:name, name_limit)
|
||||
|> truncate_if_exists(:bio, bio_limit)
|
||||
|
||||
info_cng = User.Info.remote_user_creation(%User.Info{}, params[:info])
|
||||
|
||||
changes =
|
||||
|
|
@ -633,8 +648,9 @@ defmodule Pleroma.User do
|
|||
end
|
||||
|
||||
@doc "Fetch some posts when the user has just been federated with"
|
||||
def fetch_initial_posts(user),
|
||||
do: PleromaJobQueue.enqueue(:background, __MODULE__, [:fetch_initial_posts, user])
|
||||
def fetch_initial_posts(user) do
|
||||
BackgroundWorker.enqueue("fetch_initial_posts", %{"user_id" => user.id})
|
||||
end
|
||||
|
||||
@spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t()
|
||||
def get_followers_query(%User{} = user, nil) do
|
||||
|
|
@ -1064,7 +1080,7 @@ defmodule Pleroma.User do
|
|||
end
|
||||
|
||||
def deactivate_async(user, status \\ true) do
|
||||
PleromaJobQueue.enqueue(:background, __MODULE__, [:deactivate_async, user, status])
|
||||
BackgroundWorker.enqueue("deactivate_user", %{"user_id" => user.id, "status" => status})
|
||||
end
|
||||
|
||||
def deactivate(%User{} = user, status \\ true) do
|
||||
|
|
@ -1092,9 +1108,9 @@ defmodule Pleroma.User do
|
|||
|> update_and_set_cache()
|
||||
end
|
||||
|
||||
@spec delete(User.t()) :: :ok
|
||||
def delete(%User{} = user),
|
||||
do: PleromaJobQueue.enqueue(:background, __MODULE__, [:delete, user])
|
||||
def delete(%User{} = user) do
|
||||
BackgroundWorker.enqueue("delete_user", %{"user_id" => user.id})
|
||||
end
|
||||
|
||||
@spec perform(atom(), User.t()) :: {:ok, User.t()}
|
||||
def perform(:delete, %User{} = user) do
|
||||
|
|
@ -1201,25 +1217,24 @@ defmodule Pleroma.User do
|
|||
Repo.all(query)
|
||||
end
|
||||
|
||||
def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers),
|
||||
do:
|
||||
PleromaJobQueue.enqueue(:background, __MODULE__, [
|
||||
:blocks_import,
|
||||
blocker,
|
||||
blocked_identifiers
|
||||
])
|
||||
def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers) do
|
||||
BackgroundWorker.enqueue("blocks_import", %{
|
||||
"blocker_id" => blocker.id,
|
||||
"blocked_identifiers" => blocked_identifiers
|
||||
})
|
||||
end
|
||||
|
||||
def follow_import(%User{} = follower, followed_identifiers) when is_list(followed_identifiers),
|
||||
do:
|
||||
PleromaJobQueue.enqueue(:background, __MODULE__, [
|
||||
:follow_import,
|
||||
follower,
|
||||
followed_identifiers
|
||||
])
|
||||
def follow_import(%User{} = follower, followed_identifiers)
|
||||
when is_list(followed_identifiers) do
|
||||
BackgroundWorker.enqueue("follow_import", %{
|
||||
"follower_id" => follower.id,
|
||||
"followed_identifiers" => followed_identifiers
|
||||
})
|
||||
end
|
||||
|
||||
def delete_user_activities(%User{ap_id: ap_id} = user) do
|
||||
ap_id
|
||||
|> Activity.query_by_actor()
|
||||
|> Activity.Queries.by_actor()
|
||||
|> RepoStreamer.chunk_stream(50)
|
||||
|> Stream.each(fn activities ->
|
||||
Enum.each(activities, &delete_activity(&1))
|
||||
|
|
@ -1624,4 +1639,13 @@ defmodule Pleroma.User do
|
|||
def is_internal_user?(%User{nickname: nil}), do: true
|
||||
def is_internal_user?(%User{local: true, nickname: "internal." <> _}), do: true
|
||||
def is_internal_user?(_), do: false
|
||||
|
||||
def change_email(user, email) do
|
||||
user
|
||||
|> cast(%{email: email}, [:email])
|
||||
|> validate_required([:email])
|
||||
|> unique_constraint(:email)
|
||||
|> validate_format(:email, @email_regex)
|
||||
|> update_and_set_cache()
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -242,6 +242,13 @@ defmodule Pleroma.User.Info do
|
|||
end
|
||||
|
||||
def remote_user_creation(info, params) do
|
||||
params =
|
||||
if Map.has_key?(params, :fields) do
|
||||
Map.put(params, :fields, Enum.map(params[:fields], &truncate_field/1))
|
||||
else
|
||||
params
|
||||
end
|
||||
|
||||
info
|
||||
|> cast(params, [
|
||||
:ap_enabled,
|
||||
|
|
@ -326,6 +333,16 @@ defmodule Pleroma.User.Info do
|
|||
|
||||
defp valid_field?(_), do: false
|
||||
|
||||
defp truncate_field(%{"name" => name, "value" => value}) do
|
||||
{name, _chopped} =
|
||||
String.split_at(name, Pleroma.Config.get([:instance, :account_field_name_length], 255))
|
||||
|
||||
{value, _chopped} =
|
||||
String.split_at(value, Pleroma.Config.get([:instance, :account_field_value_length], 255))
|
||||
|
||||
%{"name" => name, "value" => value}
|
||||
end
|
||||
|
||||
@spec confirmation_changeset(Info.t(), keyword()) :: Changeset.t()
|
||||
def confirmation_changeset(info, opts) do
|
||||
need_confirmation? = Keyword.get(opts, :need_confirmation)
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
|
|||
alias Pleroma.Web.ActivityPub.MRF
|
||||
alias Pleroma.Web.ActivityPub.Transmogrifier
|
||||
alias Pleroma.Web.WebFinger
|
||||
alias Pleroma.Workers.BackgroundWorker
|
||||
|
||||
import Ecto.Query
|
||||
import Pleroma.Web.ActivityPub.Utils
|
||||
|
|
@ -145,7 +146,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
|
|||
activity
|
||||
end
|
||||
|
||||
PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity])
|
||||
BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
|
||||
|
||||
Notification.create_notifications(activity)
|
||||
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
|
|||
|
||||
alias Pleroma.HTTP
|
||||
alias Pleroma.Web.MediaProxy
|
||||
alias Pleroma.Workers.BackgroundWorker
|
||||
|
||||
require Logger
|
||||
|
||||
|
|
@ -30,7 +31,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
|
|||
url
|
||||
|> Enum.each(fn
|
||||
%{"href" => href} ->
|
||||
PleromaJobQueue.enqueue(:background, __MODULE__, [:prefetch, href])
|
||||
BackgroundWorker.enqueue("media_proxy_prefetch", %{"url" => href})
|
||||
|
||||
x ->
|
||||
Logger.debug("Unhandled attachment URL object #{inspect(x)}")
|
||||
|
|
@ -46,7 +47,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
|
|||
%{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message
|
||||
)
|
||||
when is_list(attachments) and length(attachments) > 0 do
|
||||
PleromaJobQueue.enqueue(:background, __MODULE__, [:preload, message])
|
||||
BackgroundWorker.enqueue("media_proxy_preload", %{"message" => message})
|
||||
|
||||
{:ok, message}
|
||||
end
|
||||
|
|
|
|||
|
|
@ -84,6 +84,15 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
|
|||
end
|
||||
end
|
||||
|
||||
def publish_one(%{actor_id: actor_id} = params) do
|
||||
actor = User.get_cached_by_id(actor_id)
|
||||
|
||||
params
|
||||
|> Map.delete(:actor_id)
|
||||
|> Map.put(:actor, actor)
|
||||
|> publish_one()
|
||||
end
|
||||
|
||||
defp should_federate?(inbox, public) do
|
||||
if public do
|
||||
true
|
||||
|
|
@ -159,7 +168,8 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
|
|||
Publishes an activity with BCC to all relevant peers.
|
||||
"""
|
||||
|
||||
def publish(actor, %{data: %{"bcc" => bcc}} = activity) when is_list(bcc) and bcc != [] do
|
||||
def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity)
|
||||
when is_list(bcc) and bcc != [] do
|
||||
public = is_public?(activity)
|
||||
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
|
||||
|
||||
|
|
@ -186,7 +196,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
|
|||
Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{
|
||||
inbox: inbox,
|
||||
json: json,
|
||||
actor: actor,
|
||||
actor_id: actor.id,
|
||||
id: activity.data["id"],
|
||||
unreachable_since: unreachable_since
|
||||
})
|
||||
|
|
@ -221,7 +231,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
|
|||
%{
|
||||
inbox: inbox,
|
||||
json: json,
|
||||
actor: actor,
|
||||
actor_id: actor.id,
|
||||
id: activity.data["id"],
|
||||
unreachable_since: unreachable_since
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
|
|||
alias Pleroma.Web.ActivityPub.Utils
|
||||
alias Pleroma.Web.ActivityPub.Visibility
|
||||
alias Pleroma.Web.Federator
|
||||
alias Pleroma.Workers.TransmogrifierWorker
|
||||
|
||||
import Ecto.Query
|
||||
|
||||
|
|
@ -185,12 +186,12 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
|
|||
|> Map.put("context", replied_object.data["context"] || object["conversation"])
|
||||
else
|
||||
e ->
|
||||
Logger.error("Couldn't fetch \"#{inspect(in_reply_to_id)}\", error: #{inspect(e)}")
|
||||
Logger.error("Couldn't fetch #{inspect(in_reply_to_id)}, error: #{inspect(e)}")
|
||||
object
|
||||
end
|
||||
|
||||
e ->
|
||||
Logger.error("Couldn't fetch \"#{inspect(in_reply_to_id)}\", error: #{inspect(e)}")
|
||||
Logger.error("Couldn't fetch #{inspect(in_reply_to_id)}, error: #{inspect(e)}")
|
||||
object
|
||||
end
|
||||
else
|
||||
|
|
@ -1051,7 +1052,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
|
|||
already_ap <- User.ap_enabled?(user),
|
||||
{:ok, user} <- upgrade_user(user, data) do
|
||||
if not already_ap do
|
||||
PleromaJobQueue.enqueue(:transmogrifier, __MODULE__, [:user_upgrade, user])
|
||||
TransmogrifierWorker.enqueue("user_upgrade", %{"user_id" => user.id})
|
||||
end
|
||||
|
||||
{:ok, user}
|
||||
|
|
|
|||
|
|
@ -85,15 +85,13 @@ defmodule Pleroma.Web.ActivityPub.Utils do
|
|||
defp extract_list(_), do: []
|
||||
|
||||
def maybe_splice_recipient(ap_id, params) do
|
||||
need_splice =
|
||||
need_splice? =
|
||||
!recipient_in_collection(ap_id, params["to"]) &&
|
||||
!recipient_in_collection(ap_id, params["cc"])
|
||||
|
||||
cc_list = extract_list(params["cc"])
|
||||
|
||||
if need_splice do
|
||||
params
|
||||
|> Map.put("cc", [ap_id | cc_list])
|
||||
if need_splice? do
|
||||
cc_list = extract_list(params["cc"])
|
||||
Map.put(params, "cc", [ap_id | cc_list])
|
||||
else
|
||||
params
|
||||
end
|
||||
|
|
@ -139,7 +137,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
|
|||
"object" => object
|
||||
}
|
||||
|
||||
Notification.get_notified_from_activity(%Activity{data: fake_create_activity}, false)
|
||||
get_notified_from_object(fake_create_activity)
|
||||
end
|
||||
|
||||
def get_notified_from_object(object) do
|
||||
|
|
@ -169,14 +167,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
|
|||
@spec maybe_federate(any()) :: :ok
|
||||
def maybe_federate(%Activity{local: true} = activity) do
|
||||
if Pleroma.Config.get!([:instance, :federating]) do
|
||||
priority =
|
||||
case activity.data["type"] do
|
||||
"Delete" -> 10
|
||||
"Create" -> 1
|
||||
_ -> 5
|
||||
end
|
||||
|
||||
Pleroma.Web.Federator.publish(activity, priority)
|
||||
Pleroma.Web.Federator.publish(activity)
|
||||
end
|
||||
|
||||
:ok
|
||||
|
|
@ -188,9 +179,9 @@ defmodule Pleroma.Web.ActivityPub.Utils do
|
|||
Adds an id and a published data if they aren't there,
|
||||
also adds it to an included object
|
||||
"""
|
||||
def lazy_put_activity_defaults(map, fake \\ false) do
|
||||
def lazy_put_activity_defaults(map, fake? \\ false) do
|
||||
map =
|
||||
unless fake do
|
||||
if not fake? do
|
||||
%{data: %{"id" => context}, id: context_id} = create_context(map["context"])
|
||||
|
||||
map
|
||||
|
|
@ -207,7 +198,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
|
|||
end
|
||||
|
||||
if is_map(map["object"]) do
|
||||
object = lazy_put_object_defaults(map["object"], map, fake)
|
||||
object = lazy_put_object_defaults(map["object"], map, fake?)
|
||||
%{map | "object" => object}
|
||||
else
|
||||
map
|
||||
|
|
@ -217,9 +208,9 @@ defmodule Pleroma.Web.ActivityPub.Utils do
|
|||
@doc """
|
||||
Adds an id and published date if they aren't there.
|
||||
"""
|
||||
def lazy_put_object_defaults(map, activity \\ %{}, fake)
|
||||
def lazy_put_object_defaults(map, activity \\ %{}, fake?)
|
||||
|
||||
def lazy_put_object_defaults(map, activity, true = _fake) do
|
||||
def lazy_put_object_defaults(map, activity, true = _fake?) do
|
||||
map
|
||||
|> Map.put_new_lazy("published", &make_date/0)
|
||||
|> Map.put_new("id", "pleroma:fake_object_id")
|
||||
|
|
@ -228,7 +219,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
|
|||
|> Map.put_new("context_id", activity["context_id"])
|
||||
end
|
||||
|
||||
def lazy_put_object_defaults(map, activity, _fake) do
|
||||
def lazy_put_object_defaults(map, activity, _fake?) do
|
||||
map
|
||||
|> Map.put_new_lazy("id", &generate_object_id/0)
|
||||
|> Map.put_new_lazy("published", &make_date/0)
|
||||
|
|
@ -242,9 +233,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
|
|||
def insert_full_object(%{"object" => %{"type" => type} = object_data} = map)
|
||||
when is_map(object_data) and type in @supported_object_types do
|
||||
with {:ok, object} <- Object.create(object_data) do
|
||||
map =
|
||||
map
|
||||
|> Map.put("object", object.data["id"])
|
||||
map = Map.put(map, "object", object.data["id"])
|
||||
|
||||
{:ok, map, object}
|
||||
end
|
||||
|
|
@ -263,7 +252,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
|
|||
|> Activity.Queries.by_actor()
|
||||
|> Activity.Queries.by_object_id(id)
|
||||
|> Activity.Queries.by_type("Like")
|
||||
|> Activity.Queries.limit(1)
|
||||
|> limit(1)
|
||||
|> Repo.one()
|
||||
end
|
||||
|
||||
|
|
@ -380,12 +369,11 @@ defmodule Pleroma.Web.ActivityPub.Utils do
|
|||
%Activity{data: %{"actor" => actor, "object" => object}} = activity,
|
||||
state
|
||||
) do
|
||||
with new_data <-
|
||||
activity.data
|
||||
|> Map.put("state", state),
|
||||
changeset <- Changeset.change(activity, data: new_data),
|
||||
{:ok, activity} <- Repo.update(changeset),
|
||||
_ <- User.set_follow_state_cache(actor, object, state) do
|
||||
new_data = Map.put(activity.data, "state", state)
|
||||
changeset = Changeset.change(activity, data: new_data)
|
||||
|
||||
with {:ok, activity} <- Repo.update(changeset) do
|
||||
User.set_follow_state_cache(actor, object, state)
|
||||
{:ok, activity}
|
||||
end
|
||||
end
|
||||
|
|
@ -410,28 +398,14 @@ defmodule Pleroma.Web.ActivityPub.Utils do
|
|||
end
|
||||
|
||||
def fetch_latest_follow(%User{ap_id: follower_id}, %User{ap_id: followed_id}) do
|
||||
query =
|
||||
from(
|
||||
activity in Activity,
|
||||
where:
|
||||
fragment(
|
||||
"? ->> 'type' = 'Follow'",
|
||||
activity.data
|
||||
),
|
||||
where: activity.actor == ^follower_id,
|
||||
# this is to use the index
|
||||
where:
|
||||
fragment(
|
||||
"coalesce((?)->'object'->>'id', (?)->>'object') = ?",
|
||||
activity.data,
|
||||
activity.data,
|
||||
^followed_id
|
||||
),
|
||||
order_by: [fragment("? desc nulls last", activity.id)],
|
||||
limit: 1
|
||||
)
|
||||
|
||||
Repo.one(query)
|
||||
"Follow"
|
||||
|> Activity.Queries.by_type()
|
||||
|> where(actor: ^follower_id)
|
||||
# this is to use the index
|
||||
|> Activity.Queries.by_object_id(followed_id)
|
||||
|> order_by([activity], fragment("? desc nulls last", activity.id))
|
||||
|> limit(1)
|
||||
|> Repo.one()
|
||||
end
|
||||
|
||||
#### Announce-related helpers
|
||||
|
|
@ -439,23 +413,13 @@ defmodule Pleroma.Web.ActivityPub.Utils do
|
|||
@doc """
|
||||
Retruns an existing announce activity if the notice has already been announced
|
||||
"""
|
||||
def get_existing_announce(actor, %{data: %{"id" => id}}) do
|
||||
query =
|
||||
from(
|
||||
activity in Activity,
|
||||
where: activity.actor == ^actor,
|
||||
# this is to use the index
|
||||
where:
|
||||
fragment(
|
||||
"coalesce((?)->'object'->>'id', (?)->>'object') = ?",
|
||||
activity.data,
|
||||
activity.data,
|
||||
^id
|
||||
),
|
||||
where: fragment("(?)->>'type' = 'Announce'", activity.data)
|
||||
)
|
||||
|
||||
Repo.one(query)
|
||||
def get_existing_announce(actor, %{data: %{"id" => ap_id}}) do
|
||||
"Announce"
|
||||
|> Activity.Queries.by_type()
|
||||
|> where(actor: ^actor)
|
||||
# this is to use the index
|
||||
|> Activity.Queries.by_object_id(ap_id)
|
||||
|> Repo.one()
|
||||
end
|
||||
|
||||
@doc """
|
||||
|
|
@ -538,11 +502,13 @@ defmodule Pleroma.Web.ActivityPub.Utils do
|
|||
object
|
||||
) do
|
||||
announcements =
|
||||
if is_list(object.data["announcements"]), do: object.data["announcements"], else: []
|
||||
if is_list(object.data["announcements"]) do
|
||||
Enum.uniq([actor | object.data["announcements"]])
|
||||
else
|
||||
[actor]
|
||||
end
|
||||
|
||||
with announcements <- [actor | announcements] |> Enum.uniq() do
|
||||
update_element_in_object("announcement", announcements, object)
|
||||
end
|
||||
update_element_in_object("announcement", announcements, object)
|
||||
end
|
||||
|
||||
def add_announce_to_object(_, object), do: {:ok, object}
|
||||
|
|
@ -570,28 +536,14 @@ defmodule Pleroma.Web.ActivityPub.Utils do
|
|||
|
||||
#### Block-related helpers
|
||||
def fetch_latest_block(%User{ap_id: blocker_id}, %User{ap_id: blocked_id}) do
|
||||
query =
|
||||
from(
|
||||
activity in Activity,
|
||||
where:
|
||||
fragment(
|
||||
"? ->> 'type' = 'Block'",
|
||||
activity.data
|
||||
),
|
||||
where: activity.actor == ^blocker_id,
|
||||
# this is to use the index
|
||||
where:
|
||||
fragment(
|
||||
"coalesce((?)->'object'->>'id', (?)->>'object') = ?",
|
||||
activity.data,
|
||||
activity.data,
|
||||
^blocked_id
|
||||
),
|
||||
order_by: [fragment("? desc nulls last", activity.id)],
|
||||
limit: 1
|
||||
)
|
||||
|
||||
Repo.one(query)
|
||||
"Block"
|
||||
|> Activity.Queries.by_type()
|
||||
|> where(actor: ^blocker_id)
|
||||
# this is to use the index
|
||||
|> Activity.Queries.by_object_id(blocked_id)
|
||||
|> order_by([activity], fragment("? desc nulls last", activity.id))
|
||||
|> limit(1)
|
||||
|> Repo.one()
|
||||
end
|
||||
|
||||
def make_block_data(blocker, blocked, activity_id) do
|
||||
|
|
@ -695,11 +647,11 @@ defmodule Pleroma.Web.ActivityPub.Utils do
|
|||
#### Report-related helpers
|
||||
|
||||
def update_report_state(%Activity{} = activity, state) when state in @supported_report_states do
|
||||
with new_data <- Map.put(activity.data, "state", state),
|
||||
changeset <- Changeset.change(activity, data: new_data),
|
||||
{:ok, activity} <- Repo.update(changeset) do
|
||||
{:ok, activity}
|
||||
end
|
||||
new_data = Map.put(activity.data, "state", state)
|
||||
|
||||
activity
|
||||
|> Changeset.change(data: new_data)
|
||||
|> Repo.update()
|
||||
end
|
||||
|
||||
def update_report_state(_, _), do: {:error, "Unsupported state"}
|
||||
|
|
@ -766,21 +718,13 @@ defmodule Pleroma.Web.ActivityPub.Utils do
|
|||
end
|
||||
|
||||
def get_existing_votes(actor, %{data: %{"id" => id}}) do
|
||||
query =
|
||||
from(
|
||||
[activity, object: object] in Activity.with_preloaded_object(Activity),
|
||||
where: fragment("(?)->>'type' = 'Create'", activity.data),
|
||||
where: fragment("(?)->>'actor' = ?", activity.data, ^actor),
|
||||
where:
|
||||
fragment(
|
||||
"(?)->>'inReplyTo' = ?",
|
||||
object.data,
|
||||
^to_string(id)
|
||||
),
|
||||
where: fragment("(?)->>'type' = 'Answer'", object.data)
|
||||
)
|
||||
|
||||
Repo.all(query)
|
||||
actor
|
||||
|> Activity.Queries.by_actor()
|
||||
|> Activity.Queries.by_type("Create")
|
||||
|> Activity.with_preloaded_object()
|
||||
|> where([a, object: o], fragment("(?)->>'inReplyTo' = ?", o.data, ^to_string(id)))
|
||||
|> where([a, object: o], fragment("(?)->>'type' = 'Answer'", o.data))
|
||||
|> Repo.all()
|
||||
end
|
||||
|
||||
defp maybe_put(map, _key, nil), do: map
|
||||
|
|
|
|||
|
|
@ -90,6 +90,8 @@ defmodule Pleroma.Web.AdminAPI.Config do
|
|||
for v <- entity, into: [], do: do_convert(v)
|
||||
end
|
||||
|
||||
defp do_convert(%Regex{} = entity), do: inspect(entity)
|
||||
|
||||
defp do_convert(entity) when is_map(entity) do
|
||||
for {k, v} <- entity, into: %{}, do: {do_convert(k), do_convert(v)}
|
||||
end
|
||||
|
|
@ -122,7 +124,7 @@ defmodule Pleroma.Web.AdminAPI.Config do
|
|||
|
||||
def transform(entity), do: :erlang.term_to_binary(entity)
|
||||
|
||||
defp do_transform(%Regex{} = entity) when is_map(entity), do: entity
|
||||
defp do_transform(%Regex{} = entity), do: entity
|
||||
|
||||
defp do_transform(%{"tuple" => [":dispatch", [entity]]}) do
|
||||
{dispatch_settings, []} = do_eval(entity)
|
||||
|
|
@ -154,8 +156,15 @@ defmodule Pleroma.Web.AdminAPI.Config do
|
|||
defp do_transform(entity), do: entity
|
||||
|
||||
defp do_transform_string("~r/" <> pattern) do
|
||||
pattern = String.trim_trailing(pattern, "/")
|
||||
~r/#{pattern}/
|
||||
modificator = String.split(pattern, "/") |> List.last()
|
||||
pattern = String.trim_trailing(pattern, "/" <> modificator)
|
||||
|
||||
case modificator do
|
||||
"" -> ~r/#{pattern}/
|
||||
"i" -> ~r/#{pattern}/i
|
||||
"u" -> ~r/#{pattern}/u
|
||||
"s" -> ~r/#{pattern}/s
|
||||
end
|
||||
end
|
||||
|
||||
defp do_transform_string(":" <> atom), do: String.to_atom(atom)
|
||||
|
|
|
|||
|
|
@ -34,79 +34,38 @@ defmodule Pleroma.Web.ControllerHelper do
|
|||
|
||||
defp param_to_integer(_, default), do: default
|
||||
|
||||
def add_link_headers(
|
||||
conn,
|
||||
method,
|
||||
activities,
|
||||
param \\ nil,
|
||||
params \\ %{},
|
||||
func3 \\ nil,
|
||||
func4 \\ nil
|
||||
) do
|
||||
params =
|
||||
conn.params
|
||||
|> Map.drop(["since_id", "max_id", "min_id"])
|
||||
|> Map.merge(params)
|
||||
def add_link_headers(conn, activities, extra_params \\ %{}) do
|
||||
case List.last(activities) do
|
||||
%{id: max_id} ->
|
||||
params =
|
||||
conn.params
|
||||
|> Map.drop(Map.keys(conn.path_params))
|
||||
|> Map.drop(["since_id", "max_id", "min_id"])
|
||||
|> Map.merge(extra_params)
|
||||
|
||||
last = List.last(activities)
|
||||
limit =
|
||||
params
|
||||
|> Map.get("limit", "20")
|
||||
|> String.to_integer()
|
||||
|
||||
func3 = func3 || (&mastodon_api_url/3)
|
||||
func4 = func4 || (&mastodon_api_url/4)
|
||||
min_id =
|
||||
if length(activities) <= limit do
|
||||
activities
|
||||
|> List.first()
|
||||
|> Map.get(:id)
|
||||
else
|
||||
activities
|
||||
|> Enum.at(limit * -1)
|
||||
|> Map.get(:id)
|
||||
end
|
||||
|
||||
if last do
|
||||
max_id = last.id
|
||||
next_url = current_url(conn, Map.merge(params, %{max_id: max_id}))
|
||||
prev_url = current_url(conn, Map.merge(params, %{min_id: min_id}))
|
||||
|
||||
limit =
|
||||
params
|
||||
|> Map.get("limit", "20")
|
||||
|> String.to_integer()
|
||||
put_resp_header(conn, "link", "<#{next_url}>; rel=\"next\", <#{prev_url}>; rel=\"prev\"")
|
||||
|
||||
min_id =
|
||||
if length(activities) <= limit do
|
||||
activities
|
||||
|> List.first()
|
||||
|> Map.get(:id)
|
||||
else
|
||||
activities
|
||||
|> Enum.at(limit * -1)
|
||||
|> Map.get(:id)
|
||||
end
|
||||
|
||||
{next_url, prev_url} =
|
||||
if param do
|
||||
{
|
||||
func4.(
|
||||
Pleroma.Web.Endpoint,
|
||||
method,
|
||||
param,
|
||||
Map.merge(params, %{max_id: max_id})
|
||||
),
|
||||
func4.(
|
||||
Pleroma.Web.Endpoint,
|
||||
method,
|
||||
param,
|
||||
Map.merge(params, %{min_id: min_id})
|
||||
)
|
||||
}
|
||||
else
|
||||
{
|
||||
func3.(
|
||||
Pleroma.Web.Endpoint,
|
||||
method,
|
||||
Map.merge(params, %{max_id: max_id})
|
||||
),
|
||||
func3.(
|
||||
Pleroma.Web.Endpoint,
|
||||
method,
|
||||
Map.merge(params, %{min_id: min_id})
|
||||
)
|
||||
}
|
||||
end
|
||||
|
||||
conn
|
||||
|> put_resp_header("link", "<#{next_url}>; rel=\"next\", <#{prev_url}>; rel=\"prev\"")
|
||||
else
|
||||
conn
|
||||
_ ->
|
||||
conn
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -10,16 +10,17 @@ defmodule Pleroma.Web.Federator do
|
|||
alias Pleroma.Web.ActivityPub.Transmogrifier
|
||||
alias Pleroma.Web.ActivityPub.Utils
|
||||
alias Pleroma.Web.Federator.Publisher
|
||||
alias Pleroma.Web.Federator.RetryQueue
|
||||
alias Pleroma.Web.OStatus
|
||||
alias Pleroma.Web.Websub
|
||||
alias Pleroma.Workers.PublisherWorker
|
||||
alias Pleroma.Workers.ReceiverWorker
|
||||
alias Pleroma.Workers.SubscriberWorker
|
||||
|
||||
require Logger
|
||||
|
||||
def init do
|
||||
# 1 minute
|
||||
Process.sleep(1000 * 60)
|
||||
refresh_subscriptions()
|
||||
# To do: consider removing this call in favor of scheduled execution (`quantum`-based)
|
||||
refresh_subscriptions(schedule_in: 60)
|
||||
end
|
||||
|
||||
@doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)"
|
||||
|
|
@ -37,50 +38,38 @@ defmodule Pleroma.Web.Federator do
|
|||
# Client API
|
||||
|
||||
def incoming_doc(doc) do
|
||||
PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
|
||||
ReceiverWorker.enqueue("incoming_doc", %{"body" => doc})
|
||||
end
|
||||
|
||||
def incoming_ap_doc(params) do
|
||||
PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
|
||||
ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params})
|
||||
end
|
||||
|
||||
def publish(activity, priority \\ 1) do
|
||||
PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority)
|
||||
def publish(%{id: "pleroma:fakeid"} = activity) do
|
||||
perform(:publish, activity)
|
||||
end
|
||||
|
||||
def publish(activity) do
|
||||
PublisherWorker.enqueue("publish", %{"activity_id" => activity.id})
|
||||
end
|
||||
|
||||
def verify_websub(websub) do
|
||||
PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub])
|
||||
SubscriberWorker.enqueue("verify_websub", %{"websub_id" => websub.id})
|
||||
end
|
||||
|
||||
def request_subscription(sub) do
|
||||
PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub])
|
||||
def request_subscription(websub) do
|
||||
SubscriberWorker.enqueue("request_subscription", %{"websub_id" => websub.id})
|
||||
end
|
||||
|
||||
def refresh_subscriptions do
|
||||
PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions])
|
||||
def refresh_subscriptions(worker_args \\ []) do
|
||||
SubscriberWorker.enqueue("refresh_subscriptions", %{}, worker_args ++ [max_attempts: 1])
|
||||
end
|
||||
|
||||
# Job Worker Callbacks
|
||||
|
||||
def perform(:refresh_subscriptions) do
|
||||
Logger.debug("Federator running refresh subscriptions")
|
||||
Websub.refresh_subscriptions()
|
||||
|
||||
spawn(fn ->
|
||||
# 6 hours
|
||||
Process.sleep(1000 * 60 * 60 * 6)
|
||||
refresh_subscriptions()
|
||||
end)
|
||||
end
|
||||
|
||||
def perform(:request_subscription, websub) do
|
||||
Logger.debug("Refreshing #{websub.topic}")
|
||||
|
||||
with {:ok, websub} <- Websub.request_subscription(websub) do
|
||||
Logger.debug("Successfully refreshed #{websub.topic}")
|
||||
else
|
||||
_e -> Logger.debug("Couldn't refresh #{websub.topic}")
|
||||
end
|
||||
@spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()}
|
||||
def perform(:publish_one, module, params) do
|
||||
apply(module, :publish_one, [params])
|
||||
end
|
||||
|
||||
def perform(:publish, activity) do
|
||||
|
|
@ -92,14 +81,6 @@ defmodule Pleroma.Web.Federator do
|
|||
end
|
||||
end
|
||||
|
||||
def perform(:verify_websub, websub) do
|
||||
Logger.debug(fn ->
|
||||
"Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
|
||||
end)
|
||||
|
||||
Websub.verify(websub)
|
||||
end
|
||||
|
||||
def perform(:incoming_doc, doc) do
|
||||
Logger.info("Got document, trying to parse")
|
||||
OStatus.handle_incoming(doc)
|
||||
|
|
@ -130,22 +111,27 @@ defmodule Pleroma.Web.Federator do
|
|||
end
|
||||
end
|
||||
|
||||
def perform(
|
||||
:publish_single_websub,
|
||||
%{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params
|
||||
) do
|
||||
case Websub.publish_one(params) do
|
||||
{:ok, _} ->
|
||||
:ok
|
||||
def perform(:request_subscription, websub) do
|
||||
Logger.debug("Refreshing #{websub.topic}")
|
||||
|
||||
{:error, _} ->
|
||||
RetryQueue.enqueue(params, Websub)
|
||||
with {:ok, websub} <- Websub.request_subscription(websub) do
|
||||
Logger.debug("Successfully refreshed #{websub.topic}")
|
||||
else
|
||||
_e -> Logger.debug("Couldn't refresh #{websub.topic}")
|
||||
end
|
||||
end
|
||||
|
||||
def perform(type, _) do
|
||||
Logger.debug(fn -> "Unknown task: #{type}" end)
|
||||
{:error, "Don't know what to do with this"}
|
||||
def perform(:verify_websub, websub) do
|
||||
Logger.debug(fn ->
|
||||
"Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
|
||||
end)
|
||||
|
||||
Websub.verify(websub)
|
||||
end
|
||||
|
||||
def perform(:refresh_subscriptions) do
|
||||
Logger.debug("Federator running refresh subscriptions")
|
||||
Websub.refresh_subscriptions()
|
||||
end
|
||||
|
||||
def ap_enabled_actor(id) do
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ defmodule Pleroma.Web.Federator.Publisher do
|
|||
alias Pleroma.Activity
|
||||
alias Pleroma.Config
|
||||
alias Pleroma.User
|
||||
alias Pleroma.Web.Federator.RetryQueue
|
||||
alias Pleroma.Workers.PublisherWorker
|
||||
|
||||
require Logger
|
||||
|
||||
|
|
@ -30,23 +30,11 @@ defmodule Pleroma.Web.Federator.Publisher do
|
|||
Enqueue publishing a single activity.
|
||||
"""
|
||||
@spec enqueue_one(module(), Map.t()) :: :ok
|
||||
def enqueue_one(module, %{} = params),
|
||||
do: PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_one, module, params])
|
||||
|
||||
@spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()}
|
||||
def perform(:publish_one, module, params) do
|
||||
case apply(module, :publish_one, [params]) do
|
||||
{:ok, _} ->
|
||||
:ok
|
||||
|
||||
{:error, _e} ->
|
||||
RetryQueue.enqueue(params, module)
|
||||
end
|
||||
end
|
||||
|
||||
def perform(type, _, _) do
|
||||
Logger.debug("Unknown task: #{type}")
|
||||
{:error, "Don't know what to do with this"}
|
||||
def enqueue_one(module, %{} = params) do
|
||||
PublisherWorker.enqueue(
|
||||
"publish_one",
|
||||
%{"module" => to_string(module), "params" => params}
|
||||
)
|
||||
end
|
||||
|
||||
@doc """
|
||||
|
|
|
|||
|
|
@ -1,239 +0,0 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Web.Federator.RetryQueue do
|
||||
use GenServer
|
||||
|
||||
require Logger
|
||||
|
||||
def init(args) do
|
||||
queue_table = :ets.new(:pleroma_retry_queue, [:bag, :protected])
|
||||
|
||||
{:ok, %{args | queue_table: queue_table, running_jobs: :sets.new()}}
|
||||
end
|
||||
|
||||
def start_link(_) do
|
||||
enabled =
|
||||
if Pleroma.Config.get(:env) == :test,
|
||||
do: true,
|
||||
else: Pleroma.Config.get([__MODULE__, :enabled], false)
|
||||
|
||||
if enabled do
|
||||
Logger.info("Starting retry queue")
|
||||
|
||||
linkres =
|
||||
GenServer.start_link(
|
||||
__MODULE__,
|
||||
%{delivered: 0, dropped: 0, queue_table: nil, running_jobs: nil},
|
||||
name: __MODULE__
|
||||
)
|
||||
|
||||
maybe_kickoff_timer()
|
||||
linkres
|
||||
else
|
||||
Logger.info("Retry queue disabled")
|
||||
:ignore
|
||||
end
|
||||
end
|
||||
|
||||
def enqueue(data, transport, retries \\ 0) do
|
||||
GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
|
||||
end
|
||||
|
||||
def get_stats do
|
||||
GenServer.call(__MODULE__, :get_stats)
|
||||
end
|
||||
|
||||
def reset_stats do
|
||||
GenServer.call(__MODULE__, :reset_stats)
|
||||
end
|
||||
|
||||
def get_retry_params(retries) do
|
||||
if retries > Pleroma.Config.get([__MODULE__, :max_retries]) do
|
||||
{:drop, "Max retries reached"}
|
||||
else
|
||||
{:retry, growth_function(retries)}
|
||||
end
|
||||
end
|
||||
|
||||
def get_retry_timer_interval do
|
||||
Pleroma.Config.get([:retry_queue, :interval], 1000)
|
||||
end
|
||||
|
||||
defp ets_count_expires(table, current_time) do
|
||||
:ets.select_count(
|
||||
table,
|
||||
[
|
||||
{
|
||||
{:"$1", :"$2"},
|
||||
[{:"=<", :"$1", {:const, current_time}}],
|
||||
[true]
|
||||
}
|
||||
]
|
||||
)
|
||||
end
|
||||
|
||||
defp ets_pop_n_expired(table, current_time, desired) do
|
||||
{popped, _continuation} =
|
||||
:ets.select(
|
||||
table,
|
||||
[
|
||||
{
|
||||
{:"$1", :"$2"},
|
||||
[{:"=<", :"$1", {:const, current_time}}],
|
||||
[:"$_"]
|
||||
}
|
||||
],
|
||||
desired
|
||||
)
|
||||
|
||||
popped
|
||||
|> Enum.each(fn e ->
|
||||
:ets.delete_object(table, e)
|
||||
end)
|
||||
|
||||
popped
|
||||
end
|
||||
|
||||
def maybe_start_job(running_jobs, queue_table) do
|
||||
# we don't want to hit the ets or the DateTime more times than we have to
|
||||
# could optimize slightly further by not using the count, and instead grabbing
|
||||
# up to N objects early...
|
||||
current_time = DateTime.to_unix(DateTime.utc_now())
|
||||
n_running_jobs = :sets.size(running_jobs)
|
||||
|
||||
if n_running_jobs < Pleroma.Config.get([__MODULE__, :max_jobs]) do
|
||||
n_ready_jobs = ets_count_expires(queue_table, current_time)
|
||||
|
||||
if n_ready_jobs > 0 do
|
||||
# figure out how many we could start
|
||||
available_job_slots = Pleroma.Config.get([__MODULE__, :max_jobs]) - n_running_jobs
|
||||
start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
|
||||
else
|
||||
running_jobs
|
||||
end
|
||||
else
|
||||
running_jobs
|
||||
end
|
||||
end
|
||||
|
||||
defp start_n_jobs(running_jobs, _queue_table, _current_time, 0) do
|
||||
running_jobs
|
||||
end
|
||||
|
||||
defp start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
|
||||
when available_job_slots > 0 do
|
||||
candidates = ets_pop_n_expired(queue_table, current_time, available_job_slots)
|
||||
|
||||
candidates
|
||||
|> List.foldl(running_jobs, fn {_, e}, rj ->
|
||||
{:ok, pid} = Task.start(fn -> worker(e) end)
|
||||
mref = Process.monitor(pid)
|
||||
:sets.add_element(mref, rj)
|
||||
end)
|
||||
end
|
||||
|
||||
def worker({:send, data, transport, retries}) do
|
||||
case transport.publish_one(data) do
|
||||
{:ok, _} ->
|
||||
GenServer.cast(__MODULE__, :inc_delivered)
|
||||
:delivered
|
||||
|
||||
{:error, _reason} ->
|
||||
enqueue(data, transport, retries)
|
||||
:retry
|
||||
end
|
||||
end
|
||||
|
||||
def handle_call(:get_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
|
||||
{:reply, %{delivered: delivery_count, dropped: drop_count}, state}
|
||||
end
|
||||
|
||||
def handle_call(:reset_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
|
||||
{:reply, %{delivered: delivery_count, dropped: drop_count},
|
||||
%{state | delivered: 0, dropped: 0}}
|
||||
end
|
||||
|
||||
def handle_cast(:reset_stats, state) do
|
||||
{:noreply, %{state | delivered: 0, dropped: 0}}
|
||||
end
|
||||
|
||||
def handle_cast(
|
||||
{:maybe_enqueue, data, transport, retries},
|
||||
%{dropped: drop_count, queue_table: queue_table, running_jobs: running_jobs} = state
|
||||
) do
|
||||
case get_retry_params(retries) do
|
||||
{:retry, timeout} ->
|
||||
:ets.insert(queue_table, {timeout, {:send, data, transport, retries}})
|
||||
running_jobs = maybe_start_job(running_jobs, queue_table)
|
||||
{:noreply, %{state | running_jobs: running_jobs}}
|
||||
|
||||
{:drop, message} ->
|
||||
Logger.debug(message)
|
||||
{:noreply, %{state | dropped: drop_count + 1}}
|
||||
end
|
||||
end
|
||||
|
||||
def handle_cast(:kickoff_timer, state) do
|
||||
retry_interval = get_retry_timer_interval()
|
||||
Process.send_after(__MODULE__, :retry_timer_run, retry_interval)
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
def handle_cast(:inc_delivered, %{delivered: delivery_count} = state) do
|
||||
{:noreply, %{state | delivered: delivery_count + 1}}
|
||||
end
|
||||
|
||||
def handle_cast(:inc_dropped, %{dropped: drop_count} = state) do
|
||||
{:noreply, %{state | dropped: drop_count + 1}}
|
||||
end
|
||||
|
||||
def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
|
||||
case transport.publish_one(data) do
|
||||
{:ok, _} ->
|
||||
{:noreply, %{state | delivered: delivery_count + 1}}
|
||||
|
||||
{:error, _reason} ->
|
||||
enqueue(data, transport, retries)
|
||||
{:noreply, state}
|
||||
end
|
||||
end
|
||||
|
||||
def handle_info(
|
||||
:retry_timer_run,
|
||||
%{queue_table: queue_table, running_jobs: running_jobs} = state
|
||||
) do
|
||||
maybe_kickoff_timer()
|
||||
running_jobs = maybe_start_job(running_jobs, queue_table)
|
||||
{:noreply, %{state | running_jobs: running_jobs}}
|
||||
end
|
||||
|
||||
def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
|
||||
%{running_jobs: running_jobs, queue_table: queue_table} = state
|
||||
running_jobs = :sets.del_element(ref, running_jobs)
|
||||
running_jobs = maybe_start_job(running_jobs, queue_table)
|
||||
{:noreply, %{state | running_jobs: running_jobs}}
|
||||
end
|
||||
|
||||
def handle_info(unknown, state) do
|
||||
Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
if Pleroma.Config.get(:env) == :test do
|
||||
defp growth_function(_retries) do
|
||||
_shutit = Pleroma.Config.get([__MODULE__, :initial_timeout])
|
||||
DateTime.to_unix(DateTime.utc_now()) - 1
|
||||
end
|
||||
else
|
||||
defp growth_function(retries) do
|
||||
round(Pleroma.Config.get([__MODULE__, :initial_timeout]) * :math.pow(retries, 3)) +
|
||||
DateTime.to_unix(DateTime.utc_now())
|
||||
end
|
||||
end
|
||||
|
||||
defp maybe_kickoff_timer do
|
||||
GenServer.cast(__MODULE__, :kickoff_timer)
|
||||
end
|
||||
end
|
||||
|
|
@ -6,7 +6,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|
|||
use Pleroma.Web, :controller
|
||||
|
||||
import Pleroma.Web.ControllerHelper,
|
||||
only: [json_response: 3, add_link_headers: 5, add_link_headers: 4, add_link_headers: 3]
|
||||
only: [json_response: 3, add_link_headers: 2, add_link_headers: 3]
|
||||
|
||||
alias Ecto.Changeset
|
||||
alias Pleroma.Activity
|
||||
|
|
@ -365,7 +365,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|
|||
|> Enum.reverse()
|
||||
|
||||
conn
|
||||
|> add_link_headers(:home_timeline, activities)
|
||||
|> add_link_headers(activities)
|
||||
|> put_view(StatusView)
|
||||
|> render("index.json", %{activities: activities, for: user, as: :activity})
|
||||
end
|
||||
|
|
@ -384,7 +384,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|
|||
|> Enum.reverse()
|
||||
|
||||
conn
|
||||
|> add_link_headers(:public_timeline, activities, false, %{"local" => local_only})
|
||||
|> add_link_headers(activities, %{"local" => local_only})
|
||||
|> put_view(StatusView)
|
||||
|> render("index.json", %{activities: activities, for: user, as: :activity})
|
||||
end
|
||||
|
|
@ -398,7 +398,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|
|||
activities = ActivityPub.fetch_user_activities(user, reading_user, params)
|
||||
|
||||
conn
|
||||
|> add_link_headers(:user_statuses, activities, params["id"])
|
||||
|> add_link_headers(activities)
|
||||
|> put_view(StatusView)
|
||||
|> render("index.json", %{
|
||||
activities: activities,
|
||||
|
|
@ -422,7 +422,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|
|||
|> Pagination.fetch_paginated(params)
|
||||
|
||||
conn
|
||||
|> add_link_headers(:dm_timeline, activities)
|
||||
|> add_link_headers(activities)
|
||||
|> put_view(StatusView)
|
||||
|> render("index.json", %{activities: activities, for: user, as: :activity})
|
||||
end
|
||||
|
|
@ -537,7 +537,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|
|||
def scheduled_statuses(%{assigns: %{user: user}} = conn, params) do
|
||||
with scheduled_activities <- MastodonAPI.get_scheduled_activities(user, params) do
|
||||
conn
|
||||
|> add_link_headers(:scheduled_statuses, scheduled_activities)
|
||||
|> add_link_headers(scheduled_activities)
|
||||
|> put_view(ScheduledActivityView)
|
||||
|> render("index.json", %{scheduled_activities: scheduled_activities})
|
||||
end
|
||||
|
|
@ -720,7 +720,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|
|||
notifications = MastodonAPI.get_notifications(user, params)
|
||||
|
||||
conn
|
||||
|> add_link_headers(:notifications, notifications)
|
||||
|> add_link_headers(notifications)
|
||||
|> put_view(NotificationView)
|
||||
|> render("index.json", %{notifications: notifications, for: user})
|
||||
end
|
||||
|
|
@ -842,6 +842,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|
|||
|
||||
def favourited_by(%{assigns: %{user: user}} = conn, %{"id" => id}) do
|
||||
with %Activity{} = activity <- Activity.get_by_id_with_object(id),
|
||||
{:visible, true} <- {:visible, Visibility.visible_for_user?(activity, user)},
|
||||
%Object{data: %{"likes" => likes}} <- Object.normalize(activity) do
|
||||
q = from(u in User, where: u.ap_id in ^likes)
|
||||
|
||||
|
|
@ -853,12 +854,14 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|
|||
|> put_view(AccountView)
|
||||
|> render("accounts.json", %{for: user, users: users, as: :user})
|
||||
else
|
||||
{:visible, false} -> {:error, :not_found}
|
||||
_ -> json(conn, [])
|
||||
end
|
||||
end
|
||||
|
||||
def reblogged_by(%{assigns: %{user: user}} = conn, %{"id" => id}) do
|
||||
with %Activity{} = activity <- Activity.get_by_id_with_object(id),
|
||||
{:visible, true} <- {:visible, Visibility.visible_for_user?(activity, user)},
|
||||
%Object{data: %{"announcements" => announces}} <- Object.normalize(activity) do
|
||||
q = from(u in User, where: u.ap_id in ^announces)
|
||||
|
||||
|
|
@ -870,6 +873,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|
|||
|> put_view(AccountView)
|
||||
|> render("accounts.json", %{for: user, users: users, as: :user})
|
||||
else
|
||||
{:visible, false} -> {:error, :not_found}
|
||||
_ -> json(conn, [])
|
||||
end
|
||||
end
|
||||
|
|
@ -908,7 +912,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|
|||
|> Enum.reverse()
|
||||
|
||||
conn
|
||||
|> add_link_headers(:hashtag_timeline, activities, params["tag"], %{"local" => local_only})
|
||||
|> add_link_headers(activities, %{"local" => local_only})
|
||||
|> put_view(StatusView)
|
||||
|> render("index.json", %{activities: activities, for: user, as: :activity})
|
||||
end
|
||||
|
|
@ -924,7 +928,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|
|||
end
|
||||
|
||||
conn
|
||||
|> add_link_headers(:followers, followers, user)
|
||||
|> add_link_headers(followers)
|
||||
|> put_view(AccountView)
|
||||
|> render("accounts.json", %{for: for_user, users: followers, as: :user})
|
||||
end
|
||||
|
|
@ -941,7 +945,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|
|||
end
|
||||
|
||||
conn
|
||||
|> add_link_headers(:following, followers, user)
|
||||
|> add_link_headers(followers)
|
||||
|> put_view(AccountView)
|
||||
|> render("accounts.json", %{for: for_user, users: followers, as: :user})
|
||||
end
|
||||
|
|
@ -1166,7 +1170,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|
|||
|> Enum.reverse()
|
||||
|
||||
conn
|
||||
|> add_link_headers(:favourites, activities)
|
||||
|> add_link_headers(activities)
|
||||
|> put_view(StatusView)
|
||||
|> render("index.json", %{activities: activities, for: user, as: :activity})
|
||||
end
|
||||
|
|
@ -1193,7 +1197,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|
|||
|> Enum.reverse()
|
||||
|
||||
conn
|
||||
|> add_link_headers(:favourites, activities)
|
||||
|> add_link_headers(activities)
|
||||
|> put_view(StatusView)
|
||||
|> render("index.json", %{activities: activities, for: for_user, as: :activity})
|
||||
else
|
||||
|
|
@ -1214,7 +1218,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|
|||
|> Enum.map(fn b -> Map.put(b.activity, :bookmark, Map.delete(b, :activity)) end)
|
||||
|
||||
conn
|
||||
|> add_link_headers(:bookmarks, bookmarks)
|
||||
|> add_link_headers(bookmarks)
|
||||
|> put_view(StatusView)
|
||||
|> render("index.json", %{activities: activities, for: user, as: :activity})
|
||||
end
|
||||
|
|
@ -1654,7 +1658,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|
|||
end)
|
||||
|
||||
conn
|
||||
|> add_link_headers(:conversations, participations)
|
||||
|> add_link_headers(participations)
|
||||
|> json(conversations)
|
||||
end
|
||||
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do
|
|||
)
|
||||
|
||||
alias Pleroma.Web.OAuth.Token
|
||||
alias Pleroma.Workers.BackgroundWorker
|
||||
|
||||
def start_link(_), do: GenServer.start_link(__MODULE__, %{})
|
||||
|
||||
|
|
@ -27,9 +28,11 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do
|
|||
|
||||
@doc false
|
||||
def handle_info(:perform, state) do
|
||||
Token.delete_expired_tokens()
|
||||
BackgroundWorker.enqueue("clean_expired_tokens", %{})
|
||||
|
||||
Process.send_after(self(), :perform, @interval)
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
def perform(:clean), do: Token.delete_expired_tokens()
|
||||
end
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@
|
|||
defmodule Pleroma.Web.PleromaAPI.PleromaAPIController do
|
||||
use Pleroma.Web, :controller
|
||||
|
||||
import Pleroma.Web.ControllerHelper, only: [add_link_headers: 7]
|
||||
import Pleroma.Web.ControllerHelper, only: [add_link_headers: 2]
|
||||
|
||||
alias Pleroma.Conversation.Participation
|
||||
alias Pleroma.Notification
|
||||
|
|
@ -27,31 +27,22 @@ defmodule Pleroma.Web.PleromaAPI.PleromaAPIController do
|
|||
%{assigns: %{user: user}} = conn,
|
||||
%{"id" => participation_id} = params
|
||||
) do
|
||||
params =
|
||||
params
|
||||
|> Map.put("blocking_user", user)
|
||||
|> Map.put("muting_user", user)
|
||||
|> Map.put("user", user)
|
||||
|
||||
participation =
|
||||
participation_id
|
||||
|> Participation.get(preload: [:conversation])
|
||||
participation = Participation.get(participation_id, preload: [:conversation])
|
||||
|
||||
if user.id == participation.user_id do
|
||||
params =
|
||||
params
|
||||
|> Map.put("blocking_user", user)
|
||||
|> Map.put("muting_user", user)
|
||||
|> Map.put("user", user)
|
||||
|
||||
activities =
|
||||
participation.conversation.ap_id
|
||||
|> ActivityPub.fetch_activities_for_context(params)
|
||||
|> Enum.reverse()
|
||||
|
||||
conn
|
||||
|> add_link_headers(
|
||||
:conversation_statuses,
|
||||
activities,
|
||||
participation_id,
|
||||
params,
|
||||
nil,
|
||||
&pleroma_api_url/4
|
||||
)
|
||||
|> add_link_headers(activities)
|
||||
|> put_view(StatusView)
|
||||
|> render("index.json", %{activities: activities, for: user, as: :activity})
|
||||
end
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@
|
|||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Web.Push do
|
||||
alias Pleroma.Web.Push.Impl
|
||||
alias Pleroma.Workers.WebPusherWorker
|
||||
|
||||
require Logger
|
||||
|
||||
|
|
@ -31,6 +31,7 @@ defmodule Pleroma.Web.Push do
|
|||
end
|
||||
end
|
||||
|
||||
def send(notification),
|
||||
do: PleromaJobQueue.enqueue(:web_push, Impl, [notification])
|
||||
def send(notification) do
|
||||
WebPusherWorker.enqueue("web_push", %{"notification_id" => notification.id})
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -224,6 +224,7 @@ defmodule Pleroma.Web.Router do
|
|||
scope [] do
|
||||
pipe_through(:oauth_write)
|
||||
|
||||
post("/change_email", UtilController, :change_email)
|
||||
post("/change_password", UtilController, :change_password)
|
||||
post("/delete_account", UtilController, :delete_account)
|
||||
put("/notification_settings", UtilController, :update_notificaton_settings)
|
||||
|
|
|
|||
|
|
@ -170,6 +170,15 @@ defmodule Pleroma.Web.Salmon do
|
|||
end
|
||||
end
|
||||
|
||||
def publish_one(%{recipient_id: recipient_id} = params) do
|
||||
recipient = User.get_cached_by_id(recipient_id)
|
||||
|
||||
params
|
||||
|> Map.delete(:recipient_id)
|
||||
|> Map.put(:recipient, recipient)
|
||||
|> publish_one()
|
||||
end
|
||||
|
||||
def publish_one(_), do: :noop
|
||||
|
||||
@supported_activities [
|
||||
|
|
@ -218,7 +227,7 @@ defmodule Pleroma.Web.Salmon do
|
|||
Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
|
||||
|
||||
Publisher.enqueue_one(__MODULE__, %{
|
||||
recipient: remote_user,
|
||||
recipient_id: remote_user.id,
|
||||
feed: feed,
|
||||
unreachable_since: reachable_urls_metadata[remote_user.info.salmon]
|
||||
})
|
||||
|
|
|
|||
|
|
@ -265,12 +265,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do
|
|||
String.split(line, ",") |> List.first()
|
||||
end)
|
||||
|> List.delete("Account address") do
|
||||
PleromaJobQueue.enqueue(:background, User, [
|
||||
:follow_import,
|
||||
follower,
|
||||
followed_identifiers
|
||||
])
|
||||
|
||||
User.follow_import(follower, followed_identifiers)
|
||||
json(conn, "job started")
|
||||
end
|
||||
end
|
||||
|
|
@ -281,12 +276,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do
|
|||
|
||||
def blocks_import(%{assigns: %{user: blocker}} = conn, %{"list" => list}) do
|
||||
with blocked_identifiers <- String.split(list) do
|
||||
PleromaJobQueue.enqueue(:background, User, [
|
||||
:blocks_import,
|
||||
blocker,
|
||||
blocked_identifiers
|
||||
])
|
||||
|
||||
User.blocks_import(blocker, blocked_identifiers)
|
||||
json(conn, "job started")
|
||||
end
|
||||
end
|
||||
|
|
@ -314,6 +304,25 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do
|
|||
end
|
||||
end
|
||||
|
||||
def change_email(%{assigns: %{user: user}} = conn, params) do
|
||||
case CommonAPI.Utils.confirm_current_password(user, params["password"]) do
|
||||
{:ok, user} ->
|
||||
with {:ok, _user} <- User.change_email(user, params["email"]) do
|
||||
json(conn, %{status: "success"})
|
||||
else
|
||||
{:error, changeset} ->
|
||||
{_, {error, _}} = Enum.at(changeset.errors, 0)
|
||||
json(conn, %{error: "Email #{error}."})
|
||||
|
||||
_ ->
|
||||
json(conn, %{error: "Unable to change email."})
|
||||
end
|
||||
|
||||
{:error, msg} ->
|
||||
json(conn, %{error: msg})
|
||||
end
|
||||
end
|
||||
|
||||
def delete_account(%{assigns: %{user: user}} = conn, params) do
|
||||
case CommonAPI.Utils.confirm_current_password(user, params["password"]) do
|
||||
{:ok, user} ->
|
||||
|
|
|
|||
18
lib/pleroma/workers/activity_expiration_worker.ex
Normal file
18
lib/pleroma/workers/activity_expiration_worker.ex
Normal file
|
|
@ -0,0 +1,18 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Workers.ActivityExpirationWorker do
|
||||
use Pleroma.Workers.WorkerHelper, queue: "activity_expiration"
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(
|
||||
%{
|
||||
"op" => "activity_expiration",
|
||||
"activity_expiration_id" => activity_expiration_id
|
||||
},
|
||||
_job
|
||||
) do
|
||||
Pleroma.Daemons.ActivityExpirationDaemon.perform(:execute, activity_expiration_id)
|
||||
end
|
||||
end
|
||||
69
lib/pleroma/workers/background_worker.ex
Normal file
69
lib/pleroma/workers/background_worker.ex
Normal file
|
|
@ -0,0 +1,69 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Workers.BackgroundWorker do
|
||||
alias Pleroma.Activity
|
||||
alias Pleroma.User
|
||||
alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
|
||||
alias Pleroma.Web.OAuth.Token.CleanWorker
|
||||
|
||||
use Pleroma.Workers.WorkerHelper, queue: "background"
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}, _job) do
|
||||
user = User.get_cached_by_id(user_id)
|
||||
User.perform(:fetch_initial_posts, user)
|
||||
end
|
||||
|
||||
def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}, _job) do
|
||||
user = User.get_cached_by_id(user_id)
|
||||
User.perform(:deactivate_async, user, status)
|
||||
end
|
||||
|
||||
def perform(%{"op" => "delete_user", "user_id" => user_id}, _job) do
|
||||
user = User.get_cached_by_id(user_id)
|
||||
User.perform(:delete, user)
|
||||
end
|
||||
|
||||
def perform(
|
||||
%{
|
||||
"op" => "blocks_import",
|
||||
"blocker_id" => blocker_id,
|
||||
"blocked_identifiers" => blocked_identifiers
|
||||
},
|
||||
_job
|
||||
) do
|
||||
blocker = User.get_cached_by_id(blocker_id)
|
||||
User.perform(:blocks_import, blocker, blocked_identifiers)
|
||||
end
|
||||
|
||||
def perform(
|
||||
%{
|
||||
"op" => "follow_import",
|
||||
"follower_id" => follower_id,
|
||||
"followed_identifiers" => followed_identifiers
|
||||
},
|
||||
_job
|
||||
) do
|
||||
follower = User.get_cached_by_id(follower_id)
|
||||
User.perform(:follow_import, follower, followed_identifiers)
|
||||
end
|
||||
|
||||
def perform(%{"op" => "clean_expired_tokens"}, _job) do
|
||||
CleanWorker.perform(:clean)
|
||||
end
|
||||
|
||||
def perform(%{"op" => "media_proxy_preload", "message" => message}, _job) do
|
||||
MediaProxyWarmingPolicy.perform(:preload, message)
|
||||
end
|
||||
|
||||
def perform(%{"op" => "media_proxy_prefetch", "url" => url}, _job) do
|
||||
MediaProxyWarmingPolicy.perform(:prefetch, url)
|
||||
end
|
||||
|
||||
def perform(%{"op" => "fetch_data_for_activity", "activity_id" => activity_id}, _job) do
|
||||
activity = Activity.get_by_id(activity_id)
|
||||
Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity)
|
||||
end
|
||||
end
|
||||
16
lib/pleroma/workers/digest_emails_worker.ex
Normal file
16
lib/pleroma/workers/digest_emails_worker.ex
Normal file
|
|
@ -0,0 +1,16 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Workers.DigestEmailsWorker do
|
||||
alias Pleroma.User
|
||||
|
||||
use Pleroma.Workers.WorkerHelper, queue: "digest_emails"
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do
|
||||
user_id
|
||||
|> User.get_cached_by_id()
|
||||
|> Pleroma.Daemons.DigestEmailDaemon.perform()
|
||||
end
|
||||
end
|
||||
15
lib/pleroma/workers/mailer_worker.ex
Normal file
15
lib/pleroma/workers/mailer_worker.ex
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Workers.MailerWorker do
|
||||
use Pleroma.Workers.WorkerHelper, queue: "mailer"
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do
|
||||
encoded_email
|
||||
|> Base.decode64!()
|
||||
|> :erlang.binary_to_term()
|
||||
|> Pleroma.Emails.Mailer.deliver(config)
|
||||
end
|
||||
end
|
||||
25
lib/pleroma/workers/publisher_worker.ex
Normal file
25
lib/pleroma/workers/publisher_worker.ex
Normal file
|
|
@ -0,0 +1,25 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Workers.PublisherWorker do
|
||||
alias Pleroma.Activity
|
||||
alias Pleroma.Web.Federator
|
||||
|
||||
use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
|
||||
|
||||
def backoff(attempt) when is_integer(attempt) do
|
||||
Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5)
|
||||
end
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(%{"op" => "publish", "activity_id" => activity_id}, _job) do
|
||||
activity = Activity.get_by_id(activity_id)
|
||||
Federator.perform(:publish, activity)
|
||||
end
|
||||
|
||||
def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}, _job) do
|
||||
params = Map.new(params, fn {k, v} -> {String.to_atom(k), v} end)
|
||||
Federator.perform(:publish_one, String.to_atom(module_name), params)
|
||||
end
|
||||
end
|
||||
18
lib/pleroma/workers/receiver_worker.ex
Normal file
18
lib/pleroma/workers/receiver_worker.ex
Normal file
|
|
@ -0,0 +1,18 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Workers.ReceiverWorker do
|
||||
alias Pleroma.Web.Federator
|
||||
|
||||
use Pleroma.Workers.WorkerHelper, queue: "federator_incoming"
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(%{"op" => "incoming_doc", "body" => doc}, _job) do
|
||||
Federator.perform(:incoming_doc, doc)
|
||||
end
|
||||
|
||||
def perform(%{"op" => "incoming_ap_doc", "params" => params}, _job) do
|
||||
Federator.perform(:incoming_ap_doc, params)
|
||||
end
|
||||
end
|
||||
12
lib/pleroma/workers/scheduled_activity_worker.ex
Normal file
12
lib/pleroma/workers/scheduled_activity_worker.ex
Normal file
|
|
@ -0,0 +1,12 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Workers.ScheduledActivityWorker do
|
||||
use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities"
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(%{"op" => "execute", "activity_id" => activity_id}, _job) do
|
||||
Pleroma.Daemons.ScheduledActivityDaemon.perform(:execute, activity_id)
|
||||
end
|
||||
end
|
||||
26
lib/pleroma/workers/subscriber_worker.ex
Normal file
26
lib/pleroma/workers/subscriber_worker.ex
Normal file
|
|
@ -0,0 +1,26 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Workers.SubscriberWorker do
|
||||
alias Pleroma.Repo
|
||||
alias Pleroma.Web.Federator
|
||||
alias Pleroma.Web.Websub
|
||||
|
||||
use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(%{"op" => "refresh_subscriptions"}, _job) do
|
||||
Federator.perform(:refresh_subscriptions)
|
||||
end
|
||||
|
||||
def perform(%{"op" => "request_subscription", "websub_id" => websub_id}, _job) do
|
||||
websub = Repo.get(Websub.WebsubClientSubscription, websub_id)
|
||||
Federator.perform(:request_subscription, websub)
|
||||
end
|
||||
|
||||
def perform(%{"op" => "verify_websub", "websub_id" => websub_id}, _job) do
|
||||
websub = Repo.get(Websub.WebsubServerSubscription, websub_id)
|
||||
Federator.perform(:verify_websub, websub)
|
||||
end
|
||||
end
|
||||
15
lib/pleroma/workers/transmogrifier_worker.ex
Normal file
15
lib/pleroma/workers/transmogrifier_worker.ex
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Workers.TransmogrifierWorker do
|
||||
alias Pleroma.User
|
||||
|
||||
use Pleroma.Workers.WorkerHelper, queue: "transmogrifier"
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do
|
||||
user = User.get_cached_by_id(user_id)
|
||||
Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user)
|
||||
end
|
||||
end
|
||||
16
lib/pleroma/workers/web_pusher_worker.ex
Normal file
16
lib/pleroma/workers/web_pusher_worker.ex
Normal file
|
|
@ -0,0 +1,16 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Workers.WebPusherWorker do
|
||||
alias Pleroma.Notification
|
||||
alias Pleroma.Repo
|
||||
|
||||
use Pleroma.Workers.WorkerHelper, queue: "web_push"
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(%{"op" => "web_push", "notification_id" => notification_id}, _job) do
|
||||
notification = Repo.get(Notification, notification_id)
|
||||
Pleroma.Web.Push.Impl.perform(notification)
|
||||
end
|
||||
end
|
||||
46
lib/pleroma/workers/worker_helper.ex
Normal file
46
lib/pleroma/workers/worker_helper.ex
Normal file
|
|
@ -0,0 +1,46 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Workers.WorkerHelper do
|
||||
alias Pleroma.Config
|
||||
alias Pleroma.Workers.WorkerHelper
|
||||
|
||||
def worker_args(queue) do
|
||||
case Config.get([:workers, :retries, queue]) do
|
||||
nil -> []
|
||||
max_attempts -> [max_attempts: max_attempts]
|
||||
end
|
||||
end
|
||||
|
||||
def sidekiq_backoff(attempt, pow \\ 4, base_backoff \\ 15) do
|
||||
backoff =
|
||||
:math.pow(attempt, pow) +
|
||||
base_backoff +
|
||||
:rand.uniform(2 * base_backoff) * attempt
|
||||
|
||||
trunc(backoff)
|
||||
end
|
||||
|
||||
defmacro __using__(opts) do
|
||||
caller_module = __CALLER__.module
|
||||
queue = Keyword.fetch!(opts, :queue)
|
||||
|
||||
quote do
|
||||
# Note: `max_attempts` is intended to be overridden in `new/2` call
|
||||
use Oban.Worker,
|
||||
queue: unquote(queue),
|
||||
max_attempts: 1
|
||||
|
||||
def enqueue(op, params, worker_args \\ []) do
|
||||
params = Map.merge(%{"op" => op}, params)
|
||||
queue_atom = String.to_atom(unquote(queue))
|
||||
worker_args = worker_args ++ WorkerHelper.worker_args(queue_atom)
|
||||
|
||||
unquote(caller_module)
|
||||
|> apply(:new, [params, worker_args])
|
||||
|> Pleroma.Repo.insert()
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
Loading…
Add table
Add a link
Reference in a new issue