/*
 * Decompiled with CFR 0.152.
 */
package com.provectus.kafka.ui.service;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
import com.provectus.kafka.ui.connect.model.ConnectorStatus;
import com.provectus.kafka.ui.connect.model.ConnectorStatusConnector;
import com.provectus.kafka.ui.connect.model.ConnectorTopics;
import com.provectus.kafka.ui.connect.model.TaskStatus;
import com.provectus.kafka.ui.exception.NotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.mapper.ClusterMapper;
import com.provectus.kafka.ui.mapper.KafkaConnectMapper;
import com.provectus.kafka.ui.model.ConnectDTO;
import com.provectus.kafka.ui.model.ConnectorActionDTO;
import com.provectus.kafka.ui.model.ConnectorDTO;
import com.provectus.kafka.ui.model.ConnectorPluginConfigValidationResponseDTO;
import com.provectus.kafka.ui.model.ConnectorPluginDTO;
import com.provectus.kafka.ui.model.ConnectorStateDTO;
import com.provectus.kafka.ui.model.ConnectorTaskStatusDTO;
import com.provectus.kafka.ui.model.FullConnectorInfoDTO;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.NewConnectorDTO;
import com.provectus.kafka.ui.model.TaskDTO;
import com.provectus.kafka.ui.model.connect.InternalConnectInfo;
import com.provectus.kafka.ui.service.KafkaConfigSanitizer;
import com.provectus.kafka.ui.service.KafkaConnectService;
import com.provectus.kafka.ui.util.ReactiveFailover;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
public class KafkaConnectService {
    private static final Logger log = LoggerFactory.getLogger(KafkaConnectService.class);
    private final ClusterMapper clusterMapper;
    private final KafkaConnectMapper kafkaConnectMapper;
    private final ObjectMapper objectMapper;
    private final KafkaConfigSanitizer kafkaConfigSanitizer;

    public Flux<ConnectDTO> getConnects(KafkaCluster cluster) {
        return Flux.fromIterable((Iterable)Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect()).map(lst -> lst.stream().map(arg_0 -> ((ClusterMapper)this.clusterMapper).toKafkaConnect(arg_0)).toList()).orElse(List.of()));
    }

    public Flux<FullConnectorInfoDTO> getAllConnectors(KafkaCluster cluster, @Nullable String search) {
        return this.getConnects(cluster).flatMap(connect -> this.getConnectorNamesWithErrorsSuppress(cluster, connect.getName()).flatMap(connectorName -> Mono.zip((Mono)this.getConnector(cluster, connect.getName(), connectorName), (Mono)this.getConnectorConfig(cluster, connect.getName(), connectorName), (Mono)this.getConnectorTasks(cluster, connect.getName(), connectorName).collectList(), (Mono)this.getConnectorTopics(cluster, connect.getName(), connectorName)).map(tuple -> InternalConnectInfo.builder().connector((ConnectorDTO)tuple.getT1()).config((Map)tuple.getT2()).tasks((List)tuple.getT3()).topics(((ConnectorTopics)tuple.getT4()).getTopics()).build()))).map(arg_0 -> ((KafkaConnectMapper)this.kafkaConnectMapper).fullConnectorInfo(arg_0)).filter(this.matchesSearchTerm(search));
    }

    private Predicate<FullConnectorInfoDTO> matchesSearchTerm(@Nullable String search) {
        if (search == null) {
            return c -> true;
        }
        return connector -> this.getStringsForSearch(connector).anyMatch(string -> StringUtils.containsIgnoreCase((CharSequence)string, (CharSequence)search));
    }

    private Stream<String> getStringsForSearch(FullConnectorInfoDTO fullConnectorInfo) {
        return Stream.of(fullConnectorInfo.getName(), fullConnectorInfo.getConnect(), fullConnectorInfo.getStatus().getState().getValue(), fullConnectorInfo.getType().getValue());
    }

    public Mono<ConnectorTopics> getConnectorTopics(KafkaCluster cluster, String connectClusterName, String connectorName) {
        return this.api(cluster, connectClusterName).mono(c -> c.getConnectorTopics(connectorName)).map(result -> (ConnectorTopics)result.get(connectorName)).onErrorResume(Exception.class, e -> Mono.just((Object)new ConnectorTopics().topics(List.of())));
    }

    public Flux<String> getConnectorNames(KafkaCluster cluster, String connectName) {
        return this.api(cluster, connectName).flux(client -> client.getConnectors(null)).collectList().map(e -> (String)e.get(0)).map(arg_0 -> this.parseConnectorsNamesStringToList(arg_0)).flatMapMany(Flux::fromIterable);
    }

    public Flux<String> getConnectorNamesWithErrorsSuppress(KafkaCluster cluster, String connectName) {
        return this.getConnectorNames(cluster, connectName).onErrorComplete();
    }

    private List<String> parseConnectorsNamesStringToList(String json) {
        return (List)this.objectMapper.readValue(json, (TypeReference)new /* Unavailable Anonymous Inner Class!! */);
    }

    public Mono<ConnectorDTO> createConnector(KafkaCluster cluster, String connectName, Mono<NewConnectorDTO> connector) {
        return this.api(cluster, connectName).mono(client -> connector.flatMap(c -> this.connectorExists(cluster, connectName, c.getName()).map(exists -> {
            if (Boolean.TRUE.equals(exists)) {
                throw new ValidationException(String.format("Connector with name %s already exists", c.getName()));
            }
            return c;
        })).map(arg_0 -> ((KafkaConnectMapper)this.kafkaConnectMapper).toClient(arg_0)).flatMap(arg_0 -> ((KafkaConnectClientApi)client).createConnector(arg_0)).flatMap(c -> this.getConnector(cluster, connectName, c.getName())));
    }

    private Mono<Boolean> connectorExists(KafkaCluster cluster, String connectName, String connectorName) {
        return this.getConnectorNames(cluster, connectName).any(name -> name.equals(connectorName));
    }

    public Mono<ConnectorDTO> getConnector(KafkaCluster cluster, String connectName, String connectorName) {
        return this.api(cluster, connectName).mono(client -> client.getConnector(connectorName).map(arg_0 -> ((KafkaConnectMapper)this.kafkaConnectMapper).fromClient(arg_0)).flatMap(connector -> client.getConnectorStatus(connector.getName()).onErrorResume(WebClientResponseException.NotFound.class, e -> this.emptyStatus(connectorName)).map(connectorStatus -> {
            ConnectorStatusConnector status = connectorStatus.getConnector();
            Map sanitizedConfig = this.kafkaConfigSanitizer.sanitizeConnectorConfig(connector.getConfig());
            ConnectorDTO result = new ConnectorDTO().connect(connectName).status(this.kafkaConnectMapper.fromClient(status)).type(connector.getType()).tasks(connector.getTasks()).name(connector.getName()).config(sanitizedConfig);
            if (connectorStatus.getTasks() != null) {
                boolean isAnyTaskFailed = connectorStatus.getTasks().stream().map(TaskStatus::getState).anyMatch(arg_0 -> TaskStatus.StateEnum.FAILED.equals(arg_0));
                if (isAnyTaskFailed) {
                    result.getStatus().state(ConnectorStateDTO.TASK_FAILED);
                }
            }
            return result;
        })));
    }

    private Mono<ConnectorStatus> emptyStatus(String connectorName) {
        return Mono.just((Object)new ConnectorStatus().name(connectorName).tasks(List.of()).connector(new ConnectorStatusConnector().state(ConnectorStatusConnector.StateEnum.UNASSIGNED)));
    }

    public Mono<Map<String, Object>> getConnectorConfig(KafkaCluster cluster, String connectName, String connectorName) {
        return this.api(cluster, connectName).mono(c -> c.getConnectorConfig(connectorName)).map(arg_0 -> ((KafkaConfigSanitizer)this.kafkaConfigSanitizer).sanitizeConnectorConfig(arg_0));
    }

    public Mono<ConnectorDTO> setConnectorConfig(KafkaCluster cluster, String connectName, String connectorName, Mono<Map<String, Object>> requestBody) {
        return this.api(cluster, connectName).mono(c -> requestBody.flatMap(body -> c.setConnectorConfig(connectorName, body)).map(arg_0 -> ((KafkaConnectMapper)this.kafkaConnectMapper).fromClient(arg_0)));
    }

    public Mono<Void> deleteConnector(KafkaCluster cluster, String connectName, String connectorName) {
        return this.api(cluster, connectName).mono(c -> c.deleteConnector(connectorName));
    }

    public Mono<Void> updateConnectorState(KafkaCluster cluster, String connectName, String connectorName, ConnectorActionDTO action) {
        return this.api(cluster, connectName).mono(client -> {
            switch (2.$SwitchMap$com$provectus$kafka$ui$model$ConnectorActionDTO[action.ordinal()]) {
                case 1: {
                    return client.restartConnector(connectorName, Boolean.valueOf(false), Boolean.valueOf(false));
                }
                case 2: {
                    return this.restartTasks(cluster, connectName, connectorName, task -> true);
                }
                case 3: {
                    return this.restartTasks(cluster, connectName, connectorName, t -> t.getStatus().getState() == ConnectorTaskStatusDTO.FAILED);
                }
                case 4: {
                    return client.pauseConnector(connectorName);
                }
                case 5: {
                    return client.resumeConnector(connectorName);
                }
            }
            throw new IllegalStateException("Unexpected value: " + action);
        });
    }

    private Mono<Void> restartTasks(KafkaCluster cluster, String connectName, String connectorName, Predicate<TaskDTO> taskFilter) {
        return this.getConnectorTasks(cluster, connectName, connectorName).filter(taskFilter).flatMap(t -> this.restartConnectorTask(cluster, connectName, connectorName, t.getId().getTask())).then();
    }

    public Flux<TaskDTO> getConnectorTasks(KafkaCluster cluster, String connectName, String connectorName) {
        return this.api(cluster, connectName).flux(client -> client.getConnectorTasks(connectorName).onErrorResume(WebClientResponseException.NotFound.class, e -> Flux.empty()).map(arg_0 -> ((KafkaConnectMapper)this.kafkaConnectMapper).fromClient(arg_0)).flatMap(task -> client.getConnectorTaskStatus(connectorName, task.getId().getTask()).onErrorResume(WebClientResponseException.NotFound.class, e -> Mono.empty()).map(arg_0 -> ((KafkaConnectMapper)this.kafkaConnectMapper).fromClient(arg_0)).map(arg_0 -> ((TaskDTO)task).status(arg_0))));
    }

    public Mono<Void> restartConnectorTask(KafkaCluster cluster, String connectName, String connectorName, Integer taskId) {
        return this.api(cluster, connectName).mono(client -> client.restartConnectorTask(connectorName, taskId));
    }

    public Flux<ConnectorPluginDTO> getConnectorPlugins(KafkaCluster cluster, String connectName) {
        return this.api(cluster, connectName).flux(client -> client.getConnectorPlugins().map(arg_0 -> ((KafkaConnectMapper)this.kafkaConnectMapper).fromClient(arg_0)));
    }

    public Mono<ConnectorPluginConfigValidationResponseDTO> validateConnectorPluginConfig(KafkaCluster cluster, String connectName, String pluginName, Mono<Map<String, Object>> requestBody) {
        return this.api(cluster, connectName).mono(client -> requestBody.flatMap(body -> client.validateConnectorPluginConfig(pluginName, body)).map(arg_0 -> ((KafkaConnectMapper)this.kafkaConnectMapper).fromClient(arg_0)));
    }

    private ReactiveFailover<KafkaConnectClientApi> api(KafkaCluster cluster, String connectName) {
        ReactiveFailover client = (ReactiveFailover)cluster.getConnectsClients().get(connectName);
        if (client == null) {
            throw new NotFoundException("Connect %s not found for cluster %s".formatted(connectName, cluster.getName()));
        }
        return client;
    }

    public KafkaConnectService(ClusterMapper clusterMapper, KafkaConnectMapper kafkaConnectMapper, ObjectMapper objectMapper, KafkaConfigSanitizer kafkaConfigSanitizer) {
        this.clusterMapper = clusterMapper;
        this.kafkaConnectMapper = kafkaConnectMapper;
        this.objectMapper = objectMapper;
        this.kafkaConfigSanitizer = kafkaConfigSanitizer;
    }
}

