/*
 * Decompiled with CFR 0.152.
 */
package com.provectus.kafka.ui.util;

import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
import com.provectus.kafka.ui.model.ApplicationPropertyValidationDTO;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
import com.provectus.kafka.ui.util.ReactiveFailover;
import com.provectus.kafka.ui.util.SslPropertiesUtil;
import java.io.FileInputStream;
import java.security.KeyStore;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.net.ssl.TrustManagerFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.KafkaFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ResourceUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/*
 * Exception performing whole class analysis ignored.
 */
public final class KafkaServicesValidation {
    private static final Logger log = LoggerFactory.getLogger(KafkaServicesValidation.class);

    private KafkaServicesValidation() {
    }

    private static Mono<ApplicationPropertyValidationDTO> valid() {
        return Mono.just((Object)new ApplicationPropertyValidationDTO().error(Boolean.valueOf(false)));
    }

    private static Mono<ApplicationPropertyValidationDTO> invalid(String errorMsg) {
        return Mono.just((Object)new ApplicationPropertyValidationDTO().error(Boolean.valueOf(true)).errorMessage(errorMsg));
    }

    private static Mono<ApplicationPropertyValidationDTO> invalid(Throwable th) {
        return Mono.just((Object)new ApplicationPropertyValidationDTO().error(Boolean.valueOf(true)).errorMessage(th.getMessage()));
    }

    public static Optional<String> validateTruststore(ClustersProperties.TruststoreConfig truststoreConfig) {
        if (truststoreConfig.getTruststoreLocation() != null && truststoreConfig.getTruststorePassword() != null) {
            try (FileInputStream fileInputStream = new FileInputStream(ResourceUtils.getFile((String)truststoreConfig.getTruststoreLocation()));){
                KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
                trustStore.load(fileInputStream, truststoreConfig.getTruststorePassword().toCharArray());
                TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
                trustManagerFactory.init(trustStore);
            }
            catch (Exception e) {
                return Optional.of(e.getMessage());
            }
        }
        return Optional.empty();
    }

    public static Mono<ApplicationPropertyValidationDTO> validateClusterConnection(String bootstrapServers, Properties clusterProps, @Nullable ClustersProperties.TruststoreConfig ssl) {
        Properties properties = new Properties();
        SslPropertiesUtil.addKafkaSslProperties((ClustersProperties.TruststoreConfig)ssl, (Properties)properties);
        properties.putAll((Map<?, ?>)clusterProps);
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("retries", (Object)1);
        properties.put("request.timeout.ms", (Object)5000);
        properties.put("default.api.timeout.ms", (Object)5000);
        properties.put("client.id", "kui-admin-client-validation-" + System.currentTimeMillis());
        AdminClient adminClient = null;
        try {
            adminClient = AdminClient.create((Properties)properties);
        }
        catch (Exception e) {
            log.error("Error creating admin client during validation", (Throwable)e);
            return KafkaServicesValidation.invalid((String)"Error while creating AdminClient. See logs for details.");
        }
        return Mono.just((Object)adminClient).then(ReactiveAdminClient.toMono((KafkaFuture)adminClient.listTopics().names())).then(KafkaServicesValidation.valid()).doOnTerminate(() -> ((AdminClient)adminClient).close()).onErrorResume(th -> {
            log.error("Error connecting to cluster", th);
            return KafkaServicesValidation.invalid((String)"Error connecting to cluster. See logs for details.");
        });
    }

    public static Mono<ApplicationPropertyValidationDTO> validateSchemaRegistry(Supplier<ReactiveFailover<KafkaSrClientApi>> clientSupplier) {
        ReactiveFailover<KafkaSrClientApi> client;
        try {
            client = clientSupplier.get();
        }
        catch (Exception e) {
            log.error("Error creating Schema Registry client", (Throwable)e);
            return KafkaServicesValidation.invalid((String)("Error creating Schema Registry client: " + e.getMessage()));
        }
        return client.mono(KafkaSrClientApi::getGlobalCompatibilityLevel).then(KafkaServicesValidation.valid()).onErrorResume(KafkaServicesValidation::invalid);
    }

    public static Mono<ApplicationPropertyValidationDTO> validateConnect(Supplier<ReactiveFailover<KafkaConnectClientApi>> clientSupplier) {
        ReactiveFailover<KafkaConnectClientApi> client;
        try {
            client = clientSupplier.get();
        }
        catch (Exception e) {
            log.error("Error creating Connect client", (Throwable)e);
            return KafkaServicesValidation.invalid((String)("Error creating Connect client: " + e.getMessage()));
        }
        return client.flux(KafkaConnectClientApi::getConnectorPlugins).collectList().then(KafkaServicesValidation.valid()).onErrorResume(KafkaServicesValidation::invalid);
    }

    public static Mono<ApplicationPropertyValidationDTO> validateKsql(Supplier<ReactiveFailover<KsqlApiClient>> clientSupplier) {
        ReactiveFailover<KsqlApiClient> client;
        try {
            client = clientSupplier.get();
        }
        catch (Exception e) {
            log.error("Error creating Ksql client", (Throwable)e);
            return KafkaServicesValidation.invalid((String)("Error creating Ksql client: " + e.getMessage()));
        }
        return client.flux(c -> c.execute("SHOW VARIABLES;", Map.of())).collectList().flatMap(ksqlResults -> Flux.fromIterable((Iterable)ksqlResults).filter(KsqlApiClient.KsqlResponseTable::isError).flatMap(err -> KafkaServicesValidation.invalid((String)("Error response from ksql: " + err))).next().switchIfEmpty(KafkaServicesValidation.valid())).onErrorResume(KafkaServicesValidation::invalid);
    }
}

