/*
 * Decompiled with CFR 0.152.
 */
package com.provectus.kafka.ui.emitter;

import com.provectus.kafka.ui.emitter.EnhancedConsumer;
import com.provectus.kafka.ui.emitter.MessagesProcessing;
import com.provectus.kafka.ui.emitter.PolledRecords;
import com.provectus.kafka.ui.emitter.PollingSettings;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;

/**
 * Base class for emitters that push {@link TopicMessageEventDTO} events into a
 * {@link FluxSink}.
 *
 * <p>Every helper here is a thin delegation to the {@link MessagesProcessing}
 * and {@link PollingSettings} collaborators supplied at construction time;
 * subclasses provide the actual {@code accept(FluxSink)} implementation.
 */
abstract class AbstractEmitter implements Consumer<FluxSink<TopicMessageEventDTO>> {
    private final MessagesProcessing messagesProcessing;
    private final PollingSettings pollingSettings;

    /**
     * Stores the collaborators used by the helper methods.
     *
     * @param messagesProcessing delegate that receives all send/limit calls
     * @param pollingSettings    source of the poll timeout passed to the consumer
     */
    protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) {
        this.messagesProcessing = messagesProcessing;
        this.pollingSettings = pollingSettings;
    }

    /**
     * Polls the consumer once, using the timeout from the polling settings, and
     * reports the polled batch through {@link #sendConsuming} before returning it.
     *
     * @param sink     sink handed to {@link #sendConsuming}
     * @param consumer consumer to poll
     * @return the records returned by the consumer's {@code pollEnhanced} call
     */
    protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
        final PolledRecords polled = consumer.pollEnhanced(pollingSettings.getPollTimeout());
        // Goes through the overridable helper rather than the delegate directly,
        // so subclass overrides of sendConsuming are honoured.
        sendConsuming(sink, polled);
        return polled;
    }

    /**
     * Returns whatever {@code MessagesProcessing.limitReached()} reports.
     *
     * @return the delegate's limit-reached flag
     */
    protected boolean sendLimitReached() {
        return messagesProcessing.limitReached();
    }

    /**
     * Forwards the given records to {@code MessagesProcessing.send}.
     *
     * @param sink    sink passed through to the delegate
     * @param records records passed through to the delegate
     */
    protected void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> records) {
        messagesProcessing.send(sink, records);
    }

    /**
     * Forwards a phase name to {@code MessagesProcessing.sendPhase}.
     *
     * @param sink sink passed through to the delegate
     * @param name phase name passed through to the delegate
     */
    protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
        messagesProcessing.sendPhase(sink, name);
    }

    /**
     * Forwards a polled batch to {@code MessagesProcessing.sentConsumingInfo}.
     *
     * @param sink    sink passed through to the delegate
     * @param records polled batch passed through to the delegate
     */
    protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink, PolledRecords records) {
        messagesProcessing.sentConsumingInfo(sink, records);
    }

    /**
     * Calls {@code MessagesProcessing.sendFinishEvent} and then completes the sink.
     *
     * @param sink sink that receives the finish event and is then completed
     */
    protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {
        messagesProcessing.sendFinishEvent(sink);
        // Complete only after the finish event has been handed to the delegate.
        sink.complete();
    }
}

