PHP + ES + CQRS + DDD = ?

An integrated strategy

Alessandro Lai / @AlessandroLai

PHPDay - May 20th 2022, Verona - https://joind.in/talk/3dab6

Who am I?

  • Alessandro Lai
  • Engineering manager @ Facile.it
  • @Jean85
  • @AlessandroLai
  • PHP UG Milan Coordinator
  • PHP-FIG Secretary
Payments team roster
  1. Base concepts
  2. Our starting point
  3. The tools
  4. The migration strategy
  5. Conclusions

Normally,
we reach immediately to code...

Common approaches

We focused the problem, now how do we solve it?

We start with Domains expert and Developers

"The context is King"

A.Brandolini

Data-oriented development

⛔ Cons

  • Not suitable for complex problems
  • Difficult to evolve
  • Not very expressive

✅ Pro

  • Fast
  • Minimal effort
  • Proven development mode

Business-oriented development

⛔ Cons

  • More effort upfront
  • Not suitable for simple problems
  • Needs discipline

✅ Pro

  • Suitable for complex problems
  • More expressive
  • More flexible

The evolution of DDD - CQRS - CQRS/ES

We start with Domains expert and Developers
Credits: https://eventmodeling.org/about/history.jpg

The workshop

Marco Heimeshoff


Domain Driven Design

DDD simplified

Domain Driven Design + CQRS

CQRS simplified

Domain Driven Design + CQRS/ES

CQRS/ES simplified

The goal! ⚽️

  • New capacity to explore the Domain
  • New opportunities for Business & Devs
  • The true and unique invariant rules are in the business
  • Leverage the tech to produce business value
  1. Base concepts
  2. Our starting point
  3. The tools
  4. The migration strategy
  5. Conclusions

Scenario

Adding a new payment method to our payments service provider

Product (aggregate root)
1..n Orders 0..n Payment type A NEW: PHPayment 🐘
  1. Base concepts
  2. Our starting point
  3. The tools
  4. The migration strategy
  5. Conclusions

The DDD "onion" / layers

DDD layers
Cfr. "Implementing Domain-Driven Design" - V.Vernon (the so-called "Red Book")
Figure 4.3, page 124

Presentation: API

Open API + Stoplight
Stoplight Rif. API contracts: Leveraging OpenAPI during API development - A.Lai

Enforcing the "onion"

PHP AT
class CoreArchitectureTest extends ArchitectureTest
{
    public function testCoreDomainDependencies(): Rule
    {
        return $this->newRule
          ->classesThat(Selector::haveClassName('Facile\Domain\*'))

          ->canOnlyDependOn()

          ->classesThat(Selector::haveClassName('Facile\Domain\*'))
          ->classesThat(Selector::haveClassName(\Webmozart\Assert::class))
          ->classesThat(Selector::haveClassName(\Ramsey\Uuid\UuidInterface::class))

          ->build();
    }
}

EventSauce

EventSauce logo
  1. Focused on event sourcing
  2. Pragmatic library
  3. Extensible by design
composer require eventsauce/eventsauce
https://github.com/eventsaucephp/eventsauce

Aggregate Root ID


namespace EventSauce\EventSourcing;

interface AggregateRootId
{
    public function toString(): string;

    public static function fromString(string $aggregateRootId): self;
}
            

Aggregate Root ID - UUID

namespace Facile\Domain\Model;
use EventSauce\EventSourcing\AggregateRootId;

abstract class AbstractAggregateRootId implements AggregateRootId
{
    private function __construct(private UuidInterface $uuid){}
    
    public static function create(): static
    {
        return new static(Uuid::uuid4());
    }
    
    public function toString(): string
    {
        return $this->uuid->toString();
    }
    
    public static function fromString(string $aggregateRootId): static
    {
        return new static(Uuid::fromString($aggregateRootId));
    }
}

Aggregate Root ID - Product ID

namespace Facile\Domain\Model;

final class ProductId extends AbstractAggregateRootId
{
}

Event

namespace EventSauce\EventSourcing\Serialization;

interface SerializablePayload
{
    public function toPayload(): array;
    
    public static function fromPayload(array $payload): self;
}

Event - Product Created (1/2)


namespace Facile\Domain\Events;
                    
final class ProductCreated implements SerializablePayload
{
    private function __construct(
        private ProductId $productId,
        private \DateTimeImmutable $creationDate
    ) {
    }
    
    public static function create(ProductId $productId): self
    {
        $creationDate = new \DateTimeImmutable();
        
        return new self($productId, $creationDate);
    }

    // next slide...
}
            

Event - Product Created (2/2)


final class ProductCreated implements SerializablePayload
{
    // ...previous slide                    

    public function toPayload(): array
    {
        return [
            'productId' => $this->productId->toString(),
            'creationDate' => $this->creationDate->format(\DATE_ATOM),
        ];
    }

    public static function fromPayload(array $payload): self
    {
        return new self(
            ProductId::fromString($payload['productId']),
            \DateTimeImmutable::createFromFormat(\DATE_ATOM, $payload['creationDate'])
        );
    }
}

Aggregate Root

namespace EventSauce\EventSourcing;

interface AggregateRoot
{
    public function aggregateRootId(): AggregateRootId;
    
    public function aggregateRootVersion(): int;
    
    public function releaseEvents(): array;
    
    public static function reconstituteFromEvents(
        AggregateRootId $aggregateRootId,
        Generator $events
    ): static;
}

Aggregate Root - Product

namespace Facile\Domain\Model;

use EventSauce\EventSourcing\AggregateRoot;
use EventSauce\EventSourcing\AggregateRootBehaviour;

final class Product implements AggregateRoot
{
    use AggregateRootBehaviour;
}

Aggregate Root - Creation

final class Product implements AggregateRoot
{
    use AggregateRootBehaviour;
    
    public static function create(ProductId $productId): self
    {
        $product = new self($productId);
        $productCreated = ProductCreated::create($productId);
        $product->recordThat($productCreated);
        
        return $product;
    }
}

Aggregate Root - Trait

namespace EventSauce\EventSourcing;

trait AggregateRootBehaviour
{
    // ...
    
    private AggregateRootId $aggregateRootId;
    
    private function __construct(AggregateRootId $aggregateRootId)
    {
        $this->aggregateRootId = $aggregateRootId;
    }
    
    protected function recordThat(object $event): void
    {
        $this->apply($event);
        $this->recordedEvents[] = $event;
    }

    // ...
}
            

Aggregate Root - Applying events

namespace EventSauce\EventSourcing;

trait AggregateAlwaysAppliesEvents
{
    private int $aggregateRootVersion = 0;
    
    protected function apply(object $event): void
    {
        $parts = explode('\\', get_class($event));

        $this->{'apply' . end($parts)}($event);

        ++$this->aggregateRootVersion;
    }
}

Aggregate Root - Applying events

namespace Facile\Domain\Model;

final class Product implements AggregateRoot
{
    // ...
    
    private ?\DateTimeImmutable $creationDate = null;
    
    public function applyProductCreated(ProductCreated $productCreated): void
    {
        $this->creationDate = $productCreated->getCreationDate();
    }
}

Aggregate Root - Reconstitution

namespace EventSauce\EventSourcing;

trait AggregateRootBehaviour
{
    // ...

    public static function reconstituteFromEvents(
        AggregateRootId $aggregateRootId,
        Generator $events
    ): static
    {
        $aggregateRoot = new static($aggregateRootId);
        
        foreach ($events as $event) {
          $aggregateRoot->apply($event);
        }
        // ...
    
        return $aggregateRoot;
    }
}

Aggregate Root Repository

namespace EventSauce\EventSourcing;

interface AggregateRootRepository
{
    public function retrieve(AggregateRootId $aggregateRootId): object;
    
    public function persist(object $aggregateRoot): void;
    
    // ...
}

Product Repository

namespace Facile\Infrastructure\Repository;

use EventSauce\EventSourcing\EventSourcedAggregateRootRepository;

final class ProductRepository extends EventSourcedAggregateRootRepository
{
}

Aggregate Root Repository (1/2)

namespace EventSauce\EventSourcing;

class EventSourcedAggregateRootRepository implements AggregateRootRepository
{
    public function __construct(
        string $aggregateRootClassName,
        MessageRepository $messageRepository,
        MessageDispatcher $dispatcher = null,
        // ...
    ) {
        // ...
        $this->dispatcher = $dispatcher ?: new SynchronousMessageDispatcher();
    }

    public function retrieve(AggregateRootId $aggregateRootId): object
    {
        $className = $this->aggregateRootClassName;
        $events = $this->retrieveAllEvents($aggregateRootId);
        
        return $className::reconstituteFromEvents($aggregateRootId, $events);
    }

    // next slide...

Aggregate Root Repository (2/2)

    // ...previous slide
    public function persist(object $aggregateRoot): void
    {
        // ...
        $this->persistEvents(
            $aggregateRoot->aggregateRootId(),
            $aggregateRoot->aggregateRootVersion(),
            ...$aggregateRoot->releaseEvents()
        );
    }
    
    public function persistEvents(AggregateRootId $id, int $version, object ...$events): void
    {
        $messages = array_map(function (object $event) {
            // Events are transformed into messages
        }, $events);
        
        $this->messageRepository->persist(...$messages);
        $this->dispatcher->dispatch(...$messages);
    }
}

Doctrine2 Message Repository

composer require eventsauce/message-repository-for-doctrine-v2
  CREATE TABLE IF NOT EXISTS `your_table_name` (

    `event_id`          BINARY(16) NOT NULL,
    `aggregate_root_id` BINARY(16) NOT NULL,
    `version`           int(20) unsigned NULL,
    `payload`           varchar(16001) NOT NULL,

    PRIMARY KEY (`event_id`),

    KEY                  (`aggregate_root_id`),
    KEY `reconstitution` (`aggregate_root_id`, `version` ASC)
  )

  DEFAULT CHARACTER SET utf8mb4
  COLLATE utf8mb4_general_ci ENGINE=InnoDB;

Serialized Payload

  {
    "headers": {
       "__aggregate_root_id": "b62a84f8-72f0-11ec-90d6-0242ac120003",
       "__aggregate_root_type": "facile.domain.model.product",
       "__aggregate_root_version": 1,
       "__event_type": "facile.domain.events.product_created",
       "__time_of_recording": "2022-02-08 09:44:36.407236+0000",
       "__aggregate_root_id_type": "facile.domain.model.product_id",
       "__event_id": "24b38b08-9042-4035-bbff-6bd75295e1b9"
    },
    "payload": {
       "productId": "b62a84f8-72f0-11ec-90d6-0242ac120003",
       "creationDate": "2022-02-08T10:44:36+01:00"
    }
  }

EventSauce Lifecycle


    $product = $productRepository->retrieve($aggregateRootId);
    
    $product->addOrder($command);
    
    $productRepository->persist($product);
            

EventSauce Lifecycle

namespace Facile\Domain\Model;

class Product implements AggregateRoot
{
    // ...

    public function addOrder(AddOrderCommand $command): void
    {
        if (null !== $this->order) {
            throw new \DomainException('Order already present');
        }
        
        $orderAdded = OrderAdded::create($command);
        $this->recordThat($orderAdded);
    }

    public function applyOrderAdded(OrderAdded $orderAdded): void
    {
        $this->order = new Order($orderAdded->getOrderId(), $orderAdded->getMoney());
    }
}

Message Consumer

use EventSauce\EventSourcing\MessageConsumer;

class OrdersReadModel implements MessageConsumer
{
    public function handle(Message $message)
    {
        $aggregateRootId = $message->aggregateRootId();
        $event = $message->event();
    
        if ($event instanceof OrderAdded){
            # Instantiates a custom entity and persist it
        }
    }
}
  1. Base concepts
  2. Our starting point
  3. The tools
  4. The migration strategy
  5. Conclusions

Green field vs running project

All this is great for a new project...
...but what can we do in a running project?

Root-first or Leaf first?

Product (root)
1..n Orders 0..n
NEW: PHPayment 🐘

The breakthrough


"You have a single source of truth, or multiple sources of lies" (source: RT from @mipsytipsy)

The great workshop take-away

  • DB transactions to save data atomically
  • Sync between legacy tables and event store
  • Only downside: migrating will take longer

Generating events on-the-fly


class ProductRepository 
    extends EventSourcedAggregateRootRepository 
    implements ProductRepositoryInterface
{
    public function retrieve(AggregateRootId $id): object
    {
        $product = parent::retrieve($id);

        if ($product === null) {
            $doctrineProduct = $this->doctrineRepository->find($id);

            $product = Product::reconstituteFromEvents(
                $this->eventGenerator->generateFrom($doctrineProduct)
            );
        }

        return $product;
    }
}

Maintain the Doctrine tables (1/2)

final class Prodotto implements AggregateRoot
{
    // ...

    public function reservePHPayment(ReservePayment $command): void
    {
        if (/* ... */) {
            // domain logic & validation...
        }

        $event = new PHPaymentReservationCreated(/* ... */);

        // add the event to the aggregate
        $this->recordThat($event);

        // register the same stuff in the Doctrine entity
        $this->getDoctrineProduct()->addNewReservation($event)
    }

    // ...
}

Maintain the Doctrine tables (2/2)

class ReservePHPaymentHandler implements MessageHandlerInterface
{
    public function __invoke(ReservePHPayment $command): void
    {
        $id = $command->getRootId();
        // retrieve both the aggregate and the Doctrine entity
        $product = $this->productRepository->retrieve($id);

        // invoke the domain logic
        $product->reservePHPayment($command);

        // atomic flush due to single DB connection
        $this->doctrineRepository->beginTransaction();

        $this->productRepository->persist($product);
        $this->doctrineRepository->flush();

        $this->doctrineRepository->commit();
    }
}

How to iterate

class ProductRepository 
    extends EventSourcedAggregateRootRepository 
    implements ProductRepositoryInterface
{
    public function retrieve(AggregateRootId $id): object
    {
        $product = parent::retrieve($id);
    
        if ($this->eventsAreOutdated($product)) {
            $this->purgeEvents($id);
            $product = null;
        }

        if ($product === null) {
            // ...
        }

        return $product;
    }
    // ...
}
  1. Base concepts
  2. Our starting point
  3. The tools
  4. The migration strategy
  5. Conclusions

Advantages

  • Fast iterations
  • Gain confidence iteratively
  • Leverage ES & DDD immediately
  • Old code still works
  • No fixed deadlines
  • Still deliver business value

Disadvantages / pitfalls

  1. It may take a long time
  2. ...can you tell us more?

Thanks!

Please rate my talk on Joind.in: https://joind.in/talk/3dab6

Joind.in link: https://joind.in/talk/3dab6

Questions?