问题现象

在 Spring Boot 3.2.1 + Spring Data Redis 3.2.1 环境下,使用 Redis 发布订阅功能时出现以下异常:

java.lang.NullPointerException: Cannot invoke "org.springframework.data.redis.listener.adapter.MessageListenerAdapter$MethodInvoker.getMethodName()" because "this.invoker" is null
    at org.springframework.data.redis.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:307)
    at org.springframework.data.redis.listener.RedisMessageListenerContainer.processMessage(RedisMessageListenerContainer.java:818)

问题背景

项目是一个电池监控平台,使用 Redis 发布订阅机制实现实时数据推送。系统架构如下:

数据采集 → 数据处理 → 数据库保存 → Redis发布消息 → 订阅者处理 → WebSocket推送

配置了 5 个 Redis 频道:

  • realtime:data - 实时数据
  • alarm:notification - 报警通知
  • system:notification - 系统通知
  • device:status - 设备状态
  • broadcast - 广播消息

问题分析

错误代码

原始配置代码(RedisMessageListenerConfig.java):

@Configuration
public class RedisMessageListenerConfig {

    private final RedisMessageListener redisMessageListener;

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory connectionFactory) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        // 错误的写法
        container.addMessageListener(
            new MessageListenerAdapter(redisMessageListener, "handleMessage"),
            new ChannelTopic("realtime:data")
        );

        return container;
    }
}

问题根源

  1. 两参数构造函数不存在或已废弃

    在 Spring Data Redis 3.2.1 中,MessageListenerAdapter(Object delegate, String methodName) 这个构造函数的行为与预期不符,导致内部的 invoker 字段未被正确初始化。

  2. 缺少生命周期回调

    MessageListenerAdapter 实现了 InitializingBean 接口,需要调用 afterPropertiesSet() 方法来完成初始化。当手动创建实例时,必须显式调用此方法。

  3. 初始化时序问题

    错误的初始化流程:

    new MessageListenerAdapter(delegate)
    → setDefaultListenerMethod(method)
    → 未调用 afterPropertiesSet()
    → this.invoker 仍为 null
    → 消息到达时调用 invoker.getMethodName()
    → NullPointerException
    

    正确的初始化流程:

    new MessageListenerAdapter(delegate)
    → setDefaultListenerMethod(method)
    → afterPropertiesSet()
    → 创建 MethodInvoker 并赋值给 this.invoker
    → 消息处理正常
    

解决方案

方案一:使用两参数构造函数(不推荐)

虽然文档中提到可以使用两参数构造函数,但在 Spring Data Redis 3.2.1 中这种方式并不可靠。

// 不推荐,可能在某些版本中失效
new MessageListenerAdapter(redisMessageListener, "handleMessage")

方案二:手动调用 afterPropertiesSet()(推荐)

这是最小化改动且可靠的方案:

@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
        RedisConnectionFactory connectionFactory) throws Exception {

    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);

    // 创建适配器
    MessageListenerAdapter realtimeDataAdapter = new MessageListenerAdapter(redisMessageListener);
    realtimeDataAdapter.setDefaultListenerMethod("handleMessage");
    realtimeDataAdapter.afterPropertiesSet();  // 关键步骤
    container.addMessageListener(realtimeDataAdapter, new ChannelTopic("realtime:data"));

    MessageListenerAdapter alarmAdapter = new MessageListenerAdapter(redisMessageListener);
    alarmAdapter.setDefaultListenerMethod("handleAlarmMessage");
    alarmAdapter.afterPropertiesSet();  // 关键步骤
    container.addMessageListener(alarmAdapter, new ChannelTopic("alarm:notification"));

    // 其他适配器配置...

    return container;
}

方案三:声明为独立 Bean(最优雅)

让 Spring 容器管理 MessageListenerAdapter 的生命周期:

@Configuration
public class RedisMessageListenerConfig {

    @Bean
    public MessageListenerAdapter realtimeDataAdapter(RedisMessageListener listener) {
        MessageListenerAdapter adapter = new MessageListenerAdapter(listener);
        adapter.setDefaultListenerMethod("handleMessage");
        return adapter;  // Spring 会自动调用 afterPropertiesSet()
    }

    @Bean
    public MessageListenerAdapter alarmAdapter(RedisMessageListener listener) {
        MessageListenerAdapter adapter = new MessageListenerAdapter(listener);
        adapter.setDefaultListenerMethod("handleAlarmMessage");
        return adapter;
    }

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory connectionFactory,
            MessageListenerAdapter realtimeDataAdapter,
            MessageListenerAdapter alarmAdapter) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        container.addMessageListener(realtimeDataAdapter, new ChannelTopic("realtime:data"));
        container.addMessageListener(alarmAdapter, new ChannelTopic("alarm:notification"));

        return container;
    }
}

方案四:实现 MessageListener 接口

直接实现 MessageListener 接口,避免使用适配器:

@Component
public class RedisMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(message.getChannel());
        String body = new String(message.getBody());

        switch (channel) {
            case "realtime:data" -> handleMessage(body);
            case "alarm:notification" -> handleAlarmMessage(body);
            case "system:notification" -> handleSystemMessage(body);
            case "device:status" -> handleDeviceStatusMessage(body);
            case "broadcast" -> handleBroadcastMessage(body);
        }
    }

    private void handleMessage(String message) {
        // 处理实时数据消息
    }

    // 其他处理方法...
}

配置简化为:

@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
        RedisConnectionFactory connectionFactory,
        RedisMessageListener listener) {

    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);

    container.addMessageListener(listener, Arrays.asList(
        new ChannelTopic("realtime:data"),
        new ChannelTopic("alarm:notification"),
        new ChannelTopic("system:notification"),
        new ChannelTopic("device:status"),
        new ChannelTopic("broadcast")
    ));

    return container;
}

最终实现代码

本次采用方案二,完整配置如下:

package com.panasonic.monitor.config;

import com.panasonic.monitor.listener.RedisMessageListener;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

@Configuration
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "redis.queue.pubsub", name = "enabled",
                       havingValue = "true", matchIfMissing = true)
public class RedisMessageListenerConfig {

    private final RedisMessageListener redisMessageListener;

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory connectionFactory) throws Exception {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        // 实时数据频道
        MessageListenerAdapter realtimeDataAdapter = new MessageListenerAdapter(redisMessageListener);
        realtimeDataAdapter.setDefaultListenerMethod("handleMessage");
        realtimeDataAdapter.afterPropertiesSet();
        container.addMessageListener(realtimeDataAdapter, new ChannelTopic("realtime:data"));

        // 报警通知频道
        MessageListenerAdapter alarmAdapter = new MessageListenerAdapter(redisMessageListener);
        alarmAdapter.setDefaultListenerMethod("handleAlarmMessage");
        alarmAdapter.afterPropertiesSet();
        container.addMessageListener(alarmAdapter, new ChannelTopic("alarm:notification"));

        // 系统通知频道
        MessageListenerAdapter systemAdapter = new MessageListenerAdapter(redisMessageListener);
        systemAdapter.setDefaultListenerMethod("handleSystemMessage");
        systemAdapter.afterPropertiesSet();
        container.addMessageListener(systemAdapter, new ChannelTopic("system:notification"));

        // 设备状态频道
        MessageListenerAdapter deviceStatusAdapter = new MessageListenerAdapter(redisMessageListener);
        deviceStatusAdapter.setDefaultListenerMethod("handleDeviceStatusMessage");
        deviceStatusAdapter.afterPropertiesSet();
        container.addMessageListener(deviceStatusAdapter, new ChannelTopic("device:status"));

        // 广播频道
        MessageListenerAdapter broadcastAdapter = new MessageListenerAdapter(redisMessageListener);
        broadcastAdapter.setDefaultListenerMethod("handleBroadcastMessage");
        broadcastAdapter.afterPropertiesSet();
        container.addMessageListener(broadcastAdapter, new ChannelTopic("broadcast"));

        return container;
    }
}

验证结果

修复后的日志输出:

2025-10-11 10:23:19.450 [redisMessageListenerContainer-3] DEBUG c.p.m.listener.RedisMessageListener - 收到实时数据消息: {"timestamp":1760149399398,"data":{...},"type":"realtime_data","deviceId":4}
2025-10-11 10:23:19.451 [redisMessageListenerContainer-3] DEBUG c.p.m.listener.RedisMessageListener - 处理实时数据: {"timestamp":1760149399398,...}

使用 Redis CLI 验证:

redis-cli -h 123.56.218.56 -p 6379 -a syd233 -n 7 PSUBSCRIBE "*"

# 输出
1) "pmessage"
2) "*"
3) "realtime:data"
4) "{\"timestamp\":1760149758720,\"data\":{...},\"type\":\"realtime_data\",\"deviceId\":4}"

关键要点总结

  1. Spring Data Redis 3.x 版本变化

    • MessageListenerAdapter 的构造函数行为在不同版本间可能有差异
    • 手动创建实例时必须调用 afterPropertiesSet()
  2. 生命周期管理

    • 实现了 InitializingBean 接口的类需要正确初始化
    • 让 Spring 容器管理 Bean 可以自动处理生命周期回调
  3. 异常诊断技巧

    • NullPointerException 在访问内部字段时,往往是初始化问题
    • 查看源码确认初始化流程和必要的生命周期方法
  4. 最佳实践建议

    • 优先使用 Spring Bean 管理方式(方案三)
    • 如果需要手动创建,务必调用 afterPropertiesSet()
    • 考虑直接实现 MessageListener 接口以简化配置

参考资料

环境信息

  • Spring Boot: 3.2.1
  • Spring Data Redis: 3.2.1
  • JDK: 21
  • Redis: 6.x+
  • Netty: 4.1.104.Final

修复时间线

  1. 发现问题:Redis 消息订阅时抛出 NullPointerException
  2. 初步分析:检查 MessageListenerAdapter 构造方式
  3. 尝试方案一:改用单参数构造 + setter,问题依旧
  4. 深入研究:查阅 Spring Data Redis 文档和源码
  5. 发现根因:缺少 afterPropertiesSet() 调用
  6. 应用修复:在每个 adapter 创建后调用 afterPropertiesSet()
  7. 验证成功:消息订阅和处理正常工作