RichMedia: Add support for disabling wss streaming out on backfill
This commit is contained in:
parent
94a28d1286
commit
678fe8a064
4 changed files with 79 additions and 16 deletions
|
|
@ -28,9 +28,9 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do
|
||||||
# This is a naive way to do this, just spawning a process per activity
|
# This is a naive way to do this, just spawning a process per activity
|
||||||
# to fetch the preview. However it should be fine considering
|
# to fetch the preview. However it should be fine considering
|
||||||
# pagination is restricted to 40 activities at a time
|
# pagination is restricted to 40 activities at a time
|
||||||
defp fetch_rich_media_for_activities(activities) do
|
defp fetch_rich_media_for_activities(activities, opts) do
|
||||||
Enum.each(activities, fn activity ->
|
Enum.each(activities, fn activity ->
|
||||||
Card.get_by_activity(activity)
|
Card.get_by_activity(activity, opts)
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
@ -113,7 +113,8 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do
|
||||||
activities = Enum.filter(opts.activities, & &1)
|
activities = Enum.filter(opts.activities, & &1)
|
||||||
|
|
||||||
# Start prefetching rich media before doing anything else
|
# Start prefetching rich media before doing anything else
|
||||||
fetch_rich_media_for_activities(activities)
|
fetch_rich_media_for_activities(activities, opts)
|
||||||
|
|
||||||
replied_to_activities = get_replied_to_activities(activities)
|
replied_to_activities = get_replied_to_activities(activities)
|
||||||
quoted_activities = get_quoted_activities(activities)
|
quoted_activities = get_quoted_activities(activities)
|
||||||
|
|
||||||
|
|
@ -362,7 +363,7 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do
|
||||||
summary = object.data["summary"] || ""
|
summary = object.data["summary"] || ""
|
||||||
|
|
||||||
card =
|
card =
|
||||||
case Card.get_by_activity(activity) do
|
case Card.get_by_activity(activity, opts) do
|
||||||
%Card{} = result -> render("card.json", result)
|
%Card{} = result -> render("card.json", result)
|
||||||
_ -> nil
|
_ -> nil
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,8 @@ defmodule Pleroma.Web.RichMedia.Backfill do
|
||||||
|
|
||||||
require Logger
|
require Logger
|
||||||
|
|
||||||
|
@callback run(map()) :: :ok | Parser.parse_errors() | Helpers.get_errors()
|
||||||
|
|
||||||
@cachex Pleroma.Config.get([:cachex, :provider], Cachex)
|
@cachex Pleroma.Config.get([:cachex, :provider], Cachex)
|
||||||
@stream_out_impl Pleroma.Config.get(
|
@stream_out_impl Pleroma.Config.get(
|
||||||
[__MODULE__, :stream_out],
|
[__MODULE__, :stream_out],
|
||||||
|
|
@ -26,11 +28,7 @@ defmodule Pleroma.Web.RichMedia.Backfill do
|
||||||
{:ok, card} = Card.create(url, fields)
|
{:ok, card} = Card.create(url, fields)
|
||||||
|
|
||||||
maybe_schedule_expiration(url, fields)
|
maybe_schedule_expiration(url, fields)
|
||||||
|
maybe_update_stream(args)
|
||||||
with %{"activity_id" => activity_id} <- args,
|
|
||||||
false <- is_nil(activity_id) do
|
|
||||||
stream_update(args)
|
|
||||||
end
|
|
||||||
|
|
||||||
warm_cache(url_hash, card)
|
warm_cache(url_hash, card)
|
||||||
:ok
|
:ok
|
||||||
|
|
@ -55,12 +53,16 @@ defmodule Pleroma.Web.RichMedia.Backfill do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp stream_update(%{"activity_id" => activity_id}) do
|
defp maybe_update_stream(%{"activity_id" => activity_id, "stream" => true}) when is_binary(activity_id) do
|
||||||
Pleroma.Activity.get_by_id(activity_id)
|
Pleroma.Activity.get_by_id(activity_id)
|
||||||
|> Pleroma.Activity.normalize()
|
|> Pleroma.Activity.normalize()
|
||||||
|> @stream_out_impl.stream_out()
|
|> @stream_out_impl.stream_out()
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Streamer.stream_out returns noop when unsupported activity type is requested to be streamed.
|
||||||
|
# Do the same here for unwanted streaming
|
||||||
|
defp maybe_update_stream(_), do: :noop
|
||||||
|
|
||||||
defp warm_cache(key, val), do: @cachex.put(:rich_media_cache, key, val)
|
defp warm_cache(key, val), do: @cachex.put(:rich_media_cache, key, val)
|
||||||
|
|
||||||
defp negative_cache(key, ttl \\ :timer.minutes(15)),
|
defp negative_cache(key, ttl \\ :timer.minutes(15)),
|
||||||
|
|
|
||||||
|
|
@ -90,8 +90,12 @@ defmodule Pleroma.Web.RichMedia.Card do
|
||||||
|
|
||||||
nil ->
|
nil ->
|
||||||
activity_id = Keyword.get(opts, :activity_id, nil)
|
activity_id = Keyword.get(opts, :activity_id, nil)
|
||||||
|
# Nested opts, first layer comes from get_by_activity/2 as Keyword, second from API views/Federation as Map.
|
||||||
|
# Provide default Map when called directly.
|
||||||
|
opts = Keyword.get(opts, :opts, %{})
|
||||||
|
stream = Map.get(opts, :stream, true)
|
||||||
|
|
||||||
RichMediaWorker.new(%{"op" => "backfill", "url" => url, "activity_id" => activity_id})
|
RichMediaWorker.new(%{"op" => "backfill", "url" => url, "activity_id" => activity_id, "stream" => stream})
|
||||||
|> Oban.insert()
|
|> Oban.insert()
|
||||||
|
|
||||||
nil
|
nil
|
||||||
|
|
@ -112,9 +116,11 @@ defmodule Pleroma.Web.RichMedia.Card do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@spec get_by_activity(Activity.t()) :: t() | nil | :error
|
@spec get_by_activity(Activity.t(), %{}) :: t() | nil | :error
|
||||||
|
def get_by_activity(activity, opts \\ %{})
|
||||||
|
|
||||||
# Fake/Draft activity
|
# Fake/Draft activity
|
||||||
def get_by_activity(%Activity{id: "pleroma:fakeid"} = activity) do
|
def get_by_activity(%Activity{id: "pleroma:fakeid"} = activity, _opts) do
|
||||||
with {_, true} <- {:config, @config_impl.get([:rich_media, :enabled])},
|
with {_, true} <- {:config, @config_impl.get([:rich_media, :enabled])},
|
||||||
%Object{} = object <- Object.normalize(activity, fetch: false),
|
%Object{} = object <- Object.normalize(activity, fetch: false),
|
||||||
url when not is_nil(url) <- HTML.extract_first_external_url_from_object(object) do
|
url when not is_nil(url) <- HTML.extract_first_external_url_from_object(object) do
|
||||||
|
|
@ -138,13 +144,13 @@ defmodule Pleroma.Web.RichMedia.Card do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def get_by_activity(activity) do
|
def get_by_activity(activity, opts) do
|
||||||
with %Object{} = object <- Object.normalize(activity, fetch: false),
|
with %Object{} = object <- Object.normalize(activity, fetch: false),
|
||||||
{_, nil} <- {:cached, get_cached_url(object, activity.id)} do
|
{_, nil} <- {:cached, get_cached_url(object, activity.id)} do
|
||||||
nil
|
nil
|
||||||
else
|
else
|
||||||
{:cached, url} ->
|
{:cached, url} ->
|
||||||
get_or_backfill_by_url(url, activity_id: activity.id)
|
get_or_backfill_by_url(url, activity_id: activity.id, opts: opts)
|
||||||
|
|
||||||
_ ->
|
_ ->
|
||||||
:error
|
:error
|
||||||
|
|
|
||||||
|
|
@ -5,12 +5,20 @@
|
||||||
defmodule Pleroma.Web.RichMedia.BackfillTest do
|
defmodule Pleroma.Web.RichMedia.BackfillTest do
|
||||||
use Pleroma.DataCase
|
use Pleroma.DataCase
|
||||||
|
|
||||||
|
alias Pleroma.Web.CommonAPI
|
||||||
alias Pleroma.Web.RichMedia.Backfill
|
alias Pleroma.Web.RichMedia.Backfill
|
||||||
alias Pleroma.Web.RichMedia.Card
|
alias Pleroma.Web.RichMedia.Card
|
||||||
|
|
||||||
import Mox
|
import Mox
|
||||||
|
import Pleroma.Factory
|
||||||
|
|
||||||
setup_all do: clear_config([:rich_media, :enabled], true)
|
setup do
|
||||||
|
clear_config([:rich_media, :enabled], true)
|
||||||
|
|
||||||
|
Mox.stub_with(Pleroma.CachexMock, Pleroma.NullCache)
|
||||||
|
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
|
||||||
test "sets a negative cache entry for an error" do
|
test "sets a negative cache entry for an error" do
|
||||||
url = "https://bad.example.com/"
|
url = "https://bad.example.com/"
|
||||||
|
|
@ -45,4 +53,50 @@ defmodule Pleroma.Web.RichMedia.BackfillTest do
|
||||||
|
|
||||||
Backfill.run(%{"url" => url})
|
Backfill.run(%{"url" => url})
|
||||||
end
|
end
|
||||||
|
|
||||||
|
test "streams out update when stream == true" do
|
||||||
|
url = "https://example.com"
|
||||||
|
user = insert(:user)
|
||||||
|
|
||||||
|
Tesla.Mock.mock(fn %{url: ^url} ->
|
||||||
|
{:ok,
|
||||||
|
%Tesla.Env{
|
||||||
|
status: 200,
|
||||||
|
body: "<head><meta name=\"twitter:title\" content=\"Cofe\"></head>"
|
||||||
|
}}
|
||||||
|
end)
|
||||||
|
|
||||||
|
{:ok, activity} = CommonAPI.post(user, %{status: "#cofe #{url}"})
|
||||||
|
|
||||||
|
Pleroma.CachexMock
|
||||||
|
|> expect(:put, fn :rich_media_cache, _, _ -> {:ok, true} end)
|
||||||
|
|
||||||
|
Pleroma.Web.ActivityPub.ActivityPubMock
|
||||||
|
|> expect(:stream_out, fn _ -> :ok end)
|
||||||
|
|
||||||
|
Backfill.run(%{"url" => url, "activity_id" => "#{activity.data["id"]}", "stream" => true})
|
||||||
|
end
|
||||||
|
|
||||||
|
test "does not stream out update when stream == false" do
|
||||||
|
url = "https://example.com"
|
||||||
|
user = insert(:user)
|
||||||
|
|
||||||
|
Tesla.Mock.mock(fn %{url: ^url} ->
|
||||||
|
{:ok,
|
||||||
|
%Tesla.Env{
|
||||||
|
status: 200,
|
||||||
|
body: "<head><meta name=\"twitter:title\" content=\"Cofe\"></head>"
|
||||||
|
}}
|
||||||
|
end)
|
||||||
|
|
||||||
|
{:ok, activity} = CommonAPI.post(user, %{status: "#cofe #{url}"})
|
||||||
|
|
||||||
|
Pleroma.CachexMock
|
||||||
|
|> expect(:put, fn :rich_media_cache, _, _ -> {:ok, true} end)
|
||||||
|
|
||||||
|
Pleroma.Web.ActivityPub.ActivityPubMock
|
||||||
|
|> deny(:stream_out, 1)
|
||||||
|
|
||||||
|
Backfill.run(%{"url" => url, "activity_id" => "#{activity.data["id"]}", "stream" => false})
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue