/*
 * Decompiled with CFR 0.152.
 */
package com.provectus.kafka.ui.service;

import com.google.common.collect.Sets;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.exception.TopicMetadataException;
import com.provectus.kafka.ui.exception.TopicNotFoundException;
import com.provectus.kafka.ui.exception.TopicRecreationException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.ClusterFeature;
import com.provectus.kafka.ui.model.InternalLogDirStats;
import com.provectus.kafka.ui.model.InternalPartition;
import com.provectus.kafka.ui.model.InternalPartitionsOffsets;
import com.provectus.kafka.ui.model.InternalReplica;
import com.provectus.kafka.ui.model.InternalTopic;
import com.provectus.kafka.ui.model.InternalTopicConfig;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.Metrics;
import com.provectus.kafka.ui.model.PartitionsIncreaseDTO;
import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO;
import com.provectus.kafka.ui.model.ReplicationFactorChangeDTO;
import com.provectus.kafka.ui.model.ReplicationFactorChangeResponseDTO;
import com.provectus.kafka.ui.model.Statistics;
import com.provectus.kafka.ui.model.TopicCreationDTO;
import com.provectus.kafka.ui.model.TopicUpdateDTO;
import com.provectus.kafka.ui.service.AdminClientService;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import com.provectus.kafka.ui.service.StatisticsCache;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicExistsException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

/**
 * Topic-level operations (load, create, recreate, clone, update, delete, repartition,
 * change replication factor) executed through a {@link ReactiveAdminClient} obtained from
 * {@link AdminClientService}.
 *
 * <p>All operations are reactive: nothing is sent to the cluster until the returned
 * {@link Mono} is subscribed.
 */
@Service
public class TopicsService {
    private final AdminClientService adminClientService;
    private final StatisticsCache statisticsCache;
    private final ClustersProperties clustersProperties;

    /** Maximum number of create attempts retried on {@link TopicExistsException} during recreation. */
    @Value("${topic.recreate.maxRetries:15}")
    private int recreateMaxRetries;

    /** Delay after deletion, and between create retries, during recreation (seconds). */
    @Value("${topic.recreate.delay.seconds:1}")
    private int recreateDelayInSeconds;

    /** Maximum number of load attempts retried on {@link TopicNotFoundException} after creation. */
    @Value("${topic.load.after.create.maxRetries:10}")
    private int loadTopicAfterCreateRetries;

    /** Delay between load attempts after creation (milliseconds). */
    @Value("${topic.load.after.create.delay.ms:500}")
    private int loadTopicAfterCreateDelayInMs;

    public TopicsService(AdminClientService adminClientService,
                         StatisticsCache statisticsCache,
                         ClustersProperties clustersProperties) {
        this.adminClientService = adminClientService;
        this.statisticsCache = statisticsCache;
        this.clustersProperties = clustersProperties;
    }

    /**
     * Describes the given topics, pushes the fresh descriptions and configs into the
     * statistics cache, and builds {@link InternalTopic}s in the order of {@code topics}.
     *
     * @param c      cluster to query
     * @param topics topic names; names without a description in the response are omitted
     * @return the loaded topics, or an empty list when {@code topics} is empty
     */
    public Mono<List<InternalTopic>> loadTopics(KafkaCluster c, List<String> topics) {
        if (topics.isEmpty()) {
            return Mono.just(List.of());
        }
        return adminClientService.get(c)
            .flatMap(ac -> ac.describeTopics(topics)
                .zipWith(ac.getTopicsConfig(topics, false), (descriptions, configs) -> {
                    statisticsCache.update(c, descriptions, configs);
                    return getPartitionOffsets(descriptions, ac).map(offsets -> {
                        // Read the statistics only after the cache update above.
                        Statistics stats = statisticsCache.get(c);
                        return createList(
                            topics, descriptions, configs, offsets,
                            stats.getMetrics(), stats.getLogDirInfo());
                    });
                }))
            // The zip combinator yields a Mono itself, so flatten Mono<Mono<...>>.
            .flatMap(Function.identity());
    }

    /** Loads a single topic; errors with {@link TopicNotFoundException} when nothing is returned for it. */
    private Mono<InternalTopic> loadTopic(KafkaCluster c, String topicName) {
        return loadTopics(c, List.of(topicName))
            .flatMap(lst -> lst.stream()
                .findFirst()
                .map(Mono::just)
                .orElse(Mono.error(TopicNotFoundException::new)));
    }

    /**
     * Loads a topic that was just created, retrying with a fixed delay while it is reported
     * as not found. Errors with {@link TopicMetadataException} once retries are exhausted.
     */
    private Mono<InternalTopic> loadTopicAfterCreation(KafkaCluster c, String topicName) {
        return loadTopic(c, topicName)
            .retryWhen(
                Retry.fixedDelay(
                        loadTopicAfterCreateRetries,
                        Duration.ofMillis(loadTopicAfterCreateDelayInMs))
                    .filter(TopicNotFoundException.class::isInstance)
                    .onRetryExhaustedThrow((spec, sig) ->
                        new TopicMetadataException(
                            String.format(
                                "Error while loading created topic '%s' - topic is not visible via API after waiting for %d ms.",
                                topicName,
                                // long arithmetic: both factors are configurable ints
                                (long) loadTopicAfterCreateDelayInMs * loadTopicAfterCreateRetries))));
    }

    /** Builds topics in {@code orderedNames} order, skipping names that have no description. */
    private List<InternalTopic> createList(List<String> orderedNames,
                                           Map<String, TopicDescription> descriptions,
                                           Map<String, List<ConfigEntry>> configs,
                                           InternalPartitionsOffsets partitionsOffsets,
                                           Metrics metrics,
                                           InternalLogDirStats logDirInfo) {
        return orderedNames.stream()
            .filter(descriptions::containsKey)
            .map(t -> InternalTopic.from(
                descriptions.get(t),
                configs.getOrDefault(t, List.of()),
                partitionsOffsets,
                metrics,
                logDirInfo,
                clustersProperties.getInternalTopicPrefix()))
            .collect(Collectors.toList());
    }

    /**
     * Fetches earliest and latest offsets for the described topics. Only partitions present
     * in both responses are included in the result.
     */
    private Mono<InternalPartitionsOffsets> getPartitionOffsets(Map<String, TopicDescription> descriptionsMap,
                                                                ReactiveAdminClient ac) {
        Collection<TopicDescription> descriptions = descriptionsMap.values();
        return ac.listOffsets(descriptions, OffsetSpec.earliest())
            .zipWith(ac.listOffsets(descriptions, OffsetSpec.latest()),
                (earliest, latest) ->
                    Sets.intersection(earliest.keySet(), latest.keySet())
                        .stream()
                        .map(tp -> Map.entry(
                            tp,
                            new InternalPartitionsOffsets.Offsets(earliest.get(tp), latest.get(tp))))
                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
            .map(InternalPartitionsOffsets::new);
    }

    /** Returns the topic, or errors with {@link TopicNotFoundException} if it cannot be loaded. */
    public Mono<InternalTopic> getTopicDetails(KafkaCluster cluster, String topicName) {
        return loadTopic(cluster, topicName);
    }

    /**
     * Returns the config entries of a topic.
     *
     * @return the topic's config entries (empty list if the config response has no entry);
     *     errors with {@link TopicNotFoundException} when describing the topic yields nothing
     */
    public Mono<List<ConfigEntry>> getTopicConfigs(KafkaCluster cluster, String topicName) {
        return adminClientService.get(cluster)
            .flatMap(ac -> ac.describeTopic(topicName)
                // Supplier form: the exception is only instantiated when actually needed.
                .switchIfEmpty(Mono.error(TopicNotFoundException::new))
                .then(ac.getTopicsConfig(List.of(topicName), true))
                .map(m -> m.values().stream().findFirst().orElse(List.of())));
    }

    /** Creates the topic; any creation failure is re-thrown as {@link TopicMetadataException}. */
    private Mono<InternalTopic> createTopic(KafkaCluster c,
                                            ReactiveAdminClient adminClient,
                                            TopicCreationDTO topicData) {
        return adminClient.createTopic(
                topicData.getName(),
                topicData.getPartitions(),
                topicData.getReplicationFactor(),
                topicData.getConfigs())
            .onErrorMap(t -> new TopicMetadataException(t.getMessage(), t))
            .then(loadTopicAfterCreation(c, topicData.getName()));
    }

    /**
     * Creates a topic and returns it once it becomes loadable.
     *
     * @param cluster       target cluster
     * @param topicCreation name, partition count, replication factor and configs of the new topic
     */
    public Mono<InternalTopic> createTopic(KafkaCluster cluster, TopicCreationDTO topicCreation) {
        return adminClientService.get(cluster)
            .flatMap(ac -> createTopic(cluster, ac, topicCreation));
    }

    /**
     * Deletes the topic and creates it again with the partition count, replication factor
     * and configs it had before deletion.
     *
     * <p>Creation is attempted after a fixed delay and retried while the cluster answers with
     * {@link TopicExistsException}; once retries are exhausted the result errors with
     * {@link TopicRecreationException}.
     */
    public Mono<InternalTopic> recreateTopic(KafkaCluster cluster, String topicName) {
        return loadTopic(cluster, topicName)
            .flatMap(t -> deleteTopic(cluster, topicName)
                .thenReturn(t)
                .delayElement(Duration.ofSeconds(recreateDelayInSeconds))
                .flatMap(topic -> adminClientService.get(cluster)
                    .flatMap(ac -> ac.createTopic(
                            topic.getName(),
                            topic.getPartitionCount(),
                            topic.getReplicationFactor(),
                            configsAsMap(topic))
                        .thenReturn(topicName))
                    .retryWhen(
                        Retry.fixedDelay(recreateMaxRetries, Duration.ofSeconds(recreateDelayInSeconds))
                            .filter(TopicExistsException.class::isInstance)
                            .onRetryExhaustedThrow((a, b) ->
                                new TopicRecreationException(
                                    topicName, recreateMaxRetries * recreateDelayInSeconds)))
                    .flatMap(a -> loadTopicAfterCreation(cluster, topicName))));
    }

    /**
     * Flattens the topic's config entries into a name-to-value map.
     *
     * <p>NOTE(review): {@code Collectors.toMap} throws {@link NullPointerException} on null
     * values; this relies on {@code InternalTopicConfig#getValue} being non-null - confirm.
     */
    private static Map<String, String> configsAsMap(InternalTopic topic) {
        return topic.getTopicConfigs().stream()
            .collect(Collectors.toMap(InternalTopicConfig::getName, InternalTopicConfig::getValue));
    }

    /** Applies the config update and reloads the topic. */
    private Mono<InternalTopic> updateTopic(KafkaCluster cluster, String topicName, TopicUpdateDTO topicUpdate) {
        return adminClientService.get(cluster)
            .flatMap(ac -> ac.updateTopicConfig(topicName, topicUpdate.getConfigs())
                .then(loadTopic(cluster, topicName)));
    }

    /** Updates topic configs with the values from {@code topicUpdate} and returns the reloaded topic. */
    public Mono<InternalTopic> updateTopic(KafkaCluster cl, String topicName, Mono<TopicUpdateDTO> topicUpdate) {
        return topicUpdate.flatMap(t -> updateTopic(cl, topicName, t));
    }

    /** Submits the reassignments and reloads the topic. */
    private Mono<InternalTopic> changeReplicationFactor(
        KafkaCluster cluster,
        ReactiveAdminClient adminClient,
        String topicName,
        Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments) {
        return adminClient.alterPartitionReassignments(reassignments)
            .then(loadTopic(cluster, topicName));
    }

    /**
     * Changes the replication factor of a topic by reassigning its partitions.
     *
     * <p>Errors with {@link ValidationException} when the requested factor equals the current
     * one, is not positive, or exceeds the number of nodes in the cached cluster description.
     */
    public Mono<ReplicationFactorChangeResponseDTO> changeReplicationFactor(
        KafkaCluster cluster,
        String topicName,
        ReplicationFactorChangeDTO replicationFactorChange) {
        return loadTopic(cluster, topicName).flatMap(topic -> adminClientService.get(cluster)
            .flatMap(ac -> {
                Integer actual = topic.getReplicationFactor();
                Integer requested = replicationFactorChange.getTotalReplicationFactor();
                Integer brokersCount = statisticsCache.get(cluster).getClusterDescription().getNodes().size();

                if (requested.equals(actual)) {
                    return Mono.error(new ValidationException(
                        String.format("Topic already has replicationFactor %s.", actual)));
                }
                if (requested <= 0) {
                    return Mono.error(new ValidationException(
                        String.format("Requested replication factor (%s) should be greater or equal to 1.", requested)));
                }
                if (requested > brokersCount) {
                    return Mono.error(new ValidationException(
                        String.format("Requested replication factor %s more than brokers count %s.",
                            requested, brokersCount)));
                }
                return changeReplicationFactor(
                    cluster, ac, topicName,
                    getPartitionsReassignments(cluster, topic, replicationFactorChange));
            })
            .map(t -> new ReplicationFactorChangeResponseDTO()
                .topicName(t.getName())
                .totalReplicationFactor(t.getReplicationFactor())));
    }

    /**
     * Computes a new replica list for every partition so that each ends up with exactly the
     * requested number of replicas.
     *
     * <p>When growing, the least-used brokers are added first; when shrinking, the most-used
     * brokers are removed first and the partition leader is never removed. Usage counts are
     * updated as replicas move so that later partitions see the effect of earlier decisions.
     *
     * @throws ValidationException if the requested factor equals the current one or a
     *     partition cannot be brought to the requested replica count
     */
    private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsReassignments(
        KafkaCluster cluster,
        InternalTopic topic,
        ReplicationFactorChangeDTO replicationFactorChange) {
        Map<Integer, List<Integer>> currentAssignment = getCurrentAssignment(topic);
        Map<Integer, Integer> brokersUsage = getBrokersMap(cluster, currentAssignment);
        int currentReplicationFactor = topic.getReplicationFactor();
        int requestedReplicationFactor = replicationFactorChange.getTotalReplicationFactor();

        if (requestedReplicationFactor > currentReplicationFactor) {
            for (List<Integer> assignmentList : currentAssignment.values()) {
                // Candidates ordered from least to most used; re-sorted per partition because
                // usage changes as replicas are added.
                List<Integer> candidates = brokersUsage.entrySet().stream()
                    .sorted(Map.Entry.comparingByValue())
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
                for (Integer broker : candidates) {
                    // Check before mutating so a partition already at the target is left alone.
                    if (assignmentList.size() >= requestedReplicationFactor) {
                        break;
                    }
                    if (!assignmentList.contains(broker)) {
                        assignmentList.add(broker);
                        brokersUsage.merge(broker, 1, Integer::sum);
                    }
                }
                if (assignmentList.size() != requestedReplicationFactor) {
                    throw new ValidationException("Something went wrong during adding replicas");
                }
            }
        } else if (requestedReplicationFactor < currentReplicationFactor) {
            for (Map.Entry<Integer, List<Integer>> assignmentEntry : currentAssignment.entrySet()) {
                Integer partition = assignmentEntry.getKey();
                List<Integer> brokers = assignmentEntry.getValue();
                InternalPartition partitionInfo = topic.getPartitions().get(partition);
                // Candidates ordered from most to least used.
                List<Integer> candidates = brokersUsage.entrySet().stream()
                    .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
                for (Integer broker : candidates) {
                    if (brokers.size() <= requestedReplicationFactor) {
                        break;
                    }
                    // remove(Object): 'broker' is a boxed id, not an index. Usage is only
                    // decremented when the broker really was a replica of this partition.
                    if (!partitionInfo.getLeader().equals(broker) && brokers.remove(broker)) {
                        brokersUsage.merge(broker, -1, Integer::sum);
                    }
                }
                if (brokers.size() != requestedReplicationFactor) {
                    throw new ValidationException("Something went wrong during removing replicas");
                }
            }
        } else {
            throw new ValidationException("Replication factor already equals requested");
        }

        return currentAssignment.entrySet().stream()
            .collect(Collectors.toMap(
                e -> new TopicPartition(topic.getName(), e.getKey()),
                e -> Optional.of(new NewPartitionReassignment(e.getValue()))));
    }

    /** Maps partition number to a mutable list of the broker ids currently holding its replicas. */
    private Map<Integer, List<Integer>> getCurrentAssignment(InternalTopic topic) {
        return topic.getPartitions().values().stream()
            .collect(Collectors.toMap(InternalPartition::getPartition, TopicsService::replicaBrokerIds));
    }

    /** Copies replica broker ids into an ArrayList because callers add to and remove from it. */
    private static List<Integer> replicaBrokerIds(InternalPartition partition) {
        return partition.getReplicas().stream()
            .map(InternalReplica::getBroker)
            .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Counts, for every node of the cached cluster description, how many replicas of the
     * given assignment it holds. Replicas on brokers absent from the cluster description are
     * not counted, so such brokers never become placement candidates.
     */
    private Map<Integer, Integer> getBrokersMap(KafkaCluster cluster,
                                                Map<Integer, List<Integer>> currentAssignment) {
        Map<Integer, Integer> usage = new HashMap<>();
        for (Node node : statisticsCache.get(cluster).getClusterDescription().getNodes()) {
            usage.put(node.id(), 0);
        }
        currentAssignment.values().forEach(brokers ->
            brokers.forEach(broker ->
                usage.computeIfPresent(broker, (id, count) -> count + 1)));
        return usage;
    }

    /**
     * Increases the partition count of a topic.
     *
     * <p>Errors with {@link ValidationException} when the requested count is lower than or
     * equal to the current count.
     */
    public Mono<PartitionsIncreaseResponseDTO> increaseTopicPartitions(
        KafkaCluster cluster,
        String topicName,
        PartitionsIncreaseDTO partitionsIncrease) {
        return loadTopic(cluster, topicName).flatMap(topic -> adminClientService.get(cluster)
            .flatMap(ac -> {
                Integer actualCount = topic.getPartitionCount();
                Integer requestedCount = partitionsIncrease.getTotalPartitionsCount();

                if (requestedCount < actualCount) {
                    return Mono.error(new ValidationException(
                        String.format("Topic currently has %s partitions, which is higher than the requested %s.",
                            actualCount, requestedCount)));
                }
                if (requestedCount.equals(actualCount)) {
                    return Mono.error(new ValidationException(
                        String.format("Topic already has %s partitions.", actualCount)));
                }

                Map<String, NewPartitions> newPartitionsMap =
                    Collections.singletonMap(topicName, NewPartitions.increaseTo(requestedCount));
                return ac.createPartitions(newPartitionsMap)
                    .then(loadTopic(cluster, topicName));
            })
            .map(t -> new PartitionsIncreaseResponseDTO()
                .topicName(t.getName())
                .totalPartitionsCount(t.getPartitionCount())));
    }

    /**
     * Deletes a topic and notifies the statistics cache on success.
     *
     * <p>Errors with {@link ValidationException} when the cached cluster features do not
     * include {@link ClusterFeature#TOPIC_DELETION}.
     */
    public Mono<Void> deleteTopic(KafkaCluster cluster, String topicName) {
        if (statisticsCache.get(cluster).getFeatures().contains(ClusterFeature.TOPIC_DELETION)) {
            return adminClientService.get(cluster)
                .flatMap(c -> c.deleteTopic(topicName))
                .doOnSuccess(t -> statisticsCache.onTopicDelete(cluster, topicName));
        }
        return Mono.error(new ValidationException("Topic deletion restricted"));
    }

    /**
     * Creates {@code newTopicName} with the partition count, replication factor and configs
     * of {@code topicName}, and returns the new topic once it becomes loadable.
     */
    public Mono<InternalTopic> cloneTopic(KafkaCluster cluster, String topicName, String newTopicName) {
        return loadTopic(cluster, topicName).flatMap(topic -> adminClientService.get(cluster)
            .flatMap(ac -> ac.createTopic(
                newTopicName,
                topic.getPartitionCount(),
                topic.getReplicationFactor(),
                configsAsMap(topic)))
            .thenReturn(newTopicName)
            .flatMap(a -> loadTopicAfterCreation(cluster, newTopicName)));
    }

    /**
     * Builds topics from cached statistics (with empty partition offsets), restricted to
     * topics that the cluster currently lists.
     */
    public Mono<List<InternalTopic>> getTopicsForPagination(KafkaCluster cluster) {
        Statistics stats = statisticsCache.get(cluster);
        return filterExisting(cluster, stats.getTopicDescriptions().keySet())
            .map(lst -> lst.stream()
                .map(topicName -> InternalTopic.from(
                    stats.getTopicDescriptions().get(topicName),
                    stats.getTopicConfigs().getOrDefault(topicName, List.of()),
                    InternalPartitionsOffsets.empty(),
                    stats.getMetrics(),
                    stats.getLogDirInfo(),
                    clustersProperties.getInternalTopicPrefix()))
                .collect(Collectors.toList()));
    }

    /** Returns the active producer states of the topic, keyed by partition. */
    public Mono<Map<TopicPartition, List<ProducerState>>> getActiveProducersState(KafkaCluster cluster, String topic) {
        return adminClientService.get(cluster)
            .flatMap(ac -> ac.getActiveProducersState(topic));
    }

    /** Keeps only those of {@code topics} that appear in the cluster's current topic listing. */
    private Mono<List<String>> filterExisting(KafkaCluster cluster, Collection<String> topics) {
        return adminClientService.get(cluster)
            .flatMap(ac -> ac.listTopics(true))
            .map(existing -> existing.stream()
                .filter(topics::contains)
                .collect(Collectors.toList()));
    }
}

