LLM App Design Document

1. Project Background

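LLM App is a submodule of the course platform that gives each course an intelligent knowledge base and a chat interface on top of it. Its main features are: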

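  • A separate knowledge base for each course
  • Document upload and knowledge-base construction
  • Chat grounded in the course knowledge base
  • A separate chat history for each user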

2. System Architecture

2.1 Core Components

  • Django backend (LLM App)
  • AnythingLLM service
  • Celery for asynchronous task processing
  • MySQL database
  • Redis (Celery broker and document status tracking)

2.2 Key Dependencies

  • AnythingLLM API
  • Celery
  • Redis
  • django-redis
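These dependencies meet in the Django settings. The fragment below is a minimal sketch, not the project's actual configuration: the Redis URL, the split into Redis database 0 (broker) and 1 (cache), the hypothetical app module `llm_app`, and the 10-second polling interval are all assumptions. Only `ANYTHING_LLM_API_KEY` and `ANYTHING_LLM_URL` are names taken from the task code.

```python
# settings.py (fragment): a sketch under the assumptions stated above
import os
from datetime import timedelta

# Assumption: one Redis instance, overridable through the environment
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379")

# django-redis backend for the "default" cache; get_redis_connection("default")
# in the tasks returns the raw redis-py client behind this cache
CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": f"{REDIS_URL}/1",
        "OPTIONS": {"CLIENT_CLASS": "django_redis.client.DefaultClient"},
    }
}

# Redis database 0 serves as the Celery broker
CELERY_BROKER_URL = f"{REDIS_URL}/0"

# Celery beat triggers the poller periodically; "llm_app.tasks" is a hypothetical path
CELERY_BEAT_SCHEDULE = {
    "poll-pending-documents": {
        "task": "llm_app.tasks.poll_pending_documents",
        "schedule": timedelta(seconds=10),
    }
}

# Read by the embedding task when it builds the AnythingLLM client
ANYTHING_LLM_API_KEY = os.environ.get("ANYTHING_LLM_API_KEY", "")
ANYTHING_LLM_URL = os.environ.get("ANYTHING_LLM_URL", "")
```

The `CELERY_`-prefixed names are only picked up if the Celery app loads Django settings with `namespace="CELERY"`, and the beat schedule only runs if a `celery beat` process is started alongside the workers.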

3. Data Models

3.1 Workspace

from django.db import models

# Course and CustomUser are existing platform models; their imports are omitted here.

class Workspace(models.Model):
    id = models.AutoField(primary_key=True)
    course = models.OneToOneField(Course, on_delete=models.CASCADE, related_name='workspace')
    teacher = models.ForeignKey(
        CustomUser,
        on_delete=models.CASCADE,
        limit_choices_to={'role': 'teacher'},
        related_name='workspaces'
    )
    name = models.CharField(max_length=255)  # Format: {course_id}-{course_name}
    slug = models.CharField(max_length=100, unique=True)
    openai_temp = models.FloatField(null=True, blank=True)  # Default: 0.7
    openai_history = models.IntegerField(null=True, blank=True)  # Default: 20
    openai_prompt = models.TextField(null=True, blank=True)  # Default: system prompt
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'llm_workspace'
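The nullable `openai_*` fields suggest that `NULL` means "use the default". A minimal sketch of that fallback together with the documented name format, assuming the defaults are resolved on the Django side (AnythingLLM may equally apply them itself). The helper names are hypothetical, and `openai_prompt` is left out because the default prompt text is not specified.

```python
from typing import Optional, Tuple

# Defaults documented in the Workspace model comments
DEFAULT_TEMP = 0.7
DEFAULT_HISTORY = 20


def workspace_name(course_id: int, course_name: str) -> str:
    """Build Workspace.name in the documented {course_id}-{course_name} format."""
    return f"{course_id}-{course_name}"


def resolve_chat_settings(
    openai_temp: Optional[float], openai_history: Optional[int]
) -> Tuple[float, int]:
    """Fall back to the defaults when a nullable field is unset.

    `is None` is used instead of `or` so that a deliberate temperature of 0.0
    or a history length of 0 is kept rather than replaced by the default.
    """
    temp = DEFAULT_TEMP if openai_temp is None else openai_temp
    history = DEFAULT_HISTORY if openai_history is None else openai_history
    return temp, history
```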

3.2 Document

class Document(models.Model):
    id = models.AutoField(primary_key=True)
    workspace = models.ForeignKey(
        Workspace, 
        on_delete=models.CASCADE, 
        related_name='documents'
    )
    filename = models.CharField(max_length=255)  # Format: {course_id}_{document_id}_document
    file_path = models.CharField(max_length=500)  # File path returned by AnythingLLM
    uploader = models.ForeignKey(
        CustomUser,
        on_delete=models.SET_NULL,
        null=True,
        limit_choices_to={'role': 'teacher'}
    )
    embedding_status = models.CharField(
        max_length=20,
        choices=[
            ('pending', 'Pending'),
            ('processing', 'Processing'),
            ('completed', 'Completed'),
            ('failed', 'Failed')
        ],
        default='pending'
    )
    error_message = models.TextField(null=True, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = 'llm_document'
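Because `filename` embeds `document_id`, the row must be saved before the filename can be computed, which is why the upload flow in 5.1 creates the Document record before uploading. A sketch of building and parsing the name, assuming both IDs are plain integers; the helper names are hypothetical.

```python
import re
from typing import Tuple

# Matches the documented format {course_id}_{document_id}_document
_FILENAME_RE = re.compile(r"^(\d+)_(\d+)_document$")


def document_filename(course_id: int, document_id: int) -> str:
    """Build Document.filename; document_id exists only after the row is saved."""
    return f"{course_id}_{document_id}_document"


def parse_document_filename(filename: str) -> Tuple[int, int]:
    """Recover (course_id, document_id) from a stored filename."""
    match = _FILENAME_RE.match(filename)
    if match is None:
        raise ValueError(f"unexpected document filename: {filename!r}")
    return int(match.group(1)), int(match.group(2))
```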

3.3 ChatThread

class ChatThread(models.Model):
    id = models.AutoField(primary_key=True)
    user = models.ForeignKey(
        CustomUser, 
        on_delete=models.CASCADE, 
        related_name='chat_threads'
    )
    workspace = models.ForeignKey(
        Workspace, 
        on_delete=models.CASCADE, 
        related_name='chat_threads'
    )
    thread_slug = models.CharField(max_length=100)
    name = models.CharField(max_length=255)
    is_active = models.BooleanField(default=True)
    created_at = models.DateTimeField(auto_now_add=True)
    last_used_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'llm_chat_thread'
        unique_together = ['workspace', 'thread_slug']
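Since every thread carries a `user` foreign key, per-user history isolation comes down to filtering on user and workspace. A plain-Python sketch of that lookup, assuming (not stated in this document) that the UI lists only active threads, most recently used first; `Thread` is a hypothetical stand-in for the model fields.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List


@dataclass
class Thread:
    """Plain stand-in for the ChatThread fields used here."""
    user_id: int
    workspace_id: int
    thread_slug: str
    is_active: bool
    last_used_at: datetime


def threads_for_user(threads: List[Thread], user_id: int, workspace_id: int) -> List[Thread]:
    """Return the user's active threads in one workspace, most recently used first.

    Django ORM equivalent:
    ChatThread.objects.filter(user=user, workspace=workspace, is_active=True).order_by("-last_used_at")
    """
    mine = [
        t for t in threads
        if t.user_id == user_id and t.workspace_id == workspace_id and t.is_active
    ]
    return sorted(mine, key=lambda t: t.last_used_at, reverse=True)
```

Note that `auto_now` on `last_used_at` is refreshed by `save()` but not by `QuerySet.update()`, so the ordering stays current only if the thread is saved when a message is sent.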

4. Asynchronous Task Design

4.1 Document Status Management

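Redis tracks the processing status of each document. Each workspace has one Redis hash; its fields are document IDs and its values are JSON-encoded status objects: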

# Key/value layout in Redis (one hash per workspace; each value is a JSON string)
{
    "workspace:{workspace_id}": {
        "{document_id}": {
            "status": "pending|processing|completed|failed",
            "error": "error message (optional)",
            "timestamp": 1701363789.123
        }
    }
}
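The nested object above is a readable view; on the wire each field value is a JSON string, and redis-py returns fields and values as bytes unless `decode_responses=True` is set. A sketch of encoding one value and decoding an `HGETALL` result the same way the poller does; the helper names are hypothetical.

```python
import json
import time
from typing import Dict, Optional


def encode_status(status: str, error: Optional[str] = None) -> str:
    """Serialize one hash value: status, optional error, and a Unix timestamp."""
    value = {"status": status, "timestamp": time.time()}
    if error is not None:
        value["error"] = error
    return json.dumps(value)


def decode_workspace_hash(raw: Dict[bytes, bytes]) -> Dict[int, dict]:
    """Decode the bytes returned by HGETALL into {document_id: status dict}."""
    return {
        int(doc_id.decode()): json.loads(value.decode())
        for doc_id, value in raw.items()
    }
```

With a redis-py client this would be used as `redis_client.hset("workspace:3", 17, encode_status("pending"))` and `decode_workspace_hash(redis_client.hgetall("workspace:3"))`.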

4.2 Celery Tasks

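from typing import List
import json
import logging
import time
from celery import shared_task
from django.conf import settings
from django_redis import get_redis_connection
from .models import Document
from .anythingClient import AnythingClient
from .constants import DocStatus

logger = logging.getLogger(__name__)

# Maximum number of documents sent to AnythingLLM in one embedding batch
MAX_BATCH_SIZE = 8

@shared_task
def poll_pending_documents():
    """Poll every workspace for documents waiting to be embedded."""
    redis_client = get_redis_connection("default")

    # Iterate over the workspace hashes; SCAN does not block Redis the way KEYS does
    for workspace_key in redis_client.scan_iter(match="workspace:*"):
        try:
            docs = redis_client.hgetall(workspace_key)
            if not docs:
                continue

            # Decode {document_id: status JSON}
            docs_status = {
                int(doc_id.decode()): json.loads(status.decode())
                for doc_id, status in docs.items()
            }

            # Skip the workspace while any of its documents is still being processed
            if any(status['status'] == DocStatus.PROCESSING.value
                   for status in docs_status.values()):
                continue

            # Take at most MAX_BATCH_SIZE pending documents
            pending_docs = [
                doc_id for doc_id, status in docs_status.items()
                if status['status'] == DocStatus.PENDING.value
            ][:MAX_BATCH_SIZE]

            if pending_docs:
                # Mark the batch as processing before dispatching, so the next
                # poll does not enqueue the same documents a second time
                for doc_id in pending_docs:
                    redis_client.hset(workspace_key, doc_id, json.dumps({
                        'status': DocStatus.PROCESSING.value,
                        'timestamp': time.time()
                    }))
                # Run the embedding on a Celery worker instead of inside the poller
                process_documents_embedding.delay(pending_docs)

        except Exception:
            logger.exception("Failed to process workspace %s", workspace_key)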

@shared_task(bind=True, max_retries=3)
def process_documents_embedding(self, document_ids: List[int]):
    """Embed a batch of documents that all belong to one workspace."""
    redis_client = get_redis_connection("default")

    documents = Document.objects.filter(id__in=document_ids)
    first_doc = documents.first()
    if first_doc is None:
        return

    # The poller only batches documents from a single workspace
    workspace = first_doc.workspace
    workspace_key = f"workspace:{workspace.id}"

    def set_status(status, error=None):
        """Write the same status for every document in the batch to Redis."""
        payload = {'status': status.value, 'timestamp': time.time()}
        if error is not None:
            payload['error'] = error
        for doc_id in document_ids:
            redis_client.hset(workspace_key, doc_id, json.dumps(payload))

    try:
        # Mark the batch as processing in Redis and in the database
        set_status(DocStatus.PROCESSING)
        documents.update(embedding_status=DocStatus.PROCESSING.value)

        # Send all file paths to AnythingLLM in a single embedding call
        document_paths = [doc.file_path for doc in documents]
        client = AnythingClient(
            api_key=settings.ANYTHING_LLM_API_KEY,
            base_url=settings.ANYTHING_LLM_URL
        )
        response = client.update_documents_embeddings(
            slug=workspace.slug,
            document_paths=document_paths
        )

        # A successful response contains a "workspace" object
        result = json.loads(response)
        if "workspace" not in result:
            raise Exception(result.get("error", "Embedding failed"))

        # Record success
        set_status(DocStatus.COMPLETED)
        documents.update(embedding_status=DocStatus.COMPLETED.value, error_message=None)

    except Exception as e:
        if self.request.retries < self.max_retries:
            # Leave the batch in "processing" while the retry is pending, so the
            # poller neither re-dispatches it nor starts another batch in this workspace
            raise self.retry(exc=e, countdown=5)

        # Retries exhausted: record the failure in Redis and in the database
        set_status(DocStatus.FAILED, error=str(e))
        documents.update(
            embedding_status=DocStatus.FAILED.value,
            error_message=str(e)
        )
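The scheduling rule inside `poll_pending_documents` (at most one in-flight batch per workspace, at most 8 documents per batch) can be isolated as a pure function and unit-tested without Redis or Celery. A sketch; `select_batch` is a hypothetical helper, and sorting by ID is an added assumption that lower IDs were uploaded earlier (the original takes documents in hash order).

```python
from typing import Dict, List

MAX_BATCH_SIZE = 8  # batch limit used by poll_pending_documents


def select_batch(docs_status: Dict[int, dict], limit: int = MAX_BATCH_SIZE) -> List[int]:
    """Return the next document IDs to embed for one workspace.

    Nothing is selected while any document is processing; otherwise up to
    `limit` pending documents are taken, lowest ID first.
    """
    if any(s["status"] == "processing" for s in docs_status.values()):
        return []
    pending = sorted(
        doc_id for doc_id, s in docs_status.items() if s["status"] == "pending"
    )
    return pending[:limit]
```

Inside the poller, the `pending_docs` list comprehension and the processing check could then be replaced by a single `select_batch(docs_status)` call.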

4.3 Constants

from enum import Enum

class DocStatus(Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
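`Document.embedding_status` repeats these four values as hard-coded choices. A sketch of deriving the choices from the enum so the database field and the Redis status values cannot drift apart; `EMBEDDING_STATUS_CHOICES` is a hypothetical name.

```python
from enum import Enum


class DocStatus(Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"


# Same (value, label) pairs as the hard-coded choices on Document.embedding_status
EMBEDDING_STATUS_CHOICES = [(s.value, s.value.capitalize()) for s in DocStatus]
```

The model field would then read `models.CharField(max_length=20, choices=EMBEDDING_STATUS_CHOICES, default=DocStatus.PENDING.value)`. On Django 3.0 or later, `models.TextChoices` is an alternative that combines the enum and the choices in one class.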

5. Document Processing Flow

5.1 Document Upload

sequenceDiagram
    participant U as User
    participant B as Backend
    participant A as AnythingLLM
    participant R as Redis

    U->>B: Upload file
    B->>B: Create Document record
    B->>A: Upload file
    A-->>B: Return file path
    B->>B: Save file path on Document
    B->>R: Add pending entry
    B-->>U: Return upload success
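The upload steps above can be sketched as one function with its side effects injected, which keeps the ordering explicit and testable. This is a hypothetical sketch: the four callables stand in for the ORM, the AnythingLLM client, and Redis, whose real signatures this document does not give.

```python
import json
import time
from typing import Callable


def handle_upload(
    course_id: int,
    workspace_id: int,
    file_bytes: bytes,
    create_document: Callable[[int], int],
    upload_to_anythingllm: Callable[[str, bytes], str],
    save_file_info: Callable[[int, str, str], None],
    hset: Callable[[str, str, str], None],
) -> int:
    """Run the upload steps in diagram order and return the new document ID.

    Hypothetical injection points:
      create_document(workspace_id) -> new Document ID
      upload_to_anythingllm(filename, data) -> file path returned by AnythingLLM
      save_file_info(document_id, filename, file_path) -> stores both on the row
      hset(key, field, value) -> e.g. redis_client.hset
    """
    # 1. Create the Document row first: its ID is part of the filename
    document_id = create_document(workspace_id)
    filename = f"{course_id}_{document_id}_document"

    # 2. Upload the file and keep the path AnythingLLM returns
    file_path = upload_to_anythingllm(filename, file_bytes)

    # 3. Store the filename and path on the Document record
    save_file_info(document_id, filename, file_path)

    # 4. Queue the document for embedding; the poller picks it up later
    hset(
        f"workspace:{workspace_id}",
        str(document_id),
        json.dumps({"status": "pending", "timestamp": time.time()}),
    )
    return document_id
```

Because the row is created before the upload, a failed upload leaves a Document without a file path; the caller should delete that row or mark it failed.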

5.2 Embedding Processing

sequenceDiagram
    participant C as Celery
    participant R as Redis
    participant A as AnythingLLM
    participant B as Backend

    C->>R: Poll pending documents
    R-->>C: Return document statuses
    C->>R: Mark as processing
    C->>A: Call embedding API in batch
    A-->>C: Return result
    C->>R: Update processing status
    C->>B: Update database status
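Taken together, the two diagrams define a small state machine per document. A sketch that rejects any other move, assuming `completed` is terminal and allowing `failed -> processing` so a retried batch can re-enter processing; whether that edge is needed depends on how the task marks a batch between retries. `transition` is a hypothetical helper.

```python
# Allowed status transitions implied by the upload and embedding flows
ALLOWED_TRANSITIONS = {
    "pending": {"processing"},
    "processing": {"completed", "failed"},
    "failed": {"processing"},  # assumption: a retry re-enters processing
    "completed": set(),
}


def transition(current: str, new: str) -> str:
    """Return the new status, or raise if the flow does not allow the move."""
    if new not in ALLOWED_TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal status transition {current!r} -> {new!r}")
    return new
```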

6. API Design

See the API documentation.