/*
 * Decompiled with CFR 0.152.
 */
package com.provectus.kafka.ui.emitter;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import com.provectus.kafka.ui.emitter.ConsumingStats;
import com.provectus.kafka.ui.emitter.PolledRecords;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.FluxSink;

/*
 * Exception performing whole class analysis ignored.
 */
class MessagesProcessing {
    private static final Logger log = LoggerFactory.getLogger(MessagesProcessing.class);
    private final ConsumingStats consumingStats = new ConsumingStats();
    private long sentMessages = 0L;
    private final ConsumerRecordDeserializer deserializer;
    private final Predicate<TopicMessageDTO> filter;
    private final boolean ascendingSortBeforeSend;
    @Nullable
    private final Integer limit;

    boolean limitReached() {
        return this.limit != null && this.sentMessages >= (long)this.limit.intValue();
    }

    void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> polled) {
        MessagesProcessing.sortForSending(polled, (boolean)this.ascendingSortBeforeSend).forEach(rec -> {
            if (!this.limitReached() && !sink.isCancelled()) {
                TopicMessageDTO topicMessage = this.deserializer.deserialize(rec);
                try {
                    if (this.filter.test(topicMessage)) {
                        sink.next((Object)new TopicMessageEventDTO().type(TopicMessageEventDTO.TypeEnum.MESSAGE).message(topicMessage));
                        ++this.sentMessages;
                    }
                }
                catch (Exception e) {
                    this.consumingStats.incFilterApplyError();
                    log.trace("Error applying filter for message {}", (Object)topicMessage);
                }
            }
        });
    }

    void sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
        if (!sink.isCancelled()) {
            this.consumingStats.sendConsumingEvt(sink, polledRecords);
        }
    }

    void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
        if (!sink.isCancelled()) {
            this.consumingStats.sendFinishEvent(sink);
        }
    }

    void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
        if (!sink.isCancelled()) {
            sink.next((Object)new TopicMessageEventDTO().type(TopicMessageEventDTO.TypeEnum.PHASE).phase(new TopicMessagePhaseDTO().name(name)));
        }
    }

    @VisibleForTesting
    static Iterable<ConsumerRecord<Bytes, Bytes>> sortForSending(Iterable<ConsumerRecord<Bytes, Bytes>> records, boolean asc) {
        Comparator<ConsumerRecord> offsetComparator = asc ? Comparator.comparingLong(ConsumerRecord::offset) : Comparator.comparingLong(ConsumerRecord::offset).reversed();
        Map perPartition = Streams.stream(records).collect(Collectors.groupingBy(ConsumerRecord::partition, TreeMap::new, Collectors.collectingAndThen(Collectors.toList(), lst -> lst.stream().sorted(offsetComparator).toList())));
        Comparator<ConsumerRecord> tsComparator = asc ? Comparator.comparing(ConsumerRecord::timestamp) : Comparator.comparingLong(ConsumerRecord::timestamp).reversed();
        return Iterables.mergeSorted(perPartition.values(), tsComparator);
    }

    public MessagesProcessing(ConsumerRecordDeserializer deserializer, Predicate<TopicMessageDTO> filter, boolean ascendingSortBeforeSend, @Nullable Integer limit) {
        this.deserializer = deserializer;
        this.filter = filter;
        this.ascendingSortBeforeSend = ascendingSortBeforeSend;
        this.limit = limit;
    }
}

