一、技术栈
后端框架:
- Flask
文件服务器:
- Nginx + WebDAV
- Basic认证
音视频处理:
- FFmpeg:格式转换和加速
- 音频参数:16kHz采样率、单声道、128k比特率
语音识别:
- 腾讯云ASR服务
二、实现思路
- 文件处理流程:
上传文件 -> 转换格式(如需) -> 上传到文件服务器 -> 提交ASR -> 轮询结果 -> 清理文件
关键优化点:
- 使用2倍速音频,降低ASR成本
- 异步处理(腾讯云 ASR 的返回耗时不确定,因此在后台线程中提交任务并轮询结果)
- 分离文件服务器,减轻应用负担
- 保留status.json,支持历史查询
状态管理:
状态流转:
accepted -> converting -> uploading -> asr_submitting -> asr_processing -> completed
任一阶段出错 -> failed
三、系统架构
- 文件服务器配置(详情看nginx配置文章):
# File store for the transcription service: one directory on disk
# (/var/www/files/) exposed through two locations.
server {
# Write endpoint. The Python service sends PUT (upload) and DELETE (cleanup)
# requests here, authenticated with HTTP Basic credentials.
location /files/ {
alias /var/www/files/;
dav_methods PUT DELETE MKCOL;
create_full_put_path on;
dav_access user:rw group:rw all:r;
auth_basic "Restricted";
auth_basic_user_file /etc/nginx/.htpasswd;
}
# Read endpoint over the same directory. No auth directives are set here;
# the service hands URLs under this prefix to the ASR provider for download.
location /files-get/ {
alias /var/www/files/;
add_header Access-Control-Allow-Origin *;
}
}
- 项目结构:
project/
├── api.py # Flask应用
├── v2t_services.py # 核心服务
├── index.html # 前端界面
└── temp/
└── asr_tasks/ # 临时文件目录
└── {task_id}/ # 任务目录
└── status.json # 状态文件
- 接口设计:
POST /api/upload # 上传文件
GET /api/upload/{id}/status # 查询状态
GET /api/list # 获取任务列表
DELETE /api/delete/{id} # 删除任务
四、源代码
1. api.py
import json
import shutil
from flask import Flask, request, jsonify, send_file
from werkzeug.utils import secure_filename
import os
import uuid
from v2t_services import VideoProcessor
# Flask application object shared by every route in this module.
app = Flask(__name__)

# Directory containing this file; all task data lives beneath it.
BASE_DIR = os.path.abspath(os.path.dirname(__file__))

app.config.update(
    # One sub-directory per task is created under temp/asr_tasks.
    UPLOAD_FOLDER=os.path.join(BASE_DIR, 'temp', 'asr_tasks'),
    # Upper bound on the request body size: 1 GiB.
    MAX_CONTENT_LENGTH=1024 ** 3,
)
# Lower-case file extensions (without the dot) accepted for upload.
ALLOWED_EXTENSIONS = {'mp4', 'mp3', 'wav', 'avi', 'mkv'}


def allowed_file(filename):
    """Return True when *filename* ends in one of ALLOWED_EXTENSIONS.

    Only the text after the last dot is examined and the comparison is
    case-insensitive. A name without any dot is rejected.
    """
    if '.' not in filename:
        return False
    _, extension = filename.rsplit('.', 1)
    return extension.lower() in ALLOWED_EXTENSIONS
@app.route('/api/upload', methods=['POST'])
def upload_file():
    """Accept an uploaded media file and start background transcription.

    Expects a multipart form with a ``file`` part whose extension is in
    ALLOWED_EXTENSIONS. The file is saved into a fresh per-task directory
    under UPLOAD_FOLDER and handed to ``VideoProcessor.process_async``.

    Returns:
        JSON ``{'task_id', 'status': 'accepted', 'message'}`` on success,
        ``{'error': ...}`` with status 400 for a missing/empty/disallowed
        file, or with status 500 when saving or scheduling fails.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No selected file'}), 400
    if not allowed_file(file.filename):
        return jsonify({'error': 'File type not allowed'}), 400
    try:
        task_id = str(uuid.uuid4())
        task_dir = os.path.join(app.config['UPLOAD_FOLDER'], task_id)
        os.makedirs(task_dir, exist_ok=True)
        filename = secure_filename(file.filename)
        if not allowed_file(filename):
            # secure_filename() drops non-ASCII characters, so a name such as
            # "视频.mp4" collapses to "mp4" (or to an empty string) and loses
            # its extension. Fall back to a generated name that keeps the
            # already-validated extension so later steps can rely on it.
            extension = file.filename.rsplit('.', 1)[1].lower()
            filename = f'{task_id}.{extension}'
        file_path = os.path.join(task_dir, filename)
        file.save(file_path)
        processor = VideoProcessor()
        processor.process_async(task_id, file_path)
        return jsonify({
            'task_id': task_id,
            'status': 'accepted',
            'message': '文件已接收,开始处理'
        })
    except Exception as e:
        # Top-level request boundary: report the failure as JSON.
        return jsonify({'error': str(e)}), 500
@app.route('/api/upload/<task_id>/status', methods=['GET'])
def get_status(task_id):
    """Return the status record of *task_id* as JSON.

    The record is whatever ``VideoProcessor.get_status`` yields; any
    exception is reported as ``{'error': ...}`` with status 500.
    """
    try:
        return jsonify(VideoProcessor().get_status(task_id))
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
@app.route('/api/list', methods=['GET'])
def list_tasks():
    """List every task that has a readable status.json.

    Returns:
        A JSON array of ``{'task_id', 'filename'}`` objects, or
        ``{'error': ...}`` with status 500 on an unexpected failure.
    """
    try:
        upload_root = app.config['UPLOAD_FOLDER']
        try:
            entries = os.listdir(upload_root)
        except FileNotFoundError:
            # Nothing has been uploaded yet (folder not created).
            return jsonify([])
        tasks = []
        for task_id in entries:
            status_file = os.path.join(upload_root, task_id, 'status.json')
            try:
                with open(status_file, 'r', encoding='utf-8') as f:
                    status = json.load(f)
            except (OSError, ValueError):
                # Not a task directory, no status file yet, or the file is
                # unreadable/invalid JSON: skip this entry instead of
                # failing the whole listing.
                continue
            if not isinstance(status, dict):
                continue
            tasks.append({
                'task_id': task_id,
                'filename': status.get('filename', 'Unknown')
            })
        return jsonify(tasks)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/delete/<task_id>', methods=['DELETE'])
def delete_task(task_id):
    """Delete a task's local directory and its files on the file server.

    Returns:
        JSON ``{'message': ...}`` on success, ``{'error': 'Task not found'}``
        with status 404 when the id is malformed or unknown, or
        ``{'error': ...}`` with status 500 on failure.
    """
    try:
        # Task ids are generated with str(uuid.uuid4()). Accept only that
        # canonical form so a crafted id such as ".." can never make
        # rmtree() operate outside the task's own directory.
        try:
            is_valid_id = str(uuid.UUID(task_id)) == task_id
        except ValueError:
            is_valid_id = False
        if not is_valid_id:
            return jsonify({'error': 'Task not found'}), 404
        task_dir = os.path.join(app.config['UPLOAD_FOLDER'], task_id)
        if not os.path.exists(task_dir):
            return jsonify({'error': 'Task not found'}), 404
        # Remove local files.
        shutil.rmtree(task_dir)
        # Remove the task's files from the file server.
        processor = VideoProcessor()
        processor._cleanup_files(task_id)
        return jsonify({'message': f'Task {task_id} deleted successfully'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/')
def index():
    """Serve the single-page front end (index.html)."""
    page = 'index.html'
    return send_file(page)
if __name__ == '__main__':
    os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
    # The server listens on every interface, so the interactive debugger
    # (which lets a client execute arbitrary code) must not be on by
    # default. Set FLASK_DEBUG=1 to enable it for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(debug=debug_enabled, host='0.0.0.0', port=5000)
2. v2t_services.py
import os
import json
import time
import shutil
import threading
import subprocess
import requests
from typing import Dict, Optional
from threading import Lock
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.asr.v20190614 import asr_client, models
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
class VideoProcessor:
    """Convert an uploaded media file to MP3, upload it and transcribe it.

    Each task runs in a background thread and moves through the states
    accepted -> converting -> uploading -> asr_submitting ->
    asr_processing -> completed, or ends in ``failed``. The current record
    of a task is kept in ``self.tasks`` and mirrored to
    ``<TASK_DIR>/<task_id>/status.json`` so it can still be read by another
    instance after this one is gone.
    """

    # (connect, read) timeout in seconds for requests to the file server,
    # so a stalled server cannot block a worker thread forever.
    HTTP_TIMEOUT = (10, 120)

    def __init__(self):
        """Set up paths, file-server settings and the Tencent Cloud client."""
        # In-memory task records, guarded by tasks_lock.
        self.tasks: Dict[str, dict] = {}
        self.tasks_lock = Lock()
        # Local working directory: one sub-directory per task.
        self.BASE_DIR = os.path.abspath(os.path.dirname(__file__))
        self.TASK_DIR = os.path.join(self.BASE_DIR, 'temp', 'asr_tasks')
        os.makedirs(self.TASK_DIR, exist_ok=True)
        # File server: UPLOAD_URL is used for PUT/DELETE, GET_URL is the
        # download prefix handed to the ASR service.
        self.UPLOAD_URL = "YOUR_FILE_SERVER_URL/files/"
        self.GET_URL = "YOUR_FILE_SERVER_URL/files-get/"
        self.FILE_SERVER_AUTH = ('YOUR_USERNAME', 'YOUR_PASSWORD')
        # Tencent Cloud credentials.
        self.SECRET_ID = "YOUR_SECRET_ID"
        self.SECRET_KEY = "YOUR_SECRET_KEY"
        # Tencent Cloud ASR client.
        self.cred = credential.Credential(self.SECRET_ID, self.SECRET_KEY)
        http_profile = HttpProfile()
        http_profile.endpoint = "asr.tencentcloudapi.com"
        self.client_profile = ClientProfile()
        self.client_profile.httpProfile = http_profile
        self.client = asr_client.AsrClient(self.cred, "ap-guangzhou", self.client_profile)

    def _convert_to_mp3(self, input_path: str) -> Optional[str]:
        """Transcode *input_path* to a 2x-speed, 16 kHz, mono, 128k MP3.

        Returns:
            The path of the MP3 written next to the input, or None when
            ffmpeg could not be run or exited with a non-zero status.
        """
        try:
            # splitext() only looks at the final path component, so a dot in
            # a directory name cannot truncate the output path.
            output_path = os.path.splitext(input_path)[0] + '.mp3'
            cmd = [
                'ffmpeg', '-y', '-i', input_path,
                '-vn',                    # drop the video stream
                '-filter:a', 'atempo=2',  # 2x playback speed
                '-ar', '16000',           # 16 kHz sample rate
                '-ac', '1',               # mono
                '-b:a', '128k',           # 128k bitrate
                output_path
            ]
            # run() drains stderr while the child runs. Popen() + wait() with
            # piped output can deadlock once the pipe buffer fills, and
            # ffmpeg writes progress to stderr continuously.
            completed = subprocess.run(
                cmd,
                stdin=subprocess.DEVNULL,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
            )
            if completed.returncode == 0:
                return output_path
            stderr_text = completed.stderr.decode(errors='replace')
            raise Exception(f"FFmpeg conversion failed: {stderr_text}")
        except Exception as e:
            print(f"Error converting file: {str(e)}")
            return None

    def _upload_to_server(self, file_path: str, task_id: str) -> Optional[str]:
        """PUT *file_path* into the task's directory on the file server.

        Returns:
            The download URL of the uploaded file, or None when the upload
            failed (any status other than 201/204, or a request error).
        """
        try:
            filename = os.path.basename(file_path)
            with open(file_path, 'rb') as f:
                response = requests.put(
                    f"{self.UPLOAD_URL}{task_id}/{filename}",
                    data=f,
                    auth=self.FILE_SERVER_AUTH,
                    timeout=self.HTTP_TIMEOUT
                )
            if response.status_code in [201, 204]:
                return f"{self.GET_URL}{task_id}/{filename}"
            raise Exception(f"Upload failed with status code: {response.status_code}")
        except Exception as e:
            print(f"Error uploading file: {str(e)}")
            return None

    def _cleanup_files(self, task_id: str):
        """Delete the task's directory on the file server (best effort)."""
        try:
            directory_url = f"{self.UPLOAD_URL}{task_id}/"
            response = requests.delete(
                directory_url,
                auth=self.FILE_SERVER_AUTH,
                timeout=self.HTTP_TIMEOUT
            )
            if response.status_code not in [204, 404]:  # 404: already gone
                print(f"Warning: Failed to cleanup directory {task_id}: {response.status_code}")
        except Exception as e:
            print(f"Error cleaning up server files for task {task_id}: {e}")

    def _cleanup_local_files(self, task_id: str):
        """Remove everything in the local task directory except status.json."""
        try:
            task_dir = os.path.join(self.TASK_DIR, task_id)
            if os.path.exists(task_dir):
                status_file = os.path.join(task_dir, 'status.json')
                for filename in os.listdir(task_dir):
                    filepath = os.path.join(task_dir, filename)
                    # status.json is kept so the task stays queryable.
                    if filepath != status_file:
                        if os.path.isfile(filepath):
                            os.remove(filepath)
                        elif os.path.isdir(filepath):
                            shutil.rmtree(filepath)
        except Exception as e:
            print(f"Error cleaning up local files for task {task_id}: {e}")

    def _submit_to_asr(self, file_url: str) -> Optional[str]:
        """Create an ASR recognition task for *file_url*; return its TaskId."""
        req = models.CreateRecTaskRequest()
        params = {
            "EngineModelType": "16k_zh",
            "ChannelNum": 1,
            "ResTextFormat": 2,
            "SourceType": 0,
            "Url": file_url
        }
        req.from_json_string(json.dumps(params))
        resp = self.client.CreateRecTask(req)
        result = json.loads(resp.to_json_string())
        return result['Data']['TaskId']

    def _check_asr_status(self, task_id: str, asr_task_id: str):
        """Return the ``Data`` object of DescribeTaskStatus for *asr_task_id*.

        *task_id* is accepted for interface compatibility and is not used.
        """
        req = models.DescribeTaskStatusRequest()
        params = {"TaskId": asr_task_id}
        req.from_json_string(json.dumps(params))
        resp = self.client.DescribeTaskStatus(req)
        result = json.loads(resp.to_json_string())
        return result['Data']

    def _update_status(self, task_id: str, status: str,
                       url: Optional[str] = None, error: Optional[str] = None,
                       asr_task_id: Optional[str] = None,
                       result_text: Optional[str] = None,
                       filename: Optional[str] = None):
        """Record a new status for *task_id* in memory and in status.json.

        Fields recorded by earlier calls (filename, url, asr_task_id, ...)
        are kept unless this call supplies a new truthy value, so a later
        update does not erase them.
        """
        with self.tasks_lock:
            record = dict(self.tasks.get(task_id) or {})
            record['task_id'] = task_id
            record['status'] = status
            if filename is not None or 'filename' not in record:
                record['filename'] = filename
            if url:
                record['url'] = url
            if error:
                record['error'] = error
            if asr_task_id:
                record['asr_task_id'] = asr_task_id
            if result_text:
                record['result'] = result_text
            self.tasks[task_id] = record
            # Persist to disk. Write to a sibling temp file and rename it
            # into place so readers never see a half-written status.json.
            task_dir = os.path.join(self.TASK_DIR, task_id)
            os.makedirs(task_dir, exist_ok=True)
            status_file = os.path.join(task_dir, 'status.json')
            temp_file = status_file + '.tmp'
            with open(temp_file, 'w', encoding='utf-8') as f:
                json.dump(record, f)
            os.replace(temp_file, status_file)

    def process_asr_result(self, asr_result: str) -> str:
        """Strip leading ``[...]`` time markers and join lines with spaces."""
        processed_lines = []
        for line in asr_result.split('\n'):
            # Drop the leading time marker, if any.
            if line.startswith('['):
                line = line.split(']', 1)[-1].strip()
            if line:
                processed_lines.append(line)
        return ' '.join(processed_lines)

    def _process_task(self, task_id: str, file_path: str):
        """Run the whole pipeline for one task (executed in a worker thread).

        Any failure is recorded as status ``failed`` with the error text.
        Remote and local working files are removed once, whatever the
        outcome; status.json is kept.
        """
        # Resolved before the try block so the error handler can always
        # reference it.
        filename = os.path.basename(file_path)
        try:
            # 1. Convert unless the upload is already an .mp3 file.
            if not file_path.endswith('.mp3'):
                self._update_status(task_id, 'converting', filename=filename)
                mp3_path = self._convert_to_mp3(file_path)
                if not mp3_path:
                    raise Exception("Failed to convert file to mp3")
            else:
                mp3_path = file_path
            # 2. Upload to the file server.
            self._update_status(task_id, 'uploading', filename=filename)
            url = self._upload_to_server(mp3_path, task_id)
            if not url:
                raise Exception("Failed to upload file")
            # 3. Submit the ASR task.
            self._update_status(task_id, 'asr_submitting', filename=filename)
            asr_task_id = self._submit_to_asr(url)
            if not asr_task_id:
                raise Exception("Failed to submit ASR task")
            self._update_status(task_id, 'asr_processing',
                                asr_task_id=asr_task_id,
                                filename=filename)
            # 4. Poll until the ASR task succeeds (2) or fails (3). The
            #    status stays 'asr_processing' meanwhile, so nothing is
            #    rewritten between polls.
            while True:
                result = self._check_asr_status(task_id, asr_task_id)
                status = result['Status']
                if status == 2:
                    processed_result = self.process_asr_result(result.get('Result') or '')
                    self._update_status(task_id, 'completed',
                                        result_text=processed_result,
                                        filename=filename)
                    break
                if status == 3:
                    detail = result.get('ErrorMsg')
                    if detail:
                        raise Exception(f"ASR processing failed: {detail}")
                    raise Exception("ASR processing failed")
                time.sleep(5)
        except Exception as e:
            self._update_status(task_id, 'failed',
                                error=str(e),
                                filename=filename)
        finally:
            self._cleanup_files(task_id)        # remote files
            self._cleanup_local_files(task_id)  # local files, keeps status.json

    def process_async(self, task_id: str, file_path: str):
        """Mark the task ``accepted`` and process it in a daemon thread."""
        filename = os.path.basename(file_path)
        self._update_status(task_id, 'accepted', filename=filename)
        thread = threading.Thread(target=self._process_task, args=(task_id, file_path))
        thread.daemon = True
        thread.start()

    def get_status(self, task_id: str) -> dict:
        """Return the status record of *task_id*.

        Looks in memory first, then in the task's status.json. When neither
        has a usable record, a ``not_found`` record is returned.
        """
        with self.tasks_lock:
            if task_id in self.tasks:
                # Hand out a copy so callers cannot mutate shared state.
                return dict(self.tasks[task_id])
        status_file = os.path.join(self.TASK_DIR, task_id, 'status.json')
        try:
            with open(status_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (OSError, ValueError):
            # Missing or unreadable status file: report the task as unknown.
            pass
        return {
            'task_id': task_id,
            'status': 'not_found',
            'error': 'Task not found'
        }
3. index.html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>音视频转写</title>
<style>
body {
max-width: 1200px;
margin: 20px auto;
padding: 0 20px;
font-family: Arial, sans-serif;
line-height: 1.6;
display: flex;
}
.main-content {
flex: 2;
margin-right: 20px;
}
.task-list {
flex: 1;
border-left: 1px solid #ccc;
padding-left: 20px;
}
.upload-section {
border: 2px dashed #ccc;
padding: 20px;
text-align: center;
margin-bottom: 20px;
}
.tasks {
margin-top: 20px;
}
.task-item {
border: 1px solid #eee;
padding: 10px;
margin-bottom: 10px;
border-radius: 4px;
}
.task-item pre {
white-space: pre-wrap;
word-wrap: break-word;
background: #f5f5f5;
padding: 10px;
margin: 10px 0;
border-radius: 4px;
}
.status {
font-weight: bold;
}
.error {
color: red;
}
.task-list-item {
margin-bottom: 10px;
}
.task-list-item button {
margin-left: 5px;
}
</style>
</head>
<body>
<div class="main-content">
<h1>音视频转写</h1>
<div class="upload-section">
<input type="file" id="fileInput" accept=".mp4,.mp3,.wav,.avi,.mkv">
<button onclick="uploadFile()">上传文件</button>
</div>
<div class="tasks" id="tasksList">
<!-- 任务列表将在这里动态显示 -->
</div>
</div>
<div class="task-list">
<h2>任务列表 <button onclick="refreshTaskList()">刷新</button></h2>
<div id="taskListContainer">
<!-- 任务列表将在这里动态显示 -->
</div>
</div>
<script>
// Tasks currently being polled, keyed by task id (the stored value is
// always `true`; only key membership is checked).
let activeTasks = new Map();
// Upload the file chosen in #fileInput, then show the new task, start
// polling its status and refresh the task list.
async function uploadFile() {
    const chosen = document.getElementById('fileInput').files[0];
    if (!chosen) {
        alert('请选择文件');
        return;
    }

    const payload = new FormData();
    payload.append('file', chosen);

    try {
        const response = await fetch('/api/upload', { method: 'POST', body: payload });
        const body = await response.json();
        if (!response.ok) {
            alert('上传失败: ' + body.error);
            return;
        }
        addTask(body.task_id, chosen.name);   // show the task card
        pollStatus(body.task_id);             // start polling (not awaited)
        refreshTaskList();                    // refresh the side list
    } catch (err) {
        alert('上传出错: ' + err);
    }
}
// Insert a card for a new task at the top of #tasksList.
// The card is built from DOM nodes and textContent rather than an HTML
// string, so a file name containing markup is shown literally instead of
// being parsed as HTML.
function addTask(taskId, fileName) {
    const tasksList = document.getElementById('tasksList');

    const filenameEl = document.createElement('span');
    filenameEl.id = `filename-${taskId}`;
    filenameEl.textContent = fileName;
    const filenameRow = document.createElement('div');
    filenameRow.append('文件名: ', filenameEl);

    const idRow = document.createElement('div');
    idRow.textContent = `任务ID: ${taskId}`;

    const statusEl = document.createElement('span');
    statusEl.className = 'status';
    statusEl.id = `status-${taskId}`;
    statusEl.textContent = '等待处理';
    const statusRow = document.createElement('div');
    statusRow.append('状态: ', statusEl);

    const errorEl = document.createElement('div');
    errorEl.id = `error-${taskId}`;
    errorEl.className = 'error';

    // Hidden until a result arrives.
    const resultEl = document.createElement('pre');
    resultEl.id = `result-${taskId}`;
    resultEl.style.display = 'none';

    const taskDiv = document.createElement('div');
    taskDiv.className = 'task-item';
    taskDiv.id = `task-${taskId}`;
    taskDiv.append(filenameRow, idRow, statusRow, errorEl, resultEl);

    tasksList.insertBefore(taskDiv, tasksList.firstChild);
}
// Render a status record (as returned by /api/upload/<id>/status) into the
// task's card, and stop tracking the task once it reaches a final state.
function updateTaskStatus(taskId, status) {
    const statusEl = document.getElementById(`status-${taskId}`);
    const errorEl = document.getElementById(`error-${taskId}`);
    const resultEl = document.getElementById(`result-${taskId}`);
    const filenameEl = document.getElementById(`filename-${taskId}`);

    if (status.filename && filenameEl) {
        filenameEl.textContent = status.filename;
    }

    const statusText = {
        'accepted': '已接收',
        'converting': '格式转换中',
        'uploading': '上传中',
        'asr_submitting': '提交ASR任务',
        'asr_processing': 'ASR处理中',
        'completed': '已完成',
        'failed': '失败',
        'not_found': '未找到'
    }[status.status] || status.status;

    // Plain text only: the .status class already renders it bold, so no
    // markup characters are added around the label.
    if (statusEl) {
        statusEl.textContent = statusText;
    }
    if (status.error && errorEl) {
        errorEl.textContent = `错误: ${status.error}`;
    }
    if (status.result && resultEl) {
        resultEl.style.display = 'block';
        resultEl.textContent = status.result;
    }

    // 'not_found' is final as well: a task that no longer exists will never
    // change state, so polling it must stop.
    const finalStates = ['completed', 'failed', 'not_found'];
    if (finalStates.includes(status.status)) {
        activeTasks.delete(taskId);
    }
}
// Poll a task's status every 5 seconds until it reaches a final state
// (completed, failed or not_found) or is removed from activeTasks.
// Calling it again for a task that is already being polled does nothing.
async function pollStatus(taskId) {
    if (activeTasks.has(taskId)) {
        return;
    }
    activeTasks.set(taskId, true);

    const wait = () => new Promise(resolve => setTimeout(resolve, 5000));
    const finalStates = ['completed', 'failed', 'not_found'];

    while (activeTasks.has(taskId)) {
        try {
            const response = await fetch(`/api/upload/${taskId}/status`);
            const status = await response.json();
            if (!response.ok) {
                // An error body has no usable `status` field; treat it like
                // a transient failure and retry instead of rendering it.
                throw new Error(status.error || `HTTP ${response.status}`);
            }
            updateTaskStatus(taskId, status);
            if (finalStates.includes(status.status)) {
                break;
            }
        } catch (error) {
            console.error('轮询出错:', error);
        }
        await wait();
    }
    activeTasks.delete(taskId);
}
// Reload the task list from /api/list and rebuild #taskListContainer.
// Entries are built from DOM nodes with textContent and click listeners, so
// server-provided file names and ids are never parsed as HTML or script.
async function refreshTaskList() {
    try {
        const response = await fetch('/api/list');
        const tasks = await response.json();
        if (!response.ok || !Array.isArray(tasks)) {
            // Error responses are objects, not arrays; keep the current
            // list on screen instead of clearing it.
            throw new Error((tasks && tasks.error) || `HTTP ${response.status}`);
        }

        const taskListContainer = document.getElementById('taskListContainer');
        taskListContainer.innerHTML = '';

        for (const task of tasks) {
            const detailsButton = document.createElement('button');
            detailsButton.textContent = '详情';
            detailsButton.addEventListener('click', () => showTaskDetails(task.task_id));

            const deleteButton = document.createElement('button');
            deleteButton.textContent = '删除';
            deleteButton.addEventListener('click', () => deleteTask(task.task_id));

            const taskElement = document.createElement('div');
            taskElement.className = 'task-list-item';
            taskElement.append(`${task.filename} (${task.task_id})`, detailsButton, deleteButton);
            taskListContainer.appendChild(taskElement);
        }
    } catch (error) {
        console.error('刷新任务列表出错:', error);
    }
}
// Fetch one task's status record and show it in an alert dialog.
async function showTaskDetails(taskId) {
    try {
        const reply = await fetch(`/api/upload/${taskId}/status`);
        const info = await reply.json();
        const summary = [
            `任务ID: ${taskId}`,
            `状态: ${info.status}`,
            `文件名: ${info.filename}`,
            `结果: ${info.result || '暂无'}`
        ].join('\n');
        alert(summary);
    } catch (err) {
        console.error('获取任务详情出错:', err);
        alert('获取任务详情失败');
    }
}
// Ask for confirmation, delete the task on the server, then refresh the
// task list.
async function deleteTask(taskId) {
    if (!confirm('确定要删除这个任务吗?')) {
        return;
    }
    try {
        const response = await fetch(`/api/delete/${taskId}`, { method: 'DELETE' });
        const result = await response.json();
        if (response.ok) {
            // Stop any status polling for the task that was just deleted;
            // the poll loop exits once the id is no longer in activeTasks.
            activeTasks.delete(taskId);
            alert('任务删除成功');
            refreshTaskList();
        } else {
            alert('删除失败: ' + result.error);
        }
    } catch (error) {
        console.error('删除任务出错:', error);
        alert('删除任务失败');
    }
}
// Load the task list once the page has finished loading.
window.onload = refreshTaskList;
</script>
</body>
</html>
此系统可以快速部署到支持Python的服务器上,注意事项:
- 安装必要的依赖:
pip install flask requests tencentcloud-sdk-python
- 配置正确的文件服务器地址和认证信息
- 配置有效的腾讯云API密钥
- 确保服务器已安装FFmpeg
- 确保temp目录具有适当的读写权限
评论区