LLM App Design Document
1. Project Background
The LLM App is a sub-module of the course platform that provides an intelligent knowledge base and chat features for courses. Its main features:
- An independent knowledge base for each course
- Document upload and knowledge base construction
- Chat backed by the course knowledge base
- Independent chat histories for multiple users
2. System Architecture
2.1 Core Components
- Django backend (the LLM App)
- AnythingLLM service
- Celery asynchronous task processing
- MySQL database
- Redis cache (Celery broker and document status management)
2.2 Key Dependencies
- AnythingLLM API
- Celery
- Redis
- django-redis
3. Data Models
3.1 Workspace
class Workspace(models.Model):
    id = models.AutoField(primary_key=True)
    course = models.OneToOneField(Course, on_delete=models.CASCADE, related_name='workspace')
    teacher = models.ForeignKey(
        CustomUser,
        on_delete=models.CASCADE,
        limit_choices_to={'role': 'teacher'},
        related_name='workspaces'
    )
    name = models.CharField(max_length=255)  # Format: {course_id}-{course_name}
    slug = models.CharField(max_length=100, unique=True)
    openai_temp = models.FloatField(null=True, blank=True)  # Default: 0.7
    openai_history = models.IntegerField(null=True, blank=True)  # Default: 20
    openai_prompt = models.TextField(null=True, blank=True)  # Default: system prompt
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'llm_workspace'
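The name format noted on the `name` field can be sketched as plain helpers. The slug scheme below (lowercase ASCII, hyphen-separated, truncated to the field's 100-character limit) is an assumption for illustration, not taken from the source:

```python
import re

def build_workspace_name(course_id: int, course_name: str) -> str:
    # Name format from the model comment: {course_id}-{course_name}
    return f"{course_id}-{course_name}"

def build_workspace_slug(course_id: int, course_name: str) -> str:
    # Hypothetical slug scheme: lowercase, non-alphanumerics collapsed
    # to hyphens, truncated to Workspace.slug's max_length of 100.
    base = f"{course_id}-{course_name}".lower()
    slug = re.sub(r"[^a-z0-9]+", "-", base).strip("-")
    return slug[:100]
```

Uniqueness of the slug is still enforced by the database (`unique=True`); a real implementation would need a collision strategy on top of this.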
3.2 Document
class Document(models.Model):
    id = models.AutoField(primary_key=True)
    workspace = models.ForeignKey(
        Workspace,
        on_delete=models.CASCADE,
        related_name='documents'
    )
    filename = models.CharField(max_length=255)  # Format: {course_id}_{document_id}_document
    file_path = models.CharField(max_length=500)  # File path returned by AnythingLLM
    uploader = models.ForeignKey(
        CustomUser,
        on_delete=models.SET_NULL,
        null=True,
        limit_choices_to={'role': 'teacher'}
    )
    embedding_status = models.CharField(
        max_length=20,
        choices=[
            ('pending', 'Pending'),
            ('processing', 'Processing'),
            ('completed', 'Completed'),
            ('failed', 'Failed')
        ],
        default='pending'
    )
    error_message = models.TextField(null=True, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = 'llm_document'
3.3 ChatThread
class ChatThread(models.Model):
    id = models.AutoField(primary_key=True)
    user = models.ForeignKey(
        CustomUser,
        on_delete=models.CASCADE,
        related_name='chat_threads'
    )
    workspace = models.ForeignKey(
        Workspace,
        on_delete=models.CASCADE,
        related_name='chat_threads'
    )
    thread_slug = models.CharField(max_length=100)
    name = models.CharField(max_length=255)
    is_active = models.BooleanField(default=True)
    created_at = models.DateTimeField(auto_now_add=True)
    last_used_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'llm_chat_thread'
        unique_together = ['workspace', 'thread_slug']
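`unique_together` requires `thread_slug` to be unique within a workspace. One way to generate such slugs (the scheme is hypothetical; the source does not specify one) is to embed the user and workspace ids plus a random suffix:

```python
import uuid

def new_thread_slug(user_id: int, workspace_id: int) -> str:
    # Hypothetical scheme: unique per (workspace, thread_slug), as the
    # model's unique_together constraint requires; fits max_length=100.
    return f"u{user_id}-w{workspace_id}-{uuid.uuid4().hex[:12]}"
```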
4. Asynchronous Task Design
4.1 Document Status Management
Document processing status is tracked in Redis:
# Key-value structure in Redis
{
    "workspace:{workspace_id}": {
        "{document_id}": {
            "status": "pending|processing|completed|failed",
            "error": "error message (optional)",
            "timestamp": 1701363789.123
        }
    }
}
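The per-document payload above can be wrapped in small encode/decode helpers so every writer produces the same shape; the helper names are hypothetical:

```python
import json
import time
from typing import Optional, Union

def encode_doc_status(status: str, error: Optional[str] = None) -> str:
    # Serialize one document's status entry in the documented shape
    payload = {"status": status, "timestamp": time.time()}
    if error is not None:
        payload["error"] = error
    return json.dumps(payload)

def decode_doc_status(raw: Union[bytes, str]) -> dict:
    # Redis returns hash values as bytes; normalize before parsing
    if isinstance(raw, bytes):
        raw = raw.decode()
    return json.loads(raw)
```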
4.2 Celery Tasks
from typing import List
import json
import logging
import time
from celery import shared_task
from django.conf import settings
from django_redis import get_redis_connection
from .models import Document
from .anythingClient import AnythingClient
from .constants import DocStatus

logger = logging.getLogger(__name__)


@shared_task
def poll_pending_documents():
    """Poll the pending documents of every workspace."""
    redis_client = get_redis_connection("default")
    # Check the document statuses of each workspace
    for workspace_key in redis_client.keys("workspace:*"):
        try:
            docs = redis_client.hgetall(workspace_key)
            if not docs:
                continue
            # Parse the document statuses
            docs_status = {
                int(doc_id.decode()): json.loads(status.decode())
                for doc_id, status in docs.items()
            }
            # Skip the workspace if any document is still being processed
            if any(status['status'] == DocStatus.PROCESSING.value
                   for status in docs_status.values()):
                continue
            # Take at most 8 pending documents
            pending_docs = [
                doc_id for doc_id, status in docs_status.items()
                if status['status'] == DocStatus.PENDING.value
            ][:8]
            if pending_docs:
                # Dispatch asynchronously; calling the task function directly
                # would run the embedding inline in the polling worker
                process_documents_embedding.delay(pending_docs)
        except Exception as e:
            logger.error("Error while handling workspace %s: %s", workspace_key, e)
            continue
@shared_task(bind=True, max_retries=3)
def process_documents_embedding(self, document_ids: List[int]):
    """Run the embedding task for a batch of documents."""
    redis_client = get_redis_connection("default")
    # Fetch the document records before the try block so `documents` and
    # `workspace_key` are always defined when the failure handler runs
    documents = Document.objects.filter(id__in=document_ids)
    if not documents.exists():
        return
    workspace = documents.first().workspace
    workspace_key = f"workspace:{workspace.id}"
    try:
        # Mark the batch as processing
        for doc_id in document_ids:
            redis_client.hset(
                workspace_key,
                doc_id,
                json.dumps({
                    'status': DocStatus.PROCESSING.value,
                    'timestamp': time.time()
                })
            )
        # Run the embeddings as one batch call
        document_paths = [doc.file_path for doc in documents]
        client = AnythingClient(
            api_key=settings.ANYTHING_LLM_API_KEY,
            base_url=settings.ANYTHING_LLM_URL
        )
        response = client.update_documents_embeddings(
            slug=workspace.slug,
            document_paths=document_paths
        )
        result = json.loads(response)
        if "workspace" not in result:
            raise Exception(result.get("error", "Embedding failed"))
        # Mark the batch as completed
        for doc_id in document_ids:
            redis_client.hset(
                workspace_key,
                doc_id,
                json.dumps({
                    'status': DocStatus.COMPLETED.value,
                    'timestamp': time.time()
                })
            )
        documents.update(embedding_status='completed')
    except Exception as e:
        # Mark the batch as failed
        for doc_id in document_ids:
            redis_client.hset(
                workspace_key,
                doc_id,
                json.dumps({
                    'status': DocStatus.FAILED.value,
                    'error': str(e),
                    'timestamp': time.time()
                })
            )
        documents.update(
            embedding_status='failed',
            error_message=str(e)
        )
        if self.request.retries < self.max_retries:
            raise self.retry(exc=e, countdown=5)
4.3 Constants
from enum import Enum


class DocStatus(Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
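The polling task needs a periodic trigger. A hypothetical Celery beat entry could look like the following; the task module path and the 30-second interval are assumptions, not taken from the source:

```python
# celery.py (or settings.py) — hypothetical beat schedule for the poller
CELERY_BEAT_SCHEDULE = {
    "poll-pending-documents": {
        "task": "llm.tasks.poll_pending_documents",  # adjust to the app's module path
        "schedule": 30.0,  # seconds between polls (assumed)
    },
}
```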
5. Document Processing Flow
5.1 Document Upload
sequenceDiagram
    participant U as User
    participant B as Backend
    participant A as AnythingLLM
    participant R as Redis
    U->>B: Upload file
    B->>B: Create Document record
    B->>A: Upload file
    A-->>B: Return file path
    B->>B: Update Document path
    B->>R: Add pending record
    B-->>U: Return upload success
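The "add pending record" step of the upload flow can be sketched as a small helper. `redis_hset` is injected here purely for illustration; in the app it would be `get_redis_connection("default").hset`:

```python
import json
import time
from typing import Callable

def register_pending_document(
    redis_hset: Callable[[str, str, str], None],
    workspace_id: int,
    document_id: int,
) -> None:
    # Write the document's initial "pending" status into the workspace
    # hash that the Celery poller scans (section 4.1 key structure)
    redis_hset(
        f"workspace:{workspace_id}",
        str(document_id),
        json.dumps({"status": "pending", "timestamp": time.time()}),
    )
```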
5.2 Embedding Processing
sequenceDiagram
    participant C as Celery
    participant R as Redis
    participant A as AnythingLLM
    participant B as Backend
    C->>R: Poll pending documents
    R-->>C: Return document statuses
    C->>R: Mark as processing
    C->>A: Batch embedding API call
    A-->>C: Return results
    C->>R: Update processing status
    C->>B: Update database status
6. API Design
See the API documentation.