Goals
We want a simplified version of event sourcing without the CQRS baggage. We have the following requirements:
- Out-of-order events can be processed.
- All states/projections use the same sources, so we don’t have state mismatches.
- We want to be flexible enough to allow changes, mainly to our event schema.
Optimization as a whole could happen later; we will talk about that in a second.
Event store
We created a very quick event store so we don’t need real event store libraries. This reduces complexity and doesn’t require specific PostgreSQL plug-ins or databases.
The downside of not using a specialized event store is that we cannot ‘notify’ parts of our code about new events or do other things typically associated with event sourcing; we would need to build everything ourselves. Since we want to do a boring version of event sourcing, we’re a-okay with this trade-off.
When we need more features, we could always migrate or enable plug-ins.
What we want to store
defmodule Events.ContentPieceStarted do
  defstruct [:user_id, :unique_id, :session_id, :timestamp, :timezone, :locale, :object_uri]
end

defmodule Events.ContentPieceCompleted do
  defstruct [:user_id, :unique_id, :session_id, :timestamp, :timezone, :locale, :object_uri]
end

defmodule Events.ContentPieceCancelled do
  defstruct [:user_id, :unique_id, :session_id, :timestamp, :timezone, :locale, :object_uri]
end
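All three events share the same shape; only the struct name differs. For illustration, a started event could look like this (the ids and URI below are made up):

started_event = %Events.ContentPieceStarted{
  user_id: "fd138856-8d18-4ad1-a642-729454aeb633",
  unique_id: Ecto.UUID.generate(),
  session_id: Ecto.UUID.generate(),
  timestamp: DateTime.utc_now(),
  timezone: "Europe/Berlin",
  locale: "de",
  object_uri: "sevenmind://content/single/abc12/recording/def34"
}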
Preparation: Schema & Repo
A very quick `events` schema for the prototype could look like this:
defmodule Persistence.Event do
  use Ecto.Schema

  schema "events" do
    # Which would be something like `Content Piece Started`, `Content Piece Completed`, `Content Piece Cancelled`.
    field(:event_type, :string)
    # This comes from PubSub - to reject duplicates.
    field(:unique_id, :string)
    # This comes from RudderStack - it's corrected for ingestion timezones.
    field(:timestamp, :utc_datetime)
    # The UUID so we can quickly filter the events without looking in the payload.
    field(:user_uuid, :binary_id)
    # The actual payload; we can decide if we want to duplicate the metadata fields from above.
    field(:payload, :map)
  end
end
We choose to separate out the metadata so we can filter all our events into user streams, and order and de-duplicate them. Basically, everything that our event store will use for its inner workings needs to live outside of the payload.
The repo, for completeness’ sake, is just a default repository.
defmodule Persistence.Repo do
  use Ecto.Repo, otp_app: :eventsourcing_test, adapter: Ecto.Adapters.Postgres
end
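We also need a matching migration. It isn’t part of this note, so here is a hypothetical one that creates the table plus the two indexes the access patterns above suggest: a unique index on `unique_id` for de-duplication and a composite index on `user_uuid` and `timestamp` for the user streams.

defmodule Persistence.Repo.Migrations.CreateEvents do
  use Ecto.Migration

  def change do
    create table(:events) do
      add(:event_type, :string)
      add(:unique_id, :string)
      add(:timestamp, :utc_datetime)
      add(:user_uuid, :binary_id)
      add(:payload, :map)
    end

    # Hypothetical indexes: reject duplicate PubSub ids, slice per-user streams quickly.
    create(unique_index(:events, [:unique_id]))
    create(index(:events, [:user_uuid, :timestamp]))
  end
end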
The database layer
This `EventStore` module will handle all our database interactions.
These interactions are:
- inserting a list of events or a single event into our database, with all the necessary processing, extracting metadata and whatnot
- fetching a stream of events specifically tailored to a single user; we’ll call this stream the `user_stream` from now on
defmodule EventStore do
  import Ecto.Query

  alias Persistence.Event
  alias Persistence.Repo

  # Bulk-insert a list of events in a single query.
  def append(events) when is_list(events) do
    data = Enum.map(events, &to_persistence/1)
    Repo.insert_all(Event, data)
  end

  # Insert a single event.
  def append(event) do
    event
    |> to_persistence()
    |> then(&struct(Event, &1))
    |> Repo.insert()
  end

  # The `user_stream`: all events of a single user, oldest first.
  def list(user_uuid) do
    from(e in Event, where: e.user_uuid == ^user_uuid, order_by: [asc: e.timestamp])
    |> Repo.all()
  end

  # Splits an event struct into the metadata columns; everything else stays in the payload.
  defp to_persistence(
         %event_type{user_id: user_id, timestamp: timestamp, unique_id: unique_id} = payload
       ) do
    payload = Map.drop(payload, [:__struct__, :user_id, :timestamp, :unique_id])

    %{
      event_type: event_name(event_type),
      unique_id: unique_id,
      timestamp: prep_timestamp(timestamp),
      user_uuid: user_id,
      payload: payload
    }
  end

  defp prep_timestamp(%DateTime{} = dt), do: DateTime.truncate(dt, :second)
  defp prep_timestamp(other), do: other

  # `Events.ContentPieceStarted` becomes `"ContentPieceStarted"`.
  defp event_name(module) when is_atom(module) do
    module
    |> Module.split()
    |> List.last()
  end
end
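Appending and reading back a user’s stream then looks like this (a sketch, reusing the made-up `started_event` from earlier):

{:ok, _row} = EventStore.append(started_event)

user_stream = EventStore.list(started_event.user_id)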
Small note: we create the event names from the module name, and we truncate the timestamps because Ecto’s `:utc_datetime` type does not accept microseconds (that would be `:utc_datetime_usec`).
Optimization: we would most likely want to use streams here to help optimize this process.
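A sketch of what that could look like, as a hypothetical addition to `EventStore`: `Repo.stream/2` must run inside a transaction, so the function takes the consumer as an argument.

# Hypothetical: stream a user's events lazily instead of loading them all at once.
def stream(user_uuid, fun) do
  Repo.transaction(fn ->
    from(e in Event, where: e.user_uuid == ^user_uuid, order_by: [asc: e.timestamp])
    |> Repo.stream()
    |> fun.()
  end)
end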
Projection
We create a behaviour that all projections need to implement.
defmodule Projection do
  @type event :: any()
  @type projection :: any()

  @callback project([event()]) :: projection()
end
In `project/1` we currently use a list of events, which we ideally want to be a stream.
Dummy implementation
We create a dummy projection for the prototype that basically counts the different event types a given user has. This projection/state is not currently saved in the database and will be created on the fly.
defmodule Projection.Stats do
  @moduledoc """
  Projects all events into a single set of statistics calculated per user.
  """
  alias Persistence.Event

  @behaviour Projection

  defstruct completed_count: 0, started_count: 0, cancelled_count: 0, total: 0

  @impl true
  def project(events) when is_list(events) do
    projection =
      Enum.reduce(events, %__MODULE__{}, fn
        %Event{event_type: "ContentPieceCompleted"},
        %__MODULE__{completed_count: count} = projection ->
          %__MODULE__{projection | completed_count: count + 1}

        %Event{event_type: "ContentPieceStarted"},
        %__MODULE__{started_count: count} = projection ->
          %__MODULE__{projection | started_count: count + 1}

        %Event{event_type: "ContentPieceCancelled"},
        %__MODULE__{cancelled_count: count} = projection ->
          %__MODULE__{projection | cancelled_count: count + 1}
      end)

    %__MODULE__{
      projection
      | total: projection.completed_count + projection.started_count + projection.cancelled_count
    }
  end
end
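Given a `user_uuid`, wiring store and projection together is then a one-liner; this is roughly what the mix task shown further down does:

user_uuid
|> EventStore.list()
|> Projection.Stats.project()

Since `Stats` only counts events, the out-of-order requirement is trivially met here; ordering will start to matter for history-style projections.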
Generating data
Using StreamData, we generated some plausible-looking data for one specific user. These events still need a bit of love, as the `timestamp` is `DateTime.utc_now/0` instead of more plausible-looking data.
Dummy data
We would also want to update this to intersperse other users’ data, so it’s not one chunk.
defmodule Generator do
  use ExUnitProperties

  defp gen_uuid, do: Ecto.UUID.generate()
  defp now_date, do: DateTime.utc_now()

  @all_countries [
    {"Europe/Berlin", "de"},
    {"Europe/Lisbon", "pt"},
    {"Europe/London", "en"},
    {"Europe/Paris", "fr"},
    {"Europe/Madrid", "es"},
    {"Europe/Rome", "it"}
  ]

  def single_uri_generator() do
    gen all single_id <- string(:alphanumeric, length: 5),
            record_id <- string(:alphanumeric, length: 5) do
      "sevenmind://content/single/#{single_id}/recording/#{record_id}"
    end
  end

  def course_item_uri_generator() do
    gen all course_id <- string(:alphanumeric, length: 5),
            item_id <- string(:alphanumeric, length: 5),
            record_id <- string(:alphanumeric, length: 5) do
      "sevenmind://content/course/#{course_id}/item/#{item_id}/recording/#{record_id}"
    end
  end

  def object_uri_generator(), do: one_of([single_uri_generator(), course_item_uri_generator()])

  # TODO make a generator that uses the normal generator and throws some weird/out-of-order events in there.
  def session_events_generator(user_id) do
    gen all session_id <- repeatedly(&gen_uuid/0),
            object_uri <- object_uri_generator(),
            first_event_type <-
              frequency([
                {85, constant(Events.ContentPieceStarted)},
                {15, constant(nil)}
              ]),
            last_event_type <-
              frequency([
                {55, constant(Events.ContentPieceCancelled)},
                {40, constant(Events.ContentPieceCompleted)},
                {5, constant(nil)}
              ]),
            first <- piece_event_generator(first_event_type, user_id, {session_id, object_uri}),
            last <- piece_event_generator(last_event_type, user_id, {session_id, object_uri}) do
      # A `nil` type means the event was never sent, so it is dropped from the session.
      [first, last]
      |> Enum.reject(&is_nil/1)
      |> Enum.shuffle()
    end
  end

  # TODO we'll generate events for a _specific_ user.
  # The users are split into "main characters" and "background characters";
  # the idea is to verify if we can quickly extract an event stream for a single user.
  def piece_event_generator(nil, _user_id, _params), do: constant(nil)

  def piece_event_generator(type, user_id, {session_id, object_uri}) do
    gen all unique_id <- repeatedly(&gen_uuid/0),
            timestamp <- repeatedly(&now_date/0),
            {timezone, locale} <- repeatedly(fn -> Enum.random(@all_countries) end) do
      struct(type,
        user_id: user_id,
        session_id: session_id,
        unique_id: unique_id,
        timestamp: timestamp,
        timezone: timezone,
        locale: locale,
        object_uri: object_uri
      )
    end
  end
end
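StreamData generators implement `Enumerable`, so we can eyeball a few generated sessions straight from IEx (the user id here is freshly generated):

Generator.session_events_generator(Ecto.UUID.generate())
|> Enum.take(2)
# => two sessions, each a shuffled list of event structs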
Using two mix tasks, we can create around `12_000` events at a time and use our dummy projector to read this data.
> mix events.project fd138856-8d18-4ad1-a642-729454aeb633
13:52:25.107 [debug] QUERY OK source="events" db=927.4ms decode=48.6ms queue=41.2ms idle=0.0ms
SELECT e0."id", e0."event_type", e0."unique_id", e0."timestamp", e0."user_uuid", e0."payload" FROM "events" AS e0 WHERE (e0."user_uuid" = $1) ORDER BY e0."timestamp" ["fd138856-8d18-4ad1-a642-729454aeb633"]
%Projection.Stats{
  completed_count: 22200,
  started_count: 47099,
  cancelled_count: 30701,
  total: 100000
}
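For completeness, here is a minimal version of the projection task; the module name and wiring are our assumption, since the note only shows its output:

defmodule Mix.Tasks.Events.Project do
  use Mix.Task

  @shortdoc "Projects all events of a single user into Projection.Stats"
  def run([user_uuid]) do
    # Start the app so the Repo is running before we query.
    Mix.Task.run("app.start")

    user_uuid
    |> EventStore.list()
    |> Projection.Stats.project()
    |> IO.inspect()
  end
end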
TODO
This research note is still under construction. Please have mercy on our soul.
- Fuck up and scramble more data
- Change the event structure after events have already been inserted
- Create a more hands-on projection (history?)
- How to handle the metadata better