Merge pull request 'Search: filter indexable activities before inserting Oban jobs' (#7538) from gitlab-mr-iid-4161 into develop
Reviewed-on: https://git.pleroma.social/pleroma/pleroma/pulls/7538
This commit is contained in:
commit
1d819195b6
6 changed files with 119 additions and 60 deletions
1
changelog.d/search-indexing.change
Normal file
1
changelog.d/search-indexing.change
Normal file
|
|
@ -0,0 +1 @@
|
||||||
|
Filter indexable activities before inserting indexing jobs into the queue.
|
||||||
|
|
@ -1,11 +1,28 @@
|
||||||
defmodule Pleroma.Search do
|
defmodule Pleroma.Search do
|
||||||
|
alias Pleroma.Activity
|
||||||
|
alias Pleroma.Object
|
||||||
|
alias Pleroma.Web.ActivityPub.Visibility
|
||||||
alias Pleroma.Workers.SearchIndexingWorker
|
alias Pleroma.Workers.SearchIndexingWorker
|
||||||
|
|
||||||
def add_to_index(%Pleroma.Activity{id: activity_id}) do
|
@spec add_to_index(Activity.t()) :: {:ok, Oban.Job.t() | :noop} | {:error, Oban.Job.changeset()}
|
||||||
SearchIndexingWorker.new(%{"op" => "add_to_index", "activity" => activity_id})
|
def add_to_index(%Activity{id: activity_id, object: %Object{} = object} = activity) do
|
||||||
|> Oban.insert()
|
with {_, true} <- {:indexable, indexable?(activity)},
|
||||||
|
{_, "public"} <- {:visibility, Visibility.get_visibility(object)} do
|
||||||
|
SearchIndexingWorker.new(%{"op" => "add_to_index", "activity" => activity_id})
|
||||||
|
|> Oban.insert()
|
||||||
|
else
|
||||||
|
_ -> {:ok, :noop}
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Fallback clause for an activity whose `object` is not a loaded `%Object{}`:
# fetch the activity again together with its object and hand it to the clause
# above. Returns `{:ok, :noop}` when there is nothing to enqueue, which keeps
# the result inside the shape declared by the @spec.
def add_to_index(%Activity{id: activity_id}) do
  case Activity.get_by_id_with_object(activity_id) do
    # Delegate only when the object really is loaded; an activity without an
    # `%Object{}` would match this clause again and recurse without end.
    %Activity{object: %Object{}} = preloaded -> add_to_index(preloaded)
    _ -> {:ok, :noop}
  end
end
|
||||||
|
|
||||||
|
@spec remove_from_index(Object.t()) :: {:ok, Oban.Job.t()} | {:error, Oban.Job.changeset()}
|
||||||
def remove_from_index(%Pleroma.Object{id: object_id}) do
|
def remove_from_index(%Pleroma.Object{id: object_id}) do
|
||||||
SearchIndexingWorker.new(%{"op" => "remove_from_index", "object" => object_id})
|
SearchIndexingWorker.new(%{"op" => "remove_from_index", "object" => object_id})
|
||||||
|> Oban.insert()
|
|> Oban.insert()
|
||||||
|
|
@ -20,4 +37,7 @@ defmodule Pleroma.Search do
|
||||||
search_module = Pleroma.Config.get([Pleroma.Search, :module])
|
search_module = Pleroma.Config.get([Pleroma.Search, :module])
|
||||||
search_module.healthcheck_endpoints()
|
search_module.healthcheck_endpoints()
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# True only for an `%Activity{}` whose data carries "type" => "Create";
# every other argument yields false.
defp indexable?(activity), do: match?(%Activity{data: %{"type" => "Create"}}, activity)
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ defmodule Pleroma.Search.Meilisearch do
|
||||||
|
|
||||||
alias Pleroma.Activity
|
alias Pleroma.Activity
|
||||||
alias Pleroma.Config.Getting, as: Config
|
alias Pleroma.Config.Getting, as: Config
|
||||||
|
alias Pleroma.Object
|
||||||
|
|
||||||
import Pleroma.Search.DatabaseSearch
|
import Pleroma.Search.DatabaseSearch
|
||||||
import Ecto.Query
|
import Ecto.Query
|
||||||
|
|
@ -156,28 +157,23 @@ defmodule Pleroma.Search.Meilisearch do
|
||||||
end
|
end
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def add_to_index(activity) do
|
def add_to_index(%Activity{object: %Object{} = object} = activity) do
|
||||||
maybe_search_data = object_to_search_data(activity.object)
|
search_data = object_to_search_data(object)
|
||||||
|
|
||||||
if activity.data["type"] == "Create" and maybe_search_data do
|
result =
|
||||||
result =
|
meili_put(
|
||||||
meili_put(
|
"/indexes/objects/documents",
|
||||||
"/indexes/objects/documents",
|
[search_data]
|
||||||
[maybe_search_data]
|
)
|
||||||
)
|
|
||||||
|
|
||||||
with {:ok, %{"status" => "enqueued"}} <- result do
|
with {:ok, %{"status" => "enqueued"}} <- result do
|
||||||
# Added successfully
|
# Added successfully
|
||||||
:ok
|
|
||||||
else
|
|
||||||
_ ->
|
|
||||||
# There was an error, report it
|
|
||||||
Logger.error("Failed to add activity #{activity.id} to index: #{inspect(result)}")
|
|
||||||
{:error, result}
|
|
||||||
end
|
|
||||||
else
|
|
||||||
# The post isn't something we can search, that's ok
|
|
||||||
:ok
|
:ok
|
||||||
|
else
|
||||||
|
_ ->
|
||||||
|
# There was an error, report it
|
||||||
|
Logger.error("Failed to add activity #{activity.id} to index: #{inspect(result)}")
|
||||||
|
{:error, result}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ defmodule Pleroma.Search.QdrantSearch do
|
||||||
|
|
||||||
alias Pleroma.Activity
|
alias Pleroma.Activity
|
||||||
alias Pleroma.Config.Getting, as: Config
|
alias Pleroma.Config.Getting, as: Config
|
||||||
|
alias Pleroma.Object
|
||||||
|
|
||||||
alias __MODULE__.OpenAIClient
|
alias __MODULE__.OpenAIClient
|
||||||
alias __MODULE__.QdrantClient
|
alias __MODULE__.QdrantClient
|
||||||
|
|
@ -82,23 +83,18 @@ defmodule Pleroma.Search.QdrantSearch do
|
||||||
end
|
end
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def add_to_index(activity) do
|
def add_to_index(%Activity{object: %Object{} = object} = activity) do
|
||||||
# This will only index public or unlisted notes
|
search_data = object_to_search_data(object)
|
||||||
maybe_search_data = object_to_search_data(activity.object)
|
|
||||||
|
|
||||||
if activity.data["type"] == "Create" and maybe_search_data do
|
with {:ok, embedding} <- get_embedding(search_data.content),
|
||||||
with {:ok, embedding} <- get_embedding(maybe_search_data.content),
|
{:ok, %{status: 200}} <-
|
||||||
{:ok, %{status: 200}} <-
|
QdrantClient.put(
|
||||||
QdrantClient.put(
|
"/collections/posts/points",
|
||||||
"/collections/posts/points",
|
build_index_payload(activity, embedding)
|
||||||
build_index_payload(activity, embedding)
|
) do
|
||||||
) do
|
|
||||||
:ok
|
|
||||||
else
|
|
||||||
e -> {:error, e}
|
|
||||||
end
|
|
||||||
else
|
|
||||||
:ok
|
:ok
|
||||||
|
else
|
||||||
|
e -> {:error, e}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -74,29 +74,6 @@ defmodule Pleroma.Search.MeilisearchTest do
|
||||||
assert_received("posted_to_meilisearch")
|
assert_received("posted_to_meilisearch")
|
||||||
end
|
end
|
||||||
|
|
||||||
test "doesn't index posts that are not public" do
|
|
||||||
user = insert(:user)
|
|
||||||
|
|
||||||
Enum.each(["private", "direct"], fn visibility ->
|
|
||||||
{:ok, activity} =
|
|
||||||
CommonAPI.post(user, %{
|
|
||||||
status: "guys i just don't wanna leave the swamp",
|
|
||||||
visibility: visibility
|
|
||||||
})
|
|
||||||
|
|
||||||
args = %{"op" => "add_to_index", "activity" => activity.id}
|
|
||||||
|
|
||||||
Config
|
|
||||||
|> expect(:get, fn
|
|
||||||
[Pleroma.Search, :module], nil ->
|
|
||||||
Meilisearch
|
|
||||||
end)
|
|
||||||
|
|
||||||
assert_enqueued(worker: SearchIndexingWorker, args: args)
|
|
||||||
assert :ok = perform_job(SearchIndexingWorker, args)
|
|
||||||
end)
|
|
||||||
end
|
|
||||||
|
|
||||||
test "deletes posts from index when deleted locally" do
|
test "deletes posts from index when deleted locally" do
|
||||||
user = insert(:user)
|
user = insert(:user)
|
||||||
|
|
||||||
|
|
|
||||||
69
test/pleroma/search_test.exs
Normal file
69
test/pleroma/search_test.exs
Normal file
|
|
@ -0,0 +1,69 @@
|
||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.SearchTest do
  use Pleroma.DataCase, async: true
  use Oban.Testing, repo: Pleroma.Repo

  import Pleroma.Factory

  alias Pleroma.Web.CommonAPI
  alias Pleroma.Workers.SearchIndexingWorker

  # Job arguments an "add_to_index" job for the given activity would carry.
  defp index_args(%{id: activity_id}) do
    %{"op" => "add_to_index", "activity" => activity_id}
  end

  test "indexes posts that are public" do
    author = insert(:user)

    {:ok, post} =
      CommonAPI.post(author, %{
        status: "Well this is a story all about how my life got flipped turned upside down",
        visibility: "public"
      })

    assert_enqueued(worker: SearchIndexingWorker, args: index_args(post))
  end

  test "doesn't index posts that are not public" do
    author = insert(:user)

    # Each non-public visibility is posted in turn and must not enqueue a job.
    for visibility <- ["private", "direct"] do
      {:ok, post} =
        CommonAPI.post(author, %{
          status: "guys i just don't wanna leave the swamp",
          visibility: visibility
        })

      refute_enqueued(worker: SearchIndexingWorker, args: index_args(post))
    end
  end

  test "Indexes appropriate activity types" do
    author = insert(:user)

    {:ok, post} =
      CommonAPI.post(author, %{
        status: "I'm my own hype man",
        visibility: "public"
      })

    # The post itself is enqueued for indexing...
    assert_enqueued(worker: SearchIndexingWorker, args: index_args(post))

    # ...while favouriting it does not enqueue a job for the new activity...
    {:ok, favorite} = CommonAPI.favorite(post.id, author)
    refute_enqueued(worker: SearchIndexingWorker, args: index_args(favorite))

    # ...and neither does repeating it.
    {:ok, repeat} = CommonAPI.repeat(post.id, author)
    refute_enqueued(worker: SearchIndexingWorker, args: index_args(repeat))
  end
end
|
||||||
Loading…
Add table
Add a link
Reference in a new issue