package reactor.core.processor;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.processor.MutableSignal;
import reactor.core.processor.util.RingBufferSubscriberUtils;
import reactor.core.support.SpecificationExceptions;
import reactor.jarjar.com.lmax.disruptor.AlertException;
import reactor.jarjar.com.lmax.disruptor.EventFactory;
import reactor.jarjar.com.lmax.disruptor.EventProcessor;
import reactor.jarjar.com.lmax.disruptor.LiteBlockingWaitStrategy;
import reactor.jarjar.com.lmax.disruptor.RingBuffer;
import reactor.jarjar.com.lmax.disruptor.Sequence;
import reactor.jarjar.com.lmax.disruptor.SequenceBarrier;
import reactor.jarjar.com.lmax.disruptor.TimeoutException;
import reactor.jarjar.com.lmax.disruptor.WaitStrategy;
import reactor.jarjar.com.lmax.disruptor.dsl.ProducerType;

/* loaded from: input_file:lib/reactor-core-2.0.6.RELEASE.jar:reactor/core/processor/RingBufferWorkProcessor.class */
public final class RingBufferWorkProcessor<E> extends ExecutorPoweredProcessor<E, E> {
    private final Sequence workSequence;
    private final Queue<Sequence> cancelledSequences;
    private final RingBuffer<MutableSignal<E>> ringBuffer;

    /* loaded from: input_file:lib/reactor-core-2.0.6.RELEASE.jar:reactor/core/processor/RingBufferWorkProcessor$RingBufferSubscription.class */
    /**
     * Subscription handed to a downstream subscriber. Demand is accumulated on the paired
     * {@link WorkSignalProcessor} and forwarded to the upstream subscription; cancelling halts
     * that processor.
     */
    private final class RingBufferSubscription implements Subscription {
        private final Subscriber<? super E> subscriber;
        private final WorkSignalProcessor eventProcessor;

        public RingBufferSubscription(Subscriber<? super E> subscriber, WorkSignalProcessor workSignalProcessor) {
            this.eventProcessor = workSignalProcessor;
            this.subscriber = subscriber;
        }

        /**
         * Registers {@code j} additional units of demand.
         *
         * <p>A non-positive amount is reported to the subscriber through {@code onError} with the
         * exception built by {@code SpecificationExceptions.spec_3_09_exception}. Requests made
         * while the paired processor is not running are dropped.
         *
         * @param j number of elements requested
         */
        @Override
        public void request(long j) {
            if (j <= 0) {
                this.subscriber.onError(SpecificationExceptions.spec_3_09_exception(j));
                return;
            }
            if (!this.eventProcessor.isRunning()) {
                return;
            }
            final Sequence pending = this.eventProcessor.pendingRequest;
            // A negative total means the counter overflowed: pin it to Long.MAX_VALUE.
            if (pending.addAndGet(j) < 0) {
                pending.set(Long.MAX_VALUE);
            }
            final Subscription upstream = RingBufferWorkProcessor.this.upstreamSubscription;
            if (upstream != null) {
                upstream.request(j);
            }
        }

        /**
         * Halts the paired processor. The upstream subscription is cancelled (and its reference
         * cleared) first when it is set, {@code autoCancel} is enabled and the subscriber count
         * is exactly one.
         */
        @Override
        public void cancel() {
            final Subscription upstream = RingBufferWorkProcessor.this.upstreamSubscription;
            final boolean cancelUpstream = upstream != null
                    && RingBufferWorkProcessor.this.autoCancel
                    && ReactorProcessor.SUBSCRIBER_COUNT.get(RingBufferWorkProcessor.this) - 1 == 0;
            if (cancelUpstream) {
                RingBufferWorkProcessor.this.upstreamSubscription = null;
                upstream.cancel();
            }
            this.eventProcessor.halt();
        }
    }

    /* loaded from: input_file:lib/reactor-core-2.0.6.RELEASE.jar:reactor/core/processor/RingBufferWorkProcessor$WorkSignalProcessor.class */
    private static final class WorkSignalProcessor<T> implements EventProcessor {
        private final AtomicBoolean running = new AtomicBoolean(false);
        private final Sequence sequence = new Sequence(-1);
        private final Sequence pendingRequest = new Sequence(0);
        private final SequenceBarrier barrier;
        private final RingBufferWorkProcessor<T> processor;
        private final Subscriber<? super T> subscriber;
        private Subscription subscription;

        public WorkSignalProcessor(Subscriber<? super T> subscriber, RingBufferWorkProcessor<T> ringBufferWorkProcessor) {
            this.processor = ringBufferWorkProcessor;
            this.subscriber = subscriber;
            this.barrier = ((RingBufferWorkProcessor) ringBufferWorkProcessor).ringBuffer.newBarrier(new Sequence[0]);
        }

        /**
         * @return the subscription that {@link #run()} passes to {@code onSubscribe}, or
         *         {@code null} if none has been set
         */
        public Subscription getSubscription() {
            return subscription;
        }

        /**
         * Stores the subscription that {@link #run()} passes to the subscriber's
         * {@code onSubscribe}.
         *
         * @param subscription subscription to hand to the subscriber
         */
        public void setSubscription(Subscription subscription) {
            this.subscription = subscription;
        }

        /**
         * @return this processor's own sequence (created with an initial value of -1)
         */
        @Override
        public Sequence getSequence() {
            return sequence;
        }

        /**
         * Clears the running flag, then alerts the barrier.
         */
        @Override
        public void halt() {
            running.set(false);
            barrier.alert();
        }

        /**
         * @return the current value of the running flag
         */
        @Override
        public boolean isRunning() {
            return running.get();
        }

        /**
         * Consumer loop for one subscriber.
         *
         * <p>Steps, in order:
         * <ol>
         *   <li>Flip the running flag; if it was already set, signal an
         *       {@link IllegalStateException} to the subscriber and return.</li>
         *   <li>Increment the owner's subscriber count and call {@code onSubscribe}.</li>
         *   <li>Wait via {@code RingBufferSubscriberUtils.waitRequestOrTerminalEvent}; when it
         *       returns {@code false}, drop this processor's gating sequence and return.</li>
         *   <li>Replay slots left behind by cancelled processors; return if {@link #replay}
         *       reports an interruption.</li>
         *   <li>Loop: claim the next slot from the shared work sequence, wait for it to be
         *       published, then read and route the signal.</li>
         * </ol>
         *
         * <p>The subscriber count is decremented and the running flag cleared exactly once, in
         * the {@code finally} block, for every exit path taken after the increment.
         */
        @Override
        public void run() {
            if (!this.running.compareAndSet(false, true)) {
                this.subscriber.onError(new IllegalStateException("Thread is already running"));
                this.processor.decrementSubscribers();
                return;
            }
            this.processor.incrementSubscribers();
            try {
                this.subscriber.onSubscribe(this.subscription);
            } catch (Throwable th) {
                this.subscriber.onError(th);
            }

            final RingBuffer<MutableSignal<T>> ringBuffer = this.processor.ringBuffer;
            final Sequence workSequence = this.processor.workSequence;
            final Queue<Sequence> cancelledSequences = this.processor.cancelledSequences;

            boolean processedSequence = true;
            long cachedAvailableSequence = Long.MIN_VALUE;
            long nextSequence = this.sequence.get();
            MutableSignal<T> event = null;
            try {
                if (!RingBufferSubscriberUtils.waitRequestOrTerminalEvent(this.pendingRequest, ringBuffer, this.barrier, this.subscriber, this.running)) {
                    ringBuffer.removeGatingSequence(this.sequence);
                    // Bookkeeping is left to the finally block so it happens only once.
                    return;
                }
                final boolean unbounded = this.pendingRequest.get() == Long.MAX_VALUE;
                if (replay(unbounded)) {
                    return;
                }
                this.barrier.clearAlert();

                while (true) {
                    try {
                        if (processedSequence) {
                            processedSequence = false;
                            // Claim the next slot: retry until our CAS on the shared work sequence wins.
                            do {
                                nextSequence = workSequence.get() + 1;
                                this.sequence.set(nextSequence - 1);
                            } while (!workSequence.compareAndSet(nextSequence - 1, nextSequence));
                        }
                        if (cachedAvailableSequence >= nextSequence) {
                            event = ringBuffer.get(nextSequence);
                            readNextEvent(event, unbounded);
                            RingBufferSubscriberUtils.routeOnce(event, this.subscriber);
                            processedSequence = true;
                        } else {
                            cachedAvailableSequence = this.barrier.waitFor(nextSequence);
                        }
                    } catch (AlertException e) {
                        if (!this.running.get()) {
                            // Halted: park the claimed-but-unprocessed slot for a later replay.
                            this.sequence.set(nextSequence - 1);
                            cancelledSequences.add(this.sequence);
                            break;
                        }
                        this.barrier.clearAlert();
                    } catch (CancelException e) {
                        if (event == null || event.type != MutableSignal.Type.NEXT || event.value == null) {
                            this.sequence.set(nextSequence);
                        } else {
                            // A NEXT signal carrying a value was in flight: leave it replayable.
                            this.sequence.set(nextSequence - 1);
                        }
                        cancelledSequences.add(this.sequence);
                        break;
                    } catch (Throwable th) {
                        // NOTE(review): InterruptedException/TimeoutException from waitFor also land
                        // here and are reported through onError — confirm that is intended.
                        this.subscriber.onError(th);
                        this.sequence.set(nextSequence);
                        processedSequence = true;
                    }
                }
            } finally {
                this.processor.decrementSubscribers();
                this.running.set(false);
            }
        }

        /**
         * Drains the owner's queue of cancelled sequences, delivering the slot that follows each
         * one to this subscriber.
         *
         * <p>For every polled sequence the slot at {@code sequence + 1} is read; if its value is
         * still {@code null} the barrier is awaited for that slot first. After the signal has been
         * read and routed, the cancelled sequence is removed from the ring buffer's gating set.
         *
         * <p>If delivery is aborted (interrupt, cancel, alert or timeout), this processor's own
         * gating sequence is removed, the cancelled sequence is queued again and {@code true} is
         * returned. An interrupt additionally restores the thread's interrupt flag so the caller
         * or executor can still observe it.
         *
         * @param z {@code true} when demand is unbounded; forwarded to {@link #readNextEvent}
         * @return {@code true} if replay was aborted, {@code false} once the queue is empty
         */
        private boolean replay(boolean z) {
            final RingBuffer<MutableSignal<T>> ringBuffer = this.processor.ringBuffer;
            final Queue<Sequence> cancelled = this.processor.cancelledSequences;
            Sequence pending;
            while ((pending = cancelled.poll()) != null) {
                final long slot = pending.get() + 1;
                final MutableSignal<T> signal = ringBuffer.get(slot);
                try {
                    if (signal.value == null) {
                        this.barrier.waitFor(slot);
                    }
                    readNextEvent(signal, z);
                    RingBufferSubscriberUtils.routeOnce(signal, this.subscriber);
                    ringBuffer.removeGatingSequence(pending);
                } catch (InterruptedException | CancelException | AlertException | TimeoutException e) {
                    if (e instanceof InterruptedException) {
                        // Do not swallow the interrupt: re-assert it for whoever owns the thread.
                        Thread.currentThread().interrupt();
                    }
                    ringBuffer.removeGatingSequence(this.sequence);
                    cancelled.add(pending);
                    return true;
                }
            }
            return false;
        }

        /**
         * Prepares a signal for delivery.
         *
         * <ul>
         *   <li>{@code NEXT} signal: returns immediately when the value is {@code null} or demand
         *       is unbounded; otherwise consumes one unit of pending demand, re-checking with a
         *       1ns park between attempts until a unit is available.</li>
         *   <li>Signal with a {@code null} type: nothing to do.</li>
         *   <li>Any other type: clears the running flag, routes the signal to the subscriber,
         *       cancels the upstream subscription if present and throws
         *       {@code CancelException.INSTANCE}.</li>
         * </ul>
         *
         * @param mutableSignal the slot content to inspect
         * @param z {@code true} when demand is unbounded (no demand accounting)
         * @throws AlertException declared by the signature; not thrown directly by this body
         */
        private void readNextEvent(MutableSignal<T> mutableSignal, boolean z) throws AlertException {
            if (mutableSignal.type == MutableSignal.Type.NEXT) {
                if (mutableSignal.value == null || z) {
                    return;
                }
                if (this.pendingRequest.addAndGet(-1L) >= 0) {
                    return;
                }
                // Went below zero: undo the decrement, then keep retrying until demand arrives.
                this.pendingRequest.incrementAndGet();
                for (;;) {
                    if (this.pendingRequest.addAndGet(-1L) >= 0) {
                        return;
                    }
                    this.pendingRequest.incrementAndGet();
                    if (!this.running.get()) {
                        throw CancelException.INSTANCE;
                    }
                    LockSupport.parkNanos(1L);
                }
            }
            if (mutableSignal.type == null) {
                return;
            }
            this.running.set(false);
            RingBufferSubscriberUtils.route(mutableSignal, this.subscriber);
            final Subscription upstream = this.processor.upstreamSubscription;
            if (upstream != null) {
                upstream.cancel();
            }
            throw CancelException.INSTANCE;
        }
    }

    /**
     * Creates a single-producer processor named after this class, with buffer size 32, a
     * {@link LiteBlockingWaitStrategy} and the boolean flag set to {@code true}.
     */
    public static <E> RingBufferWorkProcessor<E> create() {
        return create(true);
    }

    /**
     * Same defaults as {@link #create()}, with an explicit flag.
     *
     * @param z flag forwarded to the superclass constructor (presumably auto-cancel — confirm)
     */
    public static <E> RingBufferWorkProcessor<E> create(boolean z) {
        final String defaultName = RingBufferWorkProcessor.class.getSimpleName();
        return create(defaultName, 32, new LiteBlockingWaitStrategy(), z);
    }

    /** Single-producer processor on the given executor: buffer size 32, flag {@code true}. */
    public static <E> RingBufferWorkProcessor<E> create(ExecutorService executorService) {
        return create(executorService, true);
    }

    /** Single-producer processor on the given executor: buffer size 32, explicit flag. */
    public static <E> RingBufferWorkProcessor<E> create(ExecutorService executorService, boolean z) {
        return create(executorService, 32, new LiteBlockingWaitStrategy(), z);
    }

    /** Single-producer processor with the given name and buffer size; flag {@code true}. */
    public static <E> RingBufferWorkProcessor<E> create(String str, int i) {
        return create(str, i, true);
    }

    /** Single-producer processor with the given name, buffer size and flag. */
    public static <E> RingBufferWorkProcessor<E> create(String str, int i, boolean z) {
        return create(str, i, new LiteBlockingWaitStrategy(), z);
    }

    /** Single-producer processor on the given executor with the given buffer size; flag {@code true}. */
    public static <E> RingBufferWorkProcessor<E> create(ExecutorService executorService, int i) {
        return create(executorService, i, true);
    }

    /** Single-producer processor on the given executor with the given buffer size and flag. */
    public static <E> RingBufferWorkProcessor<E> create(ExecutorService executorService, int i, boolean z) {
        return create(executorService, i, new LiteBlockingWaitStrategy(), z);
    }

    /** Single-producer processor with the given name, buffer size and wait strategy; flag {@code true}. */
    public static <E> RingBufferWorkProcessor<E> create(String str, int i, WaitStrategy waitStrategy) {
        return create(str, i, waitStrategy, true);
    }

    /**
     * Fully specified named variant: builds the processor with a {@code null} executor and a
     * single-producer ring buffer.
     *
     * @param str name forwarded to the superclass constructor
     * @param i ring buffer size
     * @param waitStrategy strategy handed to the ring buffer
     * @param z flag forwarded to the superclass constructor
     */
    public static <E> RingBufferWorkProcessor<E> create(String str, int i, WaitStrategy waitStrategy, boolean z) {
        return new RingBufferWorkProcessor<>(str, null, i, waitStrategy, false, z);
    }

    /** Single-producer processor on the given executor with buffer size and wait strategy; flag {@code true}. */
    public static <E> RingBufferWorkProcessor<E> create(ExecutorService executorService, int i, WaitStrategy waitStrategy) {
        return create(executorService, i, waitStrategy, true);
    }

    /**
     * Fully specified executor variant: builds the processor with a {@code null} name and a
     * single-producer ring buffer.
     *
     * @param executorService executor forwarded to the superclass constructor
     * @param i ring buffer size
     * @param waitStrategy strategy handed to the ring buffer
     * @param z flag forwarded to the superclass constructor
     */
    public static <E> RingBufferWorkProcessor<E> create(ExecutorService executorService, int i, WaitStrategy waitStrategy, boolean z) {
        return new RingBufferWorkProcessor<>(null, executorService, i, waitStrategy, false, z);
    }

    /**
     * Creates a multi-producer processor named after this class, with buffer size 32, a
     * {@link LiteBlockingWaitStrategy} and the boolean flag set to {@code true}.
     */
    public static <E> RingBufferWorkProcessor<E> share() {
        return share(true);
    }

    /**
     * Same defaults as {@link #share()}, with an explicit flag.
     *
     * @param z flag forwarded to the superclass constructor (presumably auto-cancel — confirm)
     */
    public static <E> RingBufferWorkProcessor<E> share(boolean z) {
        final String defaultName = RingBufferWorkProcessor.class.getSimpleName();
        return share(defaultName, 32, new LiteBlockingWaitStrategy(), z);
    }

    /** Multi-producer processor on the given executor: buffer size 32, flag {@code true}. */
    public static <E> RingBufferWorkProcessor<E> share(ExecutorService executorService) {
        return share(executorService, true);
    }

    /** Multi-producer processor on the given executor: buffer size 32, explicit flag. */
    public static <E> RingBufferWorkProcessor<E> share(ExecutorService executorService, boolean z) {
        return share(executorService, 32, new LiteBlockingWaitStrategy(), z);
    }

    /** Multi-producer processor with the given name and buffer size; flag {@code true}. */
    public static <E> RingBufferWorkProcessor<E> share(String str, int i) {
        return share(str, i, true);
    }

    /** Multi-producer processor with the given name, buffer size and flag. */
    public static <E> RingBufferWorkProcessor<E> share(String str, int i, boolean z) {
        return share(str, i, new LiteBlockingWaitStrategy(), z);
    }

    /** Multi-producer processor on the given executor with the given buffer size; flag {@code true}. */
    public static <E> RingBufferWorkProcessor<E> share(ExecutorService executorService, int i) {
        return share(executorService, i, true);
    }

    /** Multi-producer processor on the given executor with the given buffer size and flag. */
    public static <E> RingBufferWorkProcessor<E> share(ExecutorService executorService, int i, boolean z) {
        return share(executorService, i, new LiteBlockingWaitStrategy(), z);
    }

    /** Multi-producer processor with the given name, buffer size and wait strategy; flag {@code true}. */
    public static <E> RingBufferWorkProcessor<E> share(String str, int i, WaitStrategy waitStrategy) {
        return share(str, i, waitStrategy, true);
    }

    /**
     * Fully specified named variant: builds the processor with a {@code null} executor and a
     * multi-producer ring buffer.
     *
     * @param str name forwarded to the superclass constructor
     * @param i ring buffer size
     * @param waitStrategy strategy handed to the ring buffer
     * @param z flag forwarded to the superclass constructor
     */
    public static <E> RingBufferWorkProcessor<E> share(String str, int i, WaitStrategy waitStrategy, boolean z) {
        return new RingBufferWorkProcessor<>(str, null, i, waitStrategy, true, z);
    }

    /** Multi-producer processor on the given executor with buffer size and wait strategy; flag {@code true}. */
    public static <E> RingBufferWorkProcessor<E> share(ExecutorService executorService, int i, WaitStrategy waitStrategy) {
        return share(executorService, i, waitStrategy, true);
    }

    /**
     * Fully specified executor variant: builds the processor with a {@code null} name and a
     * multi-producer ring buffer.
     *
     * @param executorService executor forwarded to the superclass constructor
     * @param i ring buffer size
     * @param waitStrategy strategy handed to the ring buffer
     * @param z flag forwarded to the superclass constructor
     */
    public static <E> RingBufferWorkProcessor<E> share(ExecutorService executorService, int i, WaitStrategy waitStrategy, boolean z) {
        return new RingBufferWorkProcessor<>(null, executorService, i, waitStrategy, true, z);
    }

    /**
     * Builds the processor and its ring buffer, and registers the shared work sequence as a
     * gating sequence of that buffer.
     *
     * @param str name forwarded to the superclass constructor
     * @param executorService executor forwarded to the superclass constructor
     * @param i ring buffer size
     * @param waitStrategy wait strategy handed to the ring buffer
     * @param z {@code true} selects {@link ProducerType#MULTI}, {@code false} {@link ProducerType#SINGLE}
     * @param z2 flag forwarded to the superclass constructor
     */
    private RingBufferWorkProcessor(String str, ExecutorService executorService, int i, WaitStrategy waitStrategy, boolean z, boolean z2) {
        super(str, executorService, z2);
        final ProducerType producerType = z ? ProducerType.MULTI : ProducerType.SINGLE;
        // Pre-allocates one empty, reusable signal holder per ring buffer slot.
        final EventFactory<MutableSignal<E>> signalFactory = new EventFactory<MutableSignal<E>>() {
            @Override
            public MutableSignal<E> newInstance() {
                return new MutableSignal<>();
            }
        };
        this.workSequence = new Sequence(-1L);
        this.cancelledSequences = new ConcurrentLinkedQueue<>();
        this.ringBuffer = RingBuffer.create(producerType, signalFactory, i, waitStrategy);
        this.ringBuffer.addGatingSequences(this.workSequence);
    }

    /**
     * Registers a subscriber.
     *
     * <p>A new {@link WorkSignalProcessor} is created with its sequence initialised to the current
     * work sequence, that sequence is added to the ring buffer's gating sequences, and the
     * processor is submitted to the executor.
     *
     * <p>Any failure during setup is reported to the subscriber through {@code onError}. If the
     * gating sequence had already been registered at that point it is removed again; otherwise a
     * sequence that no consumer will ever advance would stay in the ring buffer's gating set.
     *
     * @param subscriber the subscriber to attach
     * @throws NullPointerException if {@code subscriber} is {@code null}
     */
    @Override
    public void subscribe(Subscriber<? super E> subscriber) {
        if (null == subscriber) {
            throw new NullPointerException("Cannot subscribe NULL subscriber");
        }
        WorkSignalProcessor<E> signalProcessor = null;
        boolean gated = false;
        try {
            signalProcessor = new WorkSignalProcessor<E>(subscriber, this);
            signalProcessor.sequence.set(this.workSequence.get());
            this.ringBuffer.addGatingSequences(signalProcessor.sequence);
            gated = true;
            signalProcessor.setSubscription(new RingBufferSubscription(subscriber, signalProcessor));
            this.executor.execute(signalProcessor);
        } catch (Throwable th) {
            if (gated) {
                // The consumer never started, so its sequence must not keep gating the buffer.
                this.ringBuffer.removeGatingSequence(signalProcessor.sequence);
            }
            subscriber.onError(th);
        }
    }

    /**
     * Hands the element and this processor's ring buffer to
     * {@code RingBufferSubscriberUtils.onNext}.
     *
     * @param e the element received from upstream
     */
    @Override
    public void onNext(E e) {
        RingBufferSubscriberUtils.onNext(e, ringBuffer);
    }

    /**
     * Passes the error to {@code RingBufferSubscriberUtils.onError} at least once, then keeps
     * repeating the call while the number of calls made is below the subscriber count. The count
     * is re-read before every additional call.
     *
     * <p>NOTE(review): {@link #onComplete()} performs one call more than this for the same
     * subscriber count — confirm whether the asymmetry is intentional.
     *
     * @param th the upstream error
     */
    @Override
    public void onError(Throwable th) {
        int published = 0;
        do {
            RingBufferSubscriberUtils.onError(th, this.ringBuffer);
            published++;
        } while (published < SUBSCRIBER_COUNT.get(this));
    }

    /**
     * Calls {@code RingBufferSubscriberUtils.onComplete} once, then once more per subscriber
     * currently counted (the count is re-read before every additional call), and finally
     * delegates to the superclass.
     */
    @Override
    public void onComplete() {
        RingBufferSubscriberUtils.onComplete(this.ringBuffer);
        int extra = 0;
        while (extra < SUBSCRIBER_COUNT.get(this)) {
            RingBufferSubscriberUtils.onComplete(this.ringBuffer);
            extra++;
        }
        super.onComplete();
    }

    /**
     * Delegates to {@code RingBufferSubscriberUtils.writeWith} with the given publisher and this
     * processor's ring buffer.
     *
     * @param publisher source of elements
     * @return the publisher returned by the utility call
     */
    public Publisher<Void> writeWith(Publisher<? extends E> publisher) {
        final Publisher<Void> result = RingBufferSubscriberUtils.writeWith(publisher, ringBuffer);
        return result;
    }

    /**
     * Returns a diagnostic description listing the ring buffer, the executor, the work sequence
     * and the queue of cancelled sequences.
     */
    @Override
    public String toString() {
        // No separator before the first field: the previous text rendered as "{, ringBuffer=".
        return "RingBufferWorkProcessor{ringBuffer=" + this.ringBuffer
                + ", executor=" + this.executor
                + ", workSequence=" + this.workSequence
                + ", cancelledSequence=" + this.cancelledSequences
                + '}';
    }

    /**
     * @return the ring buffer's size as reported by {@code getBufferSize()}
     */
    @Override
    public long getCapacity() {
        return ringBuffer.getBufferSize();
    }

    /**
     * @return the ring buffer's {@code remainingCapacity()}
     */
    @Override
    public long getAvailableCapacity() {
        return ringBuffer.remainingCapacity();
    }
}
