Haskell Legacy: Going event-first

Previously, we have reversed writes, which means that we write events, then we write the legacy tables.

As a reminder, we have some code looking like that:

interceptTrainEffectEvents ::
  forall r.
  (Members '[EventStore TrainEvent, Embed IO, Error InternalApiError, TrainProjectionEffect] r) =>
  InterpreterFor TrainEffect r
interceptTrainEffectEvents =
  interpret $
    \case
      TrainCreate
        departureDate'@(DepartureDate departureDate)
        departureStation'@(DepartureStation departureStation)
        arrivalStation'@(ArrivalStation arrivalStation) -> do
          newTrainId <- embed $ TrainId' <$> randomRIO (1000000, 9999999)
          eventStored <-
            storeEvent
              (StreamId newTrainId.unTrainId')
              (EventNumber 0)
              TrainCreated
                { departureDate = departureDate,
                  departureStation = departureStation,
                  arrivalStation = arrivalStation
                }
          case eventStored of
            Left e -> throw $ EventStoreIAE e
            Right x -> do
              trainProjectionCreate newTrainId departureDate' departureStation' arrivalStation'
              return newTrainId
      TrainFetch trainId ->
        trainProjectionFetch trainId

This was a first step, however it's temporal coupling, one of the worst type of coupling.

It means that we have established correlation (things happen together), not causation (things happen because they are linked by the relationships).

I'll rehash just to make the point: if tests are green, it's merely coincidental.

Your code will break (or force to change) if you:

  • Add/change a view (model)/query/projection
  • Reorder events persistence
  • Add event persistence elsewhere

Instead of calling legacy effects as backup, we can create an interceptor in charge of it:

apiEventProjection ::
  (Members '[TrainProjectionEffect, BookingProjectionEffect] r) =>
  StreamId ->
  TrainEvent ->
  Sem r ()
apiEventProjection (StreamId streamId) =
  \case
    event@(TrainCreated {}) ->
      trainProjectionCreate
        trainId
        (DepartureDate event.departureDate)
        (DepartureStation event.departureStation)
        (ArrivalStation event.arrivalStation)
    event@(BookingCreated {}) ->
      bookingProjectionCreate trainId (BookingId' $ fromIntegral event.id) event.travelerName
    event@(BookingWithdrawn {}) ->
      bookingProjectionDelete (BookingId' $ fromIntegral event.id)
  where
    trainId = TrainId' streamId

For each event, we will perform an action to "synchronize" a stateful value.

It aims to be used with this interceptor:

interceptEventStoreWith ::
  forall event a r.
  (StreamId -> event -> Sem r ()) ->
  Sem (EventStore event ': r) a ->
  Sem (EventStore event ': r) a
interceptEventStoreWith f =
  intercept $
    \case
      StoreEvent streamId eventNumber event -> do
        result <- storeEvent streamId eventNumber event
        forM_ result $ \() ->
          raise $ f streamId event
        return result
      FetchEvents streamId ->
        fetchEvents streamId

It works as follows:

  • Fallbacks on the real EventStore effect to persist the event
  • If the persistence succeeded, run the callback to perform the projection
  • Return the result

Note: it is a really simple implementation, in the previous real world implementations I have done, I have relied on diffs (i.e. pushing multiples events and making diffs on local state, so it can be persisted independently).

We can drop the calls from our interpreter:

interceptTrainEffectEvents ::
  forall r.
  (Members '[EventStore TrainEvent, Embed IO, Error InternalApiError, TrainProjectionEffect] r) =>
  InterpreterFor TrainEffect r
interceptTrainEffectEvents =
  interpret $
    \case
      TrainCreate
        (DepartureDate departureDate)
        (DepartureStation departureStation)
        (ArrivalStation arrivalStation) -> do
          newTrainId <- embed $ TrainId' <$> randomRIO (1000000, 9999999)
          eventStored <-
            storeEvent
              (StreamId newTrainId.unTrainId')
              (EventNumber 0)
              TrainCreated
                { departureDate = departureDate,
                  departureStation = departureStation,
                  arrivalStation = arrivalStation
                }
          case eventStored of
            Left e -> throw $ EventStoreIAE e
            Right x -> return newTrainId
      TrainFetch trainId ->
        trainProjectionFetch trainId

and add it to our interpreters stacks:

runPersistent :: ConnectionPool -> EffectsRunner a
runPersistent pool =
  liftIO
    . join
    . fmap (either (throwIO . iaeToYesod) return)
    . runM
    . runError
    . runEmbedded (flip runSqlPool pool)
    . interpretBookingEffectPersistent
    . interpretTrainProjectionEffectPersistent
    . interpretEventStorePersistent
    . interceptEventStoreWith apiEventProjection
    . interpretBookingEffectEvents
    . interceptTrainEffectEvents

Doing so allows us to add/change our views without impacting our main, event-sourced interpreters.