/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.admin.internals;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.admin.DescribeProducersOptions;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.admin.DescribeProducersResult;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.admin.ProducerState;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.admin.internals.AdminApiFuture;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.admin.internals.AdminApiHandler;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.admin.internals.AdminApiLookupStrategy;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.admin.internals.PartitionLeaderStrategy;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.admin.internals.StaticBrokerStrategy;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.Node;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.TopicPartition;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.errors.InvalidTopicException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.errors.TopicAuthorizationException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.message.DescribeProducersRequestData;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.message.DescribeProducersResponseData;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.protocol.Errors;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.requests.AbstractResponse;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.requests.ApiError;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.requests.DescribeProducersRequest;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.requests.DescribeProducersResponse;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.utils.CollectionUtils;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.utils.LogContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;

public class DescribeProducersHandler
extends AdminApiHandler.Batched<TopicPartition, DescribeProducersResult.PartitionProducerState> {
    private final Logger log;
    private final DescribeProducersOptions options;
    private final AdminApiLookupStrategy<TopicPartition> lookupStrategy;

    public DescribeProducersHandler(DescribeProducersOptions options, LogContext logContext) {
        this.options = options;
        this.log = logContext.logger(DescribeProducersHandler.class);
        this.lookupStrategy = options.brokerId().isPresent() ? new StaticBrokerStrategy<TopicPartition>(options.brokerId().getAsInt()) : new PartitionLeaderStrategy(logContext);
    }

    public static AdminApiFuture.SimpleAdminApiFuture<TopicPartition, DescribeProducersResult.PartitionProducerState> newFuture(Collection<TopicPartition> topicPartitions) {
        return AdminApiFuture.forKeys(new HashSet<TopicPartition>(topicPartitions));
    }

    @Override
    public String apiName() {
        return "describeProducers";
    }

    @Override
    public AdminApiLookupStrategy<TopicPartition> lookupStrategy() {
        return this.lookupStrategy;
    }

    public DescribeProducersRequest.Builder buildBatchedRequest(int brokerId, Set<TopicPartition> topicPartitions) {
        DescribeProducersRequestData request = new DescribeProducersRequestData();
        DescribeProducersRequest.Builder builder = new DescribeProducersRequest.Builder(request);
        CollectionUtils.groupPartitionsByTopic(topicPartitions, builder::addTopic, (topicRequest, partitionId) -> topicRequest.partitionIndexes().add((Integer)partitionId));
        return builder;
    }

    private void handlePartitionError(TopicPartition topicPartition, ApiError apiError, Map<TopicPartition, Throwable> failed, List<TopicPartition> unmapped) {
        switch (apiError.error()) {
            case NOT_LEADER_OR_FOLLOWER: {
                if (this.options.brokerId().isPresent()) {
                    int brokerId = this.options.brokerId().getAsInt();
                    this.log.error("Not leader error in `DescribeProducers` response for partition {} for brokerId {} set in options", new Object[]{topicPartition, brokerId, apiError.exception()});
                    failed.put(topicPartition, apiError.error().exception("Failed to describe active producers for partition " + topicPartition + " on brokerId " + brokerId));
                    break;
                }
                this.log.debug("Not leader error in `DescribeProducers` response for partition {}. Will retry later.", (Object)topicPartition);
                unmapped.add(topicPartition);
                break;
            }
            case UNKNOWN_TOPIC_OR_PARTITION: {
                this.log.debug("Unknown topic/partition error in `DescribeProducers` response for partition {}. Will retry later.", (Object)topicPartition);
                break;
            }
            case INVALID_TOPIC_EXCEPTION: {
                this.log.error("Invalid topic in `DescribeProducers` response for partition {}", (Object)topicPartition, (Object)apiError.exception());
                failed.put(topicPartition, new InvalidTopicException("Failed to fetch metadata for partition " + topicPartition + " due to invalid topic error: " + apiError.messageWithFallback(), Collections.singleton(topicPartition.topic())));
                break;
            }
            case TOPIC_AUTHORIZATION_FAILED: {
                this.log.error("Authorization failed in `DescribeProducers` response for partition {}", (Object)topicPartition, (Object)apiError.exception());
                failed.put(topicPartition, new TopicAuthorizationException("Failed to describe active producers for partition " + topicPartition + " due to authorization failure on topic `" + topicPartition.topic() + "`", Collections.singleton(topicPartition.topic())));
                break;
            }
            default: {
                this.log.error("Unexpected error in `DescribeProducers` response for partition {}", (Object)topicPartition, (Object)apiError.exception());
                failed.put(topicPartition, apiError.error().exception("Failed to describe active producers for partition " + topicPartition + " due to unexpected error"));
            }
        }
    }

    @Override
    public AdminApiHandler.ApiResult<TopicPartition, DescribeProducersResult.PartitionProducerState> handleResponse(Node broker, Set<TopicPartition> keys, AbstractResponse abstractResponse) {
        DescribeProducersResponse response = (DescribeProducersResponse)abstractResponse;
        HashMap<TopicPartition, DescribeProducersResult.PartitionProducerState> completed = new HashMap<TopicPartition, DescribeProducersResult.PartitionProducerState>();
        HashMap<TopicPartition, Throwable> failed = new HashMap<TopicPartition, Throwable>();
        ArrayList<TopicPartition> unmapped = new ArrayList<TopicPartition>();
        for (DescribeProducersResponseData.TopicResponse topicResponse : response.data().topics()) {
            for (DescribeProducersResponseData.PartitionResponse partitionResponse : topicResponse.partitions()) {
                TopicPartition topicPartition = new TopicPartition(topicResponse.name(), partitionResponse.partitionIndex());
                Errors error = Errors.forCode(partitionResponse.errorCode());
                if (error != Errors.NONE) {
                    ApiError apiError = new ApiError(error, partitionResponse.errorMessage());
                    this.handlePartitionError(topicPartition, apiError, failed, unmapped);
                    continue;
                }
                List<ProducerState> activeProducers = partitionResponse.activeProducers().stream().map(activeProducer -> {
                    OptionalLong currentTransactionFirstOffset = activeProducer.currentTxnStartOffset() < 0L ? OptionalLong.empty() : OptionalLong.of(activeProducer.currentTxnStartOffset());
                    OptionalInt coordinatorEpoch = activeProducer.coordinatorEpoch() < 0 ? OptionalInt.empty() : OptionalInt.of(activeProducer.coordinatorEpoch());
                    return new ProducerState(activeProducer.producerId(), activeProducer.producerEpoch(), activeProducer.lastSequence(), activeProducer.lastTimestamp(), coordinatorEpoch, currentTransactionFirstOffset);
                }).collect(Collectors.toList());
                completed.put(topicPartition, new DescribeProducersResult.PartitionProducerState(activeProducers));
            }
        }
        return new AdminApiHandler.ApiResult<TopicPartition, DescribeProducersResult.PartitionProducerState>(completed, failed, unmapped);
    }
}

