/*
 * Decompiled with CFR 0.152.
 */
package com.provectus.kafka.ui.emitter;

import com.provectus.kafka.ui.emitter.EnhancedConsumer;
import com.provectus.kafka.ui.emitter.MessagesProcessing;
import com.provectus.kafka.ui.emitter.PollingSettings;
import com.provectus.kafka.ui.emitter.RangePollingEmitter;
import com.provectus.kafka.ui.emitter.SeekOperations;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;

public class BackwardEmitter
extends RangePollingEmitter {
    public BackwardEmitter(Supplier<EnhancedConsumer> consumerSupplier, ConsumerPosition consumerPosition, int messagesPerPage, ConsumerRecordDeserializer deserializer, Predicate<TopicMessageDTO> filter, PollingSettings pollingSettings) {
        super(consumerSupplier, consumerPosition, messagesPerPage, new MessagesProcessing(deserializer, filter, false, Integer.valueOf(messagesPerPage)), pollingSettings);
    }

    protected TreeMap<TopicPartition, RangePollingEmitter.FromToOffset> nextPollingRange(TreeMap<TopicPartition, RangePollingEmitter.FromToOffset> prevRange, SeekOperations seekOperations) {
        TreeMap<TopicPartition, Long> readToOffsets = new TreeMap<TopicPartition, Long>(Comparator.comparingInt(TopicPartition::partition));
        if (prevRange.isEmpty()) {
            readToOffsets.putAll(seekOperations.getOffsetsForSeek());
        } else {
            readToOffsets.putAll(prevRange.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> ((RangePollingEmitter.FromToOffset)e.getValue()).from())));
        }
        int msgsToPollPerPartition = (int)Math.ceil((double)this.messagesPerPage / (double)readToOffsets.size());
        TreeMap<TopicPartition, RangePollingEmitter.FromToOffset> result = new TreeMap<TopicPartition, RangePollingEmitter.FromToOffset>(Comparator.comparingInt(TopicPartition::partition));
        readToOffsets.forEach((tp, toOffset) -> {
            long tpStartOffset = (Long)seekOperations.getBeginOffsets().get(tp);
            if (toOffset > tpStartOffset) {
                result.put((TopicPartition)tp, new RangePollingEmitter.FromToOffset(Math.max(tpStartOffset, toOffset - (long)msgsToPollPerPartition), toOffset.longValue()));
            }
        });
        return result;
    }
}

