Skip to content

Commit

Permalink
KAFKA-18326: fix merge iterator with cache tombstones (#18287)
Browse files Browse the repository at this point in the history
See https://issues.apache.org/jira/browse/KAFKA-18326 for more information. The main bug here is that in the old implementation, deleted cache entries would be skipped so long as they didn't equal the next store key, which resulted in potentially skipping tombstones for future keys in the store.

Reviewers: Guozhang Wang <[email protected]>, Anna Sophie Blee-Goldman <[email protected]>
  • Loading branch information
agavra authored Jan 8, 2025
1 parent 639bb51 commit 26771a6
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,28 @@ private boolean isDeletedCacheEntry(final KeyValue<Bytes, LRUCacheEntry> nextFro
public boolean hasNext() {
// skip over items deleted from cache, and corresponding store items if they have the same key
while (cacheIterator.hasNext() && isDeletedCacheEntry(cacheIterator.peekNext())) {
if (storeIterator.hasNext()) {
final KS nextStoreKey = storeIterator.peekNextKey();
// advance the store iterator if the key is the same as the deleted cache key
if (compare(cacheIterator.peekNextKey(), nextStoreKey) == 0) {
storeIterator.next();
}
if (!storeIterator.hasNext()) {
// if storeIterator is exhausted, we can just skip over every tombstone
// in the cache since they don't shadow any valid key
cacheIterator.next();
continue;
}

final KS nextStoreKey = storeIterator.peekNextKey();
final int compare = compare(cacheIterator.peekNextKey(), nextStoreKey);

if (compare == 0) {
// next cache entry is a valid tombstone for the next store key
storeIterator.next();
cacheIterator.next();
} else if (compare < 0) {
// cache has a tombstone for an entry that doesn't exist in the store
cacheIterator.next();
} else {
// store iterator has a valid entry, but we should not advance the cache
// iterator because it may still shadow a future store key
return true;
}
cacheIterator.next();
}

return cacheIterator.hasNext() || storeIterator.hasNext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,19 +148,51 @@ public void shouldNotHaveNextIfOnlyCacheItemsAndAllDeleted() {
assertFalse(createIterator().hasNext());
}

@Test
public void shouldIterateCacheOnly() {
final byte[][] bytes = {{0}, {1}, {2}};
for (final byte[] aByte : bytes) {
cache.put(namespace, Bytes.wrap(aByte), new LRUCacheEntry(aByte));
}

try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
assertArrayEquals(bytes[0], iterator.next().key.get());
assertArrayEquals(bytes[1], iterator.next().key.get());
assertArrayEquals(bytes[2], iterator.next().key.get());
assertFalse(iterator.hasNext());
}
}

@Test
public void shouldIterateStoreOnly() {
final byte[][] bytes = {{0}, {1}, {2}};
for (final byte[] aByte : bytes) {
store.put(Bytes.wrap(aByte), aByte);
}

try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
assertArrayEquals(bytes[0], iterator.next().key.get());
assertArrayEquals(bytes[1], iterator.next().key.get());
assertArrayEquals(bytes[2], iterator.next().key.get());
assertFalse(iterator.hasNext());
}
}

@Test
public void shouldSkipAllDeletedFromCache() {
final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}};
for (final byte[] aByte : bytes) {
final Bytes aBytes = Bytes.wrap(aByte);
store.put(aBytes, aByte);
cache.put(namespace, aBytes, new LRUCacheEntry(aByte));
}

cache.put(namespace, Bytes.wrap(new byte[]{-1}), new LRUCacheEntry(null));
cache.put(namespace, Bytes.wrap(bytes[1]), new LRUCacheEntry(null));
cache.put(namespace, Bytes.wrap(bytes[2]), new LRUCacheEntry(null));
cache.put(namespace, Bytes.wrap(bytes[3]), new LRUCacheEntry(null));
cache.put(namespace, Bytes.wrap(bytes[8]), new LRUCacheEntry(null));
cache.put(namespace, Bytes.wrap(bytes[11]), new LRUCacheEntry(null));
cache.put(namespace, Bytes.wrap(new byte[]{14}), new LRUCacheEntry(null));

try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
assertArrayEquals(bytes[0], iterator.next().key.get());
Expand All @@ -174,6 +206,13 @@ public void shouldSkipAllDeletedFromCache() {
}
}

@Test
public void shouldNotHaveNextIfBothIteratorsInitializedEmpty() {
try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
assertFalse(iterator.hasNext());
}
}

@Test
public void shouldPeekNextKey() {
final KeyValueStore<Bytes, byte[]> kv = new InMemoryKeyValueStore("one");
Expand Down

0 comments on commit 26771a6

Please sign in to comment.