/*
 * Decompiled with CFR 0.152.
 */
package com.provectus.kafka.ui.service;

import com.provectus.kafka.ui.model.InternalLogDirStats;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.Metrics;
import com.provectus.kafka.ui.model.ServerStatusDTO;
import com.provectus.kafka.ui.model.Statistics;
import com.provectus.kafka.ui.service.AdminClientService;
import com.provectus.kafka.ui.service.FeatureService;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import com.provectus.kafka.ui.service.StatisticsCache;
import com.provectus.kafka.ui.service.metrics.MetricsCollector;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

/**
 * Gathers a {@link Statistics} snapshot for a Kafka cluster and publishes it to the
 * {@link StatisticsCache}.
 *
 * <p>Collection failures are not propagated to the caller: they are logged and converted
 * into an "empty" {@link Statistics} instance carrying the exception (see
 * {@link #getStatistics(KafkaCluster)}).
 */
@Service
public class StatisticsService {
    private static final Logger log = LoggerFactory.getLogger(StatisticsService.class);

    private final MetricsCollector metricsCollector;
    private final AdminClientService adminClientService;
    private final FeatureService featureService;
    private final StatisticsCache cache;

    /**
     * Creates the service with its collaborators.
     *
     * @param metricsCollector   source of broker metrics
     * @param adminClientService provider of the per-cluster {@link ReactiveAdminClient}
     * @param featureService     resolver of the features available on a cluster
     * @param cache              cache that receives every freshly collected snapshot
     */
    public StatisticsService(MetricsCollector metricsCollector, AdminClientService adminClientService, FeatureService featureService, StatisticsCache cache) {
        this.metricsCollector = metricsCollector;
        this.adminClientService = adminClientService;
        this.featureService = featureService;
        this.cache = cache;
    }

    /**
     * Collects fresh statistics for the given cluster and replaces the cached entry with them.
     *
     * @param c cluster whose statistics should be refreshed
     * @return a {@link Mono} emitting the statistics that were passed to the cache
     */
    public Mono<Statistics> updateCache(KafkaCluster c) {
        // NOTE(review): Reactor invokes doOnSuccess with null when the upstream completes
        // without a value, so cache.replace may receive null in that case - confirm the
        // cache tolerates it.
        return getStatistics(c).doOnSuccess(stats -> cache.replace(c, stats));
    }

    /**
     * Builds the statistics pipeline for a cluster: obtain the admin client, describe the
     * cluster, update the client's internal stats, then collect all statistic parts.
     *
     * <p>Any error is logged and replaced by {@code Statistics.empty()} with
     * {@code lastKafkaException} set, so the returned {@link Mono} does not signal an error
     * originating from the collection steps.
     *
     * @param cluster cluster to inspect
     * @return a {@link Mono} emitting either the collected or the error-marked statistics
     */
    private Mono<Statistics> getStatistics(KafkaCluster cluster) {
        return adminClientService.get(cluster)
            .flatMap(ac -> ac.describeCluster()
                .flatMap(description -> ac.updateInternalStats(description.getController())
                    .then(collectStatistics(cluster, ac, description))))
            .doOnError(e -> log.error("Failed to collect cluster {} info", cluster.getName(), e))
            // No (Object) cast here: the fallback must stay a Mono<Statistics> to be a valid
            // replacement for the failed upstream.
            .onErrorResume(e -> Mono.just(Statistics.empty().toBuilder().lastKafkaException(e).build()));
    }

    /**
     * Zips the individual statistic sources into a single {@link Statistics} value.
     *
     * @param cluster     cluster being inspected
     * @param ac          admin client for that cluster
     * @param description cluster description previously obtained from {@code ac}
     * @return a {@link Mono} emitting the assembled statistics
     */
    private Mono<Statistics> collectStatistics(KafkaCluster cluster, ReactiveAdminClient ac, ReactiveAdminClient.ClusterDescription description) {
        // The order of this list defines the indices read in toStatistics(...).
        List<Mono<?>> sources = List.of(
            metricsCollector.getBrokerMetrics(cluster, description.getNodes()),
            getLogDirInfo(description, ac),
            featureService.getAvailableFeatures(ac, cluster, description),
            loadTopicConfigs(cluster),
            describeTopics(cluster));
        return Mono.zip(sources, results -> toStatistics(description, ac, results));
    }

    /**
     * Assembles an ONLINE {@link Statistics} from the zipped results.
     *
     * @param description cluster description to embed
     * @param ac          admin client whose version is recorded
     * @param results     zip output, positionally matching the sources listed in
     *                    {@link #collectStatistics}
     * @return the assembled statistics
     */
    private static Statistics toStatistics(ReactiveAdminClient.ClusterDescription description, ReactiveAdminClient ac, Object[] results) {
        return Statistics.builder()
            .status(ServerStatusDTO.ONLINE)
            .clusterDescription(description)
            .version(ac.getVersion())
            .metrics((Metrics) results[0])
            .logDirInfo((InternalLogDirStats) results[1])
            .features(uncheckedCast(results[2]))
            .topicConfigs(uncheckedCast(results[3]))
            .topicDescriptions(uncheckedCast(results[4]))
            .build();
    }

    /**
     * Casts a zip result to the generic type expected at the call site.
     *
     * <p>{@code Mono.zip} with a combinator only exposes {@code Object[]}, so the element
     * type information is lost; this helper confines the unavoidable unchecked cast to one
     * place instead of scattering raw-type casts.
     */
    @SuppressWarnings("unchecked")
    private static <T> T uncheckedCast(Object value) {
        return (T) value;
    }

    /**
     * Describes the log directories of every node listed in the cluster description.
     *
     * @param desc cluster description providing the nodes
     * @param ac   admin client used for the request
     * @return a {@link Mono} emitting the log-dir statistics built from the response
     */
    private Mono<InternalLogDirStats> getLogDirInfo(ReactiveAdminClient.ClusterDescription desc, ReactiveAdminClient ac) {
        // Typed set (not a raw Set): a raw argument would make the call unchecked and erase
        // the generic return type needed by the constructor reference below.
        Set<Integer> brokerIds = desc.getNodes().stream()
            .map(Node::id)
            .collect(Collectors.toSet());
        return ac.describeLogDirs(brokerIds).map(InternalLogDirStats::new);
    }

    /**
     * Describes topics through the cluster's admin client.
     *
     * @param c cluster to query
     * @return a {@link Mono} emitting topic descriptions keyed by a string (presumably the
     *         topic name - confirm against {@code ReactiveAdminClient#describeTopics})
     */
    private Mono<Map<String, TopicDescription>> describeTopics(KafkaCluster c) {
        return adminClientService.get(c).flatMap(ReactiveAdminClient::describeTopics);
    }

    /**
     * Loads topic configuration entries through the cluster's admin client.
     *
     * @param c cluster to query
     * @return a {@link Mono} emitting config entries keyed by a string (presumably the
     *         topic name - confirm against {@code ReactiveAdminClient#getTopicsConfig})
     */
    private Mono<Map<String, List<ConfigEntry>>> loadTopicConfigs(KafkaCluster c) {
        return adminClientService.get(c).flatMap(ReactiveAdminClient::getTopicsConfig);
    }
}

