实现一个MCP服务器,让Claude能执行命令和编辑文件

基本思路

用FastMCP写了个文件操作+命令执行的服务器,限制在testworkspace目录内操作,避免搞坏系统。

from fastmcp import FastMCP
mcp = FastMCP("File Editor & Command Runner")

@mcp.tool
def create_file(filename: str, content: str = "") -> str:
    file_path = WORKSPACE_DIR / filename
    file_path.write_text(content, encoding='utf-8')
    return f"创建文件成功: {filename}"

编码问题的坑

中文Windows环境下,输入是UTF-8,但系统输出是GBK,直接执行命令会乱码或报错。

解决方案

启动时修复编码:

def fix_encoding():
    os.environ['PYTHONIOENCODING'] = 'utf-8'
    
    # 重设stdout编码
    if hasattr(sys.stdout, 'buffer'):
        sys.stdout = io.TextIOWrapper(
            sys.stdout.buffer, 
            encoding='utf-8', 
            errors='replace'
        )

执行命令时传递UTF-8环境:

env = os.environ.copy()
env['PYTHONIOENCODING'] = 'utf-8'

process = await asyncio.create_subprocess_shell(
    command,
    env=env,  # 关键
    stdout=asyncio.subprocess.PIPE,
    stderr=asyncio.subprocess.PIPE
)

# 容错解码
stdout_text = stdout.decode('utf-8', errors='replace')

emoji兼容处理:

def safe_format_output(template: str, **kwargs) -> str:
    try:
        return template.format(**kwargs)
    except UnicodeEncodeError:
        # emoji替换为文本
        replacements = {'✅': '[OK]', '❌': '[ERROR]'}
        for emoji, text in replacements.items():
            template = template.replace(emoji, text)
        return template.format(**kwargs)

实际效果

现在Claude可以:

  1. 跨平台编译:用WSL的gcc编译Windows里的C代码
  2. 联网操作:写Python脚本发邮件、爬数据
  3. 智能迭代:看到运行结果后自动修改代码

实际对话:

我:用gcc编译个hello world
Claude:好的,我来创建C文件并编译
-> create_file("hello.c", "#include <stdio.h>...")
-> run_command("wsl gcc hello.c -o hello")
-> run_command("wsl ./hello")
结果:Hello, World!
Claude:编译成功!程序正常运行。

为什么SSE而不是STDIO

STDIO需要Claude Desktop每次启动子进程,配置麻烦。SSE模式:

# 独立运行服务器
await mcp.run_async(transport="sse", host="localhost", port=3001)

Claude Desktop配置:

"file-editor-dev": {
    "command": "npx",
    "args": [
        "-y",
        "mcp-remote",
        "http://localhost:3001/sse",
        "--allow-http",
        "--timeout", "10000"
    ],
    "env": {
        "MCP_TRANSPORT_STRATEGY": "sse",
        "DEBUG": "mcp-remote"
    }
}

安全措施

路径检查:

def is_safe_path(path: Path) -> bool:
    return str(path.resolve()).startswith(str(WORKSPACE_DIR.resolve()))

命令过滤:

dangerous_commands = ['format', 'del', 'shutdown', 'reg delete']
if any(cmd in command.lower() for cmd in dangerous_commands):
    return "禁止执行危险命令"

完整源代码

import os
import sys
import subprocess
import asyncio
from pathlib import Path
from typing import List, Optional
from fastmcp import FastMCP

# ===== 编码修复 =====
def fix_encoding():
    """修复Windows环境下的编码问题"""
    os.environ['PYTHONIOENCODING'] = 'utf-8'

    try:
        import io
        if hasattr(sys.stdout, 'buffer'):
            sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
        if hasattr(sys.stderr, 'buffer'):
            sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
    except Exception:
        pass

fix_encoding()

def safe_format_output(template: str, **kwargs) -> str:
    """安全格式化输出,避免emoji编码问题"""
    try:
        return template.format(**kwargs)
    except UnicodeEncodeError:
        replacements = {
            '✅': '[OK]', '❌': '[ERROR]', '🔧': '[TOOL]',
            '📄': '[FILE]', '📁': '[FOLDER]', '🐍': '[PYTHON]',
            '📤': '[OUTPUT]', '⚠️': '[WARNING]', '⏰': '[TIMEOUT]'
        }
        
        safe_template = template
        for emoji, text in replacements.items():
            safe_template = safe_template.replace(emoji, text)
        return safe_template.format(**kwargs)

# 创建MCP服务器
mcp = FastMCP("File Editor & Command Runner SSE")
WORKSPACE_DIR = Path(__file__).parent / "testworkspace"

def ensure_workspace():
    WORKSPACE_DIR.mkdir(exist_ok=True)
    return WORKSPACE_DIR

def is_safe_path(path: Path) -> bool:
    try:
        resolved_path = path.resolve()
        workspace_resolved = WORKSPACE_DIR.resolve()
        return str(resolved_path).startswith(str(workspace_resolved))
    except:
        return False

# ===== 文件操作工具 =====
@mcp.tool
def create_file(filename: str, content: str = "") -> str:
    """在testworkspace目录中创建新文件"""
    ensure_workspace()
    file_path = WORKSPACE_DIR / filename
    
    if not is_safe_path(file_path):
        return safe_format_output("❌ 错误: 文件路径不安全: {filename}", filename=filename)
    
    try:
        file_path.write_text(content, encoding='utf-8')
        return safe_format_output("✅ 成功创建文件: {filename} ({length} 字符)",
                                  filename=filename, length=len(content))
    except Exception as e:
        return safe_format_output("❌ 创建文件失败: {error}", error=str(e))

@mcp.tool
def read_file(filename: str) -> str:
    """读取testworkspace目录中的文件内容"""
    ensure_workspace()
    file_path = WORKSPACE_DIR / filename
    
    if not is_safe_path(file_path):
        return safe_format_output("❌ 错误: 文件路径不安全: {filename}", filename=filename)
    
    if not file_path.exists():
        return safe_format_output("❌ 文件不存在: {filename}", filename=filename)
    
    try:
        content = file_path.read_text(encoding='utf-8')
        return safe_format_output("📄 文件 {filename} 内容:\n{content}",
                                  filename=filename, content=content)
    except Exception as e:
        return safe_format_output("❌ 读取文件失败: {error}", error=str(e))

@mcp.tool
def edit_file(filename: str, content: str) -> str:
    """编辑testworkspace目录中的文件(覆盖现有内容)"""
    ensure_workspace()
    file_path = WORKSPACE_DIR / filename
    
    if not is_safe_path(file_path):
        return safe_format_output("❌ 错误: 文件路径不安全: {filename}", filename=filename)
    
    try:
        file_path.write_text(content, encoding='utf-8')
        return safe_format_output("✅ 成功编辑文件: {filename} ({length} 字符)",
                                  filename=filename, length=len(content))
    except Exception as e:
        return safe_format_output("❌ 编辑文件失败: {error}", error=str(e))

@mcp.tool
def list_files() -> str:
    """列出testworkspace目录中的所有文件和文件夹"""
    ensure_workspace()
    
    try:
        items = []
        for item in WORKSPACE_DIR.iterdir():
            if item.is_file():
                size = item.stat().st_size
                items.append(safe_format_output("📄 {name} ({size} bytes)",
                                                name=item.name, size=size))
            elif item.is_dir():
                items.append(safe_format_output("📁 {name}/", name=item.name))
        
        if not items:
            return safe_format_output("📂 testworkspace 目录为空")
        
        return safe_format_output("📂 testworkspace 目录内容:\n{items}",
                                  items="\n".join(items))
    except Exception as e:
        return safe_format_output("❌ 列出文件失败: {error}", error=str(e))

# ===== 命令执行工具 =====
@mcp.tool
async def run_command(command: str, timeout: int = 30) -> str:
    """在testworkspace目录中异步执行Windows命令"""
    ensure_workspace()
    
    # 安全检查
    dangerous_commands = [
        'format', 'del', 'rmdir', 'rd', 'deltree', 'fdisk',
        'shutdown', 'restart', 'reboot', 'net user', 'net localgroup',
        'reg delete', 'reg add', 'powershell', 'wmic', 'sc delete'
    ]
    
    command_lower = command.lower()
    for dangerous in dangerous_commands:
        if dangerous in command_lower:
            return safe_format_output("❌ 禁止执行危险命令: {command}", command=command)
    
    try:
        # 设置环境变量确保子进程使用UTF-8
        env = os.environ.copy()
        env['PYTHONIOENCODING'] = 'utf-8'
        
        process = await asyncio.create_subprocess_shell(
            command,
            cwd=WORKSPACE_DIR,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            shell=True,
            env=env
        )
        
        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            process.kill()
            await process.wait()
            return safe_format_output("⏰ 命令执行超时 ({timeout}秒): {command}",
                                      timeout=timeout, command=command)
        
        output = safe_format_output("🔧 执行命令: {command}\n", command=command)
        output += safe_format_output("📁 工作目录: {workspace}\n", workspace=str(WORKSPACE_DIR))
        output += safe_format_output("🔧 返回码: {returncode}\n", returncode=process.returncode)
        
        if stdout:
            stdout_text = stdout.decode('utf-8', errors='replace')
            output += safe_format_output("📤 标准输出:\n{stdout}\n", stdout=stdout_text)
        
        if stderr:
            stderr_text = stderr.decode('utf-8', errors='replace')
            output += safe_format_output("⚠️ 标准错误:\n{stderr}\n", stderr=stderr_text)
        
        return output
        
    except Exception as e:
        return safe_format_output("❌ 命令执行失败: {error}", error=str(e))

@mcp.tool
async def run_python_code(code: str) -> str:
    """在testworkspace目录中异步执行Python代码"""
    ensure_workspace()
    
    temp_file = WORKSPACE_DIR / "temp_script.py"
    
    try:
        temp_file.write_text(code, encoding='utf-8')
        
        env = os.environ.copy()
        env['PYTHONIOENCODING'] = 'utf-8'
        
        process = await asyncio.create_subprocess_exec(
            'python', 'temp_script.py',
            cwd=WORKSPACE_DIR,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=env
        )
        
        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=30
            )
        except asyncio.TimeoutError:
            process.kill()
            await process.wait()
            return safe_format_output("⏰ Python代码执行超时 (30秒)")
        
        output = safe_format_output("🐍 执行Python代码:\n{code}\n", code=code)
        output += safe_format_output("📁 工作目录: {workspace}\n", workspace=str(WORKSPACE_DIR))
        output += safe_format_output("🔧 返回码: {returncode}\n", returncode=process.returncode)
        
        if stdout:
            stdout_text = stdout.decode('utf-8', errors='replace')
            output += safe_format_output("📤 输出:\n{stdout}\n", stdout=stdout_text)
        
        if stderr:
            stderr_text = stderr.decode('utf-8', errors='replace')
            output += safe_format_output("⚠️ 错误:\n{stderr}\n", stderr=stderr_text)
        
        return output
        
    except Exception as e:
        return safe_format_output("❌ Python代码执行失败: {error}", error=str(e))
    finally:
        if temp_file.exists():
            temp_file.unlink()

# ===== 启动服务器 =====
async def main():
    print("🚀 启动 FastMCP SSE 文件编辑和命令执行服务器")
    print("🌐 SSE端点: http://localhost:3001/sse")
    print("📡 HTTP端点: http://localhost:3001/messages")
    
    await mcp.run_async(
        transport="sse",
        host="localhost",
        port=3001
    )

if __name__ == "__main__":
    asyncio.run(main())

mcp调用时序图示例

sequenceDiagram participant User as 用户 participant Claude as Claude Desktop participant Proxy as mcp-remote participant Server as MCP服务器 participant FS as 文件系统 participant OS as 操作系统 Note over User, OS: MCP服务器启动阶段 User->>Server: python mcp_server.py Server->>Server: fix_encoding() Server->>Server: ensure_workspace() Server->>+FS: 创建testworkspace目录 FS-->>-Server: 目录创建完成 Server->>Server: 启动SSE服务器 :3001 Note over Server: 🌐 http://localhost:3001/sse Note over User, OS: Claude Desktop连接阶段 Claude->>+Proxy: npx mcp-remote http://localhost:3001/sse Proxy->>+Server: SSE连接请求 Server-->>-Proxy: 连接建立 Proxy-->>-Claude: MCP协议握手 Claude->>+Proxy: tools/list Proxy->>+Server: 请求工具列表 Server-->>-Proxy: [create_file, read_file, edit_file, run_command...] Proxy-->>-Claude: 工具列表返回 Note over User, OS: 一次完整的用户交互 User->>Claude: "写一个计算斐波那契数列的C代码,用WSL编译并运行" Note over Claude: 🧠 Claude分析任务:<br/>1. 检查WSL环境<br/>2. 测试编译环境<br/>3. 写目标代码<br/>4. 编译运行<br/>5. 错误处理 rect rgb(240, 248, 255) Note over Claude: 步骤1: 检查WSL环境 Claude->>+Proxy: tools/call run_command Note right of Claude: command="wsl --version" Proxy->>+Server: JSON-RPC调用 Server->>+OS: 检查WSL OS-->>-Server: WSL版本信息 Server-->>-Proxy: "WSL 2 已安装" Proxy-->>-Claude: WSL可用 Note over Claude: 步骤2: 测试编译环境 Claude->>+Proxy: tools/call run_command Note right of Claude: command="wsl gcc --version" Proxy->>+Server: JSON-RPC调用 Server->>+OS: 检查GCC OS-->>-Server: GCC版本信息 Server-->>-Proxy: "GCC 11.4.0 可用" Proxy-->>-Claude: 编译器就绪 Note over Claude: 步骤3: 创建简单测试程序 Claude->>+Proxy: tools/call create_file Note right of Claude: filename="test.c"<br/>content="#include <stdio.h>..." Proxy->>+Server: JSON-RPC调用 Server->>+FS: 写入测试文件 FS-->>-Server: 文件创建成功 Server-->>-Proxy: "✅ test.c创建成功" Proxy-->>-Claude: 测试文件就绪 Note over Claude: 步骤4: 测试编译 Claude->>+Proxy: tools/call run_command Note right of Claude: command="wsl gcc test.c -o test" Proxy->>+Server: JSON-RPC调用 Server->>+OS: 编译测试程序 OS-->>-Server: 编译成功 Server-->>-Proxy: "编译完成,无错误" Proxy-->>-Claude: 编译环境正常 Note over Claude: 步骤5: 创建目标程序 Claude->>+Proxy: tools/call create_file Note right of Claude: filename="fibonacci.c"<br/>content="斐波那契递归实现" Proxy->>+Server: JSON-RPC调用 Server->>+FS: 写入目标文件 FS-->>-Server: 文件创建成功 Server-->>-Proxy: "✅ fibonacci.c创建成功" Proxy-->>-Claude: 目标文件就绪 Note over Claude: 步骤6: 编译目标程序 Claude->>+Proxy: tools/call run_command Note right of Claude: command="wsl gcc fibonacci.c -o fibonacci" Proxy->>+Server: JSON-RPC调用 Server->>+OS: 编译斐波那契程序 OS-->>-Server: 编译成功 Server-->>-Proxy: "编译完成" Proxy-->>-Claude: 目标程序编译成功 Note over Claude: 步骤7: 运行程序 Claude->>+Proxy: tools/call run_command Note right of Claude: command="wsl ./fibonacci" Proxy->>+Server: JSON-RPC调用 Server->>+OS: 执行程序 OS-->>-Server: "斐波那契数列: 1,1,2,3,5,8,13..." Server-->>-Proxy: "📤 程序输出正常" Proxy-->>-Claude: 运行结果 end Note over Claude: 🧠 分析所有步骤结果,<br/>准备用户友好的总结 Claude-->>User: "已完成!创建了斐波那契C程序,WSL编译成功并运行正常。<br/>输出了前10个斐波那契数:1,1,2,3,5,8,13,21,34,55" rect rgb(255, 245, 245) Note over User, OS: 错误处理示例(如果某步失败) alt 编译错误场景 Claude->>+Proxy: tools/call run_command Note right of Claude: command="wsl gcc buggy.c -o buggy" Proxy->>+Server: JSON-RPC调用 Server->>+OS: 编译有错误的代码 OS-->>-Server: "error: expected semicolon before closing brace" Server-->>-Proxy: "⚠️ 编译错误" Proxy-->>-Claude: 编译失败信息 Note over Claude: 🧠 分析错误信息,<br/>识别语法问题 Claude->>+Proxy: tools/call read_file Note right of Claude: filename="buggy.c" Proxy->>+Server: JSON-RPC调用 Server->>+FS: 读取文件内容 FS-->>-Server: 源代码内容 Server-->>-Proxy: 文件内容 Proxy-->>-Claude: 源代码 Note over Claude: 🧠 发现缺少分号,<br/>准备修复代码 Claude->>+Proxy: tools/call edit_file Note right of Claude: filename="buggy.c"<br/>content="修复后的代码" Proxy->>+Server: JSON-RPC调用 Server->>+FS: 更新文件 FS-->>-Server: 文件更新成功 Server-->>-Proxy: "✅ 文件已修复" Proxy-->>-Claude: 代码修复完成 Claude->>+Proxy: tools/call run_command Note right of Claude: command="wsl gcc buggy.c -o buggy" Proxy->>+Server: JSON-RPC调用 Server->>+OS: 重新编译 OS-->>-Server: 编译成功 Server-->>-Proxy: "编译通过" Proxy-->>-Claude: 修复成功 end end