Java使用WebFlux调用大模型实现智能对话
本文介绍了如何使用Java WebFlux实现大模型流式输出的技术方案。主要包括:1) 引入webflux依赖并排除Tomcat容器;2) 定义请求和接收数据对象;3) 配置响应无缓存;4) 分析大模型返回的数据格式;5) 通过WebClient实现流式调用,包括SSL配置和响应处理。核心是使用Flux处理分段返回的数据流,实现实时响应效果。
·
Java使用WebFlux调用大模型实现流式输出
下文将简要介绍Java使用webFlux调用大模型实现流式输出
1.引入依赖
如果使用了tomcat作为容器需要排除tomcat,webflux使用Netty作为容器
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
2.定义请求类和接收类
AiPromptDto 用于接收用户输入信息
@Data
public class AiPromptDto {
/**
* 大模型id
*/
private String serviceId;
/**
* 用户输入
*/
private String userInput;
/**
* sessionId
*/
private String sessionId;
/**
* 请求id
*/
private String requestId;
/**
* 获取token
*/
private String token;
}
答案接收对象
@Data
@AllArgsConstructor
public class AnswerChunk {
/**
* 返回的内容
*/
private String content;
private String sessionId;
}
3.修改application.yml
此处配置response没有缓存,否则可能会阻塞,不会实时返回
reactor:
netty:
response:
buffer-size: 0
4.测试大模型获取数据格式
1.欢迎词
userinput:你好?
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"messageId":"d47cce80-bcf0-49fe-8e23-06bb5ab79af3","messageContent":"消息1:我是一个聊天机器人,这里是我的消息"}
id:[DONE]
data:[DONE]
2.问答
userinput:物料00NY681的库存有多少个?
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"库"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"存"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"中"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"物"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"料"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"0"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"0"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"N"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"Y"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"6"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"8"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"1"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"的"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"数"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"量"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"为"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"1"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"个"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"。"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"inquiryList":"[\"物料00NY681的库存是否充足?\",\"物料00NY681的库存位置在哪里?\",\"如何补充物料00NY681的库存?\"]"}
id:[DONE]
data:[DONE]
4.定义Service接口和实现类
webflux返回Mono或者Flux
public interface AiService {
/**
* 根据请求获取流式返回的答案
* @param request
* @return
*/
Flux<AnswerChunk> processStream(AiPromptDto request);
}
实现类AIServiceImpl
import org.springframework.web.reactive.function.client.WebClient;
@Service
public class AIServiceImpl implements AiService {
private final WebClient webClient;
//初始化webClient,并ssl校验,生产环境不要跳过
public AIServiceImpl(WebClient.Builder webClientBuilder) {
// 使用InsecureTrustManagerFactory来信任所有证书
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE);
HttpClient httpClient = HttpClient.create()
.secure(sslContextSpec -> sslContextSpec.sslContext(sslContextBuilder))
.responseTimeout(Duration.ofMinutes(timeout));
this.webClient = webClientBuilder.clientConnector(
new ReactorClientHttpConnector(httpClient)
).baseUrl(aiForceUrl).build();
}
@Override
public Flux<AnswerChunk> processStream(AiPromptDto request) {
String body = JSONUtil.toJsonStr(request);//参数都转化为json字符串
return webClient.post()
.uri(aiForceUrl + "/aiforceplatformapi/openapi/llm/debugSse")//大模型地址
.bodyValue(body)//body参数
.header("token", request.getToken())//设置请求头
.header("Content-Type", "application/json")
.retrieve()//retrieve 方法会从服务器响应中提取数据
.bodyToFlux(String.class)//响应体解析为一个流式的 String 类型序列
.map(chunk -> {//解析数据以供存储
//System.out.println("chunk = " + chunk);
String content = "";
// 解析大模型返回数据
if (!chunk.contains("[DONE]")) {//结束标志
if (chunk.contains("inquiryList")) {//处理返回的关联查询列表
content = parseChunk(chunk);
finalAnswer[0].setQueryList(content);
}else if (chunk.contains("messageId")&&chunk.contains("messageContent")) {//处理返回提示message
parseMessage(chunk, messageMap);
} else if (chunk.contains("data")) {//处理返回的问题答案
content = parseChunk(chunk);
redisTemplate.opsForValue().append(request.getRequestId() + "_result", content);
} else if (chunk.contains("question")) {//处理返回question
//先删除
questionService.deleteQuestionsByPreviousIdAndRequest(questionId, requestId);
//保存ai返回的question
} else if (chunk.contains("image")) {//处理图片
parseImages(chunk, imagesUrl);
} else if (chunk.contains("referenceInfo")) {//处理参照信息
parseReference(chunk, aiAnswerReferenceList);
}
} else {
// 处理结束
end.set("[DONE]");
finalAnswer[0].setState("DONE");
}
if (StringUtils.isEmpty(chunk)) {
chunk = "";
}
return new AnswerChunk(chunk, request.getRequestId());
})
.doOnComplete(() -> {//答案都完成后存储对应数据到数据库中
String finalContent = redisTemplate.opsForValue().get(request.getRequestId() + "_result");
redisTemplate.delete(request.getRequestId());
//保存答案
String returnAnswer = "";
JSONObject answer1 = new JSONObject().putOnce("data", finalContent);
//具体实现
})
.onErrorResume(e -> {//错误情况处理
finalAnswer[0].setState("FAILED");
answerService.saveOrUpdate(finalAnswer[0]);
return Flux.error(e);
});
}
}
5.定义Controller
@RestController
@RequestMapping("/aiAgent")
public class AiForceController {
/**
* 获取内容
*
* @param request MediaType.TEXT_EVENT_STREAM_VALUE 流式输出,否则会一次返回
* charset=UTF-8 字符集,不设置会乱码
* 注意:使用get会中文乱码
* @return
*/
@PostMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE + ";charset=UTF-8")
public Flux<ServerSentEvent<String>> streamResponse(@RequestBody AiForcePromptDto request) {
return aiService.processStream(request)
.limitRate(100) // 限制每秒最大请求数
.onBackpressureBuffer(100,//背压策略:缓冲区大小为 100
buffer -> {
logger.warn("Backpressure buffer overflow, dropping {} items", buffer);
}).publishOn(Schedulers.boundedElastic(),1) // 单线程调度确保顺序
.flatMap(chunk -> { // 使用 flatMap 将一个异步流中的每个元素映射为另一个流,并将这些流合并为一个单一的流
String content = chunk.getContent();
if (StringUtils.isNotBlank(content)) {
String processedContent = content.replaceAll("`{3}", "\n```"); // 规范代码块格式
return Flux.just(ServerSentEvent.<String>builder()
.id(request.getRequestId())
.data(processedContent)
.build());
}
return Flux.empty();//如果内容为空,就返回空的flux
}, 1) // 设置并发度为 1,确保逐条发送
.doOnNext(event -> logger.info("Streaming chunk: {}", event.data())); // 日志记录每次发送的数据
}
}
// Flux<ServerSentEvent<String>> 实现 SSE(Server-Sent Events),以便客户端可以实时接收服务器推送的消息
6.调用结果
注意:在部署时,如果使用到了nginx需要配置
chunked_transfer_encoding off 关闭分块传输,会发送完整的数据
proxy_buffering off #禁用代理缓冲,适用于流式传输
gzip off ##关闭压缩,数据以未压缩的方式传输
add_header Cache-Control “no-cache” header定义无缓存
add_header X-Accel-Buffering no;##禁用 Nginx 的缓冲功能,确保数据实时传输
有任何问题,可以评论,欢迎交流!
更多推荐
所有评论(0)