Skip to content

Commit

Permalink
MINOR: Collection/Option usage simplification via methods introduced …
Browse files Browse the repository at this point in the history
…in Java 9 & 11 (#18305)

Relevant methods:
1. `List.of`, `Set.of`, `Map.of` and similar (introduced in Java 9)
2. Optional: `isEmpty` (introduced in Java 11), `stream` (introduced in Java 9).

Reviewers: Mickael Maison <[email protected]>
  • Loading branch information
ijuma authored Jan 4, 2025
1 parent ca511cd commit 409a43e
Show file tree
Hide file tree
Showing 87 changed files with 194 additions and 271 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1219,7 +1219,7 @@ public long maybeUpdate(long now) {
return metadataTimeout;
}

if (!metadataAttemptStartMs.isPresent())
if (metadataAttemptStartMs.isEmpty())
metadataAttemptStartMs = Optional.of(now);

// Beware that the behavior of this method and the computation of timeouts for poll() are
Expand Down Expand Up @@ -1412,7 +1412,7 @@ private long maybeUpdate(long now, Node node) {
if (canSendRequest(nodeConnectionId, now)) {
Optional<AbstractRequest.Builder<?>> requestOpt = clientTelemetrySender.createRequest();

if (!requestOpt.isPresent())
if (requestOpt.isEmpty())
return Long.MAX_VALUE;

AbstractRequest.Builder<?> request = requestOpt.get();
Expand Down
8 changes: 1 addition & 7 deletions clients/src/main/java/org/apache/kafka/common/Uuid.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

Expand Down Expand Up @@ -51,11 +49,7 @@ public class Uuid implements Comparable<Uuid> {
/**
* The set of reserved UUIDs that will never be returned by the randomUuid method.
*/
public static final Set<Uuid> RESERVED = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
METADATA_TOPIC_ID,
ZERO_UUID,
ONE_UUID
)));
public static final Set<Uuid> RESERVED = Set.of(ZERO_UUID, ONE_UUID);

private final long mostSignificantBits;
private final long leastSignificantBits;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ public boolean hasExpired() {
}

synchronized List<KafkaMetric> metrics() {
return unmodifiableList(new ArrayList<>(this.metrics.values()));
return List.copyOf(this.metrics.values());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import org.jose4j.keys.resolvers.VerificationKeyResolver;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -57,7 +55,7 @@ public static AccessTokenValidator create(Map<String, ?> configs,
List<String> l = cu.get(SASL_OAUTHBEARER_EXPECTED_AUDIENCE);

if (l != null)
expectedAudiences = Collections.unmodifiableSet(new HashSet<>(l));
expectedAudiences = Set.copyOf(l);

Integer clockSkew = cu.validateInteger(SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, false);
String expectedIssuer = cu.validateString(SASL_OAUTHBEARER_EXPECTED_ISSUER, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ private GroupRebalanceConfig buildRebalanceConfig(Optional<String> groupInstance
groupInstanceId,
retryBackoffMs,
retryBackoffMaxMs,
!groupInstanceId.isPresent());
groupInstanceId.isEmpty());
}

@AfterEach
Expand Down Expand Up @@ -4135,7 +4135,7 @@ private static class RackAwareAssignor extends MockPartitionAssignor {
@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) {
subscriptions.forEach((consumer, subscription) -> {
if (!subscription.rackId().isPresent())
if (subscription.rackId().isEmpty())
throw new IllegalStateException("Rack id not provided in subscription for " + consumer);
rackIds.add(subscription.rackId().get());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,7 @@ private KafkaMetric getMetric(String name, Map<String, String> tags) throws Exce
.filter(entry ->
entry.getKey().name().equals(name) && entry.getKey().tags().equals(tags))
.findFirst();
if (!metric.isPresent())
if (metric.isEmpty())
throw new Exception(String.format("Could not find metric called %s with tags %s", name, tags.toString()));

return metric.get().getValue();
Expand Down Expand Up @@ -1112,7 +1112,7 @@ private KafkaMetric getMetric(String name) throws Exception {
Optional<Map.Entry<MetricName, KafkaMetric>> metric = metrics.metrics().entrySet().stream()
.filter(entry -> entry.getKey().name().equals(name))
.findFirst();
if (!metric.isPresent())
if (metric.isEmpty())
throw new Exception(String.format("Could not find metric called %s", name));

return metric.get().getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ public Schema valueSchema() {
public Schema build() {
return new ConnectSchema(type, isOptional(), defaultValue, name, version, doc,
parameters == null ? null : Collections.unmodifiableMap(parameters),
fields == null ? null : Collections.unmodifiableList(new ArrayList<>(fields.values())), keySchema, valueSchema);
fields == null ? null : List.copyOf(fields.values()), keySchema, valueSchema);
}

/**
Expand All @@ -441,4 +441,4 @@ private static void checkNotNull(String fieldName, Object val, String fieldToSet
if (val == null)
throw new SchemaBuilderException("Invalid SchemaBuilder call: " + fieldName + " must be specified to set " + fieldToSet);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.kafka.connect.mirror.MirrorUtils.adminCall;

Expand Down Expand Up @@ -196,7 +195,7 @@ Map<TopicPartition, Checkpoint> checkpointsForGroup(Map<TopicPartition, OffsetAn
return upstreamGroupOffsets.entrySet().stream()
.filter(x -> shouldCheckpointTopic(x.getKey().topic())) // Only perform relevant checkpoints filtered by "topic filter"
.map(x -> checkpoint(group, x.getKey(), x.getValue()))
.flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do not emit checkpoints for partitions that don't have offset-syncs
.flatMap(o -> o.stream()) // do not emit checkpoints for partitions that don't have offset-syncs
.filter(x -> x.downstreamOffset() >= 0) // ignore offsets we cannot translate accurately
.filter(this::checkpointIsMoreRecent) // do not emit checkpoints for partitions that have a later checkpoint
.collect(Collectors.toMap(Checkpoint::topicPartition, Function.identity()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,7 @@ public class MirrorMaker {

private static final long SHUTDOWN_TIMEOUT_SECONDS = 60L;

public static final List<Class<?>> CONNECTOR_CLASSES = Collections.unmodifiableList(
Arrays.asList(
MirrorSourceConnector.class,
MirrorHeartbeatConnector.class,
MirrorCheckpointConnector.class));
public static final List<Class<?>> CONNECTOR_CLASSES = List.of(MirrorSourceConnector.class, MirrorHeartbeatConnector.class, MirrorCheckpointConnector.class);

private final Map<SourceAndTarget, Herder> herders = new HashMap<>();
private CountDownLatch startLatch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ private Set<String> toTopics(Collection<TopicPartition> tps) {
void syncTopicAcls()
throws InterruptedException, ExecutionException {
Optional<Collection<AclBinding>> rawBindings = listTopicAclBindings();
if (!rawBindings.isPresent())
if (rawBindings.isEmpty())
return;
List<AclBinding> filteredBindings = rawBindings.get().stream()
.filter(x -> x.pattern().resourceType() == ResourceType.TOPIC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,7 @@ public abstract class RestServerConfig extends AbstractConfig {
static final String RESPONSE_HTTP_HEADERS_DOC = "Rules for REST API HTTP response headers";
// Visible for testing
static final String RESPONSE_HTTP_HEADERS_DEFAULT = "";
private static final Collection<String> HEADER_ACTIONS = Collections.unmodifiableList(
Arrays.asList("set", "add", "setDate", "addDate")
);
private static final Collection<String> HEADER_ACTIONS = List.of("set", "add", "setDate", "addDate");


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.maven.artifact.versioning.VersionRange;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
Expand Down Expand Up @@ -149,7 +148,7 @@ public List<PluginInfo> listConnectorPlugins(
.filter(p -> PluginType.SINK.toString().equals(p.type()) || PluginType.SOURCE.toString().equals(p.type()))
.collect(Collectors.toList()));
} else {
return Collections.unmodifiableList(new ArrayList<>(connectorPlugins));
return List.copyOf(connectorPlugins);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ public synchronized void restartConnectorAndTasks(RestartRequest request, Callba
}

Optional<RestartPlan> maybePlan = buildRestartPlan(request);
if (!maybePlan.isPresent()) {
if (maybePlan.isEmpty()) {
cb.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ default Optional<Integer> fixedLength() {
}

default boolean isVariableLength() {
return !fixedLength().isPresent();
return fixedLength().isEmpty();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ private void generateClassReader(String className, StructSpec struct,
for (FieldSpec field : struct.fields()) {
Versions validTaggedVersions = field.versions().intersect(field.taggedVersions());
if (!validTaggedVersions.empty()) {
if (!field.tag().isPresent()) {
if (field.tag().isEmpty()) {
throw new RuntimeException("Field " + field.name() + " has tagged versions, but no tag.");
}
buffer.printf("case %d: {%n", field.tag().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -58,7 +57,7 @@ public MessageSpec(@JsonProperty("name") String name,
this.apiKey = apiKey == null ? Optional.empty() : Optional.of(apiKey);
this.type = Objects.requireNonNull(type);
this.commonStructs = commonStructs == null ? Collections.emptyList() :
Collections.unmodifiableList(new ArrayList<>(commonStructs));
List.copyOf(commonStructs);
if (flexibleVersions == null) {
throw new RuntimeException("You must specify a value for flexibleVersions. " +
"Please use 0+ for all new messages.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ static void validateTaggedVersions(
FieldSpec field,
Versions topLevelFlexibleVersions
) {
if (!field.flexibleVersions().isPresent()) {
if (field.flexibleVersions().isEmpty()) {
if (!topLevelFlexibleVersions.contains(field.taggedVersions())) {
throw new RuntimeException("Tagged versions for " + what + " " +
field.name() + " are " + field.taggedVersions() + ", but top " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ private ConfigurationControlManager(LogContext logContext,
this.alterConfigPolicy = alterConfigPolicy;
this.validator = validator;
this.configData = new TimelineHashMap<>(snapshotRegistry, 0);
this.staticConfig = Collections.unmodifiableMap(new HashMap<>(staticConfig));
this.staticConfig = Map.copyOf(staticConfig);
this.currentController = new ConfigResource(Type.BROKER, Integer.toString(nodeId));
}

Expand Down Expand Up @@ -439,7 +439,7 @@ Map<String, String> getConfigs(ConfigResource configResource) {
if (map == null) {
return Collections.emptyMap();
} else {
return Collections.unmodifiableMap(new HashMap<>(map));
return Map.copyOf(map);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1568,7 +1568,7 @@ ApiError electLeader(String topic, int partitionId, ElectionType electionType,
.setEligibleLeaderReplicasEnabled(isElrEnabled())
.setDefaultDirProvider(clusterDescriber)
.build();
if (!record.isPresent()) {
if (record.isEmpty()) {
if (electionType == ElectionType.PREFERRED) {
return new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE);
} else {
Expand Down Expand Up @@ -1649,7 +1649,7 @@ public ControllerResult<Void> unregisterBroker(int brokerId) {
ControllerResult<Boolean> maybeFenceOneStaleBroker() {
BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
Optional<BrokerIdAndEpoch> idAndEpoch = heartbeatManager.tracker().maybeRemoveExpired();
if (!idAndEpoch.isPresent()) {
if (idAndEpoch.isEmpty()) {
log.debug("No stale brokers found.");
return ControllerResult.of(Collections.emptyList(), false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import org.apache.kafka.image.MetadataImage;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

Expand All @@ -37,22 +35,18 @@ public class MetadataImageNode implements MetadataNode {
*/
private final MetadataImage image;

private static final Map<String, Function<MetadataImage, MetadataNode>> CHILDREN;

static {
Map<String, Function<MetadataImage, MetadataNode>> children = new HashMap<>();
children.put(ProvenanceNode.NAME, image -> new ProvenanceNode(image.provenance()));
children.put(FeaturesImageNode.NAME, image -> new FeaturesImageNode(image.features()));
children.put(ClusterImageNode.NAME, image -> new ClusterImageNode(image.cluster()));
children.put(TopicsImageNode.NAME, image -> new TopicsImageNode(image.topics()));
children.put(ConfigurationsImageNode.NAME, image -> new ConfigurationsImageNode(image.configs()));
children.put(ClientQuotasImageNode.NAME, image -> new ClientQuotasImageNode(image.clientQuotas()));
children.put(ProducerIdsImageNode.NAME, image -> new ProducerIdsImageNode(image.producerIds()));
children.put(AclsImageNode.NAME, image -> new AclsImageByIdNode(image.acls()));
children.put(ScramImageNode.NAME, image -> new ScramImageNode(image.scram()));
children.put(DelegationTokenImageNode.NAME, image -> new DelegationTokenImageNode(image.delegationTokens()));
CHILDREN = Collections.unmodifiableMap(children);
}
private static final Map<String, Function<MetadataImage, MetadataNode>> CHILDREN = Map.of(
ProvenanceNode.NAME, image -> new ProvenanceNode(image.provenance()),
FeaturesImageNode.NAME, image -> new FeaturesImageNode(image.features()),
ClusterImageNode.NAME, image -> new ClusterImageNode(image.cluster()),
TopicsImageNode.NAME, image -> new TopicsImageNode(image.topics()),
ConfigurationsImageNode.NAME, image -> new ConfigurationsImageNode(image.configs()),
ClientQuotasImageNode.NAME, image -> new ClientQuotasImageNode(image.clientQuotas()),
ProducerIdsImageNode.NAME, image -> new ProducerIdsImageNode(image.producerIds()),
AclsImageNode.NAME, image -> new AclsImageByIdNode(image.acls()),
ScramImageNode.NAME, image -> new ScramImageNode(image.scram()),
DelegationTokenImageNode.NAME, image -> new DelegationTokenImageNode(image.delegationTokens())
);

public MetadataImageNode(MetadataImage image) {
this.image = image;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@

import org.apache.kafka.common.Uuid;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

Expand All @@ -39,12 +36,8 @@ public class PartitionAssignment {
private final List<Uuid> directories;

public PartitionAssignment(List<Integer> replicas, DefaultDirProvider defaultDirProvider) {
this.replicas = Collections.unmodifiableList(new ArrayList<>(replicas));
Uuid[] directories = new Uuid[replicas.size()];
for (int i = 0; i < directories.length; i++) {
directories[i] = defaultDirProvider.defaultDir(replicas.get(i));
}
this.directories = Collections.unmodifiableList(Arrays.asList(directories));
this.replicas = List.copyOf(replicas);
this.directories = replicas.stream().map(replica -> defaultDirProvider.defaultDir(replica)).toList();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.kafka.metadata.placement;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

Expand All @@ -31,7 +29,7 @@ public class TopicAssignment {
private final List<PartitionAssignment> assignments;

public TopicAssignment(List<PartitionAssignment> assignments) {
this.assignments = Collections.unmodifiableList(new ArrayList<>(assignments));
this.assignments = List.copyOf(assignments);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1338,7 +1338,7 @@ public void close() throws Exception {
}

private static final List<ApiMessageAndVersion> PRE_PRODUCTION_RECORDS =
Collections.unmodifiableList(Arrays.asList(
List.of(
new ApiMessageAndVersion(new RegisterBrokerRecord().
setBrokerEpoch(42).
setBrokerId(123).
Expand All @@ -1352,7 +1352,7 @@ public void close() throws Exception {
new ApiMessageAndVersion(new TopicRecord().
setName("bar").
setTopicId(Uuid.fromString("cxBT72dK4si8Ied1iP4wBA")),
(short) 0)));
(short) 0));

private static final BootstrapMetadata COMPLEX_BOOTSTRAP = BootstrapMetadata.fromRecords(
Arrays.asList(
Expand Down
Loading

0 comments on commit 409a43e

Please sign in to comment.