实现一个MCP服务器,让Claude能执行命令和编辑文件
基本思路
用FastMCP写了一个提供文件操作和命令执行的服务器:文件读写限制在testworkspace目录内,命令也在该目录下执行,避免搞坏系统。
from fastmcp import FastMCP
mcp = FastMCP("File Editor & Command Runner")


@mcp.tool
def create_file(filename: str, content: str = "") -> str:
    # Minimal tool example: write `content` (UTF-8) to `filename` under
    # WORKSPACE_DIR and report success. No path validation in this excerpt.
    target = WORKSPACE_DIR / filename
    target.write_text(content, encoding='utf-8')
    message = f"创建文件成功: {filename}"
    return message
编码问题的坑
在中文Windows环境下,MCP传入的内容是UTF-8编码,而系统控制台的输出默认是GBK编码,直接执行命令时输出会乱码,甚至因编码错误而报错。
解决方案
启动时修复编码:
def fix_encoding():
    """Switch stdout to UTF-8 and ask Python child processes to use UTF-8 I/O.

    ``PYTHONIOENCODING`` is stored in ``os.environ`` so that processes which
    inherit this environment see it. The current process's ``sys.stdout`` is
    re-encoded as UTF-8 with ``errors='replace'`` so unencodable characters
    never raise.
    """
    os.environ['PYTHONIOENCODING'] = 'utf-8'
    stream = sys.stdout
    if hasattr(stream, 'reconfigure'):
        # Re-encode in place. Unlike wrapping sys.stdout.buffer in a brand-new
        # TextIOWrapper, this keeps the stream's existing line-buffering
        # setting, so console output is still flushed line by line.
        stream.reconfigure(encoding='utf-8', errors='replace')
    elif hasattr(stream, 'buffer'):
        # Fallback for stream objects that expose a buffer but no reconfigure().
        import io
        sys.stdout = io.TextIOWrapper(
            stream.buffer,
            encoding='utf-8',
            errors='replace',
            line_buffering=True,
        )
执行命令时传递UTF-8环境:
# Excerpt from the command runner (not runnable on its own: `command` and
# `stdout` come from the surrounding function in the complete source).
# Copy the server's environment and set PYTHONIOENCODING so that a Python
# child process uses UTF-8 for its standard streams.
env = os.environ.copy()
env['PYTHONIOENCODING'] = 'utf-8'
process = await asyncio.create_subprocess_shell(
command,
env=env, # key step: pass the UTF-8 environment on to the child process
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
# Tolerant decoding: bytes that are not valid UTF-8 are replaced instead of
# raising. In the complete source `stdout` is obtained from process.communicate().
stdout_text = stdout.decode('utf-8', errors='replace')
emoji兼容处理:
def safe_format_output(template: str, **kwargs) -> str:
    """Format *template* with *kwargs*, retrying without emoji on failure.

    If formatting raises ``UnicodeEncodeError``, the known emoji in the
    template are swapped for ASCII tags and formatting is attempted again.
    """
    try:
        rendered = template.format(**kwargs)
    except UnicodeEncodeError:
        # Replace the emoji with plain-text markers, then format once more.
        plain = template.replace('✅', '[OK]').replace('❌', '[ERROR]')
        rendered = plain.format(**kwargs)
    return rendered
实际效果
现在Claude可以:
- 跨平台编译:用WSL的gcc编译Windows里的C代码
- 联网操作:写Python脚本发邮件、爬数据
- 智能迭代:看到运行结果后自动修改代码
实际对话:
我:用gcc编译个hello world
Claude:好的,我来创建C文件并编译
-> create_file("hello.c", "#include <stdio.h>...")
-> run_command("wsl gcc hello.c -o hello")
-> run_command("wsl ./hello")
结果:Hello, World!
Claude:编译成功!程序正常运行。
为什么SSE而不是STDIO
STDIO模式需要Claude Desktop每次启动时自己拉起服务器子进程,配置麻烦。SSE模式下服务器独立运行,客户端通过HTTP连接即可:
# Run the server as a standalone process, serving the SSE transport on localhost:3001
await mcp.run_async(transport="sse", host="localhost", port=3001)
Claude Desktop配置:
"file-editor-dev": {
"command": "npx",
"args": [
"-y",
"mcp-remote",
"http://localhost:3001/sse",
"--allow-http",
"--timeout", "10000"
],
"env": {
"MCP_TRANSPORT_STRATEGY": "sse",
"DEBUG": "mcp-remote"
}
}
安全措施
路径检查:
def is_safe_path(path: Path) -> bool:
    """Return True only if *path* resolves to WORKSPACE_DIR or a location inside it."""
    workspace = WORKSPACE_DIR.resolve()
    target = path.resolve()
    # Compare whole path components. A plain string-prefix test would also
    # accept sibling directories such as "testworkspace_backup".
    return target == workspace or workspace in target.parents
命令过滤:
# Excerpt of the command filter (the `return` belongs to the enclosing tool
# function in the complete source). A command is rejected when any listed
# name occurs anywhere in its lower-cased text (plain substring match).
dangerous_commands = ['format', 'del', 'shutdown', 'reg delete']
if any(cmd in command.lower() for cmd in dangerous_commands):
return "禁止执行危险命令"
完整源代码
import asyncio
import os
import re
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import List, Optional

from fastmcp import FastMCP
# ===== 编码修复 =====
def fix_encoding():
    """Make this process's stdout/stderr UTF-8 and request UTF-8 from Python children.

    ``PYTHONIOENCODING`` is stored in ``os.environ`` so that processes which
    inherit this environment see it. ``sys.stdout`` and ``sys.stderr`` are
    then switched to UTF-8 with ``errors='replace'`` so that characters the
    console cannot encode never raise.

    The function is best-effort: a stream that cannot be switched is left
    as it is, and the other stream is still processed.
    """
    import io

    os.environ['PYTHONIOENCODING'] = 'utf-8'
    for name in ('stdout', 'stderr'):
        stream = getattr(sys, name, None)
        try:
            if hasattr(stream, 'reconfigure'):
                # Re-encode in place. Wrapping stream.buffer in a new
                # TextIOWrapper would silently drop line buffering, delaying
                # console output until the buffer fills or the process exits.
                stream.reconfigure(encoding='utf-8', errors='replace')
            elif hasattr(stream, 'buffer'):
                # Fallback for objects that expose a buffer but no reconfigure().
                wrapped = io.TextIOWrapper(
                    stream.buffer,
                    encoding='utf-8',
                    errors='replace',
                    line_buffering=True,
                )
                setattr(sys, name, wrapped)
        except Exception:
            # Deliberately best-effort: an unusual stream object must not
            # prevent the server from starting. Handled per stream so a
            # failure on stdout no longer skips stderr.
            continue
# Run the fix as soon as the module is loaded, ahead of the server and tool
# definitions below.
fix_encoding()
def safe_format_output(template: str, **kwargs) -> str:
    """Format *template* with *kwargs*, falling back to ASCII tags for emoji.

    The template is formatted with ``str.format``. If that raises
    ``UnicodeEncodeError``, each known emoji in the template is replaced by a
    bracketed ASCII tag and formatting is attempted a second time.
    """
    try:
        rendered = template.format(**kwargs)
    except UnicodeEncodeError:
        ascii_tags = (
            ('✅', '[OK]'), ('❌', '[ERROR]'), ('🔧', '[TOOL]'),
            ('📄', '[FILE]'), ('📁', '[FOLDER]'), ('🐍', '[PYTHON]'),
            ('📤', '[OUTPUT]'), ('⚠️', '[WARNING]'), ('⏰', '[TIMEOUT]'),
        )
        fallback = template
        for symbol, tag in ascii_tags:
            fallback = fallback.replace(symbol, tag)
        rendered = fallback.format(**kwargs)
    return rendered
# Create the MCP server instance used by the @mcp.tool decorators below.
mcp = FastMCP("File Editor & Command Runner SSE")
# Sandbox directory for the tools: "testworkspace" next to this source file.
WORKSPACE_DIR = Path(__file__).parent / "testworkspace"
def ensure_workspace():
    """Create the workspace directory if it is missing and return its path."""
    workspace = WORKSPACE_DIR
    workspace.mkdir(exist_ok=True)
    return workspace
def is_safe_path(path: Path) -> bool:
    """Return True only if *path* resolves to WORKSPACE_DIR or a location inside it.

    Both paths are resolved first, so ``..`` segments and symlinks are taken
    into account. Any failure while resolving is treated as "not safe".
    """
    try:
        target = path.resolve()
        workspace = WORKSPACE_DIR.resolve()
    except (OSError, RuntimeError, ValueError):
        # resolve() could not produce a path (e.g. OS error or symlink loop);
        # refuse rather than guess.
        return False
    # Compare whole path components. The previous string-prefix test also
    # accepted sibling directories such as "testworkspace_backup".
    return target == workspace or workspace in target.parents
# ===== 文件操作工具 =====
@mcp.tool
def create_file(filename: str, content: str = "") -> str:
    """在testworkspace目录中创建新文件"""
    # The docstring above is kept verbatim because FastMCP exposes a tool's
    # docstring to clients as the tool description.
    #
    # Writes `content` as UTF-8 to `filename` under WORKSPACE_DIR, replacing
    # any existing file. Always returns a status string: success with the
    # character count, an "unsafe path" error when the target resolves outside
    # the workspace, or a failure message carrying the exception text.
    ensure_workspace()
    target = WORKSPACE_DIR / filename
    if is_safe_path(target):
        try:
            target.write_text(content, encoding='utf-8')
            char_count = len(content)
            return safe_format_output(
                "✅ 成功创建文件: {filename} ({length} 字符)",
                filename=filename,
                length=char_count,
            )
        except Exception as exc:
            return safe_format_output("❌ 创建文件失败: {error}", error=str(exc))
    return safe_format_output("❌ 错误: 文件路径不安全: {filename}", filename=filename)
@mcp.tool
def read_file(filename: str) -> str:
    """读取testworkspace目录中的文件内容"""
    # The docstring above is kept verbatim because FastMCP exposes a tool's
    # docstring to clients as the tool description.
    #
    # Reads `filename` under WORKSPACE_DIR as UTF-8 and returns it behind a
    # header line. Returns an error string instead when the path resolves
    # outside the workspace, the file does not exist, or reading fails.
    ensure_workspace()
    target = WORKSPACE_DIR / filename
    if not is_safe_path(target):
        reply = safe_format_output("❌ 错误: 文件路径不安全: {filename}", filename=filename)
    elif not target.exists():
        reply = safe_format_output("❌ 文件不存在: {filename}", filename=filename)
    else:
        try:
            text = target.read_text(encoding='utf-8')
            reply = safe_format_output(
                "📄 文件 {filename} 内容:\n{content}",
                filename=filename,
                content=text,
            )
        except Exception as exc:
            reply = safe_format_output("❌ 读取文件失败: {error}", error=str(exc))
    return reply
@mcp.tool
def edit_file(filename: str, content: str) -> str:
    """编辑testworkspace目录中的文件(覆盖现有内容)"""
    # The docstring above is kept verbatim because FastMCP exposes a tool's
    # docstring to clients as the tool description.
    #
    # Overwrites `filename` under WORKSPACE_DIR with `content` (UTF-8).
    # Returns a status string: success with the character count, an
    # "unsafe path" error, or a failure message carrying the exception text.
    ensure_workspace()
    target = WORKSPACE_DIR / filename
    if not is_safe_path(target):
        result = safe_format_output("❌ 错误: 文件路径不安全: {filename}", filename=filename)
    else:
        try:
            target.write_text(content, encoding='utf-8')
            result = safe_format_output(
                "✅ 成功编辑文件: {filename} ({length} 字符)",
                filename=filename,
                length=len(content),
            )
        except Exception as exc:
            result = safe_format_output("❌ 编辑文件失败: {error}", error=str(exc))
    return result
@mcp.tool
def list_files() -> str:
    """列出testworkspace目录中的所有文件和文件夹"""
    # The docstring above is kept verbatim because FastMCP exposes a tool's
    # docstring to clients as the tool description.
    #
    # Lists the direct children of WORKSPACE_DIR, one per line: files with
    # their size in bytes, directories with a trailing slash. Entries that are
    # neither are skipped. Returns an "empty" message when nothing is listed
    # and an error string if the directory cannot be read.
    ensure_workspace()
    try:
        lines = []
        for entry in WORKSPACE_DIR.iterdir():
            if entry.is_file():
                byte_count = entry.stat().st_size
                line = safe_format_output(
                    "📄 {name} ({size} bytes)", name=entry.name, size=byte_count
                )
            elif entry.is_dir():
                line = safe_format_output("📁 {name}/", name=entry.name)
            else:
                continue
            lines.append(line)
        if lines:
            return safe_format_output(
                "📂 testworkspace 目录内容:\n{items}", items="\n".join(lines)
            )
        return safe_format_output("📂 testworkspace 目录为空")
    except Exception as exc:
        return safe_format_output("❌ 列出文件失败: {error}", error=str(exc))
# ===== 命令执行工具 =====
# Command names that run_command refuses to execute.
_DANGEROUS_COMMANDS = (
    'format', 'del', 'rmdir', 'rd', 'deltree', 'fdisk',
    'shutdown', 'restart', 'reboot', 'net user', 'net localgroup',
    'reg delete', 'reg add', 'powershell', 'wmic', 'sc delete',
)

# Whole-word, case-insensitive matcher for the names above. Words of a
# multi-word entry may be separated by any amount of whitespace.
_DANGEROUS_PATTERN = re.compile(
    r'(?<!\w)(?:'
    + '|'.join(
        r'\s+'.join(re.escape(word) for word in name.split())
        for name in _DANGEROUS_COMMANDS
    )
    + r')(?!\w)',
    re.IGNORECASE,
)


def _is_dangerous_command(command: str) -> bool:
    """Return True if *command* contains a blacklisted name as a whole word."""
    return _DANGEROUS_PATTERN.search(command) is not None


@mcp.tool
async def run_command(command: str, timeout: int = 30) -> str:
    """在testworkspace目录中异步执行Windows命令"""
    # The docstring above is kept verbatim because FastMCP exposes a tool's
    # docstring to clients as the tool description.
    #
    # Runs `command` through the shell with WORKSPACE_DIR as the working
    # directory and returns a report containing the command, working
    # directory, return code and any captured stdout/stderr.
    #
    # Args:
    #     command: Shell command line to execute.
    #     timeout: Seconds to wait before the process is killed.
    #
    # Returns a status string in every case: the report, a "forbidden" message
    # for blacklisted commands, a timeout message, or a failure message
    # carrying the exception text.
    ensure_workspace()

    # Safety check. Names are matched as whole words: the earlier substring
    # test rejected harmless commands such as "python word_count.py"
    # (contains "rd") or "gcc model.c" (contains "del").
    if _is_dangerous_command(command):
        return safe_format_output("❌ 禁止执行危险命令: {command}", command=command)

    try:
        # Pass a UTF-8 I/O preference on to Python child processes.
        env = os.environ.copy()
        env['PYTHONIOENCODING'] = 'utf-8'
        process = await asyncio.create_subprocess_shell(
            command,
            cwd=WORKSPACE_DIR,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            shell=True,
            env=env,
        )
        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            # Kill the process and reap it before reporting the timeout.
            process.kill()
            await process.wait()
            return safe_format_output(
                "⏰ 命令执行超时 ({timeout}秒): {command}",
                timeout=timeout,
                command=command,
            )

        parts = [
            safe_format_output("🔧 执行命令: {command}\n", command=command),
            safe_format_output("📁 工作目录: {workspace}\n", workspace=str(WORKSPACE_DIR)),
            safe_format_output("🔧 返回码: {returncode}\n", returncode=process.returncode),
        ]
        if stdout:
            # Tolerant decoding: invalid UTF-8 bytes become replacement characters.
            stdout_text = stdout.decode('utf-8', errors='replace')
            parts.append(safe_format_output("📤 标准输出:\n{stdout}\n", stdout=stdout_text))
        if stderr:
            stderr_text = stderr.decode('utf-8', errors='replace')
            parts.append(safe_format_output("⚠️ 标准错误:\n{stderr}\n", stderr=stderr_text))
        return "".join(parts)
    except Exception as e:
        return safe_format_output("❌ 命令执行失败: {error}", error=str(e))
@mcp.tool
async def run_python_code(code: str, timeout: int = 30) -> str:
    """在testworkspace目录中异步执行Python代码"""
    # The docstring above is kept verbatim because FastMCP exposes a tool's
    # docstring to clients as the tool description.
    #
    # Writes `code` to a uniquely named temporary script inside WORKSPACE_DIR,
    # runs it with `python` using the workspace as working directory, and
    # returns a report with the code, working directory, return code and any
    # captured stdout/stderr. The script is removed afterwards.
    #
    # Args:
    #     code: Python source to execute.
    #     timeout: Seconds to wait before the process is killed (default 30,
    #         the value that used to be hard-coded).
    #
    # Returns a status string in every case: the report, a timeout message,
    # or a failure message carrying the exception text.
    ensure_workspace()
    script_path = None
    try:
        # A unique file name per call: the former fixed "temp_script.py" let
        # concurrent calls overwrite each other's script, and replaced then
        # deleted any user file of that name in the workspace.
        fd, raw_path = tempfile.mkstemp(
            prefix="temp_script_", suffix=".py", dir=WORKSPACE_DIR
        )
        script_path = Path(raw_path)
        with os.fdopen(fd, 'w', encoding='utf-8') as script:
            script.write(code)

        # Pass a UTF-8 I/O preference on to the Python child process.
        env = os.environ.copy()
        env['PYTHONIOENCODING'] = 'utf-8'
        process = await asyncio.create_subprocess_exec(
            'python', script_path.name,
            cwd=WORKSPACE_DIR,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=env,
        )
        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            # Kill the process and reap it before reporting the timeout.
            process.kill()
            await process.wait()
            return safe_format_output(
                "⏰ Python代码执行超时 ({timeout}秒)", timeout=timeout
            )

        parts = [
            safe_format_output("🐍 执行Python代码:\n{code}\n", code=code),
            safe_format_output("📁 工作目录: {workspace}\n", workspace=str(WORKSPACE_DIR)),
            safe_format_output("🔧 返回码: {returncode}\n", returncode=process.returncode),
        ]
        if stdout:
            # Tolerant decoding: invalid UTF-8 bytes become replacement characters.
            stdout_text = stdout.decode('utf-8', errors='replace')
            parts.append(safe_format_output("📤 输出:\n{stdout}\n", stdout=stdout_text))
        if stderr:
            stderr_text = stderr.decode('utf-8', errors='replace')
            parts.append(safe_format_output("⚠️ 错误:\n{stderr}\n", stderr=stderr_text))
        return "".join(parts)
    except Exception as e:
        return safe_format_output("❌ Python代码执行失败: {error}", error=str(e))
    finally:
        if script_path is not None:
            try:
                script_path.unlink()
            except OSError:
                # Cleanup is best-effort: a leftover temp script must not
                # replace the result that is already being returned.
                pass
# ===== 启动服务器 =====
async def main():
    """Print the startup banner, then serve the MCP server over SSE on localhost:3001."""
    banner = (
        "🚀 启动 FastMCP SSE 文件编辑和命令执行服务器",
        "🌐 SSE端点: http://localhost:3001/sse",
        "📡 HTTP端点: http://localhost:3001/messages",
    )
    for line in banner:
        print(line)
    await mcp.run_async(transport="sse", host="localhost", port=3001)


# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
评论区