/*
 * Decompiled with CFR 0.152.
 */
package com.provectus.kafka.ui.controller;

import com.provectus.kafka.ui.api.KsqlApi;
import com.provectus.kafka.ui.controller.AbstractController;
import com.provectus.kafka.ui.model.KsqlCommandV2DTO;
import com.provectus.kafka.ui.model.KsqlCommandV2ResponseDTO;
import com.provectus.kafka.ui.model.KsqlResponseDTO;
import com.provectus.kafka.ui.model.KsqlStreamDescriptionDTO;
import com.provectus.kafka.ui.model.KsqlTableDescriptionDTO;
import com.provectus.kafka.ui.model.KsqlTableResponseDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.KsqlAction;
import com.provectus.kafka.ui.service.ksql.KsqlServiceV2;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
public class KsqlController
extends AbstractController
implements KsqlApi {
    private static final Logger log = LoggerFactory.getLogger(KsqlController.class);
    private final KsqlServiceV2 ksqlServiceV2;

    public Mono<ResponseEntity<KsqlCommandV2ResponseDTO>> executeKsql(String clusterName, Mono<KsqlCommandV2DTO> ksqlCmdDo, ServerWebExchange exchange) {
        return ksqlCmdDo.flatMap(command -> {
            AccessContext context = AccessContext.builder().cluster(clusterName).ksqlActions(new KsqlAction[]{KsqlAction.EXECUTE}).operationName("executeKsql").operationParams(command).build();
            return this.validateAccess(context).thenReturn((Object)new KsqlCommandV2ResponseDTO().pipeId(this.ksqlServiceV2.registerCommand(this.getCluster(clusterName), command.getKsql(), Optional.ofNullable(command.getStreamsProperties()).orElse(Map.of())))).doOnEach(sig -> this.audit(context, sig));
        }).map(ResponseEntity::ok);
    }

    public Mono<ResponseEntity<Flux<KsqlResponseDTO>>> openKsqlResponsePipe(String clusterName, String pipeId, ServerWebExchange exchange) {
        AccessContext context = AccessContext.builder().cluster(clusterName).ksqlActions(new KsqlAction[]{KsqlAction.EXECUTE}).operationName("openKsqlResponsePipe").build();
        return this.validateAccess(context).thenReturn((Object)ResponseEntity.ok((Object)this.ksqlServiceV2.execute(pipeId).map(table -> new KsqlResponseDTO().table(new KsqlTableResponseDTO().header(table.getHeader()).columnNames(table.getColumnNames()).values(table.getValues())))));
    }

    public Mono<ResponseEntity<Flux<KsqlStreamDescriptionDTO>>> listStreams(String clusterName, ServerWebExchange exchange) {
        AccessContext context = AccessContext.builder().cluster(clusterName).ksqlActions(new KsqlAction[]{KsqlAction.EXECUTE}).operationName("listStreams").build();
        return this.validateAccess(context).thenReturn((Object)ResponseEntity.ok((Object)this.ksqlServiceV2.listStreams(this.getCluster(clusterName)))).doOnEach(sig -> this.audit(context, sig));
    }

    public Mono<ResponseEntity<Flux<KsqlTableDescriptionDTO>>> listTables(String clusterName, ServerWebExchange exchange) {
        AccessContext context = AccessContext.builder().cluster(clusterName).ksqlActions(new KsqlAction[]{KsqlAction.EXECUTE}).operationName("listTables").build();
        return this.validateAccess(context).thenReturn((Object)ResponseEntity.ok((Object)this.ksqlServiceV2.listTables(this.getCluster(clusterName)))).doOnEach(sig -> this.audit(context, sig));
    }

    public KsqlController(KsqlServiceV2 ksqlServiceV2) {
        this.ksqlServiceV2 = ksqlServiceV2;
    }
}

