Alessandro Lai / @AlessandroLai
SymfonyCon 2019 - November 21st-22nd 2019, Amsterdam
What's Event Sourcing?
It's normally contrasted with CRUD; see Martin Fowler's article
It's a design pattern, it stands for:
{
    "_id" : ObjectId("5898ab5a22c92d69123f7281"),
    "type" : "agendaCreated",
    "eventFamily" : "agenda",
    "meta" : {
        "createdAt" : ISODate("2017-02-06T17:59:05"),
        "receivedAt" : ISODate("2017-02-06T17:59:06"),
        "currentUserId" : 29877,
        "taskId" : 3503425,
        "productId" : 5937725,
        ...
    },
    "payload" : {
        "currentUser" : {
            "id" : 29877,
            ...
        },
        "task" : {
            "id" : 3503425,
            ...
        },
        "product" : {
            "id" : 5937725,
            ...
        }
        ...
    }
}
createdAt
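As a rough illustration of the idea behind event sourcing, the sketch below rebuilds the current state by replaying immutable events in `createdAt` order instead of updating a record in place. The `AgendaEvent` class, the `replay()` function and the `agendaDeleted` event type are assumptions made up for this example; only `agendaCreated`, the task id and `createdAt` come from the document above.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory event; the real documents carry far more metadata.
final class AgendaEvent
{
    private string $type;
    private int $taskId;
    private \DateTimeImmutable $createdAt;

    public function __construct(string $type, int $taskId, string $createdAt)
    {
        $this->type = $type;
        $this->taskId = $taskId;
        $this->createdAt = new \DateTimeImmutable($createdAt);
    }

    public function getType(): string { return $this->type; }
    public function getTaskId(): int { return $this->taskId; }
    public function getCreatedAt(): \DateTimeImmutable { return $this->createdAt; }
}

/**
 * Rebuilds the current state by replaying events in createdAt order.
 * "agendaDeleted" is an invented event type, used only for this sketch.
 *
 * @param AgendaEvent[] $events
 * @return array<int, string> agenda status keyed by task id
 */
function replay(array $events): array
{
    // Events are facts: we only sort and read them, we never update them.
    usort(
        $events,
        fn (AgendaEvent $a, AgendaEvent $b) => $a->getCreatedAt() <=> $b->getCreatedAt()
    );

    $state = [];
    foreach ($events as $event) {
        switch ($event->getType()) {
            case 'agendaCreated':
                $state[$event->getTaskId()] = 'open';
                break;
            case 'agendaDeleted':
                unset($state[$event->getTaskId()]);
                break;
        }
    }

    return $state;
}

// Events can arrive in any order; createdAt decides how they are replayed.
$events = [
    new AgendaEvent('agendaDeleted', 3503425, '2017-02-07T09:00:00'),
    new AgendaEvent('agendaCreated', 3503425, '2017-02-06T17:59:05'),
    new AgendaEvent('agendaCreated', 3503426, '2017-02-06T18:10:00'),
];
$state = replay($events);
```

Because the state is derived, it can be thrown away and recomputed at any time from the stored events.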
Classes | Documents |
---|---|
class Runner
{
    public function run(ProjectorInterface $projector)
    {
        // ...
    }
}
Used by the runner to persist execution state
class RunState implements MongoDocumentInterface
{
    /** @var string The Projector FQCN */
    private $class;
    /** @var bool */
    private $stillRunning;
    /** @var \DateTimeInterface */
    private $lastRunAt;
    /** @var \MongoDB\BSON\ObjectId */
    private $lastMaxId;
    /** @var \DateTimeInterface */
    private $lastProjectedEventCreatedAt;
}
interface ProjectorInterface
{
    // projection execution
    public function initializeProjector();
    public function projectAndSave(Event $event): Result;
    public function normalizeAfterRun(\DateTime $from, \DateTime $to);

    // event filters
    public function getEventTypes(): array;
    public function getAdditionalEventFilters(): array;
}
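The slides do not show how the runner turns `getEventTypes()`, `getAdditionalEventFilters()` and `RunState::$lastMaxId` into a query. One plausible reading, sketched here purely as an assumption, is a MongoDB filter that selects only the supported event types with an `_id` greater than the last projected one. `buildEventFilter()` is a hypothetical helper, and a plain string stands in for the ObjectId so the snippet runs without the MongoDB extension.

```php
<?php
declare(strict_types=1);

/**
 * Builds a MongoDB-style filter array for the events still to be projected.
 *
 * @param string|null $lastMaxId         id of the last projected event, null on the first run
 * @param string[]    $eventTypes        what getEventTypes() would return
 * @param array       $additionalFilters what getAdditionalEventFilters() would return
 */
function buildEventFilter(?string $lastMaxId, array $eventTypes, array $additionalFilters = []): array
{
    $filter = ['type' => ['$in' => array_values($eventTypes)]];

    if ($lastMaxId !== null) {
        // Resume right after the last event seen by the previous run.
        $filter['_id'] = ['$gt' => $lastMaxId];
    }

    // The + operator keeps the keys computed above if a projector redefines them.
    return $filter + $additionalFilters;
}

$filter = buildEventFilter(
    '5898ab5a22c92d69123f7281',
    ['agendaCreated'],
    ['eventFamily' => 'agenda']
);
```

In real code the `_id` bound would be a `MongoDB\BSON\ObjectId` and the array would be passed to a `find()` call sorted by `_id`.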
Runner::run()
$runState = $this->loadRunState($projector);
$iterator = $this->loadIterator($runState, $projector);
$lastEvent = null; // nothing projected yet

// Iterator::rewind() returns void: rewind first, read the event afterwards
$iterator->rewind();
if (! $iterator->valid()) {
    return $this->updateRunStateOnAccomplished($runState, $lastEvent);
}
$event = $iterator->current();

if ($this->isInvalidRun($runState, $event)) {
    $runState->setLastProjectedCreateAt($event->getCreatedAt());
    $runState->setStopped(true);
    $this->persistanceManager->save($runState);

    return $this->run($projector);
}

$this->updateRunStateOnStarted($runState);
// ...
Runner::run()
$firstEvent = $iterator->current();
do {
    $lastEvent = $iterator->current();
    $projector->projectAndSave($lastEvent);

    if ($this->shouldStop($start, $projector)) {
        $this->updateRunStateOnStopped($runState, $lastEvent);
        break;
    }

    // Iterator::next() returns void, so advance here and test valid() in the loop condition
    $iterator->next();
} while ($iterator->valid());

$projector->normalizeAfterRun(
    $firstEvent->getCreatedAt(),
    $lastEvent->getCreatedAt()
);
$this->updateRunStateOnAccomplished($runState, $maxId, $lastCreatedAt);
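The loop above can be tried in isolation with an in-memory iterator. The following is a minimal sketch, not the actual runner: `CountingProjector`, `runOnce()` and the `$maxEvents` limit are hypothetical stand-ins (the real `shouldStop()` criteria are not shown in the slides), and events are plain arrays.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for a projector: it only counts the events it receives.
final class CountingProjector
{
    public int $projected = 0;

    public function projectAndSave(array $event): void
    {
        $this->projected++;
    }
}

/**
 * Projects events until the iterator is exhausted or $maxEvents is reached.
 * Returns the last projected event, or null if there was nothing to project.
 */
function runOnce(\Iterator $iterator, CountingProjector $projector, int $maxEvents): ?array
{
    $iterator->rewind();              // returns void: read the event with current()
    if (! $iterator->valid()) {
        return null;                  // no new events since the last run
    }

    $lastEvent = null;
    do {
        $lastEvent = $iterator->current();
        $projector->projectAndSave($lastEvent);

        if ($projector->projected >= $maxEvents) {
            break;                    // a later run would resume after $lastEvent
        }

        $iterator->next();            // returns void as well
    } while ($iterator->valid());

    return $lastEvent;
}

$events = new \ArrayIterator([
    ['id' => 1, 'type' => 'agendaCreated'],
    ['id' => 2, 'type' => 'agendaCreated'],
    ['id' => 3, 'type' => 'agendaCreated'],
]);

$projector = new CountingProjector();
$last = runOnce($events, $projector, 2);
```

Returning the last projected event is what lets the caller persist a resume point (the role played by `RunState` in the slides).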
The Projector delegates the calculations to a group of Executor classes
interface ExecutorInterface
{
    public function supportEventsType(): array;
    public function execute(Event $event): Result;
}
class SomeProjector implements ProjectorInterface
{
    public function projectAndSave(Event $event): Result
    {
        return $this->getExecutor($event)->execute($event);
    }

    // ... getExecutor() and the other ProjectorInterface methods are omitted
}
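`getExecutor()` is not shown in the slides. A minimal sketch of how such a lookup could work, assuming each executor is indexed by the event types it declares in `supportEventsType()`: all names below except `supportEventsType()` and `execute()` are hypothetical, and events and results are simplified to arrays and strings.

```php
<?php
declare(strict_types=1);

interface SketchExecutorInterface
{
    /** @return string[] event types this executor can handle */
    public function supportEventsType(): array;

    public function execute(array $event): string;
}

final class AgendaCreatedExecutor implements SketchExecutorInterface
{
    public function supportEventsType(): array
    {
        return ['agendaCreated'];
    }

    public function execute(array $event): string
    {
        return 'created:' . $event['id'];
    }
}

final class ExecutorRegistry
{
    /** @var array<string, SketchExecutorInterface> executors keyed by event type */
    private array $byType = [];

    public function __construct(SketchExecutorInterface ...$executors)
    {
        foreach ($executors as $executor) {
            foreach ($executor->supportEventsType() as $type) {
                $this->byType[$type] = $executor;
            }
        }
    }

    public function getExecutor(array $event): SketchExecutorInterface
    {
        $type = $event['type'];
        if (! isset($this->byType[$type])) {
            throw new \InvalidArgumentException("No executor for event type: $type");
        }

        return $this->byType[$type];
    }
}

$registry = new ExecutorRegistry(new AgendaCreatedExecutor());
$event = ['id' => 7, 'type' => 'agendaCreated'];
$result = $registry->getExecutor($event)->execute($event);
```

Keeping one small executor per event type means each one can be unit tested with a single event as input.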
ProjectionState stores additional information
class ProjectionState implements MongoDocumentInterface
{
    /** @var string The Projector FQCN */
    private $class;
    /** @var string */
    private $correlationId;
    /** @var mixed */
    private $data;
    /** @var \DateTime|null */
    private $expireAt;
}
ProjectionState documents should be deleted ASAP, using ProjectionState::$expireAt and a TTL index:
db.projection_state.createIndex(
    { "expireAt": 1 },
    { expireAfterSeconds: 0 }
);
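With `expireAfterSeconds: 0`, MongoDB treats the value stored in `expireAt` as the expiration time itself, and documents without that field never expire. The sketch below only mimics that rule in plain PHP to make the semantics concrete; `expireAtFor()` and `isExpired()` are hypothetical helpers, not part of any library. Note that the real deletion is done by a background task that runs periodically, so documents can outlive their `expireAt` by a short while.

```php
<?php
declare(strict_types=1);

/** Computes the value to store in expireAt for a state that should live $ttlSeconds. */
function expireAtFor(\DateTimeImmutable $now, int $ttlSeconds): \DateTimeImmutable
{
    return $now->modify(sprintf('%+d seconds', $ttlSeconds));
}

/** Mimics the TTL rule for expireAfterSeconds = 0: expired once "now" reaches expireAt. */
function isExpired(?\DateTimeImmutable $expireAt, \DateTimeImmutable $now): bool
{
    // A missing expireAt means the document is kept forever.
    return $expireAt !== null && $expireAt <= $now;
}

$now = new \DateTimeImmutable('2017-02-06T18:00:00+00:00');
$expireAt = expireAtFor($now, 3600);

$stillAlive = ! isExpired($expireAt, $now);
$goneLater = isExpired($expireAt, $now->modify('+1 hour'));
$keptForever = ! isExpired(null, $now);
```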
RunState, ProjectionState, the projection itself: we save them all as documents on MongoDB
Executors
code...
Events -> Projector -> Projection (intermediate) -> Projection aggregation -> Final result (snapshot)
Small context, easily testable
Runner | | |
---|---|---|
Projector1 | Projector2 | Projector3 |
Executor1a | Executor2a | Executor3a |
Executor1b | Executor2b | Executor3b |
... | ... | ... |
Since we have events, we know EVERYTHING
Investigating a strange bug report is now easier than ever
facile-it/mongodb-bundle
ext-mongodb