本文是《LangChain实战课》系列的第二十三篇,将深入探讨LangChain应用在生产环境中需要考虑的各种关键问题,包括性能优化、错误处理、成本控制、监控日志等,帮助你构建稳定高效的AI应用系统。
一、前言
在前面的文章中,我们学习了如何使用LangChain构建各种AI应用并将其部署为API服务。然而,将应用从开发环境迁移到生产环境需要考虑更多因素:性能、稳定性、成本、可观测性等。本文将分享在生产环境中运行LangChain应用的最佳实践和优化策略。
二、缓存策略优化
1. 请求级别缓存
from langchain.cache import SQLiteCache, RedisCache
import langchain
import sqlite3
# 使用SQLite缓存
langchain.llm_cache = SQLiteCache(database_path=".langchain.db")
# 或者使用Redis缓存(推荐用于生产环境)
from redis import Redis
langchain.llm_cache = RedisCache(redis_=Redis(host='localhost', port=6379))
# 自定义缓存策略
from langchain.cache import BaseCache
from typing import Optional, Any
import hashlib
import json
class CustomCache(BaseCache):
def __init__(self):
self.cache = {
}
def lookup(self, prompt: str, llm_string: str) -> Optional[Any]:
key = self._generate_key(prompt, llm_string)
return self.cache.get(key)
def update(self, prompt: str, llm_string: str, return_val: Any) -> None:
key = self._generate_key(prompt, llm_string)
self.cache[key] = return_val
def _generate_key(self, prompt: str, llm_string: str) -> str:
return hashlib.md5(f"{
prompt}{
llm_string}".encode()).hexdigest()
# 使用自定义缓存
langchain.llm_cache = CustomCache()
2. 语义缓存
from langchain.globals import set_llm_cache
from langchain.cache import SemanticCache
from langchain.embeddings import OpenAIEmbeddings
# 基于语义相似度的缓存
set_llm_cache(
SemanticCache(
embedding=OpenAIEmbeddings(),
similarity_threshold=0.8 # 相似度阈值
)
)
3. 向量存储缓存
from langchain.storage import LocalFileStore
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.cache import SQLiteCache
# 创建基于向量存储的缓存系统
store = LocalFileStore("./cache/")
embeddings = OpenAIEmbeddings()
# 在链中显式使用缓存
from langchain.chains import LLMChain
from langchain.callbacks import ManagerCallbackHandler
class CachedChain:
def __init__(self, chain):
self.chain = chain
self.cache = {
}
async def arun(self, input_dict, **kwargs):
cache_key = self._generate_cache_key(input_dict)
if cache_key in self.cache:
return self.cache[cache_key]
result = await self.chain.arun(input_dict, **kwargs)
self.cache[cache_key] = result
return result
def _generate_cache_key(self, input_dict):
import json
return hashlib.md5(json.dumps(input_dict, sort_keys=True).encode()).hexdigest()
三、异步处理优化
1. 异步链设计
import asyncio
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain.chat_models import ChatOpenAI
class AsyncChainProcessor:
def __init__(self, max_concurrent=10):
self.semaphore = asyncio.Semaphore(max_concurrent)
async def process_batch(self, inputs):
"""批量处理输入,控制并发数"""
tasks = []
for input_data in inputs:
task = self._process_single(input_data)
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
async def _process_single(self, input_data):
"""处理单个输入,带有信号量控制"""
async with self.semaphore:
try:
# 模拟异步处理
await asyncio.sleep(0.1