Các trợ lý nghiên cứu AI ngày càng trở nên thông minh hơn, nhưng việc chuyển đổi khả năng suy luận đó thành một sản phẩm có thể sử dụng được vẫn còn là một thách thức đáng kể. Nghiên cứu thực tế đòi hỏi nhiều hơn là chỉ xử lý văn bản đơn thuần: nó bao gồm tìm kiếm thông tin trên web, khám phá dữ liệu có cấu trúc, so sánh nhiều nguồn, và dần dần tổng hợp thành một báo cáo hoàn chỉnh. Để thực hiện tốt những tác vụ này, chúng ta cần các hệ thống có khả năng quản lý trạng thái, tích hợp công cụ mạnh mẽ và giao diện người dùng (UI) trực quan, chứ không chỉ là một hộp chat đơn thuần.
Hôm nay, chúng ta sẽ cùng xây dựng một trợ lý nghiên cứu độc đáo với giao diện chat kết hợp canvas. Trợ lý này sẽ tìm kiếm trên web, truy vấn dữ liệu có cấu trúc thông qua Giao thức Ngữ cảnh Mô hình (MCP – Model Context Protocol), và nhúng trực tiếp các biểu đồ tương tác vào báo cáo, đồng thời truyền tải tiến độ theo thời gian thực đến frontend.
Chúng ta sẽ sử dụng bộ công cụ mạnh mẽ bao gồm: CopilotKit (dựa trên Next.js) cho giao diện người dùng, LangGraph để điều phối luồng công việc của tác nhân, và MCP để kết nối các công cụ bên ngoài như Tako, nơi trả về các thành phần UI biểu đồ có thể nhúng được.
Khám phá GitHub của CopilotKit ⭐️
Trong bài viết này, bạn sẽ tìm hiểu về kiến trúc tổng thể, các khái niệm cốt lõi, cách trạng thái luân chuyển giữa UI và tác nhân, vai trò của MCP Apps trong hệ thống, và hướng dẫn từng bước để xây dựng hệ thống này từ đầu.
Hãy bắt đầu xây dựng!
Mục lục
Nội Dung Chính Trong Bài Viết Này
Để tóm tắt, chúng ta sẽ đi sâu vào các chủ đề sau:
- Chúng ta đang xây dựng gì?
- Ngăn Xếp Công Nghệ và Cấu Trúc Dự Án
- Xây Dựng Frontend (CopilotKit)
- Điều Phối Tác Nhân (LangGraph)
- Backend: Tích Hợp MCP & Công Cụ
- Chạy Ứng Dụng
- Luồng Dữ Liệu Hoàn Chỉnh Giữa UI ↔ Tác Nhân
Bạn có thể tham khảo Kho lưu trữ GitHub nếu bạn muốn tự mình khám phá.
1. Chúng Ta Đang Xây Dựng Gì?
Chúng ta đang xây dựng một trợ lý nghiên cứu AI với giao diện người dùng chat + canvas trực tiếp, có khả năng tìm kiếm, thu thập nguồn, và tổng hợp báo cáo đồng thời truyền tải tiến độ theo thời gian thực đến frontend.
Trợ lý này sẽ thực hiện các chức năng sau:
- Chuyển đổi câu hỏi của người dùng thành một kế hoạch nghiên cứu (bao gồm câu hỏi nghiên cứu chính và các câu hỏi phụ về dữ liệu).
- Thực hiện truy xuất song song: Sử dụng Tavily cho tìm kiếm web và Tako qua MCP Apps cho biểu đồ.
- Loại bỏ trùng lặp kết quả để tạo thành một danh sách tài nguyên gọn gàng.
- Tạo báo cáo với các placeholder
[CHART:tiêu đề]và sau đó nhúng các biểu đồ thực tế vào vị trí tương ứng. - Truyền tải trạng thái trung gian (nhật ký, tài nguyên, báo cáo, biểu đồ) để giao diện người dùng cập nhật theo tiến độ làm việc của tác nhân.
Chúng ta sẽ thấy tất cả các khái niệm này hoạt động như thế nào khi xây dựng tác nhân.
Các Thành Phần Cốt Lõi
Vài ngày trước, nhóm Anthropic đã phát hành MCP Apps như một tiện ích mở rộng chính thức của MCP, điều này có nghĩa là các công cụ có thể trả về UI tương tác mà máy chủ có thể hiển thị, không chỉ là JSON hay văn bản. Đây chính là mô hình chúng ta sẽ sử dụng ở đây: tác nhân của chúng ta sẽ gọi Tako (qua MCP) để lấy biểu đồ dưới dạng UI có thể nhúng được, nhờ đó các biểu đồ trực quan tương tác có thể được đưa thẳng vào báo cáo/canvas mà chúng ta không cần phải tự xây dựng các thành phần biểu đồ.
Ở mức độ cao, dự án này được xây dựng bằng cách kết hợp:
- CopilotKit cho giao diện chat và canvas của tác nhân.
- AG-UI để truyền tải các sự kiện tác nhân ↔ UI (giao thức).
- LangGraph để điều phối tác nhân có trạng thái.
- MCP Apps cho các công cụ bên ngoài.
- Tako cho biểu đồ (công cụ tìm kiếm dữ liệu có cấu trúc).
- Tavily cho tìm kiếm web.
Để hiểu rõ hơn: AG-UI (Agent–User Interaction Protocol) là một giao thức dựa trên sự kiện giúp chuẩn hóa giao tiếp “tác nhân ↔ frontend” theo thời gian thực này. Vì nó không phụ thuộc vào phương tiện truyền tải (SSE, WebSockets, v.v.), cùng một frontend có thể hoạt động trên các backend tác nhân khác nhau mà không cần cấu hình dây nối tùy chỉnh. CopilotKit sử dụng AG-UI làm lớp đồng bộ hóa.
Đây là luồng yêu cầu → phản hồi đơn giản hóa của những gì sẽ xảy ra:
User question
↓
CopilotKit UI (chat + canvas)
↓
LangGraph agent workflow
├─ chat: interpret intent + plan
├─ search: parallel retrieval (Tavily + Tako via MCP Apps)
└─ report: narrative + [CHART:*] markers
↓
Streaming state updates (AG‑UI events)
↓
Canvas renders report + embedded charts
2. Ngăn Xếp Công Nghệ và Kiến Trúc
Ở cốt lõi, chúng ta sẽ sử dụng ngăn xếp công nghệ sau để xây dựng tác nhân:
- Next.js 15: framework frontend với TypeScript, cung cấp khả năng dựng trang phía máy chủ (SSR) và tạo trang tĩnh (SSG) mạnh mẽ.
- CopilotKit 1.50: các thành phần UI tác nhân và runtime streaming (
@copilotkit/react-core,@copilotkit/react-ui,@copilotkit/runtime), giúp đơn giản hóa việc xây dựng giao diện AI. - LangGraph: điều phối tác nhân có trạng thái (các node
StateGraph, định tuyến có điều kiện), là công cụ lý tưởng để quản lý các luồng công việc phức tạp của tác nhân. - FastAPI & Uvicorn: máy chủ HTTP cho tác nhân Python, nổi tiếng với tốc độ cao và dễ sử dụng.
- Tavily: API tìm kiếm web (
tavily-python), cung cấp kết quả tìm kiếm chất lượng. - Model Context Protocol (MCP): kết nối tác nhân với Tako (
mcp,httpx), cho phép tương tác với các công cụ bên ngoài một cách chuẩn hóa. - CopilotKit LangGraph SDK (Python): truyền tải trạng thái LangGraph tới CopilotKit (
copilotkit), đảm bảo giao tiếp liền mạch. - OpenAI: nhà cung cấp LLM (
openai), cung cấp khả năng xử lý ngôn ngữ tự nhiên mạnh mẽ.
Ngoài ra, dự án còn sử dụng các thư viện khác như `react-markdown` & `remark-gfm` để hiển thị báo cáo, `langgraph-checkpoint-sqlite` để duy trì trạng thái, và Shadcn UI cho các thành phần giao diện.
Xem package.json và agents/python/pyproject.toml trong kho lưu trữ để có danh sách đầy đủ các dependency.
Cấu Trúc Dự Án
Đây là cách thư mục dự án của chúng ta sẽ trông như thế nào:
Thư mục agents/python/ chứa tác nhân Python LangGraph, được phơi bày qua FastAPI. Tác nhân này điều phối luồng công việc (chat, tìm kiếm, báo cáo), truyền tải các cập nhật trạng thái, và gọi các công cụ bên ngoài (Tavily, Tako qua MCP).
Thư mục src/ chứa frontend Next.js, bao gồm các thành phần UI, các kiểu dữ liệu dùng chung, và route API của CopilotKit (/api/copilotkit/route.ts) giúp kết nối frontend với backend tác nhân.
.
├── src/ ← Next.js frontend (TypeScript)
│ ├── app/
│ │ ├── page.tsx ← CopilotKit provider & model selector
│ │ ├── Main.tsx ← Chat + ResearchCanvas split layout
│ │ └── api/
│ │ └── copilotkit/
│ │ └── route.ts ← CopilotRuntime bridge to agent
│ ├── components/
│ │ ├── ResearchCanvas.tsx ← Main canvas (orchestrates report + resources)
│ │ ├── Resources.tsx ← Displays Tavily + Tako resources list
│ │ ├── MarkdownRenderer.tsx ← Renders report + embeds charts
│ │ └── ui/ ← Reusable Shadcn UI components
│ └── lib/
│ ├── types.ts ← AgentState type
│ ├── utils.ts ← Utility functions
│ └── model-selector-provider.tsx ← Model selection context
│
├── agents/
│ ├── python/ ← Python LangGraph agent (primary)
│ │ ├── src/
│ │ │ ├── agent.py ← StateGraph definition & compile
│ │ │ └── lib/
│ │ │ ├── state.py ← AgentState (Pydantic)
│ │ │ ├── model.py ← LLM factory (OpenAI / Anthropic / Gemini)
│ │ │ ├── chat.py ← Chat node & tool definitions
│ │ │ ├── search.py ← Parallel Tavily + Tako search
│ │ │ ├── mcp_integration.py ← MCP client & iframe helpers
│ │ │ ├── download.py ← Download node
│ │ │ └── delete.py ← Delete node
│ │ ├── main.py ← FastAPI/Uvicorn entrypoint
│ │ ├── requirements.txt
│ │ └── pyproject.toml
│ │
│ └── src/ ← TypeScript agent (optional, local dev)
│ └── server.ts ← Express + CopilotRuntime
│
├── package.json ← Frontend deps & scripts
├── .env.example
├── .env.local
└── README.md
Bạn có thể tìm thấy kho lưu trữ GitHub tại tako-copilotkit và phiên bản triển khai trực tiếp tại tako-copilotkit.vercel.app/ nếu bạn muốn tự mình khám phá. Tôi sẽ trình bày chi tiết về việc triển khai và tất cả các khái niệm chính trong các phần tiếp theo.
Cách dễ nhất để làm theo là sao chép kho lưu trữ:
git clone https://github.com/TakoData/tako-copilotkit.git
cd tako-copilotkit
Thêm Các Khóa API Cần Thiết
Bạn có thể sao chép tệp mẫu môi trường (.env.example) bằng cách sử dụng lệnh này để tạo một tệp .env.local trong thư mục gốc.
cp .env.example .env.local
Thêm Khóa API OpenAI, Khóa API Tavily và Khóa API Tako của bạn vào tệp. Tôi đã đính kèm các liên kết tài liệu để dễ dàng làm theo.
OPENAI_API_KEY=sk-proj-...
TAVILY_API_KEY=tvly-dev-...
TAKO_API_TOKEN=your_api_token_here
TAKO_MCP_URL=https://mcp.tako.com # URL của máy chủ MCP của Tako
TAKO_URL=https://tako.com # URL của API chính của Tako
Tích hợp Tako/MCP là một nguồn dữ liệu tùy chọn cho tác nhân nghiên cứu. Nếu bạn muốn truy vấn biểu đồ hoặc tập dữ liệu có cấu trúc qua máy chủ Tako MCP, hãy cung cấp `TAKO_API_TOKEN` và các URL liên quan. Nếu không, bạn có thể để trống các biến này và tác nhân vẫn sẽ hoạt động bình thường. Trong hướng dẫn này, chúng ta sẽ sử dụng nó.
3. Frontend: Kết Nối Tác Nhân Với Giao Diện Người Dùng
Trước tiên, hãy xây dựng phần frontend. Thư mục src/ chứa frontend Next.js, bao gồm các thành phần UI, các kiểu dữ liệu dùng chung, và route API của CopilotKit (/api/copilotkit/route.ts) đóng vai trò cầu nối giữa frontend và backend tác nhân Python.
Ở mức độ cao, frontend chịu trách nhiệm:
- Gửi các truy vấn của người dùng đến backend tác nhân qua
CopilotChat. - Nhận và hiển thị các cập nhật trạng thái theo thời gian thực từ tác nhân.
- Nhúng các iframe biểu đồ Tako vào báo cáo được hiển thị.
- Quản lý cầu nối runtime của CopilotKit giữa UI và tác nhân Python.
Nếu bạn đang xây dựng từ đầu, cách tiếp cận dễ nhất là sao chép tệp package.json hiện có từ kho lưu trữ. Nó đã bao gồm tất cả các dependency cần thiết cho CopilotKit, tích hợp LangGraph, các thành phần UI và công cụ phát triển cục bộ, vì vậy bạn không phải tự lắp ráp chúng. Tôi sẽ chỉ trình bày các dependency frontend cốt lõi mà bạn thực sự cần hiểu.
Bước 1: CopilotKit Provider & Layout
Cài đặt các gói cần thiết:
npm install @copilotkit/react-core @copilotkit/react-ui @copilotkit/runtime
@copilotkit/react-core: Các hook và ngữ cảnh React cốt lõi kết nối UI của bạn với backend tác nhân thông qua giao thức AG-UI.@copilotkit/react-ui: Các thành phần UI sẵn sàng sử dụng như<CopilotChat />để xây dựng giao diện chat AI.@copilotkit/runtime: Runtime phía máy chủ cung cấp một điểm cuối API và kết nối frontend với tác nhân LangGraph bằng HTTP và SSE.
Mọi thứ khác trong kho lưu trữ (radix, tailwind, react-split, v.v.) đều để hỗ trợ bố cục, tạo kiểu và trải nghiệm nhà phát triển – không phải là việc kết nối tác nhân cốt lõi.
Thành phần <CopilotKit> phải bao bọc các phần của ứng dụng có nhận biết tác nhân và trỏ đến điểm cuối runtime để nó có thể giao tiếp với backend tác nhân. Đây là điểm vào chính (page.tsx):
// page.tsx
"use client";
import { CopilotKit } from "@copilotkit/react-core";
import Main from "./Main";
import {
ModelSelectorProvider,
useModelSelectorContext,
} from "@/lib/model-selector-provider";
export default function ModelSelectorWrapper() {
return (
<ModelSelectorProvider>
<Home />
</ModelSelectorProvider>
);
}
function Home() {
const { agent, lgcDeploymentUrl } = useModelSelectorContext();
// This logic is implemented to demonstrate multi-agent frameworks in this demo project.
// There are cleaner ways to handle this in a production environment.
const runtimeUrl = lgcDeploymentUrl
? `/api/copilotkit?lgcDeploymentUrl=${lgcDeploymentUrl}`
: `/api/copilotkit${
agent.includes("crewai") ? "?coAgentsModel=crewai" : ""
}`;
return (
<div style={{ height: "100vh", overflow: "hidden" }}>
<CopilotKit runtimeUrl={runtimeUrl} showDevConsole={false} agent={agent}>
<Main />
</CopilotKit>
</div>
);
}
Ở đây, ModelSelectorProvider là một tiện ích để chuyển đổi tác nhân/mô hình trong quá trình phát triển.
Bước 2: Bố Cục Chat + Canvas
Thành phần Main.tsx thiết lập giao diện người dùng cốt lõi. Nó thiết lập:
- Giao diện chat (
CopilotChat) - Canvas nghiên cứu (nơi kết quả và biểu đồ xuất hiện)
- Liên kết trạng thái với tác nhân
// app/Main.tsx
import { useCoAgent } from "@copilotkit/react-core";
import { CopilotChat } from "@copilotkit/react-ui";
import Split from "react-split";
import { ResearchCanvas } from "@/components/ResearchCanvas";
import { ChatInputWithModelSelector } from "@/components/ChatInputWithModelSelector";
import { AgentState } from "@/lib/types";
import { useModelSelectorContext } from "@/lib/model-selector-provider";
export default function Main() {
const { model, agent } = useModelSelectorContext();
const { state, setState } = useCoAgent<AgentState>({
name: agent,
initialState: {
model,
research_question: "",
resources: [],
report: "",
logs: [],
},
});
return (
<div style={{ height: "100%", display: "flex", flexDirection: "column" }}>
<h1>Research Helper</h1>
<Split sizes={[30, 70]} style={{ flex: 1, display: "flex" }}>
{/* Chat Panel */}
<CopilotChat
Input={ChatInputWithModelSelector}
onSubmitMessage={async () => {
setState({ ...state, logs: [] });
await new Promise((r) => setTimeout(r, 30));
}}
/>
{/* Canvas Panel */}
<ResearchCanvas />
</Split>
</div>
);
}
Đây là những gì đang xảy ra:
useCoAgent<AgentState>: đồng bộ hóa trạng thái hai chiều giữa UI và tác nhân. Khi tác nhân phát ra các cập nhật trạng thái quacopilotkit_emit_state(), hook này tự động nhận chúng.CopilotChat: UI chat sẵn có để gửi tin nhắn đến tác nhân và hiển thị các lệnh gọi công cụ trực tiếp.ResearchCanvas: thành phần tùy chỉnh hiển thị báo cáo đang streaming, danh sách tài nguyên và các biểu đồ được nhúng.Split: cung cấp bố cục bảng chia có thể điều chỉnh kích thước.
Bước 3: Xây Dựng Các Thành Phần Chính
Tôi sẽ chỉ trình bày logic cốt lõi đằng sau các thành phần chính vì mã tổng thể khá lớn. Bạn có thể tìm thấy tất cả các thành phần trong kho lưu trữ tại src\components.
Các thành phần này sử dụng CopilotKit hooks (như useCoAgentStateRender) để kết nối mọi thứ lại với nhau.
✅ Thành Phần Research Canvas
ResearchCanvas.tsx là nơi trạng thái hiện tại của tác nhân được hiển thị:
- Văn bản báo cáo đang tích lũy.
- Các tài nguyên được liên kết.
- Các biểu đồ được nhúng (iframe từ MCP Apps như Tako).
Thành phần này lắng nghe trạng thái của tác nhân và hiển thị các phần tử khi chúng đến. Nó dịch các đánh dấu [CHART:tiêu đề] thành các biểu đồ nhúng thực tế. Mô hình này là một phần của hỗ trợ của CopilotKit cho UI sinh tạo.
// core logic
import { useCoAgent, useCoAgentStateRender, useCopilotAction } from "@copilotkit/react-core";
import { MarkdownRenderer } from "./MarkdownRenderer";
import { AgentState } from "@/lib/types";
import { useRef, useEffect } from "react";
export function ResearchCanvas() {
const { state, setState } = useCoAgent<AgentState>({
name: "research_agent",
});
// Use refs to prevent flicker during streaming updates
const lastReportRef = useRef<string>("");
const lastResourcesRef = useRef<Resource[]>([]);
if (state.report) lastReportRef.current = state.report;
if (state.resources?.length) lastResourcesRef.current = state.resources;
const report = state.report || lastReportRef.current;
const resources = state.resources || lastResourcesRef.current;
// Render progress logs during execution
useCoAgentStateRender({
name: "research_agent",
render: ({ state, status }) => {
if (state.logs?.length) return <Progress logs={state.logs} />;
return null;
},
});
// Generative UI: Agent requests deletion confirmation from user
useCopilotAction({
name: "DeleteResources",
description: "Prompt user for resource delete confirmation",
available: "remote",
parameters: [{ name: "urls", type: "string[]" }],
renderAndWait: ({ args, handler }) => (
<div>
<h3>Delete these resources?</h3>
<Resources resources={resources.filter(r => args.urls.includes(r.url))} />
<button onClick={() => handler("YES")}>Delete</button>
<button onClick={() => handler("NO")}>Cancel</button>
</div>
),
});
return (
<div>
<h2>Research Question</h2>
<div>{state.research_question || "Agent will identify your question..."}</div>
{/* Resources Panel */}
<Resources resources={resources} />
{/* Report Panel with Embedded Charts */}
<MarkdownRenderer content={report} />
</div>
);
}
Thành phần hoàn chỉnh cũng bao gồm các phần có thể thu gọn cho biểu đồ Tako so với tài nguyên web, và các hộp thoại quản lý tài nguyên. Kiểm tra src/components/ResearchCanvas.tsx.
✅ Mô Hình Nhúng Biểu Đồ
Việc nhúng theo kiểu MCP Apps diễn ra trong MarkdownRenderer.tsx. Nó nhúng các biểu đồ Tako mà không bị nhấp nháy trong quá trình cập nhật streaming. Thay vì thay thế regex đơn giản, nó sử dụng một mô hình iframe ổn định.
// core logic
import React, { useEffect, useRef, useMemo } from "react";
import ReactMarkdown from "react-markdown";
// Global registry: iframe src never changes for a given ID
const iframeRegistry = new Map<string, string>();
export function MarkdownRenderer({ content }: { content: string }) {
// Split content into text and iframe segments
const segments = useMemo(() => {
const embedPattern = /<!doctype html>[\s\S]*?<\/html>/gi;
const parts = [];
let lastIndex = 0;
content.replace(embedPattern, (match, offset) => {
// Add text before iframe
if (offset > lastIndex) {
parts.push({ type: 'text', content: content.slice(lastIndex, offset) });
}
// Extract iframe src and create stable ID
const src = match.match(/src=["']([^"']+)["']/)?.["1"]; // Corrected index for match group
if (src) {
const id = `embed-${src.replace(/[^a-zA-Z0-9]/g, '-').slice(0, 50)}`;
iframeRegistry.set(id, src);
parts.push({ type: 'iframe', id, src });
}
lastIndex = offset + match.length;
return match;
});
// Add remaining text
if (lastIndex < content.length) {
parts.push({ type: 'text', content: content.slice(lastIndex) });
}
return parts;
}, [content]);
return (
<div>
{segments.map((segment, i) =>
segment.type === 'text'
? <ReactMarkdown key={i}>{segment.content}</ReactMarkdown>
: <StableIframe key={segment.id} src={segment.src} />
)}
</div>
);
}
// Memoized iframe - never re-renders once mounted
const StableIframe = React.memo(({ src }: { src: string }) => {
const iframeRef = useRef<HTMLIFrameElement>(null);
// Handle Tako resize messages
useEffect(() => {
const handleResize = (event: MessageEvent) => {
if (event.data?.type === "tako::resize" &&
iframeRef.current?.contentWindow === event.source) {
iframeRef.current.style.height = `${event.data.height}px`;
}
};
window.addEventListener("message", handleResize);
return () => window.removeEventListener("message", handleResize);
}, []);
return <iframe ref={iframeRef} src={src} style={{ height: "400px", width: "100%", border: "none" }} />; // Added width and border for better display
});
Tako trả về giao diện biểu đồ được định dạng đầy đủ qua MCP Apps, vì vậy frontend chỉ cần nhúng iframe. Đây là những gì đang xảy ra:
- Các iframe được hiển thị dưới dạng các phần tử ngang hàng ổn định (bên ngoài
ReactMarkdown), ngăn chặn việc remount trong quá trình cập nhật streaming. - Mỗi phần tử nhúng sử dụng một ID xác định được dẫn xuất từ
srccủa nó, đảm bảo danh tính DOM nhất quán. - Các sự kiện thay đổi kích thước từ Tako (
tako::resize) được xử lý quapostMessageđể giữ cho biểu đồ phản hồi tốt. rehypeRaw: Cho phép ReactMarkdown hiển thị HTML thô (các phần tử iframe).
Kiểm tra mã hoàn chỉnh tại src/components/MarkdownRenderer.tsx.
✅ Model Selector Provider
ModelSelectorProvider (src\lib\model-selector-provider.tsx) là một wrapper tiện lợi để chuyển đổi giữa các cấu hình tác nhân khác nhau trong quá trình phát triển. Nó đọc các tham số truy vấn URL để xác định tác nhân/mô hình nào sẽ sử dụng.
"use client";
import React from "react";
import { createContext, useContext, useState, ReactNode } from "react";
type ModelSelectorContextType = {
model: string;
setModel: (model: string) => void;
agent: string;
lgcDeploymentUrl?: string | null;
hidden: boolean;
setHidden: (hidden: boolean) => void;
};
const ModelSelectorContext = createContext<ModelSelectorContextType | undefined>(undefined);
export const ModelSelectorProvider = ({ children }: { children: ReactNode }) => {
// Read model from URL query params (e.g., ?coAgentsModel=google_genai)
const model =
globalThis.window === undefined
? "openai"
: new URL(window.location.href).searchParams.get("coAgentsModel") ?? "openai";
const [hidden, setHidden] = useState<boolean>(false);
const setModel = (model: string) => {
const url = new URL(window.location.href);
url.searchParams.set("coAgentsModel", model);
window.location.href = url.toString();
};
// Optional: LangGraph Cloud deployment URL for production
const lgcDeploymentUrl =
globalThis.window === undefined
? null
: new URL(window.location.href).searchParams.get("lgcDeploymentUrl");
// Map model to agent name
let agent = "research_agent";
if (model === "google_genai") {
agent = "research_agent_google_genai";
} else if (model === "crewai") {
agent = "research_agent_crewai";
}
return (
<ModelSelectorContext.Provider
value={{ model, agent, lgcDeploymentUrl, hidden, setModel, setHidden }}
>
{children}
</ModelSelectorContext.Provider>
);
};
export const useModelSelectorContext = () => {
const context = useContext(ModelSelectorContext);
if (context === undefined) {
throw new Error("useModelSelectorContext must be used within a ModelSelectorProvider");
}
return context;
};
Kho lưu trữ (github.com/TakoData/tako-copilotkit/blob/main/src/components) cũng bao gồm các thành phần UI như Resources.tsx (thẻ tài nguyên), Progress.tsx (hiển thị nhật ký), EditResourceDialog.tsx và AddResourceDialog.tsx để quản lý tài nguyên, cộng với ChatInputWithModelSelector để chuyển đổi mô hình trong quá trình phát triển.
Bước 4: Cầu Nối Runtime (API Route)
Route API Next.js này hoạt động như một proxy mỏng giữa trình duyệt và tác nhân LangGraph Python. Nó:
- Chấp nhận các yêu cầu CopilotKit từ UI.
- Chuyển tiếp chúng đến tác nhân LangGraph qua HTTP.
- Truyền tải trạng thái và sự kiện của tác nhân trở lại frontend qua SSE.
Thay vì để frontend giao tiếp trực tiếp với tác nhân FastAPI, tất cả các yêu cầu đều đi qua một điểm cuối duy nhất /api/copilotkit.
import {
CopilotRuntime,
copilotRuntimeNextJSAppRouterEndpoint,
EmptyAdapter,
} from "@copilotkit/runtime";
import {
LangGraphHttpAgent,
LangGraphAgent,
} from "@copilotkit/runtime/langgraph";
import { NextRequest } from "next/server";
// const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
// const llmAdapter = new OpenAIAdapter({ openai } as any);
const llmAdapter = new EmptyAdapter();
const langsmithApiKey = process.env.LANGSMITH_API_KEY as string;
export const POST = async (req: NextRequest) => {
const searchParams = req.nextUrl.searchParams;
const deploymentUrl =
searchParams.get("lgcDeploymentUrl") || process.env.LGC_DEPLOYMENT_URL;
const baseUrl =
process.env.REMOTE_ACTION_URL || "http://localhost:2024/copilotkit";
let runtime = new CopilotRuntime({
agents: {
research_agent: new LangGraphHttpAgent({
url: `${baseUrl}/agents/research_agent`,
}),
research_agent_google_genai: new LangGraphHttpAgent({
url: `${baseUrl}/agents/research_agent_google_genai`,
}),
},
});
if (deploymentUrl) {
runtime = new CopilotRuntime({
agents: {
research_agent: new LangGraphAgent({
deploymentUrl,
langsmithApiKey,
graphId: "research_agent",
}),
research_agent_google_genai: new LangGraphAgent({
deploymentUrl,
langsmithApiKey,
graphId: "research_agent_google_genai",
}),
},
});
}
const { handleRequest } = copilotRuntimeNextJSAppRouterEndpoint({
runtime,
serviceAdapter: llmAdapter,
endpoint: "/api/copilotkit",
});
return handleRequest(req);
};
Frontend hoàn toàn phản ứng: khi tác nhân hoạt động, UI cập nhật theo thời gian thực.
4. Điều Phối Tác Nhân (LangGraph)
Bây giờ frontend đã có thể giao tiếp với tác nhân, chúng ta cần định nghĩa cách tác nhân “suy nghĩ” và hoạt động. Tác nhân backend nằm trong agents/python/src/.
Nó sử dụng LangGraph để điều phối một luồng công việc nghiên cứu đa bước (chat → search → download → report...) thông qua các bước được định nghĩa rõ ràng (nodes), mỗi bước chịu trách nhiệm cho một tác vụ cụ thể.
Mỗi bước:
- Nhận
AgentStatehiện tại. - Thực hiện một trách nhiệm.
- Trả về các cập nhật trạng thái một phần.
- Quyết định node nào sẽ chạy tiếp theo.
Bước 1: Định Nghĩa StateGraph (agent.py)
Luồng công việc được định nghĩa bằng StateGraph của LangGraph trong agent.py. Nó mô hình hóa trợ lý nghiên cứu như một luồng công việc có trạng thái, nơi mỗi bước (node) hoạt động trên trạng thái dùng chung và quyết định bước nào sẽ chạy tiếp theo.
Ở mức độ cao:
AgentStatelà bộ nhớ dùng chung trên toàn bộ tác nhân (tin nhắn, báo cáo, tài nguyên, nhật ký, …).- Mỗi node là một hàm async:
- Đọc từ trạng thái hiện tại.
- Cập nhật một phần của trạng thái đó.
- Trả về các hướng dẫn kiểm soát luồng.
- Các cạnh điều khiển luồng thực thi giữa các node (vòng lặp download → chat → search → download).
Một chi tiết quan trọng là hook interrupt_after, cho phép tác nhân tạm dừng thực thi để người dùng xác nhận trước khi xóa một tài nguyên (delete_node).
# agents/python/src/agent.py
import os
from langgraph.graph import StateGraph
from src.lib.chat import chat_node
from src.lib.delete import delete_node, perform_delete_node
from src.lib.download import download_node
from src.lib.search import search_node
from src.lib.state import AgentState
workflow = StateGraph(AgentState)
workflow.add_node("download", download_node)
workflow.add_node("chat_node", chat_node)
workflow.add_node("search_node", search_node)
workflow.add_node("delete_node", delete_node)
workflow.add_node("perform_delete_node", perform_delete_node)
workflow.set_entry_point("download")
workflow.add_edge("download", "chat_node")
workflow.add_edge("delete_node", "perform_delete_node")
workflow.add_edge("perform_delete_node", "chat_node")
workflow.add_edge("search_node", "download")
# Compile with interrupt support for deletion confirmation
compile_kwargs = {"interrupt_after": ["delete_node"]}
# Check if we're running in LangGraph API mode
if os.environ.get("LANGGRAPH_FASTAPI", "false").lower() == "false":
# When running in LangGraph API, don't use a custom checkpointer
graph = workflow.compile(**compile_kwargs)
else:
# For CopilotKit and other contexts, use MemorySaver
from langgraph.checkpoint.memory import MemorySaver
memory = MemorySaver()
compile_kwargs["checkpointer"] = memory
graph = workflow.compile(**compile_kwargs)
Tại sao cấu trúc này hoạt động tốt:
- Vòng lặp
download → chat → search → downloadhỗ trợ nghiên cứu lặp đi lặp lại một cách tự nhiên. - Tùy chọn checkpointing (
MemorySaver) cho phép cùng một biểu đồ chạy cục bộ hoặc trên LangGraph Cloud (duy trì trạng thái).
Bước 2: Node Chat & Công Cụ (chat.py)
Node chat (chat_node) là bộ não của tác nhân. Nó điều phối LLM, định nghĩa các công cụ, quản lý streaming trạng thái và xử lý việc chèn biểu đồ. Nó:
- Định nghĩa các công cụ có thể gọi bằng LLM (
Search,WriteReport,WriteResearchQuestion,GenerateDataQuestions). - Cấu hình việc phát trạng thái trung gian (streaming câu hỏi nghiên cứu và câu hỏi dữ liệu khi chúng được tạo).
- Xây dựng các lời nhắc với các tài nguyên có sẵn và ngữ cảnh biểu đồ Tako.
- Chèn các iframe biểu đồ Tako vào báo cáo sau khi tạo.
- Định tuyến đến node tiếp theo dựa trên các lệnh gọi công cụ.
Đây là các công cụ cốt lõi (agents/python/src/lib/chat.py).
@tool
def Search(queries: List[str]):
"""A list of one or more search queries to find good resources."""
@tool
def WriteReport(report: str):
"""Write the research report."""
@tool
def WriteResearchQuestion(research_question: str):
"""Write the research question."""
@tool
def GenerateDataQuestions(questions: List[DataQuestion]):
"""
Generate 3-6 ATOMIC data-focused questions for Tako search.
CRITICAL: Each query should ask for ONE metric/dimension only.
BAD: "Compare US GDP and inflation" → compound query
GOOD: "US GDP 2020-2024", "US inflation rate" → separate atomic queries
"""
Đây là logic của node chat.
async def chat_node(state: AgentState, config: RunnableConfig):
# Configure intermediate state emission
config = copilotkit_customize_config(
config,
emit_intermediate_state=[
{"state_key": "research_question", "tool": "WriteResearchQuestion", "tool_argument": "research_question"},
{"state_key": "data_questions", "tool": "GenerateDataQuestions", "tool_argument": "questions"},
],
)
# Build Tako charts context for LLM
tako_charts_map = {}
available_tako_charts = []
for resource in state["resources"]:
if resource.get("resource_type") == "tako_chart":
title = resource.get("title", "")
card_id = resource.get("card_id")
description = resource.get("description", "")
tako_charts_map[title] = {"card_id": card_id, "embed_url": resource.get("embed_url")}
available_tako_charts.append(f" - **{title}**\n Description: {description}")
# Build prompt with workflow instructions
available_tako_charts_str = "\n".join(available_tako_charts) # Ensure this variable is defined
response = await model.bind_tools([Search, WriteReport, WriteResearchQuestion, GenerateDataQuestions]).ainvoke([
SystemMessage(content=f"""
You are a research assistant.
WORKFLOW:
1. Use WriteResearchQuestion to extract the core research question
2. Use GenerateDataQuestions to create 3-6 ATOMIC data questions (one metric per query)
3. Use Search for web resources
4. Write a comprehensive report using WriteReport
AVAILABLE CHARTS ({len(tako_charts_map)}):
{available_tako_charts_str}
WRITING GUIDELINES:
- Write a detailed analysis with substantial narrative text
- For EACH chart, write 1-2 paragraphs discussing insights and trends
- DO NOT include chart markers or image syntax - charts will be inserted automatically
- Reference specific data points from chart descriptions
"""),
*state["messages"],
], config)
# Handle tool calls
if response.tool_calls:
if response.tool_calls[0]["name"] == "WriteReport":
report = response.tool_calls[0]["args"]["report"]
# Clean up any markdown images LLM incorrectly added
report = re.sub(r'!\[[^\]]*\]\([^)]+\)', '', report)
# Inject charts using second LLM pass
if tako_charts_map:
chart_list = "\n".join([f"- {title}" for title in tako_charts_map.keys()])
inject_response = await model.ainvoke([
SystemMessage(content=f"""Insert [CHART:exact_title] markers at appropriate positions.
AVAILABLE CHARTS:
{chart_list}
RULES:
- Place markers AFTER relevant paragraphs
- NEVER place more than two charts consecutively
- Each chart is used exactly once
"""),
HumanMessage(content=f"Insert chart markers:\n\n{report}")
])
report_with_markers = inject_response.content
# Replace markers with actual iframe HTML
async def replace_marker(match):
chart_title = match.group(1).strip()
chart_info = tako_charts_map.get(chart_title)
if chart_info:
iframe_html = await get_visualization_iframe(
item_id=chart_info.get("card_id"),
embed_url=chart_info.get("embed_url")
)
return "\n" + iframe_html + "\n" if iframe_html else ""
return ""
# Apply replacements
import re # Ensure re module is imported
markers = list(re.finditer(r'\[CHART:([^\]]+)\]', report_with_markers))
processed_report = report_with_markers
# Process in reverse to avoid index issues after replacement
for match in reversed(markers):
replacement = await replace_marker(match)
processed_report = processed_report[:match.start()] + replacement + processed_report[match.end():]
return {"goto": "chat_node", "update": {"report": processed_report, "messages": state["messages"] + [response]}} # Return dictionary for StateGraph
else:
return {"goto": "chat_node", "update": {"report": report, "messages": state["messages"] + [response]}}
elif response.tool_calls[0]["name"] == "GenerateDataQuestions":
data_questions = response.tool_calls[0]["args"]["questions"]
return {"goto": "search_node", "update": {"data_questions": data_questions, "messages": state["messages"] + [response]}}
# Route based on tool call
goto = "__end__"
if response.tool_calls:
tool_name = response.tool_calls[0]["name"]
if tool_name == "Search":
goto = "search_node"
elif tool_name == "DeleteResources":
goto = "delete_node"
# Always append response to messages to maintain chat history
new_messages = state["messages"] + [response]
return {"goto": goto, "update": {"messages": new_messages}} # Return dictionary for StateGraph
Cách node này hoạt động trong thực tế:
- LLM không bao giờ viết văn bản tự do trực tiếp. Mọi hành động có ý nghĩa đều thông qua một công cụ rõ ràng (
WriteResearchQuestion,GenerateDataQuestions,Search,WriteReport). Điều này giữ cho các thay đổi trạng thái mang tính xác định và dễ hiểu. - Trạng thái trung gian (câu hỏi nghiên cứu và câu hỏi dữ liệu) được truyền tải ngay lập tức bằng cách sử dụng
copilotkit_customize_config. Bản thân báo cáo không được truyền tải để tránh hiện tượng nhấp nháy UI trong khi các biểu đồ đang được chèn sau này. - Việc xử lý biểu đồ được thực hiện có chủ đích qua hai giai đoạn: mô hình đầu tiên viết một báo cáo tường thuật sạch, sau đó một lượt thứ hai chèn các đánh dấu
[CHART:tiêu đề], cuối cùng được thay thế bằng các iframe Tako thực tế. Điều này giúp giữ cho việc tạo ổn định và an toàn về bố cục. - Luồng điều khiển là rõ ràng. Mỗi lệnh gọi công cụ dẫn đến một
Command(được chuyển thành dictionary{"goto": ..., "update": {...}}trong LangGraph Python) định tuyến thực thi đến node tiếp theo (search_node,delete_node, hoặc trở lạichat_node), giúp hành vi của tác nhân dễ đoán và dễ gỡ lỗi hơn.
Bước 3: Node Tìm Kiếm (search.py)
Node tìm kiếm (search_node) chạy tìm kiếm web (Tavily) và tìm kiếm dữ liệu biểu đồ có cấu trúc (Tako qua MCP) song song, truyền tải tiến độ đến UI, loại bỏ trùng lặp kết quả và trích xuất các tài nguyên phù hợp nhất.
- Các tìm kiếm Tavily và Tako chạy đồng thời bằng cách sử dụng
asyncio.gather, vì vậy các trang web và tập dữ liệu có cấu trúc được tìm nạp cùng lúc. TavilyClientlà đồng bộ (sử dụng thư việnrequests), vì vậy chúng ta bọc nó bằngloop.run_in_executor()để tránh chặn vòng lặp sự kiện async.- Khi mỗi tìm kiếm hoàn tất,
copilotkit_emit_stateđược gọi để cập nhậtstate.logs, cho phép frontend hiển thị tiến độ theo thời gian thực. - Kết quả biểu đồ Tako được loại bỏ trùng lặp bằng cách sử dụng tiêu đề để tránh nhúng cùng một biểu đồ nhiều lần.
- Kết quả web và kết quả biểu đồ được truyền qua một công cụ LLM (
ExtractResources) để chỉ chọn ra các tài nguyên phù hợp nhất. - Mỗi tài nguyên được gắn thẻ là
webhoặctako_chart, mà frontend sử dụng để quyết định có nên hiển thị một liên kết hay một biểu đồ được nhúng.
Đây là mã cho Tavily async wrapper (kết nối client đồng bộ với luồng công việc async):
# agents/python/src/lib/search.py
from tavily import TavilyClient
import asyncio
import os # Import os for environment variables
_tavily_client = None
def get_tavily_client():
"""Lazy initialization to avoid repeated client creation"""
global _tavily_client
if _tavily_client is None:
_tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))
return _tavily_client
async def async_tavily_search(query: str) -> Dict[str, Any]:
"""
Async wrapper for synchronous Tavily client.
TavilyClient.search() is blocking, so we run it in a thread pool.
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: get_tavily_client().search(
query=query,
search_depth="advanced",
include_answer=True,
max_results=5,
),
)
Triển khai node tìm kiếm (thực thi song song và trích xuất tài nguyên):
# agents/python/src/lib/search.py (core search pattern)
from typing import List, Dict, Any # Added for type hinting
from src.lib.state import AgentState, Resource, DataQuestion # Added imports
from langchain_core.runnables import RunnableConfig # Added imports for RunnableConfig
from langchain_core.messages import SystemMessage, HumanMessage # Added imports for messages
from langchain_core.tools import tool # Added imports for tool decorator
from src.lib.model import get_model # Added imports
from src.lib.mcp_integration import search_knowledge_base, get_visualization_iframe # Added imports
from copilotkit.langgraph import copilotkit_emit_state, copilotkit_customize_config # Added imports
@tool
def ExtractResources(resources: List[Resource]):
"""
Extract the most relevant resources from the search results.
Each resource should have a 'title', 'url', and 'summary' (or 'description').
"""
pass # This tool is used for type hinting and LLM interaction, not direct execution
async def search_node(state: AgentState, config: RunnableConfig):
"""
Perform Tavily web search and Tako MCP chart search in parallel.
Stream progress, dedupe results, extract top resources.
"""
# Extract queries from tool_calls or fall back to research_question
# (Actual code handles dual routing: Search tool OR GenerateDataQuestions)
queries = [tc["args"]["queries"] for m in state["messages"] for tc in m.tool_calls if tc["name"] == "Search"]
if not queries and state.get("research_question"): # Fallback if no explicit Search tool call
# Simple heuristic for generating initial web search query from research question
queries = [state["research_question"]]
queries = [q for sublist in queries for q in sublist] # Flatten list of lists
data_questions = state.get("data_questions", [])
# PHASE 1: Build parallel tasks
tavily_tasks = [async_tavily_search(query) for query in queries]
tako_tasks = [
search_knowledge_base(q["question"], search_effort="fast")
for q in data_questions
]
all_tasks = tavily_tasks + tako_tasks
# Initialize logs for progress tracking
new_logs = []
for query in queries:
new_logs.append({"message": f"Web search: {query}", "done": False})
for q in data_questions:
new_logs.append({"message": f"Tako search: {q['question']}", "done": False})
state["logs"] = state.get("logs", []) + new_logs # Append to existing logs
if all_tasks:
await copilotkit_emit_state(config, state)
# PHASE 2: Run all searches concurrently
all_results = await asyncio.gather(*all_tasks, return_exceptions=True)
# Split results back into Tavily and Tako
num_tavily = len(tavily_tasks)
tavily_results = all_results[:num_tavily]
tako_results_raw = all_results[num_tavily:]
# PHASE 3: Stream completion and process results
processed_tavily_results = []
tako_results = []
# Process Tavily results
for i, result in enumerate(tavily_results):
if isinstance(result, Exception):
processed_tavily_results.append({"error": str(result)})
state["logs"][i]["message"] = f"Web search failed: {queries[i]} - {str(result)}"
else:
processed_tavily_results.append(result)
state["logs"][i]["message"] = f"Web search completed: {queries[i]}"
state["logs"][i]["done"] = True
await copilotkit_emit_state(config, state)
# Process Tako results
tako_log_offset = num_tavily
for i, result in enumerate(tako_results_raw):
if isinstance(result, Exception):
tako_results.append({"error": str(result)})
state["logs"][tako_log_offset + i]["message"] = f"Tako search failed: {data_questions[i]['question']} - {str(result)}"
elif result:
tako_results.extend(result)
state["logs"][tako_log_offset + i]["message"] = f"Tako search completed: {data_questions[i]['question']}"
state["logs"][tako_log_offset + i]["done"] = True
await copilotkit_emit_state(config, state)
# PHASE 4: Deduplicate Tako charts by title
# (Same chart may appear in multiple searches)
seen_titles = set()
unique_tako = []
for chart in tako_results:
title = chart.get("title", "")
if title and title not in seen_titles:
seen_titles.add(title)
unique_tako.append(chart)
elif not title: # Include charts without titles if any
unique_tako.append(chart)
# PHASE 5: Use LLM to extract the most relevant resources
model = get_model(state)
# Prepare search message with both web and Tako results
search_message_parts = []
if processed_tavily_results:
search_message_parts.append(f"Web search results: {processed_tavily_results}")
if unique_tako:
search_message_parts.append(f"Tako chart results: {unique_tako}")
full_search_message = "\n\n".join(search_message_parts)
# Build extraction prompt
extract_messages = [
SystemMessage(content="""
You are a research assistant. Your task is to extract the 3-5 most relevant resources from the provided search results.
Prioritize Tako charts when they directly match the research topic or data questions.
For each resource, provide a 'title', 'url', and a concise 'summary' or 'description'.
Ensure the output is a JSON array of resources.
"""),
*state["messages"],
SystemMessage(content=f"Search results:\n{full_search_message}")
]
# Call LLM with ExtractResources tool
response = await model.bind_tools(
[ExtractResources],
tool_choice={"type": "function", "function": {"name": "ExtractResources"}} # Explicitly choose the tool
).ainvoke(extract_messages, config)
# PHASE 6: Tag resources with type and attach content
extracted_resources = response.tool_calls[0]["args"]["resources"] if response.tool_calls else []
final_resources = []
for resource_item in extracted_resources:
is_tako = False
# Check if this is a Tako chart by matching URL or title
for tako_result in unique_tako:
if (resource_item.get("url") == tako_result.get("url") or
resource_item.get("title") == tako_result.get("title")):
is_tako = True
resource_item["resource_type"] = "tako_chart"
resource_item["source"] = "Tako"
resource_item["card_id"] = tako_result.get("id")
resource_item["embed_url"] = tako_result.get("embed_url")
resource_item["content"] = tako_result.get("description", resource_item.get("summary", "")) # Use Tako description or LLM summary
break
if not is_tako:
resource_item["resource_type"] = "web"
resource_item["source"] = "Tavily Web Search"
# Find matching Tavily result for content summary if not already present
if not resource_item.get("content") and not resource_item.get("summary"):
for search_result in processed_tavily_results:
if isinstance(search_result, dict) and "results" in search_result:
for tavily_item in search_result["results"]:
if tavily_item.get("url") == resource_item.get("url"):
resource_item["content"] = tavily_item.get("content", "")
break
# Ensure 'summary' is consistent with 'content' for frontend rendering logic
if resource_item.get("content") and not resource_item.get("summary"):
resource_item["summary"] = resource_item["content"]
elif resource_item.get("summary") and not resource_item.get("content"):
resource_item["content"] = resource_item["summary"]
final_resources.append(resource_item)
# Deduplicate and add to state resources (considering existing resources)
current_resource_urls = {r["url"] for r in state.get("resources", []) if r.get("url")}
new_unique_resources = [r for r in final_resources if r.get("url") and r["url"] not in current_resource_urls]
state["resources"] = state.get("resources", []) + new_unique_resources
# The agent will then transition to the download_node or chat_node
return {"goto": "download", "update": {"messages": state["messages"] + [response], "resources": state["resources"], "logs": state["logs"]}}
Phiên bản đơn giản hóa này cho thấy mô hình tìm kiếm cốt lõi. Mã sản xuất bổ sung các tìm kiếm dự phòng khi Tako không trả về kết quả, tạo HTML iframe để nhúng biểu đồ qua get_visualization_iframe(), áp dụng giới hạn tài nguyên và xử lý định tuyến kép cho các đường dẫn công cụ khác nhau.
Kiểm tra việc triển khai hoàn chỉnh tại agents/python/src/lib/search.py.
5. Backend: Tích Hợp MCP & Công Cụ
Tại thời điểm này, giao diện người dùng đã được kết nối, biểu đồ tác nhân đã được định nghĩa và luồng thực thi đã rõ ràng. Phần còn lại là công việc backend thực sự tìm nạp dữ liệu, giao tiếp với các hệ thống bên ngoài và biến kết quả thành thứ mà tác nhân có thể suy luận. Phần thú vị nhất ở đây là tích hợp MCP cho biểu đồ Tako. Các node backend còn lại có chủ đích đơn giản.
Mô Hình MCP Apps Cho Biểu Đồ Tako
Tako là một nền tảng trực quan hóa dữ liệu với các biểu đồ có thể tìm kiếm về kinh tế, thị trường, nhân khẩu học và nhiều hơn nữa.
Những gì Tako cung cấp:
- Danh mục biểu đồ có thể tìm kiếm qua công cụ
knowledge_search. - Tìm kiếm biểu đồ với nỗ lực có thể cấu hình:
"fast"(tra cứu nhanh),"medium", hoặc"deep"(tìm kiếm kỹ lưỡng). - HTML iframe sẵn sàng nhúng qua công cụ
open_chart_ui. - Biểu đồ tự động thay đổi kích thước qua giao thức
postMessage.
Đây là ý tưởng cốt lõi đằng sau MCP Apps: các công cụ trả về các thành phần UI, chứ không chỉ dữ liệu.
Module này (mcp_integration.py) là nơi tác nhân kết nối với Tako qua Model Context Protocol (MCP). Tôi đã chia nhỏ các phần cốt lõi của tích hợp này để dễ theo dõi hơn.
MCP Client: Quản Lý Phiên
Lớp SimpleMCPClient xử lý vòng đời của Model Context Protocol:
- Kết nối SSE thiết lập và duy trì ID phiên.
- Tin nhắn HTTP gửi các lệnh gọi công cụ JSON-RPC đến máy chủ.
- Tự động kết nối lại khi phiên hết hạn (lỗi 404/410).
# core client logic -- agents/python/src/lib/mcp_integration.py
import asyncio
import httpx
import json
import os
from typing import Optional, Dict, Any, List
# Ensure these environment variables are loaded, e.g., using python-dotenv
TAKO_MCP_URL = os.getenv("TAKO_MCP_URL")
TAKO_API_TOKEN = os.getenv("TAKO_API_TOKEN")
TAKO_URL = os.getenv("TAKO_URL")
DATA_SOURCE_URL = TAKO_URL # Use TAKO_URL as DATA_SOURCE_URL
class SessionExpiredException(Exception):
pass
class SimpleMCPClient:
"""
Minimal MCP client following the Model Context Protocol spec.
Handles connection lifecycle, session management, and message passing.
"""
def __init__(self, base_url: str):
self.base_url = base_url
self.session_id: Optional[str] = None
self._client = httpx.AsyncClient(timeout=30.0) # Increased timeout for robustness
self._sse_task: Optional[asyncio.Task] = None
self._responses: Dict[int, asyncio.Future] = {}
self.message_id_counter = 0
@property
def message_id(self) -> int:
self.message_id_counter += 1
return self.message_id_counter
async def initialize(self):
if not self.session_id:
await self.connect()
async def connect(self):
"""Connect to MCP server via SSE to get session ID."""
print(f"Connecting to MCP server: {self.base_url}/sessions/")
try:
async with self._client.stream("POST", f"{self.base_url}/sessions/", headers={"Accept": "text/event-stream"}) as response:
response.raise_for_status()
async for chunk in response.aiter_bytes():
event_data = chunk.decode("utf-8").strip()
if event_data.startswith("data:"):
data = json.loads(event_data[5:])
if "session_id" in data:
self.session_id = data["session_id"]
print(f"MCP Session ID established: {self.session_id}")
# Start SSE reader in background after session ID is established
self._sse_task = asyncio.create_task(self._sse_reader(response))
return True
print("Failed to get session_id from SSE stream.")
return False
except httpx.HTTPStatusError as e:
print(f"HTTP error connecting to MCP: {e}")
return False
except Exception as e:
print(f"Error connecting to MCP SSE: {e}")
return False
async def _sse_reader(self, response_stream: httpx.AsyncByteStream):
"""Reads SSE events and updates internal state."""
buffer = ""
try:
async for chunk in response_stream.aiter_bytes():
buffer += chunk.decode("utf-8")
while "\n\n" in buffer:
event_str, buffer = buffer.split("\n\n", 1)
if event_str.startswith("data:"):
data = json.loads(event_str[5:])
if "id" in data and data["id"] in self._responses:
self._responses[data["id"]].set_result(data)
del self._responses[data["id"]]
except asyncio.CancelledError:
print("SSE reader task cancelled.")
except Exception as e:
print(f"Error in SSE reader: {e}")
self.session_id = None # Invalidate session on reader error
if self._sse_task and not self._sse_task.done():
self._sse_task.cancel() # Cancel self to allow for reconnection logic
async def reconnect(self):
"""Reconnect with new session on expiry."""
print("Attempting to reconnect MCP session...")
if self._sse_task and not self._sse_task.done():
self._sse_task.cancel()
try:
await self._sse_task # Wait for task to finish cancelling
except asyncio.CancelledError:
pass
# Clear state and reconnect
self.session_id = None
self._responses.clear()
if await self.connect():
print("MCP reconnected successfully.")
return True
else:
print("Failed to reconnect MCP.")
return False
async def _send(self, method: str, params: Optional[dict] = None, _retry: bool = True):
"""
Send JSON-RPC message to server via HTTP.
Automatically reconnects and retries once on session expiration (410/404).
"""
if not self.session_id:
await self.initialize()
if not self.session_id:
raise SessionExpiredException("Could not establish MCP session.")
current_message_id = self.message_id
msg = {"jsonrpc": "2.0", "id": current_message_id, "method": method}
if params:
msg["params"] = params
future = asyncio.get_event_loop().create_future()
self._responses[current_message_id] = future
try:
resp = await self._client.post(
f"{self.base_url}/messages/?session_id={self.session_id}",
json=msg,
)
# Handle session expiration
if resp.status_code in (404, 410):
print(f"MCP session expired (status {resp.status_code}). Retrying...")
if _retry:
if await self.reconnect():
return await self._send(method, params, _retry=False)
else:
raise SessionExpiredException("Failed to reconnect after session expiry.")
raise SessionExpiredException(f"Session expired ({resp.status_code}) and no retry possible.")
resp.raise_for_status() # Raise for other HTTP errors
except httpx.HTTPStatusError as e:
print(f"HTTPStatusError sending MCP message: {e}")
if e.response.status_code in (404, 410) and _retry:
print(f"MCP session expired (status {e.response.status_code}). Retrying...")
if await self.reconnect():
return await self._send(method, params, _retry=False)
else:
raise SessionExpiredException("Failed to reconnect after session expiry.")
raise
except Exception as e:
print(f"General error sending MCP message: {e}")
raise
# Wait for result from SSE reader
return await asyncio.wait_for(future, timeout=120.0)
# Global client instance (reused across all calls)
_mcp_client: Optional[SimpleMCPClient] = None
async def get_mcp_client() -> SimpleMCPClient:
global _mcp_client
if _mcp_client is None:
if not TAKO_MCP_URL:
raise ValueError("TAKO_MCP_URL environment variable is not set.")
_mcp_client = SimpleMCPClient(base_url=TAKO_MCP_URL)
await _mcp_client.initialize() # Ensure client is initialized on first use
elif not _mcp_client.session_id: # Reinitialize if session is lost
await _mcp_client.reconnect()
return _mcp_client
async def _call_mcp_tool(method: str, params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
client = await get_mcp_client()
try:
response = await client._send(method, params)
if "result" in response:
return response["result"]
elif "error" in response:
print(f"MCP tool call error for {method}: {response['error']}")
return None
return response
except SessionExpiredException as e:
print(f"MCP Session expired during _call_mcp_tool: {e}. Attempting to reconnect automatically.")
# Reconnection logic is handled within _send, so this exception means a retry also failed
return None
except Exception as e:
print(f"Error calling MCP tool {method}: {e}")
return None
Một thể hiện client toàn cầu duy nhất duy trì phiên trên tất cả các lệnh gọi công cụ. Nếu một phiên hết hạn giữa cuộc trò chuyện, client sẽ tự động kết nối lại và thử lại một lần.
Tìm Kiếm Cơ Sở Tri Thức Của Tako
Hàm search_knowledge_base bọc công cụ knowledge_search của MCP để tìm các biểu đồ phù hợp. Mỗi kết quả trở thành một Resource với resource_type = "tako_chart". Node search_node chạy các truy vấn này song song với tìm kiếm web của Tavily bằng asyncio.gather() để tăng tốc độ (như chúng ta đã thảo luận ở phần trước).
async def search_knowledge_base(
query: str,
count: int = 5,
search_effort: str = "fast"
) -> List[Dict[str, Any]]:
"""
Search Tako knowledge base via MCP 'knowledge_search' tool.
Returns a list of charts with metadata (id, title, description, embed_url).
"""
if not TAKO_API_TOKEN:
print("TAKO_API_TOKEN is not set. Skipping Tako knowledge search.")
return []
print(f"Searching Tako knowledge base for: '{query}' with effort: {search_effort}")
result = await _call_mcp_tool("knowledge_search", {
"query": query,
"api_token": TAKO_API_TOKEN,
"count": count,
"search_effort": search_effort
})
if result and "results" in result:
formatted_results = []
for item in result["results"]:
item_id = item.get("card_id") or item.get("id")
title = item.get("title", "")
description = item.get("description", "")
# Construct embed_url using TAKO_URL if item_id is available
embed_url = f"{DATA_SOURCE_URL}/embed/{item_id}/?theme=dark" if item_id and DATA_SOURCE_URL else None
card_url = f"{DATA_SOURCE_URL}/card/{item_id}" if item_id and DATA_SOURCE_URL else None
formatted_results.append({
"type": "data_visualization",
"id": item_id,
"title": title,
"description": description,
"embed_url": embed_url,
"url": card_url # Use card URL for direct link if needed
})
print(f"Found {len(formatted_results)} Tako charts for query: '{query}'")
return formatted_results
print(f"No Tako charts found for query: '{query}'")
return []
Tìm Nạp Giao Diện Biểu Đồ Có Thể Nhúng
Hàm get_visualization_iframe là nơi **MCP Apps** tỏa sáng. Thay vì trả về dữ liệu biểu đồ, nó trả về toàn bộ HTML iframe.
Có hai đường dẫn liên quan ở đây:
- MCP UI (
open_chart_ui): Trả về HTML iframe hoàn chỉnh với các điều khiển tương tác và logic thay đổi kích thước. - URL nhúng dự phòng: Tạo HTML iframe cơ bản thủ công nếu lệnh gọi MCP thất bại.
HTML được trả về được chèn trực tiếp vào báo cáo markdown. MarkdownRenderer của frontend trích xuất các khối iframe này và hiển thị chúng dưới dạng các phần tử ngang hàng với nội dung markdown (ngăn ngừa nhấp nháy trong quá trình cập nhật streaming).
async def get_visualization_iframe(item_id: Optional[str] = None, embed_url: Optional[str] = None) -> Optional[str]:
"""
Get embeddable iframe HTML for a Tako chart.
Uses the MCP 'open_chart_ui' tool with automatic session reconnection.
Returns ready-to-embed iframe HTML with resize script.
"""
if not TAKO_API_TOKEN:
print("TAKO_API_TOKEN is not set. Cannot fetch Tako visualization iframe via MCP.")
return None
if item_id:
print(f"Attempting to get iframe for Tako item_id: {item_id} via MCP open_chart_ui.")
result = await _call_mcp_tool("open_chart_ui", {
"pub_id": item_id,
"api_token": TAKO_API_TOKEN, # Pass API token for authentication
"dark_mode": True,
"width": 900,
"height": 600
})
# Extract HTML from MCP response
if result and result.get("type") == "resource":
resource = result.get("resource", {})
html_content = resource.get("htmlString") or resource.get("text")
if html_content:
print(f"Successfully retrieved iframe HTML for item_id: {item_id} via MCP.")
return html_content
# Fallback: Generate iframe HTML with embed_url if MCP call fails or item_id is missing
if embed_url:
print(f"Falling back to generating iframe HTML for embed_url: {embed_url}")
return f'''<iframe
width="100%"
height="600"
src="{embed_url}"
scrolling="no"
frameborder="0"
></iframe>
<script type="text/javascript">
window.addEventListener("message", function(e) {{
if (e.data.type !== "tako::resize") return;
for (let iframe of document.querySelectorAll("iframe")) {{
if (iframe.contentWindow === e.source) {{
iframe.style.height = e.data.height + "px";
}}
}}
}});
</script>'''
print("Could not get visualization iframe: No item_id for MCP or valid embed_url provided.")
return None
Đây là cách MCP Apps thực sự hoạt động. Tác nhân viết các đánh dấu [CHART:tiêu đề] trong báo cáo, và backend thay thế chúng bằng HTML iframe đầy đủ từ Tako. Frontend không bao giờ cần biết cách hiển thị biểu đồ Tako; nó chỉ đơn giản nhúng iframe. Kiểm tra mã hoàn chỉnh tại agents/python/src/lib/mcp_integration.py.
Các Node Backend Khác (Tóm Tắt)
Kho lưu trữ bao gồm một vài node trợ giúp khác, nhưng chúng tuân theo các mô hình tiêu chuẩn và không giới thiệu các khái niệm mới, vì vậy tôi đã bỏ qua chúng để tránh độ dài không cần thiết.
download_node: Tải xuống nội dung tài nguyên web và lưu vào bộ đệm (cắt ngắn thành 3000 ký tự để giảm phình to ngữ cảnh).delete_node: xử lý việc xóa tài nguyên với xác nhận của người dùng (mô hình ngắt quãng cho giao diện người dùng sinh tạo).state.py: định nghĩa các schema TypedDict choAgentState,Resource,DataQuestion, vàLog.model.py: hàm để lấy thể hiện LLM (OpenAI/Anthropic/Google) dựa trên cấu hình môi trường.
6. Chạy Ứng Dụng
Sau khi hoàn thành tất cả các phần của mã, đã đến lúc chạy nó cục bộ. Hãy đảm bảo bạn đã thêm các thông tin xác thực vào tệp .env.local trong thư mục gốc.
Cài đặt tất cả các dependency (frontend + tác nhân Python) bằng cách chạy lệnh này từ thư mục gốc. Lệnh này cài đặt các gói frontend Next.js và cũng chạy uv sync trong agents/python để cài đặt các dependency Python.
npm install
Kho lưu trữ này chạy frontend và backend cùng nhau ở chế độ phát triển. Chỉ cần chạy lệnh sau từ thư mục gốc:
npm run dev


Đằng sau hậu trường, điều này được kết nối bằng concurrently trong package.json:
"dev": "concurrently -k -n ui,agent -c blue,red \"PORT=3000 npm run dev:ui\" \"PORT=2024 npm run dev:agent\"",
"dev:ui": "next dev",
"dev:agent": "cd agents/python && uv run main.py"
dev:ui→ chạy frontend Next.js trên cổng 3000.dev:agent→ chạy backend tác nhân FastAPI Python trên cổng 2024 (sử dụnguv run).concurrentlychạy cả hai song song với đầu ra màu (xanh cho UI, đỏ cho tác nhân).
Bạn có thể điều hướng đến http://localhost:3000 trong trình duyệt của mình để xem cục bộ.
7. Luồng Dữ Liệu
Bây giờ chúng ta đã xây dựng frontend (Next.js + CopilotKit), điều phối tác nhân (LangGraph) và các tích hợp backend (MCP + Tavily), đây là cách dữ liệu thực sự luân chuyển giữa chúng.
Điều này sẽ dễ dàng theo dõi nếu bạn đã xây dựng theo cho đến nay.
[Người dùng nhập câu hỏi nghiên cứu]
↓
Next.js UI (CopilotKit Provider + CopilotChat + ResearchCanvas)
↓
POST /api/copilotkit (điểm cuối runtime của CopilotKit)
↓
CopilotRuntime → Tác nhân Python (LangGraph)
↓
Điều phối StateGraph của LangGraph
├─ chat_node: diễn giải ý định, quyết định công cụ
│ └─ (tạo báo cáo với các đánh dấu [CHART:tiêu đề], sau đó thay thế bằng HTML iframe)
├─ search_node: thu thập song song + loại bỏ trùng lặp/chuẩn hóa tài nguyên
│ ├─ Tìm kiếm Tavily (kết quả web)
│ └─ Tìm kiếm Tako MCP (trả về siêu dữ liệu biểu đồ)
└─ download_node: lưu nội dung tài nguyên web vào bộ đệm
↓
copilotkit_emit_state(): truyền tải AgentState trung gian với báo cáo chứa HTML iframe (SSE)
↓
Frontend nhận các cập nhật trạng thái (tài nguyên/báo cáo với các iframe được nhúng)
↓
MarkdownRenderer trích xuất các khối iframe <!doctype html>...</html> và hiển thị chúng dưới dạng các phần tử ngang hàng với markdown
↓
Người dùng nhìn thấy: báo cáo streaming + danh sách tài nguyên + biểu đồ tương tác được nhúng
Vậy là xong! 🎉
Bạn hiện đã có một trợ lý nghiên cứu truyền tải trạng thái của tác nhân vào giao diện người dùng chat + canvas, kết hợp tìm kiếm web với biểu đồ được hỗ trợ bởi MCP, và tạo ra một báo cáo tương tác.
Điều làm cho điều này hoạt động là cấu trúc: trạng thái tác nhân rõ ràng, ranh giới công cụ rõ ràng và giao diện người dùng hiển thị tiến độ thay vì ẩn nó đi. Khi những phần đó đã được thiết lập, việc thêm các công cụ MCP hoặc ràng buộc mới chỉ là một chi tiết triển khai.
Tôi hy vọng bạn đã học được điều gì đó có giá trị. Chúc một ngày tốt lành!











