Split failed-signature inbox retries
Route failed-signature ActivityPub inbox retries through a dedicated worker so legacy and malformed retry jobs fail closed before processing.
This commit is contained in:
parent
bd45704dba
commit
7756f491d5
7 changed files with 786 additions and 461 deletions
|
|
@ -348,7 +348,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
|
|||
end
|
||||
|
||||
def inbox(%{assigns: %{valid_signature: false}} = conn, params) do
|
||||
Federator.incoming_ap_doc(%{
|
||||
Federator.incoming_failed_signature_ap_doc(%{
|
||||
method: conn.method,
|
||||
req_headers: conn.req_headers,
|
||||
request_path: conn.request_path,
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ defmodule Pleroma.Web.Federator do
|
|||
alias Pleroma.Web.ActivityPub.Utils
|
||||
alias Pleroma.Workers.PublisherWorker
|
||||
alias Pleroma.Workers.ReceiverWorker
|
||||
alias Pleroma.Workers.SignatureRetryWorker
|
||||
|
||||
require Logger
|
||||
|
||||
|
|
@ -35,12 +36,21 @@ defmodule Pleroma.Web.Federator do
|
|||
end
|
||||
|
||||
# Client API
|
||||
def incoming_ap_doc(%{params: params, req_headers: req_headers}) do
|
||||
ReceiverWorker.new(
|
||||
def incoming_failed_signature_ap_doc(%{
|
||||
method: method,
|
||||
params: params,
|
||||
req_headers: req_headers,
|
||||
request_path: request_path,
|
||||
query_string: query_string
|
||||
}) do
|
||||
SignatureRetryWorker.new(
|
||||
%{
|
||||
"op" => "incoming_ap_doc",
|
||||
"op" => "incoming_failed_signature_ap_doc",
|
||||
"method" => method,
|
||||
"req_headers" => req_headers,
|
||||
"params" => params,
|
||||
"request_path" => request_path,
|
||||
"query_string" => query_string,
|
||||
"timeout" => :timer.seconds(20)
|
||||
},
|
||||
priority: 2
|
||||
|
|
|
|||
|
|
@ -4,55 +4,36 @@
|
|||
|
||||
defmodule Pleroma.Workers.ReceiverWorker do
|
||||
alias Pleroma.Instances
|
||||
alias Pleroma.Signature
|
||||
alias Pleroma.User
|
||||
alias Pleroma.Web.Federator
|
||||
alias Pleroma.Web.Plugs.MappedSignatureToIdentityPlug
|
||||
alias Pleroma.Workers.SignatureRetryWorker
|
||||
|
||||
use Oban.Worker, queue: :federator_incoming, max_attempts: 5, unique: [period: :infinity]
|
||||
|
||||
@impl true
|
||||
|
||||
def perform(%Job{
|
||||
args: %{
|
||||
"op" => "incoming_ap_doc",
|
||||
"method" => method,
|
||||
"params" => params,
|
||||
"req_headers" => req_headers,
|
||||
"request_path" => request_path,
|
||||
"query_string" => query_string
|
||||
}
|
||||
}) do
|
||||
# Oban's serialization converts our tuple headers to lists.
|
||||
# Revert it for the signature validation.
|
||||
req_headers = Enum.into(req_headers, [], &List.to_tuple(&1))
|
||||
|
||||
conn_data = %Plug.Conn{
|
||||
assigns: %{valid_signature: true},
|
||||
method: method,
|
||||
params: params,
|
||||
req_headers: req_headers,
|
||||
request_path: request_path,
|
||||
query_string: query_string
|
||||
}
|
||||
|
||||
with {:ok, %User{}} <- User.get_or_fetch_by_ap_id(conn_data.params["actor"]),
|
||||
{:ok, _public_key} <- Signature.refetch_public_key(conn_data),
|
||||
{:signature, true} <- {:signature, Signature.validate_signature(conn_data)},
|
||||
{:same_actor, true} <- {:same_actor, validate_same_actor(conn_data)},
|
||||
{:ok, res} <- Federator.perform(:incoming_ap_doc, params) do
|
||||
unless Instances.reachable?(params["actor"]) do
|
||||
domain = URI.parse(params["actor"]).host
|
||||
Oban.insert(Pleroma.Workers.ReachabilityWorker.new(%{"domain" => domain}))
|
||||
end
|
||||
|
||||
{:ok, res}
|
||||
def perform(%Job{args: %{"op" => "incoming_ap_doc", "params" => params} = args} = job) do
|
||||
if signature_retry_job?(args) do
|
||||
perform_signature_retry(job)
|
||||
else
|
||||
e -> process_errors(e)
|
||||
perform_incoming(params)
|
||||
end
|
||||
end
|
||||
|
||||
def perform(%Job{args: %{"op" => "incoming_ap_doc", "params" => params}}) do
|
||||
def perform(%Job{args: %{"op" => "incoming_ap_doc"} = args} = job) do
|
||||
if signature_retry_job?(args) do
|
||||
perform_signature_retry(job)
|
||||
else
|
||||
process_errors(:missing_incoming_ap_doc_params)
|
||||
end
|
||||
end
|
||||
|
||||
defp perform_signature_retry(%Job{args: args} = job) do
|
||||
SignatureRetryWorker.perform(%Job{
|
||||
job
|
||||
| args: Map.put(args, "op", "incoming_failed_signature_ap_doc")
|
||||
})
|
||||
end
|
||||
|
||||
defp perform_incoming(params) do
|
||||
with {:ok, res} <- Federator.perform(:incoming_ap_doc, params) do
|
||||
unless Instances.reachable?(params["actor"]) do
|
||||
domain = URI.parse(params["actor"]).host
|
||||
|
|
@ -65,21 +46,15 @@ defmodule Pleroma.Workers.ReceiverWorker do
|
|||
end
|
||||
end
|
||||
|
||||
defp signature_retry_job?(args) do
|
||||
Enum.any?(~w(method req_headers request_path query_string), &Map.has_key?(args, &1))
|
||||
end
|
||||
|
||||
@impl true
|
||||
def timeout(%_{args: %{"timeout" => timeout}}), do: timeout
|
||||
|
||||
def timeout(_job), do: :timer.seconds(5)
|
||||
|
||||
defp validate_same_actor(conn_data) do
|
||||
case MappedSignatureToIdentityPlug.call(conn_data, []) do
|
||||
%Plug.Conn{assigns: %{valid_signature: true}} ->
|
||||
true
|
||||
|
||||
_ ->
|
||||
false
|
||||
end
|
||||
end
|
||||
|
||||
defp process_errors({:error, {:error, _} = error}), do: process_errors(error)
|
||||
|
||||
defp process_errors(errors) do
|
||||
|
|
@ -103,6 +78,7 @@ defmodule Pleroma.Workers.ReceiverWorker do
|
|||
{:error, :origin_containment_failed} -> {:cancel, :origin_containment_failed}
|
||||
# Unclear if this can be reached
|
||||
{:error, {:side_effects, {:error, :no_object_actor}} = reason} -> {:cancel, reason}
|
||||
:missing_incoming_ap_doc_params -> {:cancel, :missing_incoming_ap_doc_params}
|
||||
# Catchall
|
||||
{:error, _} = e -> e
|
||||
e -> {:error, e}
|
||||
|
|
|
|||
144
lib/pleroma/workers/signature_retry_worker.ex
Normal file
144
lib/pleroma/workers/signature_retry_worker.ex
Normal file
|
|
@ -0,0 +1,144 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Workers.SignatureRetryWorker do
|
||||
alias Pleroma.Instances
|
||||
alias Pleroma.Signature
|
||||
alias Pleroma.User
|
||||
alias Pleroma.Web.ActivityPub.Utils
|
||||
alias Pleroma.Web.Federator
|
||||
alias Pleroma.Web.Plugs.MappedSignatureToIdentityPlug
|
||||
|
||||
use Oban.Worker, queue: :federator_incoming, max_attempts: 5, unique: [period: :infinity]
|
||||
|
||||
@impl true
|
||||
def perform(%Job{
|
||||
args: %{
|
||||
"op" => "incoming_failed_signature_ap_doc",
|
||||
"method" => method,
|
||||
"params" => params,
|
||||
"req_headers" => req_headers,
|
||||
"request_path" => request_path,
|
||||
"query_string" => query_string
|
||||
}
|
||||
})
|
||||
when is_binary(method) and is_map(params) and is_list(req_headers) and
|
||||
is_binary(request_path) and is_binary(query_string) do
|
||||
with {:ok, req_headers} <- normalize_req_headers(req_headers),
|
||||
conn_data = %Plug.Conn{
|
||||
assigns: %{valid_signature: true},
|
||||
method: method,
|
||||
params: params,
|
||||
req_headers: req_headers,
|
||||
request_path: request_path,
|
||||
query_string: query_string
|
||||
},
|
||||
actor_id = Utils.get_ap_id(params["actor"]),
|
||||
{:signature_actor, {:ok, signature_actor_id}} <-
|
||||
{:signature_actor, signature_actor_id(conn_data)},
|
||||
{:same_actor, true} <- {:same_actor, signature_actor_id == actor_id},
|
||||
{:ok, %User{}} <- User.get_or_fetch_by_ap_id(actor_id),
|
||||
{:ok, _public_key} <- Signature.refetch_public_key(conn_data),
|
||||
{:signature, true} <- {:signature, validate_signature(conn_data)},
|
||||
{:same_actor, true} <- {:same_actor, validate_same_actor(conn_data)},
|
||||
{:ok, res} <- Federator.perform(:incoming_ap_doc, params) do
|
||||
unless Instances.reachable?(params["actor"]) do
|
||||
domain = URI.parse(params["actor"]).host
|
||||
Oban.insert(Pleroma.Workers.ReachabilityWorker.new(%{"domain" => domain}))
|
||||
end
|
||||
|
||||
{:ok, res}
|
||||
else
|
||||
e -> process_errors(e)
|
||||
end
|
||||
end
|
||||
|
||||
def perform(%Job{args: %{"op" => "incoming_failed_signature_ap_doc"}}) do
|
||||
process_errors(:missing_signature_retry_metadata)
|
||||
end
|
||||
|
||||
def perform(%Job{}), do: process_errors(:missing_signature_retry_metadata)
|
||||
|
||||
@impl true
|
||||
def timeout(%_{args: %{"timeout" => timeout}}), do: timeout
|
||||
|
||||
def timeout(_job), do: :timer.seconds(5)
|
||||
|
||||
defp normalize_req_headers(req_headers) do
|
||||
req_headers
|
||||
|> Enum.reduce_while({:ok, []}, fn
|
||||
{key, value}, {:ok, acc} when is_binary(key) and is_binary(value) ->
|
||||
{:cont, {:ok, [{key, value} | acc]}}
|
||||
|
||||
[key, value], {:ok, acc} when is_binary(key) and is_binary(value) ->
|
||||
{:cont, {:ok, [{key, value} | acc]}}
|
||||
|
||||
_, _ ->
|
||||
{:halt, {:error, :invalid_signature_retry_metadata}}
|
||||
end)
|
||||
|> case do
|
||||
{:ok, headers} -> {:ok, Enum.reverse(headers)}
|
||||
error -> error
|
||||
end
|
||||
end
|
||||
|
||||
defp validate_same_actor(conn_data) do
|
||||
case MappedSignatureToIdentityPlug.call(conn_data, []) do
|
||||
%Plug.Conn{assigns: %{valid_signature: true}} ->
|
||||
true
|
||||
|
||||
_ ->
|
||||
false
|
||||
end
|
||||
end
|
||||
|
||||
defp validate_signature(conn_data) do
|
||||
Signature.validate_signature(conn_data)
|
||||
rescue
|
||||
_ -> false
|
||||
catch
|
||||
_, _ -> false
|
||||
end
|
||||
|
||||
defp signature_actor_id(conn_data) do
|
||||
Signature.get_actor_id(conn_data)
|
||||
rescue
|
||||
_ -> {:error, :invalid_signature}
|
||||
catch
|
||||
_, _ -> {:error, :invalid_signature}
|
||||
end
|
||||
|
||||
defp process_errors({:error, {:error, _} = error}), do: process_errors(error)
|
||||
|
||||
defp process_errors(errors) do
|
||||
case errors do
|
||||
# User fetch failures
|
||||
{:error, :not_found} = reason -> {:cancel, reason}
|
||||
{:error, :forbidden} = reason -> {:cancel, reason}
|
||||
# Inactive user
|
||||
{:error, {:user_active, false} = reason} -> {:cancel, reason}
|
||||
# Validator will error and return a changeset error
|
||||
# e.g., duplicate activities or if the object was deleted
|
||||
{:error, {:validate, {:error, _changeset} = reason}} -> {:cancel, reason}
|
||||
# Duplicate detection during Normalization
|
||||
{:error, :already_present} -> {:cancel, :already_present}
|
||||
# MRFs will return a reject
|
||||
{:error, {:reject, _} = reason} -> {:cancel, reason}
|
||||
# HTTP Sigs
|
||||
{:signature_actor, {:error, _}} -> {:cancel, :invalid_signature}
|
||||
{:signature, false} -> {:cancel, :invalid_signature}
|
||||
{:same_actor, false} -> {:cancel, :actor_signature_mismatch}
|
||||
# Origin / URL validation failed somewhere possibly due to spoofing
|
||||
{:error, :origin_containment_failed} -> {:cancel, :origin_containment_failed}
|
||||
# Unclear if this can be reached
|
||||
{:error, {:side_effects, {:error, :no_object_actor}} = reason} -> {:cancel, reason}
|
||||
# Fail closed if the retry cannot reconstruct the original request.
|
||||
:missing_signature_retry_metadata -> {:cancel, :missing_signature_retry_metadata}
|
||||
{:error, :invalid_signature_retry_metadata} -> {:cancel, :invalid_signature_retry_metadata}
|
||||
# Catchall
|
||||
{:error, _} = e -> e
|
||||
e -> {:error, e}
|
||||
end
|
||||
end
|
||||
end
|
||||
Loading…
Add table
Add a link
Reference in a new issue