GPTCache: Rethinking LLM Caching to Cut Costs and Improve Efficiency
The GPTCache project semantically caches LLM responses, which noticeably reduces API-call costs and improves response times under high traffic. This article walks through the project from initialization to integration, covering custom embedding functions, data managers, and similarity evaluation, which together make long sequences and multi-turn conversations cheaper to serve. It also introduces the KV Cache mechanism in GPT-2, showing the key role it plays in Transformer models: by caching Key and Value states it avoids redundant computation and speeds up inference.
Introduction to GPTCache
As applications grow in popularity and face higher traffic, the cost of LLM API calls can become substantial. LLM services can also respond slowly, especially under heavy request loads. GPTCache is a project dedicated to building a semantic cache for storing LLM responses.
Project Architecture

Step-by-Step Code Walkthrough
1. Running a visual question-answering example
import openai
from gptcache import cache

# Question-answering function
def ask_question(question, image_path=None):
    if image_path:
        prompt = f"Image: {image_path}\nQuestion: {question}"
    else:
        prompt = question
    response = openai.Completion.create(
        engine="davinci",
        prompt=prompt,
        max_tokens=50
    )
    return response['choices'][0]['text']

# Main function
def main():
    # Example image and question
    image_path = "path_to_image.jpg"
    question_text = "What is shown in this image?"
    # Handle a question that includes an image
    # Note: the cache must be initialized via cache.init(...) before use; see steps 2-4 below
    if cache.contains((image_path, question_text)):
        print("Cache hit:", cache.get((image_path, question_text)))
    else:
        answer = ask_question(question_text, image_path=image_path)
        cache.set((image_path, question_text), answer)
        print("Generated and cached:", answer)

if __name__ == "__main__":
    main()
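The contains/get/set calls above express the caching logic at a high level. GPTCache also ships a small put/get helper API; the snippet below is a minimal sketch that assumes the gptcache.adapter.api module and the get_prompt pre-processor are available in your installed version.
# Hedged sketch: assumes gptcache.adapter.api and gptcache.processor.pre.get_prompt
# exist in the installed GPTCache version
from gptcache import cache
from gptcache.adapter.api import put, get
from gptcache.processor.pre import get_prompt

cache.init(pre_embedding_func=get_prompt)  # simple cache keyed on the raw prompt text
put("What is shown in this image?", "A cat sitting on a windowsill.")
print(get("What is shown in this image?"))  # returns the cached answer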
2. Setting up the embedding function
The embedding function converts user queries and response data into embedding vectors, which enables efficient cache management and similarity search.
# Trivial pass-through embedding function (placeholder)
def to_embeddings(data, **kwargs):
    return data

# Use the built-in ONNX embedding model instead
from gptcache.embedding import Onnx

onnx = Onnx()
embedding_func = onnx.to_embeddings
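As a quick sanity check, the embedding function can be called directly; this small sketch only assumes the onnx object created above and inspects the vector it returns.
import numpy as np

vec = onnx.to_embeddings("What is shown in this image?")
print(np.asarray(vec).shape)   # expected to match the model's embedding size
print("dimension:", onnx.dimension)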
3. Setting up the data manager
from gptcache.manager import get_data_manager, CacheBase, VectorBase
from gptcache import cache
import numpy as np

cache_base = CacheBase('sqlite')                             # scalar storage for prompts and answers
vector_base = VectorBase('faiss', dimension=onnx.dimension)  # vector index for similarity search
data_manager = get_data_manager(cache_base, vector_base)

cache.init(
    embedding_func=embedding_func,
    data_manager=data_manager,
)
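In the GPTCache versions I have seen, get_data_manager also accepts capacity and eviction parameters; the parameter names below (max_size, eviction) are assumptions and should be checked against your installed release.
# Hedged sketch: max_size / eviction parameter names are assumptions, not verified for every version
data_manager = get_data_manager(
    CacheBase('sqlite'),
    VectorBase('faiss', dimension=onnx.dimension),
    max_size=1000,    # assumed cap on cached entries before eviction starts
    eviction='LRU',   # assumed least-recently-used eviction policy
)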
4. Initializing additional cache parameters
from gptcache.similarity_evaluation.distance import SearchDistanceEvaluation

similarity_evaluation = SearchDistanceEvaluation()  # rank cached candidates by vector search distance

cache.init(
    embedding_func=embedding_func,
    data_manager=data_manager,
    similarity_evaluation=similarity_evaluation,
)
cache.set_openai_key('your_openai_api_key')
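Once the cache is initialized, GPTCache's drop-in OpenAI adapter lets the cache lookup happen transparently before each API call. The sketch below assumes the gptcache.adapter.openai module and the legacy ChatCompletion interface; adjust it to whichever OpenAI client version your GPTCache release wraps.
# Hedged sketch: assumes gptcache.adapter.openai mirrors the legacy openai-python ChatCompletion API
from gptcache.adapter import openai  # drop-in replacement for the openai module

response = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "What is GPTCache?"}],
)
print(response["choices"][0]["message"]["content"])
# A semantically similar follow-up question should now be answered from the cache.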
5. Using the GPTCache server
$ gptcache_server -s 0.0.0.0 -p 8000 -d /path/to/cache_directory -f /path/to/cache_config.yaml
Example server configuration (cache_config.yaml):
embedding: onnx
embedding_config:
  # Set embedding model params here
storage_config:
  data_dir: gptcache_data
  manager: sqlite,faiss
  vector_params:
    # Set vector storage related params here
evaluation: distance
evaluation_config:
  # Set evaluation metric kws here
pre_function: get_prompt
post_function: first
config:
  similarity_threshold: 0.8
  # Set other config here
6. Benchmarking
Benchmarking evaluates the performance of the cache system, including cache hit rate, query response time, and resource utilization. It shows how effective a caching strategy is and provides the data needed to tune it.
import json
import numpy as np
from gptcache import cache
from gptcache.manager import get_data_manager, CacheBase, VectorBase
from gptcache.similarity_evaluation.distance import SearchDistanceEvaluation

# Mock embedding function: returns a random 8-dimensional vector
def mock_embeddings(data, **kwargs):
    return np.random.random((8,)).astype('float32')

cache_base = CacheBase('sqlite')
vector_base = VectorBase('faiss', dimension=8)
data_manager = get_data_manager(cache_base, vector_base)
cache.init(
    embedding_func=mock_embeddings,
    data_manager=data_manager,
    similarity_evaluation=SearchDistanceEvaluation(),
)
cache.set_openai_key('your_openai_api_key')

# Load test data
with open('mock_data.json', 'r') as f:
    data = json.load(f)

# Run the benchmark
for item in data:
    prompt = item['prompt']
    answer = item['answer']
    if cache.contains(prompt):
        print("Cache hit:", cache.get(prompt))
    else:
        cache.set(prompt, answer)
        print("Generated and cached:", answer)
Complete Code Example
import openai
from gptcache import cache
from gptcache.embedding import Onnx
from gptcache.manager import get_data_manager, CacheBase, VectorBase
from gptcache.similarity_evaluation.distance import SearchDistanceEvaluation
from gptcache.session import Session
import numpy as np
import json

openai.api_key = 'your_openai_api_key'

# Embedding function (pass-through placeholder)
def to_embeddings(data, **kwargs):
    return data

# Question-answering function
def ask_question(question, image_path=None):
    if image_path:
        prompt = f"Image: {image_path}\nQuestion: {question}"
    else:
        prompt = question
    response = openai.Completion.create(
        engine="davinci",
        prompt=prompt,
        max_tokens=50
    )
    return response['choices'][0]['text']

# Main function
def main():
    onnx = Onnx()
    embedding_func = onnx.to_embeddings

    # Data manager configuration
    cache_base = CacheBase('sqlite')
    vector_base = VectorBase('faiss', dimension=onnx.dimension)
    data_manager = get_data_manager(cache_base, vector_base)

    # Initialize the cache
    cache.init(
        embedding_func=embedding_func,
        data_manager=data_manager,
        similarity_evaluation=SearchDistanceEvaluation(),
    )
    cache.set_openai_key('your_openai_api_key')

    # Example image and question
    image_path = "path_to_image.jpg"
    question_text = "What is shown in this image?"

    # Handle a question that includes an image
    if cache.contains((image_path, question_text)):
        print("Cache hit:", cache.get((image_path, question_text)))
    else:
        answer = ask_question(question_text, image_path=image_path)
        cache.set((image_path, question_text), answer)
        print("Generated and cached:", answer)

# Benchmark
def benchmark():
    def mock_embeddings(data, **kwargs):
        return np.random.random((8,)).astype('float32')

    cache_base = CacheBase('sqlite')
    vector_base = VectorBase('faiss', dimension=8)
    data_manager = get_data_manager(cache_base, vector_base)
    cache.init(
        embedding_func=mock_embeddings,
        data_manager=data_manager,
        similarity_evaluation=SearchDistanceEvaluation(),
    )
    cache.set_openai_key('your_openai_api_key')

    # Load test data
    with open('mock_data.json', 'r') as f:
        data = json.load(f)

    # Run the benchmark
    for item in data:
        prompt = item['prompt']
        answer = item['answer']
        if cache.contains(prompt):
            print("Cache hit:", cache.get(prompt))
        else:
            cache.set(prompt, answer)
            print("Generated and cached:", answer)

if __name__ == "__main__":
    main()
    benchmark()
Going Deeper: KV Cache
KV Cache is a server-side caching mechanism for large models that stores the Keys and Values produced during the forward pass. In the Transformer architecture, the attention mechanism computes the relationship between Queries, Keys, and Values. When handling long sequences or multi-turn conversations, recomputing all Keys and Values for every new token is slow and compute-intensive: without a cache, generating the t-th token means re-projecting K and V for all t preceding tokens, so the total work grows quadratically with sequence length. By caching these intermediate results, KV Cache avoids the redundant computation and greatly improves inference efficiency.
KV Cache implementation in GPT-2 (excerpted from the Hugging Face transformers GPT2Attention module):
class GPT2Attention(nn.Module):
    def forward(
        self,
        hidden_states: Optional[Tuple[torch.FloatTensor]],
        layer_past: Optional[Tuple[torch.Tensor]] = None,
        attention_mask: Optional[torch.FloatTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        encoder_hidden_states: Optional[torch.Tensor] = None,
        encoder_attention_mask: Optional[torch.FloatTensor] = None,
        use_cache: Optional[bool] = False,
        output_attentions: Optional[bool] = False,
    ) -> Tuple[Union[torch.Tensor, Tuple[torch.Tensor]], ...]:
        ...
        # Split into Q, K, V
        query, key, value = self.c_attn(hidden_states).split(self.split_size, dim=2)
        ...
        # [batch, sequence_len, embed_dim] -> [batch, heads, sequence_len, head_dim]
        query = self._split_heads(query, self.num_heads, self.head_dim)  # query for the current token(s)
        key = self._split_heads(key, self.num_heads, self.head_dim)      # key for the current token(s)
        value = self._split_heads(value, self.num_heads, self.head_dim)  # value for the current token(s)

        ##################################
        # Core KV Cache logic
        if layer_past is not None:
            past_key, past_value = layer_past               # read past K/V from the KV Cache
            key = torch.cat((past_key, key), dim=-2)        # concatenate the current token's key with the cached keys
            value = torch.cat((past_value, value), dim=-2)  # concatenate the current token's value with the cached values
        if use_cache is True:
            present = (key, value)                          # hand the updated K/V back so it can be cached
        else:
            present = None
        ##################################
        ...
        # Compute attention for the current token's query against the full K and V
        attn_output, attn_weights = self._attn(query, key, value, attention_mask, head_mask)  # attention output and weights

        # Merge the attention heads back together
        # attn_output: [batch, heads, sequence_len, head_dim] -> [batch, sequence_len, embed_dim]
        attn_output = self._merge_heads(attn_output, self.num_heads, self.head_dim)
        attn_output = self.c_proj(attn_output)
        attn_output = self.resid_dropout(attn_output)

        outputs = (attn_output, present)
        if output_attentions:
            outputs += (attn_weights,)
        return outputs  # a, present, (attentions)
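To see the cache in action end to end, the following sketch runs greedy decoding with GPT-2 from Hugging Face transformers, passing past_key_values between steps so that each iteration only feeds the newest token through the model. It is a minimal illustration, assuming the transformers and torch packages are installed.
import torch
from transformers import GPT2LMHeadModel, GPT2Tokenizer

tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
model = GPT2LMHeadModel.from_pretrained("gpt2").eval()

input_ids = tokenizer("The KV cache makes decoding", return_tensors="pt").input_ids
past_key_values = None

with torch.no_grad():
    for _ in range(20):
        # With a cache, only the newest token needs to go through the model
        outputs = model(
            input_ids if past_key_values is None else input_ids[:, -1:],
            past_key_values=past_key_values,
            use_cache=True,
        )
        past_key_values = outputs.past_key_values  # cached K/V for every layer
        next_token = outputs.logits[:, -1, :].argmax(dim=-1, keepdim=True)
        input_ids = torch.cat([input_ids, next_token], dim=-1)

print(tokenizer.decode(input_ids[0]))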