/*
 * Decompiled with CFR 0.152.
 */
package com.provectus.kafka.ui.service.analyze;

import com.provectus.kafka.ui.exception.TopicAnalysisException;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.TopicAnalysisDTO;
import com.provectus.kafka.ui.service.ConsumerGroupService;
import com.provectus.kafka.ui.service.TopicsService;
import com.provectus.kafka.ui.service.analyze.AnalysisTasksStore;
import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
import com.provectus.kafka.ui.service.analyze.TopicIdentity;
import java.io.Closeable;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * Spring component that starts, cancels and looks up analysis tasks for Kafka topics.
 *
 * <p>Tasks are tracked in an {@link AnalysisTasksStore} created by this instance and are
 * submitted to a dedicated scheduler shared by all instances of this class.
 */
@Component
public class TopicAnalysisService {
    private static final Logger log = LoggerFactory.getLogger(TopicAnalysisService.class);

    /**
     * Bounded-elastic scheduler named {@code "topic-analysis-tasks"} that runs the analysis
     * tasks. It uses Reactor's default thread and queue caps, a TTL argument of 10 and
     * daemon threads.
     */
    private static final Scheduler SCHEDULER = Schedulers.newBoundedElastic(
            Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
            Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
            "topic-analysis-tasks",
            10,
            true);

    private final AnalysisTasksStore analysisTasksStore = new AnalysisTasksStore();
    private final TopicsService topicsService;
    // NOTE(review): not read anywhere in the visible part of this class; presumably
    // consumed by AnalysisTask through the service reference it receives - confirm.
    private final ConsumerGroupService consumerGroupService;

    /**
     * Creates the service with its collaborators.
     *
     * @param topicsService        used to look up topic details before an analysis starts
     * @param consumerGroupService stored for later use; not read in the visible code
     */
    public TopicAnalysisService(TopicsService topicsService, ConsumerGroupService consumerGroupService) {
        this.topicsService = topicsService;
        this.consumerGroupService = consumerGroupService;
    }

    /**
     * Looks up the topic's details and, for the emitted value, starts an analysis task.
     *
     * <p>The emitted details themselves are not inspected; they only trigger the start.
     * An exception thrown while starting the task (for example when the topic is already
     * being analyzed) happens inside {@code doOnNext}, so it surfaces as an error signal
     * of the returned {@code Mono}.
     *
     * @param cluster   cluster the topic belongs to
     * @param topicName name of the topic to analyze
     * @return a {@code Mono} that only signals completion or error, carrying no value
     */
    public Mono<Void> analyze(KafkaCluster cluster, String topicName) {
        return topicsService
                .getTopicDetails(cluster, topicName)
                .doOnNext(details -> startAnalysis(cluster, topicName))
                .then();
    }

    /**
     * Registers a new analysis task for the topic and hands it to the scheduler.
     *
     * <p>The method is {@code synchronized} so the "in progress" check and the
     * registration are not interleaved with another call on the same instance.
     *
     * @param cluster cluster the topic belongs to
     * @param topic   name of the topic to analyze
     * @throws TopicAnalysisException if the store reports an analysis already in progress
     */
    private synchronized void startAnalysis(KafkaCluster cluster, String topic) {
        TopicIdentity identity = new TopicIdentity(cluster, topic);
        if (analysisTasksStore.isAnalysisInProgress(identity)) {
            throw new TopicAnalysisException("Topic is already analyzing");
        }
        AnalysisTask analysisTask = new AnalysisTask(this, cluster, identity);
        // The same task object is registered as a Closeable and executed as a Runnable.
        analysisTasksStore.registerNewTask(identity, (Closeable) analysisTask);
        SCHEDULER.schedule((Runnable) analysisTask);
    }

    /**
     * Asks the task store to cancel the analysis for the given topic.
     *
     * @param cluster   cluster the topic belongs to
     * @param topicName name of the topic whose analysis should be cancelled
     */
    public void cancelAnalysis(KafkaCluster cluster, String topicName) {
        TopicIdentity identity = new TopicIdentity(cluster, topicName);
        analysisTasksStore.cancelAnalysis(identity);
    }

    /**
     * Fetches whatever analysis the task store currently holds for the given topic.
     *
     * @param cluster   cluster the topic belongs to
     * @param topicName name of the topic
     * @return the store's answer for this topic, possibly empty
     */
    public Optional<TopicAnalysisDTO> getTopicAnalysis(KafkaCluster cluster, String topicName) {
        TopicIdentity identity = new TopicIdentity(cluster, topicName);
        return analysisTasksStore.getTopicAnalysis(identity);
    }
}

