/*
 * Decompiled with CFR 0.152.
 */
package com.provectus.kafka.ui.service;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Table;
import com.provectus.kafka.ui.exception.IllegalEntityStateException;
import com.provectus.kafka.ui.exception.NotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.clients.admin.DescribeProducersResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

/*
 * Exception performing whole class analysis ignored.
 */
public class ReactiveAdminClient
implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(ReactiveAdminClient.class);
    private final AdminClient client;
    private final Mono<ConfigRelatedInfo> configRelatedInfoMono;
    private volatile ConfigRelatedInfo configRelatedInfo;

    public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
        Mono configRelatedInfoMono = ConfigRelatedInfo.extract((AdminClient)adminClient);
        return configRelatedInfoMono.map(info -> new ReactiveAdminClient(adminClient, configRelatedInfoMono, info));
    }

    private static Mono<Boolean> isAuthorizedSecurityEnabled(AdminClient ac, @Nullable Float kafkaVersion) {
        return ReactiveAdminClient.toMono((KafkaFuture)ac.describeAcls(AclBindingFilter.ANY).values()).thenReturn((Object)true).doOnError(th -> !(th instanceof SecurityDisabledException) && !(th instanceof InvalidRequestException) && !(th instanceof UnsupportedVersionException), th -> log.debug("Error checking if security enabled", th)).onErrorReturn((Object)false);
    }

    public static <T> Mono<T> toMono(KafkaFuture<T> future) {
        return Mono.create((T sink) -> future.whenComplete((res, ex) -> {
            if (ex != null) {
                if (ex instanceof CompletionException || ex instanceof ExecutionException) {
                    sink.error(ex.getCause());
                } else {
                    sink.error(ex);
                }
            } else {
                sink.success(res);
            }
        })).doOnCancel(() -> future.cancel(true)).publishOn(Schedulers.parallel());
    }

    public Set<SupportedFeature> getClusterFeatures() {
        return this.configRelatedInfo.features();
    }

    public Mono<Set<String>> listTopics(boolean listInternal) {
        return ReactiveAdminClient.toMono((KafkaFuture)this.client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names());
    }

    public Mono<Void> deleteTopic(String topicName) {
        return ReactiveAdminClient.toMono((KafkaFuture)this.client.deleteTopics(List.of(topicName)).all());
    }

    public String getVersion() {
        return this.configRelatedInfo.version();
    }

    public boolean isTopicDeletionEnabled() {
        return this.configRelatedInfo.topicDeletionIsAllowed();
    }

    public Mono<Void> updateInternalStats(@Nullable Node controller) {
        if (controller == null) {
            return Mono.empty();
        }
        return this.configRelatedInfoMono.doOnNext(info -> {
            this.configRelatedInfo = info;
        }).then();
    }

    public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig() {
        return this.listTopics(true).flatMap(topics -> this.getTopicsConfig((Collection)topics, false));
    }

    public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames, boolean includeDoc) {
        boolean includeDocFixed = includeDoc && this.getClusterFeatures().contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL);
        return ReactiveAdminClient.partitionCalls(topicNames, (int)200, (T part) -> this.getTopicsConfigImpl(part, includeDocFixed), (BiFunction)ReactiveAdminClient.mapMerger());
    }

    private Mono<Map<String, List<ConfigEntry>>> getTopicsConfigImpl(Collection<String> topicNames, boolean includeDoc) {
        List resources = topicNames.stream().map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName)).collect(Collectors.toList());
        return ReactiveAdminClient.toMonoWithExceptionFilter((Map)this.client.describeConfigs(resources, new DescribeConfigsOptions().includeSynonyms(true).includeDocumentation(includeDoc)).values(), (Class[])new Class[]{UnknownTopicOrPartitionException.class, TopicAuthorizationException.class}).map(config -> config.entrySet().stream().collect(Collectors.toMap(c -> ((ConfigResource)c.getKey()).name(), c -> List.copyOf(((Config)c.getValue()).entries()))));
    }

    private static Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(AdminClient client, List<Integer> brokerIds) {
        List resources = brokerIds.stream().map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId))).collect(Collectors.toList());
        return ReactiveAdminClient.toMono((KafkaFuture)client.describeConfigs(resources).all()).onErrorResume(th -> th instanceof InvalidRequestException || th instanceof UnknownTopicOrPartitionException, th -> {
            log.trace("Error while getting configs for brokers {}", (Object)brokerIds, th);
            return Mono.just(Map.of());
        }).onErrorResume(ClusterAuthorizationException.class, th -> {
            log.trace("AuthorizationException while getting configs for brokers {}", (Object)brokerIds, th);
            return Mono.just(Map.of());
        }).onErrorResume(th -> true, th -> {
            log.warn("Unexpected error while getting configs for brokers {}", (Object)brokerIds, th);
            return Mono.just(Map.of());
        }).map(config -> config.entrySet().stream().collect(Collectors.toMap(c -> Integer.valueOf(((ConfigResource)c.getKey()).name()), c -> new ArrayList(((Config)c.getValue()).entries()))));
    }

    public Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(List<Integer> brokerIds) {
        return ReactiveAdminClient.loadBrokersConfig((AdminClient)this.client, brokerIds);
    }

    public Mono<Map<String, TopicDescription>> describeTopics() {
        return this.listTopics(true).flatMap(arg_0 -> this.describeTopics(arg_0));
    }

    public Mono<Map<String, TopicDescription>> describeTopics(Collection<String> topics) {
        return ReactiveAdminClient.partitionCalls(topics, (int)200, arg_0 -> this.describeTopicsImpl(arg_0), (BiFunction)ReactiveAdminClient.mapMerger());
    }

    private Mono<Map<String, TopicDescription>> describeTopicsImpl(Collection<String> topics) {
        return ReactiveAdminClient.toMonoWithExceptionFilter((Map)this.client.describeTopics(topics).topicNameValues(), (Class[])new Class[]{UnknownTopicOrPartitionException.class, TopicAuthorizationException.class});
    }

    public Mono<TopicDescription> describeTopic(String topic) {
        return this.describeTopics(List.of(topic)).flatMap(m -> Mono.justOrEmpty((Object)((TopicDescription)m.get(topic))));
    }

    @SafeVarargs
    static <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> values, Class<? extends KafkaException> ... classes) {
        if (values.isEmpty()) {
            return Mono.just(Map.of());
        }
        List<Mono> monos = values.entrySet().stream().map(e -> ReactiveAdminClient.toMono((KafkaFuture)((KafkaFuture)e.getValue())).map(r -> Tuples.of(e.getKey(), Optional.of(r))).defaultIfEmpty((Object)Tuples.of(e.getKey(), Optional.empty())).onErrorResume(th -> Stream.of(classes).anyMatch(clazz -> th.getClass().isAssignableFrom((Class<?>)clazz)), th -> Mono.just((Object)Tuples.of(e.getKey(), Optional.empty())))).toList();
        return Mono.zip(monos, resultsArr -> Stream.of(resultsArr).map(obj -> (Tuple2)obj).filter(t -> ((Optional)t.getT2()).isPresent()).collect(Collectors.toMap(Tuple2::getT1, t -> ((Optional)t.getT2()).get())));
    }

    public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs() {
        return this.describeCluster().map(d -> d.getNodes().stream().map(Node::id).collect(Collectors.toList())).flatMap(arg_0 -> this.describeLogDirs(arg_0));
    }

    public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs(Collection<Integer> brokerIds) {
        return ReactiveAdminClient.toMono((KafkaFuture)this.client.describeLogDirs(brokerIds).all()).onErrorResume(UnsupportedVersionException.class, th -> Mono.just(Map.of())).onErrorResume(ClusterAuthorizationException.class, th -> Mono.just(Map.of())).onErrorResume(th -> true, th -> {
            log.warn("Error while calling describeLogDirs", th);
            return Mono.just(Map.of());
        });
    }

    public Mono<ClusterDescription> describeCluster() {
        return ReactiveAdminClient.describeClusterImpl((AdminClient)this.client, (Set)this.getClusterFeatures());
    }

    private static Mono<ClusterDescription> describeClusterImpl(AdminClient client, Set<SupportedFeature> features) {
        boolean includeAuthorizedOperations = features.contains(SupportedFeature.DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS);
        DescribeClusterResult result = client.describeCluster(new DescribeClusterOptions().includeAuthorizedOperations(includeAuthorizedOperations));
        KafkaFuture allOfFuture = KafkaFuture.allOf((KafkaFuture[])new KafkaFuture[]{result.controller(), result.clusterId(), result.nodes(), result.authorizedOperations()});
        return ReactiveAdminClient.toMono((KafkaFuture)allOfFuture).then(Mono.fromCallable(() -> new ClusterDescription((Node)result.controller().get(), (String)result.clusterId().get(), (Collection)result.nodes().get(), (Set)result.authorizedOperations().get())));
    }

    public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) {
        return ReactiveAdminClient.toMono((KafkaFuture)this.client.deleteConsumerGroups(groupIds).all()).onErrorResume(GroupIdNotFoundException.class, th -> Mono.error((Throwable)new NotFoundException("The group id does not exist"))).onErrorResume(GroupNotEmptyException.class, th -> Mono.error((Throwable)new IllegalEntityStateException("The group is not empty")));
    }

    public Mono<Void> createTopic(String name, int numPartitions, @Nullable Integer replicationFactor, Map<String, String> configs) {
        NewTopic newTopic = new NewTopic(name, Optional.of(numPartitions), Optional.ofNullable(replicationFactor).map(Integer::shortValue)).configs(configs);
        return ReactiveAdminClient.toMono((KafkaFuture)this.client.createTopics(List.of(newTopic)).all());
    }

    public Mono<Void> alterPartitionReassignments(Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments) {
        return ReactiveAdminClient.toMono((KafkaFuture)this.client.alterPartitionReassignments(reassignments).all());
    }

    public Mono<Void> createPartitions(Map<String, NewPartitions> newPartitionsMap) {
        return ReactiveAdminClient.toMono((KafkaFuture)this.client.createPartitions(newPartitionsMap).all());
    }

    public Mono<Void> updateTopicConfig(String topicName, Map<String, String> configs) {
        if (this.getClusterFeatures().contains(SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
            return this.getTopicsConfigImpl(List.of(topicName), false).map(conf -> conf.getOrDefault(topicName, List.of())).flatMap(currentConfigs -> this.incrementalAlterConfig(topicName, currentConfigs, configs));
        }
        return this.alterConfig(topicName, configs);
    }

    public Mono<List<String>> listConsumerGroupNames() {
        return this.listConsumerGroups().map(lst -> lst.stream().map(ConsumerGroupListing::groupId).toList());
    }

    public Mono<Collection<ConsumerGroupListing>> listConsumerGroups() {
        return ReactiveAdminClient.toMono((KafkaFuture)this.client.listConsumerGroups().all());
    }

    public Mono<Map<String, ConsumerGroupDescription>> describeConsumerGroups(Collection<String> groupIds) {
        return ReactiveAdminClient.partitionCalls(groupIds, (int)25, (int)4, (T ids) -> ReactiveAdminClient.toMono((KafkaFuture)this.client.describeConsumerGroups(ids).all()), (BiFunction)ReactiveAdminClient.mapMerger());
    }

    public Mono<Table<String, TopicPartition, Long>> listConsumerGroupOffsets(List<String> consumerGroups, @Nullable List<TopicPartition> partitions) {
        Function<Collection, Mono> call = groups -> ReactiveAdminClient.toMono((KafkaFuture)this.client.listConsumerGroupOffsets(groups.stream().collect(Collectors.toMap(g -> g, g -> new ListConsumerGroupOffsetsSpec().topicPartitions((Collection)partitions)))).all());
        Mono merged = ReactiveAdminClient.partitionCalls(consumerGroups, (int)25, (int)4, call, (BiFunction)ReactiveAdminClient.mapMerger());
        return merged.map(map -> {
            ImmutableTable.Builder table = ImmutableTable.builder();
            map.forEach((g, tpOffsets) -> tpOffsets.forEach((tp, offset) -> {
                if (offset != null) {
                    table.put(g, tp, (Object)offset.offset());
                }
            }));
            return table.build();
        });
    }

    public Mono<Void> alterConsumerGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) {
        return ReactiveAdminClient.toMono((KafkaFuture)this.client.alterConsumerGroupOffsets(groupId, offsets.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(((Long)e.getValue()).longValue())))).all());
    }

    public Mono<Map<TopicPartition, Long>> listTopicOffsets(String topic, OffsetSpec offsetSpec, boolean failOnUnknownLeader) {
        return this.describeTopic(topic).map(td -> ReactiveAdminClient.filterPartitionsWithLeaderCheck(List.of(td), (T p) -> true, (boolean)failOnUnknownLeader)).flatMap(partitions -> this.listOffsetsUnsafe((Collection)partitions, offsetSpec));
    }

    public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicPartition> partitions, OffsetSpec offsetSpec, boolean failOnUnknownLeader) {
        return this.filterPartitionsWithLeaderCheck(partitions, failOnUnknownLeader).flatMap(parts -> this.listOffsetsUnsafe(parts, offsetSpec));
    }

    public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicDescription> topicDescriptions, OffsetSpec offsetSpec) {
        return this.listOffsetsUnsafe((Collection)ReactiveAdminClient.filterPartitionsWithLeaderCheck(topicDescriptions, (T p) -> true, (boolean)false), offsetSpec);
    }

    private Mono<Collection<TopicPartition>> filterPartitionsWithLeaderCheck(Collection<TopicPartition> partitions, boolean failOnUnknownLeader) {
        Set targetTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
        return this.describeTopicsImpl(targetTopics).map(descriptions -> ReactiveAdminClient.filterPartitionsWithLeaderCheck(descriptions.values(), partitions::contains, (boolean)failOnUnknownLeader));
    }

    @VisibleForTesting
    static Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions, Predicate<TopicPartition> partitionPredicate, boolean failOnUnknownLeader) {
        HashSet<TopicPartition> goodPartitions = new HashSet<TopicPartition>();
        for (TopicDescription description : topicDescriptions) {
            ArrayList<TopicPartition> goodTopicPartitions = new ArrayList<TopicPartition>();
            for (TopicPartitionInfo partitionInfo : description.partitions()) {
                TopicPartition topicPartition = new TopicPartition(description.name(), partitionInfo.partition());
                if (partitionInfo.leader() == null) {
                    if (failOnUnknownLeader) {
                        throw new ValidationException(String.format("Topic partition %s has no leader", topicPartition));
                    }
                    goodTopicPartitions.clear();
                    break;
                }
                if (!partitionPredicate.test(topicPartition)) continue;
                goodTopicPartitions.add(topicPartition);
            }
            goodPartitions.addAll(goodTopicPartitions);
        }
        return goodPartitions;
    }

    @KafkaClientInternalsDependant
    @VisibleForTesting
    Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions, OffsetSpec offsetSpec) {
        if (partitions.isEmpty()) {
            return Mono.just(Map.of());
        }
        Function<Collection, Mono> call = parts -> {
            ListOffsetsResult r = this.client.listOffsets(parts.stream().collect(Collectors.toMap(tp -> tp, tp -> offsetSpec)));
            HashMap perPartitionResults = new HashMap();
            parts.forEach(p -> perPartitionResults.put(p, r.partitionResult(p)));
            return ReactiveAdminClient.toMonoWithExceptionFilter(perPartitionResults, (Class[])new Class[]{UnknownTopicOrPartitionException.class}).map(offsets -> offsets.entrySet().stream().filter(e -> ((ListOffsetsResult.ListOffsetsResultInfo)e.getValue()).offset() >= 0L).collect(Collectors.toMap(Map.Entry::getKey, e -> ((ListOffsetsResult.ListOffsetsResultInfo)e.getValue()).offset())));
        };
        return ReactiveAdminClient.partitionCalls(partitions, (int)200, call, (BiFunction)ReactiveAdminClient.mapMerger());
    }

    public Mono<Collection<AclBinding>> listAcls(ResourcePatternFilter filter) {
        Preconditions.checkArgument((boolean)this.getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
        return ReactiveAdminClient.toMono((KafkaFuture)this.client.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values());
    }

    public Mono<Void> createAcls(Collection<AclBinding> aclBindings) {
        Preconditions.checkArgument((boolean)this.getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
        return ReactiveAdminClient.toMono((KafkaFuture)this.client.createAcls(aclBindings).all());
    }

    public Mono<Void> deleteAcls(Collection<AclBinding> aclBindings) {
        Preconditions.checkArgument((boolean)this.getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
        Set filters = aclBindings.stream().map(AclBinding::toFilter).collect(Collectors.toSet());
        return ReactiveAdminClient.toMono((KafkaFuture)this.client.deleteAcls(filters).all()).then();
    }

    public Mono<Void> updateBrokerConfigByName(Integer brokerId, String name, String value) {
        ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId));
        AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET);
        return ReactiveAdminClient.toMono((KafkaFuture)this.client.incrementalAlterConfigs(Map.of(cr, List.of(op))).all());
    }

    public Mono<Void> deleteRecords(Map<TopicPartition, Long> offsets) {
        Map<TopicPartition, RecordsToDelete> records = offsets.entrySet().stream().map(entry -> Map.entry((TopicPartition)entry.getKey(), RecordsToDelete.beforeOffset((long)((Long)entry.getValue())))).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        return ReactiveAdminClient.toMono((KafkaFuture)this.client.deleteRecords(records).all());
    }

    public Mono<Void> alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment) {
        return ReactiveAdminClient.toMono((KafkaFuture)this.client.alterReplicaLogDirs(replicaAssignment).all());
    }

    public Mono<Map<TopicPartition, List<ProducerState>>> getActiveProducersState(String topic) {
        return this.describeTopic(topic).map(td -> this.client.describeProducers(IntStream.range(0, td.partitions().size()).mapToObj(i -> new TopicPartition(topic, i)).toList()).all()).flatMap(ReactiveAdminClient::toMono).map(map -> map.entrySet().stream().filter(e -> !((DescribeProducersResult.PartitionProducerState)e.getValue()).activeProducers().isEmpty()).collect(Collectors.toMap(Map.Entry::getKey, e -> ((DescribeProducersResult.PartitionProducerState)e.getValue()).activeProducers())));
    }

    private Mono<Void> incrementalAlterConfig(String topicName, List<ConfigEntry> currentConfigs, Map<String, String> newConfigs) {
        Stream<AlterConfigOp> configsToDelete = currentConfigs.stream().filter(e -> e.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG).filter(e -> !newConfigs.containsKey(e.name())).map(e -> new AlterConfigOp(e, AlterConfigOp.OpType.DELETE));
        Stream<AlterConfigOp> configsToSet = newConfigs.entrySet().stream().map(e -> new AlterConfigOp(new ConfigEntry((String)e.getKey(), (String)e.getValue()), AlterConfigOp.OpType.SET));
        return ReactiveAdminClient.toMono((KafkaFuture)this.client.incrementalAlterConfigs(Map.of(new ConfigResource(ConfigResource.Type.TOPIC, topicName), Stream.concat(configsToDelete, configsToSet).toList())).all());
    }

    private Mono<Void> alterConfig(String topicName, Map<String, String> configs) {
        List configEntries = configs.entrySet().stream().flatMap(cfg -> Stream.of(new ConfigEntry((String)cfg.getKey(), (String)cfg.getValue()))).collect(Collectors.toList());
        Config config = new Config(configEntries);
        ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
        return ReactiveAdminClient.toMono((KafkaFuture)this.client.alterConfigs(Map.of(topicResource, config)).all());
    }

    private static <R, I> Mono<R> partitionCalls(Collection<I> items, int partitionSize, Function<Collection<I>, Mono<R>> call, BiFunction<R, R, R> merger) {
        if (items.isEmpty()) {
            return call.apply(items);
        }
        Iterable parts = Iterables.partition(items, (int)partitionSize);
        return Flux.fromIterable((Iterable)parts).concatMap(call).reduce(merger);
    }

    private static <R, I> Mono<R> partitionCalls(Collection<I> items, int partitionSize, int concurrency, Function<Collection<I>, Mono<R>> call, BiFunction<R, R, R> merger) {
        if (items.isEmpty()) {
            return call.apply(items);
        }
        Iterable parts = Iterables.partition(items, (int)partitionSize);
        return Flux.fromIterable((Iterable)parts).flatMap(call, concurrency).reduce(merger);
    }

    private static <K, V> BiFunction<Map<K, V>, Map<K, V>, Map<K, V>> mapMerger() {
        return (m1, m2) -> {
            HashMap merged = new HashMap();
            merged.putAll(m1);
            merged.putAll(m2);
            return merged;
        };
    }

    @Override
    public void close() {
        this.client.close();
    }

    public ReactiveAdminClient(AdminClient client, Mono<ConfigRelatedInfo> configRelatedInfoMono, ConfigRelatedInfo configRelatedInfo) {
        this.client = client;
        this.configRelatedInfoMono = configRelatedInfoMono;
        this.configRelatedInfo = configRelatedInfo;
    }

    AdminClient getClient() {
        return this.client;
    }
}

