/*
 * Decompiled with CFR 0.152.
 */
package com.provectus.kafka.ui.controller;

import com.provectus.kafka.ui.api.MessagesApi;
import com.provectus.kafka.ui.controller.AbstractController;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
import com.provectus.kafka.ui.model.SeekDirectionDTO;
import com.provectus.kafka.ui.model.SeekTypeDTO;
import com.provectus.kafka.ui.model.SerdeUsageDTO;
import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.model.TopicSerdeSuggestionDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.AuditAction;
import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
import com.provectus.kafka.ui.serde.api.Serde;
import com.provectus.kafka.ui.service.DeserializationService;
import com.provectus.kafka.ui.service.MessagesService;
import com.provectus.kafka.ui.util.DynamicConfigOperations;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.validation.Valid;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
 * REST controller implementing the topic-message endpoints of {@link MessagesApi}:
 * deleting, reading and producing messages, dry-running a smart filter, and
 * suggesting serdes for a topic.
 *
 * <p>Endpoints that build an {@link AccessContext} pass it to {@code validateAccess}
 * (inherited from {@link AbstractController}) before the actual work is chained.
 */
@RestController
public class MessagesController
extends AbstractController
implements MessagesApi {
    private static final Logger log = LoggerFactory.getLogger(MessagesController.class);
    private final MessagesService messagesService;
    private final DeserializationService deserializationService;
    private final DynamicConfigOperations dynamicConfigOperations;

    /**
     * Creates the controller with its collaborators.
     *
     * @param messagesService service used to delete, load and send messages
     * @param deserializationService service used to look up serdes for a topic
     * @param dynamicConfigOperations used to check whether Groovy filtering is enabled
     */
    public MessagesController(MessagesService messagesService, DeserializationService deserializationService, DynamicConfigOperations dynamicConfigOperations) {
        this.messagesService = messagesService;
        this.deserializationService = deserializationService;
        this.dynamicConfigOperations = dynamicConfigOperations;
    }

    /**
     * Deletes messages of a topic, optionally restricted to the given partitions.
     *
     * @param clusterName name of the cluster that owns the topic
     * @param topicName topic whose messages are deleted
     * @param partitions partitions to purge; {@code null} is passed on as an empty list
     * @param exchange current web exchange (unused here)
     * @return a mono emitting an empty 200 response once the deletion mono completes
     */
    @Override
    public Mono<ResponseEntity<Void>> deleteTopicMessages(String clusterName, String topicName, @Valid List<Integer> partitions, ServerWebExchange exchange) {
        AccessContext context = AccessContext.builder()
            .cluster(clusterName)
            .topic(topicName)
            .topicActions(TopicAction.MESSAGES_DELETE)
            // Named like every other audited endpoint in this controller.
            .operationName("deleteTopicMessages")
            .build();
        List<Integer> targetPartitions = Optional.ofNullable(partitions).orElse(List.of());
        Mono<ResponseEntity<Void>> deletion = messagesService
            .deleteTopicMessages(getCluster(clusterName), topicName, targetPartitions)
            .thenReturn(ResponseEntity.ok().build());
        return validateAccess(context)
            .then(deletion)
            .doOnEach(sig -> audit(context, sig));
    }

    /**
     * Runs a smart-filter test execution and wraps its result in a 200 response.
     *
     * @param smartFilterTestExecutionDto mono providing the test execution request
     * @param exchange current web exchange (unused here)
     * @return a mono emitting the execution result
     */
    @Override
    public Mono<ResponseEntity<SmartFilterTestExecutionResultDTO>> executeSmartFilterTest(Mono<SmartFilterTestExecutionDTO> smartFilterTestExecutionDto, ServerWebExchange exchange) {
        return smartFilterTestExecutionDto
            .map(MessagesService::execSmartFilterTest)
            .map(ResponseEntity::ok);
    }

    /**
     * Loads messages of a topic as a stream of events.
     *
     * <p>Defaults applied to absent arguments: {@code seekType} falls back to
     * {@link SeekTypeDTO#BEGINNING}, {@code seekDirection} to
     * {@link SeekDirectionDTO#FORWARD} and {@code filterQueryType} to
     * {@link MessageFilterTypeDTO#STRING_CONTAINS}.
     *
     * @param clusterName name of the cluster that owns the topic
     * @param topicName topic to read from
     * @param seekType where to start reading; may be {@code null}
     * @param seekTo {@code partition::offset} entries, see {@link #parseSeekTo}
     * @param limit passed through to the messages service
     * @param q filter query; may be {@code null} or empty
     * @param filterQueryType how {@code q} is interpreted; may be {@code null}
     * @param seekDirection read direction; may be {@code null}
     * @param keySerde passed through to the messages service
     * @param valueSerde passed through to the messages service
     * @param exchange current web exchange (unused here)
     * @return a mono emitting a 200 response whose body is the message event flux
     * @throws ValidationException if {@code seekTo} is missing or malformed
     */
    @Override
    public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String clusterName, String topicName, SeekTypeDTO seekType, List<String> seekTo, Integer limit, String q, MessageFilterTypeDTO filterQueryType, SeekDirectionDTO seekDirection, String keySerde, String valueSerde, ServerWebExchange exchange) {
        AccessContext.AccessContextBuilder contextBuilder = AccessContext.builder()
            .cluster(clusterName)
            .topic(topicName)
            .topicActions(TopicAction.MESSAGES_READ)
            .operationName("getTopicMessages");

        // Only a non-empty Groovy query needs the Groovy-filtering check.
        if (StringUtils.isNoneEmpty(q) && MessageFilterTypeDTO.GROOVY_SCRIPT == filterQueryType) {
            dynamicConfigOperations.checkIfFilteringGroovyEnabled();
        }
        // Reading the audit topic additionally requires the audit VIEW action.
        if (auditService.isAuditTopic(getCluster(clusterName), topicName)) {
            contextBuilder.auditActions(AuditAction.VIEW);
        }

        SeekTypeDTO effectiveSeekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
        SeekDirectionDTO effectiveDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
        MessageFilterTypeDTO effectiveFilterType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;

        ConsumerPosition positions = new ConsumerPosition(effectiveSeekType, topicName, parseSeekTo(topicName, effectiveSeekType, seekTo));
        Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> job = Mono.just(
            ResponseEntity.ok(
                messagesService.loadMessages(getCluster(clusterName), topicName, positions, q, effectiveFilterType, limit, effectiveDirection, keySerde, valueSerde)));

        AccessContext context = contextBuilder.build();
        return validateAccess(context)
            .then(job)
            .doOnEach(sig -> audit(context, sig));
    }

    /**
     * Produces a single message to a topic.
     *
     * @param clusterName name of the cluster that owns the topic
     * @param topicName topic to produce to
     * @param createTopicMessage mono providing the message to send
     * @param exchange current web exchange (unused here)
     * @return a mono emitting an empty 200 response once sending completes
     */
    @Override
    public Mono<ResponseEntity<Void>> sendTopicMessages(String clusterName, String topicName, @Valid Mono<CreateTopicMessageDTO> createTopicMessage, ServerWebExchange exchange) {
        AccessContext context = AccessContext.builder()
            .cluster(clusterName)
            .topic(topicName)
            .topicActions(TopicAction.MESSAGES_PRODUCE)
            .operationName("sendTopicMessages")
            .build();
        // thenReturn (rather than then().map(...)) so that a response entity is
        // actually emitted: mapping over a Mono<Void> never invokes the mapper.
        Mono<ResponseEntity<Void>> sending = createTopicMessage
            .flatMap(msg -> messagesService.sendMessage(getCluster(clusterName), topicName, msg))
            .thenReturn(ResponseEntity.ok().build());
        return validateAccess(context)
            .then(sending)
            .doOnEach(sig -> audit(context, sig));
    }

    /**
     * Converts {@code partition::offset} strings into a partition-to-offset map.
     *
     * @param topic topic the partitions belong to
     * @param seekType seek type, used to decide whether {@code seekTo} may be absent
     * @param seekTo entries in {@code partition::offset} form; may be {@code null}
     * @return the parsed offsets, or {@code null} when {@code seekTo} is absent or
     *     empty and {@code seekType} is {@code LATEST} or {@code BEGINNING}
     * @throws ValidationException if {@code seekTo} is required but absent, or an
     *     entry is not of the form {@code <int>::<long>}
     * @throws IllegalStateException if the same partition is listed more than once
     *     (raised by {@link Collectors#toMap})
     */
    @Nullable
    private Map<TopicPartition, Long> parseSeekTo(String topic, SeekTypeDTO seekType, List<String> seekTo) {
        if (seekTo == null || seekTo.isEmpty()) {
            if (seekType == SeekTypeDTO.LATEST || seekType == SeekTypeDTO.BEGINNING) {
                return null;
            }
            throw new ValidationException("seekTo should be set if seekType is " + seekType);
        }
        return seekTo.stream()
            .map(entry -> parseSeekToEntry(topic, entry))
            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
    }

    /**
     * Parses one {@code partition::offset} entry.
     *
     * @throws ValidationException if the entry does not split into exactly two
     *     parts or either part is not a number
     */
    private static Pair<TopicPartition, Long> parseSeekToEntry(String topic, String entry) {
        String[] split = entry.split("::");
        if (split.length != 2) {
            throw new ValidationException("Wrong seekTo argument format. See API docs for details");
        }
        try {
            return Pair.of(new TopicPartition(topic, Integer.parseInt(split[0])), Long.parseLong(split[1]));
        } catch (NumberFormatException e) {
            // Non-numeric input is a client error like the other seekTo problems,
            // so report it with the same exception type.
            throw new ValidationException("Wrong seekTo argument format. See API docs for details");
        }
    }

    /**
     * Suggests serdes for the key and value of a topic.
     *
     * @param clusterName name of the cluster that owns the topic
     * @param topicName topic to suggest serdes for
     * @param use {@code SERIALIZE} selects serdes for serialization; any other
     *     value (including {@code null}) selects serdes for deserialization
     * @param exchange current web exchange (unused here)
     * @return a mono emitting the suggestion wrapped in a 200 response
     */
    @Override
    public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterName, String topicName, SerdeUsageDTO use, ServerWebExchange exchange) {
        AccessContext context = AccessContext.builder()
            .cluster(clusterName)
            .topic(topicName)
            .topicActions(TopicAction.VIEW)
            .operationName("getSerdes")
            .build();
        // fromCallable defers the lookup until subscription, i.e. after access
        // validation has completed, and lets subscribeOn move it off the caller's thread.
        Mono<ResponseEntity<TopicSerdeSuggestionDTO>> suggestion = Mono
            .fromCallable(() -> suggestSerdes(clusterName, topicName, use))
            .subscribeOn(Schedulers.boundedElastic())
            .map(ResponseEntity::ok);
        return validateAccess(context).then(suggestion);
    }

    /** Builds the key/value serde suggestion for a topic according to {@code use}. */
    private TopicSerdeSuggestionDTO suggestSerdes(String clusterName, String topicName, SerdeUsageDTO use) {
        boolean forSerialize = use == SerdeUsageDTO.SERIALIZE;
        return new TopicSerdeSuggestionDTO()
            .key(forSerialize
                ? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, Serde.Target.KEY)
                : deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, Serde.Target.KEY))
            .value(forSerialize
                ? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, Serde.Target.VALUE)
                : deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, Serde.Target.VALUE));
    }
}

