Stream follow updates

This commit is contained in:
Egor Kislitsyn 2020-12-02 00:17:52 +04:00
commit 35ba48494f
No known key found for this signature in database
GPG key ID: 1B49CB15B71E7805
30 changed files with 256 additions and 149 deletions

View file

@ -36,9 +36,8 @@ defmodule Pleroma.Web.Streamer do
) ::
{:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
case get_topic(stream, user, oauth_token, params) do
{:ok, topic} -> add_socket(topic, user)
error -> error
with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do
add_socket(topic, user)
end
end
@ -70,10 +69,10 @@ defmodule Pleroma.Web.Streamer do
def get_topic(
stream,
%User{id: user_id} = user,
%Token{user_id: token_user_id} = oauth_token,
%Token{user_id: user_id} = oauth_token,
_params
)
when stream in @user_streams and user_id == token_user_id do
when stream in @user_streams do
# Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
required_scopes =
if stream == "user:notification" do
@ -97,10 +96,9 @@ defmodule Pleroma.Web.Streamer do
def get_topic(
"list",
%User{id: user_id} = user,
%Token{user_id: token_user_id} = oauth_token,
%Token{user_id: user_id} = oauth_token,
%{"list" => id}
)
when user_id == token_user_id do
) do
cond do
OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
{:error, :unauthorized}
@ -137,16 +135,10 @@ defmodule Pleroma.Web.Streamer do
def stream(topics, items) do
if should_env_send?() do
List.wrap(topics)
|> Enum.each(fn topic ->
List.wrap(items)
|> Enum.each(fn item ->
spawn(fn -> do_stream(topic, item) end)
end)
end)
for topic <- List.wrap(topics), item <- List.wrap(items) do
spawn(fn -> do_stream(topic, item) end)
end
end
:ok
end
def filtered_by_user?(user, item, streamed_type \\ :activity)
@ -160,8 +152,7 @@ defmodule Pleroma.Web.Streamer do
domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
with parent <- Object.normalize(item) || item,
true <-
Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
true <-
!(streamed_type == :activity && item.data["type"] == "Announce" &&
@ -195,6 +186,22 @@ defmodule Pleroma.Web.Streamer do
end)
end
defp do_stream("relationships:update", item) do
text = StreamerView.render("relationships_update.json", item)
[item.follower, item.following]
|> Enum.map(fn %{id: id} -> "user:#{id}" end)
|> Enum.each(fn user_topic ->
Logger.debug("Trying to push relationships:update to #{user_topic}\n\n")
Registry.dispatch(@registry, user_topic, fn list ->
Enum.each(list, fn {pid, _auth} ->
send(pid, {:text, text})
end)
end)
end)
end
defp do_stream("participation", participation) do
user_topic = "direct:#{participation.user_id}"
Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")