package jp.ossc.nimbus.service.publish.websocket;

import java.util.Iterator;
import java.util.Set;
import jp.ossc.nimbus.core.ServiceBase;
import jp.ossc.nimbus.core.ServiceManagerFactory;
import jp.ossc.nimbus.core.ServiceName;
import jp.ossc.nimbus.service.queue.DistributedQueueHandlerContainerService;
import jp.ossc.nimbus.service.queue.QueueHandler;
import jp.ossc.nimbus.service.queue.QueueHandlerContainer;
import jp.ossc.nimbus.service.websocket.ExceptionHandlerMappingService;

/* loaded from: input_file:jp/ossc/nimbus/service/publish/websocket/AbstractPublishMessageDispatcherService.class */
public abstract class AbstractPublishMessageDispatcherService extends ServiceBase implements MessageDispatcher, AbstractPublishMessageDispatcherServiceMBean {
    /** Name of an existing QueueHandlerContainer service that startService() looks up for received messages. */
    protected ServiceName messageListenerQueueHandlerContainerServiceName;
    /** Selector service name handed to an internally created distributed container when no listener container name is set. */
    protected ServiceName messageListenerQueueSelectorServiceName;
    /** Name of an existing container service that startService() looks up for outgoing sends. */
    protected ServiceName messageSendQueueHandlerContainerServiceName;
    /** Selector service name handed to an internally created distributed container when no send container name is set. */
    protected ServiceName messageSendQueueSelectorServiceName;
    /** Name of the ExceptionHandlerMappingService that startService() looks up for send failures. */
    protected ServiceName messageSendExceptionHandlerMappingServiceName;
    /**
     * Message ID, initialised from the MBean default.
     * NOTE(review): not read anywhere in this class - presumably used by subclasses when logging send errors; confirm.
     */
    protected String sendErrorMessageId = AbstractPublishMessageDispatcherServiceMBean.DEFAULT_SEND_ERROR_MESSAGE_ID;
    /** Container that received messages are pushed to; stays null when neither listener service name is configured. */
    protected QueueHandlerContainer messageListenerQueueHandler;
    /** Container that (sender, message) pairs are pushed to; stays null when neither send service name is configured. */
    protected DistributedQueueHandlerContainerService messageSendQueueHandler;
    /** Handler invoked by sendMessage() when a sender throws; stays null when no mapping service name is configured. */
    protected ExceptionHandlerMappingService messageSendExceptionHandler;
    /** Incremented once per onMessageProcess() call, including calls with a null message. */
    protected long messageReceiveCount;
    /** Incremented by sendMessage() after a sender's sendMessage call returns without throwing. */
    protected long messageSendCount;

    /* loaded from: input_file:jp/ossc/nimbus/service/publish/websocket/AbstractPublishMessageDispatcherService$MessageListenerQueueHandler.class */
    /**
     * Queue handler for received messages: resolves the senders interested in
     * a dequeued message and forwards the message to them.
     */
    protected class MessageListenerQueueHandler implements QueueHandler {
        protected MessageListenerQueueHandler() {
        }

        /**
         * Looks up the send targets for the dequeued message and dispatches to
         * them. Does nothing for a null message or when there are no targets.
         *
         * @param obj the dequeued message, may be null
         */
        @Override
        public void handleDequeuedObject(Object obj) throws Throwable {
            if (obj == null) {
                return;
            }
            final AbstractPublishMessageDispatcherService dispatcher = AbstractPublishMessageDispatcherService.this;
            final Set targets = dispatcher.getMessageSendTarget(obj);
            if (targets != null && targets.size() != 0) {
                dispatcher.sendMessageSenders(targets, obj);
            }
        }

        /**
         * Always returns false.
         * NOTE(review): presumably false means "do not retry" - confirm against the QueueHandler contract.
         */
        @Override
        public boolean handleError(Object obj, Throwable th) throws Throwable {
            return false;
        }

        /** Intentionally does nothing when retries are exhausted. */
        @Override
        public void handleRetryOver(Object obj, Throwable th) throws Throwable {
        }
    }

    /* loaded from: input_file:jp/ossc/nimbus/service/publish/websocket/AbstractPublishMessageDispatcherService$MessageSendQueueHandler.class */
    /**
     * Queue handler for outgoing sends. Each queued element is an
     * {@code Object[]} of the form {@code {MessageSender, message}}, as pushed
     * by {@code sendMessageSenders}.
     */
    protected class MessageSendQueueHandler implements QueueHandler {
        protected MessageSendQueueHandler() {
        }

        /**
         * Unpacks the (sender, message) pair and delivers the message through
         * the enclosing service's {@code sendMessage}.
         *
         * @param obj the dequeued {@code Object[]} pair, may be null (ignored)
         */
        @Override // jp.ossc.nimbus.service.queue.QueueHandler
        public void handleDequeuedObject(Object obj) throws Throwable {
            if (obj == null) {
                return;
            }
            Object[] objArr = (Object[]) obj;
            AbstractPublishMessageDispatcherService.this.sendMessage((MessageSender) objArr[0], objArr[1]);
        }

        /**
         * Reports whether the sender's session is still open.
         * NOTE(review): presumably the return value tells the container whether
         * to retry - confirm against the QueueHandler contract.
         *
         * @param obj the {@code Object[]} pair that failed, may be null
         * @param th  the failure raised while handling {@code obj}
         * @return false for a null element, otherwise {@code isOpen()} of the sender's session
         */
        @Override // jp.ossc.nimbus.service.queue.QueueHandler
        public boolean handleError(Object obj, Throwable th) throws Throwable {
            // Mirror the null guard in handleDequeuedObject so the error
            // handler cannot itself fail with a NullPointerException.
            if (obj == null) {
                return false;
            }
            return ((MessageSender) ((Object[]) obj)[0]).getSession().isOpen();
        }

        /** Intentionally does nothing when retries are exhausted. */
        @Override // jp.ossc.nimbus.service.queue.QueueHandler
        public void handleRetryOver(Object obj, Throwable th) throws Throwable {
        }
    }

    /**
     * Returns the configured service name of the listener-side queue handler container.
     *
     * @return the service name, or null if none was set
     */
    @Override
    public ServiceName getMessageListenerQueueHandlerContainerServiceName() {
        return messageListenerQueueHandlerContainerServiceName;
    }

    /**
     * Sets the service name of the listener-side queue handler container that
     * {@code startService()} looks up.
     *
     * @param serviceName the service name, may be null
     */
    @Override
    public void setMessageListenerQueueHandlerContainerServiceName(ServiceName serviceName) {
        messageListenerQueueHandlerContainerServiceName = serviceName;
    }

    /**
     * Returns the configured service name of the send-side queue handler container.
     *
     * @return the service name, or null if none was set
     */
    @Override
    public ServiceName getMessageSendQueueHandlerContainerServiceName() {
        return messageSendQueueHandlerContainerServiceName;
    }

    /**
     * Sets the service name of the send-side queue handler container that
     * {@code startService()} looks up.
     *
     * @param serviceName the service name, may be null
     */
    @Override
    public void setMessageSendQueueHandlerContainerServiceName(ServiceName serviceName) {
        messageSendQueueHandlerContainerServiceName = serviceName;
    }

    /**
     * Returns the configured queue selector service name for the listener side.
     *
     * @return the service name, or null if none was set
     */
    @Override
    public ServiceName getMessageListenerQueueSelectorServiceName() {
        return messageListenerQueueSelectorServiceName;
    }

    /**
     * Sets the queue selector service name for the listener side. It is only
     * consulted by {@code startService()} when no listener container service
     * name is configured.
     *
     * @param serviceName the service name, may be null
     */
    @Override
    public void setMessageListenerQueueSelectorServiceName(ServiceName serviceName) {
        messageListenerQueueSelectorServiceName = serviceName;
    }

    /**
     * Returns the configured service name of the send exception handler mapping.
     *
     * @return the service name, or null if none was set
     */
    @Override
    public ServiceName getMessageSendExceptionHandlerMappingServiceName() {
        return messageSendExceptionHandlerMappingServiceName;
    }

    /**
     * Sets the service name of the exception handler mapping that
     * {@code startService()} looks up for send failures.
     *
     * @param serviceName the service name, may be null
     */
    @Override
    public void setMessageSendExceptionHandlerMappingServiceName(ServiceName serviceName) {
        messageSendExceptionHandlerMappingServiceName = serviceName;
    }

    /**
     * Returns the configured queue selector service name for the send side.
     *
     * @return the service name, or null if none was set
     */
    @Override
    public ServiceName getMessageSendQueueSelectorServiceName() {
        return messageSendQueueSelectorServiceName;
    }

    /**
     * Sets the queue selector service name for the send side. It is only
     * consulted by {@code startService()} when no send container service name
     * is configured.
     *
     * @param serviceName the service name, may be null
     */
    @Override
    public void setMessageSendQueueSelectorServiceName(ServiceName serviceName) {
        messageSendQueueSelectorServiceName = serviceName;
    }

    /**
     * Returns the currently configured send-error message ID.
     *
     * @return the message ID
     */
    @Override
    public String getSendErrorMessageId() {
        return sendErrorMessageId;
    }

    /**
     * Replaces the send-error message ID.
     *
     * @param str the new message ID
     */
    @Override
    public void setSendErrorMessageId(String str) {
        sendErrorMessageId = str;
    }

    /**
     * Returns how many times {@code onMessageProcess} has been invoked.
     *
     * @return the receive counter
     */
    @Override
    public long getMessageReceiveCount() {
        return messageReceiveCount;
    }

    /**
     * Returns how many sends completed without the sender throwing.
     *
     * @return the send counter
     */
    @Override
    public long getMessageSendCount() {
        return messageSendCount;
    }

    /**
     * Resolves or creates the listener and send queue handler containers,
     * installs this service's handlers on them, and looks up the optional
     * send exception handler.
     *
     * <p>For each side, a configured container service name takes precedence
     * over a selector service name; with neither configured, that side has no
     * container and messages are processed on the calling thread.
     *
     * @throws Exception if the superclass start or an internally created container fails to start
     */
    @Override
    public void startService() throws Exception {
        super.startService();

        // Listener side: look up an existing container, or create a distributed one.
        if (messageListenerQueueHandlerContainerServiceName != null) {
            messageListenerQueueHandler = (QueueHandlerContainer) ServiceManagerFactory
                    .getServiceObject(messageListenerQueueHandlerContainerServiceName);
        } else if (messageListenerQueueSelectorServiceName != null) {
            final DistributedQueueHandlerContainerService listenerContainer = new DistributedQueueHandlerContainerService();
            // Publish to the field before configuring, as the original did.
            messageListenerQueueHandler = listenerContainer;
            listenerContainer.setDistributedQueueSelectorServiceName(messageListenerQueueSelectorServiceName);
            listenerContainer.startService();
        }
        if (messageListenerQueueHandler != null) {
            messageListenerQueueHandler.setQueueHandler(new MessageListenerQueueHandler());
            messageListenerQueueHandler.accept();
        }

        // Send side: same resolution order as the listener side.
        if (messageSendQueueHandlerContainerServiceName != null) {
            messageSendQueueHandler = (DistributedQueueHandlerContainerService) ServiceManagerFactory
                    .getServiceObject(messageSendQueueHandlerContainerServiceName);
        } else if (messageSendQueueSelectorServiceName != null) {
            final DistributedQueueHandlerContainerService sendContainer = new DistributedQueueHandlerContainerService();
            messageSendQueueHandler = sendContainer;
            sendContainer.setDistributedQueueSelectorServiceName(messageSendQueueSelectorServiceName);
            sendContainer.startService();
        }
        if (messageSendQueueHandler != null) {
            messageSendQueueHandler.setQueueHandler(new MessageSendQueueHandler());
            messageSendQueueHandler.accept();
        }

        if (messageSendExceptionHandlerMappingServiceName != null) {
            messageSendExceptionHandler = (ExceptionHandlerMappingService) ServiceManagerFactory
                    .getServiceObject(messageSendExceptionHandlerMappingServiceName);
        }
    }

    /**
     * Stops the listener and send queue handler containers, if present.
     *
     * <p>The container references are left in place; they are not cleared here.
     *
     * @throws Exception if the superclass stop processing fails
     */
    @Override // jp.ossc.nimbus.core.ServiceBase
    public void stopService() throws Exception {
        // Delegate to the superclass *stop* hook; the previous code called
        // super.startService() here by mistake.
        super.stopService();
        if (this.messageListenerQueueHandler != null) {
            this.messageListenerQueueHandler.stop();
        }
        if (this.messageSendQueueHandler != null) {
            this.messageSendQueueHandler.stop();
        }
    }

    /**
     * Registers a sender by delegating to {@link #addMessageSenderProcess(MessageSender)}.
     *
     * @param messageSender the sender to register
     */
    @Override
    public void addMessageSender(MessageSender messageSender) {
        this.addMessageSenderProcess(messageSender);
    }

    /**
     * Unregisters a sender by delegating to {@link #removeMessageSenderProcess(MessageSender)}.
     *
     * @param messageSender the sender to unregister
     */
    @Override
    public void removeMessageSender(MessageSender messageSender) {
        this.removeMessageSenderProcess(messageSender);
    }

    /**
     * Associates a key with a sender by delegating to
     * {@link #addKeyProcess(Object, MessageSender)}.
     *
     * @param obj           the key
     * @param messageSender the sender the key is added for
     */
    @Override
    public void addKey(Object obj, MessageSender messageSender) {
        this.addKeyProcess(obj, messageSender);
    }

    /**
     * Removes a key from a sender by delegating to
     * {@link #removeKeyProcess(Object, MessageSender)}.
     *
     * @param obj           the key
     * @param messageSender the sender the key is removed from
     */
    @Override
    public void removeKey(Object obj, MessageSender messageSender) {
        this.removeKeyProcess(obj, messageSender);
    }

    /**
     * Subclass hook invoked by {@code addMessageSender} to register the sender.
     *
     * @param messageSender the sender passed to {@code addMessageSender}
     */
    public abstract void addMessageSenderProcess(MessageSender messageSender);

    /**
     * Subclass hook invoked by {@code removeMessageSender} to unregister the sender.
     *
     * @param messageSender the sender passed to {@code removeMessageSender}
     */
    public abstract void removeMessageSenderProcess(MessageSender messageSender);

    /**
     * Subclass hook invoked by {@code addKey}.
     *
     * @param obj           the key passed to {@code addKey}
     * @param messageSender the sender passed to {@code addKey}
     */
    public abstract void addKeyProcess(Object obj, MessageSender messageSender);

    /**
     * Subclass hook invoked by {@code removeKey}.
     *
     * @param obj           the key passed to {@code removeKey}
     * @param messageSender the sender passed to {@code removeKey}
     */
    public abstract void removeKeyProcess(Object obj, MessageSender messageSender);

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Entry point for a received message. Counts the call, then either queues
     * the message on the listener container (when one is configured) or
     * resolves the targets and dispatches on the calling thread.
     *
     * @param obj the received message; a null message is counted but otherwise ignored
     */
    public void onMessageProcess(Object obj) {
        messageReceiveCount++;
        if (obj == null) {
            return;
        }
        if (messageListenerQueueHandler != null) {
            // Queued path: the listener queue handler resolves targets later.
            messageListenerQueueHandler.push(obj);
            return;
        }
        final Set targets = getMessageSendTarget(obj);
        if (targets != null && targets.size() != 0) {
            sendMessageSenders(targets, obj);
        }
    }

    /**
     * Subclass hook that resolves which senders should receive the given message.
     * Callers in this class treat a null or empty result as "no targets" and
     * cast each element to {@code MessageSender}.
     *
     * @param obj the message being dispatched (never null when called from this class)
     * @return the set of target senders, or null/empty when there are none
     */
    protected abstract Set getMessageSendTarget(Object obj);

    /**
     * Delivers the message to every sender in the set: queued on the send
     * container as a {@code {sender, message}} pair when one is configured,
     * otherwise sent directly on the calling thread.
     *
     * @param set the target senders; each element is cast to {@code MessageSender}
     * @param obj the message to deliver
     */
    protected void sendMessageSenders(Set set, Object obj) {
        for (Object element : set) {
            final MessageSender target = (MessageSender) element;
            if (messageSendQueueHandler != null) {
                messageSendQueueHandler.push(new Object[]{target, obj});
            } else {
                sendMessage(target, obj);
            }
        }
    }

    /**
     * Sends one message through one sender and counts it on success.
     *
     * <p>An {@code Exception} from the sender is passed to the configured send
     * exception handler, if any; it is never rethrown. Anything the exception
     * handler itself throws is deliberately discarded (best-effort delivery).
     *
     * @param messageSender the sender to deliver through
     * @param obj           the message to deliver
     */
    protected void sendMessage(MessageSender messageSender, Object obj) {
        try {
            messageSender.sendMessage(obj);
        } catch (Exception sendFailure) {
            if (messageSendExceptionHandler == null) {
                return;
            }
            try {
                messageSendExceptionHandler.handleException(messageSender.getSession(), sendFailure);
            } catch (Throwable ignored) {
                // Best effort: a failing exception handler must not break dispatching.
            }
            return;
        }
        // Only reached when the sender did not throw.
        messageSendCount++;
    }
}
