/*
 * Decompiled with CFR 0.152.
 */
package com.provectus.kafka.ui.emitter;

import com.provectus.kafka.ui.emitter.AbstractEmitter;
import com.provectus.kafka.ui.emitter.EnhancedConsumer;
import com.provectus.kafka.ui.emitter.MessagesProcessing;
import com.provectus.kafka.ui.emitter.PolledRecords;
import com.provectus.kafka.ui.emitter.PollingSettings;
import com.provectus.kafka.ui.emitter.SeekOperations;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import java.util.HashMap;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.FluxSink;

public class TailingEmitter
extends AbstractEmitter {
    private static final Logger log = LoggerFactory.getLogger(TailingEmitter.class);
    private final Supplier<EnhancedConsumer> consumerSupplier;
    private final ConsumerPosition consumerPosition;

    public TailingEmitter(Supplier<EnhancedConsumer> consumerSupplier, ConsumerPosition consumerPosition, ConsumerRecordDeserializer deserializer, Predicate<TopicMessageDTO> filter, PollingSettings pollingSettings) {
        super(new MessagesProcessing(deserializer, filter, false, null), pollingSettings);
        this.consumerSupplier = consumerSupplier;
        this.consumerPosition = consumerPosition;
    }

    public void accept(FluxSink<TopicMessageEventDTO> sink) {
        log.debug("Starting tailing polling for {}", (Object)this.consumerPosition);
        try (EnhancedConsumer consumer = (EnhancedConsumer)this.consumerSupplier.get();){
            this.assignAndSeek(consumer);
            while (!sink.isCancelled()) {
                this.sendPhase(sink, "Polling");
                PolledRecords polled = this.poll(sink, consumer);
                this.send(sink, (Iterable)polled);
            }
            sink.complete();
            log.debug("Tailing finished");
        }
        catch (InterruptException kafkaInterruptException) {
            log.debug("Tailing finished due to thread interruption");
            sink.complete();
        }
        catch (Exception e) {
            log.error("Error consuming {}", (Object)this.consumerPosition, (Object)e);
            sink.error((Throwable)e);
        }
    }

    private void assignAndSeek(EnhancedConsumer consumer) {
        SeekOperations seekOperations = SeekOperations.create((Consumer)consumer, (ConsumerPosition)this.consumerPosition);
        HashMap<TopicPartition, Long> seekOffsets = new HashMap<TopicPartition, Long>(seekOperations.getEndOffsets());
        seekOffsets.putAll(seekOperations.getOffsetsForSeek());
        consumer.assign(seekOffsets.keySet());
        seekOffsets.forEach((arg_0, arg_1) -> ((EnhancedConsumer)consumer).seek(arg_0, arg_1));
    }
}

