/*
 * Decompiled with CFR 0.152.
 */
package com.provectus.kafka.ui.emitter;

import com.provectus.kafka.ui.emitter.AbstractEmitter;
import com.provectus.kafka.ui.emitter.EnhancedConsumer;
import com.provectus.kafka.ui.emitter.MessagesProcessing;
import com.provectus.kafka.ui.emitter.PolledRecords;
import com.provectus.kafka.ui.emitter.PollingSettings;
import com.provectus.kafka.ui.emitter.RangePollingEmitter;
import com.provectus.kafka.ui.emitter.SeekOperations;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.TreeMap;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.FluxSink;

/**
 * Emitter that reads a topic in successive offset ranges.
 *
 * <p>Subclasses decide which range is polled next via
 * {@link #nextPollingRange(TreeMap, SeekOperations)}; this class assigns the consumer to the
 * partitions of that range, polls until every partition has moved past the end of its range,
 * forwards the collected records to the sink and then asks for the following range.
 */
abstract class RangePollingEmitter
extends AbstractEmitter {
    private static final Logger log = LoggerFactory.getLogger(RangePollingEmitter.class);
    private final Supplier<EnhancedConsumer> consumerSupplier;
    protected final ConsumerPosition consumerPosition;
    protected final int messagesPerPage;

    /**
     * @param consumerSupplier   source of the consumer; invoked once per {@link #accept(FluxSink)} call
     * @param consumerPosition   position passed to {@code SeekOperations.create} when polling starts
     * @param messagesPerPage    stored for subclasses; not read by this class itself
     * @param messagesProcessing forwarded unchanged to the superclass constructor
     * @param pollingSettings    forwarded unchanged to the superclass constructor
     */
    protected RangePollingEmitter(Supplier<EnhancedConsumer> consumerSupplier, ConsumerPosition consumerPosition, int messagesPerPage, MessagesProcessing messagesProcessing, PollingSettings pollingSettings) {
        super(messagesProcessing, pollingSettings);
        this.consumerPosition = consumerPosition;
        this.messagesPerPage = messagesPerPage;
        this.consumerSupplier = consumerSupplier;
    }

    /**
     * Computes the next range of offsets to poll.
     *
     * @param var1 the range that was polled previously; an empty map on the very first call
     * @param var2 seek operations created for the current consumer and position
     * @return the next range per partition; returning an empty map stops the polling loop
     */
    protected abstract TreeMap<TopicPartition, FromToOffset> nextPollingRange(TreeMap<TopicPartition, FromToOffset> var1, SeekOperations var2);

    /**
     * Runs the range-polling loop and publishes its results to {@code sink}.
     *
     * <p>The loop ends when the sink is cancelled, when {@code nextPollingRange} returns an empty
     * map, or when {@code sendLimitReached()} reports {@code true}. The consumer obtained from the
     * supplier is closed when the method exits. A Kafka {@link InterruptException} completes the
     * sink normally; any other exception is logged and passed to {@code sink.error}.
     *
     * @param sink the sink that receives phase updates, records and the final completion signal
     */
    public void accept(FluxSink<TopicMessageEventDTO> sink) {
        log.debug("Starting polling for {}", consumerPosition);
        try (EnhancedConsumer consumer = consumerSupplier.get()) {
            sendPhase(sink, "Consumer created");
            SeekOperations seekOperations = SeekOperations.create(consumer, consumerPosition);
            // An empty "previous range" marks the first request for a range.
            TreeMap<TopicPartition, FromToOffset> pollRange = nextPollingRange(new TreeMap<>(), seekOperations);
            log.debug("Starting from offsets {}", pollRange);
            while (!sink.isCancelled() && !pollRange.isEmpty() && !sendLimitReached()) {
                List<ConsumerRecord<Bytes, Bytes>> polled = poll(consumer, sink, pollRange);
                send(sink, polled);
                pollRange = nextPollingRange(pollRange, seekOperations);
            }
            if (sink.isCancelled()) {
                log.debug("Polling finished due to sink cancellation");
            }
            sendFinishStatsAndCompleteSink(sink);
            log.debug("Polling finished");
        }
        catch (InterruptException kafkaInterruptException) {
            // Interruption is treated as a regular stop, not as a failure.
            log.debug("Polling finished due to thread interruption");
            sink.complete();
        }
        catch (Exception e) {
            log.error("Error occurred while consuming records", e);
            sink.error(e);
        }
    }

    /**
     * Polls all records that fall inside {@code range}.
     *
     * <p>Each partition is read starting at {@code from}; records whose offset is {@code >= to} are
     * dropped, so {@code from} acts as an inclusive and {@code to} as an exclusive bound.
     *
     * @param consumer consumer to assign, seek and poll
     * @param sink     sink used for phase updates and cancellation checks
     * @param range    offsets to read per partition
     * @return the records collected so far when the loop ends (possibly incomplete if the sink was
     *         cancelled)
     */
    private List<ConsumerRecord<Bytes, Bytes>> poll(EnhancedConsumer consumer, FluxSink<TopicMessageEventDTO> sink, TreeMap<TopicPartition, FromToOffset> range) {
        log.trace("Polling range {}", range);
        sendPhase(sink, "Polling partitions: %s".formatted(range.keySet().stream().map(TopicPartition::partition).sorted().toList()));
        consumer.assign(range.keySet());
        range.forEach((tp, fromTo) -> consumer.seek(tp, fromTo.from));
        List<ConsumerRecord<Bytes, Bytes>> result = new ArrayList<>();
        // A partition is paused once it is done, so "all paused" means the whole range was read.
        while (!sink.isCancelled() && consumer.paused().size() < range.size()) {
            // Two-argument poll is not declared in this class (presumably inherited from AbstractEmitter).
            PolledRecords polledRecords = poll(sink, consumer);
            range.forEach((tp, fromTo) -> {
                polledRecords.records(tp).stream()
                        .filter(r -> r.offset() < fromTo.to)
                        .forEach(result::add);
                // Next position is outside the target range -> stop fetching from this partition.
                if (consumer.position(tp) >= fromTo.to) {
                    consumer.pause(List.of(tp));
                }
            });
        }
        // Undo the pauses applied above before handing the consumer back to the caller.
        consumer.resume(consumer.paused());
        return result;
    }
}

