/*
 * Decompiled with CFR 0.152.
 */
package com.provectus.kafka.ui.service.acl;

import com.google.common.collect.Sets;
import com.provectus.kafka.ui.model.CreateConsumerAclDTO;
import com.provectus.kafka.ui.model.CreateProducerAclDTO;
import com.provectus.kafka.ui.model.CreateStreamAppAclDTO;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.service.AdminClientService;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import com.provectus.kafka.ui.service.acl.AclCsv;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Service for managing the ACLs of a Kafka cluster through its {@link ReactiveAdminClient}:
 * creating and deleting single bindings, listing bindings, exporting them as CSV,
 * synchronising the cluster against a CSV document, and creating the sets of ALLOW
 * bindings built here for consumers, producers and stream applications.
 */
@Service
public class AclsService {
    private static final Logger log = LoggerFactory.getLogger(AclsService.class);

    /** Resource name used for the CLUSTER-level binding created for idempotent producers. */
    private static final String CLUSTER_RESOURCE_NAME = "kafka-cluster";

    private final AdminClientService adminClientService;

    public AclsService(AdminClientService adminClientService) {
        this.adminClientService = adminClientService;
    }

    /**
     * Creates a single ACL binding on the given cluster.
     *
     * @param cluster    cluster whose admin client is used
     * @param aclBinding binding to create
     * @return a Mono that completes once the admin client reports the creation as done
     */
    public Mono<Void> createAcl(KafkaCluster cluster, AclBinding aclBinding) {
        return adminClientService.get(cluster)
            .flatMap(ac -> createAclsWithLogging(ac, List.of(aclBinding)));
    }

    /**
     * Creates the given bindings, logging each one before the request is issued and again
     * after the creation succeeded.
     */
    private Mono<Void> createAclsWithLogging(ReactiveAdminClient ac, Collection<AclBinding> bindings) {
        bindings.forEach(b -> log.info("CREATING ACL: [{}]", AclCsv.createAclString(b)));
        return ac.createAcls(bindings)
            .doOnSuccess(v -> bindings.forEach(b -> log.info("ACL CREATED: [{}]", AclCsv.createAclString(b))));
    }

    /**
     * Deletes a single ACL binding from the given cluster.
     *
     * @param cluster    cluster whose admin client is used
     * @param aclBinding binding to delete
     * @return a Mono that completes once the admin client reports the deletion as done
     */
    public Mono<Void> deleteAcl(KafkaCluster cluster, AclBinding aclBinding) {
        String aclString = AclCsv.createAclString(aclBinding);
        return adminClientService.get(cluster)
            .flatMap(ac -> {
                // Logged inside the chain (like createAclsWithLogging) so the message is only
                // written when the deletion is actually about to be requested, not merely
                // when this method is called to assemble the Mono.
                log.info("DELETING ACL: [{}]", aclString);
                return ac.deleteAcls(List.of(aclBinding));
            })
            .doOnSuccess(v -> log.info("ACL DELETED: [{}]", aclString));
    }

    /**
     * Lists the ACL bindings of the cluster that match the given filter.
     *
     * @param cluster cluster whose admin client is used
     * @param filter  resource pattern filter passed to the admin client
     * @return the matching bindings, ordered by their {@code toString()} representation
     */
    public Flux<AclBinding> listAcls(KafkaCluster cluster, ResourcePatternFilter filter) {
        return adminClientService.get(cluster)
            .flatMap(c -> c.listAcls(filter))
            .flatMapIterable(acls -> acls)
            // Sorting gives the same order on every call for the same set of bindings.
            .sort(Comparator.comparing(AclBinding::toString));
    }

    /**
     * Exports all ACL bindings of the cluster (filter {@link ResourcePatternFilter#ANY})
     * as a CSV string produced by {@link AclCsv#transformToCsvString}.
     */
    public Mono<String> getAclAsCsvString(KafkaCluster cluster) {
        return adminClientService.get(cluster)
            .flatMap(c -> c.listAcls(ResourcePatternFilter.ANY))
            .map(AclCsv::transformToCsvString);
    }

    /**
     * Makes the cluster's ACLs equal to the bindings described by the given CSV: bindings
     * present in the CSV but not on the cluster are created, and bindings present on the
     * cluster but not in the CSV are deleted.
     *
     * @param cluster cluster to synchronise
     * @param csv     CSV document parsed with {@link AclCsv#parseCsv}
     * @return a Mono that completes when creation and then deletion have finished, or
     *         immediately after listing when there is nothing to change
     */
    public Mono<Void> syncAclWithAclCsv(KafkaCluster cluster, String csv) {
        return adminClientService.get(cluster)
            .flatMap(ac -> ac.listAcls(ResourcePatternFilter.ANY)
                .flatMap(existingAcls -> applySyncPlan(ac, cluster, existingAcls, csv)));
    }

    /** Computes the add/delete plan between the existing ACLs and the CSV, then applies it. */
    private Mono<Void> applySyncPlan(ReactiveAdminClient ac, KafkaCluster cluster, Collection<AclBinding> existingAcls, String csv) {
        Set<AclBinding> existingSet = Set.copyOf(existingAcls);
        Set<AclBinding> newAcls = Set.copyOf(AclCsv.parseCsv(csv));
        Set<AclBinding> toDelete = Sets.difference(existingSet, newAcls);
        Set<AclBinding> toAdd = Sets.difference(newAcls, existingSet);
        logAclSyncPlan(cluster, toAdd, toDelete);
        if (toAdd.isEmpty() && toDelete.isEmpty()) {
            return Mono.empty();
        }
        log.info("Starting new ACLs creation");
        return ac.createAcls(toAdd)
            .doOnSuccess(v -> {
                log.info("{} new ACLs created", toAdd.size());
                log.info("Starting ACLs deletion");
            })
            // then(Mono) receives an already-built Mono, so calling ac.deleteAcls(...) inline
            // would invoke it before creation has even been subscribed. Deferring postpones
            // the call until creation has completed successfully, so a failed creation can
            // never be followed (or accompanied) by deletions.
            .then(Mono.defer(() -> ac.deleteAcls(toDelete)
                .doOnSuccess(v -> log.info("{} ACLs deleted", toDelete.size()))));
    }

    /** Logs which bindings a sync is going to add and delete for the given cluster. */
    private void logAclSyncPlan(KafkaCluster cluster, Set<AclBinding> toBeAdded, Set<AclBinding> toBeDeleted) {
        log.info("'{}' cluster ACL sync plan: ", cluster.getName());
        if (toBeAdded.isEmpty() && toBeDeleted.isEmpty()) {
            log.info("Nothing to do, ACL is already in sync");
            return;
        }
        if (!toBeAdded.isEmpty()) {
            log.info("ACLs to be added ({}): ", toBeAdded.size());
            for (AclBinding aclBinding : toBeAdded) {
                log.info(" {}", AclCsv.createAclString(aclBinding));
            }
        }
        if (!toBeDeleted.isEmpty()) {
            log.info("ACLs to be deleted ({}): ", toBeDeleted.size());
            for (AclBinding aclBinding : toBeDeleted) {
                log.info(" {}", AclCsv.createAclString(aclBinding));
            }
        }
    }

    /**
     * Builds ALLOW bindings for every requested operation: one PREFIXED binding per operation
     * when {@code resourcePrefix} is given, plus one LITERAL binding per operation for each
     * distinct entry of {@code resourceNames}.
     *
     * @param resourceType   type of the resource the bindings apply to
     * @param opsToAllow     operations to allow
     * @param principal      principal of the access control entries
     * @param host           host of the access control entries
     * @param resourcePrefix prefix for PREFIXED bindings; {@code null} to create none
     * @param resourceNames  names for LITERAL bindings; {@code null} or empty to create none
     * @return the bindings, prefixed ones first
     */
    private List<AclBinding> createAllowBindings(ResourceType resourceType, List<AclOperation> opsToAllow, String principal, String host, @Nullable String resourcePrefix, @Nullable Collection<String> resourceNames) {
        List<AclBinding> bindings = new ArrayList<>();
        if (resourcePrefix != null) {
            for (AclOperation op : opsToAllow) {
                bindings.add(allowBinding(resourceType, resourcePrefix, PatternType.PREFIXED, principal, host, op));
            }
        }
        if (!CollectionUtils.isEmpty(resourceNames)) {
            // distinct() keeps duplicate names in the request from producing duplicate bindings.
            resourceNames.stream().distinct().forEach(resource -> {
                for (AclOperation op : opsToAllow) {
                    bindings.add(allowBinding(resourceType, resource, PatternType.LITERAL, principal, host, op));
                }
            });
        }
        return bindings;
    }

    /** Builds a single ALLOW binding for one operation on one resource pattern. */
    private static AclBinding allowBinding(ResourceType resourceType, String resourceName, PatternType patternType, String principal, String host, AclOperation operation) {
        return new AclBinding(
            new ResourcePattern(resourceType, resourceName, patternType),
            new AccessControlEntry(principal, host, operation, AclPermissionType.ALLOW));
    }

    /**
     * Creates the ALLOW bindings for a consumer: READ and DESCRIBE on the requested topics,
     * READ on the requested consumer groups.
     */
    public Mono<Void> createConsumerAcl(KafkaCluster cluster, CreateConsumerAclDTO request) {
        return adminClientService.get(cluster)
            .flatMap(ac -> createAclsWithLogging(ac, createConsumerBindings(request)));
    }

    private List<AclBinding> createConsumerBindings(CreateConsumerAclDTO request) {
        List<AclBinding> bindings = new ArrayList<>();
        bindings.addAll(createAllowBindings(
            ResourceType.TOPIC,
            List.of(AclOperation.READ, AclOperation.DESCRIBE),
            request.getPrincipal(),
            request.getHost(),
            request.getTopicsPrefix(),
            request.getTopics()));
        bindings.addAll(createAllowBindings(
            ResourceType.GROUP,
            List.of(AclOperation.READ),
            request.getPrincipal(),
            request.getHost(),
            request.getConsumerGroupsPrefix(),
            request.getConsumerGroups()));
        return bindings;
    }

    /**
     * Creates the ALLOW bindings for a producer: WRITE, DESCRIBE and CREATE on the requested
     * topics, WRITE and DESCRIBE on the requested transactional ids, and IDEMPOTENT_WRITE on
     * the cluster when the request is flagged as idempotent.
     */
    public Mono<Void> createProducerAcl(KafkaCluster cluster, CreateProducerAclDTO request) {
        return adminClientService.get(cluster)
            .flatMap(ac -> createAclsWithLogging(ac, createProducerBindings(request)));
    }

    private List<AclBinding> createProducerBindings(CreateProducerAclDTO request) {
        List<AclBinding> bindings = new ArrayList<>();
        bindings.addAll(createAllowBindings(
            ResourceType.TOPIC,
            List.of(AclOperation.WRITE, AclOperation.DESCRIBE, AclOperation.CREATE),
            request.getPrincipal(),
            request.getHost(),
            request.getTopicsPrefix(),
            request.getTopics()));

        // A missing transactional id means "no literal binding", which createAllowBindings
        // expresses as a null name collection.
        String transactionalId = request.getTransactionalId();
        List<String> transactionalIds = transactionalId == null ? null : List.of(transactionalId);
        bindings.addAll(createAllowBindings(
            ResourceType.TRANSACTIONAL_ID,
            List.of(AclOperation.WRITE, AclOperation.DESCRIBE),
            request.getPrincipal(),
            request.getHost(),
            request.getTransactionsIdPrefix(),
            transactionalIds));

        if (Boolean.TRUE.equals(request.getIdempotent())) {
            bindings.addAll(createAllowBindings(
                ResourceType.CLUSTER,
                List.of(AclOperation.IDEMPOTENT_WRITE),
                request.getPrincipal(),
                request.getHost(),
                null,
                List.of(CLUSTER_RESOURCE_NAME)));
        }
        return bindings;
    }

    /**
     * Creates the ALLOW bindings for a stream application: READ on the input topics, WRITE on
     * the output topics, and ALL on groups and topics prefixed with the application id.
     */
    public Mono<Void> createStreamAppAcl(KafkaCluster cluster, CreateStreamAppAclDTO request) {
        return adminClientService.get(cluster)
            .flatMap(ac -> createAclsWithLogging(ac, createStreamAppBindings(request)));
    }

    private List<AclBinding> createStreamAppBindings(CreateStreamAppAclDTO request) {
        List<AclBinding> bindings = new ArrayList<>();
        bindings.addAll(createAllowBindings(
            ResourceType.TOPIC,
            List.of(AclOperation.READ),
            request.getPrincipal(),
            request.getHost(),
            null,
            request.getInputTopics()));
        bindings.addAll(createAllowBindings(
            ResourceType.TOPIC,
            List.of(AclOperation.WRITE),
            request.getPrincipal(),
            request.getHost(),
            null,
            request.getOutputTopics()));
        bindings.addAll(createAllowBindings(
            ResourceType.GROUP,
            List.of(AclOperation.ALL),
            request.getPrincipal(),
            request.getHost(),
            request.getApplicationId(),
            null));
        bindings.addAll(createAllowBindings(
            ResourceType.TOPIC,
            List.of(AclOperation.ALL),
            request.getPrincipal(),
            request.getHost(),
            request.getApplicationId(),
            null));
        return bindings;
    }
}

