@@ -36,7 +36,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do | |||||
{:ok, activity} = add_conversation_id(activity) | {:ok, activity} = add_conversation_id(activity) | ||||
if actor.local do | if actor.local do | ||||
Pleroma.Web.Websub.publish(Pleroma.Web.OStatus.feed_path(actor), actor, activity) | |||||
Pleroma.Web.Federator.enqueue(:publish, activity) | |||||
end | end | ||||
{:ok, activity} | {:ok, activity} | ||||
@@ -0,0 +1,32 @@ | |||||
defmodule Pleroma.Web.Federator do | |||||
alias Pleroma.User | |||||
require Logger | |||||
@websub_verifier Application.get_env(:pleroma, :websub_verifier) | |||||
def handle(:publish, activity) do | |||||
Logger.debug("Running publish for #{activity.data["id"]}") | |||||
with actor when not is_nil(actor) <- User.get_cached_by_ap_id(activity.data["actor"]) do | |||||
Pleroma.Web.Websub.publish(Pleroma.Web.OStatus.feed_path(actor), actor, activity) | |||||
end | |||||
end | |||||
def handle(:verify_websub, websub) do | |||||
Logger.debug("Running websub verification for #{websub.id} (#{websub.topic}, #{websub.callback})") | |||||
@websub_verifier.verify(websub) | |||||
end | |||||
def handle(type, payload) do | |||||
Logger.debug("Unknown task: #{type}") | |||||
{:error, "Don't know what do do with this"} | |||||
end | |||||
def enqueue(type, payload) do | |||||
# for now, just run immediately in a new process. | |||||
if Mix.env == :test do | |||||
handle(type, payload) | |||||
else | |||||
spawn(fn -> handle(type, payload) end) | |||||
end | |||||
end | |||||
end |
@@ -1,13 +1,11 @@ | |||||
defmodule Pleroma.Web.Websub do | defmodule Pleroma.Web.Websub do | ||||
alias Pleroma.Repo | alias Pleroma.Repo | ||||
alias Pleroma.Web.Websub.WebsubServerSubscription | |||||
alias Pleroma.Web.Websub.{WebsubServerSubscription, WebsubClientSubscription} | |||||
alias Pleroma.Web.OStatus.FeedRepresenter | alias Pleroma.Web.OStatus.FeedRepresenter | ||||
alias Pleroma.Web.OStatus | alias Pleroma.Web.OStatus | ||||
import Ecto.Query | import Ecto.Query | ||||
@websub_verifier Application.get_env(:pleroma, :websub_verifier) | |||||
def verify(subscription, getter \\ &HTTPoison.get/3 ) do | def verify(subscription, getter \\ &HTTPoison.get/3 ) do | ||||
challenge = Base.encode16(:crypto.strong_rand_bytes(8)) | challenge = Base.encode16(:crypto.strong_rand_bytes(8)) | ||||
lease_seconds = NaiveDateTime.diff(subscription.valid_until, subscription.updated_at) |> to_string | lease_seconds = NaiveDateTime.diff(subscription.valid_until, subscription.updated_at) |> to_string | ||||
@@ -71,8 +69,7 @@ defmodule Pleroma.Web.Websub do | |||||
change = Ecto.Changeset.change(websub, %{valid_until: NaiveDateTime.add(websub.updated_at, lease_time)}) | change = Ecto.Changeset.change(websub, %{valid_until: NaiveDateTime.add(websub.updated_at, lease_time)}) | ||||
websub = Repo.update!(change) | websub = Repo.update!(change) | ||||
# Just spawn that for now, maybe pool later. | |||||
spawn(fn -> @websub_verifier.verify(websub) end) | |||||
Pleroma.Web.Federator.enqueue(:verify_websub, websub) | |||||
{:ok, websub} | {:ok, websub} | ||||
else {:error, reason} -> | else {:error, reason} -> | ||||
@@ -99,4 +96,23 @@ defmodule Pleroma.Web.Websub do | |||||
{:error, "Wrong topic requested, expected #{OStatus.feed_path(user)}, got #{topic}"} | {:error, "Wrong topic requested, expected #{OStatus.feed_path(user)}, got #{topic}"} | ||||
end | end | ||||
end | end | ||||
def subscribe(user, topic) do | |||||
# Race condition, use transactions | |||||
{:ok, subscription} = with subscription when not is_nil(subscription) <- Repo.get_by(WebsubClientSubscription, topic: topic) do | |||||
subscribers = [user.ap_id, subscription.subcribers] |> Enum.uniq | |||||
change = Ecto.Changeset.change(subscription, %{subscribers: subscribers}) | |||||
Repo.update(change) | |||||
else _e -> | |||||
subscription = %WebsubClientSubscription{ | |||||
topic: topic, | |||||
subscribers: [user.ap_id], | |||||
state: "requested", | |||||
secret: :crypto.strong_rand_bytes(8) |> Base.url_encode64 | |||||
} | |||||
Repo.insert(subscription) | |||||
end | |||||
{:ok, subscription} | |||||
end | |||||
end | end |
@@ -0,0 +1,13 @@ | |||||
defmodule Pleroma.Web.Websub.WebsubClientSubscription do | |||||
use Ecto.Schema | |||||
schema "websub_client_subscriptions" do | |||||
field :topic, :string | |||||
field :secret, :string | |||||
field :valid_until, :naive_datetime | |||||
field :state, :string | |||||
field :subscribers, {:array, :string}, default: [] | |||||
timestamps() | |||||
end | |||||
end |
@@ -0,0 +1,15 @@ | |||||
defmodule Pleroma.Repo.Migrations.CreateWebsubClientSubscription do | |||||
use Ecto.Migration | |||||
def change do | |||||
create table(:websub_client_subscriptions) do | |||||
add :topic, :string | |||||
add :secret, :string | |||||
add :valid_until, :naive_datetime | |||||
add :state, :string | |||||
add :subscribers, :map | |||||
timestamps() | |||||
end | |||||
end | |||||
end |
@@ -58,7 +58,6 @@ defmodule Pleroma.Web.WebsubTest do | |||||
"hub.lease_seconds" => "100" | "hub.lease_seconds" => "100" | ||||
} | } | ||||
{:ok, subscription } = Websub.incoming_subscription_request(user, data) | {:ok, subscription } = Websub.incoming_subscription_request(user, data) | ||||
assert subscription.topic == Pleroma.Web.OStatus.feed_path(user) | assert subscription.topic == Pleroma.Web.OStatus.feed_path(user) | ||||
assert subscription.state == "requested" | assert subscription.state == "requested" | ||||
@@ -87,4 +86,15 @@ defmodule Pleroma.Web.WebsubTest do | |||||
assert length(Repo.all(WebsubServerSubscription)) == 1 | assert length(Repo.all(WebsubServerSubscription)) == 1 | ||||
assert subscription.id == sub.id | assert subscription.id == sub.id | ||||
end | end | ||||
test "initiate a subscription for a given user and topic" do | |||||
user = insert(:user) | |||||
topic = "http://example.org/some-topic.atom" | |||||
{:ok, websub} = Websub.subscribe(user, topic) | |||||
assert websub.subscribers == [user.ap_id] | |||||
assert websub.topic == topic | |||||
assert is_binary(websub.secret) | |||||
assert websub.state == "accepted" | |||||
end | |||||
end | end |