Building a minimalistic time-sorted Event Store using a key-value database in Go

Event sourcing

Lately, I’ve been looking into the Event Sourcing pattern as a way to make a simple auditable data storage system.

In a traditional database system, state is often stored as a collection of related tables or as documents. You can query this state and mutate it at will, but problems arise when you want to find out something about previous states of your database. This requires keeping track of changes to tables or documents, which often leads to hacky solutions.

The Event Sourcing pattern turns this on its head. Instead of storing only the current state of the database, you store all mutations that led to this state in chronological order. Each of these mutations, called events, leads to a specific change to the database state. The exact nature of this change is often formalised in a function that takes the current state of the data and the event to produce the new state. ‘Replaying’ all these events in chronological order allows us to reconstruct all states the database was in at any point in time.
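To make the ‘state plus event gives new state’ idea concrete, here is a minimal sketch. The Event and State types, and the apply and replay functions, are hypothetical names I made up for illustration (a toy account balance); they are not part of any library.

```go
package main

import "fmt"

// Event is a hypothetical mutation: a deposit (positive) or withdrawal (negative).
type Event struct {
	ID     int
	Amount int
}

// State is the hypothetical state derived from the events.
type State struct {
	Balance int
}

// apply takes the current state and one event and returns the new state.
func apply(s State, e Event) State {
	return State{Balance: s.Balance + e.Amount}
}

// replay folds the events in order and returns every intermediate state,
// so the state after any given event can be looked up.
func replay(events []Event) []State {
	states := make([]State, 0, len(events))
	var s State
	for _, e := range events {
		s = apply(s, e)
		states = append(states, s)
	}
	return states
}

func main() {
	events := []Event{{1, 100}, {2, -30}, {3, 5}}
	for i, s := range replay(events) {
		fmt.Printf("after event %d: balance=%d\n", events[i].ID, s.Balance)
	}
}
```

Because apply never mutates its input, replaying the same events always yields the same sequence of states, which is what makes the history auditable.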

Event Sourcing is part of a new wave of software design patterns that emphasise immutable data structures and functional programming to help maintain developer sanity when building complex, data-rich applications. Cool generalisations of this pattern can be found in a lot of software nowadays, from frontend UI development (see the Flux pattern, and its popular Redux implementation) to database design.

Event sourcing does present a few challenges, though, one of which is storing the events in such a way that previous database states are always reproducible. Events are usually stored along with a timestamp and a unique identifier. This does not necessarily require a complex data structure, as long as you can replay events in chronological order. However, the longer an Event Sourcing-based application exists, the more events it will have generated. Depending on your design, this may put some serious strain on your storage implementation. Storing events in a single table of a relational database, for example, may cause performance issues down the road when your event store gets too bloated.

While NoSQL technologies may help with these performance considerations, you may simply not like the idea of having to include a full-fledged database server in your minimalistic application. I’m a big fan of simple embedded key-value databases myself. In this post, I’ll discuss how to use Badger, a fast key-value database written in pure Go, as a simple event store.

Badger as a key-value store

Badger is based on a data structure called a log-structured merge-tree (LSM tree). The LSM-based design is optimised for fast, append-style writes while still keeping keys in sorted order. That sorted order is what matters most here, as it allows for sorted iteration over the keys: using a ‘flat’ key-value store means we can no longer rely on a fancy query language like SQL to give us our events in chronological order when we want to replay the event stream. Moreover, we want to be able to query individual events by their ID, so simply numbering the events and storing them in, for example, a file is not an option (disregarding the reliability nightmare of that approach).

ULIDs as lexicographically sortable keys

In Badger, keys are kept sorted in the LSM tree in byte-wise lexicographic sort order. This means that if we want to iterate over our keys in chronological order, we need to somehow map chronological order to byte-wise lexicographic order. Naively numbering the events and using those integers as IDs won’t do: written as decimal strings, for instance, ‘10’ sorts before ‘9’.
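A small sketch of what byte-wise ordering does to numbered keys. The helper names (decimalKey, bigEndianKey, sortsBefore) are hypothetical and only use the standard library; they are meant to illustrate the ordering problem, not how Badger is implemented.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"strconv"
)

// decimalKey encodes n as its decimal string, e.g. 10 -> "10".
func decimalKey(n uint64) []byte {
	return []byte(strconv.FormatUint(n, 10))
}

// bigEndianKey encodes n as 8 fixed-width bytes, most significant byte first.
func bigEndianKey(n uint64) []byte {
	k := make([]byte, 8)
	binary.BigEndian.PutUint64(k, n)
	return k
}

// sortsBefore reports whether a comes before b in byte-wise lexicographic order.
func sortsBefore(a, b []byte) bool {
	return bytes.Compare(a, b) < 0
}

func main() {
	// "9" does not sort before "10" byte-wise, so numeric order is lost.
	fmt.Println(sortsBefore(decimalKey(9), decimalKey(10))) // false
	// Fixed-width big-endian keys keep numeric order.
	fmt.Println(sortsBefore(bigEndianKey(9), bigEndianKey(10))) // true
}
```

A fixed-width big-endian counter does preserve order, but something still has to hand out the numbers one by one; a timestamp-prefixed identifier avoids that coordination.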

This is where Universally Unique Lexicographically Sortable Identifiers (ULIDs) come in. A ULID is a 128-bit unique identifier that can be created without central coordination, just like its better-known cousin the UUID. Contrary to the UUID, a ULID is not fully random but instead derives its first 48 bits from a timestamp. This timestamp consists of the UNIX time in milliseconds encoded as a 48-bit integer and ensures the ULID’s lexicographical sortability. The remaining 80 bits are derived from an entropy source (i.e. are ‘random’), ensuring that the odds of creating a duplicate ULID are negligible.
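The layout described above can be sketched by hand with just the standard library. This is not the oklog/ulid API: newID, timestamp and less are hypothetical helpers that only mimic the 48-bit timestamp plus 80-bit entropy layout.

```go
package main

import (
	"bytes"
	"crypto/rand"
	"fmt"
)

// newID lays out 16 bytes as a 6-byte big-endian millisecond timestamp
// followed by 10 random bytes.
func newID(ms uint64) ([16]byte, error) {
	var id [16]byte
	id[0] = byte(ms >> 40)
	id[1] = byte(ms >> 32)
	id[2] = byte(ms >> 24)
	id[3] = byte(ms >> 16)
	id[4] = byte(ms >> 8)
	id[5] = byte(ms)
	if _, err := rand.Read(id[6:]); err != nil {
		return id, err
	}
	return id, nil
}

// timestamp extracts the millisecond timestamp from the first 6 bytes.
func timestamp(id [16]byte) uint64 {
	var ms uint64
	for _, b := range id[:6] {
		ms = ms<<8 | uint64(b)
	}
	return ms
}

// less reports whether a sorts before b in byte-wise lexicographic order.
func less(a, b [16]byte) bool {
	return bytes.Compare(a[:], b[:]) < 0
}

func main() {
	a, err := newID(1500000000000)
	if err != nil {
		panic(err)
	}
	b, err := newID(1500000000001)
	if err != nil {
		panic(err)
	}
	// The earlier timestamp sorts first, whatever the random bytes are.
	fmt.Println(less(a, b)) // true
}
```

Since the timestamp occupies the most significant bytes, the random part only influences the order of identifiers created within the same millisecond.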

A ULID is often encoded as a 26-character string using Crockford’s base32, which looks like this: 01ARZ3NDEKTSV4RRFFQ69G5FAV. The absence of special characters in this encoding has the nice added benefit of making it URL-safe.
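For the curious, the string form can be sketched in a few lines. The encode function below is a hypothetical helper, not the library’s implementation: it treats the 16 bytes as one big-endian 128-bit number and peels off 5 bits at a time, using the Crockford alphabet (which leaves out I, L, O and U).

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// crockford is the 32-character Crockford base32 alphabet.
const crockford = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"

// encode renders a 128-bit ID as 26 base32 characters. 26 characters hold
// 130 bits, so the two most significant bits of the first character are zero.
func encode(id [16]byte) string {
	hi := binary.BigEndian.Uint64(id[:8])
	lo := binary.BigEndian.Uint64(id[8:])
	var out [26]byte
	for i := len(out) - 1; i >= 0; i-- {
		out[i] = crockford[lo&31]
		// shift the 128-bit value (hi, lo) right by 5 bits
		lo = lo>>5 | hi<<59
		hi >>= 5
	}
	return string(out[:])
}

func main() {
	var id [16]byte
	id[15] = 1
	fmt.Println(encode(id))
}
```

Because every ID encodes to the same fixed width and the alphabet is in ascending ASCII order, the string form sorts the same way as the binary form.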

Writing a monotonic ULID source

To use ULIDs, we first need an implementation of the algorithm through which they are generated. There is a nice Go library that implements the ULID spec almost to the letter (github.com/oklog/ulid).

There is one problem with this library, though: it does not properly guarantee the right sort order for ULIDs created in the same millisecond. As only the 48-bit timestamp part of the ULID facilitates its sortability, you can imagine that having the exact same timestamp leads to problems in the sort order.

As exact sortability of the ULIDs in order of creation is crucial for our use case, we need a workaround to ensure that ULID generation leads to a perfectly monotonic sort order.

We can do this by simply butchering the timestamp and faking that the ULID is created in the next millisecond, but that gets messy for obvious reasons. Instead, it is neater to create a wrapper around the ULID generation function that increments the entropy when it needs to create multiple ULIDs with the same millisecond timestamp (as discussed in this PR).

For this, we are going to need a stateful ULID source that can look something like this:

import (
	"math/rand"
	"sync"
	"time" // used by the New method

	"github.com/oklog/ulid"
)

type MonotonicULIDsource struct {
	sync.Mutex           // mutex to allow clean concurrent access
	entropy    *rand.Rand // the entropy source
	lastMs     uint64     // the last millisecond timestamp it encountered
	lastULID   ulid.ULID  // the last ULID it generated
}

func NewMonotonicULIDsource(entropy *rand.Rand) *MonotonicULIDsource {

	// get an initial ULID to kick the monotonic generation off with
	initial, err := ulid.New(ulid.Now(), entropy)
	if err != nil {
		panic(err)
	}

	return &MonotonicULIDsource{
		entropy:  entropy,
		lastMs:   0,
		lastULID: initial,
	}
}

This ULID source uses the following method to generate a new ULID given a timestamp:

func (u *MonotonicULIDsource) New(t time.Time) (ulid.ULID, error) {
	u.Lock()
	defer u.Unlock()

	ms := ulid.Timestamp(t)
	if ms > u.lastMs {
		// new millisecond: generate a ULID with fresh entropy
		id, err := ulid.New(ms, u.entropy)
		if err != nil {
			return id, err
		}
		u.lastMs, u.lastULID = ms, id
		return id, nil
	}

	// Same millisecond (or a clock that stepped backwards): keep the last
	// timestamp and increment the entropy part of the last ULID, so the new
	// ULID always sorts after the previous one.
	incrEntropy := incrementBytes(u.lastULID.Entropy())
	var next ulid.ULID
	if err := next.SetTime(u.lastMs); err != nil {
		return next, err
	}
	if err := next.SetEntropy(incrEntropy); err != nil {
		return next, err
	}
	u.lastULID = next
	return next, nil
}

To actually increment the bytes of entropy, we use the following function:

// errOverflow signals that all entropy bytes were already at their maximum
// (requires importing the "errors" package).
var errOverflow = errors.New("entropy overflow: cannot increment further")

func incrementBytes(in []byte) []byte {
	const (
		minByte byte = 0
		maxByte byte = 255
	)

	// copy the byte slice so the input is left untouched
	out := make([]byte, len(in))
	copy(out, in)

	// iterate over the byte slice from right to left
	// most significant byte == first byte (big-endian)
	leastSigByteIdx := len(out) - 1
	mostSigByteIdx := 0
	for i := leastSigByteIdx; i >= mostSigByteIdx; i-- {

		// If its value is 255, roll over to 0 and carry into the next byte.
		if out[i] == maxByte {
			out[i] = minByte
			continue
		}
		// Else: increment and stop.
		out[i]++
		return out
	}
	// panic if the increments are exhausted
	panic(errOverflow)
}
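If panicking on overflow feels too drastic, one alternative is to report the overflow as an error and let the caller decide. The sketch below is a stand-alone variant under that assumption; incrementBytesChecked and errEntropyOverflow are hypothetical names, not part of the code above or of any library.

```go
package main

import (
	"errors"
	"fmt"
)

// errEntropyOverflow is returned when every byte is already 255.
var errEntropyOverflow = errors.New("entropy overflow")

// incrementBytesChecked treats in as a big-endian unsigned integer and
// returns a copy incremented by one, or an error if it would wrap around.
func incrementBytesChecked(in []byte) ([]byte, error) {
	out := make([]byte, len(in))
	copy(out, in)
	for i := len(out) - 1; i >= 0; i-- {
		out[i]++ // a byte at 255 wraps to 0 here
		if out[i] != 0 {
			return out, nil // no carry needed, done
		}
		// the byte wrapped to 0: carry into the next more significant byte
	}
	return nil, errEntropyOverflow
}

func main() {
	out, err := incrementBytesChecked([]byte{0x00, 0xff})
	fmt.Println(out, err)

	_, err = incrementBytesChecked([]byte{0xff, 0xff})
	fmt.Println(err)
}
```

With 80 bits of entropy an overflow is extremely unlikely in practice, so whether to panic or return an error is mostly a matter of taste.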

Using ULIDs with BadgerDB

Next, let’s see how to use the ULIDs with Badger. Let’s generate some ‘events’ in a known order and see if we can retrieve them by iterating over the Badger LSM tree.

We use the following fake events to simulate our event store:

type FakeEvent struct {
    Number    int // we also add a number to keep track of the ground-truth creation order
    Timestamp time.Time
    ID        ulid.ULID
}

Generating and inserting simulated events

We generate a slice of FakeEvent instances as follows:

// reproducible entropy source
entropy := rand.New(rand.NewSource(time.Unix(1000000, 0).UnixNano()))

// sub-ms safe ULID generator
ulidSource := NewMonotonicULIDsource(entropy)

// generate fake events that contain a ground-truth sorting order and a ULID
var events []FakeEvent
for i := 0; i < 100000; i++ {
    now := time.Now()
    id, err := ulidSource.New(now)
    if err != nil {
        panic(err)
    }
    ev := FakeEvent{i, now, id}
    events = append(events, ev)
}

Let’s shuffle the event slice to make sure the order in which the events are inserted into Badger is not relevant.

entropy.Shuffle(len(events), func(i, j int) {
    events[i], events[j] = events[j], events[i]
})

Open a Badger database instance in the working directory:

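// NOTE: assumes an older Badger v1.x API, where DefaultOptions is a plain struct value
tmpDir := "./badgerdb_tmp"
opts := badger.DefaultOptions
opts.Dir = tmpDir
opts.ValueDir = tmpDir
db, err := badger.Open(opts)
if err != nil {
	panic(err)
}
defer db.Close()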

Now we can import the events into the BadgerDB instance. Note that Badger limits how much a single transaction can hold, so depending on how many fake events we generated we may need several transactions.

// open a database transaction
txn := db.NewTransaction(true)

for _, e := range events {

	// serialize the event payload (to JSON for simplicity)
	eSerial, err := json.Marshal(e)
	if err != nil {
		panic(err)
	}

	// serialize the ULID to its binary form
	binID, err := e.ID.MarshalBinary()
	if err != nil {
		panic(err)
	}

	// add the insert operation to the transaction
	// and open a new transaction if this one is full
	err = txn.Set(binID, eSerial)
	if err == badger.ErrTxnTooBig {
		if err := txn.Commit(nil); err != nil {
			panic(err)
		}
		txn = db.NewTransaction(true)
		err = txn.Set(binID, eSerial)
	}
	// any other error (or a failed retry) is fatal
	if err != nil {
		panic(err)
	}
}

// flush the transaction
if err := txn.Commit(nil); err != nil {
	panic(err)
}

Retrieving the simulated events in order of creation

We can retrieve the events as follows:

// validate badger iteration order is equal to original creation order
var retrieved []FakeEvent
if err := db.View(func(txn *badger.Txn) error {

	// create a Badger iterator with the default settings
	opts := badger.DefaultIteratorOptions
	opts.PrefetchSize = 10
	it := txn.NewIterator(opts)
	defer it.Close()

	// have the iterator walk the LSM tree
	for it.Rewind(); it.Valid(); it.Next() {
		item := it.Item()
		// k := item.Key()
		v, err := item.Value()
		if err != nil {
			panic(err)
		}

		// deserialize the fake events and store them
		var des FakeEvent
		err = json.Unmarshal(v, &des)
		if err != nil {
			panic(err)
		}

		retrieved = append(retrieved, des)

	}
	return nil
}); err != nil {
	panic(err)
}

Let’s check that iterating the Badger LSM tree yielded the events in the right order using the ground-truth numbers we assigned to each event:

success := sort.SliceIsSorted(retrieved, func(i, j int) bool {
	return retrieved[i].Number < retrieved[j].Number
})

fmt.Println(success)
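If you want to convince yourself of the idea without installing Badger, the experiment can be approximated with the standard library alone. In the sketch below a byte-wise sort of the keys stands in for Badger’s sorted iteration, and a plain counter stands in for the incremented entropy; record, makeRecords, shuffle and orderedAfterSort are hypothetical names made up for this illustration.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"math/rand"
	"sort"
)

// record pairs a ground-truth creation number with a 16-byte key.
type record struct {
	Number int
	Key    [16]byte
}

// makeRecords creates n records that all share one millisecond timestamp.
// A counter in the last 8 bytes stands in for the incremented entropy.
func makeRecords(n int, ms uint64) []record {
	recs := make([]record, n)
	for i := range recs {
		var k [16]byte
		for b := 0; b < 6; b++ {
			k[b] = byte(ms >> uint(8*(5-b))) // 48-bit big-endian timestamp
		}
		binary.BigEndian.PutUint64(k[8:], uint64(i))
		recs[i] = record{Number: i, Key: k}
	}
	return recs
}

// shuffle randomises the insertion order using a reproducible seed.
func shuffle(recs []record, seed int64) {
	rand.New(rand.NewSource(seed)).Shuffle(len(recs), func(i, j int) {
		recs[i], recs[j] = recs[j], recs[i]
	})
}

// orderedAfterSort sorts the records byte-wise by key (as a sorted key-value
// store would) and reports whether creation order was restored.
func orderedAfterSort(recs []record) bool {
	sort.Slice(recs, func(i, j int) bool {
		return bytes.Compare(recs[i].Key[:], recs[j].Key[:]) < 0
	})
	return sort.SliceIsSorted(recs, func(i, j int) bool {
		return recs[i].Number < recs[j].Number
	})
}

func main() {
	recs := makeRecords(1000, 1500000000000)
	shuffle(recs, 1)
	fmt.Println(orderedAfterSort(recs)) // true
}
```

This only checks the ordering logic, of course; it says nothing about Badger’s durability or performance.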

That’s it!