Skip to content

Commit

Permalink
MINOR: add testRestoreCompactedDeletedConnector back to KafkaConfigBa…
Browse files Browse the repository at this point in the history
…ckingStoreTest (#18392)

Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
FrankYang0529 authored Jan 8, 2025
1 parent 058f0a9 commit a97fb66
Showing 1 changed file with 53 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -174,7 +175,8 @@ public class KafkaConfigBackingStoreTest {
.put("state.v2", "STOPPED");
private static final List<Struct> CONNECTOR_TASK_COUNT_RECORD_STRUCTS = Arrays.asList(
new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 6),
new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9)
new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9),
new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 2)
);

// The exact format doesn't matter here since both conversions are mocked
Expand Down Expand Up @@ -818,6 +820,56 @@ public void testRestoreZeroTasks() {
verify(configLog).stop();
}

@Test
public void testRestoreCompactedDeletedConnector() {
// When a connector is deleted, we emit a tombstone record for its config (with key
// "connector-<name>") and its target state (with key "target-state-<name>"), but not
// for its task configs
// As a result, we need to carefully handle the case where task configs are present in
// the config topic for a connector, but there is no accompanying config for the
// connector itself

int offset = 0;
List<ConsumerRecord<String, byte[]>> existingRecords = List.of(
new ConsumerRecord<>(TOPIC, 0, offset++, 0L, TimestampType.CREATE_TIME, 0, 0,
TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, offset++, 0L, TimestampType.CREATE_TIME, 0, 0,
TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, offset++, 0L, TimestampType.CREATE_TIME, 0, 0,
COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, offset++, 0L, TimestampType.CREATE_TIME, 0, 0,
CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
deserialized.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(2));
logOffset = offset;
expectStart(existingRecords, deserialized);
when(configLog.partitionCount()).thenReturn(1);

configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
configStorage.start();

// Should see no connectors and no task configs
ClusterConfigState configState = configStorage.snapshot();
assertEquals(Set.of(), configState.connectors());
assertEquals(0, configState.taskCount(CONNECTOR_1_NAME));
assertNull(configState.rawTaskConfig(TASK_IDS.get(0)));
assertNull(configState.rawTaskConfig(TASK_IDS.get(1)));

// Probe internal collections just to be sure
assertEquals(Map.of(), configState.connectorConfigs);
assertEquals(Map.of(), configState.taskConfigs);
assertEquals(Map.of(), configState.connectorTaskCounts);

// Exception: we still include task count records, for the unlikely-but-possible case
// where there are still zombie instances of the tasks for this long-deleted connector
// running somewhere on the cluster
assertEquals(2, (int) configState.taskCountRecord(CONNECTOR_1_NAME));
}

@Test
public void testRecordToRestartRequest() {
ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
Expand Down

0 comments on commit a97fb66

Please sign in to comment.