Browse Source

generatoin and fetching

merge-requests/1875/head
Alex S 4 years ago
parent
commit
a1125bd564
4 changed files with 289 additions and 92 deletions
  1. +123
    -3
      lib/load_testing/fetcher.ex
  2. +147
    -56
      lib/load_testing/generator.ex
  3. +0
    -5
      lib/load_testing/helper.ex
  4. +19
    -28
      lib/mix/tasks/pleroma/load_testing.ex

+ 123
- 3
lib/load_testing/fetcher.ex View File

@@ -51,10 +51,52 @@ defmodule Pleroma.LoadTesting.Fetcher do
)
end,
"User mastodon public timeline" => fn ->
ActivityPub.ActivityPub.fetch_public_activities(mastodon_public_timeline_params)
Pleroma.Web.ActivityPub.ActivityPub.fetch_public_activities(
mastodon_public_timeline_params
)
end,
"User mastodon federated public timeline" => fn ->
ActivityPub.ActivityPub.fetch_public_activities(mastodon_federated_timeline_params)
Pleroma.Web.ActivityPub.ActivityPub.fetch_public_activities(
mastodon_federated_timeline_params
)
end
})

home_activities =
Pleroma.Web.ActivityPub.ActivityPub.fetch_activities(
[user.ap_id | user.following],
home_timeline_params
)

public_activities =
Pleroma.Web.ActivityPub.ActivityPub.fetch_public_activities(mastodon_public_timeline_params)

public_federated_activities =
Pleroma.Web.ActivityPub.ActivityPub.fetch_public_activities(
mastodon_federated_timeline_params
)

Benchee.run(%{
"Rendering home timeline" => fn ->
Pleroma.Web.MastodonAPI.StatusView.render("index.json", %{
activities: home_activities,
for: user,
as: :activity
})
end,
"Rendering public timeline" => fn ->
Pleroma.Web.MastodonAPI.StatusView.render("index.json", %{
activities: public_activities,
for: user,
as: :activity
})
end,
"Rendering public federated timeline" => fn ->
Pleroma.Web.MastodonAPI.StatusView.render("index.json", %{
activities: public_federated_activities,
for: user,
as: :activity
})
end
})
end
@@ -72,6 +114,27 @@ defmodule Pleroma.LoadTesting.Fetcher do
Pleroma.Web.MastodonAPI.MastodonAPI.get_notifications(user, with_muted_params)
end
})

without_muted_notifications =
Pleroma.Web.MastodonAPI.MastodonAPI.get_notifications(user, without_muted_params)

with_muted_notifications =
Pleroma.Web.MastodonAPI.MastodonAPI.get_notifications(user, with_muted_params)

Benchee.run(%{
"Render notifications without muted" => fn ->
Pleroma.Web.MastodonAPI.NotificationView.render("index.json", %{
notifications: without_muted_notifications,
for: user
})
end,
"Render notifications with muted" => fn ->
Pleroma.Web.MastodonAPI.NotificationView.render("index.json", %{
notifications: with_muted_notifications,
for: user
})
end
})
end

def query_dms(user) do
@@ -96,13 +159,40 @@ defmodule Pleroma.LoadTesting.Fetcher do
|> Pleroma.Pagination.fetch_paginated(Map.put(params, "with_muted", false))
end
})

dms_with_muted =
Pleroma.Web.ActivityPub.ActivityPub.fetch_activities_query([user.ap_id], params)
|> Pleroma.Pagination.fetch_paginated(params)

dms_without_muted =
Pleroma.Web.ActivityPub.ActivityPub.fetch_activities_query([user.ap_id], params)
|> Pleroma.Pagination.fetch_paginated(Map.put(params, "with_muted", false))

Benchee.run(%{
"Rendering dms with muted" => fn ->
Pleroma.Web.MastodonAPI.StatusView.render("index.json", %{
activities: dms_with_muted,
for: user,
as: :activity
})
end,
"Rendering dms without muted" => fn ->
Pleroma.Web.MastodonAPI.StatusView.render("index.json", %{
activities: dms_without_muted,
for: user,
as: :activity
})
end
})
end

def query_long_thread(user, activity) do
IO.puts("\n=================================")

Benchee.run(%{
"Fetch main post" => fn -> Activity.get_by_id_with_object(activity.id) end,
"Fetch main post" => fn ->
Pleroma.Activity.get_by_id_with_object(activity.id)
end,
"Fetch context of main post" => fn ->
Pleroma.Web.ActivityPub.ActivityPub.fetch_activities_for_context(
activity.data["context"],
@@ -114,5 +204,35 @@ defmodule Pleroma.LoadTesting.Fetcher do
)
end
})

activity = Pleroma.Activity.get_by_id_with_object(activity.id)

context =
Pleroma.Web.ActivityPub.ActivityPub.fetch_activities_for_context(
activity.data["context"],
%{
"blocking_user" => user,
"user" => user,
"exclude_id" => activity.id
}
)

Benchee.run(%{
"Render status" => fn ->
Pleroma.Web.MastodonAPI.StatusView.render("status.json", %{
activity: activity,
for: user
})
end,
"Render context ancestors" => fn ->
Pleroma.Web.MastodonAPI.StatusView.render(
"index.json",
for: user,
activities: context,
as: :activity
)
|> Enum.reverse()
end
})
end
end

+ 147
- 56
lib/load_testing/generator.ex View File

@@ -1,9 +1,11 @@
defmodule Pleroma.LoadTesting.Generator do
use Pleroma.LoadTesting.Helper
alias Pleroma.Web.CommonAPI

def generate_users(opts) do
IO.puts("Starting generating #{opts[:users_max]} users...")
{time, _} = :timer.tc(fn -> do_generate_users(opts) end)

IO.puts("Inserting users take #{to_sec(time)} sec.\n")
end

@@ -37,34 +39,111 @@ defmodule Pleroma.LoadTesting.Generator do
following: [User.ap_id(user)]
}

Pleroma.Repo.insert!(user)
Repo.insert!(user)
end

def generate_activities(users, opts) do
IO.puts("Starting generating #{opts[:activities_max]} activities...")
{time, _} = :timer.tc(fn -> do_generate_activities(users, opts) end)
IO.puts("Inserting activities take #{to_sec(time)} sec.\n")
def generate_activities(user, users) do
do_generate_activities(user, users)
end

defp do_generate_activities(users, opts) do
Task.async_stream(
1..opts[:activities_max],
fn _ ->
do_generate_activity(users, opts)
end,
max_concurrency: 10,
timeout: 30_000
)
|> Stream.run()
defp do_generate_activities(user, users) do
IO.puts("Starting generating 20000 common activities...")

{time, _} =
:timer.tc(fn ->
Task.async_stream(
1..20_000,
fn _ ->
do_generate_activity([user | users])
end,
max_concurrency: 10,
timeout: 30_000
)
|> Stream.run()
end)

IO.puts("Inserting common activities take #{to_sec(time)} sec.\n")

IO.puts("Starting generating 20000 activities with mentions...")

{time, _} =
:timer.tc(fn ->
Task.async_stream(
1..20_000,
fn _ ->
do_generate_activity_with_mention(user, users)
end,
max_concurrency: 10,
timeout: 30_000
)
|> Stream.run()
end)

IO.puts("Inserting activities with menthions take #{to_sec(time)} sec.\n")

IO.puts("Starting generating 10000 activities with threads...")

{time, _} =
:timer.tc(fn ->
Task.async_stream(
1..10_000,
fn _ ->
do_generate_threads([user | users])
end,
max_concurrency: 10,
timeout: 30_000
)
|> Stream.run()
end)

IO.puts("Inserting activities with threads take #{to_sec(time)} sec.\n")
end

defp do_generate_activity(users) do
post = %{
"status" => "Some status without mention with random user"
}

CommonAPI.post(Enum.random(users), post)
end

defp do_generate_activity_with_mention(user, users) do
mentions_cnt = Enum.random([2, 3, 4, 5])
with_user = Enum.random([true, false])
users = Enum.shuffle(users)
mentions_users = Enum.take(users, mentions_cnt)
mentions_users = if with_user, do: [user | mentions_users], else: mentions_users

mentions_str =
Enum.map(mentions_users, fn user -> "@" <> user.nickname end) |> Enum.join(", ")

post = %{
"status" => mentions_str <> "some status with mentions random users"
}

CommonAPI.post(Enum.random(users), post)
end

defp do_generate_activity(users, opts) do
status =
if opts[:mention],
do: "some status with @#{opts[:mention].nickname}",
else: "some status"
defp do_generate_threads(users) do
thread_length = Enum.random([2, 3, 4, 5])
actor = Enum.random(users)

post = %{
"status" => "Start of the thread"
}

{:ok, activity} = CommonAPI.post(actor, post)

Enum.each(1..thread_length, fn _ ->
user = Enum.random(users)

post = %{
"status" => "@#{actor.nickname} reply to thread",
"in_reply_to_status_id" => activity.id
}

Pleroma.Web.CommonAPI.post(Enum.random(users), %{"status" => status})
CommonAPI.post(user, post)
end)
end

def generate_dms(user, users, opts) do
@@ -91,22 +170,21 @@ defmodule Pleroma.LoadTesting.Generator do
"visibility" => "direct"
}

Pleroma.Web.CommonAPI.post(Enum.random(users), post)
CommonAPI.post(Enum.random(users), post)
end

def generate_long_thread(user, users, opts) do
IO.puts("Starting generating long thread with #{opts[:long_thread_length]} replies")
IO.puts("Starting generating long thread with #{opts[:thread_length]} replies")
{time, activity} = :timer.tc(fn -> do_generate_long_thread(user, users, opts) end)
IO.puts("Inserting long thread replies take #{to_sec(time)} sec.\n")
{:ok, activity}
end

defp do_generate_long_thread(user, users, opts) do
{:ok, %{id: id} = activity} =
Pleroma.Web.CommonAPI.post(user, %{"status" => "Start of long thread"})
{:ok, %{id: id} = activity} = CommonAPI.post(user, %{"status" => "Start of long thread"})

Task.async_stream(
1..opts[:long_thread_length],
1..opts[:thread_length],
fn _ -> do_generate_thread(users, id) end,
max_concurrency: 10,
timeout: 30_000
@@ -117,50 +195,63 @@ defmodule Pleroma.LoadTesting.Generator do
end

defp do_generate_thread(users, activity_id) do
Pleroma.Web.CommonAPI.post(Enum.random(users), %{
CommonAPI.post(Enum.random(users), %{
"status" => "reply to main post",
"in_reply_to_status_id" => activity_id
})
end

def generate_private_thread(users, opts) do
IO.puts("Starting generating long thread with #{opts[:non_visible_posts_max]} replies")
{time, _} = :timer.tc(fn -> do_generate_non_visible_posts(users, opts) end)
IO.puts("Inserting long thread replies take #{to_sec(time)} sec.\n")
end
def generate_non_visible_message(user, users) do
IO.puts("Starting generating 1000 non visible posts")

defp do_generate_non_visible_posts(users, opts) do
[user1, user2] = Enum.take(users, 2)
{:ok, user1} = Pleroma.User.follow(user1, user2)
{:ok, user2} = Pleroma.User.follow(user2, user1)
{time, _} =
:timer.tc(fn ->
do_generate_non_visible_posts(user, users)
end)

{:ok, activity} =
Pleroma.Web.CommonAPI.post(user1, %{
"status" => "Some private post",
"visibility" => "private"
})
IO.puts("Inserting non visible posts take #{to_sec(time)} sec.\n")
end

{:ok, activity_public} =
Pleroma.Web.CommonAPI.post(user2, %{
"status" => "Some public reply",
"in_reply_to_status_id" => activity.id
})
defp do_generate_non_visible_posts(user, users) do
[not_friend | users] = users

Task.async_stream(
1..opts[:non_visible_posts_max],
fn _ -> do_generate_non_visible_post(users, activity_public) end,
make_friends(user, users)

Task.async_stream(1..1000, fn _ -> do_generate_non_visible_post(not_friend, users) end,
max_concurrency: 10,
timeout: 30_000
)
|> Stream.run()
end

defp do_generate_non_visible_post(users, activity) do
visibility = Enum.random(["private", "public"])
defp make_friends(_user, []), do: nil

Pleroma.Web.CommonAPI.post(Enum.random(users), %{
"visibility" => visibility,
"status" => "Some #{visibility} reply",
"in_reply_to_status_id" => activity.id
})
defp make_friends(user, [friend | users]) do
{:ok, _} = User.follow(user, friend)
{:ok, _} = User.follow(friend, user)
make_friends(user, users)
end

defp do_generate_non_visible_post(not_friend, users) do
post = %{
"status" => "some non visible post",
"visibility" => "private"
}

{:ok, activity} = CommonAPI.post(not_friend, post)

thread_length = Enum.random([2, 3, 4, 5])

Enum.each(1..thread_length, fn _ ->
user = Enum.random(users)

post = %{
"status" => "@#{not_friend.nickname} reply to non visible post",
"in_reply_to_status_id" => activity.id,
"visibility" => "private"
}

CommonAPI.post(user, post)
end)
end
end

+ 0
- 5
lib/load_testing/helper.ex View File

@@ -2,13 +2,8 @@ defmodule Pleroma.LoadTesting.Helper do
defmacro __using__(_) do
quote do
import Ecto.Query
alias Pleroma.Activity
alias Pleroma.Notification
alias Pleroma.Object
alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Web.ActivityPub
alias Pleroma.Web.CommonAPI

defp to_sec(microseconds), do: microseconds / 1_000_000
end


+ 19
- 28
lib/mix/tasks/pleroma/load_testing.ex View File

@@ -13,46 +13,37 @@ defmodule Mix.Tasks.Pleroma.LoadTesting do
- activities with notifications

## Generate data
MIX_ENV=test mix pleroma.load_testing --users 10000 --activities 20000
MIX_ENV=test mix pleroma.load_testing -u 10000 -a 20000
MIX_ENV=benchmark mix pleroma.load_testing --users 10000
MIX_ENV=benchmark mix pleroma.load_testing -u 10000

Options:
- `--users NUMBER` - number of users to generate (default: 10000)
- `--activities NUMBER` - number of activities to generate (default: 20000)
"""

@aliases [u: :users, a: :activities]
@aliases [u: :users, d: :dms, t: :thread_length]
@switches [
users: :integer,
activities: :integer,
dms: :integer,
thread_length: :integer,
non_visible_posts: :integer
thread_length: :integer
]
@users_default 20_000
@activities_default 50_000
@dms_default 50_000
@dms_default 20_000
@thread_length_default 2_000
@non_visible_posts_default 2_000

def run(args) do
start_pleroma()
{opts, _} = OptionParser.parse!(args, strict: @switches, aliases: @aliases)

users_max = Keyword.get(opts, :users, @users_default)
activities_max = Keyword.get(opts, :activities, @activities_default)
dms_max = Keyword.get(opts, :dms, @dms_default)
long_thread_length = Keyword.get(opts, :thread_length, @thread_length_default)
non_visible_posts = Keyword.get(opts, :non_visible_posts, @non_visible_posts_default)
thread_length = Keyword.get(opts, :thread_length, @thread_length_default)

clean_tables()

opts =
Keyword.put(opts, :users_max, users_max)
|> Keyword.put(:activities_max, activities_max)
|> Keyword.put(:dms_max, dms_max)
|> Keyword.put(:long_thread_length, long_thread_length)
|> Keyword.put(:non_visible_posts_max, non_visible_posts)
|> Keyword.put(:thread_length, thread_length)

generate_users(opts)

@@ -60,7 +51,9 @@ defmodule Mix.Tasks.Pleroma.LoadTesting do
IO.puts("Fetching main user...")

{time, user} =
:timer.tc(fn -> Repo.one(from(u in User, order_by: fragment("RANDOM()"), limit: 1)) end)
:timer.tc(fn ->
Repo.one(from(u in User, order_by: fragment("RANDOM()"), limit: 1))
end)

IO.puts("Fetching main user take #{to_sec(time)} sec.\n")

@@ -79,25 +72,23 @@ defmodule Mix.Tasks.Pleroma.LoadTesting do

IO.puts("Fetching users take #{to_sec(time)} sec.\n")

generate_activities(users, opts)

generate_activities(users, Keyword.put(opts, :mention, user))
generate_activities(user, users)

generate_dms(user, users, opts)

{:ok, activity} = generate_long_thread(user, users, opts)

generate_private_thread(users, opts)
generate_non_visible_message(user, users)

# generate_replies(user, users, activities)
IO.puts("Users in DB: #{Repo.aggregate(from(u in User), :count, :id)}")

# activity = Enum.random(activities)
# generate_long_thread(user, users, activity)
IO.puts("Activities in DB: #{Repo.aggregate(from(a in Pleroma.Activity), :count, :id)}")

IO.puts("Users in DB: #{Repo.aggregate(from(u in User), :count, :id)}")
IO.puts("Activities in DB: #{Repo.aggregate(from(a in Activity), :count, :id)}")
IO.puts("Objects in DB: #{Repo.aggregate(from(o in Object), :count, :id)}")
IO.puts("Notifications in DB: #{Repo.aggregate(from(n in Notification), :count, :id)}")
IO.puts("Objects in DB: #{Repo.aggregate(from(o in Pleroma.Object), :count, :id)}")

IO.puts(
"Notifications in DB: #{Repo.aggregate(from(n in Pleroma.Notification), :count, :id)}"
)

fetch_user(user)
query_timelines(user)


Loading…
Cancel
Save