实现BitTorrent私有Tracker:添加Passkey功能与用户统计追踪

引言

BitTorrent是一种流行的P2P文件分享协议,而Tracker服务器在这个生态系统中扮演着关键角色,负责协调peers之间的连接并跟踪各个种子文件的状态。在公开的BitTorrent网络中,任何人都可以访问Tracker并获取peers列表,但在私有环境下,通常需要对用户进行身份验证并追踪其上传和下载量。这就是passkey功能的主要用途。

本文记录了我为开源Java BitTorrent库ttorrent添加passkey支持的全过程,包括代码修改与实验验证。我将详细说明BitTorrent协议中上传下载统计的工作机制,以及如何利用passkey精确追踪用户行为。

背景知识

BitTorrent协议基础

在BitTorrent协议中,每个客户端通过定期向Tracker发送"announce"请求来报告自己的状态和获取其他peers的信息。这些请求包含以下关键信息:

  • info_hash:种子文件的唯一标识符
  • peer_id:客户端为特定种子生成的唯一会话标识符
  • uploaded/downloaded/left:客户端已上传、已下载和剩余的字节数
  • event:事件类型(started、completed、stopped等)

私有Tracker与Passkey

私有Tracker使用passkey(一个唯一的、通常随机生成的字符串)来标识和验证用户。这个passkey通常被嵌入到Tracker的URL中,格式为:

http://tracker.example.com/{passkey}/announce

客户端下载.torrent文件时已包含这个带passkey的URL,确保每次请求都能被正确关联到特定用户。

问题分析:BitTorrent中的上传下载统计机制

在实现passkey功能前,我需要先理解BitTorrent如何统计上传下载量。通过对ttorrent库的研究,我发现其统计机制具有以下特点:

  1. 统计基于Torrent会话:peer_id是Torrent文件连接到tracker时生成的会话ID,每个peer_id对应一个特定种子的会话
  2. peerid的生命周期:当客户端删除做种文件并重新下载,或BT客户端重启时,会生成新的peer_id
  3. 累积报告:客户端会累积报告其上传和下载总量,而非增量
  4. 特定种子独立:同一客户端对不同种子文件会使用不同的peer_id进行跟踪

这意味着要准确计算用户的总上传下载量,需要综合考虑passkey(用户标识)、peer_id(种子会话标识)和info_hash(种子标识)三个因素,并进行适当的差值计算。

多局域网部署扩展:Tracker名称追踪

在设计过程中,我意识到在多局域网部署环境下,可能会存在多个tracker实例同时运行的情况。为了区分和追踪不同tracker实例的统计数据,我设计了tracker名称功能。

这个扩展的关键需求是:

  1. 每个tracker实例需要有自己的唯一标识名称
  2. tracker名称应与上传下载统计数据关联
  3. 数据发送到中央服务器时应包含tracker名称
  4. 系统应能按tracker名称分类和聚合统计数据

在这种情况下,统计数据的完整关系变成了:passkey(用户标识)、tracker(实例标识)、peer_id(会话标识)和info_hash(种子标识)的四维关系。

实现过程

1. 添加Passkey支持

首先需要修改几个核心类来支持passkey:

1.1 HTTPAnnounceRequestMessage类

为HTTPAnnounceRequestMessage类添加passkey字段:

private final String passkey;

// 更新构造函数添加passkey参数
private HTTPAnnounceRequestMessage(ByteBuffer data,
                                 byte[] infoHash, Peer peer, long uploaded, long downloaded,
                                 long left, boolean compact, boolean noPeerId, RequestEvent event,
                                 int numWant, String passkey) {
  // 现有代码...
  this.passkey = passkey;
}

// 添加getter方法
public String getPasskey() {
  return this.passkey;
}

同时修改parse和craft方法以支持passkey参数。

1.2 TrackerServiceContainer类

修改URL路径解析逻辑,以支持/{passkey}/announce格式:

@Override
public void handle(Request request, final Response response) {
  String path = request.getPath().toString();
  String passkey = null;

  // 检查路径是否匹配 /{passkey}/announce 格式
  if (path.endsWith(Tracker.ANNOUNCE_URL) && path.length() > Tracker.ANNOUNCE_URL.length()) {
    // 提取passkey
    String[] pathParts = path.split("/");
    if (pathParts.length >= 3 && "announce".equals(pathParts[pathParts.length - 1])) {
      passkey = pathParts[pathParts.length - 2];
    }
  } else if (!Tracker.ANNOUNCE_URL.equals(path)) {
    // 既不是 /announce 也不是 /{passkey}/announce
    response.setCode(404);
    response.setText("Not Found");
    return;
  }

  // 将passkey传递给处理器
  if ("GET".equalsIgnoreCase(request.getMethod())) {
    myRequestProcessor.process(request.getAddress().toString(), 
                            request.getClientAddress().getAddress().getHostAddress(),
                            passkey,
                            getRequestHandler(response));
  } else {
    // 处理POST请求(多条announce请求)
    // ...
  }
}

1.3 TrackerRequestProcessor类

更新process方法以接收和验证passkey:

public void process(final String uri, final String hostAddress, String passkey, RequestHandler requestHandler)
        throws IOException {
    // 解析请求...
    HTTPAnnounceRequestMessage announceRequest;
    try {
        announceRequest = this.parseQuery(uri, hostAddress, passkey);
    } catch (MessageValidationException mve) {
        // 错误处理...
        return;
    }
    
    // 验证passkey
    if (announceRequest.getPasskey() == null || !validatePasskey(announceRequest.getPasskey())) {
        serveError(Status.UNAUTHORIZED, "Invalid passkey", requestHandler);
        return;
    }
    
    // 处理announce请求...
}

// 暂时的passkey验证方法(后续可扩展)
private boolean validatePasskey(String passkey) {
    // 暂时返回true,后续可连接数据库进行验证
    return true;
}

1.4 MultiAnnounceRequestProcessor类

MultiAnnounceRequestProcessor处理批量announce请求,也需要更新以支持passkey:

public void process(final String body, final String url, final String hostAddress, final String passkey, final TrackerRequestProcessor.RequestHandler requestHandler) throws IOException {
    final List<BEValue> responseMessages = new ArrayList<BEValue>();
    final AtomicBoolean isAnySuccess = new AtomicBoolean(false);
    for (String s : body.split("\n")) {
        myTrackerRequestProcessor.process(s, hostAddress, passkey, new TrackerRequestProcessor.RequestHandler() {
            @Override
            public void serveResponse(int code, String description, ByteBuffer responseData) {
                isAnySuccess.set(isAnySuccess.get() || (code == Status.OK.getCode()));
                try {
                    responseMessages.add(BDecoder.bdecode(responseData));
                } catch (IOException e) {
                    logger.warn("cannot decode message from byte buffer");
                }
            }
        });
    }
    if (responseMessages.isEmpty()) {
        ByteBuffer res;
        Status status;
        res = HTTPTrackerErrorMessage.craft("").getData();
        status = Status.BAD_REQUEST;
        requestHandler.serveResponse(status.getCode(), "", res);
        return;
    }
    final ByteArrayOutputStream out = new ByteArrayOutputStream();
    BEncoder.bencode(responseMessages, out);
    requestHandler.serveResponse(isAnySuccess.get() ? Status.OK.getCode() : Status.BAD_REQUEST.getCode(), "", ByteBuffer.wrap(out.toByteArray()));
}

2. 添加Tracker名称支持

为了实现多局域网环境下不同tracker实例的识别,我添加了以下功能:

2.1 Tracker配置文件

创建配置文件来存储tracker名称:

// 在TrackerTest.java中添加配置文件创建逻辑
File configFile = new File("tracker-config.properties");
if (!configFile.exists()) {
    FileOutputStream fos = null;
    try {
        fos = new FileOutputStream(configFile);
        Properties props = new Properties();
        props.setProperty("tracker.name", "tracker-" + InetAddress.getLocalHost().getHostName());
        props.store(fos, "Tracker Configuration");
        System.out.println("已创建默认配置文件: tracker-config.properties");
    } catch (IOException e) {
        System.err.println("无法创建配置文件: " + e.getMessage());
    } finally {
        if (fos != null) {
            try {
                fos.close();
            } catch (IOException e) {
                System.err.println("无法关闭配置文件输出流: " + e.getMessage());
            }
        }
    }
}

配置文件内容示例:

# Tracker Configuration
tracker.name=tracker-beijing

2.2 在Tracker类中添加tracker名称支持

// 在Tracker.java中添加
private String trackerName;

// 添加getter和setter方法
public String getTrackerName() {
    return trackerName;
}

public void setTrackerName(String trackerName) {
    this.trackerName = trackerName;
}

// 添加读取配置文件的方法
private String loadTrackerName() {
    File configFile = new File("tracker-config.properties");
    if (configFile.exists()) {
        Properties props = new Properties();
        FileInputStream fis = null;
        try {
            fis = new FileInputStream(configFile);
            props.load(fis);
            return props.getProperty("tracker.name", "unknown-tracker");
        } catch (IOException e) {
            logger.warn("Could not load tracker configuration: {}", e.getMessage());
        } finally {
            if (fis != null) {
                try {
                    fis.close();
                } catch (IOException e) {
                    logger.warn("Could not close config file input stream", e);
                }
            }
        }
    }
    return "unknown-tracker";
}

// 在构造函数中初始化trackerName
public Tracker(int port) throws IOException {
    this(port,
            getDefaultAnnounceUrl(new InetSocketAddress(InetAddress.getLocalHost(), port)).toString()
    );
    this.trackerName = loadTrackerName();
}

// 修改其他构造函数也加入trackerName的初始化和传递
public Tracker(int port, String announceURL) throws IOException {
    myPort = port;
    myAnnounceUrl = announceURL;
    this.trackerName = loadTrackerName();
    myTorrentsRepository = new TorrentsRepository(10);
    final TrackerRequestProcessor requestProcessor = new TrackerRequestProcessor(myTorrentsRepository);
    requestProcessor.setTrackerName(this.trackerName);  // 传递tracker名称
    myTrackerServiceContainer = new TrackerServiceContainer(requestProcessor,
            new MultiAnnounceRequestProcessor(requestProcessor));
    myPeerCollectorThread = new PeerCollectorThread(myTorrentsRepository);
}

2.3 在TrackerRequestProcessor中添加tracker名称处理

// 在TrackerRequestProcessor.java中添加
private String trackerName;

// 添加getter和setter方法
public String getTrackerName() {
    return trackerName;
}

public void setTrackerName(String trackerName) {
    this.trackerName = trackerName;
}

// 添加发送数据到Flask边缘服务器的方法,包含tracker名称
private void sendStatsToFlask(String passkey, String peerId, String infoHash,
                           long uploaded, long downloaded, long left) {
    try {
        // 创建URL连接
        URL url = new URL("http://117.72.106.234:5000/api/stats/update");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setRequestProperty("Accept", "application/json");
        conn.setDoOutput(true);

        // 创建JSON数据,添加trackerName字段
        String jsonInputString = String.format(
                "{\"tracker\":\"%s\",\"passkey\":\"%s\",\"peerid\":\"%s\",\"infohash\":\"%s\",\"uploaded\":%d,\"downloaded\":%d,\"left\":%d}",
                this.trackerName != null ? this.trackerName : "unknown-tracker",
                passkey != null ? passkey : "",
                peerId,
                infoHash,
                uploaded,
                downloaded,
                left
        );

        // 发送数据
        OutputStream os = null;
        try {
            os = conn.getOutputStream();
            byte[] input = jsonInputString.getBytes("utf-8");
            os.write(input, 0, input.length);
        } finally {
            if (os != null) {
                try {
                    os.close();
                } catch (IOException e) {
                    // 忽略关闭时的异常
                }
            }
        }

        // 获取响应
        int responseCode = conn.getResponseCode();
        if (responseCode != HttpURLConnection.HTTP_OK) {
            logger.warn("Flask server returned non-OK status: {}", responseCode);
        }

        // 关闭连接
        conn.disconnect();
    } catch (Exception e) {
        // 记录错误日志,但不中断tracker的正常操作
        logger.warn("Error sending stats to Flask: {}", e.getMessage());
        if (logger.isDebugEnabled()) {
            logger.debug("Stack trace:", e);
        }
    }
}

3. 增加统计数据的输出

为了观察上传下载量的变化,我在处理请求的关键位置添加了输出语句,并加入tracker名称:

System.out.println(String.format("%tT", new java.util.Date()) + 
               " - Tracker: " + (this.trackerName != null ? this.trackerName : "unknown") +
               " - Passkey: " + (announceRequest.getPasskey() != null ? announceRequest.getPasskey() : "none") +
               " - Peer: " + announceRequest.getHexPeerId() +
               " - InfoHash: " + announceRequest.getHexInfoHash() +
               " - Uploaded: " + announceRequest.getUploaded() +
               " - Downloaded: " + announceRequest.getDownloaded() +
               " - Left: " + announceRequest.getLeft());

// 向Flask边缘服务器发送数据
try {
    sendStatsToFlask(
            announceRequest.getPasskey(),
            announceRequest.getHexPeerId(),
            announceRequest.getHexInfoHash(),
            announceRequest.getUploaded(),
            announceRequest.getDownloaded(),
            announceRequest.getLeft()
    );
} catch (Exception e) {
    // 记录错误但不中断正常流程
    logger.warn("Failed to send stats to Flask server: {}", e.getMessage());
}

实验验证

完成代码修改后,我进行了一系列测试来验证功能:

  1. 创建种子文件,使用格式为 http://192.168.3.200:6969/{passkey}/announce 的Tracker URL
  2. 启动Tracker服务器并公告种子
  3. 使用不同的客户端和passkey进行下载和做种

实验流程与结果

以下是我记录的一个完整测试过程,涉及两个不同的用户(passkey: 12test3456和syd233):

16:28:26 - Tracker: tracker-beijing - Passkey: 12test3456 - Peer: 2D7142353034302D454D315355304E566A58576A - InfoHash: F764A5ADD07690E98AB83D41B286D384D6259EA1 - Uploaded: 0 - Downloaded: 0 - Left: 0
16:28:56 - Tracker: tracker-beijing - Passkey: 12test3456 - Peer: 2D7142353034302D454D315355304E566A58576A - InfoHash: F764A5ADD07690E98AB83D41B286D384D6259EA1 - Uploaded: 0 - Downloaded: 0 - Left: 0
16:33:56 - Tracker: tracker-beijing - Passkey: 12test3456 - Peer: 2D7142353034302D454D315355304E566A58576A - InfoHash: F764A5ADD07690E98AB83D41B286D384D6259EA1 - Uploaded: 0 - Downloaded: 0 - Left: 0
16:37:19 - Tracker: tracker-beijing - Passkey: syd233 - Peer: 2D7142353034302D487762292D7A34214C676457 - InfoHash: F764A5ADD07690E98AB83D41B286D384D6259EA1 - Uploaded: 0 - Downloaded: 0 - Left: 21624620
16:38:21 - Tracker: tracker-beijing - Passkey: syd233 - Peer: 2D7142353034302D487762292D7A34214C676457 - InfoHash: F764A5ADD07690E98AB83D41B286D384D6259EA1 - Uploaded: 0 - Downloaded: 21624620 - Left: 0
16:38:31 - Tracker: tracker-beijing - Passkey: 12test3456 - Peer: 2D7142353034302D454D315355304E566A58576A - InfoHash: F764A5ADD07690E98AB83D41B286D384D6259EA1 - Uploaded: 21624620 - Downloaded: 0 - Left: 0
16:39:30 - Tracker: tracker-beijing - Passkey: 12test3456 - Peer: 2D7142353034302D454D315355304E566A58576A - InfoHash: F764A5ADD07690E98AB83D41B286D384D6259EA1 - Uploaded: 21624620 - Downloaded: 0 - Left: 0
16:39:35 - Tracker: tracker-beijing - Passkey: 12test3456 - Peer: 2D7142353034302D797030312E68775121296B56 - InfoHash: F764A5ADD07690E98AB83D41B286D384D6259EA1 - Uploaded: 0 - Downloaded: 0 - Left: 21624620
16:39:37 - Tracker: tracker-beijing - Passkey: 12test3456 - Peer: 2D7142353034302D797030312E68775121296B56 - InfoHash: F764A5ADD07690E98AB83D41B286D384D6259EA1 - Uploaded: 0 - Downloaded: 21624620 - Left: 0

这个日志清晰地显示了整个过程,并且包含了tracker名称信息:

  1. 初始阶段:用户12test3456开始做种文件(Left=0表示已完整拥有文件)
  2. 下载阶段:用户syd233开始下载(Left=21624620),随后完成下载(Left=0)
  3. 上传确认:用户12test3456的上传计数增加至21624620,表明成功上传给syd233
  4. 二次下载:用户12test3456删除本地文件并重新下载,生成新的peer_id(2D7142353034302D797030312E68775121296B56),从左到右可以看到一个完整的下载过程

通过这个实验,我确认了以下几点:

  1. Passkey功能正常工作,能够区分不同用户
  2. Tracker名称功能正常工作,成功标识不同的tracker实例
  3. Peer_id与种子的具体会话绑定,删除并重新下载同一文件会生成新的peer_id
  4. 上传下载统计是累积的,且基于特定的peer_id和info_hash组合

统计数据的复杂性与解决方案

从实验中,我观察到BitTorrent上传下载统计的几个特点:

  1. 会话性:统计基于特定种子的peer_id进行,同一用户删除并重新下载种子会生成新的peer_id
  2. 种子独立:每个info_hash的统计是独立的
  3. 累积报告:客户端报告的是累积值,而非增量
  4. 多实例环境:不同tracker实例的统计需要区分和聚合

这使得准确计算用户总上传下载量变得复杂,需要:

  • 使用passkey识别用户
  • 使用tracker名称识别来源实例
  • 跟踪每个(peer_id, info_hash)组合的上一次统计值
  • 计算增量并按passkey和tracker名称聚合

为处理这种复杂性,我开发了一个Flask边缘服务器,其主要功能包括:

  1. 接收并转发announce请求
  2. 提取passkey、tracker名称、peer_id、info_hash和上传下载数据
  3. 计算每次请求的上传下载增量
  4. 将数据存储在数据库中并按用户和tracker聚合统计

Redis数据结构设计与缓存管理机制

Redis Stream设计

在Flask边缘服务器的实现中,我采用了Redis Stream作为主要的数据结构来存储用户上传下载统计信息。这种选择的原因在于Stream是Redis专为消息队列场景设计的数据类型,非常适合记录随时间推移的增量数据。

Stream结构设计

user_stats_stream: {
    message_id_1: {
        'passkey': 'user123',
        'tracker': 'tracker-beijing',  // 添加tracker名称
        'timestamp': 1743782238,
        'uploaded': 1000,     // 仅在有上传增量时存在
        'downloaded': 2000    // 仅在有下载增量时存在
    },
    message_id_2: {
        'passkey': 'user456',
        'tracker': 'tracker-shanghai',  // 不同的tracker实例
        'timestamp': 1743782422,
        'uploaded': 2000,
        'downloaded': 4000
    },
    ...
}

关键特性:

  1. 消息ID自动生成:Redis为每条记录生成唯一递增的消息ID
  2. 字段灵活性:每条记录可以包含不同的字段组合
  3. 高效存储:仅当有实际增量时才存储相应字段,无需存储零值
  4. 时间序列特性:记录自然排序,便于时间序顺序处理
  5. 多维度分析:支持按用户(passkey)和实例(tracker)进行数据分析

数据写入逻辑

# 只有在有实际增量时才写入数据
if upload_delta > 0 or download_delta > 0:
    fields = {
        'passkey': passkey,
        'tracker': tracker,  # 添加tracker名称
        'timestamp': int(time.time())
    }
    
    if upload_delta > 0:
        fields['uploaded'] = upload_delta
        
    if download_delta > 0:
        fields['downloaded'] = download_delta
    
    # 添加到Redis Stream
    redis_client.xadd('user_stats_stream', fields)

内存缓存设计

为了高效计算上传下载增量,我设计了一个两层嵌套字典结构的内存缓存:

stats_cache = {
    "user123": {  # passkey作为第一层键
        "peer1:infohash1": {  # peer_id:info_hash组合作为第二层键
            "uploaded": 100,
            "downloaded": 200,
            "tracker": "tracker-beijing",  # 添加tracker名称
            "last_updated": 1743782100
        },
        "peer2:infohash2": {
            "uploaded": 300,
            "downloaded": 400,
            "tracker": "tracker-shanghai",  # 可能来自不同tracker
            "last_updated": 1743782200
        }
    },
    "user456": {
        # 不同用户的会话
    }
}

这种结构的优势:

  1. 快速查询:O(1)时间复杂度访问特定会话
  2. 内存效率:只加载必要的数据
  3. 用户隔离:用户数据自然分组,便于管理
  4. 多维度存储:tracker名称与上传下载量同级存储,便于多维度分析

懒加载机制

懒加载机制是为了解决内存效率和数据一致性的问题而设计的,分为两种情况:

情况1:整个passkey不在缓存中

当请求中的passkey完全不在内存缓存中时,尝试从JSON文件加载该用户的所有会话数据:

# passkey完全不在缓存中,尝试从JSON文件加载
if passkey not in stats_cache:
    if os.path.exists(CACHE_FILE):
        try:
            with open(CACHE_FILE, 'r') as f:
                file_cache = json.load(f)
                if passkey in file_cache:
                    stats_cache[passkey] = file_cache[passkey]
                    print(f"从JSON文件加载整个passkey的数据: {passkey}")
        except Exception as e:
            print(f"加载passkey数据出错: {e}")
    
    # 如果JSON中也没有,则创建新的空字典
    if passkey not in stats_cache:
        stats_cache[passkey] = {}

情况2:passkey在缓存中但特定会话不在

当passkey存在但特定的peer_id:info_hash组合不在缓存中时,只加载这一特定会话:

# passkey在缓存中,但特定会话不在
elif session_key not in stats_cache[passkey]:
    if os.path.exists(CACHE_FILE):
        try:
            with open(CACHE_FILE, 'r') as f:
                file_cache = json.load(f)
                if passkey in file_cache and session_key in file_cache[passkey]:
                    stats_cache[passkey][session_key] = file_cache[passkey][session_key]
                    print(f"从JSON文件加载特定会话: {passkey}:{session_key}")
        except Exception as e:
            print(f"加载会话数据出错: {e}")

这种精细化的懒加载方式能够在保证数据一致性的同时,最小化内存使用和磁盘I/O操作。

缓存清理机制

为了防止内存无限增长,实现了两级缓存清理机制:

会话级别清理

每小时检查并清理一小时内无活动的会话:

# 检查单个会话是否过期(1小时无活动)
expired_sessions = []
for session_key, session_data in sessions.items():
    if current_time - session_data.get('last_updated', 0) > session_expiry:  # 3600秒
        expired_sessions.append(session_key)

# 删除过期会话
for session_key in expired_sessions:
    del sessions[session_key]

用户级别清理

清理24小时内完全无活动的用户:

# 检查整个passkey是否过期(24小时无活动)
passkey_last_active = max([data.get('last_updated', 0) for data in sessions.values()], default=0)
if current_time - passkey_last_active > passkey_expiry:  # 86400秒
    expired_passkeys.append(passkey)
    continue

# 删除过期的passkey
for passkey in expired_passkeys:
    del stats_cache[passkey]

数据持久化策略

使用定期保存和智能合并策略确保数据持久性:

# 合并数据:智能合并,保留最新的记录
for passkey, current_sessions in current_cache.items():
    # 如果passkey不在现有数据中,直接添加整个passkey的数据
    if passkey not in existing_data:
        existing_data[passkey] = current_sessions
    else:
        # passkey存在,需要逐个会话比较
        for session_key, current_session_data in current_sessions.items():
            # 如果会话不在现有数据中,直接添加
            if session_key not in existing_data[passkey]:
                existing_data[passkey][session_key] = current_session_data
            else:
                # 会话存在,比较last_updated时间戳
                existing_timestamp = existing_data[passkey][session_key].get("last_updated", 0)
                current_timestamp = current_session_data.get("last_updated", 0)
                
                # 只有当内存中的记录更新时才替换现有记录
                if current_timestamp > existing_timestamp:
                    existing_data[passkey][session_key] = current_session_data

这种策略确保即使在服务器重启后,也能恢复用户的完整会话数据,并正确计算增量而不会丢失统计信息。

线程安全与并发控制

为了确保数据一致性,我们使用可重入锁(RLock)保护对共享数据的访问:

# 全局锁
cache_lock = threading.RLock()

# 在关键操作中使用锁
with cache_lock:
    # 数据读写操作

这种机制确保在高并发环境下数据的安全访问,同时通过使用RLock而非普通锁,避免了同一线程内的死锁问题。

状态监控与统计分析

为了监控系统运行状态和提供统计信息,实现了一个状态端点:

@app.route('/status', methods=['GET'])
def status():
    with cache_lock:
        # 计算缓存的会话总数
        total_sessions = sum(len(user_sessions) for user_sessions in stats_cache.values())
        
        # 计算每个tracker的会话数
        tracker_stats = {}
        for passkey, sessions in stats_cache.items():
            for session_data in sessions.values():
                tracker = session_data.get('tracker', 'unknown-tracker')
                if tracker not in tracker_stats:
                    tracker_stats[tracker] = 0
                tracker_stats[tracker] += 1

    # 获取Stream长度
    stream_length = redis_client.xlen('user_stats_stream')

    return jsonify({
        'status': 'running',
        'users_cached': len(stats_cache),
        'sessions_cached': total_sessions,
        'trackers': tracker_stats,  // 添加tracker统计信息
        'pending_stats_entries': stream_length,
        'timestamp': time.time()
    })

这个端点提供了系统的实时状态,包括每个tracker实例的会话数量,便于管理员监控多实例部署的情况。

总结

本设计综合考虑了性能、可靠性和内存效率,通过Redis Stream、双层缓存结构、懒加载和定期清理机制,实现了一个高效的BT统计系统。通过添加tracker名称支持,系统能够在多局域网部署环境中准确追踪和区分不同tracker实例的统计数据,为整个分布式环境提供了完整的数据收集和分析能力。