/*
 * Decompiled with CFR 0.152.
 */
package com.provectus.kafka.ui.service.ksql;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.provectus.kafka.ui.exception.KsqlApiException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.KsqlStreamDescriptionDTO;
import com.provectus.kafka.ui.model.KsqlTableDescriptionDTO;
import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
import com.provectus.kafka.ui.service.ksql.KsqlServiceV2;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;

@Service
public class KsqlServiceV2 {
    private static final Logger log = LoggerFactory.getLogger(KsqlServiceV2.class);
    private final Cache<String, KsqlExecuteCommand> registeredCommands = CacheBuilder.newBuilder().expireAfterWrite(1L, TimeUnit.MINUTES).build();

    public String registerCommand(KafkaCluster cluster, String ksql, Map<String, String> streamProperties) {
        String uuid = UUID.randomUUID().toString();
        this.registeredCommands.put((Object)uuid, (Object)new KsqlExecuteCommand(cluster, ksql, streamProperties));
        return uuid;
    }

    public Flux<KsqlApiClient.KsqlResponseTable> execute(String commandId) {
        KsqlExecuteCommand cmd = (KsqlExecuteCommand)this.registeredCommands.getIfPresent((Object)commandId);
        if (cmd == null) {
            throw new ValidationException("No command registered with id " + commandId);
        }
        this.registeredCommands.invalidate((Object)commandId);
        return cmd.cluster.getKsqlClient().flux(client -> client.execute(cmd.ksql, cmd.streamProperties));
    }

    public Flux<KsqlTableDescriptionDTO> listTables(KafkaCluster cluster) {
        return cluster.getKsqlClient().flux(client -> client.execute("LIST TABLES;", Map.of())).flatMap(resp -> {
            if (!resp.getHeader().equals("Tables")) {
                log.error("Unexpected result header: {}", (Object)resp.getHeader());
                log.debug("Unexpected result {}", resp);
                return Flux.error((Throwable)new KsqlApiException("Error retrieving tables list"));
            }
            return Flux.fromIterable((Iterable)resp.getValues().stream().map(row -> new KsqlTableDescriptionDTO().name((String)resp.getColumnValue(row, "name").map(JsonNode::asText).orElse(null)).topic((String)resp.getColumnValue(row, "topic").map(JsonNode::asText).orElse(null)).keyFormat((String)resp.getColumnValue(row, "keyFormat").map(JsonNode::asText).orElse(null)).valueFormat((String)resp.getColumnValue(row, "valueFormat").map(JsonNode::asText).orElse(null)).isWindowed((Boolean)resp.getColumnValue(row, "isWindowed").map(JsonNode::asBoolean).orElse(null))).collect(Collectors.toList()));
        });
    }

    public Flux<KsqlStreamDescriptionDTO> listStreams(KafkaCluster cluster) {
        return cluster.getKsqlClient().flux(client -> client.execute("LIST STREAMS;", Map.of())).flatMap(resp -> {
            if (!resp.getHeader().equals("Streams")) {
                log.error("Unexpected result header: {}", (Object)resp.getHeader());
                log.debug("Unexpected result {}", resp);
                return Flux.error((Throwable)new KsqlApiException("Error retrieving streams list"));
            }
            return Flux.fromIterable((Iterable)resp.getValues().stream().map(row -> new KsqlStreamDescriptionDTO().name((String)resp.getColumnValue(row, "name").map(JsonNode::asText).orElse(null)).topic((String)resp.getColumnValue(row, "topic").map(JsonNode::asText).orElse(null)).keyFormat((String)resp.getColumnValue(row, "keyFormat").map(JsonNode::asText).orElse(null)).valueFormat((String)resp.getColumnValue(row, "valueFormat").or(() -> resp.getColumnValue(row, "format")).map(JsonNode::asText).orElse(null))).collect(Collectors.toList()));
        });
    }
}

