This post describes my implementation of concurrent manual compactions in Pebble during my internship at Cockroach Labs in 2022.
Before we dive into the details of the project, let's first look at Pebble's storage architecture. Pebble is a key-value store inspired by RocksDB and LevelDB, and it is the storage engine for CockroachDB.
The core of Pebble is a log-structured merge tree (LSM) with seven levels (L0 through L6), plus an in-memory data structure called the memtable that sits above them.
New entries and edits are written as new files rather than modifying existing ones, so we can perform sequential IO rather than random IO, which is faster. These files, called SSTables (Sorted String Tables), are immutable lists of consecutive key-value pairs sorted in key order.
For the sake of explanation, let's consider a SET and a READ operation.

SET: the new entry is written to the memtable. When the memtable fills up, it is flushed to disk as a new SSTable in L0.

READ: we first check the memtable, then search each level from the top down (L0, then L1, and so on). The first match wins, since higher levels hold newer data.
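The two paths can be sketched with a toy model in which plain maps stand in for the memtable and the SSTables of each level. This is illustrative only, not Pebble's implementation:

```go
package main

import "fmt"

// A toy model of the two paths: maps stand in for SSTable levels.
type lsm struct {
	memtable map[string]string
	levels   []map[string]string // L0 .. L6, top-down
}

// set only touches the memtable; a later flush would turn the memtable
// into a new L0 SSTable.
func (db *lsm) set(key, value string) {
	db.memtable[key] = value
}

// read checks the memtable first, then each level top-down; the first
// hit wins because higher levels hold newer data.
func (db *lsm) read(key string) (string, bool) {
	if v, ok := db.memtable[key]; ok {
		return v, true
	}
	for _, level := range db.levels {
		if v, ok := level[key]; ok {
			return v, true
		}
	}
	return "", false
}

func main() {
	db := &lsm{
		memtable: map[string]string{},
		levels:   []map[string]string{{"a": "old"}},
	}
	db.set("a", "new")
	v, _ := db.read("a")
	fmt.Println(v) // prints new
}
```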
Ideally, we want to keep our LSM in a pyramid shape (▲) to minimize:

- Read amplification: the number of files a read must check before finding (or ruling out) a key
- Space amplification: redundant copies of the same key taking up disk space
To achieve this, we want to combine redundant SSTs and push them to lower levels; this is done through compactions.
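At its heart, a compaction is a merge of sorted runs in which the newer value wins for a duplicate key. A minimal sketch follows; `mergeRuns` is a hypothetical helper for illustration, not a Pebble API:

```go
package main

import "fmt"

// kv is an illustrative key-value pair; mergeRuns is a hypothetical
// helper, not a Pebble API.
type kv struct{ key, value string }

// mergeRuns merges a newer sorted run into an older one the way a
// compaction would: the output stays sorted, and for a duplicate key
// the newer value wins while the redundant older one is dropped.
func mergeRuns(newer, older []kv) []kv {
	out := make([]kv, 0, len(newer)+len(older))
	i, j := 0, 0
	for i < len(newer) && j < len(older) {
		switch {
		case newer[i].key < older[j].key:
			out = append(out, newer[i])
			i++
		case newer[i].key > older[j].key:
			out = append(out, older[j])
			j++
		default: // same key: keep the newer value
			out = append(out, newer[i])
			i++
			j++
		}
	}
	out = append(out, newer[i:]...)
	out = append(out, older[j:]...)
	return out
}

func main() {
	newer := []kv{{"a", "2"}, {"c", "9"}}
	older := []kv{{"a", "1"}, {"b", "5"}}
	fmt.Println(mergeRuns(newer, older)) // prints [{a 2} {b 5} {c 9}]
}
```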
Compactions are done both automatically and manually, where the former is done in the background and the latter is ad-hoc. We will be discussing manual compactions.
When a workload adds files to L0 faster than they can be compacted out, a large number of files accumulate in L0 and lead to an inverted LSM. A manual compaction can be used in this scenario to help return the LSM to a pyramid shape.
CockroachDB allows running manual compactions either through the SQL shell or through the Pebble instance directly.
Online (blocking SQL command):

```sql
SELECT crdb_internal.compact_engine_span(
  <nodeID>, <storeID>, decode('00', 'hex'), decode('FFFFFFFF', 'hex'));
```
Offline (node is taken offline):

```shell
./cockroach debug compact
```
Let's consider a simple example of a manual compaction:
In the above sequence of images, you can see the manual compaction proceed from L0 to L1, L1 to L2, and so on. As it proceeds, the LSM shrinks, since redundant keys are combined and pushed to lower levels.
Before my changes, manual compactions ran serially level by level: L0 to L1, then L1 to L2, and so on. When a node is taken offline, there is no foreground traffic to be considerate of, so we can make full use of the available system resources.
As an anecdote, during this project, we encountered a customer who was manually compacting a database several petabytes in size. The process had been running for 6 days(!), highlighting the urgency of the issue.
The main performance optimization opportunity was to parallelize the compactions. To do this, we'd need to:

1. Split the compaction range into non-overlapping key ranges
2. Execute the compactions in parallel
3. Synchronize their completion
By segmenting the key ranges, we can ensure that only one concurrent compaction is operating (reading/writing) on a particular key range at a time.
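To see why disjoint ranges are safe to run concurrently, consider a quick interval-overlap check over split ranges. The `keyRange` type and `overlaps` helper below are illustrative, not Pebble code:

```go
package main

import "fmt"

// keyRange is an illustrative stand-in for the ranges produced by
// splitting a manual compaction; not a Pebble type.
type keyRange struct{ start, end string }

// overlaps reports whether two inclusive [start,end] ranges intersect.
func overlaps(a, b keyRange) bool {
	return a.start <= b.end && b.start <= a.end
}

func main() {
	// Split ranges are pairwise disjoint, so each can be compacted by its
	// own goroutine without reading or writing another goroutine's keys.
	ranges := []keyRange{{"a", "f"}, {"g", "m"}, {"n", "z"}}
	safe := true
	for i := range ranges {
		for j := i + 1; j < len(ranges); j++ {
			if overlaps(ranges[i], ranges[j]) {
				safe = false
			}
		}
	}
	fmt.Println(safe) // prints true
}
```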
Here is a stub of the method we will incrementally fill in:
```go
func (d *DB) manualCompact(start, end []byte, level int, parallelize bool) error {
	// 1. Split the compaction range into non-overlapping key ranges
	// 2. Execute the compactions in parallel
	// 3. Synchronize their completion
}
```
Pebble has a function to calculate the distinct key ranges across 2 levels: calculateInuseKeyRanges
(implementation).
With that, we can split up the compaction range of a particular level into non-overlapping key ranges:
```go
// splitManualCompaction splits a manual compaction over [start,end] on level
// such that the resulting compactions have no key overlap.
//
// d.mu must be held when calling this.
func (d *DB) splitManualCompaction(
	start, end []byte, level int,
) (splitCompactions []*manualCompaction) {
	curr := d.mu.versions.currentVersion()
	endLevel := level + 1
	if level == 0 {
		// If we are compacting from L0, compact to Lbase instead of level+1
		endLevel = d.mu.versions.picker.getBaseLevel()
	}
	// Get non-overlapping key ranges
	keyRanges := calculateInuseKeyRanges(curr, d.cmp, level, endLevel, start, end)
	for _, keyRange := range keyRanges {
		splitCompactions = append(splitCompactions, &manualCompaction{
			level: level,
			// Use a channel to signal completion, more on this later!
			done:  make(chan error, 1),
			start: keyRange.Start,
			end:   keyRange.End,
			split: true,
		})
	}
	return splitCompactions
}
```
We call the above method in db.manualCompact
:
```go
func (d *DB) manualCompact(start, end []byte, level int, parallelize bool) error {
	d.mu.Lock()
	var compactions []*manualCompaction
	if parallelize {
		// Get non-overlapping compactions
		compactions = append(compactions,
			d.splitManualCompaction(start, end, level)...)
	} else {
		compactions = append(compactions, &manualCompaction{
			level: level,
			done:  make(chan error, 1),
			start: start,
			end:   end,
		})
	}
	// Execute the compactions...
}
```
We now have multiple compactions for a single level with no overlapping key ranges. Let's execute them!
To execute the compactions, we add them to a queue and then attempt to schedule them for execution:
```go
func (d *DB) manualCompact(start, end []byte, level int, parallelize bool) error {
	d.mu.Lock()
	var compactions []*manualCompaction // non-overlapping compactions from previous step
	d.mu.compact.manual = append(d.mu.compact.manual, compactions...)
	d.maybeScheduleCompaction()
	d.mu.Unlock()
	// Wait for the compactions to complete...
}
```
maybeScheduleCompaction
will schedule the compactions for execution.
pickManual
returns a *pickedCompaction
representing a compaction that has been picked for execution,
with verified compaction input/output levels and protection against running a conflicting compaction.
That *pickedCompaction
is then turned into a *compaction
which can be directly executed by Pebble.
```go
// maybeScheduleCompaction schedules a compaction if necessary
//
// d.mu must be held when calling this.
func (d *DB) maybeScheduleCompaction() {
	...
	for len(d.mu.compact.manual) > 0 &&
		d.mu.compact.compactingCount < d.opts.MaxConcurrentCompactions {
		manual := d.mu.compact.manual[0]
		// Check if we can run the compaction
		pc, retryLater := d.mu.versions.picker.pickManual(env, manual)
		if pc != nil {
			c := newCompaction(pc, d.opts, env.bytesCompacted)
			// Pop from the queue
			d.mu.compact.manual = d.mu.compact.manual[1:]
			// Add to currently executing compactions
			d.addInProgressCompaction(c)
			// Concurrently execute the compaction
			go d.compact(c, manual.done)
		} else if !retryLater {
			// Drop the compaction
			d.mu.compact.manual = d.mu.compact.manual[1:]
			manual.done <- nil // Don't forget to send to the channel!
		} else {
			...
		}
	}
	...
}
```
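The scheduling pattern here (a queue drained by at most `MaxConcurrentCompactions` workers at a time) can be sketched in isolation with a semaphore channel. `runQueue` below is a hypothetical stand-in, not Pebble code:

```go
package main

import (
	"fmt"
	"sync"
)

// runQueue drains a queue of jobs while running at most maxConcurrent
// of them at a time, mirroring how the scheduling loop bounds work by
// MaxConcurrentCompactions. It returns the peak observed concurrency.
func runQueue(numJobs, maxConcurrent int) int {
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		running int
		peak    int
	)
	sem := make(chan struct{}, maxConcurrent) // capacity bounds concurrency
	for i := 0; i < numJobs; i++ {
		sem <- struct{}{} // blocks while maxConcurrent jobs are in flight
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()
			// ... one compaction's worth of work would go here ...
			mu.Lock()
			running--
			mu.Unlock()
			<-sem // release the slot
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println(runQueue(16, 3) <= 3) // prints true
}
```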
d.compact
runs the picked concurrent manual compaction and sends to the done
channel to signal completion.
With manual compactions, we need to ensure all compactions for a particular level complete before proceeding to the next level. As mentioned above, we use a Go channel to accomplish this.
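The synchronization idiom is a buffered done channel of capacity 1 per compaction: each worker can send its result without blocking even if nobody is receiving yet, and the caller waits on each channel in turn. A standalone sketch, with `runLevel` as an illustrative stand-in rather than Pebble code:

```go
package main

import "fmt"

// runLevel runs n tasks, each with its own buffered done channel of
// capacity 1, so a worker can report its result without blocking even
// if nobody is receiving yet. The caller waits on each channel in turn
// and can return on the first error without draining the rest.
func runLevel(n int) error {
	dones := make([]chan error, n)
	for i := range dones {
		dones[i] = make(chan error, 1)
		go func(done chan error) {
			// ... one compaction's worth of work would go here ...
			done <- nil // always send exactly once
		}(dones[i])
	}
	for _, done := range dones {
		if err := <-done; err != nil {
			return err
		}
	}
	return nil
}

func main() {
	fmt.Println(runLevel(4)) // prints <nil>
}
```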
This completes our manualCompact
function:
```go
// Compacts the given level of the LSM between start and end.
func (d *DB) manualCompact(start, end []byte, level int, parallelize bool) error {
	d.mu.Lock()
	// If level is empty, return early
	var compactions []*manualCompaction
	if parallelize {
		// Get non-overlapping compactions
		compactions = append(compactions,
			d.splitManualCompaction(start, end, level)...)
	} else {
		compactions = append(compactions, &manualCompaction{
			level: level,
			done:  make(chan error, 1),
			start: start,
			end:   end,
		})
	}
	d.mu.compact.manual = append(d.mu.compact.manual, compactions...)
	d.maybeScheduleCompaction()
	d.mu.Unlock()

	// Each of the channels is guaranteed to be eventually sent to once. After a
	// compaction is possibly picked in d.maybeScheduleCompaction(), either the
	// compaction is dropped, executed after being scheduled, or retried later.
	// Assuming eventual progress when a compaction is retried, all outcomes send
	// a value to the done channel. Since the channels are buffered, it is not
	// necessary to read from each channel, and so we can exit early in the event
	// of an error.
	for _, compaction := range compactions {
		if err := <-compaction.done; err != nil {
			return err
		}
	}
	return nil
}
```
Finally, we run the concurrent manual compactions level by level in the main compaction method:
```go
// Compacts the entire LSM between start and end.
func (d *DB) Compact(start, end []byte, parallelize bool) error {
	d.mu.Lock()
	// Find the deepest level with SSTs that overlap [start,end]
	maxLevelWithFiles := 1
	for level := 0; level < numLevels; level++ {
		overlaps := ... // the SSTs in `level` that overlap with start-end
		if !overlaps.Empty() {
			maxLevelWithFiles = level + 1
		}
	}
	// Determine if any memtable overlaps with the compaction range. We wait for
	// any such overlap to flush (initiating a flush if necessary).
	d.mu.Unlock()

	for level := 0; level < maxLevelWithFiles; {
		if err := d.manualCompact(
			iStart.UserKey, iEnd.UserKey, level, parallelize); err != nil {
			return err
		}
		level++
		if level == numLevels-1 {
			// A manual compaction of the bottommost level occurred.
			// There is no next level to try and compact.
			break
		}
	}
	return nil
}
```
While it would have been ideal to run these benchmarks programmatically, a manual compaction is only useful given an inverted LSM, so one had to be produced first.
Thus, I seeded an LSM using the YCSB workload F, which inserts keys following a Zipfian distribution (a small set of hot keys receives most of the writes). Workload F is write-heavy and is known to lead to an inverted LSM when run with a large number of concurrent writers.
Here is the LSM after seeding:
```
__level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___write(sz_cnt)____read___r-amp___w-amp
    WAL         3   138 M       -     0 B       -       -       -       -   138 M       -       -       -     0.0
      0     69428    84 G    3.02     0 B     0 B       0     0 B       0     0 B       0     0 B       0     0.0
      1         0     0 B    0.00     0 B     0 B       0     0 B       0     0 B       0     0 B       0     0.0
      2      2478   9.7 G    5.45     0 B     0 B       0     0 B       0     0 B       0     0 B       0     0.0
      3      1975    12 G    5.72     0 B     0 B       0     0 B       0     0 B       0     0 B       0     0.0
      4      1497    15 G    5.73     0 B     0 B       0     0 B       0     0 B       0     0 B       0     0.0
      5       961    18 G    0.87     0 B     0 B       0     0 B       0     0 B       0     0 B       0     0.0
      6       703    15 G       -     0 B     0 B       0     0 B       0     0 B       0     0 B       0     0.0
  total     77042   154 G       -   138 M     0 B       0     0 B       0   138 M       0     0 B       0     1.0
  flush         0
compact         0   583 G     0 B       0   (size == estimated-debt, score = in-progress-bytes, in = num-in-progress)
  ctype         0       0       0       0       0   (default, delete, elision, move, read)
 memtbl        39   144 M
zmemtbl         0     0 B
   ztbl         0     0 B
 bcache         0     0 B    0.0%   (score == hit-rate)
 tcache         0     0 B    0.0%   (score == hit-rate)
  snaps         0       -       0   (score == earliest seq num)
 titers         0
 filter         -       -    0.0%   (score == utility)
```
You can see that L0 alone makes up more than half of the total size of the LSM. This is a clear indication of an inverted LSM.
After seeding an inverted LSM of around 150 GB, I ran the manual compactions with and without parallelization:
| Implementation | Real | User | Sys |
| --- | --- | --- | --- |
| Serial | 550m32.809s | 589m52.219s | 36m25.205s |
| Concurrent | 384m56.152s | 373m0.258s | 28m25.139s |
The results were promising with a roughly 30% improvement in compaction time!
While the performance improvement is significant, the compaction logs when benchmarking revealed that the majority of the time is spent in L0 to Lbase compactions. My implementation was extended to use L0 sublevels to further parallelize compactions in L0, yielding an additional 92% improvement in overall compaction time! Kudos to Arjun Nair for the follow-up work.
All in all, this project was my first large contribution to CockroachDB and the performance improvements were significant and impactful. Compactions are still an active work area in Pebble and I’m excited to see the future optimizations that the team will come up with.