
Haskell Legacy: Creating events

Gautier DI FOLCO July 30, 2024 [Haskell] #haskell #design #legacy #polysemy

Currently, our API has effects; that changed the way we deal with the code, but fundamentally, it does not change the way we meet business needs.

Conceptually, event-sourced systems are based on Commands and Events.

Commands and Events are grouped by aggregate. There are multiple ways to represent Commands and Events; I have chosen Command handlers (commands are modeled as regular functions which emit events) and event streams (events are grouped in sequential, unbounded collections).

Before diving into the design, we are missing an important piece: persistence. It is called an event store, and it has two basic operations:

newtype StreamId
  = StreamId Int
  deriving stock (Eq, Show)

newtype EventNumber
  = EventNumber Int
  deriving stock (Eq, Show)

data StoreEventError
  = DuplicatedEventSEE
  deriving stock (Eq, Show)

data EventStore (event :: Type) (m :: Type -> Type) (a :: Type) where
  StoreEvent :: StreamId -> EventNumber -> event -> EventStore event m (Either StoreEventError ())
  FetchEvents :: StreamId -> EventStore event m [(EventNumber, event)]

makeSem ''EventStore

Note: it is a basic implementation, not type-safe (the event type is neither saved nor checked).
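
To get a feel for how this effect can be consumed, here is a minimal sketch of an in-memory interpreter (my own illustration, not the project's interpreter, which presumably targets the database); it keeps the streams in a Map through Polysemy's State effect:

import qualified Data.Map.Strict as Map
import Polysemy
import Polysemy.State

-- One in-memory list of events per stream.
type Store event = Map.Map Int [(EventNumber, event)]

runEventStoreInMemory ::
  (Member (State (Store event)) r) =>
  Sem (EventStore event ': r) a ->
  Sem r a
runEventStoreInMemory =
  interpret $ \case
    StoreEvent (StreamId streamId) number event -> do
      stream <- gets $ Map.findWithDefault [] streamId
      if any ((== number) . fst) stream
        then pure $ Left DuplicatedEventSEE
        else do
          modify' $ Map.insert streamId $ stream <> [(number, event)]
          pure $ Right ()
    FetchEvents (StreamId streamId) ->
      gets $ Map.findWithDefault [] streamId

A program using the effect can then be run with run . evalState mempty . runEventStoreInMemory, for instance.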

We can start designing our events, mostly based on our effects:

data TrainEvent
  = TrainCreated
      { departureDate :: T.Text,
        departureStation :: T.Text,
        arrivalStation :: T.Text
      }
  | BookingCreated {id :: Int, travelerName :: T.Text}
  | BookingWithdrawn {id :: Int}

Note: Events are always expressed in the past tense.

Note: we consider our effects to be our Command handlers.
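
To make the stream shape concrete, here is what the event stream of a single train aggregate could look like (hypothetical values, purely for illustration):

-- Illustrative stream for one train aggregate
-- (assumes OverloadedStrings for the Text literals).
exampleTrainStream :: [(EventNumber, TrainEvent)]
exampleTrainStream =
  [ (EventNumber 0, TrainCreated {departureDate = "2024-08-01", departureStation = "Paris", arrivalStation = "Lyon"}),
    (EventNumber 1, BookingCreated {id = 1, travelerName = "Alice"}),
    (EventNumber 2, BookingWithdrawn {id = 1})
  ]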

Then, we have to implement these Command handlers. Usually, in Polysemy, you have interpreters, which interpret and consume effects, but here, at this stage, we want to push a first version which only records the things that happened (a bit like a write-through cache).

In order to achieve that, we have to create an interceptor, which interprets effects without consuming them, so we can still call the "legacy"/"regular" logic.

Note: interceptors are one of the things that set Polysemy apart from some other effect systems; it is a kind of aspect-oriented programming: the ability to change a system's effects without changing the system itself.

Let's take an example:

interceptBookingEffectEvents ::
  forall a m r.
  (Members '[EventStore TrainEvent, Error InternalApiError] r) =>
  Sem (BookingEffect ': r) a ->
  Sem (BookingEffect ': r) a
interceptBookingEffectEvents =
  intercept $ -- 1
    \case
      BookingCreate trainId travelerName -> do
        bookingId <- bookingCreate trainId travelerName -- 2
        let BookingKey bookingId' = bookingId
        events <- fetchEvents $ trainStreamId trainId -- 3
        unless (null events) $
          void $
            storeEvent -- 4
              (trainStreamId trainId)
              (EventNumber $ length events)
              BookingCreated {id = fromIntegral bookingId', travelerName = travelerName}
        return bookingId
      BookingDelete bookingId ->
        -- ...
  1. That's all it takes to go from an interpreter (interpret) to an interceptor (intercept)
  2. We re-emit the effect (so the legacy interpreter is called); we get an id back, and the "thing" is done
  3. We fetch the events, in order to be sure the aggregate has been initialized (implicitly, with TrainCreated)
  4. We register the event (the fact that a new booking has been registered)
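
The snippet also relies on a trainStreamId helper which is not shown here; a plausible sketch, assuming train identifiers are integral database keys (mirroring the BookingKey pattern above), could be:

-- Hypothetical helper: derive a train's event stream id from its key.
-- The real project may map keys to streams differently.
trainStreamId :: TrainId -> StreamId
trainStreamId (TrainKey trainId) = StreamId $ fromIntegral trainId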

Finally, we have to add our interceptors to our API interpreter:

runPersistent :: ConnectionPool -> EffectsRunner a
runPersistent pool =
  -- ...
    . interpretBookingEffectPersistent -- interpret Booking (legacy)
    . interceptBookingEffectEvents -- intercept Booking (event)
    . interpretTrainEffectPersistent -- interpret Train (legacy)
    . interceptTrainEffectEvents -- intercept Train (event)
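
Note that the EventStore TrainEvent effect used by the interceptors also has to be interpreted somewhere in this chain (in the part elided by -- ..., so that it is discharged after the domain effects). With the in-memory sketch from earlier, and assuming the effect row lines up, it could look like the two stages below; the real runner would plug a database-backed interpreter in the same spot:

    . evalState (Map.empty :: Store TrainEvent) -- hypothetical: back the store with in-memory state
    . runEventStoreInMemory -- hypothetical: discharge EventStore TrainEvent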

Tests are passing and the behavior is left unchanged; however, events are only emitted for new trains (existing trains have no TrainCreated event, so their stream is empty and the interceptor skips them).

In the next log, we'll see:
