Track signed fetches of objects and use them for delete federation See merge request pleroma/pleroma!1661object-id-column
@@ -0,0 +1,51 @@ | |||
# Pleroma: A lightweight social networking server | |||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> | |||
# SPDX-License-Identifier: AGPL-3.0-only | |||
defmodule Pleroma.Delivery do | |||
use Ecto.Schema | |||
alias Pleroma.Delivery | |||
alias Pleroma.FlakeId | |||
alias Pleroma.Object | |||
alias Pleroma.Repo | |||
alias Pleroma.User | |||
alias Pleroma.User | |||
import Ecto.Changeset | |||
import Ecto.Query | |||
schema "deliveries" do | |||
belongs_to(:user, User, type: FlakeId) | |||
belongs_to(:object, Object) | |||
end | |||
def changeset(delivery, params \\ %{}) do | |||
delivery | |||
|> cast(params, [:user_id, :object_id]) | |||
|> validate_required([:user_id, :object_id]) | |||
|> foreign_key_constraint(:object_id) | |||
|> foreign_key_constraint(:user_id) | |||
|> unique_constraint(:user_id, name: :deliveries_user_id_object_id_index) | |||
end | |||
def create(object_id, user_id) do | |||
%Delivery{} | |||
|> changeset(%{user_id: user_id, object_id: object_id}) | |||
|> Repo.insert(on_conflict: :nothing) | |||
end | |||
def get(object_id, user_id) do | |||
from(d in Delivery, where: d.user_id == ^user_id and d.object_id == ^object_id) | |||
|> Repo.one() | |||
end | |||
# A hack because user delete activities have a fake id for whatever reason | |||
# TODO: Get rid of this | |||
def delete_all_by_object_id("pleroma:fake_object_id"), do: {0, []} | |||
def delete_all_by_object_id(object_id) do | |||
from(d in Delivery, where: d.object_id == ^object_id) | |||
|> Repo.delete_all() | |||
end | |||
end |
@@ -20,6 +20,7 @@ defmodule Pleroma.Plugs.Cache do | |||
- `ttl`: An expiration time (time-to-live). This value should be in milliseconds or `nil` to disable expiration. Defaults to `nil`. | |||
- `query_params`: Take URL query string into account (`true`), ignore it (`false`) or limit to specific params only (list). Defaults to `true`. | |||
- `tracking_fun`: A function that is called on successfull responses, no matter if the request is cached or not. It should accept a conn as the first argument and the value assigned to `tracking_fun_data` as the second. | |||
Additionally, you can overwrite the TTL inside a controller action by assigning `cache_ttl` to the connection struct: | |||
@@ -56,6 +57,11 @@ defmodule Pleroma.Plugs.Cache do | |||
{:ok, nil} -> | |||
cache_resp(conn, opts) | |||
{:ok, {content_type, body, tracking_fun_data}} -> | |||
conn = opts.tracking_fun.(conn, tracking_fun_data) | |||
send_cached(conn, {content_type, body}) | |||
{:ok, record} -> | |||
send_cached(conn, record) | |||
@@ -88,9 +94,17 @@ defmodule Pleroma.Plugs.Cache do | |||
ttl = Map.get(conn.assigns, :cache_ttl, opts.ttl) | |||
key = cache_key(conn, opts) | |||
content_type = content_type(conn) | |||
record = {content_type, body} | |||
Cachex.put(:web_resp_cache, key, record, ttl: ttl) | |||
conn = | |||
unless opts[:tracking_fun] do | |||
Cachex.put(:web_resp_cache, key, {content_type, body}, ttl: ttl) | |||
conn | |||
else | |||
tracking_fun_data = Map.get(conn.assigns, :tracking_fun_data, nil) | |||
Cachex.put(:web_resp_cache, key, {content_type, body, tracking_fun_data}, ttl: ttl) | |||
opts.tracking_fun.(conn, tracking_fun_data) | |||
end | |||
put_resp_header(conn, "x-cache", "MISS from Pleroma") | |||
@@ -15,7 +15,8 @@ defmodule Pleroma.Web.Plugs.HTTPSignaturePlug do | |||
end | |||
def call(conn, _opts) do | |||
[signature | _] = get_req_header(conn, "signature") | |||
headers = get_req_header(conn, "signature") | |||
signature = Enum.at(headers, 0) | |||
if signature do | |||
# set (request-target) header to the appropriate value | |||
@@ -11,6 +11,7 @@ defmodule Pleroma.User do | |||
alias Comeonin.Pbkdf2 | |||
alias Ecto.Multi | |||
alias Pleroma.Activity | |||
alias Pleroma.Delivery | |||
alias Pleroma.Keys | |||
alias Pleroma.Notification | |||
alias Pleroma.Object | |||
@@ -62,6 +63,7 @@ defmodule Pleroma.User do | |||
field(:last_digest_emailed_at, :naive_datetime) | |||
has_many(:notifications, Notification) | |||
has_many(:registrations, Registration) | |||
has_many(:deliveries, Delivery) | |||
embeds_one(:info, User.Info) | |||
timestamps() | |||
@@ -1640,6 +1642,18 @@ defmodule Pleroma.User do | |||
def is_internal_user?(%User{local: true, nickname: "internal." <> _}), do: true | |||
def is_internal_user?(_), do: false | |||
# A hack because user delete activities have a fake id for whatever reason | |||
# TODO: Get rid of this | |||
def get_delivered_users_by_object_id("pleroma:fake_object_id"), do: [] | |||
def get_delivered_users_by_object_id(object_id) do | |||
from(u in User, | |||
inner_join: delivery in assoc(u, :deliveries), | |||
where: delivery.object_id == ^object_id | |||
) | |||
|> Repo.all() | |||
end | |||
def change_email(user, email) do | |||
user | |||
|> cast(%{email: email}, [:email]) | |||
@@ -6,6 +6,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do | |||
use Pleroma.Web, :controller | |||
alias Pleroma.Activity | |||
alias Pleroma.Delivery | |||
alias Pleroma.Object | |||
alias Pleroma.Object.Fetcher | |||
alias Pleroma.User | |||
@@ -23,7 +24,12 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do | |||
action_fallback(:errors) | |||
plug(Pleroma.Plugs.Cache, [query_params: false] when action in [:activity, :object]) | |||
plug( | |||
Pleroma.Plugs.Cache, | |||
[query_params: false, tracking_fun: &__MODULE__.track_object_fetch/2] | |||
when action in [:activity, :object] | |||
) | |||
plug(Pleroma.Web.FederatingPlug when action in [:inbox, :relay]) | |||
plug(:set_requester_reachable when action in [:inbox]) | |||
plug(:relay_active? when action in [:relay]) | |||
@@ -54,6 +60,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do | |||
%Object{} = object <- Object.get_cached_by_ap_id(ap_id), | |||
{_, true} <- {:public?, Visibility.is_public?(object)} do | |||
conn | |||
|> assign(:tracking_fun_data, object.id) | |||
|> set_cache_ttl_for(object) | |||
|> put_resp_content_type("application/activity+json") | |||
|> put_view(ObjectView) | |||
@@ -64,6 +71,16 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do | |||
end | |||
end | |||
def track_object_fetch(conn, nil), do: conn | |||
def track_object_fetch(conn, object_id) do | |||
with %{assigns: %{user: %User{id: user_id}}} <- conn do | |||
Delivery.create(object_id, user_id) | |||
end | |||
conn | |||
end | |||
def object_likes(conn, %{"uuid" => uuid, "page" => page}) do | |||
with ap_id <- o_status_url(conn, :object, uuid), | |||
%Object{} = object <- Object.get_cached_by_ap_id(ap_id), | |||
@@ -99,6 +116,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do | |||
%Activity{} = activity <- Activity.normalize(ap_id), | |||
{_, true} <- {:public?, Visibility.is_public?(activity)} do | |||
conn | |||
|> maybe_set_tracking_data(activity) | |||
|> set_cache_ttl_for(activity) | |||
|> put_resp_content_type("application/activity+json") | |||
|> put_view(ObjectView) | |||
@@ -109,6 +127,13 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do | |||
end | |||
end | |||
defp maybe_set_tracking_data(conn, %Activity{data: %{"type" => "Create"}} = activity) do | |||
object_id = Object.normalize(activity).id | |||
assign(conn, :tracking_fun_data, object_id) | |||
end | |||
defp maybe_set_tracking_data(conn, _activity), do: conn | |||
defp set_cache_ttl_for(conn, %Activity{object: object}) do | |||
set_cache_ttl_for(conn, object) | |||
end | |||
@@ -5,8 +5,10 @@ | |||
defmodule Pleroma.Web.ActivityPub.Publisher do | |||
alias Pleroma.Activity | |||
alias Pleroma.Config | |||
alias Pleroma.Delivery | |||
alias Pleroma.HTTP | |||
alias Pleroma.Instances | |||
alias Pleroma.Object | |||
alias Pleroma.User | |||
alias Pleroma.Web.ActivityPub.Relay | |||
alias Pleroma.Web.ActivityPub.Transmogrifier | |||
@@ -116,7 +118,18 @@ defmodule Pleroma.Web.ActivityPub.Publisher do | |||
{:ok, []} | |||
end | |||
Pleroma.Web.Salmon.remote_users(actor, activity) ++ followers | |||
fetchers = | |||
with %Activity{data: %{"type" => "Delete"}} <- activity, | |||
%Object{id: object_id} <- Object.normalize(activity), | |||
fetchers <- User.get_delivered_users_by_object_id(object_id), | |||
_ <- Delivery.delete_all_by_object_id(object_id) do | |||
fetchers | |||
else | |||
_ -> | |||
[] | |||
end | |||
Pleroma.Web.Salmon.remote_users(actor, activity) ++ followers ++ fetchers | |||
end | |||
defp get_cc_ap_ids(ap_id, recipients) do | |||
@@ -135,6 +135,7 @@ defmodule Pleroma.Web.Router do | |||
pipeline :http_signature do | |||
plug(Pleroma.Web.Plugs.HTTPSignaturePlug) | |||
plug(Pleroma.Web.Plugs.MappedSignatureToIdentityPlug) | |||
end | |||
scope "/api/pleroma", Pleroma.Web.TwitterAPI do | |||
@@ -514,6 +515,7 @@ defmodule Pleroma.Web.Router do | |||
scope "/", Pleroma.Web do | |||
pipe_through(:ostatus) | |||
pipe_through(:http_signature) | |||
get("/objects/:uuid", OStatus.OStatusController, :object) | |||
get("/activities/:uuid", OStatus.OStatusController, :activity) | |||
@@ -0,0 +1,12 @@ | |||
defmodule Pleroma.Repo.Migrations.CreateDeliveries do | |||
use Ecto.Migration | |||
def change do | |||
create_if_not_exists table(:deliveries) do | |||
add(:object_id, references(:objects, type: :id), null: false) | |||
add(:user_id, references(:users, type: :uuid, on_delete: :delete_all), null: false) | |||
end | |||
create_if_not_exists index(:deliveries, :object_id, name: :deliveries_object_id) | |||
create_if_not_exists(unique_index(:deliveries, [:user_id, :object_id])) | |||
end | |||
end |
@@ -8,6 +8,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do | |||
import Pleroma.Factory | |||
alias Pleroma.Activity | |||
alias Pleroma.Delivery | |||
alias Pleroma.Instances | |||
alias Pleroma.Object | |||
alias Pleroma.Tests.ObanHelpers | |||
@@ -893,4 +894,86 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do | |||
assert result["totalItems"] == 15 | |||
end | |||
end | |||
describe "delivery tracking" do | |||
test "it tracks a signed object fetch", %{conn: conn} do | |||
user = insert(:user, local: false) | |||
activity = insert(:note_activity) | |||
object = Object.normalize(activity) | |||
object_path = String.trim_leading(object.data["id"], Pleroma.Web.Endpoint.url()) | |||
conn | |||
|> put_req_header("accept", "application/activity+json") | |||
|> assign(:user, user) | |||
|> get(object_path) | |||
|> json_response(200) | |||
assert Delivery.get(object.id, user.id) | |||
end | |||
test "it tracks a signed activity fetch", %{conn: conn} do | |||
user = insert(:user, local: false) | |||
activity = insert(:note_activity) | |||
object = Object.normalize(activity) | |||
activity_path = String.trim_leading(activity.data["id"], Pleroma.Web.Endpoint.url()) | |||
conn | |||
|> put_req_header("accept", "application/activity+json") | |||
|> assign(:user, user) | |||
|> get(activity_path) | |||
|> json_response(200) | |||
assert Delivery.get(object.id, user.id) | |||
end | |||
test "it tracks a signed object fetch when the json is cached", %{conn: conn} do | |||
user = insert(:user, local: false) | |||
other_user = insert(:user, local: false) | |||
activity = insert(:note_activity) | |||
object = Object.normalize(activity) | |||
object_path = String.trim_leading(object.data["id"], Pleroma.Web.Endpoint.url()) | |||
conn | |||
|> put_req_header("accept", "application/activity+json") | |||
|> assign(:user, user) | |||
|> get(object_path) | |||
|> json_response(200) | |||
build_conn() | |||
|> put_req_header("accept", "application/activity+json") | |||
|> assign(:user, other_user) | |||
|> get(object_path) | |||
|> json_response(200) | |||
assert Delivery.get(object.id, user.id) | |||
assert Delivery.get(object.id, other_user.id) | |||
end | |||
test "it tracks a signed activity fetch when the json is cached", %{conn: conn} do | |||
user = insert(:user, local: false) | |||
other_user = insert(:user, local: false) | |||
activity = insert(:note_activity) | |||
object = Object.normalize(activity) | |||
activity_path = String.trim_leading(activity.data["id"], Pleroma.Web.Endpoint.url()) | |||
conn | |||
|> put_req_header("accept", "application/activity+json") | |||
|> assign(:user, user) | |||
|> get(activity_path) | |||
|> json_response(200) | |||
build_conn() | |||
|> put_req_header("accept", "application/activity+json") | |||
|> assign(:user, other_user) | |||
|> get(activity_path) | |||
|> json_response(200) | |||
assert Delivery.get(object.id, user.id) | |||
assert Delivery.get(object.id, other_user.id) | |||
end | |||
end | |||
end |
@@ -3,7 +3,7 @@ | |||
# SPDX-License-Identifier: AGPL-3.0-only | |||
defmodule Pleroma.Web.ActivityPub.PublisherTest do | |||
use Pleroma.DataCase | |||
use Pleroma.Web.ConnCase | |||
import ExUnit.CaptureLog | |||
import Pleroma.Factory | |||
@@ -12,7 +12,9 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do | |||
alias Pleroma.Activity | |||
alias Pleroma.Instances | |||
alias Pleroma.Object | |||
alias Pleroma.Web.ActivityPub.Publisher | |||
alias Pleroma.Web.CommonAPI | |||
@as_public "https://www.w3.org/ns/activitystreams#Public" | |||
@@ -268,5 +270,69 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do | |||
}) | |||
) | |||
end | |||
test_with_mock "publishes a delete activity to peers who signed fetch requests to the create acitvity/object.", | |||
Pleroma.Web.Federator.Publisher, | |||
[:passthrough], | |||
[] do | |||
fetcher = | |||
insert(:user, | |||
local: false, | |||
info: %{ | |||
ap_enabled: true, | |||
source_data: %{"inbox" => "https://domain.com/users/nick1/inbox"} | |||
} | |||
) | |||
another_fetcher = | |||
insert(:user, | |||
local: false, | |||
info: %{ | |||
ap_enabled: true, | |||
source_data: %{"inbox" => "https://domain2.com/users/nick1/inbox"} | |||
} | |||
) | |||
actor = insert(:user) | |||
note_activity = insert(:note_activity, user: actor) | |||
object = Object.normalize(note_activity) | |||
activity_path = String.trim_leading(note_activity.data["id"], Pleroma.Web.Endpoint.url()) | |||
object_path = String.trim_leading(object.data["id"], Pleroma.Web.Endpoint.url()) | |||
build_conn() | |||
|> put_req_header("accept", "application/activity+json") | |||
|> assign(:user, fetcher) | |||
|> get(object_path) | |||
|> json_response(200) | |||
build_conn() | |||
|> put_req_header("accept", "application/activity+json") | |||
|> assign(:user, another_fetcher) | |||
|> get(activity_path) | |||
|> json_response(200) | |||
{:ok, delete} = CommonAPI.delete(note_activity.id, actor) | |||
res = Publisher.publish(actor, delete) | |||
assert res == :ok | |||
assert called( | |||
Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{ | |||
inbox: "https://domain.com/users/nick1/inbox", | |||
actor: actor, | |||
id: delete.data["id"] | |||
}) | |||
) | |||
assert called( | |||
Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{ | |||
inbox: "https://domain2.com/users/nick1/inbox", | |||
actor: actor, | |||
id: delete.data["id"] | |||
}) | |||
) | |||
end | |||
end | |||
end |