Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion extra/lib/plausible/funnel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ defmodule Plausible.Funnel do
@type t() :: %__MODULE__{}
schema "funnels" do
field :name, :string
field :open, :boolean, default: false
belongs_to :site, Plausible.Site

has_many :steps, Step,
Expand All @@ -55,7 +56,7 @@ defmodule Plausible.Funnel do

def changeset(funnel \\ %__MODULE__{}, attrs \\ %{}) do
funnel
|> cast(attrs, [:name])
|> cast(attrs, [:name, :open])
|> validate_required([:name])
|> put_steps(attrs[:steps] || attrs["steps"])
|> validate_length(:steps, min: @min_steps, max: @max_steps)
Expand Down
36 changes: 20 additions & 16 deletions extra/lib/plausible/funnels.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ defmodule Plausible.Funnels do

import Ecto.Query

@spec create(Plausible.Site.t(), String.t(), [map()]) ::
@spec create(Plausible.Site.t(), String.t(), [map()], Keyword.t()) ::
{:ok, Funnel.t()}
| {:error, Ecto.Changeset.t() | :invalid_funnel_size | :upgrade_required}
def create(site, name, steps)
def create(site, name, steps, opts \\ [])

def create(site, name, steps, opts)
when is_list(steps) and length(steps) in Funnel.min_steps()..Funnel.max_steps() do
site = Plausible.Repo.preload(site, :team)

Expand All @@ -26,19 +28,19 @@ defmodule Plausible.Funnels do

:ok ->
site
|> create_changeset(name, steps)
|> create_changeset(name, steps, opts)
|> Repo.insert()
end
end

def create(_site, _name, _goals) do
def create(_site, _name, _goals, _opts) do
{:error, :invalid_funnel_size}
end

@spec update(Funnel.t(), String.t(), [map()]) ::
@spec update(Funnel.t(), String.t(), [map()], Keyword.t()) ::
{:ok, Funnel.t()}
| {:error, Ecto.Changeset.t() | :invalid_funnel_size | :upgrade_required}
def update(funnel, name, steps) do
def update(funnel, name, steps, opts \\ []) do
site = Plausible.Repo.preload(funnel, site: :team).site

case Plausible.Billing.Feature.Funnels.check_availability(site.team) do
Expand All @@ -47,27 +49,29 @@ defmodule Plausible.Funnels do

:ok ->
funnel
|> Funnel.changeset(%{name: name, steps: steps})
|> edit_changeset(name, steps, opts)
|> Repo.update()
end
end

@spec create_changeset(Plausible.Site.t(), String.t(), [map()]) ::
@spec create_changeset(Plausible.Site.t(), String.t(), [map()], Keyword.t()) ::
Ecto.Changeset.t()
def create_changeset(site, name, steps) do
Funnel.changeset(%Funnel{site_id: site.id}, %{name: name, steps: steps})
def create_changeset(site, name, steps, opts \\ []) do
open? = Keyword.get(opts, :open?, false)
Funnel.changeset(%Funnel{site_id: site.id}, %{name: name, steps: steps, open: open?})
end

@spec edit_changeset(Plausible.Funnel.t(), String.t(), [map()]) ::
@spec edit_changeset(Plausible.Funnel.t(), String.t(), [map()], Keyword.t()) ::
Ecto.Changeset.t()
def edit_changeset(funnel, name, steps) do
Funnel.changeset(funnel, %{name: name, steps: steps})
def edit_changeset(funnel, name, steps, opts \\ []) do
open? = Keyword.get(opts, :open?, false)
Funnel.changeset(funnel, %{name: name, steps: steps, open: open?})
end

@spec ephemeral_definition(Plausible.Site.t(), String.t(), [map()]) :: Funnel.t()
def ephemeral_definition(site, name, steps) do
@spec ephemeral_definition(Plausible.Site.t(), String.t(), [map()], Keyword.t()) :: Funnel.t()
def ephemeral_definition(site, name, steps, opts \\ []) do
site
|> create_changeset(name, steps)
|> create_changeset(name, steps, opts)
|> Ecto.Changeset.apply_changes()
end

Expand Down
187 changes: 159 additions & 28 deletions extra/lib/plausible/stats/funnel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ defmodule Plausible.Stats.Funnel do
"""

@funnel_window_duration 86_400
@enter_offset 40
@max_steps 32

alias Plausible.Funnel
alias Plausible.Funnels
Expand Down Expand Up @@ -32,28 +34,33 @@ defmodule Plausible.Stats.Funnel do
query
|> Base.base_event_query()
|> query_funnel(funnel)
|> Enum.into(%{})

# Funnel definition steps are 1-indexed, if there's index 0 in the resulting query,
# it signifies the number of visitors that haven't entered the funnel.
not_entering_visitors =
case funnel_data do
[{0, count} | _] -> count
_ -> 0
end
not_entering_visitors = funnel_data[@enter_offset] || 0

all_visitors = Enum.reduce(funnel_data, 0, fn {_, n}, acc -> acc + n end)
steps = backfill_steps(funnel_data, funnel)
all_visitors =
Enum.reduce(funnel_data, 0, fn {step, n}, acc ->
if step >= @enter_offset do
acc + n
else
acc
end
end)

entering_visitors = all_visitors - not_entering_visitors

visitors_at_first_step = List.first(steps).visitors
steps = backfill_steps(funnel_data, funnel, entering_visitors)

{:ok,
%{
name: funnel.name,
steps: steps,
all_visitors: all_visitors,
entering_visitors: visitors_at_first_step,
entering_visitors_percentage: percentage(visitors_at_first_step, all_visitors),
never_entering_visitors: all_visitors - visitors_at_first_step,
entering_visitors: entering_visitors,
entering_visitors_percentage: percentage(entering_visitors, all_visitors),
never_entering_visitors: all_visitors - entering_visitors,
never_entering_visitors_percentage: percentage(not_entering_visitors, all_visitors)
}}
end
Expand All @@ -77,7 +84,22 @@ defmodule Plausible.Stats.Funnel do
ClickhouseRepo.all(query)
end

defp select_funnel(db_query, %{open: true} = funnel_definition) do
select_open_funnel(db_query, funnel_definition)
end

defp select_funnel(db_query, funnel_definition) do
select_closed_funnel(db_query, funnel_definition)
end

# The closed funnel matches when a user completes 1 or more continuous steps,
# starting from the first step.
#
# The select statement returns each completed step (1-indexed). Additionally
# it returns `@enter_offset + firstStep` special step, where `firstStep`
# is practically either 1 (user has entered the funnel) or 0 (user has
# not entered the funnel).
defp select_closed_funnel(db_query, funnel_definition) do
window_funnel_steps =
Enum.reduce(funnel_definition.steps, nil, fn step, acc ->
goal_condition = Plausible.Stats.Goals.goal_condition(step.goal)
Expand All @@ -89,10 +111,25 @@ defmodule Plausible.Stats.Funnel do
end
end)

funnel_steps =
dynamic(
[q],
fragment(
"if(length(range(1, windowFunnel(?)(timestamp, ?) + 1) AS funArr) > 0, funArr, ?)",
@funnel_window_duration,
^window_funnel_steps,
^[0]
)
)

dynamic_window_funnel =
dynamic(
[q],
fragment("windowFunnel(?)(timestamp, ?)", @funnel_window_duration, ^window_funnel_steps)
fragment(
"arrayJoin(arrayConcat(?, [funArr[1] + ?]))",
^funnel_steps,
@enter_offset
)
)

from(q in db_query,
Expand All @@ -103,40 +140,134 @@ defmodule Plausible.Stats.Funnel do
)
end

defp backfill_steps(funnel_result, funnel) do
# The open funnel matches when a user completes 1 or more continuous steps,
# starting from any step in the funnel.
#
# First, an array or funnel subsequences (arrays) is built. Subseqeuences
# are then checked starting from the 1st step, then from second, up until
# a sequence with last step of the funnel only.
#
# There's an optimization where we exit early if we match a funnel subsequence
# finishing at the last step of the funnel, as there's a guarantee that
# the further, shorter ones won't return a longer matched sequence.
#
# Next, the longest sequence out of the checked sequences is chosen. An
# additional step is appended computed as `@enter_offset + firstStep`
# where `firstStep` is the step index at which the user has entered the funnel.
# When `firstStep` is equal to 0, it means that the user has not entered
# the funnel.
defp select_open_funnel(db_query, funnel_definition) do
steps_count = length(funnel_definition.steps)

window_funnel_steps =
Enum.map(funnel_definition.steps, &Plausible.Stats.Goals.goal_condition(&1.goal))

offset_funnels =
Enum.map(steps_count..1//-1, fn idx ->
offset_steps =
window_funnel_steps
|> Enum.drop(idx - 1)
|> Enum.reduce(nil, fn step, acc ->
if acc do
dynamic([q], fragment("?, ?", ^acc, ^step))
else
dynamic([q], fragment("?", ^step))
end
end)

{nested_funnel(offset_steps, idx), idx}
end)

funnel_reduction =
Enum.reduce(offset_funnels, dynamic([q], fragment("array()")), fn {funnel_expr, idx}, acc ->
nested_funnel_conditional(funnel_expr, acc, idx, steps_count)
end)

longest_funnel =
dynamic([q], fragment("arraySort(x -> length(x), ?)[-1]", ^funnel_reduction))

dynamic_open_window_funnel =
dynamic([q], fragment("? AS funSteps", ^longest_funnel))

full_open_window_funnel =
dynamic(
[q],
fragment(
"arrayJoin(arrayPushBack(?, funSteps[1] + ?))",
^dynamic_open_window_funnel,
@enter_offset
)
)

from(q in db_query,
select_merge:
^%{
step: full_open_window_funnel
}
)
end

# Fragment expressions must be precompiled as `fragment` does compile-time
# checks on them - for instance it does not allow string interpolation.
# We must work around that in order to avoid calling `windowFunnel`
# with the same parameters multiple times and use alias instead. Ecto does not
# allow `selected_as` aliases anywhere below top level of `select`, `where` etc.

for idx <- 1..@max_steps do
fragment_str = "range(#{idx}, windowFunnel(?)(timestamp, ?) + #{idx}) as funArr#{idx}"

defp nested_funnel(steps, unquote(idx)) do
dynamic(
[q],
fragment(
unquote(fragment_str),
@funnel_window_duration,
^steps
)
)
end
end

for idx <- 1..@max_steps, steps <- 1..@max_steps do
fragment_str =
"if(length(?) >= #{steps - idx + 1}, [funArr#{idx}], arrayPushBack(?, funArr#{idx}))"

defp nested_funnel_conditional(current_expr, inner_expr, unquote(idx), unquote(steps)) do
dynamic(
[q],
fragment(
unquote(fragment_str),
^current_expr,
^inner_expr
)
)
end
end

defp backfill_steps(funnel_result, funnel, entering_visitors) do
# Directly from ClickHouse we only get {step_idx(), visitor_count()} tuples.
# but no totals including previous steps are aggregated.
# Hence we need to perform the appropriate backfill
# and also calculate dropoff and conversion rate for each step.
# In case ClickHouse returns 0-index funnel result, we're going to ignore it
# anyway, since we fold over steps as per definition, that are always
# indexed starting from 1.
funnel_result = Enum.into(funnel_result, %{})
max_step = Enum.max_by(funnel.steps, & &1.step_order).step_order

funnel
|> Map.fetch!(:steps)
|> Enum.reduce({nil, nil, []}, fn step, {total_visitors, visitors_at_previous, acc} ->
|> Enum.reduce({nil, []}, fn step, {visitors_at_previous, acc} ->
# first step contains the total number of all visitors qualifying for the funnel,
# with each subsequent step needing to accumulate sum of the previous one(s)
visitors_at_step =
step.step_order..max_step
|> Enum.map(&Map.get(funnel_result, &1, 0))
|> Enum.sum()
visitors_at_step = Map.get(funnel_result, step.step_order, 0)

# accumulate current_visitors for the next iteration
current_visitors = visitors_at_step

# First step contains the total number of visitors that we base percentage dropoff on
total_visitors =
total_visitors ||
current_visitors

# Dropoff is 0 for the first step, otherwise we subtract current from previous
dropoff = if visitors_at_previous, do: visitors_at_previous - current_visitors, else: 0

dropoff_percentage = percentage(dropoff, visitors_at_previous)
conversion_rate = percentage(current_visitors, total_visitors)
conversion_rate = percentage(current_visitors, entering_visitors)
conversion_rate_step = percentage(current_visitors, visitors_at_previous)

step = %{
Expand All @@ -148,9 +279,9 @@ defmodule Plausible.Stats.Funnel do
label: to_string(step.goal)
}

{total_visitors, current_visitors, [step | acc]}
{current_visitors, [step | acc]}
end)
|> elem(2)
|> elem(1)
|> Enum.reverse()
end

Expand Down
Loading
Loading