@@ -0,0 +1,58 @@ | |||||
# Pleroma: A lightweight social networking server | |||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> | |||||
# SPDX-License-Identifier: AGPL-3.0-only | |||||
defmodule Pleroma.Delivery do | |||||
use Ecto.Schema | |||||
alias Pleroma.Delivery | |||||
alias Pleroma.FlakeId | |||||
alias Pleroma.User | |||||
alias Pleroma.Repo | |||||
alias Pleroma.Object | |||||
alias Pleroma.User | |||||
import Ecto.Changeset | |||||
import Ecto.Query | |||||
schema "deliveries" do | |||||
belongs_to(:user, User, type: FlakeId) | |||||
belongs_to(:object, Object) | |||||
end | |||||
def changeset(delivery, params \\ %{}) do | |||||
delivery | |||||
|> cast(params, [:user_id, :object_id]) | |||||
|> foreign_key_constraint(:object_id) | |||||
|> foreign_key_constraint(:user_id) | |||||
|> unique_constraint(:user_id, name: :deliveries_user_id_object_id_index) | |||||
end | |||||
def create(object_id, user_id) do | |||||
%Delivery{} | |||||
|> changeset(%{user_id: user_id, object_id: object_id}) | |||||
|> Repo.insert() | |||||
end | |||||
def get(object_id, user_id) do | |||||
from(d in Delivery, where: d.user_id == ^user_id and d.object_id == ^object_id) | |||||
|> Repo.one() | |||||
end | |||||
def get_or_create(object_id, user_id) do | |||||
case get(object_id, user_id) do | |||||
%Delivery{} = delivery -> {:ok, delivery} | |||||
nil -> create(object_id, user_id) | |||||
end | |||||
end | |||||
def delete_all_by_object_id(object_id) do | |||||
from(d in Delivery, where: d.object_id == ^object_id) | |||||
|> Repo.delete_all() | |||||
end | |||||
def get_all_by_object_id(object_id) do | |||||
from(d in Delivery, where: d.object_id == ^object_id) | |||||
|> Repo.all() | |||||
end | |||||
end |
@@ -20,6 +20,7 @@ defmodule Pleroma.Plugs.Cache do | |||||
- `ttl`: An expiration time (time-to-live). This value should be in milliseconds or `nil` to disable expiration. Defaults to `nil`. | - `ttl`: An expiration time (time-to-live). This value should be in milliseconds or `nil` to disable expiration. Defaults to `nil`. | ||||
- `query_params`: Take URL query string into account (`true`), ignore it (`false`) or limit to specific params only (list). Defaults to `true`. | - `query_params`: Take URL query string into account (`true`), ignore it (`false`) or limit to specific params only (list). Defaults to `true`. | ||||
- `tracking_fun`: A function that is called on successfull responses, no matter if the request is cached or not. It should accept a conn as the first argument and the value assigned to `tracking_fun_data` as the second. | |||||
Additionally, you can overwrite the TTL inside a controller action by assigning `cache_ttl` to the connection struct: | Additionally, you can overwrite the TTL inside a controller action by assigning `cache_ttl` to the connection struct: | ||||
@@ -56,6 +57,10 @@ defmodule Pleroma.Plugs.Cache do | |||||
{:ok, nil} -> | {:ok, nil} -> | ||||
cache_resp(conn, opts) | cache_resp(conn, opts) | ||||
{:ok, {content_type, body, tracking_fun_data}} -> | |||||
conn = opts.tracking_fun(conn, tracking_fun_data) | |||||
send_cached(conn, {content_type, body}) | |||||
{:ok, record} -> | {:ok, record} -> | ||||
send_cached(conn, record) | send_cached(conn, record) | ||||
@@ -90,7 +95,16 @@ defmodule Pleroma.Plugs.Cache do | |||||
content_type = content_type(conn) | content_type = content_type(conn) | ||||
record = {content_type, body} | record = {content_type, body} | ||||
Cachex.put(:web_resp_cache, key, record, ttl: ttl) | |||||
conn = | |||||
unless opts[:tracking_fun] do | |||||
Cachex.put(:web_resp_cache, key, {content_type, body}, ttl: ttl) | |||||
conn | |||||
else | |||||
tracking_fun_data = Map.get(conn.assigns, :tracking_fun_data, nil) | |||||
Cachex.put(:web_resp_cache, {content_type, body, tracking_fun_data}, record, ttl: ttl) | |||||
opts.tracking_fun.(conn, tracking_fun_data) | |||||
end | |||||
put_resp_header(conn, "x-cache", "MISS from Pleroma") | put_resp_header(conn, "x-cache", "MISS from Pleroma") | ||||
@@ -11,6 +11,7 @@ defmodule Pleroma.User do | |||||
alias Comeonin.Pbkdf2 | alias Comeonin.Pbkdf2 | ||||
alias Ecto.Multi | alias Ecto.Multi | ||||
alias Pleroma.Activity | alias Pleroma.Activity | ||||
alias Pleroma.Delivery | |||||
alias Pleroma.Keys | alias Pleroma.Keys | ||||
alias Pleroma.Notification | alias Pleroma.Notification | ||||
alias Pleroma.Object | alias Pleroma.Object | ||||
@@ -61,6 +62,7 @@ defmodule Pleroma.User do | |||||
field(:last_digest_emailed_at, :naive_datetime) | field(:last_digest_emailed_at, :naive_datetime) | ||||
has_many(:notifications, Notification) | has_many(:notifications, Notification) | ||||
has_many(:registrations, Registration) | has_many(:registrations, Registration) | ||||
has_many(:deliveries, Delivery) | |||||
embeds_one(:info, User.Info) | embeds_one(:info, User.Info) | ||||
timestamps() | timestamps() | ||||
@@ -1624,4 +1626,12 @@ defmodule Pleroma.User do | |||||
def is_internal_user?(%User{nickname: nil}), do: true | def is_internal_user?(%User{nickname: nil}), do: true | ||||
def is_internal_user?(%User{local: true, nickname: "internal." <> _}), do: true | def is_internal_user?(%User{local: true, nickname: "internal." <> _}), do: true | ||||
def is_internal_user?(_), do: false | def is_internal_user?(_), do: false | ||||
def get_delivered_users_by_object_id(object_id) do | |||||
from(u in User, | |||||
inner_join: delivery in assoc(u, :deliveries), | |||||
where: delivery.object_id == ^object_id | |||||
) | |||||
|> Repo.all() | |||||
end | |||||
end | end |
@@ -6,6 +6,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do | |||||
use Pleroma.Web, :controller | use Pleroma.Web, :controller | ||||
alias Pleroma.Activity | alias Pleroma.Activity | ||||
alias Pleroma.Delivery | |||||
alias Pleroma.Object | alias Pleroma.Object | ||||
alias Pleroma.Object.Fetcher | alias Pleroma.Object.Fetcher | ||||
alias Pleroma.User | alias Pleroma.User | ||||
@@ -23,7 +24,15 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do | |||||
action_fallback(:errors) | action_fallback(:errors) | ||||
plug(Pleroma.Plugs.Cache, [query_params: false] when action in [:activity, :object]) | |||||
plug( | |||||
Pleroma.Plugs.Cache, | |||||
[ | |||||
query_params: false, | |||||
tracking_fun: &Pleroma.Web.ActivityPub.ActivityPubController.track_object_fetch/2 | |||||
] | |||||
when action in [:activity, :object] | |||||
) | |||||
plug(Pleroma.Web.FederatingPlug when action in [:inbox, :relay]) | plug(Pleroma.Web.FederatingPlug when action in [:inbox, :relay]) | ||||
plug(:set_requester_reachable when action in [:inbox]) | plug(:set_requester_reachable when action in [:inbox]) | ||||
plug(:relay_active? when action in [:relay]) | plug(:relay_active? when action in [:relay]) | ||||
@@ -54,6 +63,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do | |||||
%Object{} = object <- Object.get_cached_by_ap_id(ap_id), | %Object{} = object <- Object.get_cached_by_ap_id(ap_id), | ||||
{_, true} <- {:public?, Visibility.is_public?(object)} do | {_, true} <- {:public?, Visibility.is_public?(object)} do | ||||
conn | conn | ||||
|> assign(:tracking_fun_data, object.id) | |||||
|> set_cache_ttl_for(object) | |> set_cache_ttl_for(object) | ||||
|> put_resp_content_type("application/activity+json") | |> put_resp_content_type("application/activity+json") | ||||
|> put_view(ObjectView) | |> put_view(ObjectView) | ||||
@@ -64,6 +74,15 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do | |||||
end | end | ||||
end | end | ||||
def track_object_fetch(conn, object_id) do | |||||
case conn.assigns[:user] do | |||||
%User{id: user_id} -> Delivery.create(object_id, user_id) | |||||
_ -> nil | |||||
end | |||||
conn | |||||
end | |||||
def object_likes(conn, %{"uuid" => uuid, "page" => page}) do | def object_likes(conn, %{"uuid" => uuid, "page" => page}) do | ||||
with ap_id <- o_status_url(conn, :object, uuid), | with ap_id <- o_status_url(conn, :object, uuid), | ||||
%Object{} = object <- Object.get_cached_by_ap_id(ap_id), | %Object{} = object <- Object.get_cached_by_ap_id(ap_id), | ||||
@@ -99,6 +118,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do | |||||
%Activity{} = activity <- Activity.normalize(ap_id), | %Activity{} = activity <- Activity.normalize(ap_id), | ||||
{_, true} <- {:public?, Visibility.is_public?(activity)} do | {_, true} <- {:public?, Visibility.is_public?(activity)} do | ||||
conn | conn | ||||
|> maybe_set_tracking_data(activity) | |||||
|> set_cache_ttl_for(activity) | |> set_cache_ttl_for(activity) | ||||
|> put_resp_content_type("application/activity+json") | |> put_resp_content_type("application/activity+json") | ||||
|> put_view(ObjectView) | |> put_view(ObjectView) | ||||
@@ -109,6 +129,13 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do | |||||
end | end | ||||
end | end | ||||
defp maybe_set_tracking_data(conn, %Activity{data: %{"type" => "Create"}} = activity) do | |||||
object_id = Object.normalize(activity).id | |||||
assign(conn, :tracking_fun_data, object_id) | |||||
end | |||||
defp maybe_set_tracking_data(conn, _activity), do: assign(conn, :tracking_fun_data, nil) | |||||
defp set_cache_ttl_for(conn, %Activity{object: object}) do | defp set_cache_ttl_for(conn, %Activity{object: object}) do | ||||
set_cache_ttl_for(conn, object) | set_cache_ttl_for(conn, object) | ||||
end | end | ||||
@@ -0,0 +1,12 @@ | |||||
defmodule Pleroma.Repo.Migrations.CreateDeliveries do | |||||
use Ecto.Migration | |||||
def change do | |||||
create_if_not_exists table(:deliveries) do | |||||
add(:object_id, references(:objects, type: :id)) | |||||
add(:user_id, references(:users, type: :uuid, on_delete: :delete_all)) | |||||
end | |||||
create_if_not_exists index(:deliveries, :object_id, name: :deliveries_object_id) | |||||
create_if_not_exists(unique_index(:deliveries, [:user_id, :object_id])) | |||||
end | |||||
end |
@@ -6,6 +6,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do | |||||
use Pleroma.Web.ConnCase | use Pleroma.Web.ConnCase | ||||
import Pleroma.Factory | import Pleroma.Factory | ||||
alias Pleroma.Activity | alias Pleroma.Activity | ||||
alias Pleroma.Delivery | |||||
alias Pleroma.Instances | alias Pleroma.Instances | ||||
alias Pleroma.Object | alias Pleroma.Object | ||||
alias Pleroma.User | alias Pleroma.User | ||||
@@ -885,4 +886,86 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do | |||||
assert result["totalItems"] == 15 | assert result["totalItems"] == 15 | ||||
end | end | ||||
end | end | ||||
describe "delivery tracking" do | |||||
test "it tracks a signed object fetch", %{conn: conn} do | |||||
user = insert(:user, local: false) | |||||
activity = insert(:note_activity) | |||||
object = Object.normalize(activity) | |||||
object_path = String.trim_leading(object.data["id"], Pleroma.Web.Endpoint.url()) | |||||
conn | |||||
|> put_req_header("accept", "application/activity+json") | |||||
|> assign(:user, user) | |||||
|> get(object_path) | |||||
|> json_response(200) | |||||
assert Delivery.get(object.id, user.id) | |||||
end | |||||
test "it tracks a signed activity fetch", %{conn: conn} do | |||||
user = insert(:user, local: false) | |||||
activity = insert(:note_activity) | |||||
object = Object.normalize(activity) | |||||
activity_path = String.trim_leading(activity.data["id"], Pleroma.Web.Endpoint.url()) | |||||
conn | |||||
|> put_req_header("accept", "application/activity+json") | |||||
|> assign(:user, user) | |||||
|> get(activity_path) | |||||
|> json_response(200) | |||||
assert Delivery.get(object.id, user.id) | |||||
end | |||||
test "it tracks a signed object fetch when the json is cached", %{conn: conn} do | |||||
user = insert(:user, local: false) | |||||
other_user = insert(:user, local: false) | |||||
activity = insert(:note_activity) | |||||
object = Object.normalize(activity) | |||||
object_path = String.trim_leading(object.data["id"], Pleroma.Web.Endpoint.url()) | |||||
conn | |||||
|> put_req_header("accept", "application/activity+json") | |||||
|> assign(:user, user) | |||||
|> get(object_path) | |||||
|> json_response(200) | |||||
build_conn() | |||||
|> put_req_header("accept", "application/activity+json") | |||||
|> assign(:user, other_user) | |||||
|> get(object_path) | |||||
|> json_response(200) | |||||
assert Delivery.get(object.id, user.id) | |||||
assert Delivery.get(object.id, other_user.id) | |||||
end | |||||
test "it tracks a signed activity fetch when the json is cached", %{conn: conn} do | |||||
user = insert(:user, local: false) | |||||
other_user = insert(:user, local: false) | |||||
activity = insert(:note_activity) | |||||
object = Object.normalize(activity) | |||||
activity_path = String.trim_leading(activity.data["id"], Pleroma.Web.Endpoint.url()) | |||||
conn | |||||
|> put_req_header("accept", "application/activity+json") | |||||
|> assign(:user, user) | |||||
|> get(activity_path) | |||||
|> json_response(200) | |||||
build_conn() | |||||
|> put_req_header("accept", "application/activity+json") | |||||
|> assign(:user, other_user) | |||||
|> get(activity_path) | |||||
|> json_response(200) | |||||
assert Delivery.get(object.id, user.id) | |||||
assert Delivery.get(object.id, other_user.id) | |||||
end | |||||
end | |||||
end | end |