零框架打造 LLM+MCP+RAG Agent极简客户端
思维大导图
graph TD subgraph "1. 初始化阶段 (src/index.ts)" A[应用启动] --> B(加载 .env 配置); B --> C[创建 OpenAI 客户端]; B --> D[创建 MCP SDK 实例]; B --> E[创建 ChatOpenAI 实例]; E -- 依赖 --> C; B --> F[创建 MCPClient 实例]; F -- 依赖 --> D; B --> G[创建 MemoryVectorStore 实例]; G -- 实现 --> H(VectorStore接口); B --> I[创建 EmbeddingRetrivers 实例]; I -- 依赖 --> C; I -- 依赖 --> G; B --> J[创建 Agent 实例]; J -- 依赖 --> E; J -- 依赖 --> F; J -- 依赖 --> I; A --> K(示例: 添加文档); K -- 调用 addDocuments --> I; I -- 内部调用 embedDocuments --> C; I -- 内部调用 addVectors --> G; end subgraph "2. Agent 执行阶段 (Agent.execute)" L[用户 Query 输入] --> J; J --> M{执行 Agent.execute}; M --> N[调用 embeddingRetriever.similaritySearch]; N -- 获取 Query Embedding --> C; N -- 调用 vectorStore.similaritySearchVectorWithScore --> G; G -- 返回 chunks & scores --> N; N -- 返回 Context --> M; M --> P{调用 mcpClient.getContext 可选}; P -- (如果需要) --> F; F -- 返回 MCP 上下文 --> P; M --> Q[构建最终 Prompt]; M --> R[调用 chatOpenAI.call]; R -- 调用 OpenAI API --> C; C -- 返回 LLM Response --> R; R -- 返回 LLM Response --> M; M --> S[处理并返回结果]; S --> T[输出结果]; end subgraph "主要类与接口 (src)" direction LR subgraph "ChatOpenAI.ts" E --- OpenAI_constructor(constructor); E --- OpenAI_call(call); end subgraph "MCPClient.ts" F --- MCP_constructor(constructor); F --- MCP_getContext(getContext); F --- MCP_updateContext(updateContext); end subgraph "VectorStore.ts" H --- VS_addDocuments(addDocuments); H --- VS_similaritySearch(similaritySearchVectorWithScore); G --- H; end subgraph "EmbeddingRetrivers.ts" I --- Embed_constructor(constructor); I --- Embed_addDocuments(addDocuments); I --- Embed_similaritySearch(similaritySearch); I --- Embed_embedDocuments(embedDocuments); end subgraph "Agent.ts" J --- Agent_constructor(constructor); J --- Agent_execute(execute); end subgraph "util.ts" Utils[辅助函数 sleep] end end style E fill:#f9d,stroke:#333,stroke-width:2px style F fill:#ccf,stroke:#333,stroke-width:2px style I fill:#dfd,stroke:#333,stroke-width:2px style J fill:#ffc,stroke:#333,stroke-width:4px style G fill:#eee,stroke:#333,stroke-width:2px style H fill:#ddd,stroke:#333,stroke-width:1px,stroke-dasharray: 5 5
前言
最近我一直在关注大语言模型相关的技术,特别是如何让它变得更加实用。毕竟现在科技一直在发展嘛,还是要紧跟时代的脚步的。你可能也遇到过这样的问题:LLM虽然强大,但它不能访问最新信息,也不能执行外部操作,更别说针对我的专业文档精准回答了。 所以我决定自己动手,做一个结合了LLM+MCP+RAG的系统。这篇文章会完整记录我的实现过程,包括所有的思考和代码。 既然如此我就先简单解释一下 LLM MCP RAG 的相关概念吧。
- LLM: 大语言模型,像DeepSeek、Claude这种能够理解和生成人类语言的AI模型
- MCP: 模拟上下文协议,一种让AI模型能够调用外部工具和服务的标准化接口
- RAG: 检索增强生成,通过先检索相关信息再生成回答的方式提高AI回答的准确性 当我们把这三者结合起来,就能创造一个既有语言理解能力,又能获取专业知识,还可以执行外部操作的强大系统啦~
所需依赖
环境变量、openAi sdk,MCP sdk,chalk美化输出
pnpm add dotenv openai @modelcontextprotocol/sdk chalk
系统架构
- 用户提出问题
- RAG系统检索相关文档
- LLM结合检索到的上下文和外部工具(MCP)生成回答
- 将回答返给用户
graph TD User[用户] -->|提问| Agent subgraph "Agent系统" Agent -->|问题| RAG[RAG检索系统] RAG -->|相关文档| Agent Agent -->|问题+上下文| LLM[大语言模型] LLM -->|工具调用请求| MCP[MCP工具管理] MCP -->|工具调用结果| LLM subgraph "RAG组件" RAG --> VectorStore[向量存储] RAG --> EmbeddingRetrivers[嵌入检索器] end subgraph "MCP组件" MCP --> Network[网络请求工具] MCP --> FileSystem[文件系统工具] MCP --> OtherTools[其他工具...] end end Agent -->|最终回答| User style Agent fill:#f9f,stroke:#333,stroke-width:2px style LLM fill:#bbf,stroke:#333,stroke-width:2px style RAG fill:#bfb,stroke:#333,stroke-width:2px style MCP fill:#fbb,stroke:#333,stroke-width:2px
sequenceDiagram participant User as 用户 participant Agent as Agent participant RAG as RAG系统 participant LLM as 大语言模型 participant MCP as MCP工具管理 participant Tools as 外部工具 User->>Agent: 1. 发送问题 Agent->>RAG: 2. 检索相关文档 RAG-->>Agent: 3. 返回相关上下文 Agent->>LLM: 4. 发送问题+上下文 loop 工具调用循环 LLM->>Agent: 5. 请求调用工具 Agent->>MCP: 6. 转发工具调用请求 MCP->>Tools: 7. 执行工具调用 Tools-->>MCP: 8. 返回工具执行结果 MCP-->>Agent: 9. 返回工具结果 Agent->>LLM: 10. 提供工具结果 end LLM-->>Agent: 11. 生成最终回答 Agent-->>User: 12. 返回最终回答
具体实现
classDiagram class Agent { -mcpClients: MCPClient[] -llm: ChatOpenAI -model: string -systemPrompt: string -context: string +init() +close() +invoke(prompt) } class ChatOpenAI { -llm: OpenAI -model: string -messages: ChatCompletionMessageParam[] -tools: Tool[] +chat(prompt) +appendToolResult(toolCallId, toolOutput) -getToolsDefinition() } class MCPClient { -mcp: Client -transport: StdioClientTransport -tools: Tool[] -command: string -args: string[] +close() +init() +getTools() +callTool(name, params) -connectToServer() } class EmbeddingRetrivers { -embeddingModel: string -vectorStore: VectorStore +embedQuery(query) +embedDocument(document) -embed(document) +retrieve(query, topK) } class VectorStore { -vectorStore: VectorStoreItem[] +addItem(item) +search(queryEmbedding, topK) -cosineSimilarity(a, b) } Agent --> ChatOpenAI : 使用 Agent --> MCPClient : 管理 EmbeddingRetrivers --> VectorStore : 使用 Agent ..> EmbeddingRetrivers : 使用
1.向量存储--VectorStore
向量存储作为RAG的核心组件之一,他负责存储文档的向量表示,并支持向量相似度的搜索。 以下是具体的代码实现:
/**
* VectorStoreItem 接口
*
* 定义了向量存储中的每个项目的结构:
* - embedding: 文档的向量表示,是一个数字数组
* - document: 原始文档的文本内容
*
* 这种结构允许我们同时存储文档的语义表示(向量)和原始内容
*/
export interface VectorStoreItem {
embedding: number[], // 文档的向量表示,通常是由嵌入模型生成的高维向量
document: string // 原始文档文本,用于在检索后返回给用户
}
/**
* VectorStore 类
*
* 这个类实现了一个简单的向量存储,用于保存文档的向量表示并支持语义搜索。
* 向量存储是实现检索增强生成(RAG)系统的核心组件,它允许我们基于语义相似度
* 而不仅仅是关键词匹配来查找相关文档。
*/
export default class VectorStore {
/**
* 存储所有文档及其向量表示的数组
* 在实际生产环境中,这可能会替换为专门的向量数据库如Pinecone、Milvus等
*/
private vectorStore: VectorStoreItem[];
/**
* 构造函数
* 初始化一个空的向量存储
*/
constructor() {
// 初始化一个空数组来存储文档和它们的向量表示
this.vectorStore = []
}
/**
* 添加一个新项目到向量存储中
*
* @param item 包含文档和其向量表示的对象
*/
public addItem(item: VectorStoreItem) {
// 将新项目添加到存储数组中
this.vectorStore.push(item)
}
/**
* 在向量存储中搜索与查询向量最相似的文档
*
* 这个方法使用余弦相似度来衡量向量之间的相似程度。
* 余弦相似度值范围在-1到1之间,值越高表示越相似。
*
* @param queryEmbedding 查询文本的向量表示
* @param topK 要返回的最相似文档数量,默认为3
* @returns 返回按相似度降序排列的文档列表,每个文档包含原始文本和相似度分数
*/
async search(queryEmbedding: number[], topK: number = 3) {
// 计算查询向量与所有存储文档向量的相似度
const scored = this.vectorStore.map(item => ({
document: item.document, // 原始文档文本
score: this.cosineSimilarity(item.embedding, queryEmbedding) // 计算相似度分数
}))
// 按相似度分数降序排序,并只返回前topK个结果
// 降序排序确保最相似的文档排在最前面
return scored.sort((a, b) => b.score - a.score).slice(0, topK)
}
/**
* 计算两个向量之间的余弦相似度
*
* 余弦相似度是衡量两个向量方向相似性的指标,不考虑它们的大小。
* 计算公式: cos(θ) = (A·B)/(|A|·|B|)
* 其中A·B是向量的点积,|A|和|B|是向量的欧几里得范数(长度)
*
* @param a 第一个向量
* @param b 第二个向量
* @returns 返回两个向量的余弦相似度,范围在-1到1之间
*/
private cosineSimilarity(a: number[], b: number[]) {
// 计算两个向量的点积(dot product)
// 点积 = a[0]*b[0] + a[1]*b[1] + ... + a[n]*b[n]
const dotProduct = a.reduce((acc, val, index) => acc + val * b[index], 0)
// 计算向量a的欧几里得范数(长度)
// |a| = √(a[0]² + a[1]² + ... + a[n]²)
const magnitudeA = Math.sqrt(a.reduce((acc, val) => acc + val * val, 0))
// 计算向量b的欧几里得范数(长度)
const magnitudeB = Math.sqrt(b.reduce((acc, val) => acc + val * val, 0))
// 计算余弦相似度: cos(θ) = (A·B)/(|A|·|B|)
return dotProduct / (magnitudeA * magnitudeB)
}
}
这里我实现了简单的向量计存储,它能够存储文档及其向量表示,并使用余弦相似度来计算向量之间的相似性。不过在实际项目中,大家可能会用更加成熟的专业向量数据库来进行替代,比如Pinecore...
flowchart TD A[开始] --> B[创建VectorStore实例] B --> C[添加文档向量] C --> D[接收用户查询] D --> E[将查询文本转换为向量] E --> F[计算向量相似度] subgraph "余弦相似度计算流程" F1[计算向量点积
a·b = Σ(a[i]×b[i])] --> F2[计算向量A范数
|a| = √Σ(a[i]²)] F2 --> F3[计算向量B范数
|b| = √Σ(b[i]²)] F3 --> F4[计算最终相似度
cos(θ) = (a·b)/(|a|×|b|)] end F --> F1 F4 --> G[按相似度降序排序文档] G --> H[返回TopK最相关文档] classDef startEnd fill:#d4f1f9,stroke:#05a0c8,stroke-width:2px classDef process fill:#fff9e6,stroke:#f9c74f,stroke-width:1px classDef calculation fill:#ffebee,stroke:#ef476f,stroke-width:2px classDef result fill:#e8f5e9,stroke:#06d6a0,stroke-width:2px class A,H startEnd class B,C,D,E,G process class F,F1,F2,F3,F4 calculation class H result
2.嵌入检索器--EmbeddingRetrivers
有了向量存储,我们还需要一个嵌入检索器来处理文本转向量以及向量检索的操作:
import VectorStore from "./VectorStore"
import 'dotenv/config'
import { logTitle } from "./util"
/**
* EmbeddingRetrivers 类
*
* 这个类负责处理文本嵌入(embeddings)和向量检索操作。
* 向量数据库是一种特殊的数据库,它存储文本的数值表示(向量),
* 并允许我们基于语义相似度进行搜索,而不仅仅是关键词匹配。
*/
export default class EmbeddingRetrivers {
// 使用的嵌入模型名称
private embeddingModel: string
// 向量存储实例,用于保存和检索向量
private vectorStore: VectorStore
/**
* 构造函数
* @param embeddingModel 要使用的嵌入模型名称,如'BAAI/bg-m3'
*/
constructor(embeddingModel:string) {
this.embeddingModel = embeddingModel
// 初始化一个新的向量存储实例
this.vectorStore = new VectorStore()
}
/**
* 将查询文本转换为向量表示
*
* 这个方法只是将文本转换为向量,但不会存储到向量库中
* 通常用于用户输入的查询文本
*
* @param query 需要转换为向量的查询文本
* @returns 返回表示查询文本的数值向量
*/
async embedQuery(query: string) : Promise<number[]> {
const embedding = await this.embed(query)
return embedding
}
/**
* 将文档转换为向量并存储到向量库中
*
* 这个方法不仅将文本转换为向量,还会将文本和对应的向量
* 一起存储到向量库中,以便后续检索
*
* @param document 需要转换并存储的文档文本
* @returns 返回表示文档的数值向量
*/
async embedDocument(document: string) : Promise<number[]> {
// 首先将文档转换为向量
const embedding = await this.embed(document)
// 然后将文档和向量一起添加到向量存储中
this.vectorStore.addItem({
embedding: embedding, // 文档的向量表示
document: document // 原始文档文本
})
return embedding
}
/**
* 私有方法:将文本转换为向量
*
* 这个方法调用外部API将文本转换为数值向量
*
* @param document 需要转换为向量的文本
* @returns 返回表示文本的数值向量
*/
private async embed(document: string) : Promise<number[]> {
// 调用嵌入API将文本转换为向量
const response = await fetch(`${process.env.EMBEDDING_BASE_URL}/embeddings`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${process.env.EMBEDDING_KEY}`
},
body: JSON.stringify({
model: this.embeddingModel, // 使用指定的嵌入模型
input: document // 需要转换的文本
})
})
// 解析API返回的结果
const data = await response.json()
console.log(data.data[0].embedding);
// 返回嵌入向量
return data.data[0].embedding
}
/**
* 根据查询文本检索相关文档
*
* 这个方法首先将查询文本转换为向量,然后在向量库中
* 搜索与查询向量最相似的文档
*
* @param query 查询文本
* @param topK 要返回的最相似文档数量,默认为3
* @returns 返回按相似度排序的文档列表
*/
async retrieve(query: string, topK:number = 3) {
// 首先将查询文本转换为向量
const queryEmbedding = await this.embedQuery(query)
// 然后在向量库中搜索最相似的文档
// 相似度基于余弦相似度计算,值越高表示越相似
return this.vectorStore.search(queryEmbedding, topK)
}
}
这个嵌入检索器封装了文本到向量的转换过程,即使用外部嵌入API(eg:BAAI/bg-m3)将文本转换为高维向量。 它主要提供了两个方法:将文档转换为向量并存储,以及根据查询文本检索相关文档。
graph TD A[用户问题] --> B[嵌入问题向量] subgraph "知识库处理" C[文档内容] --> D[文档分块] D --> E[生成文档向量] E --> F[存储到向量数据库] end B --> G{向量相似度搜索} F --> G G --> H[获取最相关文档] H --> I[构建增强上下文] I --> J[发送给LLM] style A fill:#bbf,stroke:#333,stroke-width:2px style J fill:#bbf,stroke:#333,stroke-width:2px style G fill:#fbb,stroke:#333,stroke-width:2px
3.OpenAI聊天模型封装
接下来是对OpenAI聊天模型进行封装,便于我们与LLM交互:
// 导入OpenAI库,用于与OpenAI API交互
import OpenAI from "openai";
// 导入Tool类型定义,用于定义工具功能
import { Tool } from "@modelcontextProtocol/sdk/types.js";
// 导入dotenv配置,用于加载环境变量
import 'dotenv/config'
// 导入日志工具函数
import { logTitle } from "./util";
/**
* 定义工具调用的接口结构
* 当AI模型调用工具时,会返回这种格式的数据
*/
export interface toolCall {
// 工具调用的唯一标识符
id: string,
function: {
// 被调用的函数名称
name: string,
// 函数的参数,以字符串形式传递
// 流式传输返回的都是字符串,
arguments: string,
}
}
/**
* ChatOpenAI类 - 封装与OpenAI聊天模型的交互
* 提供简单的接口来进行对话并处理工具调用
*/
export default class ChatOpenAI {
// OpenAI客户端实例
private llm: OpenAI;
// 使用的模型名称
private model: string;
// ChatCompletionMessageParam 是 OpenAI SDK 中定义的类型,
// 用于表示聊天完成请求中的消息参数。
// 它包含了消息的角色(如system、user、assistant)和内容等信息,
// 用于构建与AI模型的对话历史。
private messages: OpenAI.Chat.ChatCompletionMessageParam[] = []; // 对话历史记录
// MCP 的 tools 参数,定义模型可以使用的工具
private tools: Tool[];
/**
* 构造函数 - 初始化ChatOpenAI实例
* @param model 模型名称,如'moonshotai/kimi-vl-a3b-thinking:free'
* @param systemPrompt 系统提示,用于设置AI的行为和角色
* @param tools 可用的工具列表
* @param context 初始上下文信息
*/
constructor(model: string, systemPrompt: string = '', tools: Tool[] = [], context: string = '') {
// 创建OpenAI客户端,使用环境变量中的API密钥和基础URL
this.llm = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
baseURL: process.env.OPEN_BASE_URL,
})
this.model = model
this.tools = tools
// 如果提供了系统提示,添加到消息历史中
// 系统提示(system prompt)用于设置AI助手的行为、角色和限制,是对话的基础设置
if (systemPrompt) this.messages.push({ role: 'system', content: systemPrompt });
// 如果提供了上下文,添加到消息历史中
// 上下文作为用户消息添加,为AI提供背景信息,使其能够理解后续对话的环境
// 这对于需要连续对话或特定场景下的交互非常重要
if (context) this.messages.push({ role: 'user', content: context });
}
/**
* 发送消息并获取AI回复
* @param prompt 用户输入的提示
* @returns 包含AI回复内容和工具调用的对象
*/
async chat(prompt?: string) {
// 打印聊天开始的标题
logTitle('Chatting...')
// 如果提供了提示,添加到消息历史中
if (prompt) this.messages.push({ role: 'user', content: prompt });
// 创建流式聊天完成请求
const stream = await this.llm.chat.completions.create({
model: this.model,
messages: this.messages,
stream: true, // 启用流式传输,可以逐步获取回复
tools: this.getToolsDefinition(), // 提供工具定义
max_tokens: 1024, // 设置合理的最大令牌数
}, {
headers: {
"HTTP-Referer": "localhost",
"X-Title": "MCP-Test"
}
})
// 用于累积完整回复内容
let content = ''
// 用于收集工具调用
let toolCalls: toolCall[] = []
// 打印响应标题
logTitle('RESPONSE')
// 处理流式响应的每个数据块
for await (const chunk of stream) {
// delta 是流式响应中当前数据块的差异部分
// 包含了当前块的文本内容或工具调用信息
// 在流式响应中,OpenAI API 会将完整回复分成多个小块发送
// 每个 chunk 的 delta 包含了相对于前一个块的新增内容
const delta = chunk.choices[0].delta
// 处理文本内容
if (delta.content) {
const contentChunk = delta.content || ''
content += contentChunk
// 实时输出到控制台
process.stdout.write(contentChunk)
}
// 处理工具调用
if (delta.tool_calls) {
for (const toolCallChunk of delta.tool_calls) {
// 第一次收到 toolCall,新的 index = 0
if (toolCalls.length <= toolCallChunk.index) {
// 初始化新的工具调用对象
toolCalls.push({ id: '', function: { name: '', arguments: '' } })
}
// 获取当前工具调用对象
let currentCall = toolCalls[toolCallChunk.index]
// 累积工具调用ID
if (toolCallChunk.id) currentCall.id += toolCallChunk.id
// 累积函数名称
if (toolCallChunk.function?.name) currentCall.function.name += toolCallChunk.function.name
// 累积函数参数
if (toolCallChunk.function?.arguments) currentCall.function.arguments += toolCall.function.arguments
}
}
}
// 将AI回复添加到消息历史中,包括工具调用
this.messages.push({ role: 'assistant', content, tool_calls: toolCalls.map(call => ({ type: 'function', id: call.id, function: call.function })) })
// 返回内容和工具调用
return { content, toolCalls }
}
/**
* 添加工具执行结果到对话历史
* @param toolCallId 工具调用的ID
* @param toolOutput 工具执行的输出结果
*/
public appendToolResult(toolCallId: string, toolOutput: string) {
// 将工具结果添加到消息历史中
this.messages.push({ role: 'tool', content: toolOutput, tool_call_id: toolCallId })
}
/**
* 获取工具定义,转换为OpenAI API所需的格式
* @returns 格式化的工具定义数组
*/
private getToolsDefinition() {
// 将工具数组转换为OpenAI API所需的格式
return this.tools.map(tool => ({
type: 'function' as const,
// OPEN AI 对function的定义是 name description parameters
// MCP 对 function 的 parameters 为 inputSchema
function: {
name: tool.name,
description: tool.description,
parameters: tool.inputSchema
},
}))
}
}
这个类封装了与OpenAI API的交互,支持流式传输和工具调用。这里我使用了流式传输来获取AI的实时响应,同时也处理了AI可能返回的工具调用请求。
4.MCP客户端
MCP客户端为连接LLM和外部工具的桥梁,让LLM能够调用外部服务和工具
import { Client } from '@modelcontextProtocol/sdk/client/index.js'
import { Tool } from '@modelcontextProtocol/sdk/types';
import { StdioClientTransport } from '@modelcontextProtocol/sdk/client/stdio.js';
export default class MCPClient {
private mcp: Client;
private transport: StdioClientTransport | null = null;
private tools: Tool[] = [];
private command: string;
private args: string[];
constructor(name: string, command: string, args: string[], version?: string) {
this.mcp = new Client({ name, version: version || '1.0.0' })
this.command = command
this.args = args
}
// 封装关闭连接,防止暴露公共属性
public async close() {
await this.mcp.close();
}
public async init() {
await this.connectToServer();
}
public getTools() {
return this.tools;
}
public async callTool(name:string, params:Record<string, any>) {
return await this.mcp.callTool( { name, arguments: params })
}
// npx 对应 command 、 @modelcontextprotocol/sdk/client/stdio arguments
// 初始化时才调用这个方法
private async connectToServer() {
try {
// command + arguments 更通用,无需将代码拉入本地
this.transport = new StdioClientTransport({
command: this.command,
args: this.args,
})
// 连接到服务器
await this.mcp.connect(this.transport)
const toolsResult = await this.mcp.listTools();
this.tools = toolsResult.tools.map((tool) => {
return {
name: tool.name,
description: tool.description,
inputSchema: tool.inputSchema,
}
})
console.log("Connected to server with tools", this.tools.map(({ name }) => name))
} catch (error) {
console.error('Failed to connect to MCP server', error)
throw error;
}
}
}
这里的MCP客户端类封装了与MCP服务器的连接和工具调用。我们可以通过它让LLM使用网络请求、文件操作等外部工具。
sequenceDiagram participant LLM as 大语言模型 participant Agent as Agent participant MCP as MCPClient participant Tool as 外部工具服务 LLM->>Agent: 生成包含工具调用的响应 Note over LLM,Agent: { content, toolCalls: [{id, function: {name, arguments}}] } loop 对每个工具调用 Agent->>Agent: 查找处理工具的MCP客户端 Agent->>MCP: 调用工具 (name, arguments) MCP->>Tool: 发送工具请求 Tool-->>MCP: 返回工具执行结果 MCP-->>Agent: 返回工具结果 Agent->>LLM: 将工具结果添加到对话历史 Note over Agent,LLM: appendToolResult(toolCallId, result) end Agent->>LLM: 继续对话 LLM-->>Agent: 生成最终回答或新的工具调用
5.智能代理--Agent
这里Agent则是整个系统的核心,他将LLM MCP RAG整合了起来
import ChatOpenAI from "./ChatOpenAI";
import MCPClient from "./MCPClient";
import { logTitle } from "./util";
export default class Agent {
private mcpClients: MCPClient[];
private llm: ChatOpenAI | null = null;
private model: string
private systemPrompt: string
private context: string
constructor(model: string, mcpClients: MCPClient[], systemPrompt: string = "", context: string = "") {
this.mcpClients = mcpClients
this.model = model
this.systemPrompt = systemPrompt
this.context = context
}
public async init() {
logTitle('初始化LLM 和 TOOLS')
this.llm = new ChatOpenAI(this.model, this.systemPrompt)
for (const mcpClient of this.mcpClients) {
await mcpClient.init()
}
const tools = this.mcpClients.flatMap(mcp => mcp.getTools())
this.llm = new ChatOpenAI(this.model, this.systemPrompt, tools, this.context)
}
public async close() {
logTitle('关闭MCP Client')
for await (const client of this.mcpClients) {
await client.close()
}
}
async invoke(prompt: string) {
if (!this.llm) throw new Error('LLM 未初始化')
let response = await this.llm.chat(prompt)
// 无限循环,agent 可能会和LLM展开很多轮的对话
while (true) {
if (response.toolCalls.length > 0) {
for (const toolCall of response.toolCalls) {
const mcp = this.mcpClients.find(mcpClient => mcpClient.getTools().find(tool => tool.name === toolCall.function.name))
if (mcp) {
logTitle(`TOOL USE` + toolCall.function.name)
console.log(`Calling tool:${toolCall.function.name}`)
console.log(toolCall.function.arguments);
const result = await mcp.callTool(toolCall.function.name, JSON.parse(toolCall.function.arguments)) // 入参不能直接给字符串,MCP需要解析JSON对象
console.log(`Result: ${result}`);
this.llm.appendToolResult(toolCall.id, JSON.stringify(result))
} else {
// 如果工具未找到,则将工具调用结果添加到LLM的上下文
this.llm.appendToolResult(toolCall.id, `工具 ${toolCall.function.name} 未找到`)
}
}
// 下一次对话,
response = await this.llm.chat()
continue; // 有工具调用则进入下一轮循环
}
// 没有工具调用,则结束循环
await this.close()
return response.content
}
}
}
stateDiagram-v2 [*] --> 初始化 初始化 --> 接收用户问题 接收用户问题 --> RAG检索 RAG检索 --> 创建LLM会话 创建LLM会话 --> 发送问题到LLM 发送问题到LLM --> 分析LLM响应 分析LLM响应 --> 是否有工具调用? 是否有工具调用? --> 执行工具调用: 是 执行工具调用 --> 将工具结果返回LLM 将工具结果返回LLM --> 发送问题到LLM 是否有工具调用? --> 生成最终答案: 否 生成最终答案 --> 关闭资源 关闭资源 --> [*]
flowchart TD A[LLM生成响应] --> B{包含工具调用?} B -->|是| C[查找处理工具的MCP客户端] C --> D[调用工具] D --> E[获取工具结果] E --> F[将结果添加到LLM上下文] F --> G[继续与LLM对话] G --> A B -->|否| H[生成最终答案] H --> I[结束对话] style B fill:#fbb,stroke:#333,stroke-width:2px style G fill:#bfb,stroke:#333,stroke-width:2px
6.主程序--index.ts
import path from "path";
import Agent from "./Agent";
import ChatOpenAI from "./ChatOpenAI";
import EmbeddingRetrivers from "./EmbeddingRetrivers";
import MCPClient from "./MCPClient";
import fs from 'fs'
/**
* 获取当前工作目录
* 这将作为文件操作的基础路径
*/
const currentDir = process.cwd()
/**
* MCP (Model Context Protocol) 客户端初始化
*
* MCP是一种协议,允许大语言模型(LLM)通过标准化接口与外部工具和服务交互
* 它使LLM能够执行文件操作、网络请求等超出其基本能力的任务
*/
// 创建网络请求MCP客户端,使LLM能够发送HTTP请求获取网络信息
const fetchMcp = new MCPClient('fetch', 'uvx', ['mcp-server-fetch'])
// 创建文件操作MCP客户端,使LLM能够读写文件系统中的文件
const fileMCP = new MCPClient('file', 'npx', ["-y", "@modelcontextprotocol/server-filesystem", currentDir])
/**
* 主函数 - 应用程序入口点
*
* 这个函数展示了一个完整的LLM+RAG+MCP工作流程:
* 1. 接收用户问题
* 2. 检索相关上下文(RAG)
* 3. 初始化具有工具使用能力的AI代理
* 4. 让AI处理问题并生成回答
*/
async function main() {
// 用户输入的问题或指令
const prompt = '请根据以下内容回答问题:xxxxx'
// 使用RAG(检索增强生成)获取相关上下文
// RAG通过在回答前检索相关信息,大大提高了LLM回答的准确性和相关性
const context = await retrieveContext(prompt)
// 创建AI代理实例
// 参数说明:
// 1. 使用的LLM模型 - 这里使用OpenAI的gpt-4o-mini
// 2. 工具列表 - 提供给LLM使用的外部工具(MCP客户端)
// 3. 系统提示词 - 这里为空
// 4. 上下文信息 - 通过RAG检索到的相关文档
const agent = new Agent('openai/gpt-4o-mini', [fetchMcp, fileMCP], '', context)
// 初始化代理,准备会话环境
await agent.init()
// 让代理处理用户问题并生成回答
// 代理会结合上下文信息和工具能力来生成最佳回答
const response = await agent.invoke(prompt)
// 输出AI的回答
console.log(response)
// 关闭代理,释放资源
await agent.close()
}
/**
* 检索增强生成(RAG)实现函数
*
* RAG是一种混合架构,结合了:
* 1. 检索系统 - 从知识库中找出与问题相关的文档
* 2. 生成系统 - 使用LLM基于检索到的信息生成回答
*
* RAG的优势:
* - 减少LLM的幻觉(生成不准确信息)
* - 使LLM能够访问最新信息
* - 提供可验证的信息来源
*
* @param prompt 用户的问题或指令
* @returns 与问题相关的上下文文本
*/
async function retrieveContext(prompt: string) {
// 创建嵌入检索器实例,使用BAAI/bg-m3模型将文本转换为向量
// 嵌入(Embedding)是将文本转换为数值向量的过程,使计算机能够理解文本的语义
const embeddingRetriver = new EmbeddingRetrivers('BAAI/bg-m3')
// 获取知识库目录路径
const knowledgeDir = path.join(process.cwd(), 'knowledge')
// 读取知识库目录中的所有文件
const files = fs.readdirSync(knowledgeDir)
// 遍历每个文件,将其内容转换为向量并存储到向量数据库
for(const file of files) {
// 读取文件内容
const content = fs.readFileSync(path.join(knowledgeDir, file), 'utf-8')
// 将文档内容转换为向量并存储
// 这个过程会:
// 1. 调用嵌入API将文本转换为高维向量
// 2. 将向量和原始文本一起存储在向量数据库中
await embeddingRetriver.embedDocument(content)
}
// 根据用户问题检索最相关的文档
// 这个过程会:
// 1. 将用户问题转换为向量
// 2. 计算问题向量与所有文档向量的相似度
// 3. 返回相似度最高的几个文档
const context = await embeddingRetriver.retrieve(prompt)
// 输出检索到的上下文信息,用于调试
console.log('CONTEXT')
console.log(context)
// 将检索到的文档合并为一个字符串,作为LLM的上下文
// 每个文档由换行符分隔
return context.map(item => item.document).join('\n')
}
/**
* 执行主函数
*
* 整个流程总结:
* 1. 用户提出问题
* 2. RAG系统检索相关文档
* 3. LLM结合检索到的上下文和外部工具(MCP)生成回答
* 4. 将回答返回给用户
*
* 这种架构结合了:
* - LLM的自然语言理解和生成能力
* - RAG的知识检索能力
* - MCP的外部工具交互能力
*
* 形成了一个强大的AI应用框架
*/
main()
主程序主要是对上面咱写的工具进行一个运用,首先使用RAG检索相关上下文,然后创建AI代理,初始化LLM和MCP工具,最后让代理处理用户问题并生成回答
7.工具函数--util.ts
最后就是美化输出的小工具 ———— chalk,在打印的时候就可以使得整个输出更加整洁直观,因为我们把每个小块都用小工具分隔开啦 以下是小工具的代码:
import chalk from "chalk";
export const logTitle = (message: string) => {
const totalLength = 80;
const messageLength = message.length;
const padding = Math.max(0, totalLength - messageLength - 4);
const paddedMessage = `${'='.repeat(Math.floor(padding / 2))} ${message} ${'='.repeat(Math.ceil(padding / 2))}`
console.log(chalk.bold.cyanBright(paddedMessage));
}
代码执行过程解析
系统启动流程
初始化阶段:
- 程序从
main()
函数开始执行 - 创建两个MCP客户端(网络请求和文件操作)
- 使用
retrieveContext()
函数执行RAG检索 - 初始化
Agent
实例,传入模型名称、MCP客户端和上下文
- 程序从
RAG检索流程:
- 创建
EmbeddingRetrivers
实例,使用BAAI/bg-m3模型将文本转换为向量 - 读取知识库目录中的所有文件
- 将每个文件内容转换为向量并存储到向量数据库
- 将用户问题转换为向量
- 计算问题向量与所有文档向量的相似度
- 返回相似度最高的文档作为上下文
- 创建
Agent初始化:
- 使用
agent.init()
初始化代理 - 创建
ChatOpenAI
实例 - 为每个MCP客户端调用
init()
方法 - 收集所有工具定义
- 重新创建
ChatOpenAI
实例,包含工具定义和上下文
- 使用
聊天执行阶段:
- 调用
agent.invoke(prompt)
处理用户问题 ChatOpenAI
创建流式聊天完成请求- 处理LLM响应,包括文本内容和工具调用
- 调用
如果每次向AI询问的问题都是一样的,如何减少性能开销
1.实现缓存机制
class QueryCache {
// 使用Map数据结构存储查询和响应的键值对
// 键是查询字符串,值是对应的响应内容
private cache: Map<string, string> = new Map();
/**
* 根据查询获取缓存的响应
* @param query - 用户的查询字符串
* @returns 如果缓存中存在该查询,返回对应响应;否则返回null
*/
public getResponse(query: string): string | null {
return this.cache.get(query) || null;
}
/**
* 将查询和对应的响应存储到缓存中
* @param query - 用户的查询字符串,作为缓存的键
* @param response - AI生成的响应内容,作为缓存的值
*/
public storeResponse(query: string, response: string): void {
this.cache.set(query, response);
}
}
2.避免重复向量计算
class CachedEmbeddingRetriever extends EmbeddingRetrivers {
// 使用Map数据结构缓存查询文本及其对应的向量表示
// 键是查询文本,值是该文本的向量嵌入表示
private queryEmbeddingCache: Map<string, number[]> = new Map();
/**
* 将查询文本转换为向量表示,并实现缓存机制
* @param query - 用户的查询文本
* @returns 查询文本的向量表示
*/
async embedQuery(query: string): Promise<number[]> {
// 检查缓存中是否已存在该查询的向量表示
if (this.queryEmbeddingCache.has(query)) {
// 如果存在,直接从缓存返回,避免重复计算
return this.queryEmbeddingCache.get(query)!;
}
// 缓存中不存在,调用父类方法计算向量嵌入
// 这通常涉及调用大型模型API,计算成本较高
const embedding = await super.embedQuery(query);
// 将新计算的向量表示存入缓存,以便后续使用
this.queryEmbeddingCache.set(query, embedding);
return embedding;
}
}
3.预计算模式
// 预计算常见问题回答
// 定义一个包含常见问题的数组,这些问题通常是用户经常会问的
const commonQueries = ['问题1', '问题2', '问题3'];
// 创建一个Map结构来存储预计算的响应,键为问题,值为对应的回答
const precomputedResponses = new Map<string, string>();
/**
* 预计算常见问题的回答并存储到Map中
* 这个函数在应用启动时执行,可以提前准备好高频问题的回答
* 避免在用户实际提问时才进行计算,从而减少响应时间
*/
async function precomputeResponses() {
// 遍历所有预定义的常见问题
for (const query of commonQueries) {
// 使用AI代理生成回答
// 这一步在应用启动时完成,而不是在用户查询时
const response = await agent.invoke(query);
// 将问题和对应的回答存储到预计算Map中
// 这样当用户提出相同问题时可以直接返回结果
precomputedResponses.set(query, response);
}
}
4.分层处理架构
/**
* 处理用户查询的分层函数
* 实现了多级处理策略,从简单到复杂,逐步尝试解决用户查询
*
* @param query - 用户输入的查询字符串
* @returns 返回对用户查询的回答
*/
async function handleQuery(query: string) {
// 1. 检查缓存 - 首先尝试从缓存中获取完全匹配的响应
// 这是最快的响应路径,无需任何计算或模型调用
const cachedResponse = queryCache.getResponse(query);
if (cachedResponse) return cachedResponse; // 如果找到缓存的响应,直接返回
// 2. 检查是否为已知问题的变体 - 使用相似度匹配
// 通过语义相似性查找是否有预先计算好的类似问题
const similarQuery = findSimilarQuery(query);
if (similarQuery) return precomputedResponses.get(similarQuery); // 如果找到相似问题,返回其预计算的回答
// 3. 使用完整AI处理流程 - 这是计算成本最高的路径
// 当缓存和预计算都无法满足需求时,调用AI模型生成回答
const response = await agent.invoke(query);
queryCache.storeResponse(query, response); // 将新生成的回答存入缓存,优化未来相同查询
return response; // 返回AI生成的回答
}
小项目就到这里就结束啦,整个做下来还是有很大收获的,对AI有了更加全面的认识了,也希望大家看了我的这篇文章也能有所收获哦! 拜拜👋