diff --git a/CHANGELOG.md b/CHANGELOG.md
index b3d8e1e0c..fe6ab002c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
 - A [job queue](https://git.pleroma.social/pleroma/pleroma_job_queue) for federation, emails, web push, etc.
 - [Prometheus](https://prometheus.io/) metrics
 - Support for Mastodon's remote interaction
+- Mix Tasks: `mix pleroma.database bump_all_conversations`
 - Mix Tasks: `mix pleroma.database remove_embedded_objects`
 - Mix Tasks: `mix pleroma.user toggle_confirmed`
 - Federation: Support for reports
diff --git a/lib/mix/tasks/pleroma/database.ex b/lib/mix/tasks/pleroma/database.ex
index ab9a3a7ff..42753a1a4 100644
--- a/lib/mix/tasks/pleroma/database.ex
+++ b/lib/mix/tasks/pleroma/database.ex
@@ -4,6 +4,7 @@
 
 defmodule Mix.Tasks.Pleroma.Database do
   alias Mix.Tasks.Pleroma.Common
+  alias Pleroma.Conversation
   require Logger
   use Mix.Task
 
@@ -19,6 +20,11 @@ defmodule Mix.Tasks.Pleroma.Database do
 
     Options:
     - `--vacuum` - run `VACUUM FULL` after the embedded objects are replaced with their references
+
+  ## Create a conversation for all existing DMs. Can be safely re-run.
+
+      mix pleroma.database bump_all_conversations
+
   """
   def run(["remove_embedded_objects" | args]) do
     {options, [], []} =
@@ -48,4 +54,9 @@ defmodule Mix.Tasks.Pleroma.Database do
       )
     end
   end
+
+  def run(["bump_all_conversations"]) do
+    Common.start_pleroma()
+    Conversation.bump_for_all_activities()
+  end
 end
diff --git a/lib/pleroma/conversation.ex b/lib/pleroma/conversation.ex
index 0db195988..238c1acf2 100644
--- a/lib/pleroma/conversation.ex
+++ b/lib/pleroma/conversation.ex
@@ -45,7 +45,7 @@ defmodule Pleroma.Conversation do
   2. Create a participation for all the people involved who don't have one already
   3. Bump all relevant participations to 'unread'
   """
-  def create_or_bump_for(activity) do
+  def create_or_bump_for(activity, opts \\ []) do
     with true <- Pleroma.Web.ActivityPub.Visibility.is_direct?(activity),
          "Create" <- activity.data["type"],
          object <- Pleroma.Object.normalize(activity),
@@ -58,7 +58,7 @@ defmodule Pleroma.Conversation do
       participations =
         Enum.map(users, fn user ->
           {:ok, participation} =
-            Participation.create_for_user_and_conversation(user, conversation)
+            Participation.create_for_user_and_conversation(user, conversation, opts)
 
           participation
         end)
@@ -72,4 +72,21 @@ defmodule Pleroma.Conversation do
       e -> {:error, e}
     end
   end
+
+  @doc """
+  This is only meant to be run by a mix task. It creates conversations/participations for all direct messages in the database.
+  """
+  def bump_for_all_activities do
+    stream =
+      Pleroma.Web.ActivityPub.ActivityPub.fetch_direct_messages_query()
+      |> Repo.stream()
+
+    Repo.transaction(
+      fn ->
+        stream
+        |> Enum.each(fn a -> create_or_bump_for(a, read: true) end)
+      end,
+      timeout: :infinity
+    )
+  end
 end
diff --git a/lib/pleroma/conversation/participation.ex b/lib/pleroma/conversation/participation.ex
index 61021fb18..2a11f9069 100644
--- a/lib/pleroma/conversation/participation.ex
+++ b/lib/pleroma/conversation/participation.ex
@@ -22,15 +22,17 @@ defmodule Pleroma.Conversation.Participation do
 
   def creation_cng(struct, params) do
     struct
-    |> cast(params, [:user_id, :conversation_id])
+    |> cast(params, [:user_id, :conversation_id, :read])
     |> validate_required([:user_id, :conversation_id])
   end
 
-  def create_for_user_and_conversation(user, conversation) do
+  def create_for_user_and_conversation(user, conversation, opts \\ []) do
+    read = !!opts[:read]
+
     %__MODULE__{}
-    |> creation_cng(%{user_id: user.id, conversation_id: conversation.id})
+    |> creation_cng(%{user_id: user.id, conversation_id: conversation.id, read: read})
     |> Repo.insert(
-      on_conflict: [set: [read: false, updated_at: NaiveDateTime.utc_now()]],
+      on_conflict: [set: [read: read, updated_at: NaiveDateTime.utc_now()]],
       returning: true,
       conflict_target: [:user_id, :conversation_id]
     )
diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex
index 7cd5b889b..92d1fab6e 100644
--- a/lib/pleroma/web/activity_pub/activity_pub.ex
+++ b/lib/pleroma/web/activity_pub/activity_pub.ex
@@ -974,4 +974,11 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
   def contain_activity(%Activity{} = activity, %User{} = user) do
     contain_broken_threads(activity, user)
   end
+
+  def fetch_direct_messages_query do
+    Activity
+    |> restrict_type(%{"type" => "Create"})
+    |> restrict_visibility(%{visibility: "direct"})
+    |> order_by([activity], asc: activity.id)
+  end
 end
diff --git a/lib/pleroma/web/activity_pub/visibility.ex b/lib/pleroma/web/activity_pub/visibility.ex
index 46dd46575..93b50ee47 100644
--- a/lib/pleroma/web/activity_pub/visibility.ex
+++ b/lib/pleroma/web/activity_pub/visibility.ex
@@ -14,11 +14,12 @@ defmodule Pleroma.Web.ActivityPub.Visibility do
   end
 
   def is_private?(activity) do
-    unless is_public?(activity) do
-      follower_address = User.get_cached_by_ap_id(activity.data["actor"]).follower_address
-      Enum.any?(activity.data["to"], &(&1 == follower_address))
+    with false <- is_public?(activity),
+         %User{follower_address: follower_address} <-
+           User.get_cached_by_ap_id(activity.data["actor"]) do
+      follower_address in activity.data["to"]
     else
-      false
+      _ -> false
     end
   end
 
diff --git a/test/conversation_test.exs b/test/conversation_test.exs
index 864b2eb03..5903d10ff 100644
--- a/test/conversation_test.exs
+++ b/test/conversation_test.exs
@@ -11,6 +11,26 @@ defmodule Pleroma.ConversationTest do
 
   import Pleroma.Factory
 
+  test "it goes through old direct conversations" do
+    user = insert(:user)
+    other_user = insert(:user)
+
+    {:ok, _activity} =
+      CommonAPI.post(user, %{"visibility" => "direct", "status" => "hey @#{other_user.nickname}"})
+
+    Repo.delete_all(Conversation)
+    Repo.delete_all(Conversation.Participation)
+
+    refute Repo.one(Conversation)
+
+    Conversation.bump_for_all_activities()
+
+    assert Repo.one(Conversation)
+    [participation, _p2] = Repo.all(Conversation.Participation)
+
+    assert participation.read
+  end
+
   test "it creates a conversation for given ap_id" do
     assert {:ok, %Conversation{} = conversation} =
              Conversation.create_for_ap_id("https://some_ap_id")
diff --git a/test/web/activity_pub/visibilty_test.exs b/test/web/activity_pub/visibilty_test.exs
index 9c03c8be2..e2584f635 100644
--- a/test/web/activity_pub/visibilty_test.exs
+++ b/test/web/activity_pub/visibilty_test.exs
@@ -96,6 +96,16 @@ defmodule Pleroma.Web.ActivityPub.VisibilityTest do
     refute Visibility.visible_for_user?(direct, unrelated)
   end
 
+  test "doesn't die when the user doesn't exist",
+       %{
+         direct: direct,
+         user: user
+       } do
+    Repo.delete(user)
+    Cachex.clear(:user_cache)
+    refute Visibility.is_private?(direct)
+  end
+
   test "get_visibility", %{
     public: public,
     private: private,