package org.elasticsearch.compute.operator.exchange;

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.tasks.TaskCancelledException;

/* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.class */
public final class ExchangeSourceHandler extends AbstractRefCounted {
    private final ExchangeBuffer buffer;
    private final Executor fetchExecutor;
    private final PendingInstances outstandingSinks = new PendingInstances();
    private final PendingInstances outstandingSources = new PendingInstances();
    private final AtomicReference<Exception> failure = new AtomicReference<>();
    private final SubscribableListener<Void> completionFuture = new SubscribableListener<>();

    /* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler$LocalExchangeSource.class */
    private class LocalExchangeSource implements ExchangeSource {
        private boolean finished;

        LocalExchangeSource() {
            ExchangeSourceHandler.this.outstandingSources.trackNewInstance();
        }

        private void checkFailure() {
            Exception exc = ExchangeSourceHandler.this.failure.get();
            if (exc != null) {
                throw ExceptionsHelper.convertToElastic(exc);
            }
        }

        @Override // org.elasticsearch.compute.operator.exchange.ExchangeSource
        public Page pollPage() {
            checkFailure();
            return ExchangeSourceHandler.this.buffer.pollPage();
        }

        @Override // org.elasticsearch.compute.operator.exchange.ExchangeSource
        public boolean isFinished() {
            checkFailure();
            return this.finished || ExchangeSourceHandler.this.buffer.isFinished();
        }

        @Override // org.elasticsearch.compute.operator.exchange.ExchangeSource
        public SubscribableListener<Void> waitForReading() {
            return ExchangeSourceHandler.this.buffer.waitForReading();
        }

        @Override // org.elasticsearch.compute.operator.exchange.ExchangeSource
        public void finish() {
            if (this.finished) {
                return;
            }
            this.finished = true;
            if (ExchangeSourceHandler.this.outstandingSources.finishInstance()) {
                ExchangeSourceHandler.this.buffer.finish(true);
            }
        }

        @Override // org.elasticsearch.compute.operator.exchange.ExchangeSource
        public int bufferSize() {
            return ExchangeSourceHandler.this.buffer.size();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler$LoopControl.class */
    public static class LoopControl {
        private Status status = Status.RUNNING;
        private final Thread startedThread = Thread.currentThread();

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler$LoopControl$Status.class */
        public enum Status {
            RUNNING,
            EXITING,
            EXITED
        }

        LoopControl() {
        }

        boolean isRunning() {
            return this.status == Status.RUNNING;
        }

        boolean tryResume() {
            if (this.startedThread != Thread.currentThread() || this.status == Status.EXITED) {
                return false;
            }
            this.status = Status.RUNNING;
            return true;
        }

        void exiting() {
            this.status = Status.EXITING;
        }

        void exited() {
            this.status = Status.EXITED;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler$PendingInstances.class */
    public final class PendingInstances {
        private final AtomicInteger instances = new AtomicInteger();

        private PendingInstances() {
        }

        void trackNewInstance() {
            ExchangeSourceHandler.this.incRef();
            this.instances.incrementAndGet();
        }

        boolean finishInstance() {
            ExchangeSourceHandler.this.decRef();
            return this.instances.decrementAndGet() == 0;
        }
    }

    /* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler$RemoteSinkFetcher.class */
    private final class RemoteSinkFetcher {
        private volatile boolean finished = false;
        private final RemoteSink remoteSink;

        RemoteSinkFetcher(RemoteSink remoteSink) {
            ExchangeSourceHandler.this.outstandingSinks.trackNewInstance();
            this.remoteSink = remoteSink;
        }

        void fetchPage() {
            LoopControl loopControl = new LoopControl();
            while (loopControl.isRunning()) {
                loopControl.exiting();
                this.remoteSink.fetchPageAsync(ExchangeSourceHandler.this.buffer.noMoreInputs() || ExchangeSourceHandler.this.failure.get() != null, ActionListener.wrap(exchangeResponse -> {
                    Page takePage = exchangeResponse.takePage();
                    if (takePage != null) {
                        ExchangeSourceHandler.this.buffer.addPage(takePage);
                    }
                    if (exchangeResponse.finished()) {
                        onSinkComplete();
                        return;
                    }
                    SubscribableListener<Void> waitForWriting = ExchangeSourceHandler.this.buffer.waitForWriting();
                    if (!waitForWriting.isDone()) {
                        waitForWriting.addListener(ActionListener.wrap(r4 -> {
                            if (loopControl.tryResume()) {
                                return;
                            }
                            fetchPage();
                        }, this::onSinkFailed));
                    } else {
                        if (loopControl.tryResume()) {
                            return;
                        }
                        fetchPage();
                    }
                }, this::onSinkFailed));
            }
            loopControl.exited();
        }

        void onSinkFailed(Exception exc) {
            ExchangeSourceHandler.this.failure.getAndUpdate(exc2 -> {
                if (exc2 == null) {
                    return exc;
                }
                if (ExceptionsHelper.unwrap(exc, new Class[]{TaskCancelledException.class}) != null) {
                    return exc2;
                }
                if (ExceptionsHelper.unwrap(exc2, new Class[]{TaskCancelledException.class}) != null) {
                    return exc;
                }
                if (ExceptionsHelper.unwrapCause(exc2) != ExceptionsHelper.unwrapCause(exc)) {
                    exc2.addSuppressed(exc);
                }
                return exc2;
            });
            onSinkComplete();
        }

        void onSinkComplete() {
            if (this.finished) {
                return;
            }
            this.finished = true;
            if (ExchangeSourceHandler.this.outstandingSinks.finishInstance()) {
                ExchangeSourceHandler.this.buffer.finish(false);
            }
        }
    }

    public ExchangeSourceHandler(int i, Executor executor) {
        this.buffer = new ExchangeBuffer(i);
        this.fetchExecutor = executor;
    }

    public ExchangeSource createExchangeSource() {
        return new LocalExchangeSource();
    }

    public void addRemoteSink(RemoteSink remoteSink, int i) {
        for (int i2 = 0; i2 < i; i2++) {
            final RemoteSinkFetcher remoteSinkFetcher = new RemoteSinkFetcher(remoteSink);
            this.fetchExecutor.execute(new AbstractRunnable() { // from class: org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler.1
                public void onFailure(Exception exc) {
                    remoteSinkFetcher.onSinkFailed(exc);
                }

                protected void doRun() {
                    remoteSinkFetcher.fetchPage();
                }
            });
        }
    }

    protected void closeInternal() {
        Exception exc = this.failure.get();
        if (exc != null) {
            this.completionFuture.onFailure(exc);
        } else {
            this.completionFuture.onResponse((Object) null);
        }
    }

    public void addCompletionListener(ActionListener<Void> actionListener) {
        this.completionFuture.addListener(actionListener);
    }
}
