前言
这是个超级大坑,要一点时间才能更新完。总之,从这里开始吧。
function call
是大模型工程化的一个重要部分。通过function call
,我们可以将模型与数据进行交互,从而实现模型的功能化。我们以一个文件的解析为例,探讨文件解析应该怎么触发。
环境准备
这个已经是一个工程项目了,因为他包括大模型部分、文件解析部分以及前端页面部分,所以需要做一点准备。
比较关键的是,他既然是这样一个工程项目,那么就一定会需要实现各种工厂、策略、装饰器,还有各种各样的继承。其中最要命的就是:
写一个模块测试一个模块之后,如果不好好处理依赖,就会出现环形继承。然后,就崩溃了。和程序一起崩溃的,还有完全不知道从哪里开始下手的你我。
所以一定要避免环形继承!
当然,他是一个演示项目,所以很多部分也是能简单就简单。
比如说,前端页面并不打算采用HTML
或者什么前端框架,后面再用axios
进行请求的话,大模型部分就不得不引申出一个django
了,这个演示项目也就跟Spring
项目一样,莫名其妙的越来越大。所以呢,整体筛选下来,所选用的技术基本上就是这些:
技术名称 | 用途 |
---|---|
Streamlit |
前端 |
langchain |
大模型部分 |
milvus |
文件解析部分 |
dashscope |
词嵌入服务和通义智能体 |
看着似乎很全面。于是,我们也就给出requirements.txt
:
dashscope==1.22.1
langchain==0.3.16
langchain-community==0.3.16
langchain-openai==0.3.17
milvus-lite==2.4.11
openai==1.65.4
playwright==1.49.1
pydantic==2.10.6
pymilvus==2.5.4
PyYAML==6.0.2
requests==2.32.3
streamlit==1.41.1
应该是够的。
你可以使用poetry
,还可以使用conda
,但是也希望你能够只在生产环境中直接使用pip
。
具体安装命令就不在这边多说了。
基本结构
当然,作为一个开发者,当然希望自己的东西能够成为日后各位的模板。
所以呢,这个结构也会尽可能考虑到各位之后的种种要求。比如说:
- 没有什么用户逻辑的时候,就尽可能用
markdown
简化页面构建; - 大模型部分尽可能抽离出来,供各位随意插拔;
- 大模型参数尽可能抽离出来,供各位随意配置;
- 大模型工具尽可能抽离出来,供各位随意归类;
当然,想法是好的,至于实现起来离各位的预期还有多远,那就看各位反映的问题了。
于是呢,在上述思路的约束下,整个项目的结构也就稍微有了那么一些可以清晰归类的逻辑。就像这样:
project
├── factory
│ ├── base.py
│ ├── tongyi.py
│ ├── client.py
│ └── 你其他自定义的LLM.py
├── utils
│ ├── common.py
│ ├── load_md.py
│ └── managers
│ ├── files.py
│ └── embeddings.py
├── page
│ └── main.md
├── ui
│ ├── qa.py
│ └── modal.py
├── app.py
├── local.db
├── README.md
└── requirements.txt
结构简述
在上述结构中,可以看到,分了几个比较主要的文件夹:factory
、utils
、page
、ui
,剩下的就是放在根目录的各种东西。
首先,factory
部分说的就是LLM
的实现。这个部分其实也是提供ChatOpenAI
对象。为了使得创建对象的过程相对来说可以管理,这里也是每个文件都给出了一个单例工厂,通过统一的方式生成ChatOpenAI
对象。
然后就是utils
部分。这个部分主要是工具类,东西也相对来说较杂。
common.py
主要是utils
内部的常用工具,最好不要对外引用,因为很容易循环引用;managers
子模块里面包含了一些管理工具,实际上也是大模型进行工地调用时候的一些脚本。load_md
是读取本地markdown
文件,简化了编辑说明性质页面的工作。
最后就是ui
模块。这个模块的主要功能就是给用户提供一个可以问答的界面。暂时只给一个纯文本和一个多模态的问答界面,作为纯文本和多模态的用例。
多出来的client.py
为啥这里会多出来一个这个呢?我们得从很开始的时候讲。
正如上一篇文章所说,ChatOpenAI
是一个httpx
客户端。而对于这个客户端而言,本身其实是有一定的内存优化的。
就像httpx
的官方文档所说:
a Client instance uses HTTP connection pooling. This means that when you make several requests to the same host, the Client will reuse the underlying TCP connection, instead of recreating one for every single request.
This can bring significant performance improvements compared to using the top-level API, including:
- Reduced latency across requests (no handshaking).
- Reduced CPU usage and round-trips.
- Reduced network congestion.
而这部分呢,主要是他自己实现了一个HTTP
连接池。在httpx._transports.default
中,实现了一个HTTPTransport
类,继承自BaseTransport
抽象类。
其中核心部分采用httpcore._sync.connection_pool.ConnectionPool
类实现,主要就是在类中维护了一个HTTPConnection
列表,然后用一个进程锁保证多个HTTPConnection
实例之间的线程安全。具体核心部分如下:
首先是保证所有的请求都在一个队列中进行
with self._optional_thread_lock:
# Add incoming connection to our request queue.
pool_request = PoolRequest(request)
self._requests.append(pool_request)
其次是请求结束前始终进行
try:
while True:
with self._optional_thread_lock:
# Assign incoming requests to available connections,
# closing or creating new connections as required.
closing = self._assign_requests_to_connections()
self._close_connections(closing)
# Wait until this request has an assigned connection.
connection = pool_request.wait_for_connection(timeout=timeout)
try:
# Send the request on the assigned connection.
response = connection.handle_request(
pool_request.request
)
except ConnectionNotAvailable:
# In some cases a connection may initially be available to
# handle a request, but then become unavailable.
#
# In this case we clear the connection and try again.
pool_request.clear_connection()
else:
break # pragma: nocover
except BaseException as exc:
with self._optional_thread_lock:
# For any exception or cancellation we remove the request from
# the queue, and then re-assign requests to connections.
self._requests.remove(pool_request)
closing = self._assign_requests_to_connections()
self._close_connections(closing)
raise exc from None
# Return the response. Note that in this case we still have to manage
# the point at which the response is closed.
assert isinstance(response.stream, typing.Iterable)
return Response(
status=response.status,
headers=response.headers,
content=PoolByteStream(
stream=response.stream, pool_request=pool_request, pool=self
),
extensions=response.extensions,
)
总结
所以呢,为了在项目中尽可能保证这个优化,我们必须得复用httpx
客户端,也就是说,无论多少个ChatOpenAI
实例,实际上只有一个httpx
实例在不停的复用,这样带来的优化收益就会非常明显。
那么,我们要做的,实际上就是实现client
的单例。这样的话,这个项目无论使用多少次工厂,最终就只会有一个client
,所有的ChatOpenAI
实例都会复用这个client
,从而用上了它自带的网络优化。
local.db和不可不忽视的OS限制
在这篇文章产出时,pymilvus
是支持Windows
的,在官方文档中可以看到安装命令。
但比较尴尬的是,其中的一个依赖项milvus-lite
不支持Windows
(参考这个issue)。
人家也说的相当明确了:
所以呢,从结论上来说,推荐使用Ubuntu
、MacOS
。当然,由于我头铁尝试了AlmaLinxu
,似乎没有什么大问题,所以目前来说AlmaLinux
也是可行的。
然后就是根目录下的local.db
了。这个东西其实就是milvus
的本地数据库,方便了本地存储与访问,不需要满世界找是否有这么一个云服务器提供服务了,也不需要自己想办法搭建。
动手
差不多就介绍到这里吧。我们接下来就一步步做了。
如果你需要看看有没有什么更新,请在我的主页中查询【用FunctionCall实现文件解析】,看看有没有什么你感兴趣的。