From a743940463a7d0a7346f77792310dff6a98e7f31 Mon Sep 17 00:00:00 2001
From: Roger Braun <rbraun@Bobble.local>
Date: Thu, 16 Nov 2017 16:49:51 +0100
Subject: [PATCH] MastoAPI: Implement all streaming functions.

---
 lib/pleroma/notification.ex                   |  3 +-
 lib/pleroma/user.ex                           | 11 +++++
 lib/pleroma/web/activity_pub/activity_pub.ex  |  1 +
 .../mastodon_api/mastodon_api_controller.ex   |  2 +-
 .../web/mastodon_api/mastodon_socket.ex       |  2 +-
 lib/pleroma/web/streamer.ex                   | 42 +++++++++++++++++--
 test/user_test.exs                            | 18 ++++++++
 7 files changed, 73 insertions(+), 6 deletions(-)

diff --git a/lib/pleroma/notification.ex b/lib/pleroma/notification.ex
index 039cc7312..65e3265d4 100644
--- a/lib/pleroma/notification.ex
+++ b/lib/pleroma/notification.ex
@@ -78,8 +78,9 @@ defmodule Pleroma.Notification do
   # TODO move to sql, too.
   def create_notification(%Activity{} = activity, %User{} = user) do
     unless User.blocks?(user, %{ap_id: activity.data["actor"]}) do
-      notification = %Notification{user_id: user.id, activity_id: activity.id}
+      notification = %Notification{user_id: user.id, activity: activity}
       {:ok, notification} = Repo.insert(notification)
+      Pleroma.Web.Streamer.stream("user", notification)
       notification
     end
   end
diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex
index 771c54e81..56502e897 100644
--- a/lib/pleroma/user.ex
+++ b/lib/pleroma/user.ex
@@ -284,6 +284,17 @@ defmodule Pleroma.User do
     Repo.all(query)
   end
 
+  def get_recipients_from_activity(%Activity{data: %{"to" => to}} = activity) do
+    query = from u in User,
+      where: u.local == true
+
+    query = from u in query,
+      where: u.ap_id in ^to,
+      or_where: fragment("? \\\?| ?", u.following, ^to)
+
+    Repo.all(query)
+  end
+
   def search(query, resolve) do
     if resolve do
       User.get_or_fetch_by_nickname(query)
diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex
index 5cbf14868..b4e59050b 100644
--- a/lib/pleroma/web/activity_pub/activity_pub.ex
+++ b/lib/pleroma/web/activity_pub/activity_pub.ex
@@ -24,6 +24,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
          :ok <- maybe_federate(activity) do
       if activity.data["type"] == "Create" and Enum.member?(activity.data["to"], "https://www.w3.org/ns/activitystreams#Public") do
         Pleroma.Web.Streamer.stream("public", activity)
+        Pleroma.Web.Streamer.stream("user", activity)
         if local do
           Pleroma.Web.Streamer.stream("public:local", activity)
         end
diff --git a/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex b/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex
index 8b5714555..bbd003b06 100644
--- a/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex
+++ b/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex
@@ -595,7 +595,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
     json(conn, [])
   end
 
-  defp render_notification(user, %{id: id, activity: activity, inserted_at: created_at} = _params) do
+  def render_notification(user, %{id: id, activity: activity, inserted_at: created_at} = _params) do
     actor = User.get_cached_by_ap_id(activity.data["actor"])
     created_at = NaiveDateTime.to_iso8601(created_at)
     |> String.replace(~r/(\.\d+)?$/, ".000Z", global: false)
diff --git a/lib/pleroma/web/mastodon_api/mastodon_socket.ex b/lib/pleroma/web/mastodon_api/mastodon_socket.ex
index af76c8701..1d276e64a 100644
--- a/lib/pleroma/web/mastodon_api/mastodon_socket.ex
+++ b/lib/pleroma/web/mastodon_api/mastodon_socket.ex
@@ -11,7 +11,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonSocket do
     with token when not is_nil(token) <- params["access_token"],
          %Token{user_id: user_id} <- Repo.get_by(Token, token: token),
          %User{} = user <- Repo.get(User, user_id),
-         stream when stream in ["public", "public:local"] <- params["stream"] do
+         stream when stream in ["public", "public:local", "user"] <- params["stream"] do
       socket = socket
       |> assign(:topic, params["stream"])
       |> assign(:user, user)
diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex
index 3b2938676..9f1080015 100644
--- a/lib/pleroma/web/streamer.ex
+++ b/lib/pleroma/web/streamer.ex
@@ -2,6 +2,7 @@ defmodule Pleroma.Web.Streamer do
   use GenServer
   require Logger
   import Plug.Conn
+  alias Pleroma.{User, Notification}
 
   def start_link do
     spawn(fn ->
@@ -37,21 +38,55 @@ defmodule Pleroma.Web.Streamer do
     {:noreply, topics}
   end
 
-  def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
-    Logger.debug("Trying to push to #{topic}")
-    Logger.debug("Pushing item to #{topic}")
+  def push_to_socket(topics, topic, item) do
     Enum.each(topics[topic] || [], fn (socket) ->
       json = %{
         event: "update",
         payload: Pleroma.Web.MastodonAPI.StatusView.render("status.json", activity: item, for: socket.assigns[:user]) |> Poison.encode!
       } |> Poison.encode!
 
+      send socket.transport_pid, {:text, json}
+    end)
+  end
+
+  def handle_cast(%{action: :stream, topic: "user", item: %Notification{} = item}, topics) do
+    topic = "user:#{item.user_id}"
+    Enum.each(topics[topic] || [], fn (socket) ->
+      json = %{
+        event: "notification",
+        payload: Pleroma.Web.MastodonAPI.MastodonAPIController.render_notification(socket.assigns["user"], item) |> Poison.encode!
+      } |> Poison.encode!
+
       send socket.transport_pid, {:text, json}
     end)
     {:noreply, topics}
   end
 
+  def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
+    Logger.debug("Trying to push to users")
+    recipient_topics = User.get_recipients_from_activity(item)
+    |> Enum.map(fn (%{id: id}) -> "user:#{id}" end)
+
+    Enum.each(recipient_topics, fn (topic) ->
+      push_to_socket(topics, topic, item)
+    end)
+    {:noreply, topics}
+  end
+
+  def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
+    Logger.debug("Trying to push to #{topic}")
+    Logger.debug("Pushing item to #{topic}")
+    push_to_socket(topics, topic, item)
+    {:noreply, topics}
+  end
+
+  defp internal_topic("user", socket) do
+    "user:#{socket.assigns[:user].id}"
+  end
+  defp internal_topic(topic, socket), do: topic
+
   def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
+    topic = internal_topic(topic, socket)
     sockets_for_topic = sockets[topic] || []
     sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
     sockets = Map.put(sockets, topic, sockets_for_topic)
@@ -61,6 +96,7 @@ defmodule Pleroma.Web.Streamer do
   end
 
   def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
+    topic = internal_topic(topic, socket)
     sockets_for_topic = sockets[topic] || []
     sockets_for_topic = List.delete(sockets_for_topic, socket)
     sockets = Map.put(sockets, topic, sockets_for_topic)
diff --git a/test/user_test.exs b/test/user_test.exs
index 151b9afc0..430f56846 100644
--- a/test/user_test.exs
+++ b/test/user_test.exs
@@ -3,6 +3,7 @@ defmodule Pleroma.UserTest do
   alias Pleroma.{User, Repo}
   alias Pleroma.Web.OStatus
   alias Pleroma.Web.Websub.WebsubClientSubscription
+  alias Pleroma.Web.CommonAPI
   use Pleroma.DataCase
 
   import Pleroma.Factory
@@ -296,5 +297,22 @@ defmodule Pleroma.UserTest do
       refute User.blocks?(user, blocked_user)
     end
   end
+
+  test "get recipients from activity" do
+    actor = insert(:user)
+    user = insert(:user, local: true)
+    user_two = insert(:user, local: false)
+    addressed = insert(:user, local: true)
+    addressed_remote = insert(:user, local: false)
+    {:ok, activity} = CommonAPI.post(actor, %{"status" => "hey @#{addressed.nickname} @#{addressed_remote.nickname}"})
+
+    assert [addressed] == User.get_recipients_from_activity(activity)
+
+    {:ok, user} = User.follow(user, actor)
+    recipients = User.get_recipients_from_activity(activity)
+    assert length(recipients) == 2
+    assert user in recipients
+    assert addressed in recipients
+  end
 end