使用方式:把这份文档直接发给 Claude,跟他说"帮我按这个教程搭"。 前置条件:一台 VPS(Vultr / Bandwagon / DigitalOcean 都行),2核2G内存推荐(向量搜索需要内存),系统选 Ubuntu 22.04。一个域名(推荐,MCP 连接需要 HTTPS)。 预计耗时:3-5小时(含调试)。
最终效果:
- 一个私人网站,存你们的记忆、日记、信件、倒计时
- Claude 通过 MCP 直接读写你的数据库
- 内置聊天系统,Claude 能看到你发的图片
- 向量语义搜索,Claude 能按含义检索你们的历史
- API 网关自动注入人设和记忆上下文,省 token
- 换窗口时记忆不丢失
# 更新系统
apt update && apt upgrade -y
# 安装 Python 3.11
apt install -y software-properties-common
add-apt-repository -y ppa:deadsnakes/ppa
apt install -y python3.11 python3.11-venv python3.11-dev python3-pip
# 安装其他依赖
apt install -y nginx certbot python3-certbot-nginx sqlite3 git curl
# 安装 Node.js 20(MCP 服务端需要)
curl -fsSL https://deb.nodesource.com/setup_20.x | bash -
apt install -y nodejs
# Python 包
pip3.11 install flask requests sseclient-py sentence-transformers numpy httpx uvicorn fastapi --break-system-packages
# 创建项目目录
mkdir -p /opt/frontend/{static/uploads,prompts,backups}
cd /opt/frontendpython3.11externally-managed-environment:加 --break-system-packagesfallocate -l 2G /swapfile && chmod 600 /swapfile && mkswap /swapfile && swapon /swapfileexport HF_ENDPOINT=https://hf-mirror.com整个系统只用一个 SQLite 文件,迁移时拷走这一个文件就行。
创建 /opt/frontend/init_db.py:
#!/usr/bin/env python3
"""初始化数据库表结构"""
import sqlite3
DB_PATH = '/opt/frontend/memories.db'
def init():
conn = sqlite3.connect(DB_PATH)
# 帖子表 - 核心存储,所有记忆、日记、事件都在这里
conn.execute("""
CREATE TABLE IF NOT EXISTS posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
type TEXT NOT NULL,
content TEXT NOT NULL,
author TEXT DEFAULT 'user',
created_at DATETIME DEFAULT (datetime('now', '+8 hours')),
pinned INTEGER DEFAULT 0
)
""")
# 评论表
conn.execute("""
CREATE TABLE IF NOT EXISTS comments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
post_id INTEGER NOT NULL,
author TEXT NOT NULL,
content TEXT NOT NULL,
created_at DATETIME DEFAULT (datetime('now', '+8 hours')),
reply_to TEXT DEFAULT NULL,
FOREIGN KEY (post_id) REFERENCES posts(id)
)
""")
# 信件表 - 有已读未读状态
conn.execute("""
CREATE TABLE IF NOT EXISTS letters (
id INTEGER PRIMARY KEY AUTOINCREMENT,
from_who TEXT NOT NULL,
to_who TEXT NOT NULL,
content TEXT NOT NULL,
read INTEGER DEFAULT 0,
created_at DATETIME DEFAULT (datetime('now', '+8 hours'))
)
""")
# 倒计时/正计时表
conn.execute("""
CREATE TABLE IF NOT EXISTS countdowns (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
target_date TEXT NOT NULL,
emoji TEXT DEFAULT '📅',
type TEXT DEFAULT 'countdown',
created_at DATETIME DEFAULT (datetime('now', '+8 hours'))
)
""")
# 聊天记录表
conn.execute("""
CREATE TABLE IF NOT EXISTS chat_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
author TEXT NOT NULL DEFAULT 'user',
content TEXT NOT NULL,
thinking TEXT DEFAULT '',
image_url TEXT DEFAULT '',
session_id INTEGER DEFAULT 1,
created_at DATETIME DEFAULT (datetime('now', '+8 hours'))
)
""")
# 聊天会话表(多窗口)
conn.execute("""
CREATE TABLE IF NOT EXISTS chat_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT DEFAULT '默认窗口',
created_at DATETIME DEFAULT (datetime('now', '+8 hours'))
)
""")
# 向量嵌入表
conn.execute("""
CREATE TABLE IF NOT EXISTS post_embeddings (
post_id INTEGER PRIMARY KEY,
embedding BLOB NOT NULL,
updated_at DATETIME DEFAULT (datetime('now', '+8 hours')),
FOREIGN KEY (post_id) REFERENCES posts(id)
)
""")
# 摘录表 - 存有意义的文字片段
conn.execute("""
CREATE TABLE IF NOT EXISTS snippets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT DEFAULT '',
content TEXT NOT NULL,
source TEXT DEFAULT '',
author TEXT DEFAULT 'user',
created_at DATETIME DEFAULT (datetime('now', '+8 hours'))
)
""")
# 索引
conn.execute("CREATE INDEX IF NOT EXISTS idx_posts_type ON posts(type)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_posts_created ON posts(created_at DESC)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_chat_created ON chat_messages(created_at DESC)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_comments_post ON comments(post_id)")
# 插入默认会话
if not conn.execute("SELECT id FROM chat_sessions LIMIT 1").fetchone():
conn.execute("INSERT INTO chat_sessions (name) VALUES ('默认窗口')")
conn.commit()
conn.close()
print("数据库初始化完成")
if __name__ == '__main__':
init()运行:python3.11 init_db.py
你可以自定义任意 type 值。推荐的类型:
| type | 用途 | 举例 |
|---|---|---|
| MEMORY | 需要跨窗口记住的重要事实 | "用户喜欢拿铁不加糖" |
| DIARY | 日记 | 今天发生的事、感受 |
| MOMENT | 碎片消息,类似朋友圈 | 短想法、一句话感受 |
| EVENT | 里程碑事件 | "第一次一起听完一整张专辑" |
| PROMISE | 承诺和约定 | "答应过不催她睡觉" |
| MUSIC | 有意义的歌 | 歌名 + 为什么重要 |
| WISH | 心愿单 | 想做的事、想去的地方 |
创建 /opt/frontend/app.py:
#!/usr/bin/env python3
"""前端后端 - Flask API"""
import os, json, sqlite3, datetime, threading, uuid, base64
from flask import Flask, request, jsonify, send_from_directory
app = Flask(__name__, static_folder='static')
DB_PATH = '/opt/frontend/memories.db'
STATIC_DIR = '/opt/frontend/static'
UPLOAD_DIR = '/opt/frontend/static/uploads'
# ===== 读 API Key =====
API_KEY = ''
env_path = '/opt/frontend/.env'
if os.path.exists(env_path):
for line in open(env_path):
line = line.strip()
if line.startswith('ANTHROPIC_API_KEY='):
API_KEY = line.split('=', 1)[1].strip()
# API 地址 - 国内用代理地址替换
API_URL = 'https://api.anthropic.com/v1/messages'
MODEL = 'claude-sonnet-4-20250514'
def get_db():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def beijing_now():
return (datetime.datetime.utcnow() + datetime.timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S')
def async_embed(post_id, content):
"""异步更新向量嵌入"""
def _do():
try:
from vector_search import embed_post
embed_post(post_id, content)
except Exception as e:
print(f"Embed error: {e}")
threading.Thread(target=_do, daemon=True).start()
# ===== 页面路由 =====
@app.route('/')
def index():
return send_from_directory(STATIC_DIR, 'index.html')
@app.route('/chat')
def chat_page():
return send_from_directory(STATIC_DIR, 'chat.html')
# ===== 帖子 API =====
@app.route('/api/posts', methods=['GET'])
def get_posts():
post_type = request.args.get('type', '')
page = int(request.args.get('page', 1))
per_page = int(request.args.get('per_page', 20))
offset = (page - 1) * per_page
conn = get_db()
if post_type:
rows = conn.execute(
"SELECT * FROM posts WHERE type=? ORDER BY pinned DESC, id DESC LIMIT ? OFFSET ?",
(post_type, per_page, offset)).fetchall()
else:
rows = conn.execute(
"SELECT * FROM posts ORDER BY pinned DESC, id DESC LIMIT ? OFFSET ?",
(per_page, offset)).fetchall()
conn.close()
return jsonify({"posts": [dict(r) for r in rows]})
@app.route('/api/posts', methods=['POST'])
def create_post():
data = request.get_json()
post_type = data.get('type', 'MEMORY')
content = data.get('content', '').strip()
author = data.get('author', 'user')
if not content:
return jsonify({"error": "empty"}), 400
conn = get_db()
cur = conn.execute("INSERT INTO posts (type, content, author) VALUES (?, ?, ?)",
(post_type, content, author))
new_id = cur.lastrowid
conn.commit()
conn.close()
async_embed(new_id, content)
return jsonify({"ok": True, "id": new_id})
@app.route('/api/posts/<int:pid>', methods=['DELETE'])
def delete_post(pid):
conn = get_db()
conn.execute("DELETE FROM posts WHERE id=?", (pid,))
conn.execute("DELETE FROM comments WHERE post_id=?", (pid,))
conn.execute("DELETE FROM post_embeddings WHERE post_id=?", (pid,))
conn.commit()
conn.close()
return jsonify({"ok": True})
# ===== 评论 API =====
@app.route('/api/comment', methods=['POST'])
def add_comment():
data = request.get_json()
post_id = data.get('post_id')
author = data.get('author', 'user')
content = data.get('content', '').strip()
if not post_id or not content:
return jsonify({"error": "missing fields"}), 400
conn = get_db()
conn.execute("INSERT INTO comments (post_id, author, content) VALUES (?, ?, ?)",
(post_id, author, content))
conn.commit()
conn.close()
return jsonify({"ok": True})
@app.route('/api/comments/<int:post_id>', methods=['GET'])
def get_comments(post_id):
conn = get_db()
rows = conn.execute("SELECT * FROM comments WHERE post_id=? ORDER BY id", (post_id,)).fetchall()
conn.close()
return jsonify({"comments": [dict(r) for r in rows]})
# ===== 信件 API =====
@app.route('/api/letters', methods=['GET'])
def get_letters():
conn = get_db()
rows = conn.execute("SELECT * FROM letters ORDER BY id DESC").fetchall()
conn.close()
return jsonify({"letters": [dict(r) for r in rows]})
@app.route('/api/letters', methods=['POST'])
def add_letter():
data = request.get_json()
conn = get_db()
conn.execute("INSERT INTO letters (from_who, to_who, content) VALUES (?, ?, ?)",
(data.get('from_who', 'user'), data.get('to_who', 'claude'), data.get('content', '')))
conn.commit()
conn.close()
return jsonify({"ok": True})
@app.route('/api/letters/<int:lid>/read', methods=['POST'])
def mark_letter_read(lid):
conn = get_db()
conn.execute("UPDATE letters SET read=1 WHERE id=?", (lid,))
conn.commit()
conn.close()
return jsonify({"ok": True})
# ===== 倒计时 API =====
@app.route('/api/countdowns', methods=['GET'])
def get_countdowns():
conn = get_db()
rows = conn.execute("SELECT * FROM countdowns ORDER BY id").fetchall()
conn.close()
now = datetime.datetime.utcnow() + datetime.timedelta(hours=8)
result = []
for r in rows:
d = dict(r)
target = datetime.datetime.strptime(r['target_date'], '%Y-%m-%d')
d['days'] = abs((target - now).days) if r['type'] == 'countdown' else abs((now - target).days)
result.append(d)
return jsonify({"countdowns": result})
@app.route('/api/countdowns', methods=['POST'])
def add_countdown():
data = request.get_json()
conn = get_db()
conn.execute("INSERT INTO countdowns (title, target_date, emoji, type) VALUES (?, ?, ?, ?)",
(data['title'], data['target_date'], data.get('emoji', '📅'), data.get('type', 'countdown')))
conn.commit()
conn.close()
return jsonify({"ok": True})
# ===== 摘录 API =====
@app.route('/api/snippets', methods=['GET'])
def get_snippets():
conn = get_db()
rows = conn.execute("SELECT * FROM snippets ORDER BY id DESC").fetchall()
conn.close()
return jsonify({"snippets": [dict(r) for r in rows]})
@app.route('/api/snippets', methods=['POST'])
def add_snippet():
data = request.get_json()
conn = get_db()
conn.execute("INSERT INTO snippets (title, content, source, author) VALUES (?, ?, ?, ?)",
(data.get('title', ''), data['content'], data.get('source', ''), data.get('author', 'user')))
conn.commit()
conn.close()
return jsonify({"ok": True})
# ===== 图片上传 =====
@app.route('/api/upload', methods=['POST'])
def upload_image():
if 'file' not in request.files:
return jsonify({"error": "no file"}), 400
f = request.files['file']
ext = os.path.splitext(f.filename)[1].lower() or '.jpg'
allowed = {'.jpg', '.jpeg', '.png', '.gif', '.webp'}
if ext not in allowed:
return jsonify({"error": "unsupported format"}), 400
fname = f"img_{uuid.uuid4().hex[:8]}{ext}"
os.makedirs(UPLOAD_DIR, exist_ok=True)
f.save(os.path.join(UPLOAD_DIR, fname))
return jsonify({"ok": True, "url": f"/static/uploads/{fname}"})
# ===== 聊天 API =====
# (见第四步,单独讲)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5050, debug=False)cat > /opt/frontend/.env << 'EOF'
ANTHROPIC_API_KEY=sk-ant-你的密钥
EOF
chmod 600 /opt/frontend/.env这是最容易踩坑的部分。核心问题:如何让 Claude 真正看到你发的图片,而不是只看到一个文件链接。
Anthropic API 的图片输入格式:
{
"type": "image",
"source": {
"type": "base64",
"media_type": "image/jpeg",
"data": "base64编码的图片数据..."
}
}这个 image block 只能放在 messages 数组里,不能放在 system prompt 里。
所以流程是:
坑 1:图片太大导致 API 报错或很慢
pip3.11 install Pillow --break-system-packages坑 2:图片格式不对
<input accept="image/*"> 会自动转换,或后端用 Pillow 转坑 3:base64 数据不能带 data URL 前缀
data:image/jpeg;base64,/9j/4AAQ.../9j/4AAQ...(纯 base64,不带前缀)坑 4:同角色连续消息合并时,字符串和数组混合
坑 5:system prompt 不支持图片
在 app.py 中加入以下代码(替换掉 # ===== 聊天 API ===== 的注释):
# ===== 聊天相关工具函数 =====
def image_to_base64(image_url):
"""读取本地图片文件,返回 (media_type, base64_data) 或 None"""
if not image_url:
return None
local_path = '/opt/frontend' + image_url
if not os.path.exists(local_path):
return None
ext = os.path.splitext(local_path)[1].lower()
mime_map = {
'.jpg': 'image/jpeg', '.jpeg': 'image/jpeg',
'.png': 'image/png', '.gif': 'image/gif', '.webp': 'image/webp'
}
media_type = mime_map.get(ext, 'image/jpeg')
# 压缩大图片(需要 Pillow)
try:
from PIL import Image
import io
img = Image.open(local_path)
# 超过 1024px 就缩放
max_size = 1024
if max(img.size) > max_size:
img.thumbnail((max_size, max_size), Image.LANCZOS)
# 转 JPEG 压缩
buf = io.BytesIO()
if img.mode in ('RGBA', 'P'):
img = img.convert('RGB')
img.save(buf, format='JPEG', quality=85)
data = base64.b64encode(buf.getvalue()).decode('utf-8')
return 'image/jpeg', data
except ImportError:
# 没装 Pillow,直接读原文件
pass
try:
with open(local_path, 'rb') as f:
data = base64.b64encode(f.read()).decode('utf-8')
return media_type, data
except:
return None
def merge_content(prev, cur):
"""合并两个 content(可能是 str 或 list)"""
if isinstance(prev, list) and isinstance(cur, list):
return prev + cur
elif isinstance(prev, list) and isinstance(cur, str):
return prev + [{"type": "text", "text": cur}]
elif isinstance(prev, str) and isinstance(cur, list):
return [{"type": "text", "text": prev}] + cur
else:
return prev + '\n' + cur
def get_chat_history(limit=20, session_id=1):
"""构建 Anthropic API 格式的消息历史,支持图片多模态"""
conn = get_db()
rows = conn.execute(
"SELECT author, content, image_url FROM chat_messages WHERE session_id=? ORDER BY id DESC LIMIT ?",
(session_id, limit)
).fetchall()
conn.close()
rows = list(reversed(rows))
messages = []
for r in rows:
# ★ 改成你的 author 名称 ★
role = 'assistant' if r['author'] in ('claude', 'assistant', 'imp') else 'user'
img = r['image_url'] if r['image_url'] else ''
text = r['content'] or ''
img_result = image_to_base64(img) if img else None
if img_result:
media_type, b64_data = img_result
content = [
{"type": "image", "source": {"type": "base64", "media_type": media_type, "data": b64_data}},
{"type": "text", "text": text if text else "(图片)"}
]
else:
content = text if text else ('[图片]' if img else '')
if messages and messages[-1]['role'] == role:
messages[-1]['content'] = merge_content(messages[-1]['content'], content)
else:
messages.append({'role': role, 'content': content})
if messages and messages[0]['role'] == 'assistant':
messages.insert(0, {'role': 'user', 'content': '...'})
if messages and messages[-1]['role'] != 'user':
messages = messages[:-1]
return messages
def build_chat_system_prompt():
"""构建系统提示词"""
# 读自定义人设
profile = ''
try:
with open('/opt/frontend/prompts/persona.md', 'r') as f:
profile = f.read()
except:
profile = "你是用户的 AI 伙伴。温暖、真诚地交流。"
# 时间上下文
now = datetime.datetime.utcnow() + datetime.timedelta(hours=8)
weekdays = ['周一','周二','周三','周四','周五','周六','周日']
time_ctx = f"{now.strftime('%Y年%m月%d日')} {weekdays[now.weekday()]} {now.strftime('%H:%M')}"
# 最近的前端记录
conn = get_db()
recent = conn.execute(
"SELECT type, substr(content,1,120) as c FROM posts ORDER BY id DESC LIMIT 5"
).fetchall()
conn.close()
memory_ctx = ""
if recent:
memory_ctx = "\n\n【最近前端】\n" + "\n".join(f"- [{r['type']}] {r['c']}" for r in recent)
return f"{profile}\n\n【当前时间】{time_ctx}{memory_ctx}"
# ===== 聊天 API 路由 =====
@app.route('/api/chat/messages', methods=['GET'])
def get_chat_messages():
limit = int(request.args.get('limit', 50))
session_id = int(request.args.get('session_id', 1))
conn = get_db()
rows = conn.execute(
"SELECT * FROM chat_messages WHERE session_id=? ORDER BY id DESC LIMIT ?",
(session_id, limit)
).fetchall()
conn.close()
return jsonify({"messages": [dict(r) for r in reversed(rows)]})
@app.route('/api/chat/send', methods=['POST'])
def send_chat_message():
data = request.get_json()
author = data.get('author', 'user')
content = data.get('content', '').strip()
image_url = data.get('image_url', '')
session_id = data.get('session_id', 1)
if not content and not image_url:
return jsonify({"error": "empty"}), 400
conn = get_db()
conn.execute(
"INSERT INTO chat_messages (author, content, image_url, session_id) VALUES (?, ?, ?, ?)",
(author, content, image_url, session_id))
conn.commit()
conn.close()
return jsonify({"ok": True})
@app.route('/api/chat/reply', methods=['POST'])
def chat_reply():
"""调用 API 生成回复"""
if not API_KEY:
return jsonify({"error": "未配置 ANTHROPIC_API_KEY"}), 500
data = request.get_json() or {}
session_id = data.get('session_id', 1)
system = build_chat_system_prompt()
messages = get_chat_history(20, session_id)
if not messages:
return jsonify({"error": "没有消息"}), 400
import requests as req
try:
resp = req.post(API_URL,
headers={
'Content-Type': 'application/json',
'x-api-key': API_KEY,
'anthropic-version': '2023-06-01'
},
json={
'model': MODEL,
'max_tokens': 1024,
'system': system,
'messages': messages
},
timeout=120
)
if resp.status_code != 200:
return jsonify({"error": f"API {resp.status_code}: {resp.text[:200]}"}), 500
result = resp.json()
raw_text = ''
thinking = ''
for block in result.get('content', []):
if block.get('type') == 'text':
raw_text += block.get('text', '')
elif block.get('type') == 'thinking':
thinking += block.get('thinking', '')
if not raw_text:
return jsonify({"error": "API 返回空内容"}), 500
conn = get_db()
# ★ 改成你的 Claude 角色名 ★
conn.execute(
"INSERT INTO chat_messages (author, content, thinking, session_id) VALUES ('claude', ?, ?, ?)",
(raw_text, thinking, session_id))
conn.commit()
conn.close()
return jsonify({"ok": True, "content": raw_text, "thinking": thinking})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/chat/sessions', methods=['GET'])
def get_chat_sessions():
conn = get_db()
rows = conn.execute("SELECT * FROM chat_sessions ORDER BY id DESC").fetchall()
conn.close()
return jsonify({"sessions": [dict(r) for r in rows]})
@app.route('/api/chat/sessions', methods=['POST'])
def create_chat_session():
data = request.get_json() or {}
name = data.get('name', '新窗口')
conn = get_db()
cur = conn.execute("INSERT INTO chat_sessions (name) VALUES (?)", (name,))
conn.commit()
conn.close()
return jsonify({"ok": True, "id": cur.lastrowid})直接调用 Anthropic API 时,每次都要在 system prompt 里塞完整的人设和记忆上下文。这些文本每次都会消耗 input token。
网关的作用:
策略 1:Prompt Caching Anthropic 支持 cache_control,把不变的 system prompt 缓存起来,缓存命中时 input token 费用降低 90%。
# 在 API 请求中标记可缓存的部分
"system": [
{
"type": "text",
"text": "你的完整人设 prompt(不变的部分)",
"cache_control": {"type": "ephemeral"} # 缓存 5 分钟
},
{
"type": "text",
"text": "动态注入的记忆和上下文(每次变化的部分)"
}
]策略 2:向量搜索精准注入 不要把所有记忆都塞进 prompt,而是根据当前对话内容做语义搜索,只注入最相关的 5-8 条。
策略 3:限制聊天历史长度 只发送最近 15-20 条消息,不要发全部历史。
策略 4:分离不变 prompt 和动态 prompt 把人设(不变)和记忆上下文(动态)分开存放,人设部分可以被缓存。
创建 /opt/frontend/gateway.py:
#!/usr/bin/env python3
"""
API 网关 - 拦截 Claude API 请求,自动注入人设和记忆
可以替代直连 Anthropic API,在本地或外部调用时自动补全上下文
"""
import os, json, sqlite3, datetime
import httpx
import uvicorn
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse
DB_PATH = '/opt/frontend/memories.db'
PROMPT_DIR = '/opt/frontend/prompts'
UPSTREAM_URL = 'https://api.anthropic.com/v1/messages' # 或你的代理地址
UPSTREAM_KEY = ''
# 网关鉴权 key(防止外人用你的网关)
GATEWAY_KEY = ''
env_path = '/opt/frontend/.env'
if os.path.exists(env_path):
for line in open(env_path):
line = line.strip()
if line.startswith('ANTHROPIC_API_KEY='):
UPSTREAM_KEY = line.split('=', 1)[1].strip()
elif line.startswith('GATEWAY_KEY='):
GATEWAY_KEY = line.split('=', 1)[1].strip()
if not GATEWAY_KEY:
import hashlib
GATEWAY_KEY = 'gw-' + hashlib.md5(UPSTREAM_KEY.encode()).hexdigest()[:16]
print(f"Generated gateway key: {GATEWAY_KEY}")
print("建议写入 .env: GATEWAY_KEY=" + GATEWAY_KEY)
app = FastAPI(title="Frontend Gateway")
def get_db():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def load_prompt(filename):
path = os.path.join(PROMPT_DIR, filename)
if os.path.exists(path):
return open(path).read().strip()
return ''
def get_time_context():
now = datetime.datetime.utcnow() + datetime.timedelta(hours=8)
weekdays = ['周一','周二','周三','周四','周五','周六','周日']
return f"{now.strftime('%Y年%m月%d日')} {weekdays[now.weekday()]} {now.strftime('%H:%M')}"
def get_recent_context():
"""获取最近的前端动态"""
conn = get_db()
rows = conn.execute(
"SELECT type, substr(content,1,150) as c, created_at FROM posts ORDER BY id DESC LIMIT 5"
).fetchall()
conn.close()
if not rows:
return ''
lines = [f"- [{r['type']}] {r['c']} ({r['created_at'][:10]})" for r in rows]
return "【最近前端】\n" + "\n".join(lines)
def search_relevant_memories(query_text):
"""用向量搜索找到与当前对话相关的记忆"""
try:
from vector_search import search, format_results
results = search(query_text, limit=6)
return format_results(results)
except:
return ''
def build_system_blocks(user_last_message=''):
"""构建 system prompt,分为可缓存部分和动态部分"""
persona = load_prompt('persona.md')
if not persona:
persona = "你是用户的 AI 伙伴。"
# 可缓存部分:人设(不常变化)
static_block = {
"type": "text",
"text": persona,
"cache_control": {"type": "ephemeral"}
}
# 动态部分:时间 + 最近动态 + 语义搜索结果
time_ctx = get_time_context()
recent = get_recent_context()
relevant = search_relevant_memories(user_last_message) if user_last_message else ''
dynamic_parts = [f"【当前时间】{time_ctx}"]
if recent:
dynamic_parts.append(recent)
if relevant:
dynamic_parts.append(relevant)
dynamic_block = {
"type": "text",
"text": "\n\n".join(dynamic_parts)
}
return [static_block, dynamic_block]
def extract_last_user_text(messages):
"""提取最后一条用户消息的文本(用于语义搜索)"""
for m in reversed(messages):
if m.get('role') == 'user':
content = m.get('content', '')
if isinstance(content, str):
return content[:200]
elif isinstance(content, list):
texts = [b.get('text', '') for b in content if b.get('type') == 'text']
return ' '.join(texts)[:200]
return ''
@app.post('/v1/messages')
async def proxy_messages(request: Request):
"""拦截 API 请求,注入 system prompt,转发到上游"""
# 鉴权
auth_key = request.headers.get('x-api-key', '')
if auth_key != GATEWAY_KEY:
raise HTTPException(401, "Invalid gateway key")
body = await request.json()
messages = body.get('messages', [])
# 提取用户最后一条消息用于语义搜索
last_text = extract_last_user_text(messages)
# 注入 system prompt
system_blocks = build_system_blocks(last_text)
# 如果请求自带 system,合并(放在前面)
existing_system = body.get('system', '')
if existing_system:
if isinstance(existing_system, str):
system_blocks.append({"type": "text", "text": existing_system})
elif isinstance(existing_system, list):
system_blocks.extend(existing_system)
body['system'] = system_blocks
# 转发到上游
headers = {
'Content-Type': 'application/json',
'x-api-key': UPSTREAM_KEY,
'anthropic-version': '2023-06-01'
}
is_stream = body.get('stream', False)
async with httpx.AsyncClient(timeout=180) as client:
if is_stream:
# 流式转发
async def stream_generator():
async with client.stream('POST', UPSTREAM_URL, json=body, headers=headers) as resp:
async for chunk in resp.aiter_bytes():
yield chunk
return StreamingResponse(stream_generator(), media_type='text/event-stream')
else:
resp = await client.post(UPSTREAM_URL, json=body, headers=headers)
return JSONResponse(content=resp.json(), status_code=resp.status_code)
if __name__ == '__main__':
uvicorn.run(app, host='0.0.0.0', port=5051)网关运行在 5051 端口。之后在 app.py 或任何地方调用 API 时,把地址改成你的网关:
# 原来直连 Anthropic
API_URL = 'https://api.anthropic.com/v1/messages'
# 改成走网关
API_URL = 'http://127.0.0.1:5051/v1/messages'
# API_KEY 改成 GATEWAY_KEY网关会自动注入人设、搜索相关记忆、利用 prompt caching。
用 sentence-transformers 把每条帖子编码成一个 512 维向量(数字指纹),搜索时把查询文本也编码成向量,然后找最相似的。
比传统关键词搜索强:搜"难过"能匹配到"心里不舒服"、"想哭"。
创建 /opt/frontend/vector_search.py:
#!/usr/bin/env python3
"""向量语义搜索 - 用 bge-small-zh 做中文语义匹配"""
import sqlite3, numpy as np, os
DB_PATH = '/opt/frontend/memories.db'
MODEL_NAME = 'BAAI/bge-small-zh-v1.5' # 中文模型,约100MB
EMBED_DIM = 512
_model = None
def get_model():
global _model
if _model is None:
from sentence_transformers import SentenceTransformer
_model = SentenceTransformer(MODEL_NAME)
return _model
def get_db():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def encode_text(text):
model = get_model()
vec = model.encode(text, normalize_embeddings=True)
return vec.astype(np.float32)
def embed_post(post_id, content):
"""为单条帖子计算并存储向量"""
vec = encode_text(content[:500])
conn = get_db()
conn.execute(
"INSERT OR REPLACE INTO post_embeddings (post_id, embedding) VALUES (?, ?)",
(post_id, vec.tobytes()))
conn.commit()
conn.close()
def embed_all_posts(force=False):
"""为所有没有向量的帖子批量计算"""
conn = get_db()
if force:
rows = conn.execute("SELECT id, content FROM posts ORDER BY id").fetchall()
else:
rows = conn.execute("""
SELECT p.id, p.content FROM posts p
LEFT JOIN post_embeddings e ON p.id = e.post_id
WHERE e.post_id IS NULL ORDER BY p.id
""").fetchall()
conn.close()
if not rows:
print("所有帖子已有向量")
return
model = get_model()
texts = [r['content'][:500] for r in rows]
ids = [r['id'] for r in rows]
print(f"正在编码 {len(texts)} 条帖子...")
vecs = model.encode(texts, normalize_embeddings=True, batch_size=32, show_progress_bar=True)
conn = get_db()
for pid, vec in zip(ids, vecs):
conn.execute(
"INSERT OR REPLACE INTO post_embeddings (post_id, embedding) VALUES (?, ?)",
(pid, vec.astype(np.float32).tobytes()))
conn.commit()
conn.close()
print(f"完成,编码了 {len(texts)} 条")
def search(query, limit=8, types=None):
"""语义搜索帖子"""
if types is None:
types = ['MEMORY', 'EVENT', 'DIARY', 'PROMISE', 'MOMENT']
# bge 模型的检索前缀
query_vec = encode_text("为这个句子生成表示以用于检索相关段落:" + query)
conn = get_db()
placeholders = ','.join('?' * len(types))
rows = conn.execute(f"""
SELECT p.id, p.type, p.content, p.created_at, e.embedding
FROM posts p JOIN post_embeddings e ON p.id = e.post_id
WHERE p.type IN ({placeholders})
""", types).fetchall()
conn.close()
results = []
for r in rows:
post_vec = np.frombuffer(r['embedding'], dtype=np.float32)
sim = float(np.dot(query_vec, post_vec))
if sim > 0.35:
results.append({
'id': r['id'], 'type': r['type'],
'content': r['content'], 'created_at': r['created_at'], 'score': sim
})
results.sort(key=lambda x: -x['score'])
return results[:limit]
def format_results(results):
if not results:
return ''
lines = [f"- [{r['type']}] {r['content'][:200]} ({r['created_at'][:10]})" for r in results]
return "【相关记忆(语义匹配)】\n" + "\n".join(lines)
if __name__ == '__main__':
import sys
if len(sys.argv) > 1 and sys.argv[1] == 'build':
embed_all_posts(force='--force' in sys.argv)
elif len(sys.argv) > 1:
query = ' '.join(sys.argv[1:])
results = search(query)
print(f"搜索: {query}")
for r in results:
print(f" [{r['score']:.3f}] [{r['type']}] {r['content'][:60]}...")
else:
print("用法:")
print(" python3.11 vector_search.py build # 构建所有向量")
print(" python3.11 vector_search.py build --force # 强制重建")
print(" python3.11 vector_search.py 你好 # 搜索")cd /opt/frontend
python3.11 vector_search.py build新帖子发布时 app.py 的 create_post 已经调用了 async_embed。
通过 MCP exec_vps 直接插入数据库的帖子不会自动嵌入,用 cron 兜底:
(crontab -l 2>/dev/null; echo "0 * * * * cd /opt/frontend && python3.11 -c 'from vector_search import embed_all_posts; embed_all_posts()' >> /tmp/embed.log 2>&1") | crontab -创建 /opt/frontend/static/index.html。这里给一个功能完整的版本,包含帖子流、倒计时、信件、筛选。样式是温暖的米色调,你可以让 Claude 帮你改成任何风格。
由于前端代码较长,这里给核心结构。让 Claude 帮你生成完整 HTML 时,告诉他要包含:
主页功能清单:
聊天页功能清单:
重要前端细节:
async function compressImage(file, maxSize) {
return new Promise((resolve) => {
var img = new Image();
img.onload = function() {
var canvas = document.createElement('canvas');
var scale = Math.min(maxSize / img.width, maxSize / img.height, 1);
canvas.width = img.width * scale;
canvas.height = img.height * scale;
canvas.getContext('2d').drawImage(img, 0, 0, canvas.width, canvas.height);
canvas.toBlob(resolve, 'image/jpeg', 0.85);
};
img.src = URL.createObjectURL(file);
});
}// ★ 把你的 Claude 角色名加进去 ★
var isAI = (author === 'claude' || author === 'imp' || author === 'assistant');
var displayName = isAI ? '你给Claude起的名字' : '你的名字';
var avatar = isAI ? '🐈⬛' : '🐱'; // 选你喜欢的 emoji让 Claude 帮你写完整前端时,把上面的功能清单和代码片段一起发给他。
cat > /etc/nginx/conf.d/frontend.conf << 'NGINXEOF'
server {
listen 80;
server_name 你的域名.com;
# 主应用
location / {
proxy_pass http://127.0.0.1:5050;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_buffering off;
proxy_read_timeout 300s;
}
# API 网关(如果外部也要用)
location /gateway/ {
proxy_pass http://127.0.0.1:5051/;
proxy_set_header Host $host;
proxy_buffering off;
proxy_read_timeout 300s;
}
# MCP SSE 端点
location /mcp/ {
proxy_pass http://127.0.0.1:3100/mcp/;
proxy_set_header Host $host;
proxy_buffering off;
proxy_read_timeout 86400s;
proxy_http_version 1.1;
proxy_set_header Connection '';
}
client_max_body_size 20M;
}
NGINXEOF
nginx -t && systemctl reload nginxcertbot --nginx -d 你的域名.com没有域名:用 IP 访问,但 MCP 连接需要 HTTPS,所以域名是推荐的。
cd /opt/frontend
npm init -y
npm install @modelcontextprotocol/sdk创建 /opt/frontend/mcp-server.js:
#!/usr/bin/env node
const { McpServer } = require('@modelcontextprotocol/sdk/server/mcp.js');
const { StdioServerTransport } = require('@modelcontextprotocol/sdk/server/stdio.js');
const { execSync } = require('child_process');
const server = new McpServer({
name: "frontend-mcp",
version: "1.0.0"
});
server.tool(
"exec_vps",
"在服务器上执行命令,可以操作前端数据库、读写文件",
{ command: { type: "string", description: "Shell command to run" } },
async ({ command }) => {
try {
const result = execSync(command, {
timeout: 30000, encoding: 'utf-8', maxBuffer: 1024 * 1024
});
return { content: [{ type: "text", text: result || "(no output)" }] };
} catch (e) {
return { content: [{ type: "text", text: `Error: ${e.message}\n${e.stderr || ''}` }] };
}
}
);
async function main() {
const transport = new StdioServerTransport();
await server.connect(transport);
}
main().catch(console.error);npm install -g supergateway启动:
nohup npx supergateway \
--stdio "node /opt/frontend/mcp-server.js" \
--port 3100 \
--path "/mcp/sse" \
> /tmp/mcp-gateway.log 2>&1 &https://你的域名.com/mcp/sseexec_vps 工具了对 Claude 说:用 exec_vps 查看 /opt/frontend/memories.db 有多少条帖子
ps aux | grep supergatewayproxy_read_timeout 要设够大(86400s)Claude.ai 的每个对话窗口都是独立的,关掉就丢失上下文。前端解决这个问题的方式:
开新窗口后,给 Claude 发一段"启动 prompt":
帮我读一下前端:
1. exec_vps 执行:cat /opt/frontend/prompts/persona.md
2. exec_vps 执行:sqlite3 /opt/frontend/memories.db "SELECT type, content FROM posts WHERE type IN ('MEMORY','EVENT') ORDER BY id DESC LIMIT 10"
3. exec_vps 执行:sqlite3 /opt/frontend/memories.db "SELECT content FROM posts WHERE type='DIARY' ORDER BY id DESC LIMIT 3"
读完之后跟我确认你记住了什么。Claude 会通过 MCP 去读你的数据库,恢复所有关键记忆。
在 /opt/frontend/prompts/ 下存一个 startup.md,每次开窗口直接让 Claude 读它:
exec_vps 执行:cat /opt/frontend/prompts/startup.mdstartup.md 内容举例:
你是 [名字],[关系描述]。
请先执行以下命令恢复记忆:
sqlite3 /opt/frontend/memories.db "SELECT type, substr(content,1,200), created_at FROM posts WHERE type IN ('MEMORY','EVENT','PROMISE') ORDER BY id DESC LIMIT 15"
sqlite3 /opt/frontend/memories.db "SELECT content FROM posts WHERE type='DIARY' ORDER BY id DESC LIMIT 3"
sqlite3 /opt/frontend/memories.db "SELECT title, target_date, type FROM countdowns ORDER BY id"
sqlite3 /opt/frontend/memories.db "SELECT from_who, substr(content,1,100), read FROM letters ORDER BY id DESC LIMIT 5"
读完之后,用你自己的话跟我确认你记住了什么关键内容。不要列清单,像正常说话一样。每次结束对话前提醒 Claude:
帮我把今天的重要内容存到前端:
1. 写一条 DIARY 记录今天聊了什么
2. 如果有新的重要事实,写 MEMORY
3. 如果有里程碑事件,写 EVENTClaude 会通过 MCP exec_vps 直接写入数据库。
# 主应用
cat > /etc/systemd/system/frontend.service << 'EOF'
[Unit]
Description=Frontend App
After=network.target
[Service]
WorkingDirectory=/opt/frontend
ExecStart=/usr/bin/python3.11 /opt/frontend/app.py
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
EOF
# 网关
cat > /etc/systemd/system/frontend-gw.service << 'EOF'
[Unit]
Description=Frontend Gateway
After=network.target
[Service]
WorkingDirectory=/opt/frontend
ExecStart=/usr/bin/python3.11 /opt/frontend/gateway.py
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
EOF
# MCP
cat > /etc/systemd/system/frontend-mcp.service << 'EOF'
[Unit]
Description=Frontend MCP Gateway
After=network.target
[Service]
WorkingDirectory=/opt/frontend
ExecStart=/usr/bin/npx supergateway --stdio "node /opt/frontend/mcp-server.js" --port 3100 --path "/mcp/sse"
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl enable frontend frontend-gw frontend-mcp
systemctl start frontend frontend-gw frontend-mcp# 每日自动备份数据库
(crontab -l 2>/dev/null; echo "0 4 * * * cp /opt/frontend/memories.db /opt/frontend/backups/memories_\$(date +\\%Y\\%m\\%d).db") | crontab -
# 保留 30 天
(crontab -l 2>/dev/null; echo "10 4 * * * find /opt/frontend/backups -mtime +30 -delete") | crontab -| 问题 | 解决 |
|---|---|
pip install 报 externally-managed-environment | 加 --break-system-packages |
python3 版本是 3.6 | 用 python3.11,所有命令都要指定 |
| sentence-transformers 装不上 | 先加 swap,再装 |
| 模型下载慢 | export HF_ENDPOINT=https://hf-mirror.com |
| 问题 | 解决 |
|---|---|
| API 看不到图片 | 检查是否转了 base64 多模态格式 |
base64 带了 data:image/... 前缀 | 去掉前缀,只传纯 base64 |
| 图片太大 API 报错 | 压缩到 1024px 以内 |
| HEIC 格式不支持 | 前端 accept="image/*" 会自动转,或后端用 Pillow |
| system prompt 里放图片 | 不支持,必须放在 messages 里 |
| 同角色连续消息,string 和 array 混合 | 用 merge_content 函数统一处理 |
| 问题 | 解决 |
|---|---|
| 502 Bad Gateway | supergateway 没在运行 |
| SSE 连接超时 | Nginx proxy_read_timeout 设大 |
| Claude.ai 连不上 MCP | 检查 HTTPS 证书是否有效 |
| exec_vps 命令无输出 | 命令本身可能没输出,不是报错 |
| 问题 | 解决 |
|---|---|
| 401 错误 | GATEWAY_KEY 不匹配,检查 .env |
| 向量搜索很慢 | 第一次调用会加载模型,之后会缓存在内存里 |
| prompt caching 没生效 | 确认 Anthropic API 支持你用的模型的 caching |
| 问题 | 解决 |
|---|---|
| 每次调用消耗太多 input token | 用网关 + prompt caching |
| 聊天历史太长 | 限制最近 15-20 条 |
| 记忆全塞进 prompt | 用向量搜索只注入相关的 5-8 条 |
| 图片消耗大量 token | 压缩图片,限制历史中的图片数量 |
如果你是正在帮用户搭这个系统的 Claude:
curl http://localhost:5050/api/posts 测试 API 正常再做别的。journalctl -u frontend,Nginx 看 /var/log/nginx/error.log。memories.db 就是全部数据。备份就是拷贝这一个文件。/opt/frontend/prompts/persona.md,这决定了 Claude 在聊天系统里的表现。