package io.modelcontextprotocol.server.transport;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.vladsch.flexmark.util.sequence.SequenceUtils;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.ServerMcpTransport;
import io.modelcontextprotocol.util.Assert;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:mcp/mcp-0.8.0-SNAPSHOT.jar:io/modelcontextprotocol/server/transport/StdioServerTransport.class */
public class StdioServerTransport implements ServerMcpTransport {
    private static final Logger logger = LoggerFactory.getLogger(StdioServerTransport.class);
    private final Sinks.Many<McpSchema.JSONRPCMessage> inboundSink;
    private final Sinks.Many<McpSchema.JSONRPCMessage> outboundSink;
    private ObjectMapper objectMapper;
    private Scheduler inboundScheduler;
    private Scheduler outboundScheduler;
    private volatile boolean isClosing;
    private final InputStream inputStream;
    private final OutputStream outputStream;
    private final Sinks.One<Void> inboundReady;
    private final Sinks.One<Void> outboundReady;

    public StdioServerTransport() {
        this(new ObjectMapper());
    }

    public StdioServerTransport(ObjectMapper objectMapper) {
        this.isClosing = false;
        this.inboundReady = Sinks.one();
        this.outboundReady = Sinks.one();
        Assert.notNull(objectMapper, "The ObjectMapper can not be null");
        this.inboundSink = Sinks.many().unicast().onBackpressureBuffer();
        this.outboundSink = Sinks.many().unicast().onBackpressureBuffer();
        this.objectMapper = objectMapper;
        this.inputStream = System.in;
        this.outputStream = System.out;
        this.inboundScheduler = Schedulers.fromExecutorService(Executors.newSingleThreadExecutor(), "inbound");
        this.outboundScheduler = Schedulers.fromExecutorService(Executors.newSingleThreadExecutor(), "outbound");
    }

    @Override // io.modelcontextprotocol.spec.McpTransport
    public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> function) {
        return Mono.fromRunnable(() -> {
            handleIncomingMessages(function);
            startInboundProcessing();
            startOutboundProcessing();
        }).subscribeOn(Schedulers.boundedElastic());
    }

    private void handleIncomingMessages(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> function) {
        this.inboundSink.asFlux().flatMap(jSONRPCMessage -> {
            return Mono.just(jSONRPCMessage).transform(function).contextWrite(context -> {
                return context.put("observation", "myObservation");
            });
        }).doOnTerminate(() -> {
            this.outboundSink.tryEmitComplete();
            this.inboundScheduler.dispose();
        }).subscribe();
    }

    @Override // io.modelcontextprotocol.spec.McpTransport
    public Mono<Void> sendMessage(McpSchema.JSONRPCMessage jSONRPCMessage) {
        return Mono.zip(this.inboundReady.asMono(), this.outboundReady.asMono()).then(Mono.defer(() -> {
            return this.outboundSink.tryEmitNext(jSONRPCMessage).isSuccess() ? Mono.empty() : Mono.error(new RuntimeException("Failed to enqueue message"));
        }));
    }

    private void startInboundProcessing() {
        this.inboundScheduler.schedule(() -> {
            this.inboundReady.tryEmitValue(null);
            try {
                try {
                    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(this.inputStream));
                    while (true) {
                        if (this.isClosing) {
                            break;
                        }
                        try {
                            String readLine = bufferedReader.readLine();
                            if (readLine == null || this.isClosing) {
                                break;
                            }
                            logger.debug("Received JSON message: {}", readLine);
                            try {
                                if (!this.inboundSink.tryEmitNext(McpSchema.deserializeJsonRpcMessage(this.objectMapper, readLine)).isSuccess()) {
                                    break;
                                }
                            } catch (Exception e) {
                                logIfNotClosing("Error processing inbound message", e);
                            }
                        } catch (IOException e2) {
                            logIfNotClosing("Error reading from stdin", e2);
                        }
                    }
                    this.isClosing = true;
                    this.inboundSink.tryEmitComplete();
                } catch (Exception e3) {
                    logIfNotClosing("Error in inbound processing", e3);
                    this.isClosing = true;
                    this.inboundSink.tryEmitComplete();
                }
            } catch (Throwable th) {
                this.isClosing = true;
                this.inboundSink.tryEmitComplete();
                throw th;
            }
        });
    }

    private void startOutboundProcessing() {
        Function function = flux -> {
            return flux.doOnSubscribe(subscription -> {
                this.outboundReady.tryEmitValue(null);
            }).publishOn(this.outboundScheduler).handle((jSONRPCMessage, synchronousSink) -> {
                if (jSONRPCMessage == null || this.isClosing) {
                    if (this.isClosing) {
                        synchronousSink.complete();
                        return;
                    }
                    return;
                }
                try {
                    String replace = this.objectMapper.writeValueAsString(jSONRPCMessage).replace("\r\n", "\\n").replace(SequenceUtils.EOL, "\\n").replace("\r", "\\n");
                    synchronized (this.outputStream) {
                        this.outputStream.write(replace.getBytes(StandardCharsets.UTF_8));
                        this.outputStream.write(SequenceUtils.EOL.getBytes(StandardCharsets.UTF_8));
                        this.outputStream.flush();
                    }
                    synchronousSink.next(jSONRPCMessage);
                } catch (IOException e) {
                    if (this.isClosing) {
                        logger.debug("Stream closed during shutdown", e);
                    } else {
                        logger.error("Error writing message", e);
                        synchronousSink.error(new RuntimeException(e));
                    }
                }
            }).doOnComplete(() -> {
                this.isClosing = true;
                this.outboundScheduler.dispose();
            }).doOnError(th -> {
                if (this.isClosing) {
                    return;
                }
                logger.error("Error in outbound processing", th);
                this.isClosing = true;
                this.outboundScheduler.dispose();
            }).map(obj -> {
                return (McpSchema.JSONRPCMessage) obj;
            });
        };
        ((Flux) function.apply(this.outboundSink.asFlux())).subscribe();
    }

    @Override // io.modelcontextprotocol.spec.McpTransport
    public Mono<Void> closeGracefully() {
        return Mono.defer(() -> {
            this.isClosing = true;
            logger.debug("Initiating graceful shutdown");
            this.inboundSink.tryEmitComplete();
            logger.debug("Graceful shutdown complete");
            return Mono.empty();
        }).subscribeOn(Schedulers.boundedElastic());
    }

    @Override // io.modelcontextprotocol.spec.McpTransport
    public <T> T unmarshalFrom(Object obj, TypeReference<T> typeReference) {
        return (T) this.objectMapper.convertValue(obj, typeReference);
    }

    private void logIfNotClosing(String str, Exception exc) {
        if (this.isClosing) {
            return;
        }
        logger.error(str, exc);
    }

    private void logIfNotClosing(String str) {
        if (this.isClosing) {
            return;
        }
        logger.error(str);
    }
}
