/*
 * Decompiled with CFR 0.152.
 */
package com.provectus.kafka.ui.service;

import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.service.AdminClientService;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import com.provectus.kafka.ui.util.SslPropertiesUtil;
import java.io.Closeable;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.admin.AdminClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

/**
 * {@link AdminClientService} implementation that creates one {@link ReactiveAdminClient}
 * per cluster name on first request and keeps it in an in-memory cache until this
 * service is closed.
 */
@Service
public class AdminClientServiceImpl
implements AdminClientService,
Closeable {
    private static final Logger log = LoggerFactory.getLogger(AdminClientServiceImpl.class);
    /** Request timeout applied when the configuration does not provide one. */
    private static final int DEFAULT_CLIENT_TIMEOUT_MS = 30000;
    /** Sequence used to keep generated client ids distinct. */
    private static final AtomicLong CLIENT_ID_SEQ = new AtomicLong();
    /** Cached clients, keyed by cluster name. */
    private final Map<String, ReactiveAdminClient> adminClientCache = new ConcurrentHashMap<>();
    private final int clientTimeout;

    /**
     * @param clustersProperties configuration; its admin client timeout is used when set,
     *                           otherwise {@link #DEFAULT_CLIENT_TIMEOUT_MS} applies
     */
    public AdminClientServiceImpl(ClustersProperties clustersProperties) {
        this.clientTimeout = Optional.ofNullable(clustersProperties.getAdminClientTimeout())
                .orElse(DEFAULT_CLIENT_TIMEOUT_MS);
    }

    /**
     * Returns the admin client for the given cluster, creating and caching it when the
     * cache has no entry for the cluster name.
     *
     * <p>Several subscribers may miss the cache at the same time and each create a client.
     * Only the first one stored in the cache is handed out; every other freshly created
     * client is closed so its resources are not leaked.
     *
     * @param cluster cluster to get a client for
     * @return a Mono emitting the cached client, or failing with
     *         {@link IllegalStateException} when a client cannot be created
     */
    public Mono<ReactiveAdminClient> get(KafkaCluster cluster) {
        String clusterName = cluster.getName();
        return Mono.justOrEmpty(this.adminClientCache.get(clusterName))
                .switchIfEmpty(this.createAdminClient(cluster))
                .map(candidate -> {
                    ReactiveAdminClient cached =
                            this.adminClientCache.computeIfAbsent(clusterName, key -> candidate);
                    if (cached != candidate) {
                        // Another subscriber populated the cache first: release the duplicate.
                        this.closeQuietly(clusterName, candidate);
                    }
                    return cached;
                });
    }

    /**
     * Builds a new admin client for the cluster. SSL settings are applied first, then the
     * cluster's own properties, then the bootstrap servers; the request timeout and a
     * generated client id are only set when the cluster properties do not define them.
     *
     * @param cluster cluster to connect to
     * @return a Mono emitting the new client; any failure is wrapped in an
     *         {@link IllegalStateException} that keeps the original cause
     */
    private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
        return Mono.fromSupplier(() -> {
            Properties properties = new Properties();
            SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
            properties.putAll(cluster.getProperties());
            properties.put("bootstrap.servers", cluster.getBootstrapServers());
            properties.putIfAbsent("request.timeout.ms", this.clientTimeout);
            properties.putIfAbsent("client.id", "kafka-ui-admin-" + Instant.now().getEpochSecond() + "-" + CLIENT_ID_SEQ.incrementAndGet());
            return AdminClient.create(properties);
        })
                // Close the raw client if wrapping it fails, so it is not leaked.
                .flatMap(ac -> ReactiveAdminClient.create(ac).doOnError(th -> ac.close()))
                .onErrorMap(th -> new IllegalStateException("Error while creating AdminClient for Cluster " + cluster.getName(), th));
    }

    /**
     * Closes every cached admin client. A failure to close one client is logged and does
     * not prevent the remaining clients from being closed.
     */
    @Override
    public void close() {
        this.adminClientCache.forEach(this::closeQuietly);
    }

    /** Closes a client, logging instead of propagating any runtime failure. */
    private void closeQuietly(String clusterName, ReactiveAdminClient client) {
        try {
            client.close();
        } catch (RuntimeException e) {
            log.warn("Error while closing AdminClient for Cluster {}", clusterName, e);
        }
    }
}

