/*
 * Decompiled with CFR 0.152.
 */
package com.provectus.kafka.ui.service;

import com.google.common.util.concurrent.RateLimiter;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.emitter.BackwardEmitter;
import com.provectus.kafka.ui.emitter.ForwardEmitter;
import com.provectus.kafka.ui.emitter.MessageFilters;
import com.provectus.kafka.ui.emitter.TailingEmitter;
import com.provectus.kafka.ui.exception.TopicNotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
import com.provectus.kafka.ui.model.SeekDirectionDTO;
import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
import com.provectus.kafka.ui.service.AdminClientService;
import com.provectus.kafka.ui.service.ConsumerGroupService;
import com.provectus.kafka.ui.service.DeserializationService;
import com.provectus.kafka.ui.service.MessagesService;
import com.provectus.kafka.ui.util.SslPropertiesUtil;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/*
 * Exception performing whole class analysis ignored.
 */
@Service
public class MessagesService {
    private static final Logger log = LoggerFactory.getLogger(MessagesService.class);
    private static final int DEFAULT_MAX_PAGE_SIZE = 500;
    private static final int DEFAULT_PAGE_SIZE = 100;
    private static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20;
    private final AdminClientService adminClientService;
    private final DeserializationService deserializationService;
    private final ConsumerGroupService consumerGroupService;
    private final int maxPageSize;
    private final int defaultPageSize;

    public MessagesService(AdminClientService adminClientService, DeserializationService deserializationService, ConsumerGroupService consumerGroupService, ClustersProperties properties) {
        this.adminClientService = adminClientService;
        this.deserializationService = deserializationService;
        this.consumerGroupService = consumerGroupService;
        ClustersProperties.PollingProperties pollingProps = Optional.ofNullable(properties.getPolling()).orElseGet(ClustersProperties.PollingProperties::new);
        this.maxPageSize = Optional.ofNullable(pollingProps.getMaxPageSize()).orElse(500);
        this.defaultPageSize = Optional.ofNullable(pollingProps.getDefaultPageSize()).orElse(100);
    }

    private Mono<TopicDescription> withExistingTopic(KafkaCluster cluster, String topicName) {
        return this.adminClientService.get(cluster).flatMap(client -> client.describeTopic(topicName)).switchIfEmpty(Mono.error((Throwable)new TopicNotFoundException()));
    }

    public static SmartFilterTestExecutionResultDTO execSmartFilterTest(SmartFilterTestExecutionDTO execData) {
        Predicate predicate;
        try {
            predicate = MessageFilters.createMsgFilter((String)execData.getFilterCode(), (MessageFilterTypeDTO)MessageFilterTypeDTO.GROOVY_SCRIPT);
        }
        catch (Exception e) {
            log.info("Smart filter '{}' compilation error", (Object)execData.getFilterCode(), (Object)e);
            return new SmartFilterTestExecutionResultDTO().error("Compilation error : " + e.getMessage());
        }
        try {
            boolean result = predicate.test(new TopicMessageDTO().key(execData.getKey()).content(execData.getValue()).headers(execData.getHeaders()).offset(execData.getOffset()).partition(execData.getPartition()).timestamp((OffsetDateTime)Optional.ofNullable(execData.getTimestampMs()).map(ts -> OffsetDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC)).orElse(null)));
            return new SmartFilterTestExecutionResultDTO().result(Boolean.valueOf(result));
        }
        catch (Exception e) {
            log.info("Smart filter {} execution error", (Object)execData, (Object)e);
            return new SmartFilterTestExecutionResultDTO().error("Execution error : " + e.getMessage());
        }
    }

    public Mono<Void> deleteTopicMessages(KafkaCluster cluster, String topicName, List<Integer> partitionsToInclude) {
        return this.withExistingTopic(cluster, topicName).flatMap(td -> this.offsetsForDeletion(cluster, topicName, partitionsToInclude).flatMap(offsets -> this.adminClientService.get(cluster).flatMap(ac -> ac.deleteRecords(offsets))));
    }

    private Mono<Map<TopicPartition, Long>> offsetsForDeletion(KafkaCluster cluster, String topicName, List<Integer> partitionsToInclude) {
        return this.adminClientService.get(cluster).flatMap(ac -> ac.listTopicOffsets(topicName, OffsetSpec.earliest(), true).zipWith(ac.listTopicOffsets(topicName, OffsetSpec.latest(), true), (start, end) -> end.entrySet().stream().filter(e -> partitionsToInclude.isEmpty() || partitionsToInclude.contains(((TopicPartition)e.getKey()).partition())).filter(entry -> !((Long)entry.getValue()).equals(start.get(entry.getKey()))).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))));
    }

    public Mono<RecordMetadata> sendMessage(KafkaCluster cluster, String topic, CreateTopicMessageDTO msg) {
        return this.withExistingTopic(cluster, topic).publishOn(Schedulers.boundedElastic()).flatMap(desc -> this.sendMessageImpl(cluster, desc, msg));
    }

    private Mono<RecordMetadata> sendMessageImpl(KafkaCluster cluster, TopicDescription topicDescription, CreateTopicMessageDTO msg) {
        Mono mono;
        block9: {
            if (msg.getPartition() != null && msg.getPartition() > topicDescription.partitions().size() - 1) {
                return Mono.error((Throwable)new ValidationException("Invalid partition"));
            }
            ProducerRecordCreator producerRecordCreator = this.deserializationService.producerRecordCreator(cluster, topicDescription.name(), (String)msg.getKeySerde().get(), (String)msg.getValueSerde().get());
            KafkaProducer producer = MessagesService.createProducer((KafkaCluster)cluster, Map.of());
            try {
                ProducerRecord producerRecord = producerRecordCreator.create(topicDescription.name(), msg.getPartition(), (String)msg.getKey().orElse(null), (String)msg.getContent().orElse(null), msg.getHeaders());
                CompletableFuture cf = new CompletableFuture();
                producer.send(producerRecord, (metadata, exception) -> {
                    if (exception != null) {
                        cf.completeExceptionally(exception);
                    } else {
                        cf.complete(metadata);
                    }
                });
                mono = Mono.fromFuture(cf);
                if (producer == null) break block9;
            }
            catch (Throwable throwable) {
                try {
                    if (producer != null) {
                        try {
                            producer.close();
                        }
                        catch (Throwable throwable2) {
                            throwable.addSuppressed(throwable2);
                        }
                    }
                    throw throwable;
                }
                catch (Throwable e) {
                    return Mono.error((Throwable)e);
                }
            }
            producer.close();
        }
        return mono;
    }

    public static KafkaProducer<byte[], byte[]> createProducer(KafkaCluster cluster, Map<String, Object> additionalProps) {
        Properties properties = new Properties();
        SslPropertiesUtil.addKafkaSslProperties((ClustersProperties.TruststoreConfig)cluster.getOriginalProperties().getSsl(), (Properties)properties);
        properties.putAll((Map<?, ?>)cluster.getProperties());
        properties.put("bootstrap.servers", cluster.getBootstrapServers());
        properties.put("key.serializer", ByteArraySerializer.class);
        properties.put("value.serializer", ByteArraySerializer.class);
        properties.putAll(additionalProps);
        return new KafkaProducer(properties);
    }

    public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic, ConsumerPosition consumerPosition, @Nullable String query, MessageFilterTypeDTO filterQueryType, @Nullable Integer pageSize, SeekDirectionDTO seekDirection, @Nullable String keySerde, @Nullable String valueSerde) {
        return this.withExistingTopic(cluster, topic).flux().publishOn(Schedulers.boundedElastic()).flatMap(td -> this.loadMessagesImpl(cluster, topic, consumerPosition, query, filterQueryType, this.fixPageSize(pageSize), seekDirection, keySerde, valueSerde));
    }

    private int fixPageSize(@Nullable Integer pageSize) {
        return Optional.ofNullable(pageSize).filter(ps -> ps > 0 && ps <= this.maxPageSize).orElse(this.defaultPageSize);
    }

    private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster, String topic, ConsumerPosition consumerPosition, @Nullable String query, MessageFilterTypeDTO filterQueryType, int limit, SeekDirectionDTO seekDirection, @Nullable String keySerde, @Nullable String valueSerde) {
        ConsumerRecordDeserializer deserializer = this.deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde);
        Predicate filter = this.getMsgFilter(query, filterQueryType);
        ForwardEmitter emitter = switch (1.$SwitchMap$com$provectus$kafka$ui$model$SeekDirectionDTO[seekDirection.ordinal()]) {
            default -> throw new IncompatibleClassChangeError();
            case 1 -> new ForwardEmitter(() -> this.consumerGroupService.createConsumer(cluster), consumerPosition, limit, deserializer, filter, cluster.getPollingSettings());
            case 2 -> new BackwardEmitter(() -> this.consumerGroupService.createConsumer(cluster), consumerPosition, limit, deserializer, filter, cluster.getPollingSettings());
            case 3 -> new TailingEmitter(() -> this.consumerGroupService.createConsumer(cluster), consumerPosition, deserializer, filter, cluster.getPollingSettings());
        };
        return Flux.create((Consumer)emitter).map((Function)this.throttleUiPublish(seekDirection));
    }

    private Predicate<TopicMessageDTO> getMsgFilter(String query, MessageFilterTypeDTO filterQueryType) {
        if (StringUtils.isEmpty((CharSequence)query)) {
            return evt -> true;
        }
        return MessageFilters.createMsgFilter((String)query, (MessageFilterTypeDTO)filterQueryType);
    }

    private <T> UnaryOperator<T> throttleUiPublish(SeekDirectionDTO seekDirection) {
        if (seekDirection == SeekDirectionDTO.TAILING) {
            RateLimiter rateLimiter = RateLimiter.create((double)20.0);
            return m -> {
                rateLimiter.acquire(1);
                return m;
            };
        }
        return UnaryOperator.identity();
    }
}

