/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.kafka.client.runtime.ui;

import io.quarkus.kafka.client.runtime.KafkaAdminClient;
import io.quarkus.kafka.client.runtime.ui.model.Order;
import io.quarkus.kafka.client.runtime.ui.model.converter.KafkaModelConverter;
import io.quarkus.kafka.client.runtime.ui.model.request.KafkaMessageCreateRequest;
import io.quarkus.kafka.client.runtime.ui.model.response.KafkaMessage;
import io.quarkus.kafka.client.runtime.ui.model.response.KafkaMessagePage;
import io.quarkus.kafka.client.runtime.ui.util.ConsumerFactory;
import io.smallrye.common.annotation.Identifier;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.utils.Bytes;

/**
 * Reads pages of messages from, and publishes single messages to, Kafka topics.
 *
 * <p>Every read uses a short-lived consumer obtained from {@link ConsumerFactory} for exactly one
 * topic partition; every write uses a short-lived producer built from the injected broker
 * configuration with {@link BytesSerializer} for both key and value.
 */
@ApplicationScoped
public class KafkaTopicClient {
    /** Number of additional polls attempted when a poll returns no records. */
    private static final int RETRIES = 3;
    @Inject
    KafkaAdminClient adminClient;
    KafkaModelConverter modelConverter = new KafkaModelConverter();
    @Inject
    @Identifier(value="default-kafka-broker")
    Map<String, Object> config;

    /**
     * Builds a producer from a copy of the injected configuration, overriding the client id
     * (made unique with a random UUID) and forcing byte serializers for key and value.
     *
     * @return a new producer; the caller is responsible for closing it
     */
    private Producer<Bytes, Bytes> createProducer() {
        Map<String, Object> producerConfig = new HashMap<>(this.config);
        producerConfig.put("client.id", "kafka-ui-producer-" + UUID.randomUUID());
        producerConfig.put("key.serializer", BytesSerializer.class.getName());
        producerConfig.put("value.serializer", BytesSerializer.class.getName());
        return new KafkaProducer<>(producerConfig);
    }

    /**
     * Reads one page of messages from the requested partitions of a topic.
     *
     * <p>Up to {@code pageSize} records are fetched from each requested partition, merged, sorted
     * by record timestamp (descending for {@link Order#NEW_FIRST}) and truncated to
     * {@code pageSize}.
     *
     * @param topicName topic to read from
     * @param order page direction; {@code null} is treated as {@link Order#OLD_FIRST}
     * @param partitionOffsets starting offset per requested partition; must be non-empty and
     *        contain only non-negative offsets
     * @param pageSize maximum number of messages in the returned page; must be positive
     * @return the page of converted messages together with the per-partition offsets for the next page
     * @throws IllegalArgumentException if the parameters are invalid or a requested partition
     *         is not part of the topic
     * @throws ExecutionException propagated from the topic description lookup
     * @throws InterruptedException propagated from the topic description lookup
     */
    public KafkaMessagePage getTopicMessages(String topicName, Order order, Map<Integer, Long> partitionOffsets, int pageSize) throws ExecutionException, InterruptedException {
        this.assertParamsValid(pageSize, partitionOffsets);
        Set<Integer> requestedPartitions = partitionOffsets.keySet();
        this.assertRequestedPartitionsExist(topicName, requestedPartitions);
        Order effectiveOrder = order == null ? Order.OLD_FIRST : order;

        List<ConsumerRecord<Bytes, Bytes>> page = this.getConsumerRecords(topicName, effectiveOrder, pageSize, requestedPartitions, partitionOffsets, pageSize);
        Comparator<ConsumerRecord<Bytes, Bytes>> byTimestamp = Comparator.comparingLong(ConsumerRecord::timestamp);
        if (Order.NEW_FIRST == effectiveOrder) {
            byTimestamp = byTimestamp.reversed();
        }
        page.sort(byTimestamp);
        if (page.size() > pageSize) {
            page = page.subList(0, pageSize);
        }

        // Offsets advance only by the number of records that actually made it into the page.
        Map<Integer, Long> newOffsets = this.calculateNewPartitionOffset(partitionOffsets, page, effectiveOrder, topicName);
        List<KafkaMessage> convertedResult = page.stream().map(this.modelConverter::convert).collect(Collectors.toList());
        return new KafkaMessagePage(newOffsets, convertedResult);
    }

    /**
     * Validates the paging parameters.
     *
     * @throws IllegalArgumentException if {@code pageSize} is not positive, the offset map is
     *         {@code null} or empty, or any offset is {@code null} or negative
     */
    private void assertParamsValid(int pageSize, Map<Integer, Long> partitionOffsets) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("Page size must be > 0.");
        }
        if (partitionOffsets == null || partitionOffsets.isEmpty()) {
            throw new IllegalArgumentException("Partition offset map must be specified.");
        }
        for (Long offset : partitionOffsets.values()) {
            // A null offset would otherwise surface later as an unboxing NullPointerException.
            if (offset == null || offset < 0L) {
                throw new IllegalArgumentException("Partition offset must be >= 0.");
            }
        }
    }

    /**
     * Polls the consumer with a 100 ms timeout, re-polling up to {@link #RETRIES} more times
     * while the result is empty.
     *
     * @return the first non-empty poll result, or the last (empty) result if all attempts were empty
     */
    private ConsumerRecords<Bytes, Bytes> pollWhenReady(Consumer<Bytes, Bytes> consumer) {
        Duration pollTimeout = Duration.of(100L, ChronoUnit.MILLIS);
        ConsumerRecords<Bytes, Bytes> result = consumer.poll(pollTimeout);
        for (int attempts = 0; result.isEmpty() && attempts < RETRIES; ++attempts) {
            result = consumer.poll(pollTimeout);
        }
        return result;
    }

    /**
     * Computes the offsets for the next page: each partition's old offset is moved forward
     * ({@link Order#OLD_FIRST}) or backward (any other order) by the number of records of that
     * partition present in {@code records}.
     *
     * @param oldPartitionOffset offsets the current page was read from
     * @param records records included in the current page
     * @param order direction of paging
     * @param topicName not used by the computation
     * @return new offset for every partition in {@code oldPartitionOffset}
     */
    private Map<Integer, Long> calculateNewPartitionOffset(Map<Integer, Long> oldPartitionOffset, Collection<ConsumerRecord<Bytes, Bytes>> records, Order order, String topicName) {
        Map<Integer, Long> recordsPerPartition = records.stream()
                .collect(Collectors.groupingBy(ConsumerRecord::partition, Collectors.counting()));
        long direction = Order.OLD_FIRST == order ? 1L : -1L;
        Map<Integer, Long> newPartitionOffset = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : oldPartitionOffset.entrySet()) {
            long consumed = recordsPerPartition.getOrDefault(entry.getKey(), 0L);
            newPartitionOffset.put(entry.getKey(), entry.getValue() + direction * consumed);
        }
        return newPartitionOffset;
    }

    /**
     * Returns the consumer position of a partition after seeking to its end
     * ({@link Order#NEW_FIRST}) or to its beginning (any other order).
     */
    private long getPosition(String topicName, int partition, Order order) {
        try (Consumer<Bytes, Bytes> consumer = ConsumerFactory.createConsumer(topicName, partition, this.config)) {
            TopicPartition topicPartition = new TopicPartition(topicName, partition);
            if (Order.NEW_FIRST == order) {
                consumer.seekToEnd(List.of(topicPartition));
            } else {
                consumer.seekToBeginning(List.of(topicPartition));
            }
            return consumer.position(topicPartition);
        }
    }

    /**
     * Determines the starting offsets for the first page of the given partitions: the end
     * position for {@link Order#NEW_FIRST}, the beginning position otherwise.
     *
     * @param topicName topic to inspect
     * @param requestedPartitions partitions to resolve offsets for
     * @param order direction of paging
     * @return starting offset per requested partition
     * @throws IllegalArgumentException if a requested partition is not part of the topic
     * @throws ExecutionException propagated from the topic description lookup
     * @throws InterruptedException propagated from the topic description lookup
     */
    public Map<Integer, Long> getPagePartitionOffset(String topicName, Collection<Integer> requestedPartitions, Order order) throws ExecutionException, InterruptedException {
        this.assertRequestedPartitionsExist(topicName, requestedPartitions);
        Map<Integer, Long> result = new HashMap<>();
        for (Integer requestedPartition : requestedPartitions) {
            result.put(requestedPartition, this.getPosition(topicName, requestedPartition, order));
        }
        return result;
    }

    /**
     * Reads up to {@code totalMessages} records from each requested partition.
     *
     * <p>For {@link Order#OLD_FIRST} reading starts at the partition's offset from {@code start};
     * otherwise it starts {@code pageSize} records earlier, clamped at offset 0.
     *
     * @param topicName topic to read from
     * @param order direction of paging
     * @param pageSize how far to step back from the start offset when reading newest-first
     * @param requestedPartitions partitions to read
     * @param start start offset per partition
     * @param totalMessages maximum number of records collected per partition
     * @return the records of all requested partitions, concatenated in partition iteration order
     */
    private List<ConsumerRecord<Bytes, Bytes>> getConsumerRecords(String topicName, Order order, int pageSize, Collection<Integer> requestedPartitions, Map<Integer, Long> start, int totalMessages) {
        List<ConsumerRecord<Bytes, Bytes>> allPartitionsResult = new ArrayList<>();
        for (Integer requestedPartition : requestedPartitions) {
            List<ConsumerRecord<Bytes, Bytes>> partitionResult = new ArrayList<>();
            Long offset = start.get(requestedPartition);
            try (Consumer<Bytes, Bytes> consumer = ConsumerFactory.createConsumer(topicName, requestedPartition, this.config)) {
                TopicPartition partition = new TopicPartition(topicName, requestedPartition);
                long seekedOffset = Order.OLD_FIRST == order ? offset : Long.max(offset - (long) pageSize, 0L);
                consumer.seek(partition, seekedOffset);

                // Keep polling until the per-partition quota is filled or the partition yields nothing more.
                while (partitionResult.size() < totalMessages) {
                    ConsumerRecords<Bytes, Bytes> records = this.pollWhenReady(consumer);
                    if (records.isEmpty()) {
                        break;
                    }
                    for (ConsumerRecord<Bytes, Bytes> consumerRecord : records) {
                        partitionResult.add(consumerRecord);
                        if (partitionResult.size() >= totalMessages) {
                            break;
                        }
                    }
                }

                // Reading backwards hit the start of the partition: fewer than pageSize records
                // precede the requested offset, so drop whatever was read beyond that count.
                // NOTE(review): trimming happens after a timestamp sort, not an offset sort - confirm intended.
                if (Order.NEW_FIRST == order && seekedOffset == 0L && partitionResult.size() > offset.intValue()) {
                    Comparator<ConsumerRecord<Bytes, Bytes>> byTimestamp = Comparator.comparingLong(ConsumerRecord::timestamp);
                    partitionResult.sort(byTimestamp);
                    partitionResult = partitionResult.subList(0, offset.intValue());
                }
            }
            allPartitionsResult.addAll(partitionResult);
        }
        return allPartitionsResult;
    }

    /**
     * Verifies that every requested partition is reported for the topic.
     *
     * @throws IllegalArgumentException if at least one requested partition is not part of the topic
     */
    private void assertRequestedPartitionsExist(String topicName, Collection<Integer> requestedPartitions) throws InterruptedException, ExecutionException {
        List<Integer> topicPartitions = this.partitions(topicName);
        if (!new HashSet<>(topicPartitions).containsAll(requestedPartitions)) {
            throw new IllegalArgumentException(String.format("Requested messages from partition, that do not exist. Requested partitions: %s. Existing partitions: %s", requestedPartitions, topicPartitions));
        }
    }

    /**
     * Publishes a single message described by the request, using a producer that is created
     * for this call and closed afterwards.
     *
     * <p>Key and value are encoded as UTF-8; a {@code null} key or value is sent as {@code null}.
     * The result of the send is not inspected.
     *
     * @param request topic, partition, key and value of the message to publish
     */
    public void createMessage(KafkaMessageCreateRequest request) {
        ProducerRecord<Bytes, Bytes> record = new ProducerRecord<>(request.getTopic(), request.getPartition(), toBytes(request.getKey()), toBytes(request.getValue()));
        try (Producer<Bytes, Bytes> producer = this.createProducer()) {
            producer.send(record);
        }
    }

    /** Encodes text as UTF-8 {@link Bytes}; {@code null} stays {@code null}. */
    private static Bytes toBytes(String text) {
        return text == null ? null : Bytes.wrap(text.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }

    /**
     * Lists the partition ids of a topic as reported by the admin client.
     *
     * @param topicName topic to describe
     * @return partition ids of the topic
     * @throws IllegalStateException if the description lookup yields no result or more than one
     * @throws ExecutionException propagated from the admin client
     * @throws InterruptedException propagated from the admin client
     */
    public List<Integer> partitions(String topicName) throws ExecutionException, InterruptedException {
        return this.adminClient.describeTopics(List.of(topicName)).values().stream()
                // Exactly one topic was requested, so a second element means something is badly wrong.
                .reduce((a, b) -> {
                    throw new IllegalStateException("Requested info about single topic, but got result of multiple: " + a + ", " + b);
                })
                .orElseThrow(() -> new IllegalStateException("Requested info about a topic, but nothing found. Topic name: " + topicName))
                .partitions().stream()
                .map(TopicPartitionInfo::partition)
                .collect(Collectors.toList());
    }
}

