Merge branch 'develop' into issue-7887-featured-collection
This commit is contained in:
commit
33fdc59bc1
17 changed files with 418 additions and 65 deletions
|
|
@ -9,8 +9,8 @@ defmodule Pleroma.Web.Endpoint do
|
|||
|
||||
alias Pleroma.Config
|
||||
|
||||
socket("/api/v1/streaming", Pleroma.Web.MastodonAPI.WebsocketHandler,
|
||||
longpoll: false,
|
||||
plug(Pleroma.Web.MastodonAPI.WebsocketPlug,
|
||||
path: "/api/v1/streaming",
|
||||
websocket: [
|
||||
path: "/",
|
||||
compress: false,
|
||||
|
|
@ -169,8 +169,7 @@ defmodule Pleroma.Web.Endpoint do
|
|||
else: "pleroma_key"
|
||||
|
||||
extra =
|
||||
Config.get([__MODULE__, :extra_cookie_attrs])
|
||||
|> Enum.join(";")
|
||||
Enum.join(Config.get([__MODULE__, :extra_cookie_attrs]), ";")
|
||||
|
||||
# The session will be stored in the cookie and signed,
|
||||
# this means its contents can be read but not tampered with.
|
||||
|
|
|
|||
|
|
@ -28,9 +28,13 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do
|
|||
# This is a naive way to do this, just spawning a process per activity
|
||||
# to fetch the preview. However it should be fine considering
|
||||
# pagination is restricted to 40 activities at a time
|
||||
defp fetch_rich_media_for_activities(activities) do
|
||||
# Force disable Websockets streaming for backfill jobs,
|
||||
# otherwise old posts can show up on timelines.
|
||||
defp fetch_rich_media_for_activities(activities, opts) do
|
||||
opts = Map.put(opts, :stream, false)
|
||||
|
||||
Enum.each(activities, fn activity ->
|
||||
Card.get_by_activity(activity)
|
||||
Card.get_by_activity(activity, opts)
|
||||
end)
|
||||
end
|
||||
|
||||
|
|
@ -113,7 +117,8 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do
|
|||
activities = Enum.filter(opts.activities, & &1)
|
||||
|
||||
# Start prefetching rich media before doing anything else
|
||||
fetch_rich_media_for_activities(activities)
|
||||
fetch_rich_media_for_activities(activities, opts)
|
||||
|
||||
replied_to_activities = get_replied_to_activities(activities)
|
||||
quoted_activities = get_quoted_activities(activities)
|
||||
|
||||
|
|
@ -361,8 +366,10 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do
|
|||
|
||||
summary = object.data["summary"] || ""
|
||||
|
||||
# Force disable Websockets streaming for backfill jobs which the below call will create,
|
||||
# otherwise old posts can show up on timelines.
|
||||
card =
|
||||
case Card.get_by_activity(activity) do
|
||||
case Card.get_by_activity(activity, Map.put(opts, :stream, false)) do
|
||||
%Card{} = result -> render("card.json", result)
|
||||
_ -> nil
|
||||
end
|
||||
|
|
|
|||
|
|
@ -67,9 +67,10 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
|
|||
|
||||
@impl Phoenix.Socket.Transport
|
||||
def handle_in({text, [opcode: :text]}, state) do
|
||||
with {:ok, %{} = event} <- Jason.decode(text) do
|
||||
handle_client_event(event, state)
|
||||
else
|
||||
case Jason.decode(text) do
|
||||
{:ok, %{} = event} ->
|
||||
handle_client_event(event, state)
|
||||
|
||||
_ ->
|
||||
Logger.error("#{__MODULE__} received non-JSON event: #{inspect(text)}")
|
||||
{:ok, state}
|
||||
|
|
@ -85,11 +86,11 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
|
|||
def handle_info({:render_with_user, view, template, item, topic}, state) do
|
||||
user = %User{} = User.get_cached_by_ap_id(state.user.ap_id)
|
||||
|
||||
unless Streamer.filtered_by_user?(user, item) do
|
||||
if Streamer.filtered_by_user?(user, item) do
|
||||
{:ok, state}
|
||||
else
|
||||
message = view.render(template, item, user, topic)
|
||||
{:push, {:text, message}, %{state | user: user}}
|
||||
else
|
||||
{:ok, state}
|
||||
end
|
||||
end
|
||||
|
||||
|
|
@ -253,7 +254,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
|
|||
|
||||
defp find_sec_websocket_protocol(sec_headers) do
|
||||
Enum.find_value(sec_headers, fn
|
||||
{"sec-websocket-protocol", token} -> token
|
||||
{"sec-websocket-protocol", protocols} -> protocols |> Plug.Conn.Utils.list() |> List.first()
|
||||
_ -> nil
|
||||
end)
|
||||
end
|
||||
|
|
|
|||
104
lib/pleroma/web/mastodon_api/websocket_plug.ex
Normal file
104
lib/pleroma/web/mastodon_api/websocket_plug.ex
Normal file
|
|
@ -0,0 +1,104 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Web.MastodonAPI.WebsocketPlug do
|
||||
@moduledoc """
|
||||
A Phoenix 1.8 compatible WebSocket transport for Mastodon streaming.
|
||||
|
||||
It mirrors Phoenix.Transports.WebSocket, but echoes a successfully authenticated
|
||||
Mastodon-style Sec-WebSocket-Protocol token so browser clients accept the handshake.
|
||||
"""
|
||||
|
||||
@behaviour Plug
|
||||
|
||||
import Plug.Conn
|
||||
|
||||
alias Phoenix.Socket.Transport
|
||||
alias Pleroma.Web.Endpoint
|
||||
alias Pleroma.Web.MastodonAPI.WebsocketHandler
|
||||
|
||||
@connect_info_opts [:check_csrf]
|
||||
|
||||
@impl Plug
|
||||
def init(opts) do
|
||||
path = String.split(Keyword.fetch!(opts, :path), "/", trim: true)
|
||||
websocket = Keyword.fetch!(opts, :websocket)
|
||||
config = Transport.load_config(websocket, Phoenix.Transports.WebSocket)
|
||||
|
||||
{path, config}
|
||||
end
|
||||
|
||||
@impl Plug
|
||||
def call(%{method: "GET", path_info: path} = conn, {path, opts}) do
|
||||
conn
|
||||
|> fetch_query_params()
|
||||
|> Transport.code_reload(Endpoint, opts)
|
||||
|> Transport.transport_log(opts[:transport_log])
|
||||
|> Transport.check_origin(WebsocketHandler, Endpoint, opts)
|
||||
|> connect(opts)
|
||||
end
|
||||
|
||||
def call(%{path_info: path} = conn, {path, _opts}) do
|
||||
conn
|
||||
|> send_resp(400, "")
|
||||
|> halt()
|
||||
end
|
||||
|
||||
def call(conn, _opts), do: conn
|
||||
|
||||
defp connect(%{halted: true} = conn, _opts), do: conn
|
||||
|
||||
defp connect(%{params: params} = conn, opts) do
|
||||
keys = Keyword.get(opts, :connect_info, [])
|
||||
|
||||
connect_info =
|
||||
Transport.connect_info(conn, Endpoint, keys, Keyword.take(opts, @connect_info_opts))
|
||||
|
||||
config = %{
|
||||
endpoint: Endpoint,
|
||||
transport: :websocket,
|
||||
options: opts,
|
||||
params: params,
|
||||
connect_info: connect_info
|
||||
}
|
||||
|
||||
case WebsocketHandler.connect(config) do
|
||||
{:ok, arg} ->
|
||||
try do
|
||||
conn
|
||||
|> echo_sec_websocket_protocol()
|
||||
|> WebSockAdapter.upgrade(WebsocketHandler, arg, opts)
|
||||
|> halt()
|
||||
rescue
|
||||
e in WebSockAdapter.UpgradeError ->
|
||||
conn
|
||||
|> send_resp(400, e.message)
|
||||
|> halt()
|
||||
end
|
||||
|
||||
:error ->
|
||||
conn
|
||||
|> send_resp(403, "")
|
||||
|> halt()
|
||||
|
||||
{:error, reason} ->
|
||||
{m, f, args} = opts[:error_handler]
|
||||
|
||||
halt(apply(m, f, [conn, reason | args]))
|
||||
end
|
||||
end
|
||||
|
||||
defp echo_sec_websocket_protocol(conn) do
|
||||
case get_req_header(conn, "sec-websocket-protocol") do
|
||||
[protocols | _] ->
|
||||
case Plug.Conn.Utils.list(protocols) do
|
||||
[protocol | _] -> put_resp_header(conn, "sec-websocket-protocol", protocol)
|
||||
[] -> conn
|
||||
end
|
||||
|
||||
[] ->
|
||||
conn
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
@ -11,6 +11,8 @@ defmodule Pleroma.Web.RichMedia.Backfill do
|
|||
|
||||
require Logger
|
||||
|
||||
@callback run(map()) :: :ok | Parser.parse_errors() | Helpers.get_errors()
|
||||
|
||||
@cachex Pleroma.Config.get([:cachex, :provider], Cachex)
|
||||
@stream_out_impl Pleroma.Config.get(
|
||||
[__MODULE__, :stream_out],
|
||||
|
|
@ -26,11 +28,7 @@ defmodule Pleroma.Web.RichMedia.Backfill do
|
|||
{:ok, card} = Card.create(url, fields)
|
||||
|
||||
maybe_schedule_expiration(url, fields)
|
||||
|
||||
with %{"activity_id" => activity_id} <- args,
|
||||
false <- is_nil(activity_id) do
|
||||
stream_update(args)
|
||||
end
|
||||
maybe_update_stream(args)
|
||||
|
||||
warm_cache(url_hash, card)
|
||||
:ok
|
||||
|
|
@ -55,12 +53,17 @@ defmodule Pleroma.Web.RichMedia.Backfill do
|
|||
end
|
||||
end
|
||||
|
||||
defp stream_update(%{"activity_id" => activity_id}) do
|
||||
defp maybe_update_stream(%{"activity_id" => activity_id, "stream" => true})
|
||||
when is_binary(activity_id) do
|
||||
Pleroma.Activity.get_by_id(activity_id)
|
||||
|> Pleroma.Activity.normalize()
|
||||
|> @stream_out_impl.stream_out()
|
||||
end
|
||||
|
||||
# Streamer.stream_out returns noop when unsupported activity type is requested to be streamed.
|
||||
# Do the same here for unwanted streaming
|
||||
defp maybe_update_stream(_), do: :noop
|
||||
|
||||
defp warm_cache(key, val), do: @cachex.put(:rich_media_cache, key, val)
|
||||
|
||||
defp negative_cache(key, ttl \\ :timer.minutes(15)),
|
||||
|
|
|
|||
|
|
@ -91,7 +91,18 @@ defmodule Pleroma.Web.RichMedia.Card do
|
|||
nil ->
|
||||
activity_id = Keyword.get(opts, :activity_id, nil)
|
||||
|
||||
RichMediaWorker.new(%{"op" => "backfill", "url" => url, "activity_id" => activity_id})
|
||||
# Nested opts, first layer comes from get_by_activity/2 as Keyword,
|
||||
# second from API views/Federation as Map.
|
||||
# Provide default Map when called directly.
|
||||
opts = Keyword.get(opts, :opts, %{})
|
||||
stream = Map.get(opts, :stream, true)
|
||||
|
||||
RichMediaWorker.new(%{
|
||||
"op" => "backfill",
|
||||
"url" => url,
|
||||
"activity_id" => activity_id,
|
||||
"stream" => stream
|
||||
})
|
||||
|> Oban.insert()
|
||||
|
||||
nil
|
||||
|
|
@ -112,9 +123,11 @@ defmodule Pleroma.Web.RichMedia.Card do
|
|||
end
|
||||
end
|
||||
|
||||
@spec get_by_activity(Activity.t()) :: t() | nil | :error
|
||||
@spec get_by_activity(Activity.t(), %{}) :: t() | nil | :error
|
||||
def get_by_activity(activity, opts \\ %{})
|
||||
|
||||
# Fake/Draft activity
|
||||
def get_by_activity(%Activity{id: "pleroma:fakeid"} = activity) do
|
||||
def get_by_activity(%Activity{id: "pleroma:fakeid"} = activity, _opts) do
|
||||
with {_, true} <- {:config, @config_impl.get([:rich_media, :enabled])},
|
||||
%Object{} = object <- Object.normalize(activity, fetch: false),
|
||||
url when not is_nil(url) <- HTML.extract_first_external_url_from_object(object) do
|
||||
|
|
@ -138,13 +151,13 @@ defmodule Pleroma.Web.RichMedia.Card do
|
|||
end
|
||||
end
|
||||
|
||||
def get_by_activity(activity) do
|
||||
def get_by_activity(activity, opts) do
|
||||
with %Object{} = object <- Object.normalize(activity, fetch: false),
|
||||
{_, nil} <- {:cached, get_cached_url(object, activity.id)} do
|
||||
nil
|
||||
else
|
||||
{:cached, url} ->
|
||||
get_or_backfill_by_url(url, activity_id: activity.id)
|
||||
get_or_backfill_by_url(url, activity_id: activity.id, opts: opts)
|
||||
|
||||
_ ->
|
||||
:error
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue