/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.server.log.remote.metadata.storage;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.metadata.storage.BrokerReadyCallback;
import org.apache.kafka.server.log.remote.metadata.storage.ConsumerManager;
import org.apache.kafka.server.log.remote.metadata.storage.ProducerManager;
import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataTopicPartitioner;
import org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore;
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicBasedRemoteLogMetadataManager
implements BrokerReadyCallback,
RemoteLogMetadataManager {
    private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManager.class);

    /** Clock used to measure the initialization timeout; also handed to the {@link ConsumerManager}. */
    private final Time time = Time.SYSTEM;

    /** Flipped to {@code true} by the first {@code configure} call so that later calls are skipped. */
    private final AtomicBoolean configured = new AtomicBoolean(false);

    /** Flipped to {@code true} by {@code close}; the initialization loop and the public operations check it. */
    private final AtomicBoolean closing = new AtomicBoolean(false);

    /** Set to {@code true} by {@code initializeResources} once the producer and consumer managers exist. */
    private final AtomicBoolean initialized = new AtomicBoolean(false);

    /** Created by {@code configure}, started by {@code onBrokerReady} and joined by {@code close}. */
    private Thread initializationThread;

    /** Created by {@code initializeResources}; remains {@code null} until then. */
    private volatile ProducerManager producerManager;

    /** Created by {@code initializeResources}; remains {@code null} until then. */
    private volatile ConsumerManager consumerManager;

    /**
     * Read lock: taken by the metadata operations and the partition callbacks.
     * Write lock: taken by {@code configure} and by the final step of {@code initializeResources}.
     */
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    /** Store that answers the metadata queries; obtained from the supplier given to the constructor. */
    private final RemotePartitionMetadataStore remotePartitionMetadataStore;

    /**
     * Partitions reported before initialization finished. {@code initializeResources} assigns them
     * and clears the set once the consumer manager is running.
     */
    private final Set<TopicIdPartition> pendingAssignPartitions = Collections.synchronizedSet(new HashSet<>());

    /** Builds the metadata-topic partitioner from the configured metadata topic partition count. */
    private final Function<Integer, RemoteLogMetadataTopicPartitioner> partitionerFunction;

    /**
     * Creates a manager that builds its partitioner with {@code RemoteLogMetadataTopicPartitioner}'s
     * constructor and its metadata store with {@code RemotePartitionMetadataStore}'s no-arg constructor.
     */
    public TopicBasedRemoteLogMetadataManager() {
        this(
            RemoteLogMetadataTopicPartitioner::new,
            RemotePartitionMetadataStore::new
        );
    }

    /**
     * Creates a manager with caller-supplied collaborators.
     *
     * @param partitionerFunction   builds the metadata-topic partitioner from a partition count;
     *                              stored and invoked later by {@code initializeResources}
     * @param metadataStoreSupplier invoked exactly once, here, to obtain the metadata store
     */
    TopicBasedRemoteLogMetadataManager(Function<Integer, RemoteLogMetadataTopicPartitioner> partitionerFunction, Supplier<RemotePartitionMetadataStore> metadataStoreSupplier) {
        RemotePartitionMetadataStore store = metadataStoreSupplier.get();
        this.remotePartitionMetadataStore = store;
        this.partitionerFunction = partitionerFunction;
    }

    /**
     * Stores the metadata of a remote log segment whose state is {@code COPY_SEGMENT_STARTED}.
     *
     * @param remoteLogSegmentMetadata metadata to store; must not be {@code null}
     * @return the future returned by {@code storeRemoteLogMetadata} for this metadata
     * @throws NullPointerException     if {@code remoteLogSegmentMetadata} is {@code null}
     * @throws IllegalArgumentException if the metadata is in any state other than {@code COPY_SEGMENT_STARTED}
     * @throws IllegalStateException    if this instance is closing or not yet initialized
     * @throws RemoteStorageException   if storing the metadata fails
     */
    public CompletableFuture<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
        Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null");
        return withReadLockAndEnsureInitialized(() -> {
            boolean copyJustStarted = remoteLogSegmentMetadata.state() == RemoteLogSegmentState.COPY_SEGMENT_STARTED;
            if (copyJustStarted) {
                return storeRemoteLogMetadata(remoteLogSegmentMetadata);
            }
            // Any other state has to be published through updateRemoteLogSegmentMetadata.
            throw new IllegalArgumentException("Given remoteLogSegmentMetadata should have state as "
                    + RemoteLogSegmentState.COPY_SEGMENT_STARTED
                    + " but it contains state as: "
                    + remoteLogSegmentMetadata.state());
        });
    }

    /**
     * Stores a state update for an existing remote log segment.
     *
     * @param metadataUpdate the update to store; must not be {@code null}
     * @return the future returned by {@code storeRemoteLogMetadata} for this update
     * @throws NullPointerException     if {@code metadataUpdate} is {@code null}
     * @throws IllegalArgumentException if the update carries the state {@code COPY_SEGMENT_STARTED}
     * @throws IllegalStateException    if this instance is closing or not yet initialized
     * @throws RemoteStorageException   if storing the update fails
     */
    public CompletableFuture<Void> updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate metadataUpdate) throws RemoteStorageException {
        Objects.requireNonNull(metadataUpdate, "metadataUpdate can not be null");
        return withReadLockAndEnsureInitialized(() -> {
            boolean isInitialCopyState = metadataUpdate.state() == RemoteLogSegmentState.COPY_SEGMENT_STARTED;
            if (!isInitialCopyState) {
                return storeRemoteLogMetadata(metadataUpdate);
            }
            // COPY_SEGMENT_STARTED is only accepted by addRemoteLogSegmentMetadata.
            throw new IllegalArgumentException("Given remoteLogSegmentMetadataUpdate should not have the state as: "
                    + RemoteLogSegmentState.COPY_SEGMENT_STARTED);
        });
    }

    /**
     * Stores a partition-delete metadata event.
     *
     * @param deleteMetadata the delete metadata to store; must not be {@code null}
     * @return the future returned by {@code storeRemoteLogMetadata} for this event
     * @throws NullPointerException   if {@code deleteMetadata} is {@code null}
     * @throws IllegalStateException  if this instance is closing or not yet initialized
     * @throws RemoteStorageException if storing the metadata fails
     */
    public CompletableFuture<Void> putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata deleteMetadata) throws RemoteStorageException {
        Objects.requireNonNull(deleteMetadata, "deleteMetadata can not be null");
        ThrowingSupplier<CompletableFuture<Void>, RemoteStorageException> publish =
                () -> storeRemoteLogMetadata(deleteMetadata);
        return withReadLockAndEnsureInitialized(publish);
    }

    /**
     * Publishes the metadata through the producer manager and returns a future that, once the
     * publish completes, asynchronously waits for the consumer manager to catch up with the
     * published record.
     *
     * @param remoteLogMetadata the metadata event to publish
     * @return a future chained on the publish future; a {@link TimeoutException} while waiting
     *         for the consumer is rethrown inside it wrapped in a {@link KafkaException}
     * @throws RetriableException     rethrown unchanged when raised synchronously
     * @throws RemoteStorageException wrapping any other {@link KafkaException} raised synchronously
     */
    private CompletableFuture<Void> storeRemoteLogMetadata(RemoteLogMetadata remoteLogMetadata) throws RemoteStorageException {
        log.debug("Storing the partition: {} metadata: {}", remoteLogMetadata.topicIdPartition(), remoteLogMetadata);
        try {
            return producerManager
                    .publishMessage(remoteLogMetadata)
                    .thenAcceptAsync(published -> {
                        try {
                            consumerManager.waitTillConsumptionCatchesUp(published);
                        } catch (TimeoutException timeout) {
                            // The lambda cannot throw a checked exception, so wrap it.
                            throw new KafkaException(timeout);
                        }
                    });
        } catch (RetriableException retriable) {
            // Retriable failures are surfaced as-is.
            throw retriable;
        } catch (KafkaException fatal) {
            throw new RemoteStorageException(fatal);
        }
    }

    /**
     * Looks up segment metadata in the metadata store for the given partition, leader epoch and offset.
     *
     * @param topicIdPartition the partition to query
     * @param epochForOffset   the leader epoch associated with {@code offset}
     * @param offset           the offset to look up
     * @return whatever the store returns for this lookup
     * @throws IllegalStateException  if this instance is closing or not yet initialized
     * @throws RemoteStorageException if the store lookup fails
     */
    public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(TopicIdPartition topicIdPartition, int epochForOffset, long offset) throws RemoteStorageException {
        return withReadLockAndEnsureInitialized(() -> {
            // Note the store takes the offset before the epoch.
            return remotePartitionMetadataStore.remoteLogSegmentMetadata(topicIdPartition, offset, epochForOffset);
        });
    }

    /**
     * Asks the metadata store for the highest log offset of the given partition and leader epoch.
     *
     * @param topicIdPartition the partition to query
     * @param leaderEpoch      the leader epoch to query
     * @return whatever the store returns for this lookup
     * @throws IllegalStateException  if this instance is closing or not yet initialized
     * @throws RemoteStorageException if the store lookup fails
     */
    public Optional<Long> highestOffsetForEpoch(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException {
        ThrowingSupplier<Optional<Long>, RemoteStorageException> lookup =
                () -> remotePartitionMetadataStore.highestLogOffset(topicIdPartition, leaderEpoch);
        return withReadLockAndEnsureInitialized(lookup);
    }

    /**
     * Lists the remote log segments the metadata store holds for a partition.
     *
     * @param topicIdPartition the partition to list; must not be {@code null}
     * @return the iterator returned by the metadata store
     * @throws NullPointerException   if {@code topicIdPartition} is {@code null}
     * @throws IllegalStateException  if this instance is closing or not yet initialized
     * @throws RemoteStorageException if the store lookup fails
     */
    public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition) throws RemoteStorageException {
        Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
        ThrowingSupplier<Iterator<RemoteLogSegmentMetadata>, RemoteStorageException> listing =
                () -> remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition);
        return withReadLockAndEnsureInitialized(listing);
    }

    /**
     * Lists the remote log segments the metadata store holds for a partition and leader epoch.
     *
     * @param topicIdPartition the partition to list; must not be {@code null}
     * @param leaderEpoch      the leader epoch to list
     * @return the iterator returned by the metadata store
     * @throws NullPointerException   if {@code topicIdPartition} is {@code null}
     * @throws IllegalStateException  if this instance is closing or not yet initialized
     * @throws RemoteStorageException if the store lookup fails
     */
    public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException {
        Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
        return withReadLockAndEnsureInitialized(() -> {
            return remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition, leaderEpoch);
        });
    }

    /**
     * Handles a leadership notification for the given leader and follower partitions.
     *
     * <p>Leaders and followers are treated alike: both sets are merged. If initialization has
     * not finished yet the merged set is parked in {@code pendingAssignPartitions}; otherwise
     * the partitions are assigned immediately.
     *
     * @param leaderPartitions   partitions reported as leader; must not be {@code null}
     * @param followerPartitions partitions reported as follower; must not be {@code null}
     * @throws NullPointerException  if either set is {@code null}
     * @throws IllegalStateException if this instance is closing
     */
    public void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions, Set<TopicIdPartition> followerPartitions) {
        Objects.requireNonNull(leaderPartitions, "leaderPartitions can not be null");
        Objects.requireNonNull(followerPartitions, "followerPartitions can not be null");
        log.info("Received leadership notifications with leader partitions {} and follower partitions {}", leaderPartitions, followerPartitions);
        lock.readLock().lock();
        try {
            if (closing.get()) {
                throw new IllegalStateException("This instance is in closing state");
            }
            Set<TopicIdPartition> merged = new HashSet<>(leaderPartitions);
            merged.addAll(followerPartitions);
            if (initialized.get()) {
                assignPartitions(merged);
            } else {
                // Picked up by initializeResources once the consumer manager exists.
                pendingAssignPartitions.addAll(merged);
            }
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Calls {@code maybeLoadPartition} on the metadata store for every given partition and then
     * adds the whole set to the consumer manager's assignments.
     *
     * @param allPartitions the partitions to assign
     */
    private void assignPartitions(Set<TopicIdPartition> allPartitions) {
        Iterator<TopicIdPartition> cursor = allPartitions.iterator();
        while (cursor.hasNext()) {
            remotePartitionMetadataStore.maybeLoadPartition(cursor.next());
        }
        consumerManager.addAssignmentsForPartitions(allPartitions);
    }

    /**
     * Stops tracking the given partitions.
     *
     * <p>After initialization the partitions are removed from the consumer manager's
     * assignments; before it they are removed from the pending set, if that set is non-empty.
     *
     * @param partitions the partitions to stop
     * @throws IllegalStateException if this instance is closing
     */
    public void onStopPartitions(Set<TopicIdPartition> partitions) {
        lock.readLock().lock();
        try {
            if (closing.get()) {
                throw new IllegalStateException("This instance is in closing state");
            }
            if (initialized.get()) {
                consumerManager.removeAssignmentsForPartitions(partitions);
            } else if (!pendingAssignPartitions.isEmpty()) {
                pendingAssignPartitions.removeAll(partitions);
            }
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Sums {@code segmentSizeInBytes()} over every segment the metadata store lists for the
     * given partition and leader epoch.
     *
     * <p>Unlike the other query methods this one goes to the store directly, without taking the
     * read lock or checking the initialized/closing flags.
     *
     * @param topicIdPartition the partition to measure
     * @param leaderEpoch      the leader epoch to measure
     * @return the total size in bytes, {@code 0} if the store lists no segments
     * @throws RemoteStorageException if listing the segments fails
     */
    public long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException {
        long totalBytes = 0L;
        for (Iterator<RemoteLogSegmentMetadata> segments = remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition, leaderEpoch); segments.hasNext(); ) {
            totalBytes += segments.next().segmentSizeInBytes();
        }
        return totalBytes;
    }

    /**
     * Delegates to the metadata store's {@code nextSegmentWithTxnIndex} lookup for the given
     * partition, epoch and offset.
     *
     * @param topicIdPartition the partition to query
     * @param epoch            the leader epoch to start from
     * @param offset           the offset to start from
     * @return whatever the store returns for this lookup
     * @throws IllegalStateException  if this instance is closing or not yet initialized
     * @throws RemoteStorageException if the store lookup fails
     */
    public Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition, int epoch, long offset) throws RemoteStorageException {
        ThrowingSupplier<Optional<RemoteLogSegmentMetadata>, RemoteStorageException> lookup =
                () -> remotePartitionMetadataStore.nextSegmentWithTxnIndex(topicIdPartition, epoch, offset);
        return withReadLockAndEnsureInitialized(lookup);
    }

    /**
     * Configures this manager once. The first call parses the configs and creates (but does not
     * start) the initialization thread; later calls only log that configuration is skipped.
     *
     * @param configs the configuration entries; must not be {@code null}
     * @throws NullPointerException if {@code configs} is {@code null}
     */
    public void configure(Map<String, ?> configs) {
        Objects.requireNonNull(configs, "configs can not be null.");
        lock.writeLock().lock();
        try {
            boolean firstCall = configured.compareAndSet(false, true);
            if (!firstCall) {
                log.info("Skipping configure as it is already configured.");
                return;
            }
            TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs);
            // The thread is only created here; onBrokerReady() starts it.
            initializationThread = KafkaThread.nonDaemon("RLMMInitializationThread", () -> initializeResources(rlmmConfig));
            log.info("Successfully configured topic-based RLMM with config: {}", rlmmConfig);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Reports whether the metadata store considers the given partition initialized.
     *
     * @param topicIdPartition the partition to check
     * @return the result of the store's {@code isInitialized} check
     */
    public boolean isReady(TopicIdPartition topicIdPartition) {
        boolean partitionInitialized = remotePartitionMetadataStore.isInitialized(topicIdPartition);
        return partitionInitialized;
    }

    /**
     * Logs the upcoming pause and then sleeps for the given interval.
     *
     * @param retryIntervalMs how long to sleep, in milliseconds
     */
    private void handleRetry(long retryIntervalMs) {
        final long pauseMs = retryIntervalMs;
        log.info("Sleep for {} ms before retrying.", pauseMs);
        Utils.sleep(pauseMs);
    }

    /**
     * Body of the initialization thread: makes sure the metadata topic exists with the configured
     * partition count, then creates the producer and consumer managers.
     *
     * <p>The loop retries, sleeping {@code initializationRetryIntervalMs} between attempts, until
     * one of the following happens:
     * <ul>
     *   <li>initialization succeeds ({@code initialized} is set),</li>
     *   <li>{@code closing} is set,</li>
     *   <li>{@code initializationRetryMaxTimeoutMs} elapses,</li>
     *   <li>the topic exists with a different partition count, or</li>
     *   <li>creating the producer/consumer managers throws.</li>
     * </ul>
     * The last three, as well as a {@link KafkaException} escaping the loop, are treated as
     * failures and end with {@code Exit.exit(1)}.
     *
     * @param rlmmConfig the parsed configuration
     */
    private void initializeResources(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig) {
        log.info("Initializing topic-based RLMM resources");
        int metadataTopicPartitionCount = rlmmConfig.metadataTopicPartitionsCount();
        long retryIntervalMs = rlmmConfig.initializationRetryIntervalMs();
        long retryMaxTimeoutMs = rlmmConfig.initializationRetryMaxTimeoutMs();
        RemoteLogMetadataTopicPartitioner partitioner = partitionerFunction.apply(metadataTopicPartitionCount);
        NewTopic newTopic = newRemoteLogMetadataTopic(rlmmConfig);
        boolean isTopicCreated = false;
        long startTimeMs = time.milliseconds();
        boolean initializationFailed = false;
        try (Admin admin = Admin.create(rlmmConfig.commonProperties())) {
            while (!(initialized.get() || closing.get() || initializationFailed)) {
                if (time.milliseconds() - startTimeMs > retryMaxTimeoutMs) {
                    log.error("Timed out to initialize the resources within {} ms.", retryMaxTimeoutMs);
                    initializationFailed = true;
                    break;
                }

                // Once the topic is known to exist, do not query or create it again.
                isTopicCreated = isTopicCreated || createTopic(admin, newTopic);
                if (!isTopicCreated) {
                    handleRetry(retryIntervalMs);
                    continue;
                }

                try {
                    if (!isPartitionsCountSameAsConfigured(admin, newTopic.name(), metadataTopicPartitionCount)) {
                        // A partition-count mismatch will not fix itself by retrying.
                        initializationFailed = true;
                        break;
                    }
                } catch (Exception e) {
                    // Describing the topic failed; record why before trying again.
                    log.warn("Failed to verify the partition count of topic {}, will retry.", newTopic.name(), e);
                    handleRetry(retryIntervalMs);
                    continue;
                }

                // Publish the managers and flip `initialized` under the write lock so that no
                // read-locked operation runs while they are being set up.
                lock.writeLock().lock();
                try {
                    producerManager = new ProducerManager(rlmmConfig, partitioner);
                    consumerManager = new ConsumerManager(rlmmConfig, remotePartitionMetadataStore, partitioner, time);
                    consumerManager.startConsumerThread();
                    if (!pendingAssignPartitions.isEmpty()) {
                        // Partitions that were reported before initialization completed.
                        assignPartitions(pendingAssignPartitions);
                        pendingAssignPartitions.clear();
                    }
                    initialized.set(true);
                    log.info("Initialized topic-based RLMM resources successfully");
                } catch (Exception e) {
                    log.error("Encountered error while initializing producer/consumer", e);
                    initializationFailed = true;
                } finally {
                    lock.writeLock().unlock();
                }
            }
        } catch (KafkaException e) {
            log.error("Encountered error while initializing topic-based RLMM resources", e);
            initializationFailed = true;
        } finally {
            if (initializationFailed) {
                log.error("Stopping the server as it failed to initialize topic-based RLMM resources");
                Exit.exit(1);
            }
        }
    }

    /**
     * Starts the initialization thread that {@code configure} created.
     */
    @Override
    public void onBrokerReady() {
        Thread initializer = this.initializationThread;
        initializer.start();
    }

    /**
     * Checks whether a topic exists by describing it through the admin client.
     *
     * @param admin the admin client to query
     * @param topic the topic name
     * @return {@code true} if the describe call succeeds; {@code false} if it fails with a cause
     *         of {@link UnknownTopicOrPartitionException}
     * @throws ExecutionException   if the describe call fails for any other reason
     * @throws InterruptedException if interrupted while waiting for the result
     */
    boolean doesTopicExist(Admin admin, String topic) throws ExecutionException, InterruptedException {
        try {
            KafkaFuture<TopicDescription> pending = admin.describeTopics(Set.of(topic)).topicNameValues().get(topic);
            TopicDescription description = pending.get();
            log.info("Topic {} exists. TopicId: {}, numPartitions: {}", topic, description.topicId(), description.partitions().size());
            return true;
        } catch (InterruptedException | ExecutionException ex) {
            boolean topicMissing = ex.getCause() instanceof UnknownTopicOrPartitionException;
            if (!topicMissing) {
                throw ex;
            }
            log.info("Topic {} does not exist", topic);
            return false;
        }
    }

    /**
     * Describes the topic and compares its partition count with the expected one, logging an
     * error on mismatch.
     *
     * @param admin                       the admin client to query
     * @param topicName                   the topic to describe
     * @param metadataTopicPartitionCount the expected number of partitions
     * @return {@code true} if the counts are equal, {@code false} otherwise
     * @throws InterruptedException if interrupted while waiting for the description
     * @throws ExecutionException   if the describe call fails
     */
    private boolean isPartitionsCountSameAsConfigured(Admin admin, String topicName, int metadataTopicPartitionCount) throws InterruptedException, ExecutionException {
        log.debug("Getting topic details to check for partition count and replication factor.");
        KafkaFuture<TopicDescription> pending = admin.describeTopics(Set.of(topicName)).topicNameValues().get(topicName);
        int actualPartitionCount = pending.get().partitions().size();
        boolean countsMatch = actualPartitionCount == metadataTopicPartitionCount;
        if (!countsMatch) {
            log.error("Existing topic partition count {} is not same as the expected partition count {}", actualPartitionCount, metadataTopicPartitionCount);
        }
        return countsMatch;
    }

    /**
     * Builds the {@link NewTopic} request for the metadata topic: name, partition count and
     * replication factor come from the config; the topic-level overrides set the retention time,
     * the {@code delete} cleanup policy and disable remote storage for the topic itself.
     *
     * @param rlmmConfig the parsed configuration
     * @return the topic definition, not yet submitted to any admin client
     */
    private NewTopic newRemoteLogMetadataTopic(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig) {
        Map<String, String> overrides = new HashMap<>();
        overrides.put("retention.ms", Long.toString(rlmmConfig.metadataTopicRetentionMs()));
        overrides.put("cleanup.policy", "delete");
        overrides.put("remote.storage.enable", "false");
        NewTopic metadataTopic = new NewTopic(
                rlmmConfig.remoteLogMetadataTopicName(),
                rlmmConfig.metadataTopicPartitionsCount(),
                rlmmConfig.metadataTopicReplicationFactor());
        return metadataTopic.configs(overrides);
    }

    /**
     * Ensures the given topic exists, creating it if necessary.
     *
     * <p>Never throws: any failure is logged and reported as {@code false} so that the caller
     * can retry. A failure whose cause is {@link TopicExistsException} counts as success.
     *
     * @param admin    the admin client used to query and create the topic
     * @param newTopic the definition of the topic to create
     * @return {@code true} if the topic is known to exist after this call, {@code false} otherwise
     */
    private boolean createTopic(Admin admin, NewTopic newTopic) {
        boolean doesTopicExist = false;
        String topic = newTopic.name();
        try {
            doesTopicExist = this.doesTopicExist(admin, topic);
            if (!doesTopicExist) {
                CreateTopicsResult result = admin.createTopics(Set.of(newTopic));
                result.all().get();
                // Only the non-default configs are worth logging.
                List<String> overriddenConfigs = result.config(topic).get().entries().stream()
                        .filter(entry -> !entry.isDefault())
                        .map(entry -> entry.name() + "=" + entry.value())
                        .toList();
                log.info("Topic {} created. TopicId: {}, numPartitions: {}, replicationFactor: {}, config: {}",
                        topic, result.topicId(topic).get(), result.numPartitions(topic).get(), result.replicationFactor(topic).get(), overriddenConfigs);
                doesTopicExist = true;
            }
        } catch (Exception e) {
            if (e.getCause() instanceof TopicExistsException) {
                // The topic appeared between the existence check and the create call;
                // that is a success, so it must not be reported as an error as well.
                log.info("Topic: {} already exists", topic);
                doesTopicExist = true;
            } else {
                log.error("Encountered error while querying or creating {} topic.", topic, e);
            }
        }
        return doesTopicExist;
    }

    /**
     * @return the current value of the {@code initialized} flag
     */
    boolean isInitialized() {
        boolean ready = initialized.get();
        return ready;
    }

    /**
     * Guards operations that need a fully initialized, non-closing instance.
     *
     * @throws IllegalStateException if this instance is closing or has not been initialized
     */
    private void ensureInitializedAndNotClosed() {
        boolean usable = !closing.get() && initialized.get();
        if (usable) {
            return;
        }
        throw new IllegalStateException("This instance is in invalid state, initialized: " + initialized + " close: " + closing);
    }

    /**
     * Closes this manager. Only the first call does any work: it waits for the initialization
     * thread (if one was created) to finish and then quietly closes the producer manager, the
     * consumer manager and the metadata store.
     *
     * <p>If the calling thread is interrupted while waiting for the initialization thread, the
     * resources are still closed and the thread's interrupt status is restored before returning,
     * so the caller can still observe the interruption.
     *
     * @throws IOException declared by the signature; nothing in this body throws it
     */
    public void close() throws IOException {
        log.info("Closing topic-based RLMM resources");
        if (closing.compareAndSet(false, true)) {
            boolean interrupted = false;
            if (initializationThread != null) {
                try {
                    initializationThread.join();
                } catch (InterruptedException e) {
                    log.error("Initialization thread was interrupted while waiting to join on close.", e);
                    interrupted = true;
                }
            }
            try {
                Utils.closeQuietly(producerManager, "ProducerTask");
                Utils.closeQuietly(consumerManager, "RLMMConsumerManager");
                Utils.closeQuietly(remotePartitionMetadataStore, "RemotePartitionMetadataStore");
                log.info("Closed topic-based RLMM resources");
            } finally {
                // Re-assert the interrupt only after cleanup so that closing the resources is
                // not itself cut short by the pending interrupt.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Runs the action while holding the read lock, after verifying that this instance is
     * initialized and not closing. The lock is released even if the check or the action throws.
     *
     * @param action the work to run under the read lock
     * @param <T>    the action's result type
     * @return the value produced by {@code action}
     * @throws IllegalStateException  if this instance is closing or not yet initialized
     * @throws RemoteStorageException if {@code action} throws it
     */
    private <T> T withReadLockAndEnsureInitialized(ThrowingSupplier<T, RemoteStorageException> action) throws RemoteStorageException {
        lock.readLock().lock();
        try {
            ensureInitializedAndNotClosed();
            return action.get();
        } finally {
            lock.readLock().unlock();
        }
    }

    @FunctionalInterface
    public static interface ThrowingSupplier<T, E extends Exception> {
        public T get() throws E;
    }
}

