@@ -4,6 +4,7 @@ defmodule Pleroma.Web.Websub do | |||
alias Pleroma.Web.OStatus.FeedRepresenter | |||
alias Pleroma.Web.OStatus | |||
alias Pleroma.Web.XML | |||
require Logger | |||
import Ecto.Query | |||
@@ -98,8 +99,8 @@ defmodule Pleroma.Web.Websub do | |||
end | |||
end | |||
def subscribe(user, topic) do | |||
# Race condition, use transactions | |||
def subscribe(user, topic, requester \\ &request_subscription/1) do | |||
# FIXME: Race condition, use transactions | |||
{:ok, subscription} = with subscription when not is_nil(subscription) <- Repo.get_by(WebsubClientSubscription, topic: topic) do | |||
subscribers = [user.ap_id, subscription.subcribers] |> Enum.uniq | |||
change = Ecto.Changeset.change(subscription, %{subscribers: subscribers}) | |||
@@ -109,11 +110,60 @@ defmodule Pleroma.Web.Websub do | |||
topic: topic, | |||
subscribers: [user.ap_id], | |||
state: "requested", | |||
secret: :crypto.strong_rand_bytes(8) |> Base.url_encode64 | |||
secret: :crypto.strong_rand_bytes(8) |> Base.url_encode64, | |||
user: user | |||
} | |||
Repo.insert(subscription) | |||
end | |||
requester.(subscription) | |||
end | |||
def discover(topic, getter \\ &HTTPoison.get/1) do | |||
with {:ok, response} <- getter.(topic), | |||
status_code when status_code in 200..299 <- response.status_code, | |||
body <- response.body, | |||
doc <- XML.parse_document(body), | |||
url when not is_nil(url) <- XML.string_from_xpath(~S{/feed/link[@rel="self"]/@href}, doc), | |||
hub when not is_nil(hub) <- XML.string_from_xpath(~S{/feed/link[@rel="hub"]/@href}, doc) do | |||
{:ok, %{url: url, hub: hub}} | |||
else e -> | |||
{:error, e} | |||
end | |||
end | |||
def request_subscription(websub, poster \\ &HTTPoison.post/3, timeout \\ 10_000) do | |||
data = [ | |||
"hub.mode": "subscribe", | |||
"hub.topic": websub.topic, | |||
"hub.secret": websub.secret, | |||
"hub.callback": "https://social.heldscal.la/callback" | |||
] | |||
# This checks once a second if we are confirmed yet | |||
websub_checker = fn -> | |||
helper = fn (helper) -> | |||
:timer.sleep(1000) | |||
websub = Repo.get_by(WebsubClientSubscription, id: websub.id, state: "accepted") | |||
if websub, do: websub, else: helper.(helper) | |||
end | |||
helper.(helper) | |||
end | |||
{:ok, subscription} | |||
task = Task.async(websub_checker) | |||
with {:ok, %{status_code: 202}} <- poster.(websub.hub, {:form, data}, ["Content-type": "application/x-www-form-urlencoded"]), | |||
{:ok, websub} <- Task.yield(task, timeout) do | |||
{:ok, websub} | |||
else e -> | |||
Task.shutdown(task) | |||
change = Ecto.Changeset.change(websub, %{state: "rejected"}) | |||
{:ok, websub} = Repo.update(change) | |||
Logger.debug("Couldn't confirm subscription: #{inspect(websub)}") | |||
Logger.debug("error: #{inspect(e)}") | |||
{:error, websub} | |||
end | |||
end | |||
end |
@@ -76,4 +76,14 @@ defmodule Pleroma.Factory do | |||
state: "requested" | |||
} | |||
end | |||
def websub_client_subscription_factory do | |||
%Pleroma.Web.Websub.WebsubClientSubscription{ | |||
topic: "http://example.org", | |||
secret: "here's a secret", | |||
valid_until: nil, | |||
state: "requested", | |||
subscribers: [] | |||
} | |||
end | |||
end |
@@ -77,7 +77,6 @@ defmodule Pleroma.Web.WebsubTest do | |||
"hub.lease_seconds" => "100" | |||
} | |||
{:ok, subscription } = Websub.incoming_subscription_request(user, data) | |||
assert subscription.topic == Pleroma.Web.OStatus.feed_path(user) | |||
assert subscription.state == sub.state | |||
@@ -87,14 +86,72 @@ defmodule Pleroma.Web.WebsubTest do | |||
assert subscription.id == sub.id | |||
end | |||
def accepting_verifier(subscription) do | |||
{:ok, %{ subscription | state: "accepted" }} | |||
end | |||
test "initiate a subscription for a given user and topic" do | |||
user = insert(:user) | |||
topic = "http://example.org/some-topic.atom" | |||
{:ok, websub} = Websub.subscribe(user, topic) | |||
{:ok, websub} = Websub.subscribe(user, topic, &accepting_verifier/1) | |||
assert websub.subscribers == [user.ap_id] | |||
assert websub.topic == topic | |||
assert is_binary(websub.secret) | |||
assert websub.user == user | |||
assert websub.state == "accepted" | |||
end | |||
test "discovers the hub and canonical url" do | |||
topic = "https://mastodon.social/users/lambadalambda.atom" | |||
getter = fn(^topic) -> | |||
doc = File.read!("test/fixtures/lambadalambda.atom") | |||
{:ok, %{status_code: 200, body: doc}} | |||
end | |||
{:ok, discovered} = Websub.discover(topic, getter) | |||
assert %{hub: "https://mastodon.social/api/push", url: topic} == discovered | |||
end | |||
test "calls the hub, requests topic" do | |||
hub = "https://social.heldscal.la/main/push/hub" | |||
topic = "https://social.heldscal.la/api/statuses/user_timeline/23211.atom" | |||
websub = insert(:websub_client_subscription, %{hub: hub, topic: topic}) | |||
poster = fn (^hub, {:form, data}, _headers) -> | |||
assert Keyword.get(data, :"hub.mode") == "subscribe" | |||
{:ok, %{status_code: 202}} | |||
end | |||
task = Task.async(fn -> Websub.request_subscription(websub, poster) end) | |||
change = Ecto.Changeset.change(websub, %{state: "accepted"}) | |||
{:ok, _} = Repo.update(change) | |||
{:ok, websub} = Task.await(task) | |||
assert websub.state == "accepted" | |||
end | |||
test "rejects the subscription if it can't be accepted" do | |||
hub = "https://social.heldscal.la/main/push/hub" | |||
topic = "https://social.heldscal.la/api/statuses/user_timeline/23211.atom" | |||
websub = insert(:websub_client_subscription, %{hub: hub, topic: topic}) | |||
poster = fn (^hub, {:form, _data}, _headers) -> | |||
{:ok, %{status_code: 202}} | |||
end | |||
{:error, websub} = Websub.request_subscription(websub, poster, 1000) | |||
assert websub.state == "rejected" | |||
websub = insert(:websub_client_subscription, %{hub: hub, topic: topic}) | |||
poster = fn (^hub, {:form, _data}, _headers) -> | |||
{:ok, %{status_code: 400}} | |||
end | |||
{:error, websub} = Websub.request_subscription(websub, poster, 1000) | |||
assert websub.state == "rejected" | |||
end | |||
end |