package org.eclipse.hono.client.impl;

import io.opentracing.Tracer;
import io.opentracing.noop.NoopTracerFactory;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.ext.web.handler.TimeoutHandler;
import io.vertx.proton.ProtonClientOptions;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonLink;
import io.vertx.proton.ProtonMessageHandler;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.ProtonSession;
import io.vertx.proton.sasl.MechanismMismatchException;
import io.vertx.proton.sasl.SaslSystemException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLException;
import javax.security.sasl.AuthenticationException;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.DisconnectListener;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.ReconnectListener;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.StatusCodeMapper;
import org.eclipse.hono.config.ClientConfigProperties;
import org.eclipse.hono.connection.ConnectionFactory;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.HonoProtonHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:BOOT-INF/lib/hono-client-1.7.2.jar:org/eclipse/hono/client/impl/HonoConnectionImpl.class */
public class HonoConnectionImpl implements HonoConnection {
    protected final Logger log;
    protected final ClientConfigProperties clientConfigProperties;
    protected final Vertx vertx;
    protected ProtonConnection connection;
    protected volatile Context context;
    private final List<DisconnectListener<HonoConnection>> disconnectListeners;
    private final List<DisconnectListener<HonoConnection>> oneTimeDisconnectListeners;
    private final List<ReconnectListener<HonoConnection>> reconnectListeners;
    private final AtomicBoolean shuttingDown;
    private final AtomicBoolean disconnecting;
    private final ConnectionFactory connectionFactory;
    private final Object connectionLock;
    private final AtomicReference<ConnectionAttempt> currentConnectionAttempt;
    private final String containerId;
    private final DeferredConnectionCheckHandler deferredConnectionCheckHandler;
    private ProtonClientOptions lastUsedClientOptions;
    private List<Symbol> offeredCapabilities;
    private Tracer tracer;
    private ProtonSession session;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/hono-client-1.7.2.jar:org/eclipse/hono/client/impl/HonoConnectionImpl$ConnectionAttempt.class */
    public class ConnectionAttempt {
        private final ProtonClientOptions clientOptions;
        private final Handler<AsyncResult<HonoConnection>> connectionHandler;
        private final AtomicInteger connectAttempts = new AtomicInteger(0);
        private final AtomicBoolean cancelled = new AtomicBoolean();
        private Long reconnectTimerId;

        ConnectionAttempt(ProtonClientOptions protonClientOptions, Handler<AsyncResult<HonoConnection>> handler) {
            this.clientOptions = protonClientOptions;
            this.connectionHandler = asyncResult -> {
                if (asyncResult.failed()) {
                    HonoConnectionImpl.this.clearState();
                }
                HonoConnectionImpl.this.currentConnectionAttempt.compareAndSet(this, null);
                handler.handle(asyncResult);
                HonoConnectionImpl.this.deferredConnectionCheckHandler.setConnectionAttemptFinished(asyncResult);
            };
        }

        public boolean start(boolean z) {
            if (!HonoConnectionImpl.this.currentConnectionAttempt.compareAndSet(null, this)) {
                return false;
            }
            HonoConnectionImpl.this.deferredConnectionCheckHandler.setConnectionAttemptInProgress();
            if (z) {
                reconnect(null);
                return true;
            }
            connect();
            return true;
        }

        public void cancel(String str) {
            if (HonoConnectionImpl.this.currentConnectionAttempt.get() == this && this.cancelled.compareAndSet(false, true)) {
                Optional ofNullable = Optional.ofNullable(this.reconnectTimerId);
                Vertx vertx = HonoConnectionImpl.this.vertx;
                Objects.requireNonNull(vertx);
                boolean booleanValue = ((Boolean) ofNullable.map((v1) -> {
                    return r1.cancelTimer(v1);
                }).orElse(false)).booleanValue();
                Logger logger = HonoConnectionImpl.this.log;
                Object[] objArr = new Object[5];
                objArr[0] = booleanValue ? "upcoming" : "ongoing";
                objArr[1] = Integer.valueOf(this.connectAttempts.get() + 1);
                objArr[2] = HonoConnectionImpl.this.connectionFactory.getHost();
                objArr[3] = Integer.valueOf(HonoConnectionImpl.this.connectionFactory.getPort());
                objArr[4] = HonoConnectionImpl.this.connectionFactory.getServerRole();
                logger.debug("cancelled {} connection attempt [#{}] to server [{}:{}, role: {}]", objArr);
                this.connectionHandler.handle(Future.failedFuture(new ClientErrorException(409, str)));
            }
        }

        private void connect() {
            if (this.cancelled.get()) {
                return;
            }
            HonoConnectionImpl.this.log.debug("starting attempt [#{}] to connect to server [{}:{}, role: {}]", Integer.valueOf(this.connectAttempts.get() + 1), HonoConnectionImpl.this.connectionFactory.getHost(), Integer.valueOf(HonoConnectionImpl.this.connectionFactory.getPort()), HonoConnectionImpl.this.connectionFactory.getServerRole());
            ConnectionFactory connectionFactory = HonoConnectionImpl.this.connectionFactory;
            ProtonClientOptions protonClientOptions = this.clientOptions;
            String str = HonoConnectionImpl.this.containerId;
            HonoConnectionImpl honoConnectionImpl = HonoConnectionImpl.this;
            Handler<AsyncResult<ProtonConnection>> handler = asyncResult -> {
                honoConnectionImpl.onRemoteClose(asyncResult);
            };
            HonoConnectionImpl honoConnectionImpl2 = HonoConnectionImpl.this;
            connectionFactory.connect(protonClientOptions, null, null, str, handler, protonConnection -> {
                honoConnectionImpl2.onRemoteDisconnect(protonConnection);
            }, asyncResult2 -> {
                if (asyncResult2.failed()) {
                    reconnect(asyncResult2.cause());
                    return;
                }
                ProtonConnection protonConnection2 = (ProtonConnection) asyncResult2.result();
                if (!this.cancelled.get()) {
                    HonoConnectionImpl.this.log.debug("attempt [#{}]: connected to server [{}:{}, role: {}]; remote container: {}", Integer.valueOf(this.connectAttempts.get() + 1), HonoConnectionImpl.this.connectionFactory.getHost(), Integer.valueOf(HonoConnectionImpl.this.connectionFactory.getPort()), HonoConnectionImpl.this.connectionFactory.getServerRole(), protonConnection2.getRemoteContainer());
                    HonoConnectionImpl.this.setConnection(protonConnection2, HonoConnectionImpl.this.createDefaultSession(protonConnection2));
                    this.connectionHandler.handle(Future.succeededFuture(HonoConnectionImpl.this));
                } else {
                    HonoConnectionImpl.this.log.debug("attempt [#{}]: connected but will directly be closed because attempt got cancelled; server [{}:{}, role: {}]", Integer.valueOf(this.connectAttempts.get() + 1), HonoConnectionImpl.this.connectionFactory.getHost(), Integer.valueOf(HonoConnectionImpl.this.connectionFactory.getPort()), HonoConnectionImpl.this.connectionFactory.getServerRole());
                    protonConnection2.closeHandler(null);
                    protonConnection2.disconnectHandler(null);
                    protonConnection2.close();
                }
            });
        }

        private void reconnect(Throwable th) {
            if (this.cancelled.get()) {
                return;
            }
            if (th != null) {
                logConnectionError(th);
            }
            if (HonoConnectionImpl.this.clientConfigProperties.getReconnectAttempts() - this.connectAttempts.get() == 0) {
                HonoConnectionImpl.this.log.info("max number of attempts [{}] to re-connect to server [{}:{}, role: {}] have been made, giving up", Integer.valueOf(HonoConnectionImpl.this.clientConfigProperties.getReconnectAttempts()), HonoConnectionImpl.this.connectionFactory.getHost(), Integer.valueOf(HonoConnectionImpl.this.connectionFactory.getPort()), HonoConnectionImpl.this.connectionFactory.getServerRole());
                this.connectionHandler.handle(Future.failedFuture(mapConnectionAttemptFailure(th)));
                return;
            }
            long reconnectMaxDelay = HonoConnectionImpl.this.getReconnectMaxDelay(this.connectAttempts.getAndIncrement());
            long nextLong = reconnectMaxDelay > HonoConnectionImpl.this.clientConfigProperties.getReconnectMinDelay() ? ThreadLocalRandom.current().nextLong(HonoConnectionImpl.this.clientConfigProperties.getReconnectMinDelay(), reconnectMaxDelay) : HonoConnectionImpl.this.clientConfigProperties.getReconnectMinDelay();
            if (nextLong <= 0) {
                connect();
            } else {
                HonoConnectionImpl.this.log.trace("scheduling new connection attempt in {}ms ...", Long.valueOf(nextLong));
                this.reconnectTimerId = Long.valueOf(HonoConnectionImpl.this.vertx.setTimer(nextLong, l -> {
                    this.reconnectTimerId = null;
                    connect();
                }));
            }
        }

        private ServiceInvocationException mapConnectionAttemptFailure(Throwable th) {
            return th == null ? new ServerErrorException(TimeoutHandler.DEFAULT_ERRORCODE, "failed to connect") : th instanceof AuthenticationException ? new ClientErrorException(401, "failed to authenticate with server") : th instanceof MechanismMismatchException ? new ClientErrorException(401, "no suitable SASL mechanism found for authentication with server") : th instanceof SSLException ? new ClientErrorException(400, "TLS handshake with server failed: " + th.getMessage(), th) : new ServerErrorException(TimeoutHandler.DEFAULT_ERRORCODE, "failed to connect", th);
        }

        private void logConnectionError(Throwable th) {
            if (isNoteworthyConnectionError(th)) {
                HonoConnectionImpl.this.log.warn("attempt [#{}] to connect to server [{}:{}, role: {}] failed", Integer.valueOf(this.connectAttempts.get() + 1), HonoConnectionImpl.this.clientConfigProperties.getHost(), Integer.valueOf(HonoConnectionImpl.this.clientConfigProperties.getPort()), HonoConnectionImpl.this.connectionFactory.getServerRole(), th);
            } else {
                HonoConnectionImpl.this.log.debug("attempt [#{}] to connect to server [{}:{}, role: {}] failed", Integer.valueOf(this.connectAttempts.get() + 1), HonoConnectionImpl.this.clientConfigProperties.getHost(), Integer.valueOf(HonoConnectionImpl.this.clientConfigProperties.getPort()), HonoConnectionImpl.this.connectionFactory.getServerRole(), th);
            }
        }

        /* JADX WARN: Multi-variable type inference failed */
        private boolean isNoteworthyConnectionError(Throwable th) {
            return (th instanceof SSLException) || (th instanceof AuthenticationException) || (th instanceof MechanismMismatchException) || ((th instanceof SaslSystemException) && ((SaslSystemException) th).isPermanent());
        }
    }

    public HonoConnectionImpl(Vertx vertx, ClientConfigProperties clientConfigProperties) {
        this(vertx, null, clientConfigProperties);
    }

    public HonoConnectionImpl(Vertx vertx, ConnectionFactory connectionFactory, ClientConfigProperties clientConfigProperties) {
        this.log = LoggerFactory.getLogger((Class<?>) HonoConnectionImpl.class);
        this.disconnectListeners = new ArrayList();
        this.oneTimeDisconnectListeners = Collections.synchronizedList(new ArrayList());
        this.reconnectListeners = new ArrayList();
        this.shuttingDown = new AtomicBoolean(false);
        this.disconnecting = new AtomicBoolean(false);
        this.connectionLock = new Object();
        this.currentConnectionAttempt = new AtomicReference<>();
        this.offeredCapabilities = Collections.emptyList();
        this.tracer = NoopTracerFactory.create();
        Objects.requireNonNull(vertx);
        Objects.requireNonNull(clientConfigProperties);
        this.vertx = vertx;
        this.deferredConnectionCheckHandler = new DeferredConnectionCheckHandler(vertx);
        if (connectionFactory != null) {
            this.connectionFactory = connectionFactory;
        } else {
            this.connectionFactory = ConnectionFactory.newConnectionFactory(this.vertx, clientConfigProperties);
        }
        this.containerId = ConnectionFactory.createContainerId(clientConfigProperties.getName(), clientConfigProperties.getServerRole(), UUID.randomUUID());
        this.clientConfigProperties = clientConfigProperties;
    }

    @Override // org.eclipse.hono.client.HonoConnection
    public final Vertx getVertx() {
        return this.vertx;
    }

    @Autowired(required = false)
    public final void setTracer(Tracer tracer) {
        this.log.info("using OpenTracing implementation [{}]", tracer.getClass().getName());
        this.tracer = (Tracer) Objects.requireNonNull(tracer);
    }

    @Override // org.eclipse.hono.client.HonoConnection
    public final Tracer getTracer() {
        return this.tracer;
    }

    @Override // org.eclipse.hono.client.HonoConnection
    public final ClientConfigProperties getConfig() {
        return this.clientConfigProperties;
    }

    @Override // org.eclipse.hono.client.ConnectionLifecycle
    public final void addDisconnectListener(DisconnectListener<HonoConnection> disconnectListener) {
        this.disconnectListeners.add(disconnectListener);
    }

    @Override // org.eclipse.hono.client.ConnectionLifecycle
    public final void addReconnectListener(ReconnectListener<HonoConnection> reconnectListener) {
        this.reconnectListeners.add(reconnectListener);
    }

    @Override // org.eclipse.hono.client.HonoConnection
    public final <T> Future<T> executeOnContext(Handler<Promise<T>> handler) {
        return this.context == null ? Future.failedFuture(new ServerErrorException(TimeoutHandler.DEFAULT_ERRORCODE, "not connected")) : HonoProtonHelper.executeOnContext(this.context, handler);
    }

    @Override // org.eclipse.hono.client.ConnectionLifecycle
    public final Future<Void> isConnected() {
        return executeOnContext((v1) -> {
            checkConnected(v1);
        });
    }

    protected final Future<Void> checkConnected() {
        Promise promise = Promise.promise();
        checkConnected(promise);
        return promise.future();
    }

    private void checkConnected(Handler<AsyncResult<Void>> handler) {
        if (isConnectedInternal()) {
            handler.handle(Future.succeededFuture());
        } else {
            handler.handle(Future.failedFuture(new ServerErrorException(TimeoutHandler.DEFAULT_ERRORCODE, "not connected")));
        }
    }

    @Override // org.eclipse.hono.client.ConnectionLifecycle
    public final Future<Void> isConnected(long j) {
        return executeOnContext(promise -> {
            checkConnected(promise, j);
        });
    }

    private void checkConnected(Handler<AsyncResult<Void>> handler, long j) {
        if (isConnectedInternal()) {
            handler.handle(Future.succeededFuture());
            return;
        }
        if (j <= 0 || !this.deferredConnectionCheckHandler.isConnectionAttemptInProgress()) {
            handler.handle(Future.failedFuture(new ServerErrorException(TimeoutHandler.DEFAULT_ERRORCODE, "not connected")));
            return;
        }
        this.log.debug("connection attempt to server [{}:{}] in progress, connection check will be completed with its result", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()));
        if (this.deferredConnectionCheckHandler.addConnectionCheck(handler, j)) {
            return;
        }
        checkConnected(handler);
    }

    protected boolean isConnectedInternal() {
        return (this.connection == null || this.connection.isDisconnected() || this.session == null) ? false : true;
    }

    @Override // org.eclipse.hono.client.HonoConnection
    public final boolean isShutdown() {
        return this.shuttingDown.get();
    }

    void setConnection(ProtonConnection protonConnection, ProtonSession protonSession) {
        synchronized (this.connectionLock) {
            this.connection = protonConnection;
            this.session = protonSession;
            if (protonConnection == null) {
                this.offeredCapabilities = Collections.emptyList();
                this.context = null;
            } else {
                this.offeredCapabilities = (List) Optional.ofNullable(protonConnection.getRemoteOfferedCapabilities()).map(symbolArr -> {
                    return Collections.unmodifiableList(Arrays.asList(symbolArr));
                }).orElse(Collections.emptyList());
            }
        }
    }

    protected final ProtonConnection getConnection() {
        ProtonConnection protonConnection;
        synchronized (this.connectionLock) {
            protonConnection = this.connection;
        }
        return protonConnection;
    }

    @Override // org.eclipse.hono.client.HonoConnection
    public final boolean supportsCapability(Symbol symbol) {
        boolean contains;
        if (symbol == null) {
            return false;
        }
        synchronized (this.connectionLock) {
            contains = this.offeredCapabilities.contains(symbol);
        }
        return contains;
    }

    @Override // org.eclipse.hono.client.HonoConnection, org.eclipse.hono.client.ConnectionLifecycle
    public final Future<HonoConnection> connect() {
        return connect(null);
    }

    @Override // org.eclipse.hono.client.HonoConnection
    public final Future<HonoConnection> connect(ProtonClientOptions protonClientOptions) {
        Promise promise = Promise.promise();
        connect(protonClientOptions, promise, false);
        return promise.future();
    }

    private void connect(ProtonClientOptions protonClientOptions, Handler<AsyncResult<HonoConnection>> handler, boolean z) {
        if (this.shuttingDown.get()) {
            handler.handle(Future.failedFuture(new ClientErrorException(409, "client is already shut down")));
            return;
        }
        this.context = this.vertx.getOrCreateContext();
        this.log.trace("running on vert.x context [event-loop context: {}]", Boolean.valueOf(this.context.isEventLoopContext()));
        executeOnContext(promise -> {
            if (isConnectedInternal()) {
                this.log.debug("already connected to server [{}:{}, role: {}]", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()), this.connectionFactory.getServerRole());
                handler.handle(Future.succeededFuture(this));
            } else if (new ConnectionAttempt(protonClientOptions, handler).start(z)) {
                this.lastUsedClientOptions = protonClientOptions;
            } else {
                this.log.debug("already trying to connect to server ...");
                handler.handle(Future.failedFuture(new ClientErrorException(409, "already connecting to server")));
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onRemoteClose(AsyncResult<ProtonConnection> asyncResult) {
        if (asyncResult.failed()) {
            this.log.info("remote server [{}:{}, role: {}] closed connection: {}", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()), this.connectionFactory.getServerRole(), asyncResult.cause().getMessage());
        } else {
            this.log.info("remote server [{}:{}, role: {}] closed connection", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()), this.connectionFactory.getServerRole());
        }
        this.connection.disconnectHandler(null);
        this.connection.close();
        handleConnectionLoss();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onRemoteDisconnect(ProtonConnection protonConnection) {
        if (protonConnection != this.connection) {
            this.log.warn("cannot handle failure of unknown connection");
        } else {
            this.log.debug("lost connection to server [{}:{}, role: {}]", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()), this.connectionFactory.getServerRole());
            handleConnectionLoss();
        }
    }

    private void handleConnectionLoss() {
        if (isConnectedInternal()) {
            this.connection.disconnect();
        }
        notifyDisconnectHandlers();
        clearState();
        if (this.shuttingDown.get() || this.clientConfigProperties.getReconnectAttempts() == 0) {
            return;
        }
        connect(this.lastUsedClientOptions, this::notifyReconnectHandlers, true);
    }

    private void notifyReconnectHandlers(AsyncResult<HonoConnection> asyncResult) {
        if (asyncResult.succeeded()) {
            Iterator<ReconnectListener<HonoConnection>> it = this.reconnectListeners.iterator();
            while (it.hasNext()) {
                it.next().onReconnect(this);
            }
        }
    }

    protected void clearState() {
        setConnection(null, null);
    }

    private void notifyDisconnectHandlers() {
        Iterator<DisconnectListener<HonoConnection>> it = this.disconnectListeners.iterator();
        while (it.hasNext()) {
            notifyDisconnectHandler(it.next());
        }
        this.oneTimeDisconnectListeners.removeIf(disconnectListener -> {
            notifyDisconnectHandler(disconnectListener);
            return true;
        });
    }

    private void notifyDisconnectHandler(DisconnectListener<HonoConnection> disconnectListener) {
        try {
            disconnectListener.onDisconnect(this);
        } catch (Exception e) {
            this.log.warn("error running disconnectHandler", (Throwable) e);
        }
    }

    final long getReconnectMaxDelay(int i) {
        if (i <= 0) {
            return 0L;
        }
        if (i > 31) {
            return this.clientConfigProperties.getReconnectMaxDelay();
        }
        long reconnectDelayIncrement = (1 << (i - 1)) * this.clientConfigProperties.getReconnectDelayIncrement();
        return reconnectDelayIncrement >= 0 ? Math.min(this.clientConfigProperties.getReconnectMaxDelay(), reconnectDelayIncrement) : this.clientConfigProperties.getReconnectMaxDelay();
    }

    @Override // org.eclipse.hono.client.HonoConnection
    public void closeAndFree(ProtonLink<?> protonLink, Handler<Void> handler) {
        if (this.context == null) {
            handler.handle(null);
        } else {
            HonoProtonHelper.closeAndFree(this.context, protonLink, handler);
        }
    }

    @Override // org.eclipse.hono.client.HonoConnection
    public void closeAndFree(ProtonLink<?> protonLink, long j, Handler<Void> handler) {
        if (this.context == null) {
            handler.handle(null);
        } else {
            HonoProtonHelper.closeAndFree(this.context, protonLink, j, handler);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ProtonSession createDefaultSession(ProtonConnection protonConnection) {
        if (protonConnection == null) {
            throw new IllegalStateException("no connection to create session for");
        }
        this.log.debug("establishing AMQP session with server [{}:{}, role: {}]", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()), this.connectionFactory.getServerRole());
        ProtonSession createSession = protonConnection.createSession();
        createSession.closeHandler(asyncResult -> {
            StringBuilder sb = new StringBuilder("the connection's session closed unexpectedly");
            Optional.ofNullable(createSession.getRemoteCondition()).ifPresent(errorCondition -> {
                sb.append(String.format(" [condition: %s, description: %s]", errorCondition.getCondition(), errorCondition.getDescription()));
            });
            createSession.close();
            onRemoteClose(Future.failedFuture(sb.toString()));
        });
        createSession.setIncomingCapacity(this.clientConfigProperties.getMaxSessionWindowSize());
        createSession.open();
        return createSession;
    }

    @Override // org.eclipse.hono.client.HonoConnection
    public final Future<ProtonSender> createSender(String str, ProtonQoS protonQoS, Handler<String> handler) {
        Objects.requireNonNull(protonQoS);
        return executeOnContext(promise -> {
            checkConnected().compose(r13 -> {
                if (str == null && !supportsCapability(Constants.CAP_ANONYMOUS_RELAY)) {
                    return Future.failedFuture(new ServerErrorException(501, "server does not support anonymous terminus"));
                }
                Promise promise = Promise.promise();
                ProtonSender createSender = this.session.createSender(str);
                createSender.setQoS(protonQoS);
                createSender.setAutoSettle(true);
                DisconnectListener<HonoConnection> disconnectListener = honoConnection -> {
                    this.log.debug("opening sender [{}] failed: got disconnected", str);
                    promise.tryFail(new ServerErrorException(TimeoutHandler.DEFAULT_ERRORCODE, "not connected"));
                };
                this.oneTimeDisconnectListeners.add(disconnectListener);
                createSender.openHandler(asyncResult -> {
                    this.oneTimeDisconnectListeners.remove(disconnectListener);
                    if (promise.future().isComplete()) {
                        this.log.debug("ignoring server response for opening sender [{}]: sender creation already timed out", str);
                        return;
                    }
                    if (asyncResult.failed()) {
                        ErrorCondition remoteCondition = createSender.getRemoteCondition();
                        if (remoteCondition == null) {
                            this.log.debug("opening sender [{}] failed", str, asyncResult.cause());
                            promise.tryFail(new ClientErrorException(404, "cannot open sender", asyncResult.cause()));
                            return;
                        } else {
                            this.log.debug("opening sender [{}] failed: {} - {}", str, remoteCondition.getCondition(), remoteCondition.getDescription());
                            promise.tryFail(StatusCodeMapper.fromAttachError(remoteCondition));
                            return;
                        }
                    }
                    if (!HonoProtonHelper.isLinkEstablished(createSender)) {
                        this.log.debug("peer did not create terminus for target [{}] and will detach the link", str);
                        promise.tryFail(new ServerErrorException(TimeoutHandler.DEFAULT_ERRORCODE));
                        return;
                    }
                    this.log.debug("sender open [target: {}, sendQueueFull: {}, remote max-message-size: {}]", str, Boolean.valueOf(createSender.sendQueueFull()), createSender.getRemoteMaxMessageSize());
                    long longValue = ((Long) Optional.ofNullable(createSender.getRemoteMaxMessageSize()).map((v0) -> {
                        return v0.longValue();
                    }).orElse(0L)).longValue();
                    if (longValue > 0 && longValue < this.clientConfigProperties.getMinMaxMessageSize()) {
                        createSender.close();
                        String format = String.format("peer does not support minimum max-message-size [required: %d, supported: %d", Long.valueOf(this.clientConfigProperties.getMinMaxMessageSize()), Long.valueOf(longValue));
                        this.log.debug(format);
                        promise.tryFail(new ClientErrorException(412, format));
                        return;
                    }
                    if (createSender.getCredit() > 0) {
                        promise.tryComplete(createSender);
                    } else {
                        long timer = this.vertx.setTimer(this.clientConfigProperties.getFlowLatency(), l -> {
                            this.log.debug("sender [target: {}] has {} credits after grace period of {}ms", str, Integer.valueOf(createSender.getCredit()), Long.valueOf(this.clientConfigProperties.getFlowLatency()));
                            createSender.sendQueueDrainHandler(null);
                            promise.tryComplete(createSender);
                        });
                        createSender.sendQueueDrainHandler(protonSender -> {
                            this.log.debug("sender [target: {}] has received {} initial credits", str, Integer.valueOf(protonSender.getCredit()));
                            if (this.vertx.cancelTimer(timer)) {
                                promise.tryComplete(protonSender);
                                protonSender.sendQueueDrainHandler(null);
                            }
                        });
                    }
                });
                HonoProtonHelper.setDetachHandler(createSender, asyncResult2 -> {
                    onRemoteDetach(createSender, this.connection.getRemoteContainer(), false, handler);
                });
                HonoProtonHelper.setCloseHandler(createSender, asyncResult3 -> {
                    onRemoteDetach(createSender, this.connection.getRemoteContainer(), true, handler);
                });
                createSender.open();
                this.vertx.setTimer(this.clientConfigProperties.getLinkEstablishmentTimeout(), l -> {
                    if (this.oneTimeDisconnectListeners.remove(disconnectListener)) {
                        onLinkEstablishmentTimeout(createSender, this.clientConfigProperties, promise);
                    }
                });
                return promise.future();
            }).onComplete2(promise);
        });
    }

    @Override // org.eclipse.hono.client.HonoConnection
    public Future<ProtonReceiver> createReceiver(String str, ProtonQoS protonQoS, ProtonMessageHandler protonMessageHandler, Handler<String> handler) {
        return createReceiver(str, protonQoS, protonMessageHandler, this.clientConfigProperties.getInitialCredits(), handler);
    }

    @Override // org.eclipse.hono.client.HonoConnection
    public Future<ProtonReceiver> createReceiver(String str, ProtonQoS protonQoS, ProtonMessageHandler protonMessageHandler, int i, Handler<String> handler) {
        return createReceiver(str, protonQoS, protonMessageHandler, i, true, handler);
    }

    @Override // org.eclipse.hono.client.HonoConnection
    public Future<ProtonReceiver> createReceiver(String str, ProtonQoS protonQoS, ProtonMessageHandler protonMessageHandler, int i, boolean z, Handler<String> handler) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(protonQoS);
        Objects.requireNonNull(protonMessageHandler);
        if (i < 0) {
            throw new IllegalArgumentException("pre-fetch size must be >= 0");
        }
        return executeOnContext(promise -> {
            checkConnected().compose(r15 -> {
                Promise promise = Promise.promise();
                ProtonReceiver createReceiver = this.session.createReceiver(str);
                if (this.clientConfigProperties.getMaxMessageSize() > -1) {
                    createReceiver.setMaxMessageSize(new UnsignedLong(this.clientConfigProperties.getMaxMessageSize()));
                }
                createReceiver.setAutoAccept(z);
                createReceiver.setQoS(protonQoS);
                createReceiver.setPrefetch(i);
                createReceiver.handler((protonDelivery, message) -> {
                    HonoProtonHelper.onReceivedMessageDeliveryUpdatedFromRemote(protonDelivery, protonDelivery -> {
                        this.log.debug("got unexpected disposition update for received message [remote state: {}]", protonDelivery.getRemoteState());
                    });
                    try {
                        protonMessageHandler.handle(protonDelivery, message);
                        if (this.log.isTraceEnabled()) {
                            this.log.trace("handling message [remotely settled: {}, queued messages: {}, remaining credit: {}]", Boolean.valueOf(protonDelivery.remotelySettled()), Integer.valueOf(createReceiver.getQueued()), Integer.valueOf(createReceiver.getCredit() - createReceiver.getQueued()));
                        }
                    } catch (Exception e) {
                        this.log.warn("error handling message", (Throwable) e);
                        ProtonHelper.released(protonDelivery, true);
                    }
                });
                DisconnectListener<HonoConnection> disconnectListener = honoConnection -> {
                    this.log.debug("opening receiver [{}] failed: got disconnected", str);
                    promise.tryFail(new ServerErrorException(TimeoutHandler.DEFAULT_ERRORCODE, "not connected"));
                };
                this.oneTimeDisconnectListeners.add(disconnectListener);
                createReceiver.openHandler(asyncResult -> {
                    this.oneTimeDisconnectListeners.remove(disconnectListener);
                    if (promise.future().isComplete()) {
                        this.log.debug("ignoring server response for opening receiver [{}]: receiver creation already timed out", str);
                        return;
                    }
                    if (!asyncResult.failed()) {
                        if (HonoProtonHelper.isLinkEstablished(createReceiver)) {
                            this.log.debug("receiver open [source: {}]", str);
                            promise.tryComplete((ProtonReceiver) asyncResult.result());
                            return;
                        } else {
                            this.log.debug("peer did not create terminus for source [{}] and will detach the link", str);
                            promise.tryFail(new ServerErrorException(TimeoutHandler.DEFAULT_ERRORCODE));
                            return;
                        }
                    }
                    ErrorCondition remoteCondition = createReceiver.getRemoteCondition();
                    if (remoteCondition == null) {
                        this.log.debug("opening receiver [{}] failed", str, asyncResult.cause());
                        promise.tryFail(new ClientErrorException(404, "cannot open receiver", asyncResult.cause()));
                    } else {
                        this.log.debug("opening receiver [{}] failed: {} - {}", str, remoteCondition.getCondition(), remoteCondition.getDescription());
                        promise.tryFail(StatusCodeMapper.fromAttachError(remoteCondition));
                    }
                });
                HonoProtonHelper.setDetachHandler(createReceiver, asyncResult2 -> {
                    onRemoteDetach(createReceiver, this.connection.getRemoteContainer(), false, handler);
                });
                HonoProtonHelper.setCloseHandler(createReceiver, asyncResult3 -> {
                    onRemoteDetach(createReceiver, this.connection.getRemoteContainer(), true, handler);
                });
                createReceiver.open();
                this.vertx.setTimer(this.clientConfigProperties.getLinkEstablishmentTimeout(), l -> {
                    if (this.oneTimeDisconnectListeners.remove(disconnectListener)) {
                        onLinkEstablishmentTimeout(createReceiver, this.clientConfigProperties, promise);
                    }
                });
                return promise.future();
            }).onComplete2(promise);
        });
    }

    private void onLinkEstablishmentTimeout(ProtonLink<?> protonLink, ClientConfigProperties clientConfigProperties, Promise<?> promise) {
        if (!protonLink.isOpen() || HonoProtonHelper.isLinkEstablished(protonLink)) {
            return;
        }
        this.log.info("link establishment [peer: {}] timed out after {}ms", clientConfigProperties.getHost(), Long.valueOf(clientConfigProperties.getLinkEstablishmentTimeout()));
        protonLink.close();
        promise.tryFail(new ServerErrorException(TimeoutHandler.DEFAULT_ERRORCODE));
    }

    private void onRemoteDetach(ProtonLink<?> protonLink, String str, boolean z, Handler<String> handler) {
        ErrorCondition remoteCondition = protonLink.getRemoteCondition();
        String str2 = protonLink instanceof ProtonSender ? "sender" : "receiver";
        String address = protonLink instanceof ProtonSender ? protonLink.getTarget().getAddress() : protonLink.getSource().getAddress();
        if (remoteCondition == null) {
            this.log.debug("{} [{}] detached (with closed={}) by peer [{}]", str2, address, Boolean.valueOf(z), str);
        } else {
            this.log.debug("{} [{}] detached (with closed={}) by peer [{}]: {} - {}", str2, address, Boolean.valueOf(z), str, remoteCondition.getCondition(), remoteCondition.getDescription());
        }
        protonLink.close();
        if (!HonoProtonHelper.isLinkEstablished(protonLink) || handler == null) {
            return;
        }
        handler.handle(address);
    }

    @Override // org.eclipse.hono.client.HonoConnection
    public final void shutdown() {
        CountDownLatch countDownLatch = Context.isOnEventLoopThread() ? null : new CountDownLatch(1);
        shutdown(asyncResult -> {
            if (!asyncResult.succeeded()) {
                this.log.warn("could not close connection to server", asyncResult.cause());
            }
            if (countDownLatch != null) {
                countDownLatch.countDown();
            }
        });
        if (countDownLatch != null) {
            try {
                int closeConnectionTimeout = getCloseConnectionTimeout() + 20;
                if (!countDownLatch.await(closeConnectionTimeout, TimeUnit.MILLISECONDS)) {
                    this.log.warn("shutdown of client timed out after {}ms", Integer.valueOf(closeConnectionTimeout));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override // org.eclipse.hono.client.HonoConnection
    public final void shutdown(Handler<AsyncResult<Void>> handler) {
        Objects.requireNonNull(handler);
        cancelCurrentConnectionAttempt("client is getting shut down");
        if (this.shuttingDown.compareAndSet(Boolean.FALSE.booleanValue(), Boolean.TRUE.booleanValue())) {
            closeConnection(handler);
        } else {
            handler.handle(Future.failedFuture(new ClientErrorException(409, "already shutting down")));
        }
    }

    @Override // org.eclipse.hono.client.HonoConnection, org.eclipse.hono.client.ConnectionLifecycle
    public final void disconnect() {
        CountDownLatch countDownLatch = Context.isOnEventLoopThread() ? null : new CountDownLatch(1);
        disconnect(asyncResult -> {
            if (asyncResult.succeeded()) {
                this.log.info("successfully disconnected from the server [{}:{}]", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()));
            } else {
                this.log.warn("could not disconnect from the server [{}:{}]", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()));
            }
            if (countDownLatch != null) {
                countDownLatch.countDown();
            }
        });
        if (countDownLatch != null) {
            try {
                int closeConnectionTimeout = getCloseConnectionTimeout() + 20;
                if (!countDownLatch.await(closeConnectionTimeout, TimeUnit.MILLISECONDS)) {
                    this.log.warn("Disconnecting from server [{}:{}, role: {}] timed out after {}ms", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()), this.connectionFactory.getServerRole(), Integer.valueOf(closeConnectionTimeout));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override // org.eclipse.hono.client.ConnectionLifecycle
    public final void disconnect(Handler<AsyncResult<Void>> handler) {
        Objects.requireNonNull(handler);
        cancelCurrentConnectionAttempt("client got disconnected");
        if (this.disconnecting.compareAndSet(Boolean.FALSE.booleanValue(), Boolean.TRUE.booleanValue())) {
            closeConnection(handler);
        } else {
            handler.handle(Future.failedFuture(new ClientErrorException(409, "already disconnecting")));
        }
    }

    private void cancelCurrentConnectionAttempt(String str) {
        Optional.ofNullable(this.currentConnectionAttempt.get()).ifPresent(connectionAttempt -> {
            connectionAttempt.cancel(str);
        });
    }

    @Override // org.eclipse.hono.client.HonoConnection
    public String getRemoteContainerId() {
        if (isConnectedInternal()) {
            return this.connection.getRemoteContainer();
        }
        return null;
    }

    @Override // org.eclipse.hono.client.HonoConnection
    public String getContainerId() {
        return this.containerId;
    }

    private void closeConnection(Handler<AsyncResult<Void>> handler) {
        Handler handler2 = asyncResult -> {
            this.disconnecting.compareAndSet(Boolean.TRUE.booleanValue(), Boolean.FALSE.booleanValue());
            if (asyncResult.succeeded()) {
                handler.handle(Future.succeededFuture());
            } else {
                handler.handle(Future.failedFuture(asyncResult.cause()));
            }
        };
        synchronized (this.connectionLock) {
            if (isConnectedInternal()) {
                executeOnContext(promise -> {
                    ProtonConnection protonConnection = this.connection;
                    protonConnection.disconnectHandler(null);
                    Handler handler3 = asyncResult2 -> {
                        if (asyncResult2.succeeded()) {
                            this.log.info("closed connection to container [{}] at [{}:{}, role: {}]", protonConnection.getRemoteContainer(), this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()), this.connectionFactory.getServerRole());
                        } else {
                            this.log.info("closed connection to container [{}] at [{}:{}, role: {}]", protonConnection.getRemoteContainer(), this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()), this.connectionFactory.getServerRole(), asyncResult2.cause());
                        }
                        notifyDisconnectHandlers();
                        clearState();
                        promise.complete();
                    };
                    int closeConnectionTimeout = getCloseConnectionTimeout();
                    long timer = this.vertx.setTimer(closeConnectionTimeout, l -> {
                        this.log.info("did not receive remote peer's close frame after {}ms", Integer.valueOf(closeConnectionTimeout));
                        handler3.handle(Future.succeededFuture());
                    });
                    protonConnection.closeHandler(asyncResult3 -> {
                        if (this.vertx.cancelTimer(timer)) {
                            handler3.handle(asyncResult3);
                        }
                    });
                    this.log.info("closing connection to container [{}] at [{}:{}, role: {}] ...", protonConnection.getRemoteContainer(), this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()), this.connectionFactory.getServerRole());
                    protonConnection.close();
                }).onComplete2(handler2);
            } else {
                this.log.info("connection to server [{}:{}, role: {}] already closed", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()), this.connectionFactory.getServerRole());
                handler2.handle(Future.succeededFuture());
            }
        }
    }

    private int getCloseConnectionTimeout() {
        return (this.clientConfigProperties.getConnectTimeout() > 0 ? this.clientConfigProperties.getConnectTimeout() : 5000) / 2;
    }
}
