package org.eclipse.hono.client.impl;

import com.fasterxml.jackson.core.JsonLocation;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.tag.Tags;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonSender;
import java.util.Objects;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.Command;
import org.eclipse.hono.client.DelegatedCommandSender;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.SendMessageSampler;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.CommandConstants;
import org.eclipse.hono.util.MessageHelper;

/* loaded from: input_file:BOOT-INF/lib/hono-client-1.4.4.jar:org/eclipse/hono/client/impl/DelegatedCommandSenderImpl.class */
public class DelegatedCommandSenderImpl extends AbstractSender implements DelegatedCommandSender {
    DelegatedCommandSenderImpl(HonoConnection honoConnection, ProtonSender protonSender, SendMessageSampler sendMessageSampler) {
        super(honoConnection, protonSender, "", "", sendMessageSampler);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.eclipse.hono.client.impl.AbstractSender
    public String getTo(String str) {
        return null;
    }

    @Override // org.eclipse.hono.client.MessageSender
    public String getEndpoint() {
        return CommandConstants.INTERNAL_COMMAND_ENDPOINT;
    }

    @Override // org.eclipse.hono.client.impl.AbstractSender
    protected Future<ProtonDelivery> sendMessage(Message message, Span span) {
        return runSendAndWaitForOutcomeOnContext(message, span);
    }

    @Override // org.eclipse.hono.client.MessageSender
    public Future<ProtonDelivery> sendAndWaitForOutcome(Message message) {
        return sendAndWaitForOutcome(message, null);
    }

    @Override // org.eclipse.hono.client.MessageSender
    public Future<ProtonDelivery> sendAndWaitForOutcome(Message message, SpanContext spanContext) {
        Objects.requireNonNull(message);
        Span startSpan = startSpan(spanContext, message);
        TracingHelper.injectSpanContext(this.connection.getTracer(), startSpan.context(), message);
        return runSendAndWaitForOutcomeOnContext(message, startSpan);
    }

    private Future<ProtonDelivery> runSendAndWaitForOutcomeOnContext(Message message, Span span) {
        return this.connection.executeOnContext(promise -> {
            if (!this.sender.sendQueueFull()) {
                sendMessageAndWaitForOutcome(message, span).onComplete2(promise);
                return;
            }
            ServerErrorException serverErrorException = new ServerErrorException(503, "no credit available");
            logMessageSendingError("error sending message [ID: {}, address: {}], no credit available", message.getMessageId(), getMessageAddress(message));
            logError(span, serverErrorException);
            span.finish();
            promise.fail(serverErrorException);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.eclipse.hono.client.impl.AbstractSender
    public Future<ProtonDelivery> sendMessageAndWaitForOutcome(Message message, Span span) {
        Objects.requireNonNull(message);
        Objects.requireNonNull(span);
        Promise promise = Promise.promise();
        String obj = message.getMessageId() != null ? message.getMessageId().toString() : "";
        logMessageIdAndSenderInfo(span, obj);
        SendMessageSampler.Sample start = this.sampler.start(this.tenantId);
        Long valueOf = this.connection.getConfig().getSendMessageTimeout() > 0 ? Long.valueOf(this.connection.getVertx().setTimer(this.connection.getConfig().getSendMessageTimeout(), l -> {
            if (promise.future().isComplete()) {
                return;
            }
            ServerErrorException serverErrorException = new ServerErrorException(503, "waiting for delivery update timed out after " + this.connection.getConfig().getSendMessageTimeout() + "ms");
            logMessageSendingError("waiting for delivery update timed out for message [ID: {}, address: {}] after {}ms", obj, getMessageAddress(message), Long.valueOf(this.connection.getConfig().getSendMessageTimeout()));
            promise.fail(serverErrorException);
            start.timeout();
        })) : null;
        this.sender.send(message, protonDelivery -> {
            if (valueOf != null) {
                this.connection.getVertx().cancelTimer(valueOf.longValue());
            }
            DeliveryState remoteState = protonDelivery.getRemoteState();
            start.completed(remoteState);
            if (promise.future().isComplete()) {
                this.log.debug("ignoring received delivery update for message [ID: {}, address: {}]: waiting for the update has already timed out", obj, getMessageAddress(message));
            } else if (protonDelivery.remotelySettled()) {
                logUpdatedDeliveryState(span, message, protonDelivery);
                promise.complete(protonDelivery);
            } else {
                logMessageSendingError("peer did not settle message [ID: {}, address: {}, remote state: {}], failing delivery", obj, getMessageAddress(message), remoteState.getClass().getSimpleName());
                promise.fail(new ServerErrorException(JsonLocation.MAX_CONTENT_SNIPPET, "peer did not settle message, failing delivery"));
            }
        });
        this.log.trace("sent message [ID: {}, address: {}], remaining credit: {}, queued messages: {}", obj, getMessageAddress(message), Integer.valueOf(this.sender.getCredit()), Integer.valueOf(this.sender.getQueued()));
        return promise.future().map(protonDelivery2 -> {
            this.log.trace("message [ID: {}, address: {}] accepted by peer", obj, getMessageAddress(message));
            Tags.HTTP_STATUS.set(span, (Integer) 202);
            span.finish();
            return protonDelivery2;
        }).recover(th -> {
            TracingHelper.logError(span, th);
            Tags.HTTP_STATUS.set(span, Integer.valueOf(ServiceInvocationException.extractStatusCode(th)));
            span.finish();
            return Future.failedFuture(th);
        });
    }

    @Override // org.eclipse.hono.client.DelegatedCommandSender
    public Future<ProtonDelivery> sendCommandMessage(Command command, SpanContext spanContext) {
        Objects.requireNonNull(command);
        Message createDelegatedCommandMessage = createDelegatedCommandMessage(command.getCommandMessage(), command.isOneWay() ? null : String.format("%s/%s/%s", command.getReplyToEndpoint(), command.getTenant(), command.getReplyToId()));
        Span startSpan = startSpan(spanContext, createDelegatedCommandMessage);
        startSpan.setTag(MessageHelper.APP_PROPERTY_TENANT_ID, command.getTenant());
        if (command.isTargetedAtGateway()) {
            MessageHelper.addProperty(createDelegatedCommandMessage, "via", command.getDeviceId());
            TracingHelper.TAG_DEVICE_ID.set(startSpan, command.getOriginalDeviceId());
            TracingHelper.TAG_GATEWAY_ID.set(startSpan, command.getDeviceId());
        } else {
            TracingHelper.TAG_DEVICE_ID.set(startSpan, command.getDeviceId());
        }
        TracingHelper.injectSpanContext(this.connection.getTracer(), startSpan.context(), createDelegatedCommandMessage);
        return runSendAndWaitForOutcomeOnContext(createDelegatedCommandMessage, startSpan);
    }

    static String getTargetAddress(String str) {
        return "command_internal/" + ((String) Objects.requireNonNull(str));
    }

    private static Message createDelegatedCommandMessage(Message message, String str) {
        Objects.requireNonNull(message);
        Message shallowCopy = MessageHelper.getShallowCopy(message);
        shallowCopy.setReplyTo(str);
        return shallowCopy;
    }

    public static Future<DelegatedCommandSender> create(HonoConnection honoConnection, String str, SendMessageSampler sendMessageSampler, Handler<String> handler) {
        Objects.requireNonNull(honoConnection);
        Objects.requireNonNull(str);
        return honoConnection.createSender(getTargetAddress(str), ProtonQoS.AT_LEAST_ONCE, handler).map(protonSender -> {
            return new DelegatedCommandSenderImpl(honoConnection, protonSender, sendMessageSampler);
        });
    }

    @Override // org.eclipse.hono.client.impl.AbstractSender
    protected Span startSpan(SpanContext spanContext, Message message) {
        if (this.connection.getTracer() == null) {
            throw new IllegalStateException("no tracer configured");
        }
        Span newChildSpan = newChildSpan(spanContext, "delegate Command request");
        Tags.SPAN_KIND.set(newChildSpan, Tags.SPAN_KIND_CLIENT);
        return newChildSpan;
    }
}
