/*
 * Decompiled with CFR 0.152.
 */
package com.provectus.kafka.ui.service;

import com.provectus.kafka.ui.client.RetryingKafkaConnectClient;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.config.WebclientProperties;
import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
import com.provectus.kafka.ui.emitter.PollingSettings;
import com.provectus.kafka.ui.model.ApplicationPropertyValidationDTO;
import com.provectus.kafka.ui.model.ClusterConfigValidationDTO;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.MetricsConfig;
import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
import com.provectus.kafka.ui.service.masking.DataMasking;
import com.provectus.kafka.ui.sr.ApiClient;
import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
import com.provectus.kafka.ui.util.KafkaServicesValidation;
import com.provectus.kafka.ui.util.ReactiveFailover;
import com.provectus.kafka.ui.util.WebClientConfigurator;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Predicate;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.util.unit.DataSize;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

/**
 * Builds {@link KafkaCluster} instances from {@link ClustersProperties.Cluster} configuration and
 * validates such configuration by running the checks exposed by {@link KafkaServicesValidation}.
 *
 * <p>Schema registry, Kafka Connect, ksqlDB and metrics settings are optional: each one is only
 * processed when the corresponding cluster property is non-null.
 */
@Service
public class KafkaClusterFactory {
    private static final Logger log = LoggerFactory.getLogger(KafkaClusterFactory.class);

    /** Buffer size used when {@code WebclientProperties#getMaxInMemoryBufferSize()} returns null. */
    private static final DataSize DEFAULT_WEBCLIENT_BUFFER = DataSize.parse("20MB");

    /** Buffer size handed to every web client created by this factory. */
    private final DataSize webClientMaxBuffSize;

    /**
     * @param webclientProperties source of the optional max in-memory buffer size; when that value
     *                            is null, {@link #DEFAULT_WEBCLIENT_BUFFER} is used instead
     */
    public KafkaClusterFactory(WebclientProperties webclientProperties) {
        this.webClientMaxBuffSize = Optional.ofNullable(webclientProperties.getMaxInMemoryBufferSize())
            .map(DataSize::parse)
            .orElse(DEFAULT_WEBCLIENT_BUFFER);
    }

    /**
     * Creates a {@link KafkaCluster} for the given cluster configuration.
     *
     * @param properties        application-wide cluster properties (passed to {@link PollingSettings#create})
     * @param clusterProperties configuration of the single cluster to build
     * @return the built cluster; optional clients and metrics config are set only when configured
     */
    public KafkaCluster create(ClustersProperties properties, ClustersProperties.Cluster clusterProperties) {
        KafkaCluster.KafkaClusterBuilder builder = KafkaCluster.builder();
        builder.name(clusterProperties.getName());
        builder.bootstrapServers(clusterProperties.getBootstrapServers());
        builder.properties(convertProperties(clusterProperties.getProperties()));
        builder.readOnly(clusterProperties.isReadOnly());
        builder.masking(DataMasking.create(clusterProperties.getMasking()));
        builder.pollingSettings(PollingSettings.create(clusterProperties, properties));
        if (schemaRegistryConfigured(clusterProperties)) {
            builder.schemaRegistryClient(schemaRegistryClient(clusterProperties));
        }
        if (connectClientsConfigured(clusterProperties)) {
            builder.connectsClients(connectClients(clusterProperties));
        }
        if (ksqlConfigured(clusterProperties)) {
            builder.ksqlClient(ksqlClient(clusterProperties));
        }
        if (metricsConfigured(clusterProperties)) {
            builder.metricsConfig(metricsConfigDataToMetricsConfig(clusterProperties.getMetrics()));
        }
        builder.originalProperties(clusterProperties);
        return builder.build();
    }

    /**
     * Validates the given cluster configuration.
     *
     * <p>If SSL settings are present and {@link KafkaServicesValidation#validateTruststore} reports
     * an error, a result carrying only that Kafka error is returned and no other check is started.
     * Otherwise the Kafka connection check is combined with a check for each configured optional
     * service; checks for services that are not configured are skipped.
     *
     * @param clusterProperties cluster configuration to validate
     * @return a {@link Mono} emitting the combined validation result
     */
    public Mono<ClusterConfigValidationDTO> validate(ClustersProperties.Cluster clusterProperties) {
        if (clusterProperties.getSsl() != null) {
            Optional<String> errMsg = KafkaServicesValidation.validateTruststore(clusterProperties.getSsl());
            if (errMsg.isPresent()) {
                return Mono.just(new ClusterConfigValidationDTO()
                    .kafka(new ApplicationPropertyValidationDTO()
                        .error(true)
                        .errorMessage("Truststore not valid: " + errMsg.get())));
            }
        }

        // Each check is held in a typed local so that Mono.zip yields a fully typed tuple;
        // an absent optional service is represented by Optional.empty().
        Mono<ApplicationPropertyValidationDTO> kafkaCheck = KafkaServicesValidation.validateClusterConnection(
            clusterProperties.getBootstrapServers(),
            convertProperties(clusterProperties.getProperties()),
            clusterProperties.getSsl());

        Mono<Optional<ApplicationPropertyValidationDTO>> schemaRegistryCheck =
            schemaRegistryConfigured(clusterProperties)
                ? KafkaServicesValidation.validateSchemaRegistry(() -> schemaRegistryClient(clusterProperties))
                    .map(Optional::of)
                : Mono.just(Optional.empty());

        Mono<Optional<ApplicationPropertyValidationDTO>> ksqlCheck =
            ksqlConfigured(clusterProperties)
                ? KafkaServicesValidation.validateKsql(() -> ksqlClient(clusterProperties))
                    .map(Optional::of)
                : Mono.just(Optional.empty());

        // Connect results are keyed by the connect cluster name.
        Mono<Optional<Map<String, ApplicationPropertyValidationDTO>>> connectsCheck =
            connectClientsConfigured(clusterProperties)
                ? Flux.fromIterable(clusterProperties.getKafkaConnect())
                    .flatMap(c -> KafkaServicesValidation.validateConnect(() -> connectClient(clusterProperties, c))
                        .map(r -> Tuples.of(c.getName(), r)))
                    .collectMap(Tuple2::getT1, Tuple2::getT2)
                    .map(Optional::of)
                : Mono.just(Optional.empty());

        return Mono.zip(kafkaCheck, schemaRegistryCheck, ksqlCheck, connectsCheck)
            .map(tuple -> {
                ClusterConfigValidationDTO validation = new ClusterConfigValidationDTO();
                validation.kafka(tuple.getT1());
                tuple.getT2().ifPresent(validation::schemaRegistry);
                tuple.getT3().ifPresent(validation::ksqldb);
                tuple.getT4().ifPresent(validation::kafkaConnects);
                return validation;
            });
    }

    /**
     * Copies the given map into a new {@link Properties} object.
     *
     * @param propertiesMap entries to copy; may be null, in which case the result is empty
     * @return a new, never-null {@link Properties} instance
     */
    private Properties convertProperties(Map<String, Object> propertiesMap) {
        Properties properties = new Properties();
        if (propertiesMap != null) {
            properties.putAll(propertiesMap);
        }
        return properties;
    }

    /** Returns true when the cluster's Kafka Connect list is non-null. */
    private boolean connectClientsConfigured(ClustersProperties.Cluster clusterProperties) {
        return clusterProperties.getKafkaConnect() != null;
    }

    /**
     * Creates one connect client per configured connect cluster, keyed by the connect cluster name.
     * If two connect clusters share a name, the later one replaces the earlier one.
     */
    private Map<String, ReactiveFailover<KafkaConnectClientApi>> connectClients(ClustersProperties.Cluster clusterProperties) {
        Map<String, ReactiveFailover<KafkaConnectClientApi>> connects = new HashMap<>();
        for (ClustersProperties.ConnectCluster connect : clusterProperties.getKafkaConnect()) {
            connects.put(connect.getName(), connectClient(clusterProperties, connect));
        }
        return connects;
    }

    /**
     * Creates a failover-wrapped connect client with one {@link RetryingKafkaConnectClient} per URL
     * found in the comma-separated connect address.
     */
    private ReactiveFailover<KafkaConnectClientApi> connectClient(ClustersProperties.Cluster cluster, ClustersProperties.ConnectCluster connectCluster) {
        return ReactiveFailover.create(
            parseUrlList(connectCluster.getAddress()),
            // Each instance gets a copy of the connect config that points at a single URL.
            url -> new RetryingKafkaConnectClient(
                connectCluster.toBuilder().address(url).build(), cluster.getSsl(), webClientMaxBuffSize),
            ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
            "No alive connect instances available",
            ReactiveFailover.DEFAULT_RETRY_GRACE_PERIOD_MS);
    }

    /** Returns true when the cluster's schema registry address is non-null. */
    private boolean schemaRegistryConfigured(ClustersProperties.Cluster clusterProperties) {
        return clusterProperties.getSchemaRegistry() != null;
    }

    /**
     * Creates a failover-wrapped schema registry client with one {@link KafkaSrClientApi} per URL
     * found in the comma-separated schema registry address. All instances share one
     * {@link WebClient} configured with the cluster's SSL, basic-auth and buffer-size settings.
     */
    private ReactiveFailover<KafkaSrClientApi> schemaRegistryClient(ClustersProperties.Cluster clusterProperties) {
        // A default auth object is only created when none is configured.
        ClustersProperties.SchemaRegistryAuth auth = Optional.ofNullable(clusterProperties.getSchemaRegistryAuth())
            .orElseGet(ClustersProperties.SchemaRegistryAuth::new);
        WebClient webClient = new WebClientConfigurator()
            .configureSsl(clusterProperties.getSsl(), clusterProperties.getSchemaRegistrySsl())
            .configureBasicAuth(auth.getUsername(), auth.getPassword())
            .configureBufferSize(webClientMaxBuffSize)
            .build();
        return ReactiveFailover.create(
            parseUrlList(clusterProperties.getSchemaRegistry()),
            url -> new KafkaSrClientApi(new ApiClient(webClient, null, null).setBasePath(url)),
            ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
            "No live schemaRegistry instances available",
            ReactiveFailover.DEFAULT_RETRY_GRACE_PERIOD_MS);
    }

    /** Returns true when the cluster's ksqlDB server address is non-null. */
    private boolean ksqlConfigured(ClustersProperties.Cluster clusterProperties) {
        return clusterProperties.getKsqldbServer() != null;
    }

    /**
     * Creates a failover-wrapped ksqlDB client with one {@link KsqlApiClient} per URL found in the
     * comma-separated ksqlDB server address.
     */
    private ReactiveFailover<KsqlApiClient> ksqlClient(ClustersProperties.Cluster clusterProperties) {
        return ReactiveFailover.create(
            parseUrlList(clusterProperties.getKsqldbServer()),
            url -> new KsqlApiClient(
                url,
                clusterProperties.getKsqldbServerAuth(),
                clusterProperties.getSsl(),
                clusterProperties.getKsqldbServerSsl(),
                webClientMaxBuffSize),
            ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
            "No live ksqldb instances available",
            ReactiveFailover.DEFAULT_RETRY_GRACE_PERIOD_MS);
    }

    /**
     * Splits a comma-separated URL string into trimmed, non-blank entries.
     *
     * @param url comma-separated list of URLs; must not be null
     * @return the entries in their original order, as an unmodifiable list
     */
    private List<String> parseUrlList(String url) {
        return Stream.of(url.split(","))
            .map(String::trim)
            .filter(s -> !s.isBlank())
            .toList();
    }

    /** Returns true when the cluster's metrics configuration is non-null. */
    private boolean metricsConfigured(ClustersProperties.Cluster clusterProperties) {
        return clusterProperties.getMetrics() != null;
    }

    /**
     * Converts the metrics section of the cluster configuration into a {@link MetricsConfig}.
     *
     * @param metricsConfigData metrics configuration; may be null
     * @return the converted config, or null when {@code metricsConfigData} is null
     */
    @Nullable
    private MetricsConfig metricsConfigDataToMetricsConfig(ClustersProperties.MetricsConfigData metricsConfigData) {
        if (metricsConfigData == null) {
            return null;
        }
        MetricsConfig.MetricsConfigBuilder builder = MetricsConfig.builder();
        builder.type(metricsConfigData.getType());
        builder.port(metricsConfigData.getPort());
        // A missing (null) ssl flag is treated as false.
        builder.ssl(Boolean.TRUE.equals(metricsConfigData.getSsl()));
        builder.username(metricsConfigData.getUsername());
        builder.password(metricsConfigData.getPassword());
        builder.keystoreLocation(metricsConfigData.getKeystoreLocation());
        builder.keystorePassword(metricsConfigData.getKeystorePassword());
        return builder.build();
    }
}

