随着ai大模型的普及,大部分公司会在项目里接入大模型,如果做问答或者别的ai回答需求,通常会用到流式接口,小程序里怎么处理流式接口呢?

话不多说直接干货代码,直接封装成组件,然后在页面里引入并且调用fetchDeepSeekStream就可以了,并且有打断方法以及终止和结束和输出

import { useStore } from 'vuex';
import store from '/src/store/index';
import { ref } from 'vue';

const chatdocDatawang = ref('');

const app = getApp();

// 最大重试次数
const MAX_RETRIES = 3;
// 初始重试延迟(毫秒)
const INITIAL_RETRY_DELAY = 1000;

// 改进的数据处理类,使用状态机更高效地解析SSE格式
class SseParser {
    constructor() {
        this.buffer = '';
        this.events = [];
    }
    
    appendChunk(chunk) {
        this.buffer += chunk;
        this.parseEvents();
    }
    

//parse这个方法是我封装的去找流式接口里特定的字段里的内容

    parseEvents() {
        // 寻找完整的事件(以空行结尾)
        const eventRegex = /^((?:event: [^\n]*\n)?(?:data: [^\n]*\n)*)\n/gm;
        let match;
        
        while ((match = eventRegex.exec(this.buffer)) !== null) {
            const eventText = match[1];
            this.parseEvent(eventText);
            // 从缓冲区中移除已处理的事件
            this.buffer = this.buffer.substring(match.index + match[0].length);
            // 重置正则表达式索引
            eventRegex.lastIndex = 0;
        }
    }
    
    parseEvent(eventText) {
        const lines = eventText.split('\n');
        let eventName = '';
        let data = '';
        
        for (const line of lines) {
            if (line.startsWith('event: ')) {
                eventName = line.substring('event: '.length).trim();
            } else if (line.startsWith('data: ')) {
                data += line.substring('data: '.length) + '\n';
            }
        }
        
        // 移除最后一个多余的换行符
        if (data) {
            data = data.slice(0, -1);
        }
        
        if (eventName || data) {
            this.events.push({ eventName, data });
        }
    }
    
    getEvents() {
        return [...this.events];
    }
    
    clearEvents() {
        this.events = [];
    }
}

// 提取特定事件的数据
function extractEventData(events, eventName) {
    return events
        .filter(e => e.eventName === eventName)
        .map(e => e.data)
        .join('');
}

// 提取单个事件的数据
function extractSingleEventData(events, eventName) {
    const event = events.find(e => e.eventName === eventName);
    return event ? event.data : '';
}

// 提取所有 refer_doc 事件的数据
function extractAllReferDocs(events) {
    return events
        .filter(e => e.eventName === 'refer_doc')
        .map(e => {
            try {
                return JSON.parse(e.data.trim());
            } catch (e) {
                return e.data;
            }
        });
}

// 检查网络连接状态
async function checkNetwork() {
    try {
        const res = await uni.getNetworkType();
        return res.networkType !== 'none';
    } catch (e) {
        console.error('获取网络状态失败:', e);
        return true; // 默认为有网络连接
    }
}

export function fetchDeepSeekStream(messages, onProgress, onComplete, onError, onAbort) {
    const API_URL = '你的流式接口';

    let isStopped = false;
    let isCompleted = false;
    let retryCount = 0;
    let requestTask = null;
    const sseParser = new SseParser();
    
    // 创建请求函数,支持重试
    const createRequest = async () => {
        if (isStopped) return;
        
        // 实现指数退避策略
        if (retryCount > 0) {
            const delay = INITIAL_RETRY_DELAY * Math.pow(2, retryCount);
            await new Promise(resolve => setTimeout(resolve, delay));
        }
        
        // #ifdef MP-WEIXIN
        requestTask = uni.request({
            url: API_URL,
            enableChunked: true,
            responseType: 'text',
            method: "POST",
            header: {
                "Content-Type": "application/json",
                "Authorization": `Bearer ${uni.getStorageSync('access_token')}`
            },
            data: messages,
            timeout: 6000, 
            success: (res) => {
                console.log('引用来源返回', app.globalData.chatdocData);
                
                if (res.statusCode !== 200) {
                    handleError(new Error(`HTTP错误,状态码: ${res.statusCode}`));
                    return;
                }
                
                store.commit('appchiid', getRandom());
                
                if (res.data && typeof res.data === 'string') {
                    sseParser.appendChunk(res.data);
                    const events = sseParser.getEvents();
                    sseParser.clearEvents();
                    
                    const answerid = extractSingleEventData(events, 'answer_id');
                    if (answerid) {
                        app.globalData.answerid = answerid;
                        isCompleted = true;
                        if (onComplete) onComplete();
                    }
                    
                    const chatdocDataArray = extractAllReferDocs(events);
                    if (chatdocDataArray.length > 0) {
                        app.globalData.chatdocData = [...app.globalData.chatdocData, ...chatdocDataArray];
                    }
                }
            },
            fail: (err) => {
                console.error('请求失败:', err);
                handleError(err);
            },
            complete: () => {
                console.log('请求完成');
            }
        });

        requestTask.onChunkReceived(async (res) => {
            if (isStopped || isCompleted) return;
            
            try {
                const uint8Array = new Uint8Array(res.data);
//小程序里不返回汉字,可能为二进制编码数字,所以需要解码
                const chunkText = utf8ArrayToString([...uint8Array]);
//小程序里不返回汉字,可能为二进制编码数字,所以需要解码

//解码后输出打印的chunkText就是纯流式接口里的东西了
				console.log('chunkText',chunkText);
                
                // 使用SSE解析器处理数据块
                sseParser.appendChunk(chunkText);
                const events = sseParser.getEvents();
                sseParser.clearEvents();
                
                const questionid = extractSingleEventData(events, 'question_id');
                if (questionid) {
                    app.globalData.questionid = questionid;
                }
                
                const chatThinkData = extractEventData(events, 'think');
                
                const chatdocDataArray = extractAllReferDocs(events);
                if (chatdocDataArray.length > 0) {
                    app.globalData.chatdocData = [...app.globalData.chatdocData, ...chatdocDataArray];
                }

                const chatReplyData = extractEventData(events, 'chat_reply');
                const answerid = extractSingleEventData(events, 'answer_id');
                if (answerid) {
                    app.globalData.answerid = answerid;
                    isCompleted = true;
                    if (onComplete) onComplete();
                }

                let thinkNewline = chatThinkData;
                let replyNewline = chatReplyData;

                onProgress && onProgress(thinkNewline, replyNewline);
            } catch (error) {
                console.error('处理数据块时出错:', error);
                handleError(error);
            }
        });
        // #endif
    };

    // 改进的错误处理函数,支持更多错误类型和指数退避
    const handleError = async (err) => {
        if (isStopped) return;
        
        console.error('请求错误:', err);
        
        // 检查网络连接
        const isNetworkConnected = await checkNetwork();
        if (!isNetworkConnected) {
            console.error('网络连接已断开');
            isStopped = true;
            if (onError) onError(new Error('网络连接已断开'));
            return;
        }
        
        // 检查是否是可重试的错误
        const isRetryableError = 
            err.errMsg && (
                err.errMsg.includes('time out') || 
                err.errMsg.includes('network error') ||
                err.errMsg.includes('aborted')
            );
            
        // 重试请求
        if (isRetryableError && retryCount < MAX_RETRIES) {
            retryCount++;
            await createRequest();
            return;
        }
        
        isStopped = true;
        if (onError) onError(err);
    };

    // 创建初始请求
    createRequest().catch(err => {
        console.error('创建请求时出错:', err);
        if (!isStopped && onError) onError(err);
    });

    return () => {
        console.log('中断请求...');
        isStopped = true;

        // #ifdef MP-WEIXIN
        if (requestTask) {
            requestTask.abort();
        }
        // #endif 

        if (typeof onAbort === 'function') {
            onAbort();
        }
    };
}

function generateUniqueRandom() {
    let lastRandom = null;

    return function () {
        let newRandom;
        do {
            newRandom = Math.floor(Math.random() * 99) + 1;
        } while (newRandom === lastRandom);

        lastRandom = newRandom;
        return newRandom;
    };
}

const getRandom = generateUniqueRandom();

// 使用TextDecoder替代自定义的UTF-8转换函数
function utf8ArrayToString(array) {
    try {
        return new TextDecoder('utf-8').decode(new Uint8Array(array));
    } catch (e) {
        // 回退到原来的实现
        let out, i, len, c;
        let char2, char3;

        out = "";
        len = array.length;
        i = 0;
        while (i < len) {
            c = array[i++];
            switch (c >> 4) {
                case 0:
                case 1:
                case 2:
                case 3:
                case 4:
                case 5:
                case 6:
                case 7:
                    out += String.fromCharCode(c);
                    break;
                case 12:
                case 13:
                    char2 = array[i++];
                    out += String.fromCharCode(((c & 0x1F) << 6) | (char2 & 0x3F));
                    break;
                case 14:
                    char2 = array[i++];
                    char3 = array[i++];
                    out += String.fromCharCode(((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
                    break;
            }
        }

        return out;
    }
}

index里调用

const jiekoudiaooyng =()=>{
stopStreamFunction.value = fetchDeepSeekStream(
			messages,  //这儿是参数对象
			(thinkChunk, replyChunk) => {
//分别返回thinkChunk回调参数思考部分和replyChunk回调参数ai回答部分


				let decodedReplyChunk = '';
				decodedReplyChunk = md.render(replyChunk);

//这儿的md.render是我npm安装的markdown-it,用来解析markdown格式,然后在页面里v-html就可以渲染出来了

				// if (thinkChunk) {
				messageListthContent.value.push(renderMarkdownWithoutParagraphs(thinkChunk));
				messageList.value[aiMessageIndex].thinkContent = messageListthContent.value.join('');
				// 	topxThink()
				// }
				// if (replyChunk) {
				messageListreContent.value.push(renderMarkdownWithoutParagraphs(replyChunk));
				messageList.value[aiMessageIndex].content = messageListreContent.value.join('');
				topx()
				// }

			},

			(error) => {
				console.error('错误:', error);
				if (messageList.value.length > 0) {
					const aiMessageIndex = messageList.value.length - 1;
					// 检查 messageList.value[aiMessageIndex] 是否存在
					if (messageList.value[aiMessageIndex]) {
						Instreamingoutput.value = false;
						messageList.value[aiMessageIndex].loading = false;
					} else {
						console.error('messageList.value[aiMessageIndex] 为 undefined,无法更新 loading 状态');
					}
				} else {}


			},
			() => {
				console.log('中断请求...');
				Instreamingoutput.value = false;

				if (messageList.value.length > 0) {
					const aiMessageIndex = messageList.value.length - 1;
					// 检查 messageList.value[aiMessageIndex] 是否为 undefined
					if (messageList.value[aiMessageIndex]) {
						// messageList.value[aiMessageIndex].content = '好的,已停止回答,有需要再问我';
						messageList.value[aiMessageIndex].loading = false;
					} else {
						console.error('messageList.value[aiMessageIndex] 为 undefined,无法更新内容');
					}
				} else {}
			},

		);

		optshareid.value = ''
		// newaddupload.value = []
	};

}


以上代码开箱即用,小程序里直接直接复制粘贴到一个组件页面里,在要调用流式的页面里直接引入并调用如代码里所示就可以渲染流式里的东西了,还会自动形成打字效果!!

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐