Skip to content

Commit

Permalink
KAFKA-15599: Move SegmentPosition/TimingWheelExpirationService to raf…
Browse files Browse the repository at this point in the history
…t module (#18094)

Reviewers: Divij Vaidya <[email protected]>, Jason Taylor <[email protected]>
  • Loading branch information
mimaison authored Jan 8, 2025
1 parent 0c435e3 commit d1aa370
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 72 deletions.
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.record.{MemoryRecords, Records}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch}
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
import org.apache.kafka.server.storage.log.FetchIsolation
Expand Down Expand Up @@ -81,7 +81,7 @@ final class KafkaMetadataLog private (

new LogOffsetMetadata(
fetchInfo.fetchOffsetMetadata.messageOffset,
Optional.of(SegmentPosition(
Optional.of(new SegmentPosition(
fetchInfo.fetchOffsetMetadata.segmentBaseOffset,
fetchInfo.fetchOffsetMetadata.relativePositionInSegment))
)
Expand Down Expand Up @@ -155,7 +155,7 @@ final class KafkaMetadataLog private (
val endOffsetMetadata = log.logEndOffsetMetadata
new LogOffsetMetadata(
endOffsetMetadata.messageOffset,
Optional.of(SegmentPosition(
Optional.of(new SegmentPosition(
endOffsetMetadata.segmentBaseOffset,
endOffsetMetadata.relativePositionInSegment)
)
Expand Down Expand Up @@ -226,7 +226,7 @@ final class KafkaMetadataLog private (
override def highWatermark: LogOffsetMetadata = {
val hwm = log.fetchOffsetSnapshot.highWatermark
val segmentPosition: Optional[OffsetMetadata] = if (!hwm.messageOffsetOnly) {
Optional.of(SegmentPosition(hwm.segmentBaseOffset, hwm.relativePositionInSegment))
Optional.of(new SegmentPosition(hwm.segmentBaseOffset, hwm.relativePositionInSegment))
} else {
Optional.empty()
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/raft/RaftManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import org.apache.kafka.common.requests.RequestHeader
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.raft.{Endpoints, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, QuorumConfig, RaftClient, ReplicatedLog}
import org.apache.kafka.raft.{Endpoints, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, QuorumConfig, RaftClient, ReplicatedLog, TimingWheelExpirationService}
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.common.Feature
import org.apache.kafka.server.common.serialization.RecordSerde
Expand Down
63 changes: 0 additions & 63 deletions core/src/main/scala/kafka/raft/TimingWheelExpirationService.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.raft
package org.apache.kafka.raft;

import org.apache.kafka.raft.OffsetMetadata
public record SegmentPosition(long baseOffset, int relativePosition) implements OffsetMetadata {

case class SegmentPosition(baseOffset: Long, relativePosition: Int) extends OffsetMetadata {
override def toString: String = s"(segmentBaseOffset=$baseOffset,relativePositionInSegment=$relativePosition)"
@Override
public String toString() {
return "(segmentBaseOffset=" + baseOffset + ",relativePositionInSegment=" + relativePosition + ")";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.raft;

import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.server.util.ShutdownableThread;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;

import java.util.concurrent.CompletableFuture;

public class TimingWheelExpirationService implements ExpirationService {

private static final long WORK_TIMEOUT_MS = 200L;

private final ExpiredOperationReaper expirationReaper;
private final Timer timer;

public TimingWheelExpirationService(Timer timer) {
this.timer = timer;
this.expirationReaper = new ExpiredOperationReaper();
expirationReaper.start();
}

@Override
public <T> CompletableFuture<T> failAfter(long timeoutMs) {
TimerTaskCompletableFuture<T> task = new TimerTaskCompletableFuture<>(timeoutMs);
task.future.whenComplete((t, throwable) -> task.cancel());
timer.add(task);
return task.future;
}

public void shutdown() throws InterruptedException {
expirationReaper.shutdown();
}

private static class TimerTaskCompletableFuture<T> extends TimerTask {

private final CompletableFuture<T> future = new CompletableFuture<>();

TimerTaskCompletableFuture(long delayMs) {
super(delayMs);
}

@Override
public void run() {
future.completeExceptionally(new TimeoutException("Future failed to be completed before timeout of " + delayMs + " ms was reached"));
}
}

private class ExpiredOperationReaper extends ShutdownableThread {

ExpiredOperationReaper() {
super("raft-expiration-reaper", false);
}

@Override
public void doWork() {
try {
timer.advanceClock(WORK_TIMEOUT_MS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}

0 comments on commit d1aa370

Please sign in to comment.