/*
 * Decompiled with CFR 0.152.
 */
package com.provectus.kafka.ui.util;

import com.google.common.base.Preconditions;
import com.provectus.kafka.ui.util.ReactiveFailover;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/*
 * Exception performing whole class analysis ignored.
 */
public class ReactiveFailover<T> {
    public static final Duration DEFAULT_RETRY_GRACE_PERIOD_MS = Duration.ofSeconds(5L);
    public static final Predicate<Throwable> CONNECTION_REFUSED_EXCEPTION_FILTER = error -> error.getCause() instanceof IOException && error.getCause().getMessage().contains("Connection refused");
    private final List<PublisherHolder<T>> publishers;
    private int currentIndex = 0;
    private final Predicate<Throwable> failoverExceptionsPredicate;
    private final String noAvailablePublishersMsg;

    public static <T> ReactiveFailover<T> createNoop(T publisher) {
        return ReactiveFailover.create(List.of(publisher), (T th) -> true, (String)"publisher is not available", (Duration)DEFAULT_RETRY_GRACE_PERIOD_MS);
    }

    public static <T> ReactiveFailover<T> create(List<T> publishers, Predicate<Throwable> failoverExeptionsPredicate, String noAvailablePublishersMsg, Duration retryGracePeriodMs) {
        return new ReactiveFailover(publishers.stream().map(p -> new PublisherHolder(() -> p, retryGracePeriodMs.toMillis())).toList(), failoverExeptionsPredicate, noAvailablePublishersMsg);
    }

    public static <T, A> ReactiveFailover<T> create(List<A> args, Function<A, T> factory, Predicate<Throwable> failoverExeptionsPredicate, String noAvailablePublishersMsg, Duration retryGracePeriodMs) {
        return new ReactiveFailover(args.stream().map(arg -> new PublisherHolder(() -> factory.apply(arg), retryGracePeriodMs.toMillis())).toList(), failoverExeptionsPredicate, noAvailablePublishersMsg);
    }

    private ReactiveFailover(List<PublisherHolder<T>> publishers, Predicate<Throwable> failoverExceptionsPredicate, String noAvailablePublishersMsg) {
        Preconditions.checkArgument((!publishers.isEmpty() ? 1 : 0) != 0);
        this.publishers = publishers;
        this.failoverExceptionsPredicate = failoverExceptionsPredicate;
        this.noAvailablePublishersMsg = noAvailablePublishersMsg;
    }

    public <V> Mono<V> mono(Function<T, Mono<V>> f) {
        List candidates = this.getActivePublishers();
        if (candidates.isEmpty()) {
            return Mono.error(() -> new IllegalStateException(this.noAvailablePublishersMsg));
        }
        return this.mono(f, candidates);
    }

    private <V> Mono<V> mono(Function<T, Mono<V>> f, List<PublisherHolder<T>> candidates) {
        PublisherHolder<T> publisher = candidates.get(0);
        return publisher.get().flatMap(f).onErrorResume(this.failoverExceptionsPredicate, th -> {
            publisher.markFailed();
            if (candidates.size() == 1) {
                return Mono.error((Throwable)th);
            }
            List<PublisherHolder> newCandidates = candidates.stream().skip(1L).filter(PublisherHolder::isActive).toList();
            if (newCandidates.isEmpty()) {
                return Mono.error((Throwable)th);
            }
            return this.mono(f, newCandidates);
        });
    }

    public <V> Flux<V> flux(Function<T, Flux<V>> f) {
        List candidates = this.getActivePublishers();
        if (candidates.isEmpty()) {
            return Flux.error(() -> new IllegalStateException(this.noAvailablePublishersMsg));
        }
        return this.flux(f, candidates);
    }

    private <V> Flux<V> flux(Function<T, Flux<V>> f, List<PublisherHolder<T>> candidates) {
        PublisherHolder<T> publisher = candidates.get(0);
        return publisher.get().flatMapMany(f).onErrorResume(this.failoverExceptionsPredicate, th -> {
            publisher.markFailed();
            if (candidates.size() == 1) {
                return Flux.error((Throwable)th);
            }
            List<PublisherHolder> newCandidates = candidates.stream().skip(1L).filter(PublisherHolder::isActive).toList();
            if (newCandidates.isEmpty()) {
                return Flux.error((Throwable)th);
            }
            return this.flux(f, newCandidates);
        });
    }

    private synchronized List<PublisherHolder<T>> getActivePublishers() {
        ArrayList<PublisherHolder<T>> result = new ArrayList<PublisherHolder<T>>();
        int j = this.currentIndex;
        for (int i = 0; i < this.publishers.size(); ++i) {
            PublisherHolder publisher = (PublisherHolder)this.publishers.get(j);
            if (publisher.isActive()) {
                result.add(publisher);
            } else if (this.currentIndex == j) {
                this.currentIndex = ++this.currentIndex == this.publishers.size() ? 0 : this.currentIndex;
            }
            j = ++j == this.publishers.size() ? 0 : j;
        }
        return result;
    }
}

