package org.eclipse.hono.client.kafka;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.kafka.admin.KafkaAdminClient;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/client/kafka/KafkaClientFactory.class */
public class KafkaClientFactory {
    public static final int CLIENT_CREATION_RETRY_DELAY_MILLIS = 1000;
    private final Vertx vertx;
    private Clock clock = Clock.systemUTC();
    public static final Duration UNLIMITED_RETRIES_DURATION = Duration.ofSeconds(-1);
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KafkaClientFactory.class);
    private static final Pattern COMMA_WITH_WHITESPACE = Pattern.compile("\\s*,\\s*");

    public KafkaClientFactory(Vertx vertx) {
        this.vertx = (Vertx) Objects.requireNonNull(vertx);
    }

    void setClock(Clock clock) {
        this.clock = (Clock) Objects.requireNonNull(clock);
    }

    public <T> Future<T> createClientWithRetries(Supplier<T> supplier, String str, Duration duration) {
        return createClientWithRetries(supplier, () -> {
            return true;
        }, str, duration);
    }

    public <T> Future<T> createClientWithRetries(Supplier<T> supplier, Supplier<Boolean> supplier2, String str, Duration duration) {
        Objects.requireNonNull(supplier);
        Objects.requireNonNull(supplier2);
        Promise<T> promise = Promise.promise();
        createClientWithRetries(supplier, supplier2, getRetriesTimeLimit(duration), () -> {
            return Boolean.valueOf(containsValidServerEntries(str));
        }, promise);
        return promise.future();
    }

    public Future<KafkaAdminClient> createKafkaAdminClientWithRetries(Map<String, String> map, Supplier<Boolean> supplier, Duration duration) {
        Objects.requireNonNull(map);
        Objects.requireNonNull(supplier);
        Promise promise = Promise.promise();
        createClientWithRetries(() -> {
            return KafkaAdminClient.create(this.vertx, (Map<String, String>) map);
        }, supplier, getRetriesTimeLimit(duration), () -> {
            return Boolean.valueOf(containsValidServerEntries((String) map.get("bootstrap.servers")));
        }, promise);
        return promise.future();
    }

    private Instant getRetriesTimeLimit(Duration duration) {
        return (duration == null || duration.isNegative()) ? Instant.MAX : duration.isZero() ? Instant.MIN : Instant.now(this.clock).plus((TemporalAmount) duration);
    }

    private <T> void createClientWithRetries(Supplier<T> supplier, Supplier<Boolean> supplier2, Instant instant, Supplier<Boolean> supplier3, Promise<T> promise) {
        if (!supplier2.get().booleanValue()) {
            promise.fail("client code has canceled further attempts to create Kafka client");
            return;
        }
        try {
            T t = supplier.get();
            LOG.debug("successfully created client [type: {}]", t.getClass().getName());
            promise.complete(t);
        } catch (Exception e) {
            if (instant.equals(Instant.MIN) || !(e instanceof KafkaException) || !isBootstrapServersConfigException(e.getCause()) || !supplier3.get().booleanValue()) {
                LOG.warn("failed to create client due to terminal error (won't retry)", (Throwable) e);
                promise.fail(e);
            } else if (!supplier2.get().booleanValue()) {
                LOG.debug("client code has canceled further attempts to create Kafka client");
                promise.fail(e);
            } else if (Instant.now(this.clock).isBefore(instant)) {
                LOG.debug("error creating Kafka client, will retry in {}ms: {}", (Object) 1000, (Object) e.getCause().getMessage());
                this.vertx.setTimer(1000L, l -> {
                    createClientWithRetries(supplier, supplier2, instant, () -> {
                        return true;
                    }, promise);
                });
            } else {
                LOG.warn("error creating Kafka client (no further attempts will be done, timeout for retries reached): {}\n", e.getCause().getMessage());
                promise.fail(e);
            }
        }
    }

    public static boolean isRetriableClientCreationError(Throwable th, String str) {
        return (th instanceof KafkaException) && isBootstrapServersConfigException(th.getCause()) && containsValidServerEntries(str);
    }

    private static boolean isBootstrapServersConfigException(Throwable th) {
        return (th instanceof ConfigException) && th.getMessage() != null && th.getMessage().contains("bootstrap.servers");
    }

    private static boolean containsValidServerEntries(String str) {
        List list = (List) Optional.ofNullable(str).map(str2 -> {
            String trim = str2.trim();
            return trim.isEmpty() ? List.of() : Arrays.asList(COMMA_WITH_WHITESPACE.split(trim, -1));
        }).orElseGet(List::of);
        return !list.isEmpty() && list.stream().allMatch(KafkaClientFactory::containsHostAndPort);
    }

    private static boolean containsHostAndPort(String str) {
        try {
            if (Utils.getHost(str) != null) {
                if (Utils.getPort(str) != null) {
                    return true;
                }
            }
            return false;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }
}
