package com.aliyun.httpcomponent.httpclient.implementation.reactive;

import com.aliyun.apache.hc.core5.http.Header;
import com.aliyun.apache.hc.core5.http.HttpStreamResetException;
import com.aliyun.apache.hc.core5.http.nio.AsyncDataConsumer;
import com.aliyun.apache.hc.core5.http.nio.CapacityChannel;
import com.aliyun.apache.hc.core5.util.Args;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:com/aliyun/httpcomponent/httpclient/implementation/reactive/ReactiveDataConsumer.class */
/**
 * An {@link AsyncDataConsumer} that republishes the bytes it receives as a
 * Reactive Streams {@link Publisher} of {@link ByteBuffer}s.
 *
 * <p>Incoming data is copied and queued by {@link #consume(ByteBuffer)}; queued buffers are
 * handed to the subscriber only while it has outstanding demand. The number of bytes handed
 * over is accumulated and reported back through the most recently supplied
 * {@link CapacityChannel}.
 *
 * <p>Only one subscriber reference is held at a time; a later call to
 * {@link #subscribe(Subscriber)} replaces the earlier one.
 */
final class ReactiveDataConsumer implements AsyncDataConsumer, Publisher<ByteBuffer> {
    /** Outstanding demand: items requested by the subscriber but not yet delivered. */
    private final AtomicLong requests = new AtomicLong(0);
    /** Copies of received data waiting to be delivered to the subscriber. */
    private final BlockingQueue<ByteBuffer> buffers = new LinkedBlockingQueue<ByteBuffer>();
    /** Set while a flush is running, so a nested flush on the same thread returns immediately. */
    private final AtomicBoolean flushInProgress = new AtomicBoolean(false);
    /** Monitor that serializes {@link #flushToSubscriber()} across threads. */
    private final Object flushLock = new Object();
    /** Bytes delivered to the subscriber that have not yet been reported to a capacity channel. */
    private final AtomicInteger windowScalingIncrement = new AtomicInteger(0);
    private volatile boolean cancelled;
    private volatile boolean completed;
    private volatile Exception exception;
    private volatile CapacityChannel capacityChannel;
    private volatile Subscriber<? super ByteBuffer> subscriber;

    /**
     * Records a failure and attempts to signal it to the subscriber.
     * Has no effect once the end of the stream has been seen.
     *
     * @param exc the failure to pass to the subscriber's {@code onError}
     */
    public void failed(Exception exc) {
        if (this.completed) {
            return;
        }
        this.exception = exc;
        flushToSubscriber();
    }

    /**
     * Remembers the capacity channel and immediately reports any pending capacity increment.
     *
     * @param capacityChannel the channel to report freed capacity to
     * @throws IOException if the subscriber has cancelled, or if the channel update fails
     */
    @Override
    public void updateCapacity(CapacityChannel capacityChannel) throws IOException {
        throwIfCancelled();
        this.capacityChannel = capacityChannel;
        signalCapacity(capacityChannel);
    }

    /**
     * Reports the accumulated increment (if positive) to the given channel and resets it to zero.
     */
    private void signalCapacity(CapacityChannel channel) throws IOException {
        final int increment = this.windowScalingIncrement.getAndSet(0);
        if (increment > 0) {
            channel.update(increment);
        }
    }

    /** Throws if the subscriber has cancelled its subscription. */
    private void throwIfCancelled() throws IOException {
        if (this.cancelled) {
            throw new HttpStreamResetException("Downstream subscriber to ReactiveDataConsumer cancelled");
        }
    }

    /**
     * Copies the remaining bytes of the given buffer into the queue and attempts delivery.
     *
     * @param byteBuffer the received data; its remaining bytes are consumed
     * @throws IllegalStateException if the end of the stream has already been signalled
     * @throws IOException if the subscriber has cancelled
     */
    @Override
    public void consume(ByteBuffer byteBuffer) throws IOException {
        if (this.completed) {
            throw new IllegalStateException("Received data past end of stream");
        }
        throwIfCancelled();
        // Copy the data: the queued buffer must not share storage with the caller's buffer.
        final byte[] copy = new byte[byteBuffer.remaining()];
        byteBuffer.get(copy);
        this.buffers.add(ByteBuffer.wrap(copy));
        flushToSubscriber();
    }

    /**
     * Marks the stream as complete and attempts delivery of whatever is still queued.
     *
     * @param list trailing headers; not used by this implementation
     */
    @Override
    public void streamEnd(List<? extends Header> list) {
        this.completed = true;
        flushToSubscriber();
    }

    /** Drops the reference to the capacity channel. */
    @Override
    public void releaseResources() {
        this.capacityChannel = null;
    }

    /**
     * Delivers pending signals to the current subscriber: a recorded error, then as many queued
     * buffers as there is demand for, then a capacity update, then completion once the stream
     * has ended and the queue is empty.
     *
     * <p>Does nothing if no subscriber is attached or if a flush is already running further up
     * the call stack. Declared {@code public} in this source; the decompiler note on the
     * original indicated the method was private before decompilation.
     */
    public void flushToSubscriber() {
        synchronized (this.flushLock) {
            final Subscriber<? super ByteBuffer> target = this.subscriber;
            if (this.flushInProgress.getAndSet(true)) {
                return;
            }
            // Everything after the flag is set must run inside try/finally. Otherwise an early
            // return (e.g. no subscriber yet) would leave the flag set and disable all later flushes.
            try {
                if (target == null) {
                    return;
                }
                final Exception failure = this.exception;
                if (failure != null) {
                    // Detach before signalling so the terminal signal is emitted at most once.
                    this.subscriber = null;
                    target.onError(failure);
                    return;
                }
                ByteBuffer next;
                while (this.requests.get() > 0 && (next = this.buffers.poll()) != null) {
                    final int bytesFreed = next.remaining();
                    target.onNext(next);
                    this.requests.decrementAndGet();
                    this.windowScalingIncrement.addAndGet(bytesFreed);
                }
                final CapacityChannel channel = this.capacityChannel;
                if (channel != null) {
                    try {
                        signalCapacity(channel);
                    } catch (IOException e) {
                        this.exception = e;
                        // Detach so that a subsequent flush does not report the same error again.
                        this.subscriber = null;
                        target.onError(e);
                        return;
                    }
                }
                if (this.completed && this.buffers.isEmpty()) {
                    this.subscriber = null;
                    target.onComplete();
                }
            } finally {
                this.flushInProgress.set(false);
            }
        }
    }

    /**
     * Adds to the outstanding demand, capping the total at {@link Long#MAX_VALUE} instead of
     * letting it wrap around to a negative value (which would stop delivery).
     *
     * @param n the additional demand; callers pass a strictly positive value
     */
    private void addDemand(long n) {
        long current;
        long updated;
        do {
            current = this.requests.get();
            updated = current + n;
            if (updated < 0) {
                // current >= 0 and n > 0, so a negative sum can only mean overflow.
                updated = Long.MAX_VALUE;
            }
        } while (!this.requests.compareAndSet(current, updated));
    }

    /**
     * Attaches the subscriber and hands it a subscription through which it can request
     * buffers or cancel.
     *
     * @param subscriber the subscriber to deliver buffers to; must not be {@code null}
     */
    @Override
    public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
        this.subscriber = Args.notNull(subscriber, "subscriber");
        subscriber.onSubscribe(new Subscription() {
            @Override
            public void request(long j) {
                if (j <= 0) {
                    ReactiveDataConsumer.this.failed(new IllegalArgumentException("The number of elements requested must be strictly positive"));
                } else {
                    ReactiveDataConsumer.this.addDemand(j);
                    ReactiveDataConsumer.this.flushToSubscriber();
                }
            }

            @Override
            public void cancel() {
                ReactiveDataConsumer.this.cancelled = true;
                ReactiveDataConsumer.this.subscriber = null;
            }
        });
    }
}
