/*
 * Decompiled with CFR 0.152.
 */
package com.provectus.kafka.ui.service;

import com.google.common.collect.Streams;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.emitter.EnhancedConsumer;
import com.provectus.kafka.ui.model.ConsumerGroupOrderingDTO;
import com.provectus.kafka.ui.model.InternalConsumerGroup;
import com.provectus.kafka.ui.model.InternalTopicConsumerGroup;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.SortOrderDTO;
import com.provectus.kafka.ui.service.AdminClientService;
import com.provectus.kafka.ui.service.ConsumerGroupService;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import com.provectus.kafka.ui.util.ApplicationMetrics;
import com.provectus.kafka.ui.util.SslPropertiesUtil;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.ConsumerGroupState;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

@Service
public class ConsumerGroupService {
    private final AdminClientService adminClientService;
    private final AccessControlService accessControlService;

    /**
     * Builds {@link InternalConsumerGroup} instances for the given descriptions by combining each
     * description with its committed offsets and the matching end offsets.
     *
     * <p>The emitted list has one element per description, in the same order as {@code descriptions}.
     *
     * @param ac admin client used to look up offsets
     * @param descriptions consumer group descriptions to enrich
     * @return a {@code Mono} emitting the enriched consumer groups
     */
    private Mono<List<InternalConsumerGroup>> getConsumerGroups(ReactiveAdminClient ac, List<ConsumerGroupDescription> descriptions) {
        List<String> groupNames = descriptions.stream().map(ConsumerGroupDescription::groupId).toList();
        // 1. Committed offsets for all requested groups.
        //    NOTE(review): null is passed as the partition argument - presumably "no partition filter"; confirm.
        return ac.listConsumerGroupOffsets(groupNames, null)
            .flatMap(committedOffsets ->
                // 2. Latest offsets, requested only for partitions that have committed offsets.
                ac.listOffsets(committedOffsets.columnKeySet(), OffsetSpec.latest(), false)
                    .map(endOffsets -> descriptions.stream()
                        .map(desc -> {
                            var groupOffsets = committedOffsets.row(desc.groupId());
                            // Work on a copy: endOffsets is shared by every group in this batch.
                            var endOffsetsForGroup = new HashMap<>(endOffsets);
                            endOffsetsForGroup.keySet().retainAll(groupOffsets.keySet());
                            // 3. Combine the description with both offset maps.
                            return InternalConsumerGroup.create(desc, groupOffsets, endOffsetsForGroup);
                        })
                        .collect(Collectors.toList())));
    }

    /**
     * Finds the consumer groups related to a topic: groups with a member currently assigned to
     * one of the topic's partitions, or groups with committed offsets for the topic's partitions.
     *
     * @param cluster cluster to query
     * @param topic topic name
     * @return a {@code Mono} emitting the topic-scoped view of every related consumer group
     */
    public Mono<List<InternalTopicConsumerGroup>> getConsumerGroupsForTopic(KafkaCluster cluster, String topic) {
        return this.adminClientService.get(cluster)
            .flatMap(ac -> ac.listTopicOffsets(topic, OffsetSpec.latest(), false)
                .flatMap(endOffsets -> {
                    // Partitions of the topic; used to restrict the committed-offsets lookup below.
                    var tps = new ArrayList<>(endOffsets.keySet());
                    return this.describeConsumerGroups(ac)
                        .flatMap(groups -> {
                            List<String> groupNames = groups.stream().map(ConsumerGroupDescription::groupId).toList();
                            return ac.listConsumerGroupOffsets(groupNames, tps)
                                .map(offsets -> groups.stream()
                                    .filter(g -> this.isConsumerGroupRelatesToTopic(topic, g, offsets.containsRow(g.groupId())))
                                    .map(g -> InternalTopicConsumerGroup.create(topic, g, offsets.row(g.groupId()), endOffsets))
                                    .toList());
                        });
                }));
    }

    /**
     * Tells whether a consumer group is related to a topic.
     *
     * @param topic topic name to look for
     * @param description description of the group being checked
     * @param hasCommittedOffsets whether the caller found committed offsets for this group
     * @return {@code true} if any member is assigned a partition of {@code topic}, otherwise the
     *     value of {@code hasCommittedOffsets}
     */
    private boolean isConsumerGroupRelatesToTopic(String topic, ConsumerGroupDescription description, boolean hasCommittedOffsets) {
        for (var member : description.members()) {
            for (var assignedPartition : member.assignment().topicPartitions()) {
                if (assignedPartition.topic().equals(topic)) {
                    // An active member is consuming this topic.
                    return true;
                }
            }
        }
        // No active member for the topic: committed offsets alone decide.
        return hasCommittedOffsets;
    }

    /**
     * Loads one page of consumer groups for a cluster.
     *
     * <p>Groups are listed, optionally filtered by a case-insensitive substring match on the group
     * id, restricted to the groups the access control service reports as accessible, then sorted
     * and paginated. The total page count is computed from the number of accessible groups.
     *
     * @param cluster cluster to query
     * @param pageNum page number, forwarded to the sorting/pagination step
     * @param perPage page size
     * @param search optional group-id substring; {@code null} disables filtering
     * @param orderBy sort criterion
     * @param sortOrderDto sort direction
     * @return a {@code Mono} emitting the requested page together with the total number of pages
     */
    public Mono<ConsumerGroupsPage> getConsumerGroupsPage(KafkaCluster cluster, int pageNum, int perPage, @Nullable String search, ConsumerGroupOrderingDTO orderBy, SortOrderDTO sortOrderDto) {
        return this.adminClientService.get(cluster).flatMap(ac -> {
            Mono<List<ConsumerGroupListing>> accessibleGroups = ac.listConsumerGroups()
                .map(listing -> search == null
                    ? listing
                    : listing.stream().filter(g -> StringUtils.containsIgnoreCase(g.groupId(), search)).toList())
                .flatMapIterable(lst -> lst)
                .filterWhen(cg -> this.accessControlService.isConsumerGroupAccessible(cg.groupId(), cluster.getName()))
                .collectList();

            return accessibleGroups.flatMap(allGroups ->
                this.loadSortedDescriptions(ac, allGroups, pageNum, perPage, orderBy, sortOrderDto)
                    .flatMap(descriptions -> this.getConsumerGroups(ac, descriptions))
                    .map(page -> {
                        // Ceiling division: a partially filled last page still counts as a page.
                        int totalGroups = allGroups.size();
                        int fullPages = totalGroups / perPage;
                        int totalPages = totalGroups % perPage == 0 ? fullPages : fullPages + 1;
                        return new ConsumerGroupsPage(page, totalPages);
                    }));
        });
    }

    private Mono<List<ConsumerGroupDescription>> loadSortedDescriptions(ReactiveAdminClient ac, List<ConsumerGroupListing> groups, int pageNum, int perPage, ConsumerGroupOrderingDTO orderBy, SortOrderDTO sortOrderDto) {
        return switch (1.$SwitchMap$com$provectus$kafka$ui$model$ConsumerGroupOrderingDTO[orderBy.ordinal()]) {
            default -> throw new IncompatibleClassChangeError();
            case 1 -> {
                Comparator<ConsumerGroupListing> comparator = Comparator.comparing(ConsumerGroupListing::groupId);
                yield this.loadDescriptionsByListings(ac, groups, comparator, pageNum, perPage, sortOrderDto);
            }
            case 2 -> {
                ToIntFunction<ConsumerGroupListing> statesPriorities = cg -> switch (1.$SwitchMap$org$apache$kafka$common$ConsumerGroupState[cg.state().orElse(ConsumerGroupState.UNKNOWN).ordinal()]) {
                    default -> throw new IncompatibleClassChangeError();
                    case 1 -> 0;
                    case 2 -> 1;
                    case 3 -> 2;
                    case 4 -> 3;
                    case 5 -> 4;
                    case 6 -> 5;
                };
                Comparator<ConsumerGroupListing> comparator = Comparator.comparingInt(statesPriorities);
                yield this.loadDescriptionsByListings(ac, groups, comparator, pageNum, perPage, sortOrderDto);
            }
            case 3 -> {
                Comparator<ConsumerGroupDescription> comparator = Comparator.comparingInt(cg -> cg.members().size());
                List<String> groupNames = groups.stream().map(ConsumerGroupListing::groupId).toList();
                yield ac.describeConsumerGroups(groupNames).map(descriptions -> this.sortAndPaginate(descriptions.values(), comparator, pageNum, perPage, sortOrderDto).toList());
            }
            case 4 -> {
                Comparator<GroupWithDescr> comparator = Comparator.comparingLong(gwd -> gwd.icg.getConsumerLag() == null ? 0L : gwd.icg.getConsumerLag());
                yield this.loadDescriptionsByInternalConsumerGroups(ac, groups, comparator, pageNum, perPage, sortOrderDto);
            }
            case 5 -> {
                Comparator<GroupWithDescr> comparator = Comparator.comparingInt(gwd -> gwd.icg.getTopicNum());
                yield this.loadDescriptionsByInternalConsumerGroups(ac, groups, comparator, pageNum, perPage, sortOrderDto);
            }
        };
    }

    /**
     * Sorts and paginates the listings first, then describes only the groups on the requested page.
     *
     * @param ac admin client used to describe groups
     * @param listings listings of all candidate groups
     * @param comparator ordering applied to the listings
     * @param pageNum page number
     * @param perPage page size
     * @param sortOrderDto sort direction
     * @return a {@code Mono} emitting the page's descriptions in sorted order
     */
    private Mono<List<ConsumerGroupDescription>> loadDescriptionsByListings(ReactiveAdminClient ac, List<ConsumerGroupListing> listings, Comparator<ConsumerGroupListing> comparator, int pageNum, int perPage, SortOrderDTO sortOrderDto) {
        Stream<ConsumerGroupListing> pageOfListings = this.sortAndPaginate(listings, comparator, pageNum, perPage, sortOrderDto);
        List<String> pageGroupIds = pageOfListings.map(ConsumerGroupListing::groupId).toList();
        return ac.describeConsumerGroups(pageGroupIds)
            // Look descriptions up by id so the result follows the sorted id order.
            .map(descriptionsById -> pageGroupIds.stream()
                .map(groupId -> descriptionsById.get(groupId))
                .toList());
    }

    /**
     * Sorts a collection and returns the slice that belongs to the requested page.
     *
     * @param collection elements to sort
     * @param comparator ascending ordering; reversed for any direction other than {@code ASC}
     * @param pageNum page number; page 1 starts at the first element
     * @param perPage maximum number of elements on a page
     * @param sortOrderDto sort direction
     * @param <T> element type
     * @return a stream over the elements of the requested page
     */
    private <T> Stream<T> sortAndPaginate(Collection<T> collection, Comparator<T> comparator, int pageNum, int perPage, SortOrderDTO sortOrderDto) {
        Comparator<T> effectiveOrder;
        if (sortOrderDto == SortOrderDTO.ASC) {
            effectiveOrder = comparator;
        } else {
            effectiveOrder = comparator.reversed();
        }
        // Widen to long before multiplying so large page numbers cannot overflow int.
        long elementsToSkip = (long) (pageNum - 1) * (long) perPage;
        return collection.stream()
            .sorted(effectiveOrder)
            .skip(elementsToSkip)
            .limit(perPage);
    }

    /**
     * Describes every consumer group whose name the admin client lists.
     *
     * @param ac admin client to query
     * @return a {@code Mono} emitting a new mutable list with the descriptions of all listed groups
     */
    private Mono<List<ConsumerGroupDescription>> describeConsumerGroups(ReactiveAdminClient ac) {
        return ac.listConsumerGroupNames()
            .flatMap(ac::describeConsumerGroups)
            .map(cgs -> new ArrayList<>(cgs.values()));
    }

    /**
     * Describes all given groups, enriches them with offset data, sorts the enriched groups with
     * {@code comparator} and returns the descriptions belonging to the requested page.
     *
     * <p>Used for orderings that depend on {@link InternalConsumerGroup} data, so every group has
     * to be described and enriched before pagination.
     *
     * <p>NOTE(review): {@code GroupWithDescr} is not declared in the visible source; it is used
     * here as a pair of (enriched group, description) with a {@code cgd()} accessor.
     *
     * @param ac admin client used to describe groups and fetch offsets
     * @param groups listings of all candidate groups
     * @param comparator ordering over the enriched groups
     * @param pageNum page number
     * @param perPage page size
     * @param sortOrderDto sort direction
     * @return a {@code Mono} emitting the page's descriptions in sorted order
     */
    private Mono<List<ConsumerGroupDescription>> loadDescriptionsByInternalConsumerGroups(ReactiveAdminClient ac, List<ConsumerGroupListing> groups, Comparator<GroupWithDescr> comparator, int pageNum, int perPage, SortOrderDTO sortOrderDto) {
        List<String> groupNames = groups.stream().map(ConsumerGroupListing::groupId).toList();
        return ac.describeConsumerGroups(groupNames).flatMap(descriptionsMap -> {
            List<ConsumerGroupDescription> descriptions = descriptionsMap.values().stream().toList();
            return this.getConsumerGroups(ac, descriptions)
                // getConsumerGroups emits results in the order of its input, so pairing by position is valid.
                .map(icg -> Streams.zip(icg.stream(), descriptions.stream(), GroupWithDescr::new).toList())
                .map(gwd -> this.sortAndPaginate(gwd, comparator, pageNum, perPage, sortOrderDto)
                    .map(GroupWithDescr::cgd)
                    .toList());
        });
    }

    /**
     * Loads the enriched view of a single consumer group.
     *
     * @param cluster cluster to query
     * @param consumerGroupId id of the group to load
     * @return a {@code Mono} emitting the group, or completing empty when the describe result
     *     does not contain the group id
     */
    public Mono<InternalConsumerGroup> getConsumerGroupDetail(KafkaCluster cluster, String consumerGroupId) {
        return this.adminClientService.get(cluster).flatMap(ac -> {
            Mono<ConsumerGroupDescription> description = ac.describeConsumerGroups(List.of(consumerGroupId))
                .filter(described -> described.containsKey(consumerGroupId))
                .map(described -> described.get(consumerGroupId));
            return description
                .flatMap(descr -> this.getConsumerGroups(ac, List.of(descr)))
                .filter(enriched -> !enriched.isEmpty())
                .map(enriched -> enriched.get(0));
        });
    }

    /**
     * Deletes a single consumer group through the cluster's admin client.
     *
     * @param cluster cluster that owns the group
     * @param groupId id of the group to delete
     * @return a {@code Mono} signalling the outcome of the delete call
     */
    public Mono<Void> deleteConsumerGroupById(KafkaCluster cluster, String groupId) {
        return this.adminClientService.get(cluster)
            .flatMap(ac -> {
                List<String> groupsToDelete = List.of(groupId);
                return ac.deleteConsumerGroups(groupsToDelete);
            });
    }

    /**
     * Creates a consumer for the cluster without any extra property overrides.
     *
     * @param cluster cluster to connect to
     * @return a new consumer
     */
    public EnhancedConsumer createConsumer(KafkaCluster cluster) {
        Map<String, Object> noOverrides = Map.of();
        return this.createConsumer(cluster, noOverrides);
    }

    /**
     * Creates a consumer for the cluster.
     *
     * <p>Properties are applied in this order, later entries overriding earlier ones: SSL
     * settings, the cluster's own properties, the fixed defaults set here, and finally the
     * caller-supplied {@code properties}.
     *
     * @param cluster cluster to connect to
     * @param properties extra consumer properties; applied last
     * @return a new consumer
     */
    public EnhancedConsumer createConsumer(KafkaCluster cluster, Map<String, Object> properties) {
        Properties consumerProps = new Properties();

        // Cluster-level configuration first, so the defaults below can override it.
        SslPropertiesUtil.addKafkaSslProperties((ClustersProperties.TruststoreConfig) cluster.getOriginalProperties().getSsl(), consumerProps);
        consumerProps.putAll((Map<?, ?>) cluster.getProperties());

        // Fixed defaults; the client id is suffixed with the current time in milliseconds.
        String clientId = "kafka-ui-consumer-" + System.currentTimeMillis();
        consumerProps.put("client.id", clientId);
        consumerProps.put("bootstrap.servers", cluster.getBootstrapServers());
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("enable.auto.commit", "false");
        consumerProps.put("allow.auto.create.topics", "false");

        // Caller overrides win over everything above.
        consumerProps.putAll(properties);

        var pollingThrottler = cluster.getPollingSettings().getPollingThrottler();
        var clusterMetrics = ApplicationMetrics.forCluster(cluster);
        return new EnhancedConsumer(consumerProps, pollingThrottler, clusterMetrics);
    }

    /**
     * Creates the service with its collaborators.
     *
     * @param adminClientService provides the admin client for a cluster
     * @param accessControlService decides which consumer groups are accessible
     */
    public ConsumerGroupService(AdminClientService adminClientService, AccessControlService accessControlService) {
        this.accessControlService = accessControlService;
        this.adminClientService = adminClientService;
    }
}

