package com.caucho.bam.mailbox;

import com.caucho.bam.BamError;
import com.caucho.bam.QueueFullException;
import com.caucho.bam.RemoteConnectionFailedException;
import com.caucho.bam.broker.Broker;
import com.caucho.bam.packet.Message;
import com.caucho.bam.packet.MessageError;
import com.caucho.bam.packet.Packet;
import com.caucho.bam.packet.Query;
import com.caucho.bam.packet.QueryError;
import com.caucho.bam.packet.QueryResult;
import com.caucho.bam.stream.MessageStream;
import com.caucho.env.actor.AbstractActorProcessor;
import com.caucho.env.actor.ActorProcessor;
import com.caucho.lifecycle.Lifecycle;
import com.caucho.util.L10N;
import java.io.Closeable;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:BOOT-INF/lib/resin-4.0.65.jar:com/caucho/bam/mailbox/MultiworkerMailbox.class */
public class MultiworkerMailbox implements Mailbox, Closeable {
    private static final L10N L = new L10N(MultiworkerMailbox.class);
    private static final Logger log = Logger.getLogger(MultiworkerMailbox.class.getName());
    private final String _name;
    private final String _address;
    private final Broker _broker;
    private final MessageStream _actorStream;
    private final int _queueSize = 16384;
    private final MailboxQueue2[] _queues;
    private final Lifecycle _lifecycle;
    private final AtomicInteger _roundRobin;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/resin-4.0.65.jar:com/caucho/bam/mailbox/MultiworkerMailbox$PacketProcessor.class */
    public class PacketProcessor extends AbstractActorProcessor<Packet> {
        PacketProcessor() {
        }

        @Override // com.caucho.env.actor.AbstractActorProcessor, com.caucho.env.actor.ActorProcessor
        public String getThreadName() {
            return MultiworkerMailbox.this._name + "-" + Thread.currentThread().getId();
        }

        @Override // com.caucho.env.actor.AbstractActorProcessor, com.caucho.env.actor.ActorProcessor
        public void process(Packet packet) throws Exception {
            packet.dispatch(MultiworkerMailbox.this.getActorStream(), MultiworkerMailbox.this._broker);
        }

        @Override // com.caucho.env.actor.AbstractActorProcessor, com.caucho.env.actor.ActorProcessor
        public void onProcessComplete() throws Exception {
        }

        public String toString() {
            return getClass().getSimpleName() + "[" + MultiworkerMailbox.this._name + "]";
        }
    }

    public MultiworkerMailbox(MessageStream messageStream, Broker broker, int i) {
        this(null, messageStream, broker, i);
    }

    public MultiworkerMailbox(String str, MessageStream messageStream, Broker broker, int i) {
        this._queueSize = 16384;
        this._lifecycle = new Lifecycle();
        this._roundRobin = new AtomicInteger();
        this._address = str;
        if (broker == null) {
            throw new NullPointerException(L.l("broker must not be null"));
        }
        if (messageStream == null) {
            throw new NullPointerException(L.l("actorStream must not be null"));
        }
        this._broker = broker;
        this._actorStream = messageStream;
        if (this._actorStream.getAddress() == null) {
            this._name = this._actorStream.getClass().getSimpleName();
        } else {
            this._name = this._actorStream.getAddress();
        }
        this._queues = new MailboxQueue2[i];
        for (int i2 = 0; i2 < i; i2++) {
            this._queues[i2] = createWorker();
        }
        this._lifecycle.toActive();
    }

    protected MailboxQueue2 createWorker() {
        return new MailboxQueue2(16384, createProcessor());
    }

    protected ActorProcessor<Packet> createProcessor() {
        return new PacketProcessor();
    }

    public int getThreadMax() {
        return this._queues.length;
    }

    @Override // com.caucho.bam.mailbox.Mailbox
    public int getSize() {
        int i = 0;
        for (MailboxQueue2 mailboxQueue2 : this._queues) {
            i += mailboxQueue2.getSize();
        }
        return i;
    }

    @Override // com.caucho.bam.stream.MessageStream, com.caucho.bam.actor.ActorHolder
    public String getAddress() {
        return this._address != null ? this._address : this._actorStream.getAddress();
    }

    public boolean isPacketAvailable() {
        return false;
    }

    @Override // com.caucho.bam.stream.MessageStream, com.caucho.bam.actor.ActorHolder
    public Broker getBroker() {
        return this._broker;
    }

    @Override // com.caucho.bam.mailbox.Mailbox
    public MessageStream getActorStream() {
        return this._actorStream;
    }

    @Override // com.caucho.bam.stream.MessageStream
    public void message(String str, String str2, Serializable serializable) {
        try {
            enqueue(new Message(str, str2, serializable));
        } catch (QueueFullException e) {
            log.finer(e.toString());
        } catch (RuntimeException e2) {
            log.warning(this + ": message " + serializable + " {to:" + str + ", from:" + str2 + "}\n  " + e2.toString());
            log.log(Level.FINE, e2.toString(), (Throwable) e2);
        }
    }

    @Override // com.caucho.bam.stream.MessageStream
    public void messageError(String str, String str2, Serializable serializable, BamError bamError) {
        try {
            enqueue(new MessageError(str, str2, serializable, bamError));
        } catch (QueueFullException e) {
            log.finer(e.toString());
        } catch (RuntimeException e2) {
            log.warning(this + ": messageError " + serializable + " {to:" + str + ", from:" + str2 + "}\n  " + e2.toString());
            log.log(Level.FINE, e2.toString(), (Throwable) e2);
        }
    }

    @Override // com.caucho.bam.stream.MessageStream
    public void query(long j, String str, String str2, Serializable serializable) {
        if (!this._lifecycle.isActive()) {
            RemoteConnectionFailedException remoteConnectionFailedException = new RemoteConnectionFailedException(L.l("{0} is closed", this));
            remoteConnectionFailedException.fillInStackTrace();
            getBroker().queryError(j, str2, str, serializable, BamError.create(remoteConnectionFailedException));
            return;
        }
        try {
            enqueue(new Query(j, str, str2, serializable));
        } catch (QueueFullException e) {
            log.finer(e.toString());
            getBroker().queryError(j, str2, str, serializable, BamError.create(e));
        } catch (RuntimeException e2) {
            log.warning(this + ": query " + serializable + " {to:" + str + ", from:" + str2 + "}\n  " + e2.toString());
            getBroker().queryError(j, str2, str, serializable, BamError.create(e2));
        }
    }

    @Override // com.caucho.bam.stream.MessageStream
    public void queryResult(long j, String str, String str2, Serializable serializable) {
        try {
            enqueue(new QueryResult(j, str, str2, serializable));
        } catch (QueueFullException e) {
            log.finer(e.toString());
        } catch (RuntimeException e2) {
            log.warning(this + ": queryResult " + serializable + " {to:" + str + ", from:" + str2 + "}\n  " + e2.toString());
            log.log(Level.FINE, e2.toString(), (Throwable) e2);
        }
    }

    @Override // com.caucho.bam.stream.MessageStream
    public void queryError(long j, String str, String str2, Serializable serializable, BamError bamError) {
        try {
            enqueue(new QueryError(j, str, str2, serializable, bamError));
        } catch (QueueFullException e) {
            log.finer(e.toString());
        } catch (RuntimeException e2) {
            log.warning(this + ": queryError " + bamError + " {to:" + str + ", from:" + str2 + "}\n  " + e2.toString());
            log.log(Level.FINE, e2.toString(), (Throwable) e2);
        }
    }

    protected final void enqueue(Packet packet) {
        if (!this._lifecycle.isActive()) {
            throw new IllegalStateException(L.l("{0} cannot accept packets because it's no longer active", this));
        }
        MailboxQueue2 findWorker = findWorker();
        if (log.isLoggable(Level.FINEST)) {
            log.finest(this + " enqueue(" + findWorker.getSize() + ") " + packet);
        }
        if (!findWorker.offer(packet, false)) {
            findWorker.wake();
            if (!findWorker.offer(packet, true)) {
                findWorker.wake();
                this._broker.getQueueFullHandler().onQueueFull(this, findWorker.getSize(), 100L, TimeUnit.MILLISECONDS, packet);
            }
        }
        findWorker.wake();
    }

    private MailboxQueue2 findWorker() {
        int i = Integer.MAX_VALUE;
        MailboxQueue2 mailboxQueue2 = this._queues[0];
        for (MailboxQueue2 mailboxQueue22 : this._queues) {
            int size = mailboxQueue22.getSize();
            if (size < i) {
                mailboxQueue2 = mailboxQueue22;
                i = size;
            }
        }
        return mailboxQueue2;
    }

    @Override // com.caucho.bam.mailbox.Mailbox
    public void close() {
        this._lifecycle.toStop();
        for (MailboxQueue2 mailboxQueue2 : this._queues) {
            mailboxQueue2.wake();
        }
        long currentTimeActual = getCurrentTimeActual() + 2000;
        while (!isQueueEmpty() && getCurrentTimeActual() < currentTimeActual) {
            try {
                Thread.sleep(100L);
            } catch (Exception e) {
            }
        }
        for (MailboxQueue2 mailboxQueue22 : this._queues) {
            mailboxQueue22.close();
        }
        this._lifecycle.toDestroy();
    }

    private boolean isQueueEmpty() {
        for (MailboxQueue2 mailboxQueue2 : this._queues) {
            if (mailboxQueue2.isEmpty()) {
                return true;
            }
        }
        return false;
    }

    protected long getCurrentTimeActual() {
        return System.currentTimeMillis();
    }

    @Override // com.caucho.bam.stream.MessageStream
    public boolean isClosed() {
        return this._lifecycle.isDestroying() || this._broker.isClosed() || this._actorStream.isClosed();
    }

    public String toString() {
        return getClass().getSimpleName() + "[" + this._name + "]";
    }
}
