问题现象
在 Spring Boot 3.2.1 + Spring Data Redis 3.2.1 环境下,使用 Redis 发布订阅功能时出现以下异常:
java.lang.NullPointerException: Cannot invoke "org.springframework.data.redis.listener.adapter.MessageListenerAdapter$MethodInvoker.getMethodName()" because "this.invoker" is null
at org.springframework.data.redis.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:307)
at org.springframework.data.redis.listener.RedisMessageListenerContainer.processMessage(RedisMessageListenerContainer.java:818)
问题背景
项目是一个电池监控平台,使用 Redis 发布订阅机制实现实时数据推送。系统架构如下:
数据采集 → 数据处理 → 数据库保存 → Redis发布消息 → 订阅者处理 → WebSocket推送
配置了 5 个 Redis 频道:
realtime:data- 实时数据alarm:notification- 报警通知system:notification- 系统通知device:status- 设备状态broadcast- 广播消息
问题分析
错误代码
原始配置代码(RedisMessageListenerConfig.java):
@Configuration
public class RedisMessageListenerConfig {
private final RedisMessageListener redisMessageListener;
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 错误的写法
container.addMessageListener(
new MessageListenerAdapter(redisMessageListener, "handleMessage"),
new ChannelTopic("realtime:data")
);
return container;
}
}
问题根源
两参数构造函数不存在或已废弃
在 Spring Data Redis 3.2.1 中,
MessageListenerAdapter(Object delegate, String methodName)这个构造函数的行为与预期不符,导致内部的invoker字段未被正确初始化。缺少生命周期回调
MessageListenerAdapter实现了InitializingBean接口,需要调用afterPropertiesSet()方法来完成初始化。当手动创建实例时,必须显式调用此方法。初始化时序问题
错误的初始化流程:
new MessageListenerAdapter(delegate) → setDefaultListenerMethod(method) → 未调用 afterPropertiesSet() → this.invoker 仍为 null → 消息到达时调用 invoker.getMethodName() → NullPointerException正确的初始化流程:
new MessageListenerAdapter(delegate) → setDefaultListenerMethod(method) → afterPropertiesSet() → 创建 MethodInvoker 并赋值给 this.invoker → 消息处理正常
解决方案
方案一:使用两参数构造函数(不推荐)
虽然文档中提到可以使用两参数构造函数,但在 Spring Data Redis 3.2.1 中这种方式并不可靠。
// 不推荐,可能在某些版本中失效
new MessageListenerAdapter(redisMessageListener, "handleMessage")
方案二:手动调用 afterPropertiesSet()(推荐)
这是最小化改动且可靠的方案:
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisConnectionFactory connectionFactory) throws Exception {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 创建适配器
MessageListenerAdapter realtimeDataAdapter = new MessageListenerAdapter(redisMessageListener);
realtimeDataAdapter.setDefaultListenerMethod("handleMessage");
realtimeDataAdapter.afterPropertiesSet(); // 关键步骤
container.addMessageListener(realtimeDataAdapter, new ChannelTopic("realtime:data"));
MessageListenerAdapter alarmAdapter = new MessageListenerAdapter(redisMessageListener);
alarmAdapter.setDefaultListenerMethod("handleAlarmMessage");
alarmAdapter.afterPropertiesSet(); // 关键步骤
container.addMessageListener(alarmAdapter, new ChannelTopic("alarm:notification"));
// 其他适配器配置...
return container;
}
方案三:声明为独立 Bean(最优雅)
让 Spring 容器管理 MessageListenerAdapter 的生命周期:
@Configuration
public class RedisMessageListenerConfig {
@Bean
public MessageListenerAdapter realtimeDataAdapter(RedisMessageListener listener) {
MessageListenerAdapter adapter = new MessageListenerAdapter(listener);
adapter.setDefaultListenerMethod("handleMessage");
return adapter; // Spring 会自动调用 afterPropertiesSet()
}
@Bean
public MessageListenerAdapter alarmAdapter(RedisMessageListener listener) {
MessageListenerAdapter adapter = new MessageListenerAdapter(listener);
adapter.setDefaultListenerMethod("handleAlarmMessage");
return adapter;
}
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisConnectionFactory connectionFactory,
MessageListenerAdapter realtimeDataAdapter,
MessageListenerAdapter alarmAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(realtimeDataAdapter, new ChannelTopic("realtime:data"));
container.addMessageListener(alarmAdapter, new ChannelTopic("alarm:notification"));
return container;
}
}
方案四:实现 MessageListener 接口
直接实现 MessageListener 接口,避免使用适配器:
@Component
public class RedisMessageListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
String channel = new String(message.getChannel());
String body = new String(message.getBody());
switch (channel) {
case "realtime:data" -> handleMessage(body);
case "alarm:notification" -> handleAlarmMessage(body);
case "system:notification" -> handleSystemMessage(body);
case "device:status" -> handleDeviceStatusMessage(body);
case "broadcast" -> handleBroadcastMessage(body);
}
}
private void handleMessage(String message) {
// 处理实时数据消息
}
// 其他处理方法...
}
配置简化为:
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisConnectionFactory connectionFactory,
RedisMessageListener listener) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listener, Arrays.asList(
new ChannelTopic("realtime:data"),
new ChannelTopic("alarm:notification"),
new ChannelTopic("system:notification"),
new ChannelTopic("device:status"),
new ChannelTopic("broadcast")
));
return container;
}
最终实现代码
本次采用方案二,完整配置如下:
package com.panasonic.monitor.config;
import com.panasonic.monitor.listener.RedisMessageListener;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
@Configuration
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "redis.queue.pubsub", name = "enabled",
havingValue = "true", matchIfMissing = true)
public class RedisMessageListenerConfig {
private final RedisMessageListener redisMessageListener;
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisConnectionFactory connectionFactory) throws Exception {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 实时数据频道
MessageListenerAdapter realtimeDataAdapter = new MessageListenerAdapter(redisMessageListener);
realtimeDataAdapter.setDefaultListenerMethod("handleMessage");
realtimeDataAdapter.afterPropertiesSet();
container.addMessageListener(realtimeDataAdapter, new ChannelTopic("realtime:data"));
// 报警通知频道
MessageListenerAdapter alarmAdapter = new MessageListenerAdapter(redisMessageListener);
alarmAdapter.setDefaultListenerMethod("handleAlarmMessage");
alarmAdapter.afterPropertiesSet();
container.addMessageListener(alarmAdapter, new ChannelTopic("alarm:notification"));
// 系统通知频道
MessageListenerAdapter systemAdapter = new MessageListenerAdapter(redisMessageListener);
systemAdapter.setDefaultListenerMethod("handleSystemMessage");
systemAdapter.afterPropertiesSet();
container.addMessageListener(systemAdapter, new ChannelTopic("system:notification"));
// 设备状态频道
MessageListenerAdapter deviceStatusAdapter = new MessageListenerAdapter(redisMessageListener);
deviceStatusAdapter.setDefaultListenerMethod("handleDeviceStatusMessage");
deviceStatusAdapter.afterPropertiesSet();
container.addMessageListener(deviceStatusAdapter, new ChannelTopic("device:status"));
// 广播频道
MessageListenerAdapter broadcastAdapter = new MessageListenerAdapter(redisMessageListener);
broadcastAdapter.setDefaultListenerMethod("handleBroadcastMessage");
broadcastAdapter.afterPropertiesSet();
container.addMessageListener(broadcastAdapter, new ChannelTopic("broadcast"));
return container;
}
}
验证结果
修复后的日志输出:
2025-10-11 10:23:19.450 [redisMessageListenerContainer-3] DEBUG c.p.m.listener.RedisMessageListener - 收到实时数据消息: {"timestamp":1760149399398,"data":{...},"type":"realtime_data","deviceId":4}
2025-10-11 10:23:19.451 [redisMessageListenerContainer-3] DEBUG c.p.m.listener.RedisMessageListener - 处理实时数据: {"timestamp":1760149399398,...}
使用 Redis CLI 验证:
redis-cli -h 123.56.218.56 -p 6379 -a syd233 -n 7 PSUBSCRIBE "*"
# 输出
1) "pmessage"
2) "*"
3) "realtime:data"
4) "{\"timestamp\":1760149758720,\"data\":{...},\"type\":\"realtime_data\",\"deviceId\":4}"
关键要点总结
Spring Data Redis 3.x 版本变化
MessageListenerAdapter的构造函数行为在不同版本间可能有差异- 手动创建实例时必须调用
afterPropertiesSet()
生命周期管理
- 实现了
InitializingBean接口的类需要正确初始化 - 让 Spring 容器管理 Bean 可以自动处理生命周期回调
- 实现了
异常诊断技巧
- NullPointerException 在访问内部字段时,往往是初始化问题
- 查看源码确认初始化流程和必要的生命周期方法
最佳实践建议
- 优先使用 Spring Bean 管理方式(方案三)
- 如果需要手动创建,务必调用
afterPropertiesSet() - 考虑直接实现
MessageListener接口以简化配置
参考资料
环境信息
- Spring Boot: 3.2.1
- Spring Data Redis: 3.2.1
- JDK: 21
- Redis: 6.x+
- Netty: 4.1.104.Final
修复时间线
- 发现问题:Redis 消息订阅时抛出 NullPointerException
- 初步分析:检查 MessageListenerAdapter 构造方式
- 尝试方案一:改用单参数构造 + setter,问题依旧
- 深入研究:查阅 Spring Data Redis 文档和源码
- 发现根因:缺少
afterPropertiesSet()调用 - 应用修复:在每个 adapter 创建后调用
afterPropertiesSet() - 验证成功:消息订阅和处理正常工作
评论区