想要对接 DeepSeek,并以流式响应的形式把结果返回给前端,这样用户不必长时间等待,体验更好。但由于项目基于 Java 8,且低版本 Spring 对 WebFlux 的支持不太友好,最终选择用 WebSocket 的形式来实现。

webSocketHandler

package org.lbp.modules.aiModel.webSocketHandler;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

/**
 * WebSocket handler that forwards each incoming text message to the chat
 * completion HTTP endpoint configured under {@code deepseek.api.*} with
 * {@code stream=true}, and relays every {@code choices[0].delta} object of the
 * server-sent-event response back to the client as one text frame.
 *
 * <p>The upstream response is consumed on an OkHttp callback thread, so several
 * requests started on the same session may try to write concurrently. All
 * writes go through {@link #send(WebSocketSession, String)}, which serializes
 * them per session.
 */
@Component
public class DeepSeekWebSocketHandler extends TextWebSocketHandler {

    /** Prefix of a server-sent-event line that carries a payload. */
    private static final String SSE_DATA_PREFIX = "data:";

    /** Payload that marks the end of the stream. */
    private static final String SSE_DONE = "[DONE]";

    private static final MediaType JSON_MEDIA_TYPE = MediaType.parse("application/json");

    /** Shared mapper; building one per response is unnecessary work. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Value("${deepseek.api.key}")
    private String apiKey;

    @Value("${deepseek.api.url}")
    private String apiUrl;

    @Value("${deepseek.api.model}")
    private String model;

    @Value("${deepseek.api.role}")
    private String role;

    private final OkHttpClient client;

    /** Creates the handler with a single OkHttpClient that is reused for every request. */
    public DeepSeekWebSocketHandler() {
        this.client = new OkHttpClient.Builder().build();
    }

    /**
     * Treats the payload of every incoming text frame as the user's prompt and
     * starts a streaming request for it.
     *
     * @param session the WebSocket session the message arrived on
     * @param message the text frame sent by the client
     */
    @Override
    public void handleTextMessage(WebSocketSession session, TextMessage message) {
        String userMessage = message.getPayload();
        sendStreamingRequest(userMessage, session);
    }

    /**
     * Enqueues the upstream request asynchronously and wires its callbacks to the session.
     * Any failure is reported to the client as a text frame and the session is closed.
     */
    private void sendStreamingRequest(String userMessage, WebSocketSession session) {
        try {
            client.newCall(buildRequest(userMessage)).enqueue(new Callback() {
                @Override
                public void onFailure(Call call, IOException e) {
                    failAndClose(session, "API 请求失败:" + e.getMessage());
                }

                @Override
                public void onResponse(Call call, Response response) {
                    // Closing the body releases the connection on every path,
                    // including the non-2xx early return.
                    try (ResponseBody body = response.body()) {
                        if (!response.isSuccessful()) {
                            failAndClose(session, "请求失败,状态码: " + response.code());
                            return;
                        }
                        relayStream(call, body, session);
                    } catch (IOException | RuntimeException e) {
                        // Do not let the exception escape into OkHttp: the client
                        // would never learn that the stream was aborted.
                        failAndClose(session, "API 请求失败:" + e.getMessage());
                    }
                }
            });
        } catch (Exception e) {
            // Boundary catch: e.g. an invalid configured URL fails while building the request.
            failAndClose(session, "错误: " + e.getMessage());
        }
    }

    /** Builds the POST request carrying a single-message, streaming chat payload. */
    private Request buildRequest(String userMessage) {
        JSONObject messageObject = new JSONObject();
        messageObject.put("role", role);
        messageObject.put("content", userMessage);

        JSONArray messagesArray = new JSONArray();
        messagesArray.add(messageObject);

        JSONObject payload = new JSONObject();
        payload.put("model", model);
        payload.put("stream", true);
        payload.put("messages", messagesArray);

        return new Request.Builder()
                .url(apiUrl)
                .addHeader("Authorization", "Bearer " + apiKey)
                .addHeader("Content-Type", "application/json")
                .post(RequestBody.create(JSON_MEDIA_TYPE, payload.toJSONString()))
                .build();
    }

    /**
     * Reads the event stream line by line and forwards each delta to the client
     * until the end marker, end of stream, or the WebSocket session closing.
     *
     * @throws IOException if reading upstream or writing to the session fails
     */
    private void relayStream(Call call, ResponseBody body, WebSocketSession session) throws IOException {
        // Decode explicitly as UTF-8; the platform default charset would corrupt
        // non-ASCII output on JVMs whose default is something else.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(body.byteStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (!session.isOpen()) {
                    // Nobody is listening any more; stop pulling data from upstream.
                    call.cancel();
                    return;
                }
                if (!line.startsWith(SSE_DATA_PREFIX)) {
                    continue;
                }
                String jsonData = line.substring(SSE_DATA_PREFIX.length()).trim();
                if (SSE_DONE.equals(jsonData)) {
                    break;
                }
                JsonNode delta;
                try {
                    delta = extractDelta(jsonData);
                } catch (IOException e) {
                    // Only genuine parse failures are reported as such; send
                    // failures propagate to the caller instead.
                    send(session, "JSON 解析错误: " + e.getMessage());
                    continue;
                }
                if (delta != null) {
                    send(session, delta.toString());
                }
            }
        }
    }

    /**
     * Extracts {@code choices[0].delta} from one event payload.
     *
     * @return the delta node, or {@code null} when the payload has none
     * @throws IOException if the payload is not valid JSON
     */
    private static JsonNode extractDelta(String jsonData) throws IOException {
        JsonNode root = OBJECT_MAPPER.readTree(jsonData);
        if (root == null) {
            return null;
        }
        JsonNode choices = root.get("choices");
        if (choices == null || !choices.isArray() || choices.size() == 0) {
            return null;
        }
        return choices.get(0).get("delta");
    }

    /**
     * Sends one text frame. Writes are serialized per session because several
     * callback threads may be streaming to the same session at once.
     */
    private static void send(WebSocketSession session, String text) throws IOException {
        synchronized (session) {
            session.sendMessage(new TextMessage(text));
        }
    }

    /**
     * Best-effort error path: tries to tell the client what went wrong, then
     * always attempts to close the session. Failures here are logged rather
     * than rethrown because there is nobody left to report them to.
     */
    private static void failAndClose(WebSocketSession session, String text) {
        try {
            if (session.isOpen()) {
                send(session, text);
            }
        } catch (IOException | RuntimeException e) {
            System.err.println("Failed to send error message on session " + session.getId() + ": " + e);
        }
        try {
            session.close();
        } catch (IOException | RuntimeException e) {
            System.err.println("Failed to close session " + session.getId() + ": " + e);
        }
    }

    /** Logs the session id and close status when a connection ends. */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        System.out.println("WebSocket 连接关闭:" + session.getId() + ",状态:" + status);
    }
}

websocketConfig

package org.lbp.modules.aiModel.config;

import org.lbp.modules.aiModel.webSocketHandler.DeepSeekWebSocketHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.*;

/**
 * Enables Spring WebSocket support and exposes the DeepSeek chat handler on
 * {@code /ws/deepSeek/chat}.
 */
@Configuration
@EnableWebSocket
public class MyWebSocketConfig implements WebSocketConfigurer {

    /** Path that clients connect to. */
    private static final String CHAT_ENDPOINT = "/ws/deepSeek/chat";

    private final DeepSeekWebSocketHandler chatHandler;

    /**
     * @param webSocketHandler the handler bean to register on the chat endpoint
     */
    public MyWebSocketConfig(DeepSeekWebSocketHandler webSocketHandler) {
        chatHandler = webSocketHandler;
    }

    /**
     * Maps the chat handler to its endpoint.
     *
     * @param registry the registry supplied by Spring
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        WebSocketHandlerRegistration registration = registry.addHandler(chatHandler, CHAT_ENDPOINT);
        // NOTE(review): "*" accepts handshakes from any origin; consider restricting
        // this to known front-end origins before exposing the endpoint publicly.
        registration.setAllowedOrigins("*");
    }

}

在 Postman 中测试:新建一个 WebSocket 请求,连接到 /ws/deepSeek/chat,然后发送文本消息即可看到流式返回的内容。

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐