/*
 * Decompiled with CFR 0.152.
 */
package com.provectus.kafka.ui.service.ksql;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.node.TextNode;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
import com.provectus.kafka.ui.service.ksql.KsqlGrammar;
import com.provectus.kafka.ui.service.ksql.response.ResponseParser;
import com.provectus.kafka.ui.util.WebClientConfigurator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nullable;
import ksql.KsqlGrammarParser;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.codec.Encoder;
import org.springframework.http.MediaType;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.unit.DataSize;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/*
 * Exception performing whole class analysis ignored.
 */
public class KsqlApiClient {
    private static final Logger log = LoggerFactory.getLogger(KsqlApiClient.class);
    private static final MimeType KQL_API_MIME_TYPE = MimeTypeUtils.parseMimeType((String)"application/vnd.ksql.v1+json");
    private static final Set<Class<?>> UNSUPPORTED_STMT_TYPES = Set.of(KsqlGrammarParser.PrintTopicContext.class, KsqlGrammarParser.DefineVariableContext.class, KsqlGrammarParser.UndefineVariableContext.class);
    private final String baseUrl;
    private final WebClient webClient;

    public KsqlApiClient(String baseUrl, @Nullable ClustersProperties.KsqldbServerAuth ksqldbServerAuth, @Nullable ClustersProperties.TruststoreConfig ksqldbServerSsl, @Nullable ClustersProperties.KeystoreConfig keystoreConfig, @Nullable DataSize maxBuffSize) {
        this.baseUrl = baseUrl;
        this.webClient = KsqlApiClient.webClient((ClustersProperties.KsqldbServerAuth)ksqldbServerAuth, (ClustersProperties.TruststoreConfig)ksqldbServerSsl, (ClustersProperties.KeystoreConfig)keystoreConfig, (DataSize)maxBuffSize);
    }

    private static WebClient webClient(@Nullable ClustersProperties.KsqldbServerAuth ksqldbServerAuth, @Nullable ClustersProperties.TruststoreConfig truststoreConfig, @Nullable ClustersProperties.KeystoreConfig keystoreConfig, @Nullable DataSize maxBuffSize) {
        ksqldbServerAuth = Optional.ofNullable(ksqldbServerAuth).orElse(new ClustersProperties.KsqldbServerAuth());
        maxBuffSize = Optional.ofNullable(maxBuffSize).orElse(DataSize.ofMegabytes((long)20L));
        return new WebClientConfigurator().configureSsl(truststoreConfig, keystoreConfig).configureBasicAuth(ksqldbServerAuth.getUsername(), ksqldbServerAuth.getPassword()).configureBufferSize(maxBuffSize).configureCodecs(codecs -> {
            JsonMapper mapper = new JsonMapper();
            codecs.defaultCodecs().jackson2JsonEncoder((Encoder)new Jackson2JsonEncoder((ObjectMapper)mapper, new MimeType[]{KQL_API_MIME_TYPE, MediaType.APPLICATION_JSON}));
            codecs.defaultCodecs().jackson2JsonDecoder((Decoder)new Jackson2JsonDecoder((ObjectMapper)mapper, new MimeType[]{MimeTypeUtils.ALL}));
        }).build();
    }

    private KsqlRequest ksqlRequest(String ksql, Map<String, String> streamProperties) {
        return new KsqlRequest(ksql, streamProperties);
    }

    private Flux<KsqlResponseTable> executeSelect(String ksql, Map<String, String> streamProperties) {
        return ((WebClient.RequestBodySpec)((WebClient.RequestBodySpec)this.webClient.post().uri(this.baseUrl + "/query", new Object[0])).accept(new MediaType[]{new MediaType(KQL_API_MIME_TYPE)})).contentType(new MediaType(KQL_API_MIME_TYPE)).bodyValue((Object)this.ksqlRequest(ksql, streamProperties)).retrieve().bodyToFlux(JsonNode.class).onErrorResume(arg_0 -> this.isUnexpectedJsonArrayEndCharException(arg_0), th -> Mono.empty()).map(ResponseParser::parseSelectResponse).filter(Optional::isPresent).map(Optional::get).onErrorResume(WebClientResponseException.class, e -> Flux.just((Object)ResponseParser.parseErrorResponse((WebClientResponseException)e)));
    }

    private boolean isUnexpectedJsonArrayEndCharException(Throwable th) {
        return th instanceof DecodingException && th.getMessage().contains("Unexpected character (']'");
    }

    private Flux<KsqlResponseTable> executeStatement(String ksql, Map<String, String> streamProperties) {
        return ((WebClient.RequestBodySpec)((WebClient.RequestBodySpec)this.webClient.post().uri(this.baseUrl + "/ksql", new Object[0])).accept(new MediaType[]{new MediaType(KQL_API_MIME_TYPE)})).contentType(MediaType.APPLICATION_JSON).bodyValue((Object)this.ksqlRequest(ksql, streamProperties)).exchangeToFlux(resp -> {
            if (resp.statusCode().isError()) {
                return resp.createException().flux().map(ResponseParser::parseErrorResponse);
            }
            return resp.bodyToFlux(JsonNode.class).flatMap(body -> (body.isArray() ? Flux.fromIterable((Iterable)body) : Flux.just((Object)body)).flatMapIterable(ResponseParser::parseStatementResponse)).switchIfEmpty((Publisher)Flux.just((Object)KsqlResponseTable.builder().header("Query Result").columnNames(List.of("Result")).values(List.of(List.of(new TextNode("Success")))).build()));
        });
    }

    public Flux<KsqlResponseTable> execute(String ksql, Map<String, String> streamProperties) {
        Optional parsedStatements = KsqlGrammar.parse((String)ksql);
        if (parsedStatements.isEmpty()) {
            return this.errorTableFlux("Sql statement is invalid or unsupported");
        }
        List statements = ((KsqlGrammar.KsqlStatements)parsedStatements.get()).getStatements();
        if (statements.size() > 1) {
            return this.errorTableFlux("Only single statement supported now");
        }
        if (statements.size() == 0) {
            return this.errorTableFlux("No valid ksql statement found");
        }
        if (this.isUnsupportedStatementType((KsqlGrammarParser.SingleStatementContext)statements.get(0))) {
            return this.errorTableFlux("Unsupported statement type");
        }
        Flux outputFlux = KsqlGrammar.isSelect((KsqlGrammarParser.SingleStatementContext)((KsqlGrammarParser.SingleStatementContext)statements.get(0))) ? this.executeSelect(ksql, streamProperties) : this.executeStatement(ksql, streamProperties);
        return outputFlux.onErrorResume(Exception.class, e -> {
            log.error("Unexpected error while execution ksql: {}", (Object)ksql, e);
            return this.errorTableFlux("Unexpected error: " + e.getMessage());
        });
    }

    private Flux<KsqlResponseTable> errorTableFlux(String errorText) {
        return Flux.just((Object)ResponseParser.errorTableWithTextMsg((String)errorText));
    }

    private boolean isUnsupportedStatementType(KsqlGrammarParser.SingleStatementContext context) {
        Class<?> ctxClass = context.statement().getClass();
        return UNSUPPORTED_STMT_TYPES.contains(ctxClass);
    }
}

