一、技术栈

  1. 后端框架

    • Flask
  2. 文件服务器

    • Nginx + WebDAV
    • Basic认证
  3. 音视频处理

    • FFmpeg:格式转换和加速
    • 音频参数:16kHz采样率、单声道、128k比特率
  4. 语音识别

    • 腾讯云ASR服务

二、实现思路

  1. 文件处理流程
上传文件 -> 转换格式(如需) -> 上传到文件服务器 -> 提交ASR -> 轮询结果 -> 清理文件
  2. 关键优化点

    • 使用2倍速音频,降低ASR成本
    • 异步处理(腾讯云ASR的返回耗时不确定,因此在后台线程中提交任务并轮询结果)
    • 分离文件服务器,减轻应用负担
    • 保留status.json,支持历史查询
  3. 状态管理

状态流转:
accepted -> converting -> uploading -> asr_submitting -> asr_processing -> completed
                                                                      \-> failed

三、系统架构

  1. 文件服务器配置(详情看nginx配置文章)
server {
    # Authenticated WebDAV endpoint: accepts PUT/DELETE/MKCOL under /var/www/files/.
    # NOTE(review): nginx's default client_max_body_size is 1m, so larger PUT
    # uploads are rejected with 413 unless the limit is raised here or in an
    # enclosing context that is not shown in this excerpt — confirm.
    location /files/ {
        alias /var/www/files/;
        dav_methods PUT DELETE MKCOL;
        create_full_put_path on;  # create missing intermediate directories on PUT
        dav_access user:rw group:rw all:r;
        auth_basic "Restricted";
        auth_basic_user_file /etc/nginx/.htpasswd;
    }

    # Second URL prefix mapped onto the same directory. No dav_methods and no
    # auth_basic are configured in this location, so it only serves downloads.
    location /files-get/ {
        alias /var/www/files/;
        add_header Access-Control-Allow-Origin *;
    }
}
  2. 项目结构
project/
├── api.py              # Flask应用
├── v2t_services.py     # 核心服务
├── index.html          # 前端界面
└── temp/
    └── asr_tasks/      # 临时文件目录
        └── {task_id}/  # 任务目录
            └── status.json  # 状态文件
  3. 接口设计
POST   /api/upload            # 上传文件
GET    /api/upload/{id}/status # 查询状态
GET    /api/list              # 获取任务列表
DELETE /api/delete/{id}       # 删除任务

四、源代码

1. api.py

import json
import shutil
from flask import Flask, request, jsonify, send_file
from werkzeug.utils import secure_filename
import os
import uuid
from v2t_services import VideoProcessor

# Flask application object on which the routes of this module are registered.
app = Flask(__name__)
# Absolute path of the directory that contains this file.
BASE_DIR = os.path.abspath(os.path.dirname(__file__))
# Root folder for uploads; each task stores its files in a sub-directory here.
app.config['UPLOAD_FOLDER'] = os.path.join(BASE_DIR, 'temp', 'asr_tasks')
app.config['MAX_CONTENT_LENGTH'] = 1024 * 1024 * 1024  # 1 GB request-size limit

ALLOWED_EXTENSIONS = {'mp4', 'mp3', 'wav', 'avi', 'mkv'}


def allowed_file(filename):
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSIONS.

    The comparison is case-insensitive and only the text after the last dot
    is considered; a name without any dot is rejected.
    """
    _stem, dot, extension = filename.rpartition('.')
    if not dot:
        return False
    return extension.lower() in ALLOWED_EXTENSIONS

@app.route('/api/upload', methods=['POST'])
def upload_file():
    """Accept an uploaded media file and start its transcription in the background.

    Expects a multipart form field named ``file``. Returns a JSON object with
    the new ``task_id`` on success, a 400 response for a missing/empty/
    disallowed file, and a 500 response with the error text on any failure.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'}), 400

    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No selected file'}), 400

    if not allowed_file(file.filename):
        return jsonify({'error': 'File type not allowed'}), 400

    task_id = str(uuid.uuid4())
    task_dir = os.path.join(app.config['UPLOAD_FOLDER'], task_id)
    try:
        os.makedirs(task_dir, exist_ok=True)

        # secure_filename() removes non-ASCII characters, so a name such as
        # "视频.mp4" collapses to "mp4" (or to an empty string). Fall back to
        # a generated name that keeps the already validated extension.
        extension = file.filename.rsplit('.', 1)[1].lower()
        filename = secure_filename(file.filename)
        if not allowed_file(filename):
            filename = f'{task_id}.{extension}'

        file_path = os.path.join(task_dir, filename)
        file.save(file_path)

        processor = VideoProcessor()
        processor.process_async(task_id, file_path)

        return jsonify({
            'task_id': task_id,
            'status': 'accepted',
            'message': '文件已接收,开始处理'
        })

    except Exception as e:
        # Do not leave a partially written task directory behind.
        shutil.rmtree(task_dir, ignore_errors=True)
        return jsonify({'error': str(e)}), 500

@app.route('/api/upload/<task_id>/status', methods=['GET'])
def get_status(task_id):
    """Return the status record of *task_id* as JSON (500 with the error text on failure)."""
    try:
        return jsonify(VideoProcessor().get_status(task_id))
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500

@app.route('/api/list', methods=['GET'])
def list_tasks():
    """Return a JSON list of ``{'task_id', 'filename'}`` for every stored task.

    A task is any sub-directory of the upload folder that contains a readable
    ``status.json``. Entries whose status file is missing, unreadable or not
    valid JSON are skipped instead of failing the whole listing.
    """
    upload_root = app.config['UPLOAD_FOLDER']
    try:
        tasks = []
        for task_id in os.listdir(upload_root):
            status_file = os.path.join(upload_root, task_id, 'status.json')
            try:
                with open(status_file, 'r', encoding='utf-8') as f:
                    status = json.load(f)
            except (OSError, ValueError):
                # Not a task directory, no status file yet, or the file is
                # being rewritten by a worker thread right now.
                continue
            if not isinstance(status, dict):
                continue
            tasks.append({
                'task_id': task_id,
                'filename': status.get('filename', 'Unknown')
            })
        return jsonify(tasks)
    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/api/delete/<task_id>', methods=['DELETE'])
def delete_task(task_id):
    """Delete the local directory and the file-server copy of *task_id*.

    Returns 404 when the id is not a canonical UUID string or no directory
    exists for it, and 500 with the error text on any other failure.
    """
    try:
        # Task ids are produced by str(uuid.uuid4()). Reject everything else so
        # that a value such as ".." can never make rmtree() leave the upload
        # folder.
        try:
            is_task_id = str(uuid.UUID(task_id)) == task_id
        except ValueError:
            is_task_id = False
        if not is_task_id:
            return jsonify({'error': 'Task not found'}), 404

        task_dir = os.path.join(app.config['UPLOAD_FOLDER'], task_id)
        if not os.path.exists(task_dir):
            return jsonify({'error': 'Task not found'}), 404

        # Remove the local files.
        shutil.rmtree(task_dir)

        # Remove the copy on the file server.
        processor = VideoProcessor()
        processor._cleanup_files(task_id)

        return jsonify({'message': f'Task {task_id} deleted successfully'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/')
def index():
    """Serve the single-page front end."""
    page = 'index.html'
    return send_file(page)

if __name__ == '__main__':
    os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
    # The interactive debugger can execute arbitrary code and this server
    # listens on every interface, so debug mode has to be requested explicitly
    # (FLASK_DEBUG=1) instead of being always on.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled, host='0.0.0.0', port=5000)

2. v2t_services.py

import os
import json
import time
import shutil
import threading
import subprocess
import requests
from typing import Dict, Optional
from threading import Lock
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.asr.v20190614 import asr_client, models
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException


class VideoProcessor:
    def __init__(self):
        """Set up task bookkeeping, storage locations and the Tencent Cloud ASR client."""
        # In-memory task records and the lock that guards them.
        self.tasks_lock = Lock()
        self.tasks: Dict[str, dict] = {}

        # Local working directory: <this file's directory>/temp/asr_tasks
        module_dir = os.path.dirname(__file__)
        self.BASE_DIR = os.path.abspath(module_dir)
        self.TASK_DIR = os.path.join(self.BASE_DIR, 'temp', 'asr_tasks')
        os.makedirs(self.TASK_DIR, exist_ok=True)

        # File server: upload endpoint, download endpoint and Basic-auth pair.
        self.UPLOAD_URL = "YOUR_FILE_SERVER_URL/files/"
        self.GET_URL = "YOUR_FILE_SERVER_URL/files-get/"
        self.FILE_SERVER_AUTH = ('YOUR_USERNAME', 'YOUR_PASSWORD')

        # Tencent Cloud credentials.
        self.SECRET_ID = "YOUR_SECRET_ID"
        self.SECRET_KEY = "YOUR_SECRET_KEY"
        self.cred = credential.Credential(self.SECRET_ID, self.SECRET_KEY)

        # ASR client bound to the ASR endpoint.
        endpoint_profile = HttpProfile()
        endpoint_profile.endpoint = "asr.tencentcloudapi.com"

        self.client_profile = ClientProfile()
        self.client_profile.httpProfile = endpoint_profile
        self.client = asr_client.AsrClient(self.cred, "ap-guangzhou", self.client_profile)

    def _convert_to_mp3(self, input_path: str) -> Optional[str]:
        """转换并加速音频"""
        try:
            output_path = input_path.rsplit('.', 1)[0] + '.mp3'
            cmd = [
                'ffmpeg', '-i', input_path,
                '-vn',  # 去除视频流
                '-filter:a', 'atempo=2',  # 2倍速
                '-ar', '16000',  # 采样率16kHz
                '-ac', '1',  # 单声道
                '-b:a', '128k',  # 比特率128k
                output_path
            ]

            process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            process.wait()

            if process.returncode == 0:
                return output_path
            else:
                raise Exception(f"FFmpeg conversion failed: {process.stderr.read().decode()}")

        except Exception as e:
            print(f"Error converting file: {str(e)}")
            return None

    def _upload_to_server(self, file_path: str, task_id: str) -> Optional[str]:
        """上传文件到文件服务器的指定task_id目录"""
        try:
            filename = os.path.basename(file_path)

            # 直接上传到task_id目录
            with open(file_path, 'rb') as f:
                response = requests.put(
                    f"{self.UPLOAD_URL}{task_id}/{filename}",
                    data=f,
                    auth=self.FILE_SERVER_AUTH
                )

            if response.status_code in [201, 204]:
                return f"{self.GET_URL}{task_id}/{filename}"
            else:
                raise Exception(f"Upload failed with status code: {response.status_code}")

        except Exception as e:
            print(f"Error uploading file: {str(e)}")
            return None

    def _cleanup_files(self, task_id: str):
        """Delete the *task_id* directory on the file server (best effort).

        Any status other than 204 or 404 is reported as a warning; request
        errors are printed and never raised.
        """
        try:
            reply = requests.delete(
                f"{self.UPLOAD_URL}{task_id}/",
                auth=self.FILE_SERVER_AUTH
            )

            # 404 means the directory is already gone.
            removed_or_absent = reply.status_code in [204, 404]
            if not removed_or_absent:
                print(f"Warning: Failed to cleanup directory {task_id}: {reply.status_code}")

        except Exception as exc:
            print(f"Error cleaning up server files for task {task_id}: {exc}")

    def _cleanup_local_files(self, task_id: str):
        """清理本地临时文件,但保留状态文件"""
        try:
            task_dir = os.path.join(self.TASK_DIR, task_id)
            if os.path.exists(task_dir):
                status_file = os.path.join(task_dir, 'status.json')
                for filename in os.listdir(task_dir):
                    filepath = os.path.join(task_dir, filename)
                    # 不删除status.json文件
                    if filepath != status_file:
                        if os.path.isfile(filepath):
                            os.remove(filepath)
                        elif os.path.isdir(filepath):
                            shutil.rmtree(filepath)
        except Exception as e:
            print(f"Error cleaning up local files for task {task_id}: {e}")

    def _submit_to_asr(self, file_url: str) -> Optional[str]:
        """Create a recording-recognition task for *file_url* and return its TaskId."""
        payload = json.dumps({
            "EngineModelType": "16k_zh",
            "ChannelNum": 1,
            "ResTextFormat": 2,
            "SourceType": 0,
            "Url": file_url
        })

        request = models.CreateRecTaskRequest()
        request.from_json_string(payload)
        response = self.client.CreateRecTask(request)

        return json.loads(response.to_json_string())['Data']['TaskId']

    def _check_asr_status(self, task_id: str, asr_task_id: str):
        """Query the ASR service for *asr_task_id* and return the response's ``Data`` object.

        *task_id* is accepted for symmetry with the other helpers but is not used.
        """
        payload = json.dumps({"TaskId": asr_task_id})

        request = models.DescribeTaskStatusRequest()
        request.from_json_string(payload)
        response = self.client.DescribeTaskStatus(request)

        return json.loads(response.to_json_string())['Data']

    def _update_status(self, task_id: str, status: str,
                       url: Optional[str] = None, error: Optional[str] = None,
                       asr_task_id: Optional[str] = None,
                       result_text: Optional[str] = None,
                       filename: Optional[str] = None):
        """更新任务状态"""
        with self.tasks_lock:
            self.tasks[task_id] = {
                'task_id': task_id,
                'status': status,
                'filename': filename
            }
            if url:
                self.tasks[task_id]['url'] = url
            if error:
                self.tasks[task_id]['error'] = error
            if asr_task_id:
                self.tasks[task_id]['asr_task_id'] = asr_task_id
            if result_text:
                self.tasks[task_id]['result'] = result_text

        # 保存到文件
        task_dir = os.path.join(self.TASK_DIR, task_id)
        os.makedirs(task_dir, exist_ok=True)
        status_file = os.path.join(task_dir, 'status.json')
        with open(status_file, 'w') as f:
            json.dump(self.tasks[task_id], f)

    def process_asr_result(self, asr_result: str) -> str:
        """处理ASR返回的文本"""
        lines = asr_result.split('\n')
        processed_lines = []
        for line in lines:
            # 去除时间标记
            if line.startswith('['):
                line = line.split(']', 1)[-1].strip()
            if line:
                processed_lines.append(line)
        return ' '.join(processed_lines)

    def _process_task(self, task_id: str, file_path: str):
        """处理任务的主函数"""
        try:
            # 获取原始文件名
            filename = os.path.basename(file_path)

            # 1. 转换格式(如果需要)
            if not file_path.endswith('.mp3'):
                self._update_status(task_id, 'converting', filename=filename)
                mp3_path = self._convert_to_mp3(file_path)
                if not mp3_path:
                    raise Exception("Failed to convert file to mp3")
            else:
                mp3_path = file_path

            # 2. 上传到文件服务器
            self._update_status(task_id, 'uploading', filename=filename)
            url = self._upload_to_server(mp3_path, task_id)
            if not url:
                raise Exception("Failed to upload file")

            # 3. 提交ASR任务
            self._update_status(task_id, 'asr_submitting', filename=filename)
            asr_task_id = self._submit_to_asr(url)
            if not asr_task_id:
                raise Exception("Failed to submit ASR task")

            self._update_status(task_id, 'asr_processing',
                               asr_task_id=asr_task_id,
                               filename=filename)

            # 4. 轮询ASR结果
            while True:
                result = self._check_asr_status(task_id, asr_task_id)
                status = result['Status']

                if status == 2:  # 成功完成
                    text_result = result['Result']
                    # 处理ASR结果
                    processed_result = self.process_asr_result(text_result)

                    self._update_status(task_id, 'completed',
                                       result_text=processed_result,
                                       filename=filename)
                    # 清理文件
                    self._cleanup_files(task_id)  # 清理远程文件
                    self._cleanup_local_files(task_id)  # 清理本地文件
                    break

                elif status == 3:  # 任务失败
                    self._cleanup_files(task_id)
                    self._cleanup_local_files(task_id)
                    raise Exception("ASR processing failed")

                else:  # 继续处理中
                    self._update_status(task_id, 'asr_processing',
                                       filename=filename)
                    time.sleep(5)

        except Exception as e:
            self._update_status(task_id, 'failed',
                               error=str(e),
                               filename=filename)
            self._cleanup_files(task_id)
            self._cleanup_local_files(task_id)

    def process_async(self, task_id: str, file_path: str):
        """异步处理任务"""
        filename = os.path.basename(file_path)
        self._update_status(task_id, 'accepted', filename=filename)
        thread = threading.Thread(target=self._process_task, args=(task_id, file_path))
        thread.daemon = True
        thread.start()

    def get_status(self, task_id: str) -> dict:
        """获取任务状态"""
        # 先从内存获取
        with self.tasks_lock:
            if task_id in self.tasks:
                return self.tasks[task_id]

        # 从文件获取
        try:
            status_file = os.path.join(self.TASK_DIR, task_id, 'status.json')
            if os.path.exists(status_file):
                with open(status_file, 'r') as f:
                    return json.load(f)
        except Exception:
            pass

        return {
            'task_id': task_id,
            'status': 'not_found',
            'error': 'Task not found'
        }

3. index.html

<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>音视频转写</title>
    <style>
        body {
            max-width: 1200px;
            margin: 20px auto;
            padding: 0 20px;
            font-family: Arial, sans-serif;
            line-height: 1.6;
            display: flex;
        }
        .main-content {
            flex: 2;
            margin-right: 20px;
        }
        .task-list {
            flex: 1;
            border-left: 1px solid #ccc;
            padding-left: 20px;
        }
        .upload-section {
            border: 2px dashed #ccc;
            padding: 20px;
            text-align: center;
            margin-bottom: 20px;
        }
        .tasks {
            margin-top: 20px;
        }
        .task-item {
            border: 1px solid #eee;
            padding: 10px;
            margin-bottom: 10px;
            border-radius: 4px;
        }
        .task-item pre {
            white-space: pre-wrap;
            word-wrap: break-word;
            background: #f5f5f5;
            padding: 10px;
            margin: 10px 0;
            border-radius: 4px;
        }
        .status {
            font-weight: bold;
        }
        .error {
            color: red;
        }
        .task-list-item {
            margin-bottom: 10px;
        }
        .task-list-item button {
            margin-left: 5px;
        }
    </style>
</head>
<body>
    <div class="main-content">
        <h1>音视频转写</h1>

        <div class="upload-section">
            <input type="file" id="fileInput" accept=".mp4,.mp3,.wav,.avi,.mkv">
            <button onclick="uploadFile()">上传文件</button>
        </div>

        <div class="tasks" id="tasksList">
            <!-- 任务列表将在这里动态显示 -->
        </div>
    </div>

    <div class="task-list">
        <h2>任务列表 <button onclick="refreshTaskList()">刷新</button></h2>
        <div id="taskListContainer">
            <!-- 任务列表将在这里动态显示 -->
        </div>
    </div>

    <script>
        // 存储正在处理的任务
        let activeTasks = new Map();

        // 上传文件
        // Send the selected file to the server, then show the new task and start polling it.
        async function uploadFile() {
            const [selected] = document.getElementById('fileInput').files;

            if (!selected) {
                alert('请选择文件');
                return;
            }

            const payload = new FormData();
            payload.append('file', selected);

            try {
                const response = await fetch('/api/upload', { method: 'POST', body: payload });
                const result = await response.json();

                if (!response.ok) {
                    alert('上传失败: ' + result.error);
                    return;
                }

                // Show the task card, start polling it and refresh the side list.
                addTask(result.task_id, selected.name);
                pollStatus(result.task_id);
                refreshTaskList();
            } catch (error) {
                alert('上传出错: ' + error);
            }
        }

        // 添加任务到显示列表
        // Insert a card for a new task at the top of the main task area.
        function addTask(taskId, fileName) {
            const tasksList = document.getElementById('tasksList');
            const taskDiv = document.createElement('div');
            taskDiv.className = 'task-item';
            taskDiv.id = `task-${taskId}`;
            taskDiv.innerHTML = `
                <div>文件名: <span id="filename-${taskId}"></span></div>
                <div>任务ID: ${taskId}</div>
                <div>状态: <span class="status" id="status-${taskId}">等待处理</span></div>
                <div id="error-${taskId}" class="error"></div>
                <pre id="result-${taskId}" style="display: none;"></pre>
            `;
            // The file name is user-controlled text: set it as text, never as
            // markup. The first <span> in the card is the file-name element.
            taskDiv.querySelector('span').textContent = fileName;
            tasksList.insertBefore(taskDiv, tasksList.firstChild);
        }

        // 更新任务状态显示
        // Refresh the card of one task from a status record returned by the API.
        function updateTaskStatus(taskId, status) {
            const statusEl = document.getElementById(`status-${taskId}`);
            const errorEl = document.getElementById(`error-${taskId}`);
            const resultEl = document.getElementById(`result-${taskId}`);
            const filenameEl = document.getElementById(`filename-${taskId}`);

            if (status.filename) {
                filenameEl.textContent = status.filename;
            }

            const statusText = {
                'accepted': '已接收',
                'converting': '格式转换中',
                'uploading': '上传中',
                'asr_submitting': '提交ASR任务',
                'asr_processing': 'ASR处理中',
                'completed': '已完成',
                'failed': '失败',
                'not_found': '未找到'
            }[status.status] || status.status;

            // The .status class already renders bold; Markdown-style asterisks
            // would be shown literally by textContent.
            statusEl.textContent = statusText;

            if (status.error) {
                errorEl.textContent = `错误: ${status.error}`;
            }

            if (status.result) {
                resultEl.style.display = 'block';
                resultEl.textContent = status.result;
            }

            // A task the server no longer knows is finished as far as polling
            // is concerned, just like a completed or failed one.
            if (['completed', 'failed', 'not_found'].includes(status.status)) {
                activeTasks.delete(taskId);
            }
        }

        // 轮询任务状态
        // Poll the status endpoint every 5 seconds until the task reaches a final state.
        async function pollStatus(taskId) {
            if (activeTasks.has(taskId)) {
                return; // already being polled
            }
            activeTasks.set(taskId, true);

            // 'not_found' is final too: the task was deleted on the server.
            const finalStates = ['completed', 'failed', 'not_found'];

            while (activeTasks.has(taskId)) {
                try {
                    const response = await fetch(`/api/upload/${taskId}/status`);
                    const status = await response.json();

                    updateTaskStatus(taskId, status);

                    if (finalStates.includes(status.status)) {
                        break;
                    }
                } catch (error) {
                    console.error('轮询出错:', error);
                }

                await new Promise(resolve => setTimeout(resolve, 5000));
            }

            activeTasks.delete(taskId);
        }

        // 刷新任务列表
        // Reload the side list of stored tasks from the server.
        async function refreshTaskList() {
            try {
                const response = await fetch('/api/list');
                const tasks = await response.json();

                // An error response is a JSON object, not a list.
                if (!response.ok || !Array.isArray(tasks)) {
                    throw new Error((tasks && tasks.error) || `HTTP ${response.status}`);
                }

                const taskListContainer = document.getElementById('taskListContainer');
                taskListContainer.innerHTML = '';

                tasks.forEach(task => {
                    const taskElement = document.createElement('div');
                    taskElement.className = 'task-list-item';

                    // Server-provided values are inserted as text and bound
                    // through listeners, never spliced into markup.
                    taskElement.appendChild(
                        document.createTextNode(`${task.filename} (${task.task_id}) `)
                    );

                    const detailsButton = document.createElement('button');
                    detailsButton.textContent = '详情';
                    detailsButton.addEventListener('click', () => showTaskDetails(task.task_id));
                    taskElement.appendChild(detailsButton);

                    const deleteButton = document.createElement('button');
                    deleteButton.textContent = '删除';
                    deleteButton.addEventListener('click', () => deleteTask(task.task_id));
                    taskElement.appendChild(deleteButton);

                    taskListContainer.appendChild(taskElement);
                });
            } catch (error) {
                console.error('刷新任务列表出错:', error);
            }
        }

        // 显示任务详情
        // Fetch the status record of one task and show it in an alert box.
        async function showTaskDetails(taskId) {
            let status;
            try {
                const response = await fetch(`/api/upload/${taskId}/status`);
                status = await response.json();
            } catch (error) {
                console.error('获取任务详情出错:', error);
                alert('获取任务详情失败');
                return;
            }

            alert(`任务ID: ${taskId}\n状态: ${status.status}\n文件名: ${status.filename}\n结果: ${status.result || '暂无'}`);
        }

        // 删除任务
        // Ask for confirmation, delete the task on the server and refresh the list.
        async function deleteTask(taskId) {
            if (!confirm('确定要删除这个任务吗?')) {
                return;
            }

            try {
                const response = await fetch(`/api/delete/${taskId}`, { method: 'DELETE' });
                const result = await response.json();

                if (!response.ok) {
                    alert('删除失败: ' + result.error);
                    return;
                }

                alert('任务删除成功');
                refreshTaskList();
            } catch (error) {
                console.error('删除任务出错:', error);
                alert('删除任务失败');
            }
        }

        // 页面加载时刷新任务列表
        window.onload = refreshTaskList;
    </script>
</body>
</html>

此系统可以快速部署到支持Python的服务器上,注意事项:

  1. 安装必要的依赖:pip install flask requests tencentcloud-sdk-python
  2. 配置正确的文件服务器地址和认证信息
  3. 配置有效的腾讯云API密钥
  4. 确保服务器已安装FFmpeg
  5. 确保temp目录具有适当的读写权限