Alessandro Lai / @AlessandroLai
SymfonyDay 2018 - 19 October 2018, Verona
What's Event Sourcing?
It's normally opposed to CRUD,
see Martin Fowler's article
It's a design pattern, it stands for:
{
    "_id" : ObjectId("5898ab5a22c92d69123f7281"),
    "type" : "agendaCreated",
    "eventFamily" : "agenda",
    "meta" : {
        "createdAt" : ISODate("2017-02-06T17:59:05"),
        "receivedAt" : ISODate("2017-02-06T17:59:06"),
        "currentUserId" : 29877,
        "taskId" : 3503425,
        "productId" : 5937725,
        ...
    },
    "payload" : {
        "currentUser" : {
            "id" : 29877,
            ...
        },
        "task" : {
            "id" : 3503425,
            ...
        },
        "product" : {
            "id" : 5937725,
            ...
        }
        ...
    }
}
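To make the contrast with CRUD concrete, here is a minimal sketch of deriving state by replaying events instead of reading a mutable record. Only the "agendaCreated" type and the meta.taskId field come from the document above; the "agendaClosed" type, the array layout and the replayOpenAgendas() helper are assumptions made purely for illustration.

```php
<?php
declare(strict_types=1);

// Hypothetical, trimmed-down events: "agendaClosed" is NOT from the talk.
$events = [
    ['type' => 'agendaCreated', 'meta' => ['taskId' => 3503425]],
    ['type' => 'agendaCreated', 'meta' => ['taskId' => 3503426]],
    ['type' => 'agendaClosed',  'meta' => ['taskId' => 3503425]],
];

/**
 * Replays the events in order and returns the task ids whose agenda is still open.
 */
function replayOpenAgendas(array $events): array
{
    $open = [];
    foreach ($events as $event) {
        $taskId = $event['meta']['taskId'];
        switch ($event['type']) {
            case 'agendaCreated':
                $open[$taskId] = true;
                break;
            case 'agendaClosed':
                unset($open[$taskId]);
                break;
        }
    }

    return array_keys($open);
}

$openAgendas = replayOpenAgendas($events);
```

The current state is never stored directly here: it is recomputed from the event log, which is what lets a projection be thrown away and rebuilt.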
createdAt
Runner + RunState
Projector + Executor
ProjectionState
class Runner
{
    public function run(ProjectorInterface $projector)
    {
        // ...
    }
}
Projector
RunState: used by the Runner to persist its execution state
class RunState implements MongoDocumentInterface
{
    /** @var string The Projector FQCN */
    private $class;
    /** @var bool */
    private $stillRunning;
    /** @var \DateTimeInterface */
    private $lastRunAt;
    /** @var Mongo\ObjectId */
    private $lastMaxId;
    /** @var \DateTimeInterface */
    private $lastProjectedEventCreatedAt;
    // ...
}
interface ProjectorInterface
{
    // projection execution
    public function initializeProjector();
    public function projectAndSave(Event $event): Result;
    public function normalizeAfterRun(\DateTime $from, \DateTime $to);

    // event filters
    public function getEventTypes(): array;
    public function getAdditionalEventFilters(): array;
}
Runner::run()
$runState = $this->loadRunState($projector);
if ($runState->isStillRunning()) {
    return $this->resetProjection($projector, $runState);
}
$iterator = $this->loadEventsIterator($projector, $runState);
if (! $iterator->valid()) {
    return $this->updateRunStateOnAccomplished($runState, $lastEvent);
}
$firstEvent = $iterator->current();
if ($this->eventIsOlderThanLastProjected($runState, $firstEvent)) {
    return $this->resetProjection($projector, $runState);
}
$this->updateRunStateOnStarted($runState);
// ...
Runner::run() (continued)
do {
    $lastEvent = $iterator->current();
    $projector->projectAndSave($lastEvent);
    if ($this->shouldStop($start, $projector)) {
        $this->updateRunStateOnStopped($runState, $lastEvent);
        break;
    }
    // Iterator::next() returns void, so advance first and test valid() instead
    $iterator->next();
} while ($iterator->valid());
$projector->normalizeAfterRun(
    $firstEvent->getCreatedAt(),
    $lastEvent->getCreatedAt()
);
$this->updateRunStateOnAccomplished($runState, $maxId, $lastCreatedAt);
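The checkpoint-and-resume idea behind Runner::run() can be tried in isolation with the sketch below. It is not the talk's implementation: SketchRunState, runOnce(), the integer event ids and the $batchSize limit (standing in for the time-based shouldStop() check) are all assumptions, and events are expected to arrive sorted by id.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory stand-in for the RunState document.
final class SketchRunState
{
    /** @var int Id of the last projected event (0 = nothing projected yet) */
    public $lastMaxId = 0;
    /** @var bool */
    public $stillRunning = false;
}

/**
 * Projects the events newer than the checkpoint, at most $batchSize per run.
 *
 * @return int[] Ids of the events projected during this run
 */
function runOnce(array $events, SketchRunState $state, callable $project, int $batchSize): array
{
    // Only load events that come after the last projected one.
    $iterator = new ArrayIterator(array_filter(
        $events,
        function (array $event) use ($state): bool {
            return $event['id'] > $state->lastMaxId;
        }
    ));

    $projected = [];
    if (! $iterator->valid()) {
        return $projected; // nothing new since the last run
    }

    $state->stillRunning = true;
    while ($iterator->valid()) {
        $event = $iterator->current();
        $project($event);
        $projected[] = $event['id'];
        $state->lastMaxId = $event['id']; // move the checkpoint after every event
        if (count($projected) >= $batchSize) {
            break; // budget exhausted: the next run resumes from the checkpoint
        }
        $iterator->next();
    }
    $state->stillRunning = false;

    return $projected;
}

$events = [['id' => 1], ['id' => 2], ['id' => 3]];
$state = new SketchRunState();
$noop = function (array $event): void {};

$firstRun = runOnce($events, $state, $noop, 2);  // stops after two events
$secondRun = runOnce($events, $state, $noop, 2); // resumes from the checkpoint
$thirdRun = runOnce($events, $state, $noop, 2);  // nothing left to project
```

Because the checkpoint lives outside the loop, a run that stops early loses nothing: the next invocation simply loads the events after lastMaxId.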
The Projector delegates the calculations to a group of Executor classes
interface ExecutorInterface
{
    public function supportEventsType(): array;
    public function execute(Event $event): Result;
}
class SomeProjector implements ProjectorInterface
{
    public function projectAndSave(Event $event): Result
    {
        return $this->getExecutor($event)->execute($event);
    }
}
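The slides do not show getExecutor(), so the following is only one plausible way to do the lookup: index the executors by the event types they declare in supportEventsType(). The Event and Result classes are minimal stand-ins (their real shape is not shown in the talk), AgendaCreatedExecutor is hypothetical, and this SomeProjector deliberately skips the rest of ProjectorInterface to stay short.

```php
<?php
declare(strict_types=1);

// Minimal stand-ins for the talk's Event and Result classes.
final class Event
{
    private $type;
    public function __construct(string $type) { $this->type = $type; }
    public function getType(): string { return $this->type; }
}

final class Result
{
    public $handledBy;
    public function __construct(string $handledBy) { $this->handledBy = $handledBy; }
}

interface ExecutorInterface
{
    public function supportEventsType(): array;
    public function execute(Event $event): Result;
}

// Hypothetical executor, only here to exercise the dispatch.
final class AgendaCreatedExecutor implements ExecutorInterface
{
    public function supportEventsType(): array { return ['agendaCreated']; }
    public function execute(Event $event): Result { return new Result('AgendaCreatedExecutor'); }
}

final class SomeProjector
{
    /** @var ExecutorInterface[] Executors indexed by event type */
    private $executorsByType = [];

    public function __construct(ExecutorInterface ...$executors)
    {
        foreach ($executors as $executor) {
            foreach ($executor->supportEventsType() as $type) {
                $this->executorsByType[$type] = $executor;
            }
        }
    }

    // The event filter can be derived from what the executors support.
    public function getEventTypes(): array
    {
        return array_keys($this->executorsByType);
    }

    public function projectAndSave(Event $event): Result
    {
        return $this->getExecutor($event)->execute($event);
    }

    private function getExecutor(Event $event): ExecutorInterface
    {
        $type = $event->getType();
        if (! isset($this->executorsByType[$type])) {
            throw new InvalidArgumentException('No executor for event type ' . $type);
        }

        return $this->executorsByType[$type];
    }
}

$projector = new SomeProjector(new AgendaCreatedExecutor());
$result = $projector->projectAndSave(new Event('agendaCreated'));
```

Building the map once in the constructor keeps projectAndSave() a constant-time lookup and makes an unsupported event type fail loudly instead of being silently skipped.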
ProjectionState document
class ProjectionState implements MongoDocumentInterface
{
    /** @var string The Projector FQCN */
    private $class;
    /** @var string */
    private $correlationId;
    /** @var mixed */
    private $data;
    /** @var \DateTime|null */
    private $expireAt;
}
The Executor should delete the ProjectionState as soon as it's no longer needed
ProjectionState::$expireAt is backed by a MongoDB TTL index:
db.projection_state.createIndex(
    { "expireAt": 1 },
    { expireAfterSeconds: 0 }
);
Everything (RunState, ProjectionState, the projection itself) is saved as documents on MongoDB
gc_collect_cycles, ext-mongodb
LockableTrait
Due to CQRS and our objects' architecture,
reusing code between projections is impractical
A projector can produce an intermediate result:
a simple denormalization that can be easily queried
Events -> Projector -> Projection (intermediate)
Projection -> aggregation -> Final result (snapshot)
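The two-step flow (events to an intermediate projection, then an aggregation into the final snapshot) could look like the sketch below. The event list, the row layout and both function names are assumptions for illustration; only the type and meta.productId field names appear in the talk.

```php
<?php
declare(strict_types=1);

// Hypothetical events, trimmed down to the fields this sketch needs.
$events = [
    ['type' => 'agendaCreated', 'meta' => ['productId' => 5937725]],
    ['type' => 'agendaCreated', 'meta' => ['productId' => 5937725]],
    ['type' => 'agendaCreated', 'meta' => ['productId' => 42]],
];

/** Step 1: one flat, easily queried row per event (the intermediate projection). */
function projectRows(array $events): array
{
    $rows = [];
    foreach ($events as $event) {
        $rows[] = ['productId' => $event['meta']['productId'], 'created' => 1];
    }

    return $rows;
}

/** Step 2: aggregate the intermediate rows into the final snapshot. */
function aggregateByProduct(array $rows): array
{
    $snapshot = [];
    foreach ($rows as $row) {
        $productId = $row['productId'];
        $snapshot[$productId] = ($snapshot[$productId] ?? 0) + $row['created'];
    }

    return $snapshot;
}

$rows = projectRows($events);
$snapshot = aggregateByProduct($rows);
```

Keeping the intermediate rows around means the aggregation can be changed or rerun without touching the event stream again.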
Small context, easily testable
Runner     |            |            |
Projector1 | Projector2 | Projector3 |
Executor1a | Executor2a | Executor3a |
Executor1b | Executor2b | Executor3b |
...        | ...        | ...        |
Since we have events, we know EVERYTHING
Investigating a strange bug report
is now easier than ever
facile-it/mongodb-bundle
ext-mongodb
Please rate my talk on Joind.in: https://joind.in/talk/de1e0
Feedback on Joind.in, please!