国产提示词编排平台毕昇源码阅读
最近看到的一个开源的提示词编排平台bisheng,音同「毕昇」,项目介绍说 「“毕昇”是活字印刷术的发明人,活字印刷术为人类知识的传递起到了巨大的推动作用。我们希望“毕昇”同样能够为智能应用的广泛落地提供有力的支撑」。看了下团队团队前身为国内人工智能独角兽企业第四范式的智能文档产品事业部,后根据发展需要进行业务独立拆分与运营,专注于非结构化数据的价值挖掘、信息处理自动化与数据即服务,第四范式在 AI 行业深耕多年,我比较期待能在这个项目里看到一些企业落地实践,所以阅读了毕昇平台的源码,写篇文章分享下。
我的新书《LangChain编程从入门到实践》 已经开售!推荐正在学习AI应用开发的朋友购买阅读!
最近看到的一个开源的提示词编排平台bisheng,音同「毕昇」,项目介绍说 「“毕昇”是活字印刷术的发明人,活字印刷术为人类知识的传递起到了巨大的推动作用。我们希望“毕昇”同样能够为智能应用的广泛落地提供有力的支撑」。看了下团队团队前身为国内人工智能独角兽企业第四范式的智能文档产品事业部,后根据发展需要进行业务独立拆分与运营,专注于非结构化数据的价值挖掘、信息处理自动化与数据即服务,第四范式在 AI 行业深耕多年,我比较期待能在这个项目里看到一些企业落地实践,所以阅读了毕昇平台的源码,写篇文章分享下。
项目演示里可以看到一些很不错的演示案例,比较贴合实际需求:
- 📃 合同审核报告生成
- 🏦 信贷调查报告生成
- 📈 招股书分析报告生成
- 💼 智能投顾报告生成
- 等等
技能模块源码
技能创建
这部分比较简单,就是序列化后入库
1 | @router.post('/', response_model=FlowRead, status_code=201) |
技能上线
技能上线时会触发编译动作,下面是一个具体步骤,以联网搜索技能为例:
- 权限和状态校验,进入
build_flow_no_yield
开始编译(注:编译实际上就是将参数传入相应节点后进行验证节点是否正常,比如向量数据库连通性,搜索工具连通性,大模型端点是否可达)
1 |
|
- 遍历技能模板的节点(根结点默认在节点列表第一个,序号为 0),进行编译。
vertex.artifacts
用作提示词变量,这些变量将传递给build_input_keys_response
函数以设置输入键值;向量数据库节点未配置集合名称时需要自动生成。
1 | def build_flow_no_yield(graph_data: dict, |
所有的节点都继承自
Vertex
对象,但是只有部分节点实现了自己的build
方法,编译过程实际上就是逐次执行节点的build
方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133class ToolVertex(Vertex)
class ToolkitVertex(Vertex)
class FileToolVertex(ToolVertex)
class OutputParserVertex(Vertex)
class DocumentLoaderVertex(Vertex)
class EmbeddingVertex(Vertex)
class VectorStoreVertex(Vertex)
class MemoryVertex(Vertex)
class RetrieverVertex(Vertex)
class TextSplitterVertex(Vertex)
# Agent 节点
class AgentVertex(Vertex):
...
def build(self, force: bool = False) -> Any:
if not self._built or force:
self._set_tools_and_chains()
for tool_node in self.tools:
tool_node.build()
for chain_node in self.chains:
chain_node.build(tools=self.tools)
self._build()
return self._built_object
# 大模型节点
class LLMVertex(Vertex):
...
def build(self, force: bool = False) -> Any:
# 因为有些模型可能会占用太多内存,选择懒加载(只在需要的时候加载它们)
if self.vertex_type == self.built_node_type:
return self.class_built_object
if not self._built or force:
self._build()
self.built_node_type = self.vertex_type
self.class_built_object = self._built_object
# 避免直接复制从文件中加载的 LLM
return self._built_object
# 通用工具节点
class WrapperVertex(Vertex):
...
def build(self, force: bool = False) -> Any:
# 主要处理 header 参数,比如这里联网搜索的例子中 bing_subscription_key 字段。
if not self._built or force:
if 'headers' in self.params:
self.params['headers'] = ast.literal_eval(self.params['headers'])
self._build()
return self._built_object
# 链节点
class ChainVertex(Vertex):
...
def build(
self,
force: bool = False,
tools: Optional[List[Union[ToolkitVertex, ToolVertex]]] = None,
) -> Any:
if not self._built or force:
for key, value in self.params.items():
if isinstance(value, PromptVertex):
# 构建PromptVertex,如果有工具则传递
self.params[key] = value.build(tools=tools, force=force)
self._build()
return self._built_object
# 提示词编辑节点
class PromptVertex(Vertex):
...
def build(
self,
force: bool = False,
tools: Optional[List[Union[ToolkitVertex, ToolVertex]]] = None,
) -> Any:
if not self._built or force:
if (
'input_variables' not in self.params
or self.params['input_variables'] is None
):
self.params['input_variables'] = []
# 检查是否为ZeroShotPrompt并需要工具
if 'ShotPrompt' in self.vertex_type:
tools = (
[tool_node.build() for tool_node in tools]
if tools is not None
else []
)
# 展开工具嵌套列表
if tools and isinstance(tools, list) and isinstance(tools[0], list):
tools = flatten_list(tools)
self.params['tools'] = tools
prompt_params = [
key
for key, value in self.params.items()
if isinstance(value, str) and key != 'format_instructions'
]
else:
prompt_params = ['template']
if 'prompt' not in self.params and 'messages' not in self.params:
for param in prompt_params:
prompt_text = self.params[param]
variables = extract_input_variables_from_prompt(prompt_text)
self.params['input_variables'].extend(variables)
self.params['input_variables'] = list(
set(self.params['input_variables'])
)
else:
self.params.pop('input_variables', None)
self._build()
return self._built_object
def _built_object_repr(self):
if (
not self.artifacts
or self._built_object is None
or not hasattr(self._built_object, 'format')
):
return super()._built_object_repr()
# 构建提示,以向用户展示带有填充变量的提示内容
artifacts = self.artifacts.copy()
artifacts.pop('handle_keys', None)
try:
template = self._built_object.format(**artifacts)
return (
template
if isinstance(template, str)
else f'{self.vertex_type}({template})'
)
except KeyError:
return str(self._built_object)以上即为技能模板首次创建时,各个节点的编译过程。
应用(新建会话)模块源码
应用创建
对话聊天接口采用websocket
协议,应用创建应用时会关联一个技能模板,会判断技能是否存在,上线状态以及是否编译成功。
1 |
|
技能编译
可以看到,执行build_flow_no_yield
子节点的编译后,最后会通过langchain_object = graph.build()
对技能模板整体进行编译,最后返回一个 Chain 对象,其实就和 langchain 里的 Chain 对象概念一样(比较讨巧的做法,Chain 的执行可以直接使用 langchain 的逻辑,不用再二次开发),感兴趣的可以读这篇文章,这里不再赘述。
1 | class Graph: |
最后将编译后的结果写入缓存,后续对话读入技能模板内容都是从缓存读取,不需要每次全量编译。
消息处理
消息处理环节,文字消息的处理时使用输入节点 InputNode,填充内容后,重新编译生成 Chain;文件消息的处理时输入节点 InputFileNode,将文件上传,嵌入后,重新编译生成 Chain,这个技能执行过程其实就是 langchian 的 Chains 执行过程,可以看相关代码解读,也可看我历史文章。
1 | async def handle_websocket(self, client_id: str, chat_id: str, websocket: WebSocket, |
后台引擎
前后端整体交互部分的逻辑没啥太多亮点,自研的文本处理引擎是我比较感兴趣的,但是实际看到开源出来的内容也不多,主要包括下面几部分。
1 | . |
- chains 工作链模块实现了文件合并的链
StuffDocumentsChain
和打印加载器输出的链LoaderOutputChain
- chat_models 模块主要国产模型和自托管模型的对接接口
- document_loaders 文档加载模块,文档解析的 LayoutParser,CRClient,ELLMClient,加载 PDF 的 PDFWithSemanticLoader,以及 UniversalKVLoader
- embeddings 嵌入模型接入了’WenxinEmbeddings’, ‘ME5Embedding’, ‘BGEZhEmbedding’, ‘GTEEmbedding’
- retrievers 检索模块实现了
MixEsVectorRetriever
,Elasticsearch 和向量数据库结合的查询方式,但当前应该只是 demo 状态,技能创建页面看不到。 - vectorstores 向量数据库模块实现了 Elasticsearch 关键字搜索接口 ElasticKeywordsSearch
后续会持续关注 document_loaders 模块的内容更新,毕竟在当前嵌入模型效果相差不大的情况下,非结构化的数据预处理对一个 RAG 引擎的作用很大。
国产提示词编排平台毕昇源码阅读