package org.elasticsearch.compute.operator.exchange;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractAsyncTask;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.compute.OwningChannelActionListener;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockStreamInput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.Transports;

/* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeService.class */
public final class ExchangeService extends AbstractLifecycleComponent {
    public static final String EXCHANGE_ACTION_NAME = "internal:data/read/esql/exchange";
    private static final String OPEN_EXCHANGE_ACTION_NAME = "internal:data/read/esql/open_exchange";
    public static final String INACTIVE_SINKS_INTERVAL_SETTING = "esql.exchange.sink_inactive_interval";
    private static final Logger LOGGER;
    private final ThreadPool threadPool;
    private final Executor executor;
    private final BlockFactory blockFactory;
    private final Map<String, ExchangeSinkHandler> sinks = ConcurrentCollections.newConcurrentMap();
    private final Map<String, ExchangeSourceHandler> sources = ConcurrentCollections.newConcurrentMap();
    private final InactiveSinksReaper inactiveSinksReaper;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeService$ExchangeTransportAction.class */
    private class ExchangeTransportAction implements TransportRequestHandler<ExchangeRequest> {
        private ExchangeTransportAction() {
        }

        public void messageReceived(ExchangeRequest exchangeRequest, TransportChannel transportChannel, Task task) {
            String exchangeId = exchangeRequest.exchangeId();
            OwningChannelActionListener owningChannelActionListener = new OwningChannelActionListener(transportChannel);
            ExchangeSinkHandler exchangeSinkHandler = ExchangeService.this.sinks.get(exchangeId);
            if (exchangeSinkHandler == null) {
                owningChannelActionListener.onResponse(new ExchangeResponse(null, true));
                return;
            }
            if (!exchangeSinkHandler.hasData()) {
                ((CancellableTask) task).addListener(() -> {
                    exchangeSinkHandler.onFailure(new TaskCancelledException("task cancelled"));
                });
            }
            exchangeSinkHandler.fetchPageAsync(exchangeRequest.sourcesFinished(), owningChannelActionListener);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeService$InactiveSinksReaper.class */
    public final class InactiveSinksReaper extends AbstractAsyncTask {
        static final /* synthetic */ boolean $assertionsDisabled;

        InactiveSinksReaper(Logger logger, ThreadPool threadPool, Executor executor, TimeValue timeValue) {
            super(logger, threadPool, executor, timeValue, true);
            rescheduleIfNecessary();
        }

        protected boolean mustReschedule() {
            Lifecycle.State lifecycleState = ExchangeService.this.lifecycleState();
            return (lifecycleState == Lifecycle.State.STOPPED || lifecycleState == Lifecycle.State.CLOSED) ? false : true;
        }

        protected void runInternal() {
            if (!$assertionsDisabled && !Transports.assertNotTransportThread("reaping inactive exchanges can be expensive")) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && !ThreadPool.assertNotScheduleThread("reaping inactive exchanges can be expensive")) {
                throw new AssertionError();
            }
            TimeValue interval = getInterval();
            long relativeTimeInMillis = ExchangeService.this.threadPool.relativeTimeInMillis();
            for (Map.Entry<String, ExchangeSinkHandler> entry : ExchangeService.this.sinks.entrySet()) {
                ExchangeSinkHandler value = entry.getValue();
                if (!value.hasData() || !value.hasListeners()) {
                    long lastUpdatedTimeInMillis = relativeTimeInMillis - value.lastUpdatedTimeInMillis();
                    if (lastUpdatedTimeInMillis > interval.millis()) {
                        ExchangeService.this.finishSinkHandler(entry.getKey(), new ElasticsearchTimeoutException("Exchange sink {} has been inactive for {}", new Object[]{entry.getKey(), TimeValue.timeValueMillis(lastUpdatedTimeInMillis)}));
                    }
                }
            }
        }

        static {
            $assertionsDisabled = !ExchangeService.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeService$OpenExchangeRequest.class */
    public static class OpenExchangeRequest extends TransportRequest {
        private final String sessionId;
        private final int exchangeBuffer;

        OpenExchangeRequest(String str, int i) {
            this.sessionId = str;
            this.exchangeBuffer = i;
        }

        OpenExchangeRequest(StreamInput streamInput) throws IOException {
            super(streamInput);
            this.sessionId = streamInput.readString();
            this.exchangeBuffer = streamInput.readVInt();
        }

        public void writeTo(StreamOutput streamOutput) throws IOException {
            super.writeTo(streamOutput);
            streamOutput.writeString(this.sessionId);
            streamOutput.writeVInt(this.exchangeBuffer);
        }
    }

    /* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeService$OpenExchangeRequestHandler.class */
    private class OpenExchangeRequestHandler implements TransportRequestHandler<OpenExchangeRequest> {
        private OpenExchangeRequestHandler() {
        }

        public void messageReceived(OpenExchangeRequest openExchangeRequest, TransportChannel transportChannel, Task task) throws Exception {
            ExchangeService.this.createSinkHandler(openExchangeRequest.sessionId, openExchangeRequest.exchangeBuffer);
            transportChannel.sendResponse(new TransportResponse.Empty());
        }
    }

    /* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeService$TransportRemoteSink.class */
    static final class TransportRemoteSink extends Record implements RemoteSink {
        private final TransportService transportService;
        private final BlockFactory blockFactory;
        private final DiscoveryNode node;
        private final Task parentTask;
        private final String exchangeId;
        private final Executor responseExecutor;

        TransportRemoteSink(TransportService transportService, BlockFactory blockFactory, DiscoveryNode discoveryNode, Task task, String str, Executor executor) {
            this.transportService = transportService;
            this.blockFactory = blockFactory;
            this.node = discoveryNode;
            this.parentTask = task;
            this.exchangeId = str;
            this.responseExecutor = executor;
        }

        @Override // org.elasticsearch.compute.operator.exchange.RemoteSink
        public void fetchPageAsync(boolean z, ActionListener<ExchangeResponse> actionListener) {
            this.transportService.sendChildRequest(this.node, ExchangeService.EXCHANGE_ACTION_NAME, new ExchangeRequest(this.exchangeId, z), this.parentTask, TransportRequestOptions.EMPTY, new ActionListenerResponseHandler(actionListener, streamInput -> {
                return new ExchangeResponse(new BlockStreamInput(streamInput, this.blockFactory));
            }, this.responseExecutor));
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, TransportRemoteSink.class), TransportRemoteSink.class, "transportService;blockFactory;node;parentTask;exchangeId;responseExecutor", "FIELD:Lorg/elasticsearch/compute/operator/exchange/ExchangeService$TransportRemoteSink;->transportService:Lorg/elasticsearch/transport/TransportService;", "FIELD:Lorg/elasticsearch/compute/operator/exchange/ExchangeService$TransportRemoteSink;->blockFactory:Lorg/elasticsearch/compute/data/BlockFactory;", "FIELD:Lorg/elasticsearch/compute/operator/exchange/ExchangeService$TransportRemoteSink;->node:Lorg/elasticsearch/cluster/node/DiscoveryNode;", "FIELD:Lorg/elasticsearch/compute/operator/exchange/ExchangeService$TransportRemoteSink;->parentTask:Lorg/elasticsearch/tasks/Task;", "FIELD:Lorg/elasticsearch/compute/operator/exchange/ExchangeService$TransportRemoteSink;->exchangeId:Ljava/lang/String;", "FIELD:Lorg/elasticsearch/compute/operator/exchange/ExchangeService$TransportRemoteSink;->responseExecutor:Ljava/util/concurrent/Executor;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, TransportRemoteSink.class), TransportRemoteSink.class, "transportService;blockFactory;node;parentTask;exchangeId;responseExecutor", "FIELD:Lorg/elasticsearch/compute/operator/exchange/ExchangeService$TransportRemoteSink;->transportService:Lorg/elasticsearch/transport/TransportService;", "FIELD:Lorg/elasticsearch/compute/operator/exchange/ExchangeService$TransportRemoteSink;->blockFactory:Lorg/elasticsearch/compute/data/BlockFactory;", "FIELD:Lorg/elasticsearch/compute/operator/exchange/ExchangeService$TransportRemoteSink;->node:Lorg/elasticsearch/cluster/node/DiscoveryNode;", "FIELD:Lorg/elasticsearch/compute/operator/exchange/ExchangeService$TransportRemoteSink;->parentTask:Lorg/elasticsearch/tasks/Task;", "FIELD:Lorg/elasticsearch/compute/operator/exchange/ExchangeService$TransportRemoteSink;->exchangeId:Ljava/lang/String;", "FIELD:Lorg/elasticsearch/compute/operator/exchange/ExchangeService$TransportRemoteSink;->responseExecutor:Ljava/util/concurrent/Executor;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, TransportRemoteSink.class, Object.class), TransportRemoteSink.class, "transportService;blockFactory;node;parentTask;exchangeId;responseExecutor", "FIELD:Lorg/elasticsearch/compute/operator/exchange/ExchangeService$TransportRemoteSink;->transportService:Lorg/elasticsearch/transport/TransportService;", "FIELD:Lorg/elasticsearch/compute/operator/exchange/ExchangeService$TransportRemoteSink;->blockFactory:Lorg/elasticsearch/compute/data/BlockFactory;", "FIELD:Lorg/elasticsearch/compute/operator/exchange/ExchangeService$TransportRemoteSink;->node:Lorg/elasticsearch/cluster/node/DiscoveryNode;", "FIELD:Lorg/elasticsearch/compute/operator/exchange/ExchangeService$TransportRemoteSink;->parentTask:Lorg/elasticsearch/tasks/Task;", "FIELD:Lorg/elasticsearch/compute/operator/exchange/ExchangeService$TransportRemoteSink;->exchangeId:Ljava/lang/String;", "FIELD:Lorg/elasticsearch/compute/operator/exchange/ExchangeService$TransportRemoteSink;->responseExecutor:Ljava/util/concurrent/Executor;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public TransportService transportService() {
            return this.transportService;
        }

        public BlockFactory blockFactory() {
            return this.blockFactory;
        }

        public DiscoveryNode node() {
            return this.node;
        }

        public Task parentTask() {
            return this.parentTask;
        }

        public String exchangeId() {
            return this.exchangeId;
        }

        public Executor responseExecutor() {
            return this.responseExecutor;
        }
    }

    public ExchangeService(Settings settings, ThreadPool threadPool, String str, BlockFactory blockFactory) {
        this.threadPool = threadPool;
        this.executor = threadPool.executor(str);
        this.blockFactory = blockFactory;
        this.inactiveSinksReaper = new InactiveSinksReaper(LOGGER, threadPool, this.executor, settings.getAsTime(INACTIVE_SINKS_INTERVAL_SETTING, TimeValue.timeValueMinutes(5L)));
    }

    public void registerTransportHandler(TransportService transportService) {
        transportService.registerRequestHandler(EXCHANGE_ACTION_NAME, this.executor, ExchangeRequest::new, new ExchangeTransportAction());
        transportService.registerRequestHandler(OPEN_EXCHANGE_ACTION_NAME, this.executor, OpenExchangeRequest::new, new OpenExchangeRequestHandler());
    }

    ExchangeSinkHandler createSinkHandler(String str, int i) {
        ThreadPool threadPool = this.threadPool;
        Objects.requireNonNull(threadPool);
        ExchangeSinkHandler exchangeSinkHandler = new ExchangeSinkHandler(i, threadPool::relativeTimeInMillis);
        if (this.sinks.putIfAbsent(str, exchangeSinkHandler) != null) {
            throw new IllegalStateException("sink exchanger for id [" + str + "] already exists");
        }
        return exchangeSinkHandler;
    }

    public ExchangeSinkHandler getSinkHandler(String str) {
        ExchangeSinkHandler exchangeSinkHandler = this.sinks.get(str);
        if (exchangeSinkHandler == null) {
            throw new ResourceNotFoundException("sink exchanger for id [{}] doesn't exist", new Object[]{str});
        }
        return exchangeSinkHandler;
    }

    public void finishSinkHandler(String str, Exception exc) {
        ExchangeSinkHandler remove = this.sinks.remove(str);
        if (remove != null) {
            if (exc != null) {
                remove.onFailure(exc);
            }
            if (!$assertionsDisabled && !remove.isFinished()) {
                throw new AssertionError("Exchange sink " + str + " wasn't finished yet");
            }
        }
    }

    public ExchangeSourceHandler createSourceHandler(String str, int i, String str2) {
        ExchangeSourceHandler exchangeSourceHandler = new ExchangeSourceHandler(i, this.threadPool.executor(str2));
        if (this.sources.putIfAbsent(str, exchangeSourceHandler) != null) {
            throw new IllegalStateException("source exchanger for id [" + str + "] already exists");
        }
        exchangeSourceHandler.addCompletionListener(ActionListener.releasing(() -> {
            this.sources.remove(str);
        }));
        return exchangeSourceHandler;
    }

    public static void openExchange(TransportService transportService, DiscoveryNode discoveryNode, String str, int i, Executor executor, ActionListener<Void> actionListener) {
        transportService.sendRequest(discoveryNode, OPEN_EXCHANGE_ACTION_NAME, new OpenExchangeRequest(str, i), new ActionListenerResponseHandler(actionListener.map(empty -> {
            return null;
        }), streamInput -> {
            return TransportResponse.Empty.INSTANCE;
        }, executor));
    }

    public RemoteSink newRemoteSink(Task task, String str, TransportService transportService, DiscoveryNode discoveryNode) {
        return new TransportRemoteSink(transportService, this.blockFactory, discoveryNode, task, str, this.executor);
    }

    public boolean isEmpty() {
        return this.sources.isEmpty() && this.sinks.isEmpty();
    }

    protected void doStart() {
    }

    protected void doStop() {
        this.inactiveSinksReaper.close();
    }

    protected void doClose() {
        doStop();
    }

    public String toString() {
        return "ExchangeService{sinks=" + this.sinks.keySet() + ", sources=" + this.sources.keySet() + "}";
    }

    static {
        $assertionsDisabled = !ExchangeService.class.desiredAssertionStatus();
        LOGGER = LogManager.getLogger(ExchangeService.class);
    }
}
