/*
 * Decompiled with CFR 0.152.
 */
package com.provectus.kafka.ui.service.integration.odd;

import com.google.common.collect.ImmutableMap;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.Statistics;
import com.provectus.kafka.ui.service.StatisticsCache;
import com.provectus.kafka.ui.service.integration.odd.Oddrn;
import com.provectus.kafka.ui.service.integration.odd.SchemaReferencesResolver;
import com.provectus.kafka.ui.service.integration.odd.schema.DataSetFieldsExtractors;
import com.provectus.kafka.ui.sr.model.SchemaSubject;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;
import org.opendatadiscovery.client.model.DataEntity;
import org.opendatadiscovery.client.model.DataEntityList;
import org.opendatadiscovery.client.model.DataEntityType;
import org.opendatadiscovery.client.model.DataSet;
import org.opendatadiscovery.client.model.DataSetField;
import org.opendatadiscovery.client.model.MetadataExtension;
import org.opendatadiscovery.oddrn.model.KafkaPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

class TopicsExporter {
    private static final Logger log = LoggerFactory.getLogger(TopicsExporter.class);
    private final Predicate<String> topicFilter;
    private final StatisticsCache statisticsCache;

    Flux<DataEntityList> export(KafkaCluster cluster) {
        String clusterOddrn = Oddrn.clusterOddrn((KafkaCluster)cluster);
        Statistics stats = this.statisticsCache.get(cluster);
        return Flux.fromIterable(stats.getTopicDescriptions().keySet()).filter(this.topicFilter).flatMap(topic -> this.createTopicDataEntity(cluster, topic, stats)).onErrorContinue((th, topic) -> log.warn("Error exporting data for topic {}, cluster {}", new Object[]{topic, cluster.getName(), th})).buffer(100).map(topicsEntities -> new DataEntityList().dataSourceOddrn(clusterOddrn).items(topicsEntities));
    }

    private Mono<DataEntity> createTopicDataEntity(KafkaCluster cluster, String topic, Statistics stats) {
        KafkaPath topicOddrnPath = Oddrn.topicOddrnPath((KafkaCluster)cluster, (String)topic);
        return Mono.zip((Mono)this.getTopicSchema(cluster, topic, topicOddrnPath, true), (Mono)this.getTopicSchema(cluster, topic, topicOddrnPath, false)).map(keyValueFields -> {
            DataSet dataset = new DataSet();
            ((List)keyValueFields.getT1()).forEach(arg_0 -> ((DataSet)dataset).addFieldListItem(arg_0));
            ((List)keyValueFields.getT2()).forEach(arg_0 -> ((DataSet)dataset).addFieldListItem(arg_0));
            return new DataEntity().name(topic).description("Kafka topic \"%s\"".formatted(topic)).oddrn(Oddrn.topicOddrn((KafkaCluster)cluster, (String)topic)).type(DataEntityType.KAFKA_TOPIC).dataset(dataset).addMetadataItem(new MetadataExtension().schemaUrl(URI.create("wontbeused.oops")).metadata(this.getTopicMetadata(topic, stats)));
        });
    }

    private Map<String, Object> getNonDefaultConfigs(String topic, Statistics stats) {
        List config = (List)stats.getTopicConfigs().get(topic);
        if (config == null) {
            return Map.of();
        }
        return config.stream().filter(c -> c.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG).collect(Collectors.toMap(ConfigEntry::name, ConfigEntry::value));
    }

    private Map<String, Object> getTopicMetadata(String topic, Statistics stats) {
        TopicDescription topicDescription = (TopicDescription)stats.getTopicDescriptions().get(topic);
        return ImmutableMap.builder().put((Object)"partitions", (Object)topicDescription.partitions().size()).put((Object)"replication_factor", (Object)((TopicPartitionInfo)topicDescription.partitions().get(0)).replicas().size()).putAll(this.getNonDefaultConfigs(topic, stats)).build();
    }

    private Mono<List<DataSetField>> getTopicSchema(KafkaCluster cluster, String topic, KafkaPath topicOddrn, boolean isKey) {
        if (cluster.getSchemaRegistryClient() == null) {
            return Mono.just(List.of());
        }
        String subject = topic + (isKey ? "-key" : "-value");
        return this.getSubjWithResolvedRefs(cluster, subject).map(t -> DataSetFieldsExtractors.extract((SchemaSubject)((SchemaSubject)t.getT1()), (Map)((Map)t.getT2()), (KafkaPath)topicOddrn, (boolean)isKey)).onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.just(List.of())).onErrorMap(WebClientResponseException.class, err -> new IllegalStateException("Error retrieving subject %s".formatted(subject), (Throwable)err));
    }

    private Mono<Tuple2<SchemaSubject, Map<String, String>>> getSubjWithResolvedRefs(KafkaCluster cluster, String subjectName) {
        return cluster.getSchemaRegistryClient().mono(client -> client.getSubjectVersion(subjectName, "latest", Boolean.valueOf(false)).flatMap(subj -> new SchemaReferencesResolver(client).resolve(subj.getReferences()).map(resolvedRefs -> Tuples.of((Object)subj, (Object)resolvedRefs))));
    }

    public TopicsExporter(Predicate<String> topicFilter, StatisticsCache statisticsCache) {
        this.topicFilter = topicFilter;
        this.statisticsCache = statisticsCache;
    }
}

