/*
 * Decompiled with CFR 0.152.
 */
package com.provectus.kafka.ui.service;

import com.google.common.base.Preconditions;
import com.provectus.kafka.ui.exception.NotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.service.AdminClientService;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

/**
 * Resets the committed offsets of a consumer group for a single topic.
 *
 * <p>Every public operation first verifies (see {@link #checkGroupCondition}) that the group
 * exists and is in the {@code DEAD} or {@code EMPTY} state, then resolves the target offsets
 * and hands them to {@link ReactiveAdminClient#alterConsumerGroupOffsets}.
 *
 * <p>All operations are lazy: nothing is sent to the cluster until the returned {@link Mono}
 * is subscribed to.
 */
@Component
public class OffsetsResetService {
    private static final Logger log = LoggerFactory.getLogger(OffsetsResetService.class);

    /** Group states in which an offsets reset is permitted. */
    private static final Set<ConsumerGroupState> INACTIVE_STATES =
            Set.of(ConsumerGroupState.DEAD, ConsumerGroupState.EMPTY);

    private final AdminClientService adminClientService;

    public OffsetsResetService(AdminClientService adminClientService) {
        this.adminClientService = adminClientService;
    }

    /**
     * Resets the group's offsets to the earliest offsets of the given partitions.
     *
     * @param cluster    cluster the group belongs to
     * @param group      consumer group id
     * @param topic      topic whose offsets are reset
     * @param partitions partition indexes to reset; {@code null} means all partitions of the topic
     * @return a {@link Mono} completing when the offsets have been altered; fails with
     *         {@link NotFoundException} if the group is not found or {@link ValidationException}
     *         if the group is not inactive
     */
    public Mono<Void> resetToEarliest(KafkaCluster cluster, String group, String topic, Collection<Integer> partitions) {
        return resetToSpec(cluster, group, topic, partitions, OffsetSpec.earliest());
    }

    /**
     * Resets the group's offsets to the latest offsets of the given partitions.
     *
     * @param cluster    cluster the group belongs to
     * @param group      consumer group id
     * @param topic      topic whose offsets are reset
     * @param partitions partition indexes to reset; {@code null} means all partitions of the topic
     * @return a {@link Mono} completing when the offsets have been altered; fails with
     *         {@link NotFoundException} if the group is not found or {@link ValidationException}
     *         if the group is not inactive
     */
    public Mono<Void> resetToLatest(KafkaCluster cluster, String group, String topic, Collection<Integer> partitions) {
        return resetToSpec(cluster, group, topic, partitions, OffsetSpec.latest());
    }

    /**
     * Resets the group's offsets to the offsets looked up for {@code targetTimestamp}.
     *
     * <p>Partitions for which the timestamp lookup returned no entry are reset to their latest
     * offset instead (see {@link #editTsOffsets}).
     *
     * @param cluster         cluster the group belongs to
     * @param group           consumer group id
     * @param topic           topic whose offsets are reset
     * @param partitions      partition indexes to reset; {@code null} means all partitions
     * @param targetTimestamp timestamp passed to {@link OffsetSpec#forTimestamp(long)}
     * @return a {@link Mono} completing when the offsets have been altered
     */
    public Mono<Void> resetToTimestamp(KafkaCluster cluster, String group, String topic, Collection<Integer> partitions, long targetTimestamp) {
        return checkGroupCondition(cluster, group)
                .flatMap(ac -> offsets(ac, topic, partitions, OffsetSpec.forTimestamp(targetTimestamp))
                        .flatMap(foundOffsets -> offsets(ac, topic, partitions, OffsetSpec.latest())
                                .map(endOffsets -> editTsOffsets(foundOffsets, endOffsets)))
                        .flatMap(offsetsToCommit -> resetOffsets(ac, group, offsetsToCommit)));
    }

    /**
     * Resets the group's offsets to explicitly supplied values, clamping each value into the
     * partition's {@code [earliest, latest]} range.
     *
     * @param cluster       cluster the group belongs to
     * @param group         consumer group id
     * @param topic         topic whose offsets are reset
     * @param targetOffsets partition index to desired offset; must not be {@code null}
     * @return a {@link Mono} completing when the offsets have been altered
     * @throws NullPointerException if {@code targetOffsets} is {@code null}
     */
    public Mono<Void> resetToOffsets(KafkaCluster cluster, String group, String topic, Map<Integer, Long> targetOffsets) {
        Preconditions.checkNotNull(targetOffsets);
        Map<TopicPartition, Long> partitionOffsets = targetOffsets.entrySet().stream()
                .collect(Collectors.toMap(e -> new TopicPartition(topic, e.getKey()), Map.Entry::getValue));
        Set<TopicPartition> requested = partitionOffsets.keySet();
        return checkGroupCondition(cluster, group)
                .flatMap(ac -> ac.listOffsets(requested, OffsetSpec.earliest(), true)
                        .flatMap(earliest -> ac.listOffsets(requested, OffsetSpec.latest(), true)
                                .map(latest -> editOffsetsBounds(partitionOffsets, earliest, latest))
                                .flatMap(offsetsToCommit -> resetOffsets(ac, group, offsetsToCommit))));
    }

    /** Shared pipeline for resets whose target offsets come from a single {@link OffsetSpec} lookup. */
    private Mono<Void> resetToSpec(KafkaCluster cluster, String group, String topic, @Nullable Collection<Integer> partitions, OffsetSpec spec) {
        return checkGroupCondition(cluster, group)
                .flatMap(ac -> offsets(ac, topic, partitions, spec)
                        .flatMap(found -> resetOffsets(ac, group, found)));
    }

    /**
     * Looks up offsets for {@code spec}, either for the whole topic ({@code partitions == null})
     * or only for the listed partition indexes.
     */
    private Mono<Map<TopicPartition, Long>> offsets(ReactiveAdminClient client, String topic, @Nullable Collection<Integer> partitions, OffsetSpec spec) {
        // NOTE(review): the trailing `true` flag is passed through as in the original code;
        // its exact meaning is defined by ReactiveAdminClient.
        if (partitions == null) {
            return client.listTopicOffsets(topic, spec, true);
        }
        Set<TopicPartition> topicPartitions = partitions.stream()
                .map(idx -> new TopicPartition(topic, idx))
                .collect(Collectors.toSet());
        return client.listOffsets(topicPartitions, spec, true);
    }

    /**
     * Obtains the admin client for {@code cluster} and verifies that {@code groupId} may be reset.
     *
     * @return the admin client when the group exists and is {@code DEAD} or {@code EMPTY};
     *         an error {@link Mono} with {@link NotFoundException} when the group is not listed or
     *         not described, or with {@link ValidationException} when it is in any other state
     */
    private Mono<ReactiveAdminClient> checkGroupCondition(KafkaCluster cluster, String groupId) {
        return adminClientService.get(cluster)
                .flatMap(ac -> ac.listConsumerGroupNames()
                        .filter(names -> names.stream().anyMatch(g -> g.equals(groupId)))
                        .flatMap(names -> ac.describeConsumerGroups(List.of(groupId)))
                        .filter(descriptions -> descriptions.containsKey(groupId))
                        .map(descriptions -> descriptions.get(groupId))
                        .flatMap(description -> requireInactive(ac, description))
                        // Deferred so the exception (and its stack trace) is only built when
                        // the group really is missing.
                        .switchIfEmpty(Mono.defer(() -> Mono.error(new NotFoundException("Consumer group not found")))));
    }

    /** Emits {@code ac} when the described group is inactive, otherwise a {@link ValidationException}. */
    private Mono<ReactiveAdminClient> requireInactive(ReactiveAdminClient ac, ConsumerGroupDescription description) {
        if (!INACTIVE_STATES.contains(description.state())) {
            return Mono.error(new ValidationException(String.format("Group's offsets can be reset only if group is inactive, but group is in %s state", description.state())));
        }
        return Mono.just(ac);
    }

    /**
     * Merges timestamp-lookup results over end offsets: entries of {@code foundTsOffsets} win,
     * and partitions present only in {@code endOffsets} keep their end offset.
     */
    private Map<TopicPartition, Long> editTsOffsets(Map<TopicPartition, Long> foundTsOffsets, Map<TopicPartition, Long> endOffsets) {
        Map<TopicPartition, Long> result = new HashMap<>(endOffsets);
        result.putAll(foundTsOffsets);
        return result;
    }

    /**
     * Clamps each requested offset into its partition's {@code [earliest, latest]} range,
     * logging a warning whenever a value had to be adjusted.
     *
     * @throws ValidationException if a bound needed for the comparison is missing for a
     *                             requested partition (previously an opaque unboxing NPE)
     */
    private Map<TopicPartition, Long> editOffsetsBounds(Map<TopicPartition, Long> offsetsToCheck, Map<TopicPartition, Long> earliestOffsets, Map<TopicPartition, Long> latestOffsets) {
        Map<TopicPartition, Long> result = new HashMap<>();
        offsetsToCheck.forEach((tp, offset) -> {
            Long earliest = earliestOffsets.get(tp);
            if (earliest == null) {
                throw new ValidationException(String.format("Earliest offset is not available for partition %s", tp));
            }
            if (earliest > offset) {
                log.warn("Offset for partition {} is lower than earliest offset, resetting to earliest", tp);
                result.put(tp, earliest);
                return;
            }
            // The latest bound is only consulted when the earliest check passed, matching the
            // original evaluation order.
            Long latest = latestOffsets.get(tp);
            if (latest == null) {
                throw new ValidationException(String.format("Latest offset is not available for partition %s", tp));
            }
            if (latest < offset) {
                log.warn("Offset for partition {} is greater than latest offset, resetting to latest", tp);
                result.put(tp, latest);
            } else {
                result.put(tp, offset);
            }
        });
        return result;
    }

    /** Commits {@code offsets} for {@code groupId} through the admin client. */
    private Mono<Void> resetOffsets(ReactiveAdminClient adminClient, String groupId, Map<TopicPartition, Long> offsets) {
        return adminClient.alterConsumerGroupOffsets(groupId, offsets);
    }
}

