/*
 * Decompiled with CFR 0.152.
 */
package dev.langchain4j.community.model.qianfan.client;

import dev.langchain4j.community.model.qianfan.client.ErrorHandling;
import dev.langchain4j.community.model.qianfan.client.Json;
import dev.langchain4j.community.model.qianfan.client.ResponseLoggingInterceptor;
import dev.langchain4j.community.model.qianfan.client.StreamingCompletionHandling;
import dev.langchain4j.community.model.qianfan.client.StreamingResponseHandling;
import dev.langchain4j.community.model.qianfan.client.Utils;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import okhttp3.sse.EventSources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingRequestExecutor<Request, Response, ResponseContent> {
    private static final Logger log = LoggerFactory.getLogger(StreamingRequestExecutor.class);
    private final OkHttpClient okHttpClient;
    private final String endpointUrl;
    private final Supplier<Request> requestWithStreamSupplier;
    private final Class<Response> responseClass;
    private final Function<Response, ResponseContent> streamEventContentExtractor;
    private final boolean logStreamingResponses;

    StreamingRequestExecutor(OkHttpClient okHttpClient, String endpointUrl, Supplier<Request> requestWithStreamSupplier, Class<Response> responseClass, Function<Response, ResponseContent> streamEventContentExtractor, boolean logStreamingResponses) {
        this.okHttpClient = okHttpClient;
        this.endpointUrl = endpointUrl;
        this.requestWithStreamSupplier = requestWithStreamSupplier;
        this.responseClass = responseClass;
        this.streamEventContentExtractor = streamEventContentExtractor;
        this.logStreamingResponses = logStreamingResponses;
    }

    StreamingResponseHandling onPartialResponse(final Consumer<ResponseContent> partialResponseHandler) {
        return new StreamingResponseHandling(){

            @Override
            public StreamingCompletionHandling onComplete(final Runnable runnable) {
                return new StreamingCompletionHandling(){

                    @Override
                    public ErrorHandling onError(Consumer<Throwable> errorHandler) {
                        return () -> StreamingRequestExecutor.this.stream(partialResponseHandler, runnable, errorHandler);
                    }

                    @Override
                    public ErrorHandling ignoreErrors() {
                        return () -> StreamingRequestExecutor.this.stream(partialResponseHandler, runnable, e -> {});
                    }
                };
            }

            @Override
            public ErrorHandling onError(Consumer<Throwable> errorHandler) {
                return () -> StreamingRequestExecutor.this.stream(partialResponseHandler, () -> {}, errorHandler);
            }

            @Override
            public ErrorHandling ignoreErrors() {
                return () -> StreamingRequestExecutor.this.stream(partialResponseHandler, () -> {}, e -> {});
            }
        };
    }

    private void stream(final Consumer<ResponseContent> partialResponseHandler, final Runnable streamingCompletionCallback, final Consumer<Throwable> errorHandler) {
        Request request = this.requestWithStreamSupplier.get();
        String requestJson = Json.toJson(request);
        Request okHttpRequest = new Request.Builder().url(this.endpointUrl).post(RequestBody.create((String)requestJson, (MediaType)MediaType.get((String)"application/json; charset=utf-8"))).build();
        EventSourceListener eventSourceListener = new EventSourceListener(){

            public void onOpen(EventSource eventSource, Response response) {
                if (StreamingRequestExecutor.this.logStreamingResponses) {
                    ResponseLoggingInterceptor.log(response);
                }
            }

            public void onEvent(EventSource eventSource, String id, String type, String data) {
                if (StreamingRequestExecutor.this.logStreamingResponses) {
                    log.debug("onEvent() {}", (Object)data);
                }
                if (!"[DONE]".equals(data)) {
                    try {
                        Object response = Json.fromJson(data, StreamingRequestExecutor.this.responseClass);
                        Object responseContent = StreamingRequestExecutor.this.streamEventContentExtractor.apply(response);
                        if (responseContent != null) {
                            partialResponseHandler.accept(responseContent);
                        }
                    }
                    catch (Exception var7) {
                        errorHandler.accept(var7);
                    }
                }
            }

            public void onClosed(EventSource eventSource) {
                if (StreamingRequestExecutor.this.logStreamingResponses) {
                    log.debug("onClosed()");
                }
                streamingCompletionCallback.run();
            }

            public void onFailure(EventSource eventSource, Throwable t, Response response) {
                if (StreamingRequestExecutor.this.logStreamingResponses) {
                    log.debug("reqeust url:", (Object)response.request().url());
                    log.debug("onFailure()", t);
                    ResponseLoggingInterceptor.log(response);
                }
                if (t != null) {
                    errorHandler.accept(t);
                } else {
                    try {
                        errorHandler.accept(Utils.toException(response));
                    }
                    catch (IOException var5) {
                        errorHandler.accept(var5);
                    }
                }
            }
        };
        EventSources.createFactory((OkHttpClient)this.okHttpClient).newEventSource(okHttpRequest, eventSourceListener);
    }
}

