/*
 * Decompiled with CFR 0.152.
 */
package com.provectus.kafka.ui.service.audit;

import com.google.common.annotations.VisibleForTesting;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.config.auth.AuthenticatedUser;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.AdminClientService;
import com.provectus.kafka.ui.service.ClustersStorage;
import com.provectus.kafka.ui.service.MessagesService;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import com.provectus.kafka.ui.service.audit.AuditWriter;
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;

/*
 * Exception performing whole class analysis ignored.
 */
/**
 * Records audit events for operations performed through the application.
 *
 * <p>On construction, one {@link AuditWriter} is created per configured cluster (when audit is
 * enabled for it). Depending on the cluster's audit properties a writer logs to the dedicated
 * {@code "audit"} logger, to an audit topic in the cluster, or to both. Operations that are not
 * bound to a cluster are written to the {@code "audit"} logger only.
 */
@Service
public class AuditService implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(AuditService.class);
    /** User reported when no security context (or no principal) is available. */
    private static final Mono<AuthenticatedUser> NO_AUTH_USER = Mono.just(new AuthenticatedUser("Unknown", Set.of()));
    /** Upper bound for every blocking admin-client call made during initialization. */
    private static final Duration BLOCK_TIMEOUT = Duration.ofSeconds(5L);
    private static final String DEFAULT_AUDIT_TOPIC_NAME = "__kui-audit-log";
    private static final int DEFAULT_AUDIT_TOPIC_PARTITIONS = 1;
    private static final Map<String, String> DEFAULT_AUDIT_TOPIC_CONFIG = Map.of(
        "retention.ms", String.valueOf(TimeUnit.DAYS.toMillis(7L)),
        "cleanup.policy", "delete");
    private static final Map<String, Object> AUDIT_PRODUCER_CONFIG = Map.of("compression.type", "gzip");
    private static final Logger AUDIT_LOGGER = LoggerFactory.getLogger("audit");

    /** Audit writers keyed by cluster name; clusters without audit have no entry. */
    private final Map<String, AuditWriter> auditWriters;

    /**
     * Builds audit writers for every cluster known to {@code clustersStorage}.
     *
     * @param adminClientService source of admin clients used to check/create the audit topic
     * @param clustersStorage    provides the configured clusters
     */
    @Autowired
    public AuditService(AdminClientService adminClientService, ClustersStorage clustersStorage) {
        Map<String, AuditWriter> writers = new HashMap<>();
        for (KafkaCluster cluster : clustersStorage.getKafkaClusters()) {
            // Suppliers keep the (blocking) client/producer creation lazy: they are only
            // invoked when topic audit is actually enabled for the cluster.
            Supplier<ReactiveAdminClient> adminClientSupplier =
                () -> adminClientService.get(cluster).block(BLOCK_TIMEOUT);
            Supplier<KafkaProducer<byte[], byte[]>> producerFactory =
                () -> MessagesService.createProducer(cluster, AUDIT_PRODUCER_CONFIG);
            createAuditWriter(cluster, adminClientSupplier, producerFactory)
                .ifPresent(writer -> writers.put(cluster.getName(), writer));
        }
        this.auditWriters = writers;
    }

    @VisibleForTesting
    AuditService(Map<String, AuditWriter> auditWriters) {
        this.auditWriters = auditWriters;
    }

    /**
     * Creates the audit writer for a cluster according to its audit properties.
     *
     * @param cluster         cluster to create the writer for
     * @param acSupplier      supplies the admin client; only called when topic audit is enabled
     * @param producerFactory supplies the producer; only called once the audit topic is usable
     * @return the writer, or empty when audit is not configured, both audit targets are disabled,
     *         or topic audit initialization failed and console audit is disabled
     */
    @VisibleForTesting
    static Optional<AuditWriter> createAuditWriter(KafkaCluster cluster,
                                                   Supplier<ReactiveAdminClient> acSupplier,
                                                   Supplier<KafkaProducer<byte[], byte[]>> producerFactory) {
        ClustersProperties.AuditProperties auditProps = cluster.getOriginalProperties().getAudit();
        if (auditProps == null) {
            return Optional.empty();
        }
        boolean topicAudit = Optional.ofNullable(auditProps.getTopicAuditEnabled()).orElse(false);
        boolean consoleAudit = Optional.ofNullable(auditProps.getConsoleAuditEnabled()).orElse(false);
        // When no level is configured, only altering operations are logged.
        boolean alterLogOnly = Optional.ofNullable(auditProps.getLevel())
            .map(lvl -> lvl == ClustersProperties.AuditProperties.LogLevel.ALTER_ONLY)
            .orElse(true);
        if (!topicAudit && !consoleAudit) {
            return Optional.empty();
        }
        if (!topicAudit) {
            log.info("Audit initialization finished for cluster '{}' (console only)", cluster.getName());
            return Optional.of(consoleOnlyWriter(cluster, alterLogOnly));
        }
        String auditTopicName = Optional.ofNullable(auditProps.getTopic()).orElse(DEFAULT_AUDIT_TOPIC_NAME);
        boolean topicAuditCanBeDone = createTopicIfNeeded(cluster, acSupplier, auditTopicName, auditProps);
        if (!topicAuditCanBeDone) {
            if (consoleAudit) {
                log.info("Audit initialization finished for cluster '{}' (console only, topic audit init failed)",
                    cluster.getName());
                return Optional.of(consoleOnlyWriter(cluster, alterLogOnly));
            }
            return Optional.empty();
        }
        log.info("Audit initialization finished for cluster '{}'", cluster.getName());
        return Optional.of(
            new AuditWriter(
                cluster.getName(),
                alterLogOnly,
                auditTopicName,
                producerFactory.get(),
                consoleAudit ? AUDIT_LOGGER : null));
    }

    /** Creates a writer that has no target topic and no producer, only the audit logger. */
    private static AuditWriter consoleOnlyWriter(KafkaCluster cluster, boolean alterLogOnly) {
        return new AuditWriter(cluster.getName(), alterLogOnly, null, null, AUDIT_LOGGER);
    }

    /**
     * Ensures the audit topic exists, creating it with default settings (overridable through the
     * audit properties) when it is missing.
     *
     * @return {@code true} when the topic exists or was created; {@code false} when connecting,
     *         listing topics or creating the topic failed (the failure is logged, not rethrown)
     */
    private static boolean createTopicIfNeeded(KafkaCluster cluster,
                                               Supplier<ReactiveAdminClient> acSupplier,
                                               String auditTopicName,
                                               ClustersProperties.AuditProperties auditProps) {
        ReactiveAdminClient ac;
        try {
            ac = acSupplier.get();
        } catch (Exception e) {
            printAuditInitError(cluster, "Error while connecting to the cluster", e);
            return false;
        }
        boolean topicExists;
        try {
            // A null result from block(...) surfaces as an exception here and is reported below.
            topicExists = ac.listTopics(true).block(BLOCK_TIMEOUT).contains(auditTopicName);
        } catch (Exception e) {
            printAuditInitError(cluster, "Error checking audit topic existence", e);
            return false;
        }
        if (topicExists) {
            return true;
        }
        try {
            int topicPartitions = Optional.ofNullable(auditProps.getAuditTopicsPartitions())
                .orElse(DEFAULT_AUDIT_TOPIC_PARTITIONS);
            // User-supplied topic properties override the defaults.
            Map<String, String> topicConfig = new HashMap<>(DEFAULT_AUDIT_TOPIC_CONFIG);
            Optional.ofNullable(auditProps.getAuditTopicProperties()).ifPresent(topicConfig::putAll);
            log.info("Creating audit topic '{}' for cluster '{}'", auditTopicName, cluster.getName());
            ac.createTopic(auditTopicName, topicPartitions, null, topicConfig).block(BLOCK_TIMEOUT);
            log.info("Audit topic created for cluster '{}'", cluster.getName());
            return true;
        } catch (Exception e) {
            printAuditInitError(cluster, "Error creating topic '%s'".formatted(auditTopicName), e);
            return false;
        }
    }

    /** Logs a highlighted, multi-line error describing why audit initialization failed. */
    private static void printAuditInitError(KafkaCluster cluster, String errorMsg, Exception cause) {
        log.error("-----------------------------------------------------------------");
        log.error("Error initializing Audit for cluster '{}'. Audit will be disabled. See error below: ",
            cluster.getName());
        log.error("{}", errorMsg, cause);
        log.error("-----------------------------------------------------------------");
    }

    /**
     * Tells whether {@code topic} is the topic audit records are written to for {@code cluster}.
     *
     * @return {@code true} only when the cluster has a writer, its target topic equals
     *         {@code topic} and topic writing is enabled for that writer
     */
    public boolean isAuditTopic(KafkaCluster cluster, String topic) {
        AuditWriter writer = this.auditWriters.get(cluster.getName());
        return writer != null && topic.equals(writer.targetTopic()) && writer.isTopicWritingEnabled();
    }

    /**
     * Writes an audit record for a terminal signal of an audited operation.
     *
     * <p>Only {@code onComplete} and {@code onError} signals produce a record; any other signal is
     * ignored. The user is resolved asynchronously from the signal's context.
     *
     * @param acxt access context describing the audited operation
     * @param sig  signal emitted by the audited reactive pipeline
     */
    public void audit(AccessContext acxt, Signal<?> sig) {
        if (sig.isOnComplete()) {
            this.extractUser(sig).subscribe(
                user -> this.sendAuditRecord(acxt, user),
                err -> logUserResolutionFailure(acxt, err));
        } else if (sig.isOnError()) {
            this.extractUser(sig).subscribe(
                user -> this.sendAuditRecord(acxt, user, sig.getThrowable()),
                err -> logUserResolutionFailure(acxt, err));
        }
    }

    /**
     * Reports a failure to resolve the user for an audit record. Without an explicit error
     * consumer such failures would end up in Reactor's dropped-error hook without any context.
     */
    private static void logUserResolutionFailure(AccessContext acxt, Throwable err) {
        log.warn("Error resolving user for audit record (cluster '{}'), record was not written",
            acxt.getCluster(), err);
    }

    /**
     * Resolves the authenticated user from the signal's context.
     *
     * @return the user built from the {@link UserDetails} principal, or {@link #NO_AUTH_USER} when
     *         the context holds no security context entry or that entry completes empty
     */
    private Mono<AuthenticatedUser> extractUser(Signal<?> sig) {
        // The key must be typed as Object: the value stored under SecurityContext.class is a
        // Mono<SecurityContext>, so the get(Class<T>) overload (which requires the value to be an
        // instance of the key class) must not be selected.
        Object key = SecurityContext.class;
        if (!sig.getContextView().hasKey(key)) {
            return NO_AUTH_USER;
        }
        return sig.getContextView().<Mono<SecurityContext>>get(key)
            .map(context -> context.getAuthentication().getPrincipal())
            .cast(UserDetails.class)
            .map(user -> {
                Set<String> roles = user.getAuthorities().stream()
                    .map(GrantedAuthority::getAuthority)
                    .collect(Collectors.toSet());
                return new AuthenticatedUser(user.getUsername(), roles);
            })
            .switchIfEmpty(NO_AUTH_USER);
    }

    private void sendAuditRecord(AccessContext ctx, AuthenticatedUser user) {
        this.sendAuditRecord(ctx, user, null);
    }

    /**
     * Writes the record through the cluster's writer, or through the audit logger for operations
     * that are not bound to a cluster. Never throws: failures are logged as warnings.
     *
     * @param th error the audited operation failed with, or {@code null} on success
     */
    private void sendAuditRecord(AccessContext ctx, AuthenticatedUser user, @Nullable Throwable th) {
        try {
            if (ctx.getCluster() != null) {
                AuditWriter writer = this.auditWriters.get(ctx.getCluster());
                // No writer means audit is disabled for this cluster.
                if (writer != null) {
                    writer.write(ctx, user, th);
                }
            } else {
                AuditWriter.writeAppOperation(AUDIT_LOGGER, ctx, user, th);
            }
        } catch (Exception e) {
            log.warn("Error sending audit record", e);
        }
    }

    /**
     * Closes every audit writer. A failure to close one writer does not prevent the remaining
     * writers from being closed; the first failure is rethrown afterwards with any later failures
     * attached as suppressed exceptions.
     */
    @Override
    public void close() throws IOException {
        RuntimeException firstFailure = null;
        for (AuditWriter writer : this.auditWriters.values()) {
            try {
                writer.close();
            } catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }
}

