/*
 * Decompiled with CFR 0.152.
 */
package com.provectus.kafka.ui.emitter;

import com.provectus.kafka.ui.emitter.EnhancedConsumer;
import com.provectus.kafka.ui.emitter.MessagesProcessing;
import com.provectus.kafka.ui.emitter.PollingSettings;
import com.provectus.kafka.ui.emitter.RangePollingEmitter;
import com.provectus.kafka.ui.emitter.SeekOperations;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;

public class ForwardEmitter
extends RangePollingEmitter {
    public ForwardEmitter(Supplier<EnhancedConsumer> consumerSupplier, ConsumerPosition consumerPosition, int messagesPerPage, ConsumerRecordDeserializer deserializer, Predicate<TopicMessageDTO> filter, PollingSettings pollingSettings) {
        super(consumerSupplier, consumerPosition, messagesPerPage, new MessagesProcessing(deserializer, filter, true, Integer.valueOf(messagesPerPage)), pollingSettings);
    }

    protected TreeMap<TopicPartition, RangePollingEmitter.FromToOffset> nextPollingRange(TreeMap<TopicPartition, RangePollingEmitter.FromToOffset> prevRange, SeekOperations seekOperations) {
        TreeMap<TopicPartition, Long> readFromOffsets = new TreeMap<TopicPartition, Long>(Comparator.comparingInt(TopicPartition::partition));
        if (prevRange.isEmpty()) {
            readFromOffsets.putAll(seekOperations.getOffsetsForSeek());
        } else {
            readFromOffsets.putAll(prevRange.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> ((RangePollingEmitter.FromToOffset)e.getValue()).to())));
        }
        int msgsToPollPerPartition = (int)Math.ceil((double)this.messagesPerPage / (double)readFromOffsets.size());
        TreeMap<TopicPartition, RangePollingEmitter.FromToOffset> result = new TreeMap<TopicPartition, RangePollingEmitter.FromToOffset>(Comparator.comparingInt(TopicPartition::partition));
        readFromOffsets.forEach((tp, fromOffset) -> {
            long tpEndOffset = (Long)seekOperations.getEndOffsets().get(tp);
            if (fromOffset < tpEndOffset) {
                result.put((TopicPartition)tp, new RangePollingEmitter.FromToOffset(fromOffset.longValue(), Math.min(tpEndOffset, fromOffset + (long)msgsToPollPerPartition)));
            }
        });
        return result;
    }
}

