Adding Event Sourcing
to an existing PHP project

(for the right reasons)

Alessandro Lai / @AlessandroLai

PHPDay 2018 - May 10th-11th 2018, Verona

https://joind.in/talk/87e9e

Who am I?

What is this talk about?

  • It's a case history
  • Only a brief theoretical explanation
  • We bent the state of the art to our use case
  • We'll see practical implementation details!




Disclaimer 0:

It's a group effort!

The Facile.it Shark team

Disclaimer 1:

We got help!

We reached out to @gabrielelana
for a speedy bootstrap with
MongoDB + Event Sourcing

First things first:

What's Event Sourcing?

The prime example:

Money transactions, without events...

  • You save only the current state (the balance)
  • Each operation completely rewrites the information
  • What about concurrency???
  • We are addicted to software transactions,
    but they are inherently fallible

With events, instead...

  • We store performed actions
    (withdrawal, deposit)
  • Concurrency is no longer a big issue
    (at worst, a negative balance shows up and can be handled afterwards)
  • Aggregating events gives me the current state
    (current balance)
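A minimal sketch of the idea that aggregating events gives the current state. It assumes PHP 8.1+ and a hypothetical MoneyEvent class with amounts in cents; it is an illustration, not code from the project:

```php
<?php
// Hypothetical event shape for the bank-account example (PHP 8.1+):
// each instance records an action that already happened.
final class MoneyEvent
{
    public function __construct(
        public readonly string $type, // 'deposit' or 'withdrawal'
        public readonly int $amount   // in cents, always positive
    ) {}
}

/**
 * Aggregates the stored events, in recording order, into the current balance.
 *
 * @param MoneyEvent[] $events
 */
function currentBalance(array $events): int
{
    $balance = 0;
    foreach ($events as $event) {
        $balance += match ($event->type) {
            'deposit' => $event->amount,
            'withdrawal' => -$event->amount,
            default => throw new InvalidArgumentException("Unknown event type: {$event->type}"),
        };
    }
    return $balance;
}

$events = [
    new MoneyEvent('deposit', 10000),
    new MoneyEvent('withdrawal', 3000),
    // the two withdrawals may have happened concurrently: both are kept
    new MoneyEvent('withdrawal', 8000),
];

echo currentBalance($events), PHP_EOL; // -1000
```

Nothing here prevents the balance from going negative: the overdraft is recorded rather than lost, and can be dealt with afterwards.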

And CQRS?

It's usually contrasted with CRUD
(see Martin Fowler's article on CQRS)

It's a design pattern; the acronym stands for:

  • Command (write)
  • Query (read)
  • Responsibility
  • Segregation
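To make the segregation concrete, here is a deliberately tiny sketch (PHP 8.0+, hypothetical class names, in-memory storage instead of a database): the command side only validates and appends events, while the query side only reads them.

```php
<?php
// Hypothetical in-memory event store shared by both sides.
final class EventStore
{
    /** @var list<array{type: string, amount: int}> */
    private array $events = [];

    public function append(array $event): void
    {
        $this->events[] = $event;
    }

    /** @return list<array{type: string, amount: int}> */
    public function all(): array
    {
        return $this->events;
    }
}

// Command side (write): validates the intent and records what happened.
final class DepositHandler
{
    public function __construct(private EventStore $store) {}

    public function handle(int $amount): void
    {
        if ($amount <= 0) {
            throw new InvalidArgumentException('Deposit must be positive');
        }
        $this->store->append(['type' => 'deposit', 'amount' => $amount]);
    }
}

// Query side (read): never modifies anything, only answers questions.
final class BalanceQuery
{
    public function __construct(private EventStore $store) {}

    public function balance(): int
    {
        $sum = 0;
        foreach ($this->store->all() as $event) {
            $sum += $event['type'] === 'deposit' ? $event['amount'] : -$event['amount'];
        }
        return $sum;
    }
}

$store = new EventStore();
(new DepositHandler($store))->handle(500);
(new DepositHandler($store))->handle(250);
echo (new BalanceQuery($store))->balance(), PHP_EOL; // 750
```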

Story time!

In the real world...

  • Projects start small and simple
  • Adding event sourcing from the beginning is costly
  • Especially with PHP, a fast CRUD approach
    is used 99% of the time
  • Inherent business complexity only emerges over time, as new features are added

Project Background

Our use case

  • The Shark team works on the
    Facile Partner Network (FPN) project
  • It's a B2B platform for car insurance brokering
  • We offer the platform to insurance agents
    that already have their own physical business

Complex domain

  • We handle the product after it's sold
  • We handle customer relationships
  • We have to be an adapter for a lot of processes
  • Last but not least, we have a team of 40+ account managers
  • The project is 6+ years old

Our issues

  • Business rules evolve really fast!
  • New reports are requested often
  • We still have to provide historical data
  • We also had a small piece of functionality
    that was having growing pains inside our RDBMS
    (MySQL + Doctrine)

Our solutions

  1. Gabriele
  2. MongoDB
  3. Event sourcing

Events in practice

Anatomy of an event

An event is just a document with some properties
 
  • Type
  • Family
  • Payload
  • Metadata
{
    "_id" : ObjectId("5898ab5a22c92d69123f7281"),
    "type" : "agendaCreated",
    "eventFamily" : "agenda",
    "meta" : {
        "createdAt" : ISODate("2017-02-06T17:59:05"),
        "receivedAt" : ISODate("2017-02-06T17:59:06"),
        "currentUserId" : 29877,
        "taskId" : 3503425,
        "productId" : 5937725,
        ... 
    },
    "payload" : {
        "currentUser" : {
            "id" : 29877,
            ...
        },
        "task" : {
            "id" : 3503425,
            ...
        },
        "product" : {
            "id" : 5937725,
            ...
        }
        ...
    }
}
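As an illustration only, the document above could map to an immutable PHP value object like the following. It assumes PHP 8.1+; the StoredEvent name and its methods are hypothetical, not the project's Event class:

```php
<?php
// Hypothetical immutable value object mirroring the event document.
// The _id is left to MongoDB.
final class StoredEvent
{
    public function __construct(
        public readonly string $type,        // e.g. "agendaCreated"
        public readonly string $eventFamily, // e.g. "agenda"
        public readonly array $meta,         // searchable fields (createdAt, ids, ...)
        public readonly array $payload       // all the data the event is about
    ) {}

    public function getCreatedAt(): DateTimeImmutable
    {
        return $this->meta['createdAt'];
    }

    // Plain array view of the event; converting DateTimeImmutable values
    // to BSON dates is left to the persistence layer.
    public function toDocument(): array
    {
        return [
            'type' => $this->type,
            'eventFamily' => $this->eventFamily,
            'meta' => $this->meta,
            'payload' => $this->payload,
        ];
    }
}

$event = new StoredEvent(
    'agendaCreated',
    'agenda',
    ['createdAt' => new DateTimeImmutable('2017-02-06T17:59:05'), 'taskId' => 3503425],
    ['task' => ['id' => 3503425]]
);
echo $event->getCreatedAt()->format('Y-m-d'), PHP_EOL; // 2017-02-06
```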
						

Type & Family

  • The type is the event name
    It represents the kind of document that we will have
  • Family is a more generic grouping
    (depends on business domain)
  • Helpful for searching/filtering:
    you normally work with a specific list of events

Metadata

  • We use it for search purposes
  • Indexes are a must, at least on createdAt
  • We can also use it to store system info
    (user, origin, application...)

The payload

  • Should contain all the data related to the event
  • Remember, events always describe something that already happened (past tense)
  • How many things should I put in there?
    When in doubt, do not omit

Storing events

  • We already used the (Symfony) Event Dispatcher
    a lot to drive our business logic
  • We easily identified the points where we needed to store events
  • Base event + decorators
  • We didn't have enough confidence in MongoDB...
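The slide only says "base event + decorators". One plausible reading, sketched here as an assumption with hypothetical names (PHP 8.0+), is a base factory that builds the bare event, wrapped by decorators that each add one piece of metadata:

```php
<?php
// Hypothetical sketch: a base factory builds the bare event and each
// decorator wraps another factory, adding one piece of metadata.
interface EventFactory
{
    public function create(string $type, string $family, array $payload): array;
}

final class BaseEventFactory implements EventFactory
{
    public function create(string $type, string $family, array $payload): array
    {
        return ['type' => $type, 'eventFamily' => $family, 'meta' => [], 'payload' => $payload];
    }
}

// Decorator: delegates to the wrapped factory, then stamps the creation date.
final class WithCreatedAt implements EventFactory
{
    public function __construct(private EventFactory $inner, private DateTimeImmutable $now) {}

    public function create(string $type, string $family, array $payload): array
    {
        $event = $this->inner->create($type, $family, $payload);
        $event['meta']['createdAt'] = $this->now;
        return $event;
    }
}

// Decorator: delegates to the wrapped factory, then records who triggered the event.
final class WithCurrentUser implements EventFactory
{
    public function __construct(private EventFactory $inner, private int $userId) {}

    public function create(string $type, string $family, array $payload): array
    {
        $event = $this->inner->create($type, $family, $payload);
        $event['meta']['currentUserId'] = $this->userId;
        return $event;
    }
}

$factory = new WithCurrentUser(
    new WithCreatedAt(new BaseEventFactory(), new DateTimeImmutable('2017-02-06T17:59:05')),
    29877
);
$event = $factory->create('agendaCreated', 'agenda', ['task' => ['id' => 3503425]]);
echo $event['meta']['currentUserId'], PHP_EOL; // 29877
```

With this shape, a new kind of metadata (origin, application...) means one more small decorator rather than changes at every dispatching point.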

Real-time (delayed) storing

  • ... so we used our (RabbitMQ) queue to delay storing events, avoiding failures in critical processes
  • In theory, events should be append only,
    immutable and ordered in time...
  • We made this exception as an intentional trade-off
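A minimal sketch of the delayed storing, with SplQueue standing in for RabbitMQ and a plain array standing in for MongoDB. Both substitutions, and all class names, are hypothetical:

```php
<?php
// The critical process only enqueues the serialized event (cheap and reliable);
// a separate consumer writes it to the store later.
final class EventPublisher
{
    public function __construct(private SplQueue $queue) {}

    public function publish(array $event): void
    {
        $this->queue->enqueue(json_encode($event, JSON_THROW_ON_ERROR));
    }
}

final class EventStoringConsumer
{
    /** @var list<array> events "written to MongoDB" */
    public array $stored = [];

    public function __construct(private SplQueue $queue) {}

    // Drains the queue; in production this would be a long-lived worker.
    public function consumeAll(): int
    {
        $count = 0;
        while (!$this->queue->isEmpty()) {
            $this->stored[] = json_decode($this->queue->dequeue(), true, 512, JSON_THROW_ON_ERROR);
            $count++;
        }
        return $count;
    }
}

$queue = new SplQueue();
$publisher = new EventPublisher($queue);
$publisher->publish(['type' => 'agendaCreated', 'meta' => ['createdAt' => '2017-02-06T17:59:05']]);
// ... the critical process goes on without waiting for the event store ...

$consumer = new EventStoringConsumer($queue);
echo $consumer->consumeAll(), PHP_EOL; // 1
```

Because storage happens later, the moment the action occurred and the moment the event was stored can differ, which is presumably why the example document carries both meta.createdAt and meta.receivedAt.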

Generating events for the past

  • In theory, you shouldn't do it...
  • The trade-off allowed us to easily generate events for the past
  • We had to avoid writing duplicates
  • Deriving events from the current (lossy) state
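One way to avoid duplicates when backfilling, sketched under the assumption that each past event can be given a deterministic key (event type plus entity id); in MongoDB the same idea would map to a unique index on that key. The function names and the backfilled meta flag are hypothetical:

```php
<?php
// Deterministic key: the same entity always yields the same key,
// so re-running the backfill cannot insert the event twice.
function eventKey(string $type, int $entityId): string
{
    return $type . ':' . $entityId;
}

/**
 * @param list<array{id: int, createdAt: string}> $currentRows rows from the RDBMS
 * @param array<string, array> $store existing events indexed by key (modified in place)
 * @return int number of events actually written
 */
function backfill(array $currentRows, array &$store): int
{
    $written = 0;
    foreach ($currentRows as $row) {
        $key = eventKey('agendaCreated', $row['id']);
        if (isset($store[$key])) {
            continue; // already stored by a previous backfill or by the live flow
        }
        $store[$key] = [
            'type' => 'agendaCreated',
            'meta' => ['createdAt' => $row['createdAt'], 'backfilled' => true],
            // Lossy: only what the current state still knows can go in the payload.
            'payload' => ['task' => ['id' => $row['id']]],
        ];
        $written++;
    }
    return $written;
}

$store = [];
$rows = [['id' => 1, 'createdAt' => '2016-01-10'], ['id' => 2, 'createdAt' => '2016-03-02']];
echo backfill($rows, $store), PHP_EOL; // 2
echo backfill($rows, $store), PHP_EOL; // 0
```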

It's time to read!

Projections

  • Aggregated result of multiple events
  • They are produced by replaying events one at a time
    and using them to reconstruct the desired result
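In code, a projection is essentially a left fold over the event stream. A generic sketch (PHP 8.0+; project() and the agendaCompleted and productSold event types are hypothetical):

```php
<?php
// A projection as a left fold: the reducer receives the partial result
// and one event, and returns the new partial result.
function project(iterable $events, callable $reducer, mixed $initial): mixed
{
    $result = $initial;
    foreach ($events as $event) { // one event at a time, in stored order
        $result = $reducer($result, $event);
    }
    return $result;
}

$events = [
    ['type' => 'agendaCreated', 'eventFamily' => 'agenda'],
    ['type' => 'agendaCompleted', 'eventFamily' => 'agenda'],
    ['type' => 'productSold', 'eventFamily' => 'product'],
];

// Example projection: number of events per family.
$perFamily = project($events, function (array $counts, array $event): array {
    $counts[$event['eventFamily']] = ($counts[$event['eventFamily']] ?? 0) + 1;
    return $counts;
}, []);

echo json_encode($perFamily), PHP_EOL; // {"agenda":2,"product":1}
```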

Our implementation

  1. Runner + RunState
  2. Projector + Executor
  3. ProjectionState

The (one and only) Runner


class Runner
{
    public function run(ProjectorInterface $projector)
    {
        // pre-execution checks, then the loop over the events
    }
}

  • Handles the execution cycle
  • Applies the event filters provided by the Projector
  • Iterates through the events, one at a time

The RunState document

Used by the runner to persist execution state


class RunState implements MongoDocumentInterface
{
    /** @var string The Projector FQCN */
    private $class; 

    /** @var bool */
    private $stillRunning;
    /** @var \DateTimeInterface */
    private $lastRunAt;

    /** @var Mongo\ObjectId */
    private $lastMaxId;
    /** @var \DateTimeInterface */
    private $lastProjectedEventCreatedAt;

    // ...
}
						

The Projectors


interface ProjectorInterface
{
    // projection execution
    public function initializeProjector();

    public function projectAndSave(Event $event): Result;

    public function normalizeAfterRun(\DateTime $from, \DateTime $to);

    // event filters
    public function getEventTypes(): array;

    public function getAdditionalEventFilters(): array;
}
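A sketch of how a runner could combine getEventTypes(), getAdditionalEventFilters() and the RunState's lastMaxId into a single MongoDB query filter. The array shape is standard MongoDB query syntax, but the function itself is hypothetical, and $lastMaxId is a plain string so the sketch runs without ext-mongodb:

```php
<?php
// Builds one MongoDB filter document out of the projector's filters and
// the RunState resume point. A real query would wrap the id as
// new MongoDB\BSON\ObjectId($lastMaxId).
function buildEventFilter(array $eventTypes, array $additionalFilters, ?string $lastMaxId): array
{
    // Only the event types this projector cares about (getEventTypes()).
    $conditions = [['type' => ['$in' => array_values($eventTypes)]]];

    // Resume after the last projected event instead of starting over.
    if ($lastMaxId !== null) {
        $conditions[] = ['_id' => ['$gt' => $lastMaxId]];
    }

    // Projector-specific conditions (getAdditionalEventFilters()).
    if ($additionalFilters !== []) {
        $conditions[] = $additionalFilters;
    }

    return ['$and' => $conditions];
}

// Options for the find() call: replay in (roughly) storage order.
$options = ['sort' => ['_id' => 1]];

$filter = buildEventFilter(['agendaCreated'], ['meta.productId' => 5937725], '5898ab5a22c92d69123f7281');
echo json_encode($filter), PHP_EOL;
```

Since _id is assigned when an event is stored, and storing is delayed through the queue, _id order follows storage time rather than createdAt; presumably this is why the runner checks for events older than the last projected one before starting.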
						
Runner::run()
Pre-execution checks

$runState = $this->loadRunState($projector);

// The previous run never marked itself as finished: start over
if ($runState->isStillRunning()) {
    return $this->resetProjection($projector, $runState);
}

$iterator = $this->loadEventsIterator($projector, $runState);

// No new events since the last run: nothing to project
if (! $iterator->valid()) {
    return $this->updateRunStateOnAccomplished($runState, null);
}

$firstEvent = $iterator->current();

// An event older than the last projected one has appeared
// (e.g. a delayed or backfilled event): the projection must be rebuilt
if ($this->eventIsOlderThanLastProjected($runState, $firstEvent)) {
    return $this->resetProjection($projector, $runState);
}

$this->updateRunStateOnStarted($runState);
// ...

Runner::run()
The loop

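// $start: the run's start time, recorded in the elided part above
$stopped = false;

do {
    $lastEvent = $iterator->current();

    $projector->projectAndSave($lastEvent);

    // Time/memory budget exhausted: the next run resumes after $lastEvent
    if ($this->shouldStop($start, $projector)) {
        $stopped = true;
        break;
    }

    // Iterator::next() returns void, so the loop condition uses valid()
    $iterator->next();
} while ($iterator->valid());

$projector->normalizeAfterRun(
    $firstEvent->getCreatedAt(),
    $lastEvent->getCreatedAt()
);

if ($stopped) {
    $this->updateRunStateOnStopped($runState, $lastEvent);
} else {
    $this->updateRunStateOnAccomplished($runState, $lastEvent);
}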

The executors

The Projector delegates the calculations
to a group of Executor classes


interface ExecutorInterface
{
    public function supportEventsType(): array;

    public function execute(Event $event): Result;
}
                        

class SomeProjector implements ProjectorInterface
{
    // getExecutor() and the other ProjectorInterface methods are omitted

    public function projectAndSave(Event $event): Result
    {
        return $this->getExecutor($event)->execute($event);
    }
}
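One possible getExecutor(), sketched with hypothetical names and with events simplified to arrays: index the executors by the types they declare in supportEventsType(), and fail loudly on unknown or doubly claimed types. Failing loudly is a design choice of this sketch, not necessarily the project's:

```php
<?php
// Hypothetical sketch (PHP 8.0+): execute() returns a string here
// instead of the project's Result object.
interface SimpleExecutor
{
    /** @return string[] the event types this executor handles */
    public function supportEventsType(): array;

    public function execute(array $event): string;
}

final class ExecutorRegistry
{
    /** @var array<string, SimpleExecutor> */
    private array $byType = [];

    /** @param SimpleExecutor[] $executors */
    public function __construct(array $executors)
    {
        foreach ($executors as $executor) {
            foreach ($executor->supportEventsType() as $type) {
                if (isset($this->byType[$type])) {
                    throw new LogicException("Two executors claim event type '$type'");
                }
                $this->byType[$type] = $executor;
            }
        }
    }

    public function getExecutor(array $event): SimpleExecutor
    {
        return $this->byType[$event['type']]
            ?? throw new OutOfBoundsException("No executor for event type '{$event['type']}'");
    }
}

$agendaExecutor = new class implements SimpleExecutor {
    public function supportEventsType(): array
    {
        return ['agendaCreated', 'agendaCompleted'];
    }

    public function execute(array $event): string
    {
        return 'handled ' . $event['type'];
    }
};

$registry = new ExecutorRegistry([$agendaExecutor]);
$event = ['type' => 'agendaCreated'];
echo $registry->getExecutor($event)->execute($event), PHP_EOL; // handled agendaCreated
```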
                        

Intermediate state and correlation

  • Executors must be idempotent:
    i.e. if a bug produces a duplicated event, we must handle it gracefully
  • We use a ProjectionState document
    to store additional information

class ProjectionState implements MongoDocumentInterface
{
    /** @var string The Projector FQCN */
    private $class;

    /** @var string */
    private $correlationId;

    /** @var mixed */
    private $data;

    /** @var \DateTime|null */
    private $expireAt;
}
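A sketch of an idempotent executor (hypothetical names, PHP 8.0+, an in-memory store standing in for the projection_state collection): the intermediate state for a correlation id records which event ids were already applied, so a duplicated event changes nothing. The executor itself stays stateless, since everything lives in the store:

```php
<?php
// In-memory stand-in for the projection_state collection, keyed by correlation id.
final class InMemoryProjectionStates
{
    /** @var array<string, array> */
    private array $states = [];

    public function load(string $correlationId): array
    {
        return $this->states[$correlationId] ?? ['seenEventIds' => [], 'completions' => 0];
    }

    public function save(string $correlationId, array $data): void
    {
        $this->states[$correlationId] = $data;
    }
}

// Counts completions per task; holds only a dependency, no data.
final class TaskCompletionExecutor
{
    public function __construct(private InMemoryProjectionStates $states) {}

    public function execute(array $event): int
    {
        $correlationId = 'task-' . $event['meta']['taskId'];
        $data = $this->states->load($correlationId);

        if (isset($data['seenEventIds'][$event['_id']])) {
            return $data['completions']; // duplicate: no side effects
        }
        $data['seenEventIds'][$event['_id']] = true;
        $data['completions']++;
        $this->states->save($correlationId, $data);

        return $data['completions'];
    }
}

$executor = new TaskCompletionExecutor(new InMemoryProjectionStates());
$event = ['_id' => 'e1', 'type' => 'taskCompleted', 'meta' => ['taskId' => 3503425]];
$executor->execute($event);
echo $executor->execute($event), PHP_EOL; // 1
```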
                        

Reducing the amount of data

  • The Executor should delete the ProjectionState as soon as it's no longer needed
  • ProjectionState::$expireAt makes data disappear automatically,
    thanks to a MongoDB TTL index (documents with a null expireAt are kept):

    db.projection_state.createIndex(
        { "expireAt": 1 },
        { expireAfterSeconds: 0 }
    );

The real trick: pushing state away

  • Every class is stateless:
    no properties contain data, only dependencies
  • When we have state
    (RunState, ProjectionState, the projection itself) we save it as documents on MongoDB

Practical issues

Duration of a projection

  • Since the events collection grows really fast,
    projections can take a very long time to complete from the beginning
  • Long-running processes are not PHP's strong suit:
    we found multiple memory leaks (gc_collect_cycles, ext-mongodb)
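A sketch of a stop condition that keeps each run short, so leaks never get a chance to accumulate: stop after a time budget or past a memory threshold, and let the next run resume. StopPolicy and its limits are hypothetical, and its signature differs from the slide's shouldStop($start, $projector):

```php
<?php
// Hypothetical stop policy (PHP 8.0+); the limits are made-up examples.
final class StopPolicy
{
    public function __construct(
        private float $maxSeconds = 50.0,
        private int $maxMemoryBytes = 256 * 1024 * 1024
    ) {}

    // $memoryUsage and $now are injectable for testing; by default the
    // real values are read from the running process.
    public function shouldStop(float $startedAt, ?int $memoryUsage = null, ?float $now = null): bool
    {
        $now ??= microtime(true);
        $memoryUsage ??= memory_get_usage(true);

        return ($now - $startedAt) >= $this->maxSeconds
            || $memoryUsage >= $this->maxMemoryBytes;
    }
}

$policy = new StopPolicy(maxSeconds: 50.0, maxMemoryBytes: 128 * 1024 * 1024);
$start = microtime(true);
var_dump($policy->shouldStop($start, 10 * 1024 * 1024, $start + 5));  // bool(false)
var_dump($policy->shouldStop($start, 10 * 1024 * 1024, $start + 60)); // bool(true)
```

Calling gc_collect_cycles() every few thousand events could be another knob in the same spirit.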

Runner execution

  • We execute the runner at short intervals
  • We check whether another instance of the projector is already running,
    using Symfony's LockableTrait
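The project uses Symfony's LockableTrait. To show the same idea without dependencies, this sketch swaps in PHP's built-in flock(): a non-blocking exclusive lock on a per-projector file, so an overlapping scheduled run exits instead of projecting the same events twice. The function names and the lock file path are hypothetical:

```php
<?php
// Returns an open file handle holding the lock, or null if another
// instance already holds it.
function acquireProjectorLock(string $projectorClass)
{
    $path = sys_get_temp_dir() . '/projector-' . md5($projectorClass) . '.lock';
    $handle = fopen($path, 'c'); // create if missing, never truncate
    if ($handle === false) {
        throw new RuntimeException("Cannot open lock file $path");
    }
    if (!flock($handle, LOCK_EX | LOCK_NB)) {
        fclose($handle);
        return null; // another instance is running
    }
    return $handle; // keep it open for the whole run
}

function releaseProjectorLock($handle): void
{
    flock($handle, LOCK_UN);
    fclose($handle);
}

$lock = acquireProjectorLock('App\Projector\SomeProjector');
if ($lock === null) {
    echo 'Another instance is running, skipping', PHP_EOL;
} else {
    try {
        // $runner->run($projector);
    } finally {
        releaseProjectorLock($lock);
    }
}
```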

Updating past events

  • Especially early in the development phase,
    we forgot to add some payload data...
  • Since we had made that trade-off from the start,
    we were able to fix it
  • Solution: fix code + fix events + reset projection
  • To avoid report outages,
    we started to take snapshots of projected data

Projections and code reuse

Due to CQRS and our objects' architecture,
reusing code between projections is impractical

What we gained

We reached our goal!!!

  • We obtained an easy way to produce
    new complex reports with historical reliability
  • We added MongoDB to our tech stack

Projection reuse

A projector can produce an intermediate result:
a simple denormalization that can be easily queried

Events → Projector → Projection (intermediate)

Projection (intermediate) → aggregation → Final result (snapshot)
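A sketch of the second step, with hypothetical field names and data (PHP 7.4+): the intermediate projection holds one flat row per product, easy to query, and an aggregation turns it into the final report, stored as a snapshot:

```php
<?php
// Intermediate projection: one denormalized row per product (made-up data).
$intermediate = [
    ['productId' => 5937725, 'agent' => 'A', 'month' => '2018-04', 'premium' => 420],
    ['productId' => 5937726, 'agent' => 'A', 'month' => '2018-04', 'premium' => 380],
    ['productId' => 5937727, 'agent' => 'B', 'month' => '2018-04', 'premium' => 510],
];

// Aggregation: group by agent and month, counting products and summing premiums.
function aggregateByAgentAndMonth(array $rows): array
{
    $report = [];
    foreach ($rows as $row) {
        $key = $row['agent'] . '|' . $row['month'];
        $report[$key] ??= ['agent' => $row['agent'], 'month' => $row['month'], 'products' => 0, 'premium' => 0];
        $report[$key]['products']++;
        $report[$key]['premium'] += $row['premium'];
    }
    return array_values($report);
}

// Final result, saved as a snapshot so reports survive a projection reset.
$snapshot = [
    'generatedAt' => (new DateTimeImmutable())->format(DATE_ATOM),
    'rows' => aggregateByAgentAndMonth($intermediate),
];
echo json_encode($snapshot['rows']), PHP_EOL;
```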

Complexity split

Small contexts, easily testable

Runner
  → Projector1 → Executor1a, Executor1b, ...
  → Projector2 → Executor2a, Executor2b, ...
  → Projector3 → Executor3a, Executor3b, ...

Debug superpowers

Since we have events, we know EVERYTHING

Investigating a strange bug report
is now easier than ever

Our Symfony bundle

Thanks!

Please rate my talk on Joind.in: https://joind.in/talk/87e9e


Contacts

Attributions

Additional references

Questions?