Merge remote-tracking branch 'pleroma/develop' into feature/addressable-lists

This commit is contained in:
Egor Kislitsyn 2019-05-14 19:00:07 +07:00
commit e82e73478e
228 changed files with 5490 additions and 1366 deletions

View file

@ -4,7 +4,7 @@
defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.Activity
alias Pleroma.Instances
alias Pleroma.Conversation
alias Pleroma.Notification
alias Pleroma.Object
alias Pleroma.Object.Fetcher
@ -14,7 +14,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.User
alias Pleroma.Web.ActivityPub.MRF
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.Federator
alias Pleroma.Web.WebFinger
import Ecto.Query
@ -23,8 +22,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
require Logger
@httpoison Application.get_env(:pleroma, :httpoison)
# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
@ -140,7 +137,14 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
end)
Notification.create_notifications(activity)
participations =
activity
|> Conversation.create_or_bump_for()
|> get_participations()
stream_out(activity)
stream_out_participations(participations)
{:ok, activity}
else
%Activity{} = activity ->
@ -163,11 +167,23 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
end
end
defp get_participations({:ok, %{participations: participations}}), do: participations
defp get_participations(_), do: []
def stream_out_participations(participations) do
participations =
participations
|> Repo.preload(:user)
Enum.each(participations, fn participation ->
Pleroma.Web.Streamer.stream("participation", participation)
end)
end
def stream_out(activity) do
public = "https://www.w3.org/ns/activitystreams#Public"
if activity.data["type"] in ["Create", "Announce", "Delete"] do
object = Object.normalize(activity)
Pleroma.Web.Streamer.stream("user", activity)
Pleroma.Web.Streamer.stream("list", activity)
@ -179,6 +195,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
end
if activity.data["type"] in ["Create"] do
object = Object.normalize(activity)
object.data
|> Map.get("tag", [])
|> Enum.filter(fn tag -> is_bitstring(tag) end)
@ -193,6 +211,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
end
end
else
# TODO: Write test, replace with visibility test
if !Enum.member?(activity.data["cc"] || [], public) &&
!Enum.member?(
activity.data["to"],
@ -455,35 +474,44 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
end
end
def fetch_activities_for_context(context, opts \\ %{}) do
defp fetch_activities_for_context_query(context, opts) do
public = ["https://www.w3.org/ns/activitystreams#Public"]
recipients =
if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
query = from(activity in Activity)
query =
query
|> restrict_blocked(opts)
|> restrict_recipients(recipients, opts["user"])
query =
from(
activity in query,
where:
fragment(
"?->>'type' = ? and ?->>'context' = ?",
activity.data,
"Create",
activity.data,
^context
),
order_by: [desc: :id]
from(activity in Activity)
|> restrict_blocked(opts)
|> restrict_recipients(recipients, opts["user"])
|> where(
[activity],
fragment(
"?->>'type' = ? and ?->>'context' = ?",
activity.data,
"Create",
activity.data,
^context
)
|> Activity.with_preloaded_object()
)
|> order_by([activity], desc: activity.id)
end
Repo.all(query)
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
context
|> fetch_activities_for_context_query(opts)
|> Activity.with_preloaded_object()
|> Repo.all()
end
@spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) ::
Pleroma.FlakeId.t() | nil
def fetch_latest_activity_id_for_context(context, opts \\ %{}) do
context
|> fetch_activities_for_context_query(opts)
|> limit(1)
|> select([a], a.id)
|> Repo.one()
end
def fetch_public_activities(opts \\ %{}) do
@ -782,9 +810,30 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
|> Activity.with_preloaded_object()
end
defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query
defp maybe_preload_bookmarks(query, opts) do
query
|> Activity.with_preloaded_bookmark(opts["user"])
end
defp maybe_order(query, %{order: :desc}) do
query
|> order_by(desc: :id)
end
defp maybe_order(query, %{order: :asc}) do
query
|> order_by(asc: :id)
end
defp maybe_order(query, _), do: query
def fetch_activities_query(recipients, opts \\ %{}) do
Activity
|> maybe_preload_objects(opts)
|> maybe_preload_bookmarks(opts)
|> maybe_order(opts)
|> restrict_recipients(recipients, opts["user"])
|> restrict_tag(opts)
|> restrict_tag_reject(opts)
@ -925,134 +974,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
end
end
def should_federate?(inbox, public) do
if public do
true
else
inbox_info = URI.parse(inbox)
!Enum.member?(Pleroma.Config.get([:instance, :quarantined_instances], []), inbox_info.host)
end
end
defp recipients(actor, activity) do
followers =
if actor.follower_address in activity.recipients do
{:ok, followers} = User.get_followers(actor)
Enum.filter(followers, &(!&1.local))
else
[]
end
Pleroma.Web.Salmon.remote_users(actor, activity) ++ followers
end
defp get_cc_ap_ids(ap_id, recipients) do
host = Map.get(URI.parse(ap_id), :host)
recipients
|> Enum.filter(fn %User{ap_id: ap_id} -> Map.get(URI.parse(ap_id), :host) == host end)
|> Enum.map(& &1.ap_id)
end
def publish(actor, %{data: %{"bcc" => bcc}} = activity) when is_list(bcc) and bcc != [] do
public = is_public?(activity)
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
recipients = recipients(actor, activity)
recipients
|> Enum.filter(&User.ap_enabled?/1)
|> Enum.map(fn %{info: %{source_data: data}} -> data["inbox"] end)
|> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
|> Instances.filter_reachable()
|> Enum.each(fn {inbox, unreachable_since} ->
%User{ap_id: ap_id} =
Enum.find(recipients, fn %{info: %{source_data: data}} -> data["inbox"] == inbox end)
cc = get_cc_ap_ids(ap_id, recipients)
json =
data
|> Map.put("cc", cc)
|> Map.put("directMessage", true)
|> Jason.encode!()
Federator.publish_single_ap(%{
inbox: inbox,
json: json,
actor: actor,
id: activity.data["id"],
unreachable_since: unreachable_since
})
end)
end
def publish(actor, activity) do
public = is_public?(activity)
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
json = Jason.encode!(data)
recipients(actor, activity)
|> Enum.filter(&User.ap_enabled?/1)
|> Enum.map(fn %{info: %{source_data: data}} ->
(is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
end)
|> Enum.uniq()
|> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
|> Instances.filter_reachable()
|> Enum.each(fn {inbox, unreachable_since} ->
Federator.publish_single_ap(%{
inbox: inbox,
json: json,
actor: actor,
id: activity.data["id"],
unreachable_since: unreachable_since
})
end)
end
def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do
Logger.info("Federating #{id} to #{inbox}")
host = URI.parse(inbox).host
digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
date =
NaiveDateTime.utc_now()
|> Timex.format!("{WDshort}, {0D} {Mshort} {YYYY} {h24}:{m}:{s} GMT")
signature =
Pleroma.Web.HTTPSignatures.sign(actor, %{
host: host,
"content-length": byte_size(json),
digest: digest,
date: date
})
with {:ok, %{status: code}} when code in 200..299 <-
result =
@httpoison.post(
inbox,
json,
[
{"Content-Type", "application/activity+json"},
{"Date", date},
{"signature", signature},
{"digest", digest}
]
) do
if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
do: Instances.set_reachable(inbox)
result
else
{_post_result, response} ->
unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
{:error, response}
end
end
# filter out broken threads
def contain_broken_threads(%Activity{} = activity, %User{} = user) do
entire_thread_visible_for_user?(activity, user)