Goals

We want a simplified version of event sourcing without the CQRS baggage. We have the following requirements:

  • Out-of-order events can be processed
  • All states/projections use the same sources, so we don’t have state mismatches
  • We want to be flexible enough to allow changes, mainly to our event schema

Any optimization can happen later; we will talk about that in a second.

Event store

We created a very quick event store so we don’t need a real event store library. This reduces complexity and doesn’t require specific PostgreSQL plug-ins or databases.

The downside of not using a specialized event store is that we cannot ‘notify’ parts of our code about new events, or do other things typically associated with event sourcing; we would need to build everything ourselves. Since we want to do a boring version of event sourcing, we’re a-okay with this trade-off.

When we need more features, we could always migrate or enable plug-ins.

What we want to store

defmodule Events.ContentPieceStarted do
  defstruct [:user_id, :unique_id, :session_id, :timestamp, :timezone, :locale, :object_uri]
end
 
defmodule Events.ContentPieceCompleted do
  defstruct [:user_id, :unique_id, :session_id, :timestamp, :timezone, :locale, :object_uri]
end
 
defmodule Events.ContentPieceCancelled do
  defstruct [:user_id, :unique_id, :session_id, :timestamp, :timezone, :locale, :object_uri]
end
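For illustration, constructing one of these events by hand could look like this. All values are made-up examples; the `object_uri` format follows the generator further down.

```elixir
# Hypothetical sample values -- none of these IDs are real.
event = %Events.ContentPieceStarted{
  user_id: Ecto.UUID.generate(),
  unique_id: Ecto.UUID.generate(),
  session_id: Ecto.UUID.generate(),
  timestamp: DateTime.utc_now(),
  timezone: "Europe/Berlin",
  locale: "de",
  object_uri: "sevenmind://content/single/abc12/recording/def34"
}
```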

Preparation: Schema & Repo

A very quick events schema for the prototype could look like this

defmodule Persistence.Event do
  use Ecto.Schema
 
  schema "events" do
    # The event name, e.g. `ContentPieceStarted`, `ContentPieceCompleted`, `ContentPieceCancelled`.
    field(:event_type, :string)
    # This comes from PubSub - used to reject duplicates
    field(:unique_id, :string)
    # This comes from RudderStack - it's corrected for ingestion timezones
    field(:timestamp, :utc_datetime)
    # The user's UUID, so we can quickly filter the events without looking into the payload
    field(:user_uuid, :binary_id)
    # The actual payload; we can decide later whether to duplicate the metadata fields from above.
    field(:payload, :map)
  end
end
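The note doesn’t include the migration; a sketch matching the schema above could look like this. The index choices are assumptions: a unique index on `unique_id` for de-duplication, and a composite index on `user_uuid` plus `timestamp` for fetching ordered user streams.

```elixir
defmodule Persistence.Repo.Migrations.CreateEvents do
  use Ecto.Migration

  def change do
    create table(:events) do
      add(:event_type, :string)
      add(:unique_id, :string)
      add(:timestamp, :utc_datetime)
      add(:user_uuid, :binary_id)
      add(:payload, :map)
    end

    # Assumption: reject duplicate deliveries at the database level.
    create(unique_index(:events, [:unique_id]))
    # Assumption: user streams are filtered by user_uuid and ordered by timestamp.
    create(index(:events, [:user_uuid, :timestamp]))
  end
end
```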

We separate out the metadata so we can filter all our events into user streams, and order and de-duplicate them. Basically, everything our event store needs for its inner workings has to live outside of the payload.

The repo, for completeness’ sake, is just a default repository.

defmodule Persistence.Repo do
  use Ecto.Repo, otp_app: :eventsourcing_test, adapter: Ecto.Adapters.Postgres
end
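For completeness, the repo needs the usual Ecto configuration; a minimal sketch, where the database name and credentials are placeholders:

```elixir
# config/config.exs -- placeholder credentials, adjust to your setup
import Config

config :eventsourcing_test, ecto_repos: [Persistence.Repo]

config :eventsourcing_test, Persistence.Repo,
  database: "eventsourcing_test_dev",
  username: "postgres",
  password: "postgres",
  hostname: "localhost"
```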

The database layer

This EventStore module will handle all our database interactions. These interactions are:

  • inserting a single event or a list of events into our database
    • with all the necessary processing, extracting metadata and whatnot
  • fetching a stream of events tailored to a single user; we’ll call this stream the user_stream from now on

defmodule EventStore do
  import Ecto.Query
  alias Persistence.Event
  alias Persistence.Repo
 
  def append(events) when is_list(events) do
    data = Enum.map(events, &to_persistence/1)
 
    Repo.insert_all(Persistence.Event, data)
  end
 
  def append(event) do
    struct(Persistence.Event, to_persistence(event))
    |> Repo.insert()
  end
 
  def list(user_uuid) do
    from(e in Event, where: e.user_uuid == ^user_uuid, order_by: [asc: e.timestamp])
    |> Repo.all()
  end
 
  defp to_persistence(
         %event_type{user_id: user_id, timestamp: timestamp, unique_id: unique_id} = payload
       ) do
    payload = Map.drop(payload, [:__struct__, :user_id, :timestamp, :unique_id])
 
    %{
      event_type: event_name(event_type),
      unique_id: unique_id,
      timestamp: prep_timestamp(timestamp),
      user_uuid: user_id,
      payload: payload
    }
  end
 
  defp prep_timestamp(%DateTime{} = dt), do: DateTime.truncate(dt, :second)
  defp prep_timestamp(other), do: other
 
  defp event_name(module) when is_atom(module) do
    module
    |> Module.split()
    |> List.last()
  end
end

Small note: we derive the event names from the module name, and we truncate the timestamps because Ecto’s :utc_datetime type does not accept microseconds. Optimization: we would most likely want to use streams here (e.g. Repo.stream/2) so large user streams don’t have to be loaded into memory at once.
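Putting the pieces together, appending and reading back a user stream could look like this. This is a sketch: `event`, `started_event`, and `completed_event` stand for any of the `Events.*` structs from above.

```elixir
# Append a single event; EventStore.append/1 delegates to Repo.insert/1 here.
{:ok, _persisted} = EventStore.append(event)

# Append a batch; Repo.insert_all/2 returns {inserted_count, nil} without :returning.
{2, nil} = EventStore.append([started_event, completed_event])

# The user_stream: all events for one user, ordered by timestamp ascending.
events = EventStore.list(user_id)
```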

Projection

We create a behaviour that all projections need to implement.

defmodule Projection do
  @type event :: any()
  @type projection :: any()
 
  @callback project([event()]) :: projection()
end

In project/1 we currently take a list of events, which we would ideally replace with a stream.

Dummy implementation

We create a dummy projection for the prototype that simply counts the different event types a given user has. This projection/state is currently not saved in the database and is created on the fly.

defmodule Projection.Stats do
  @moduledoc """
  Projects all events into a single set of statistics calculated per user.
  """
  alias Persistence.Event
  @behaviour Projection
 
  defstruct completed_count: 0, started_count: 0, cancelled_count: 0, total: 0

  @impl true
  def project(events) when is_list(events) do
    projection =
      Enum.reduce(events, %__MODULE__{}, fn
        %Event{event_type: "ContentPieceCompleted"},
        %__MODULE__{completed_count: count} = projection ->
          %__MODULE__{projection | completed_count: count + 1}

        %Event{event_type: "ContentPieceStarted"}, %__MODULE__{started_count: count} = projection ->
          %__MODULE__{projection | started_count: count + 1}

        %Event{event_type: "ContentPieceCancelled"},
        %__MODULE__{cancelled_count: count} = projection ->
          %__MODULE__{projection | cancelled_count: count + 1}

        # Ignore any event types we do not know how to count.
        _event, projection ->
          projection
      end)

    %__MODULE__{
      projection
      | total: projection.completed_count + projection.started_count + projection.cancelled_count
    }
  end
end
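Feeding the projection a few hand-built persisted events illustrates the counting. Only `event_type` matters to this projection, so the other fields are left at their defaults in this sketch.

```elixir
events = [
  %Persistence.Event{event_type: "ContentPieceStarted"},
  %Persistence.Event{event_type: "ContentPieceStarted"},
  %Persistence.Event{event_type: "ContentPieceCompleted"}
]

# Two starts, one completion, no cancellations.
%Projection.Stats{started_count: 2, completed_count: 1, cancelled_count: 0, total: 3} =
  Projection.Stats.project(events)
```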

Generating data

Using StreamData we generated some plausible-looking data for one specific user. These events still need a bit of love, as the timestamp is always DateTime.utc_now/0 instead of a more plausible distribution over time.

Dummy data

We would also want to update this by interspersing other users’ data, so it’s not one contiguous chunk.

defmodule Generator do
  use ExUnitProperties
  alias Events
 
  defp gen_uuid, do: Ecto.UUID.generate()
  defp now_date, do: DateTime.utc_now()
 
  @all_countries [
    {"Europe/Berlin", "de"},
    {"Europe/Lisbon", "pt"},
    {"Europe/London", "en"},
    {"Europe/Paris", "fr"},
    {"Europe/Madrid", "es"},
    {"Europe/Rome", "it"}
  ]
 
  def single_uri_generator() do
    gen all single_id <- string(:alphanumeric, length: 5),
            record_id <- string(:alphanumeric, length: 5) do
      "sevenmind://content/single/#{single_id}/recording/#{record_id}"
    end
  end
 
  def course_item_uri_generator() do
    gen all course_id <- string(:alphanumeric, length: 5),
            item_id <- string(:alphanumeric, length: 5),
            record_id <- string(:alphanumeric, length: 5) do
      "sevenmind://content/course/#{course_id}/item/#{item_id}/recording/#{record_id}"
    end
  end
 
  def object_uri_generator(), do: one_of([single_uri_generator(), course_item_uri_generator()])
 
  # TODO make a generator that uses the normal generator and throws some weird/out of order events in there.
  def session_events_generator(user_id) do
    gen all session_id <- repeatedly(&gen_uuid/0),
            object_uri <- object_uri_generator(),
            first_event_type <-
              frequency([
                {85, constant(Events.ContentPieceStarted)},
                {15, constant(nil)}
              ]),
            last_event_type <-
              frequency([
                {55, constant(Events.ContentPieceCancelled)},
                {40, constant(Events.ContentPieceCompleted)},
                {5, constant(nil)}
              ]),
            first <- piece_event_generator(first_event_type, user_id, {session_id, object_uri}),
            last <- piece_event_generator(last_event_type, user_id, {session_id, object_uri}) do
      # Drop the nil placeholders so a session can be missing its first or last event.
      [first, last] |> Enum.reject(&is_nil/1) |> Enum.shuffle()
    end
  end
 
  # TODO we'll generate events for a _specific_ user
  # the users are split into "main characters" and "background characters"
  # the idea is to verify that we can quickly extract an event stream for a single user.
  def piece_event_generator(nil, _user_id, _params), do: constant(nil)
 
  def piece_event_generator(type, user_id, {session_id, object_uri}) do
    gen all unique_id <- repeatedly(&gen_uuid/0),
            timestamp <- repeatedly(&now_date/0),
            {timezone, locale} <- repeatedly(fn -> Enum.random(@all_countries) end) do
      struct(type,
        user_id: user_id,
        session_id: session_id,
        unique_id: unique_id,
        timestamp: timestamp,
        timezone: timezone,
        locale: locale,
        object_uri: object_uri
      )
    end
  end
end
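Since StreamData generators implement Enumerable, sampling a few sessions for a user and persisting them is straightforward. A sketch, assuming the modules above:

```elixir
user_id = Ecto.UUID.generate()

events =
  user_id
  |> Generator.session_events_generator()
  # Each generated element is one session's list of events.
  |> Enum.take(10)
  |> List.flatten()
  # Drop the placeholders for sessions missing their first or last event.
  |> Enum.reject(&is_nil/1)

EventStore.append(events)
```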
 

Using two mix tasks, we can create around 12_000 events at a time and use our dummy projector to read this data.

> mix events.project fd138856-8d18-4ad1-a642-729454aeb633

13:52:25.107 [debug] QUERY OK source="events" db=927.4ms decode=48.6ms queue=41.2ms idle=0.0ms
SELECT e0."id", e0."event_type", e0."unique_id", e0."timestamp", e0."user_uuid", e0."payload" FROM "events" AS e0 WHERE (e0."user_uuid" = $1) ORDER BY e0."timestamp" ["fd138856-8d18-4ad1-a642-729454aeb633"]
%Projection.Stats{
  completed_count: 22200,
  started_count: 47099,
  cancelled_count: 30701,
  total: 100000
}
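The two mix tasks themselves are not shown in this note; a sketch of the projection task could look like the following. The task name events.project matches the invocation above; everything else is an assumption.

```elixir
defmodule Mix.Tasks.Events.Project do
  use Mix.Task

  @shortdoc "Projects all events of a single user into Projection.Stats"

  @impl Mix.Task
  def run([user_uuid]) do
    # Start the application (and with it the Repo) before querying.
    Mix.Task.run("app.start")

    user_uuid
    |> EventStore.list()
    |> Projection.Stats.project()
    |> IO.inspect()
  end
end
```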

TODO

This research note is still under construction. Please have mercy on our soul.

  • Fuck up and scramble more data
  • Change event structure after already inserted
  • Create a more hands-on projection (history?)
  • How to handle the metadata better

References

  • Commanded
  • EventStore
  • StreamData