@@ -508,7 +508,8 @@ config :pleroma, Oban, | |||||
], | ], | ||||
crontab: [ | crontab: [ | ||||
{"0 0 * * *", Pleroma.Workers.Cron.ClearOauthTokenWorker}, | {"0 0 * * *", Pleroma.Workers.Cron.ClearOauthTokenWorker}, | ||||
{"0 * * * *", Pleroma.Workers.Cron.StatsWorker} | |||||
{"0 * * * *", Pleroma.Workers.Cron.StatsWorker}, | |||||
{"* * * * *", Pleroma.Workers.Cron.ScheduledActivityWorker} | |||||
] | ] | ||||
config :pleroma, :workers, | config :pleroma, :workers, | ||||
@@ -35,7 +35,6 @@ defmodule Pleroma.Application do | |||||
Pleroma.Config.TransferTask, | Pleroma.Config.TransferTask, | ||||
Pleroma.Emoji, | Pleroma.Emoji, | ||||
Pleroma.Captcha, | Pleroma.Captcha, | ||||
Pleroma.Daemons.ScheduledActivityDaemon, | |||||
Pleroma.Daemons.ActivityExpirationDaemon, | Pleroma.Daemons.ActivityExpirationDaemon, | ||||
Pleroma.Plugs.RateLimiter.Supervisor | Pleroma.Plugs.RateLimiter.Supervisor | ||||
] ++ | ] ++ | ||||
@@ -2,37 +2,33 @@ | |||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> | # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> | ||||
# SPDX-License-Identifier: AGPL-3.0-only | # SPDX-License-Identifier: AGPL-3.0-only | ||||
defmodule Pleroma.Daemons.ScheduledActivityDaemon do | |||||
defmodule Pleroma.Workers.Cron.ScheduledActivityWorker do | |||||
@moduledoc """ | @moduledoc """ | ||||
Sends scheduled activities to the job queue. | |||||
The worker to post scheduled actvities. | |||||
""" | """ | ||||
use Oban.Worker, queue: "scheduled_activities" | |||||
alias Pleroma.Config | alias Pleroma.Config | ||||
alias Pleroma.ScheduledActivity | alias Pleroma.ScheduledActivity | ||||
alias Pleroma.User | alias Pleroma.User | ||||
alias Pleroma.Web.CommonAPI | alias Pleroma.Web.CommonAPI | ||||
use GenServer | |||||
require Logger | require Logger | ||||
@schedule_interval :timer.minutes(1) | @schedule_interval :timer.minutes(1) | ||||
def start_link(_) do | |||||
GenServer.start_link(__MODULE__, nil) | |||||
end | |||||
def init(_) do | |||||
@impl Oban.Worker | |||||
def perform(_opts, _job) do | |||||
if Config.get([ScheduledActivity, :enabled]) do | if Config.get([ScheduledActivity, :enabled]) do | ||||
schedule_next() | |||||
{:ok, nil} | |||||
else | |||||
:ignore | |||||
@schedule_interval | |||||
|> ScheduledActivity.due_activities() | |||||
|> Enum.each(&post_activity/1) | |||||
end | end | ||||
end | end | ||||
def perform(:execute, scheduled_activity_id) do | |||||
def post_activity(scheduled_activity) do | |||||
try do | try do | ||||
{:ok, scheduled_activity} = ScheduledActivity.delete(scheduled_activity_id) | |||||
{:ok, scheduled_activity} = ScheduledActivity.delete(scheduled_activity) | |||||
%User{} = user = User.get_cached_by_id(scheduled_activity.user_id) | %User{} = user = User.get_cached_by_id(scheduled_activity.user_id) | ||||
{:ok, _result} = CommonAPI.post(user, scheduled_activity.params) | {:ok, _result} = CommonAPI.post(user, scheduled_activity.params) | ||||
rescue | rescue | ||||
@@ -42,21 +38,4 @@ defmodule Pleroma.Daemons.ScheduledActivityDaemon do | |||||
) | ) | ||||
end | end | ||||
end | end | ||||
def handle_info(:perform, state) do | |||||
ScheduledActivity.due_activities(@schedule_interval) | |||||
|> Enum.each(fn scheduled_activity -> | |||||
Pleroma.Workers.ScheduledActivityWorker.enqueue( | |||||
"execute", | |||||
%{"activity_id" => scheduled_activity.id} | |||||
) | |||||
end) | |||||
schedule_next() | |||||
{:noreply, state} | |||||
end | |||||
defp schedule_next do | |||||
Process.send_after(self(), :perform, @schedule_interval) | |||||
end | |||||
end | end |
@@ -1,12 +0,0 @@ | |||||
# Pleroma: A lightweight social networking server | |||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> | |||||
# SPDX-License-Identifier: AGPL-3.0-only | |||||
defmodule Pleroma.Workers.ScheduledActivityWorker do | |||||
use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities" | |||||
@impl Oban.Worker | |||||
def perform(%{"op" => "execute", "activity_id" => activity_id}, _job) do | |||||
Pleroma.Daemons.ScheduledActivityDaemon.perform(:execute, activity_id) | |||||
end | |||||
end |
@@ -1,19 +0,0 @@ | |||||
# Pleroma: A lightweight social networking server | |||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> | |||||
# SPDX-License-Identifier: AGPL-3.0-only | |||||
defmodule Pleroma.ScheduledActivityDaemonTest do | |||||
use Pleroma.DataCase | |||||
alias Pleroma.ScheduledActivity | |||||
import Pleroma.Factory | |||||
test "creates a status from the scheduled activity" do | |||||
user = insert(:user) | |||||
scheduled_activity = insert(:scheduled_activity, user: user, params: %{status: "hi"}) | |||||
Pleroma.Daemons.ScheduledActivityDaemon.perform(:execute, scheduled_activity.id) | |||||
refute Repo.get(ScheduledActivity, scheduled_activity.id) | |||||
activity = Repo.all(Pleroma.Activity) |> Enum.find(&(&1.actor == user.ap_id)) | |||||
assert Pleroma.Object.normalize(activity).data["content"] == "hi" | |||||
end | |||||
end |
@@ -8,6 +8,8 @@ defmodule Pleroma.ScheduledActivityTest do | |||||
alias Pleroma.ScheduledActivity | alias Pleroma.ScheduledActivity | ||||
import Pleroma.Factory | import Pleroma.Factory | ||||
clear_config([ScheduledActivity, :enabled]) | |||||
setup context do | setup context do | ||||
DataCase.ensure_local_uploader(context) | DataCase.ensure_local_uploader(context) | ||||
end | end | ||||
@@ -61,4 +63,30 @@ defmodule Pleroma.ScheduledActivityTest do | |||||
assert changeset.errors == [scheduled_at: {"must be at least 5 minutes from now", []}] | assert changeset.errors == [scheduled_at: {"must be at least 5 minutes from now", []}] | ||||
end | end | ||||
end | end | ||||
test "creates a status from the scheduled activity" do | |||||
Pleroma.Config.put([ScheduledActivity, :enabled], true) | |||||
user = insert(:user) | |||||
naive_datetime = | |||||
NaiveDateTime.add( | |||||
NaiveDateTime.utc_now(), | |||||
-:timer.minutes(2), | |||||
:millisecond | |||||
) | |||||
scheduled_activity = | |||||
insert( | |||||
:scheduled_activity, | |||||
scheduled_at: naive_datetime, | |||||
user: user, | |||||
params: %{status: "hi"} | |||||
) | |||||
Pleroma.Workers.Cron.ScheduledActivityWorker.perform(:opts, :pid) | |||||
refute Repo.get(ScheduledActivity, scheduled_activity.id) | |||||
activity = Repo.all(Pleroma.Activity) |> Enum.find(&(&1.actor == user.ap_id)) | |||||
assert Pleroma.Object.normalize(activity).data["content"] == "hi" | |||||
end | |||||
end | end |