package org.codehaus.activemq.service.impl;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import java.util.Iterator;
import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.filter.AndFilter;
import org.codehaus.activemq.filter.DestinationMap;
import org.codehaus.activemq.filter.Filter;
import org.codehaus.activemq.filter.FilterFactory;
import org.codehaus.activemq.filter.FilterFactoryImpl;
import org.codehaus.activemq.filter.NoLocalFilter;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ActiveMQQueue;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.service.DeadLetterPolicy;
import org.codehaus.activemq.service.Dispatcher;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.QueueList;
import org.codehaus.activemq.service.QueueListEntry;
import org.codehaus.activemq.service.QueueMessageContainer;
import org.codehaus.activemq.service.QueueMessageContainerManager;
import org.codehaus.activemq.service.RedeliveryPolicy;
import org.codehaus.activemq.service.Subscription;
import org.codehaus.activemq.service.SubscriptionContainer;
import org.codehaus.activemq.store.PersistenceAdapter;

/* loaded from: input_file:lib/activemq-1.2.jar:org/codehaus/activemq/service/impl/DurableQueueMessageContainerManager.class */
/**
 * Message container manager for non-temporary queues whose containers are created through a
 * {@link PersistenceAdapter}.
 *
 * <p>Consumers are accepted for any non-temporary queue; sends, acknowledgements and redeliveries
 * are only handled when the message/ack is persistent (see the two {@code isManagerFor} overloads).
 *
 * <p>Two maps, both keyed by {@link QueueMessageContainer}, hold the {@link QueueList} of consuming
 * subscriptions ({@link #activeSubscriptions}) and of browser subscriptions ({@link #browsers}).
 * Structural changes to them made by this class happen while holding a private mutex, which
 * {@link #poll()} and {@link #getContainer(String)} also take.
 */
public class DurableQueueMessageContainerManager extends MessageContainerManagerSupport implements QueueMessageContainerManager {
    private static final Log log = LogFactory.getLog(DurableQueueMessageContainerManager.class);

    /** Cap compared against the per-call iteration counter of the {@code doPoll}/{@code doPeek} loops. */
    private static final int MAX_MESSAGES_DISPATCHED_FROM_POLL = 50;

    private final PersistenceAdapter persistenceAdapter;
    protected SubscriptionContainer subscriptionContainer;
    protected FilterFactory filterFactory;
    /** Maps a {@link QueueMessageContainer} to the {@link QueueList} of its non-browser subscriptions. */
    protected Map activeSubscriptions;
    /** Maps a {@link QueueMessageContainer} to the {@link QueueList} of its browser subscriptions. */
    protected Map browsers;
    /** Maps an {@link ActiveMQQueue} to the container created for it by {@link #createContainer(String)}. */
    protected DestinationMap destinationMap;
    /** Guards changes to the subscription/browser lists and container lookup. */
    private final Object subscriptionMutex = new Object();

    /**
     * Creates a manager with a default {@link SubscriptionContainerImpl}, {@link FilterFactoryImpl}
     * and {@link DispatcherImpl}.
     *
     * @param persistenceAdapter used to create the queue message containers
     * @param redeliveryPolicy   passed to the default subscription container
     * @param deadLetterPolicy   passed to the default subscription container
     */
    public DurableQueueMessageContainerManager(PersistenceAdapter persistenceAdapter, RedeliveryPolicy redeliveryPolicy, DeadLetterPolicy deadLetterPolicy) {
        this(persistenceAdapter, new SubscriptionContainerImpl(redeliveryPolicy, deadLetterPolicy), new FilterFactoryImpl(), new DispatcherImpl());
    }

    /**
     * Creates a manager from explicitly supplied collaborators.
     *
     * @param persistenceAdapter    used to create the queue message containers
     * @param subscriptionContainer creates, looks up and removes subscriptions
     * @param filterFactory         creates the destination/selector filter for each consumer
     * @param dispatcher            handed to the superclass constructor
     */
    public DurableQueueMessageContainerManager(PersistenceAdapter persistenceAdapter, SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) {
        super(dispatcher);
        this.activeSubscriptions = new ConcurrentHashMap();
        this.browsers = new ConcurrentHashMap();
        this.destinationMap = new DestinationMap();
        this.persistenceAdapter = persistenceAdapter;
        this.subscriptionContainer = subscriptionContainer;
        this.filterFactory = filterFactory;
    }

    /**
     * @param destination destination to test, may be {@code null}
     * @return {@code true} only for a non-null, non-temporary queue
     */
    private boolean isManagerFor(ActiveMQDestination destination) {
        return destination != null && destination.isQueue() && !destination.isTemporary();
    }

    /**
     * @param destination destination to test, may be {@code null}
     * @param persistent  whether the message or acknowledgement is persistent
     * @return {@code true} only for a persistent message/ack on a non-temporary queue
     */
    private boolean isManagerFor(ActiveMQDestination destination, boolean persistent) {
        return isManagerFor(destination) && persistent;
    }

    /**
     * Registers a consumer on a non-temporary queue: ensures the container exists, creates the
     * subscription, hands it to the dispatcher, adds it to the matching subscription/browser list
     * and finally marks it active. Consumers on other destinations are ignored.
     *
     * @throws JMSException propagated from container, filter or subscription handling
     */
    @Override
    public void addMessageConsumer(BrokerClient brokerClient, ConsumerInfo consumerInfo) throws JMSException {
        if (!isManagerFor(consumerInfo.getDestination())) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Adding consumer: " + consumerInfo);
        }
        // Looked up for its side effect only: the container is created if it does not exist yet.
        getContainer(consumerInfo.getDestination().getPhysicalName());
        Subscription subscription = this.subscriptionContainer.makeSubscription(this.dispatcher, brokerClient, consumerInfo, createFilter(consumerInfo));
        this.dispatcher.addActiveSubscription(brokerClient, subscription);
        updateActiveSubscriptions(subscription);
        subscription.setActive(true);
    }

    /**
     * Removes a queue consumer: the subscription is removed from the subscription container,
     * deactivated, cleared, withdrawn from the dispatcher and dropped from the subscription and
     * browser lists of every container whose destination name matches. A list that becomes empty
     * is removed from its map.
     *
     * @throws JMSException propagated from subscription handling
     */
    @Override
    public void removeMessageConsumer(BrokerClient brokerClient, ConsumerInfo consumerInfo) throws JMSException {
        if (log.isDebugEnabled()) {
            log.debug("Removing consumer: " + consumerInfo);
        }
        if (consumerInfo.getDestination() == null || !consumerInfo.getDestination().isQueue()) {
            return;
        }
        synchronized (this.subscriptionMutex) {
            Subscription subscription = this.subscriptionContainer.removeSubscription(consumerInfo.getConsumerId());
            if (subscription == null) {
                return;
            }
            subscription.setActive(false);
            subscription.clear();
            this.dispatcher.removeActiveSubscription(brokerClient, subscription);
            for (Iterator it = this.messageContainers.values().iterator(); it.hasNext();) {
                QueueMessageContainer container = (QueueMessageContainer) it.next();
                if (container.getDestinationName().equals(subscription.getDestination().getPhysicalName())) {
                    detach(this.activeSubscriptions, container, subscription);
                    detach(this.browsers, container, subscription);
                }
            }
        }
    }

    /**
     * Removes {@code subscription} from the list registered for {@code container}, and removes the
     * list itself once it is empty. The maps are keyed by container, so the container (not the
     * destination name) must be used as the removal key.
     */
    private static void detach(Map listsByContainer, QueueMessageContainer container, Subscription subscription) {
        QueueList list = (QueueList) listsByContainer.get(container);
        if (list == null) {
            return;
        }
        list.remove(subscription);
        if (list.isEmpty()) {
            listsByContainer.remove(container);
        }
    }

    /**
     * No-op in this manager.
     * NOTE(review): the meaning of the two string arguments is defined by the
     * {@code MessageContainerManager} interface, which is not visible here.
     */
    @Override
    public void deleteSubscription(String str, String str2) throws JMSException {
    }

    /**
     * Adds a persistent message sent to a non-temporary queue to every container that
     * {@link #destinationMap} returns for the message's destination, waking the dispatcher and
     * updating the send statistics once per container. Other messages are ignored.
     *
     * @throws JMSException propagated from container handling
     */
    @Override
    public void sendMessage(BrokerClient brokerClient, ActiveMQMessage activeMQMessage) throws JMSException {
        ActiveMQDestination destination = (ActiveMQDestination) activeMQMessage.getJMSDestination();
        boolean persistent = activeMQMessage.getJMSDeliveryMode() == javax.jms.DeliveryMode.PERSISTENT;
        if (!isManagerFor(destination, persistent)) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Dispaching message: " + activeMQMessage);
        }
        // Looked up for its side effect only: the container is created if it does not exist yet.
        getContainer(destination.getPhysicalName());
        Iterator containers = this.destinationMap.get(activeMQMessage.getJMSActiveMQDestination()).iterator();
        while (containers.hasNext()) {
            QueueMessageContainer container = (QueueMessageContainer) containers.next();
            container.addMessage(activeMQMessage);
            this.dispatcher.wakeup();
            updateSendStats(brokerClient, activeMQMessage);
        }
    }

    /**
     * Forwards a persistent acknowledgement for a non-temporary queue to the consumer's
     * subscription, if one is registered, and updates the acknowledge statistics when the ack
     * reports the message as read.
     *
     * @throws JMSException propagated from the subscription
     */
    @Override
    public void acknowledgeMessage(BrokerClient brokerClient, MessageAck messageAck) throws JMSException {
        Subscription subscription = findSubscription(messageAck);
        if (subscription == null) {
            return;
        }
        subscription.messageConsumed(messageAck);
        if (messageAck.isMessageRead()) {
            updateAcknowledgeStats(brokerClient, subscription);
        }
    }

    /**
     * Notifies the consumer's subscription, if one is registered, of a persistent transacted
     * acknowledgement for a non-temporary queue. The {@code str} argument is not used here.
     *
     * @throws JMSException propagated from the subscription
     */
    @Override
    public void acknowledgeTransactedMessage(BrokerClient brokerClient, String str, MessageAck messageAck) throws JMSException {
        Subscription subscription = findSubscription(messageAck);
        if (subscription != null) {
            subscription.onAcknowledgeTransactedMessageBeforeCommit(messageAck);
        }
    }

    /**
     * Asks the consumer's subscription, if one is registered, to redeliver the message identified
     * by a persistent ack for a non-temporary queue. The container argument is passed as
     * {@code null}.
     *
     * @throws JMSException propagated from the subscription
     */
    @Override
    public void redeliverMessage(BrokerClient brokerClient, MessageAck messageAck) throws JMSException {
        Subscription subscription = findSubscription(messageAck);
        if (subscription != null) {
            subscription.redeliverMessage(null, messageAck);
        }
    }

    /**
     * @return the subscription registered for the ack's consumer id, or {@code null} when the ack
     *         is not handled by this manager or no such subscription exists
     */
    private Subscription findSubscription(MessageAck messageAck) throws JMSException {
        if (!isManagerFor(messageAck.getDestination(), messageAck.isPersistent())) {
            return null;
        }
        return this.subscriptionContainer.getSubscription(messageAck.getConsumerId());
    }

    /**
     * Runs one dispatch pass while holding the subscription mutex: for every container with
     * consumers, browsers are served first ({@code doPeek}) and then consumers ({@code doPoll});
     * containers that only have browsers get a peek pass as well.
     *
     * @throws JMSException propagated from container or subscription handling
     */
    @Override
    public void poll() throws JMSException {
        synchronized (this.subscriptionMutex) {
            for (Iterator it = this.activeSubscriptions.keySet().iterator(); it.hasNext();) {
                QueueMessageContainer container = (QueueMessageContainer) it.next();
                doPeek(container, (QueueList) this.browsers.get(container));
                doPoll(container, (QueueList) this.activeSubscriptions.get(container));
            }
            // Empty consumer lists are removed when the last consumer leaves, so browsers on a
            // container without consumers must be served from the browsers map directly.
            for (Iterator it = this.browsers.keySet().iterator(); it.hasNext();) {
                QueueMessageContainer container = (QueueMessageContainer) it.next();
                if (!this.activeSubscriptions.containsKey(container)) {
                    doPeek(container, (QueueList) this.browsers.get(container));
                }
            }
        }
    }

    /** No-op in this manager. NOTE(review): {@code str} is presumably a transaction id; confirm against the interface. */
    @Override
    public void commitTransaction(BrokerClient brokerClient, String str) {
    }

    /** No-op in this manager. NOTE(review): {@code str} is presumably a transaction id; confirm against the interface. */
    @Override
    public void rollbackTransaction(BrokerClient brokerClient, String str) {
    }

    /**
     * Delegates to the superclass lookup while holding the subscription mutex.
     *
     * @param str physical name of the destination
     * @return whatever the superclass returns for that name
     * @throws JMSException propagated from the superclass
     */
    @Override
    public MessageContainer getContainer(String str) throws JMSException {
        synchronized (this.subscriptionMutex) {
            return super.getContainer(str);
        }
    }

    /**
     * Creates the queue container through the persistence adapter, offers every known subscription
     * to it (each is added only if its destination name matches) and registers the container in
     * {@link #destinationMap} under an {@link ActiveMQQueue} of the same name.
     *
     * @param str physical name of the queue
     * @return the newly created container
     * @throws JMSException propagated from the persistence adapter or container
     */
    @Override
    protected MessageContainer createContainer(String str) throws JMSException {
        QueueMessageContainer container = this.persistenceAdapter.createQueueMessageContainer(str);
        for (Iterator it = this.subscriptionContainer.subscriptionIterator(); it.hasNext();) {
            Subscription subscription = (Subscription) it.next();
            if (subscription.isBrowser()) {
                updateBrowsers(container, subscription);
            } else {
                updateActiveSubscriptions(container, subscription);
            }
        }
        this.destinationMap.put(new ActiveMQQueue(str), container);
        return container;
    }

    /**
     * @param str physical name of the queue
     * @return a new {@link ActiveMQQueue} with that name
     */
    @Override
    protected Destination createDestination(String str) {
        return new ActiveMQQueue(str);
    }

    /**
     * Feeds each browser subscription from {@code queueMessageContainer} without consuming:
     * the next message after the browser's last message identity is peeked; a targeted message is
     * added to the browser and the dispatcher is woken for it, otherwise the browser's last message
     * identifier is advanced past it. A browser's loop ends when no message is left, the browser is
     * at its prefetch limit, or the iteration cap is reached.
     *
     * @param queueMessageContainer container to peek into
     * @param queueList             browser subscriptions of that container, may be {@code null}
     */
    private void doPeek(QueueMessageContainer queueMessageContainer, QueueList queueList) throws JMSException {
        if (queueList == null || queueList.size() <= 0) {
            return;
        }
        for (int index = 0; index < queueList.size(); index++) {
            SubscriptionImpl browser = (SubscriptionImpl) queueList.get(index);
            int count = 0;
            ActiveMQMessage message;
            do {
                message = queueMessageContainer.peekNext(browser.getLastMessageIdentity());
                if (message != null) {
                    if (browser.isTarget(message)) {
                        browser.addMessage(queueMessageContainer, message);
                        this.dispatcher.wakeup(browser);
                    } else {
                        browser.setLastMessageIdentifier(message.getJMSMessageIdentity());
                    }
                }
            } while (message != null && !browser.isAtPrefetchLimit() && count++ < MAX_MESSAGES_DISPATCHED_FROM_POLL);
        }
    }

    /**
     * Polls messages from {@code queueMessageContainer} and gives each to the first subscription in
     * list order that targets it and is below its prefetch limit; the list is rotated after each
     * delivery. The pass stops when the container has no message, when a message cannot be
     * delivered, or when the iteration cap is reached. An undelivered message is handed back to the
     * container only if at least one subscription targeted it.
     *
     * @param queueMessageContainer container to poll
     * @param queueList             consuming subscriptions of that container, may be {@code null}
     */
    private void doPoll(QueueMessageContainer queueMessageContainer, QueueList queueList) throws JMSException {
        if (queueList == null || queueList.size() <= 0) {
            return;
        }
        int count = 0;
        do {
            ActiveMQMessage message = queueMessageContainer.poll();
            if (message == null) {
                return;
            }
            boolean targeted = false;
            boolean dispatched = false;
            for (QueueListEntry entry = queueList.getFirstEntry(); entry != null; entry = queueList.getNextEntry(entry)) {
                SubscriptionImpl subscription = (SubscriptionImpl) entry.getElement();
                if (!subscription.isTarget(message)) {
                    continue;
                }
                targeted = true;
                if (!subscription.isAtPrefetchLimit()) {
                    subscription.addMessage(queueMessageContainer, message);
                    dispatched = true;
                    this.dispatcher.wakeup(subscription);
                    queueList.rotate();
                    break;
                }
            }
            if (!dispatched) {
                if (targeted) {
                    // Wanted by a consumer that is currently at its prefetch limit: hand it back.
                    queueMessageContainer.returnMessage(message.getJMSMessageIdentity());
                }
                return;
            }
        } while (count++ < MAX_MESSAGES_DISPATCHED_FROM_POLL);
    }

    /**
     * Offers {@code subscription} to every known container (each accepts it only when the
     * destination names match). If no container is registered under the subscription's physical
     * name, one is obtained via {@link #getContainer(String)} and offered the subscription too.
     * Runs while holding the subscription mutex.
     */
    private void updateActiveSubscriptions(Subscription subscription) throws JMSException {
        synchronized (this.subscriptionMutex) {
            String physicalName = subscription.getDestination().getPhysicalName();
            boolean containerKnown = false;
            for (Iterator it = this.messageContainers.entrySet().iterator(); it.hasNext();) {
                Map.Entry entry = (Map.Entry) it.next();
                String containerName = (String) entry.getKey();
                if (containerName.equals(physicalName)) {
                    containerKnown = true;
                }
                processSubscription(subscription, (QueueMessageContainer) entry.getValue());
            }
            if (!containerKnown) {
                processSubscription(subscription, (QueueMessageContainer) getContainer(physicalName));
            }
        }
    }

    /**
     * Routes a subscription to the browser list or the consumer list of {@code queueMessageContainer},
     * depending on {@link Subscription#isBrowser()}.
     *
     * @throws JMSException propagated from the container
     */
    protected void processSubscription(Subscription subscription, QueueMessageContainer queueMessageContainer) throws JMSException {
        if (subscription.isBrowser()) {
            updateBrowsers(queueMessageContainer, subscription);
        } else {
            updateActiveSubscriptions(queueMessageContainer, subscription);
        }
    }

    /**
     * If the container's destination name equals the subscription's physical name, resets the
     * container and adds the subscription to the container's consumer list unless already present.
     */
    private void updateActiveSubscriptions(QueueMessageContainer queueMessageContainer, Subscription subscription) throws JMSException {
        if (!isSameDestination(queueMessageContainer, subscription)) {
            return;
        }
        queueMessageContainer.reset();
        addIfAbsent(getSubscriptionList(queueMessageContainer), subscription);
    }

    /** @return the consumer list for the container, created and registered on first use */
    private QueueList getSubscriptionList(QueueMessageContainer queueMessageContainer) {
        return listFor(this.activeSubscriptions, queueMessageContainer);
    }

    /**
     * If the container's destination name equals the subscription's physical name, resets the
     * container and adds the subscription to the container's browser list unless already present.
     */
    private void updateBrowsers(QueueMessageContainer queueMessageContainer, Subscription subscription) throws JMSException {
        if (!isSameDestination(queueMessageContainer, subscription)) {
            return;
        }
        queueMessageContainer.reset();
        addIfAbsent(getBrowserList(queueMessageContainer), subscription);
    }

    /** @return the browser list for the container, created and registered on first use */
    private QueueList getBrowserList(QueueMessageContainer queueMessageContainer) {
        return listFor(this.browsers, queueMessageContainer);
    }

    /** @return whether the container's destination name equals the subscription's physical name */
    private static boolean isSameDestination(QueueMessageContainer container, Subscription subscription) {
        return container.getDestinationName().equals(subscription.getDestination().getPhysicalName());
    }

    /** Adds the subscription to the list unless the list already contains it. */
    private static void addIfAbsent(QueueList list, Subscription subscription) {
        if (!list.contains(subscription)) {
            list.add(subscription);
        }
    }

    /** Returns the list stored under {@code container}, storing a new {@link DefaultQueueList} if none exists. */
    private static QueueList listFor(Map listsByContainer, QueueMessageContainer container) {
        QueueList list = (QueueList) listsByContainer.get(container);
        if (list == null) {
            list = new DefaultQueueList();
            listsByContainer.put(container, list);
        }
        return list;
    }

    /**
     * Builds the filter for a consumer from its destination and selector; for a no-local consumer
     * the result is additionally AND-ed with a {@link NoLocalFilter} on the consumer's client id.
     *
     * @throws JMSException propagated from the filter factory
     */
    protected Filter createFilter(ConsumerInfo consumerInfo) throws JMSException {
        Filter filter = this.filterFactory.createFilter(consumerInfo.getDestination(), consumerInfo.getSelector());
        if (consumerInfo.isNoLocal()) {
            filter = new AndFilter(filter, new NoLocalFilter(consumerInfo.getClientId()));
        }
        return filter;
    }

    /**
     * Delegates to the superclass for queue destinations; other destinations are ignored.
     *
     * @throws JMSException propagated from the superclass
     */
    @Override
    public void createMessageContainer(ActiveMQDestination activeMQDestination) throws JMSException {
        if (activeMQDestination.isQueue()) {
            super.createMessageContainer(activeMQDestination);
        }
    }

    /**
     * Delegates to the superclass for queue destinations; other destinations are ignored.
     *
     * @throws JMSException propagated from the superclass
     */
    @Override
    public synchronized void destroyMessageContainer(ActiveMQDestination activeMQDestination) throws JMSException {
        if (activeMQDestination.isQueue()) {
            super.destroyMessageContainer(activeMQDestination);
        }
    }

    /**
     * Marks the container named {@code str} as a dead letter queue, adds the message to it and
     * wakes the dispatcher.
     *
     * @param str             name of the container that receives the message
     * @param activeMQMessage message to store
     * @throws JMSException propagated from container handling
     */
    @Override
    public void sendToDeadLetterQueue(String str, ActiveMQMessage activeMQMessage) throws JMSException {
        QueueMessageContainer deadLetterContainer = (QueueMessageContainer) getContainer(str);
        deadLetterContainer.setDeadLetterQueue(true);
        deadLetterContainer.addMessage(activeMQMessage);
        this.dispatcher.wakeup();
    }
}
