Redis发布订阅,右手就行!
创始人
2025-07-10 19:30:47
0

哈喽,大家好,我是了不起。

Redis平常作为缓存使用较多,但是也可以作为发布订阅的消息队列来使用,本篇给大家介绍一下如何简单使用!右手就能操作

前言

本篇我们会使用Spring Data Redis中集成的发布订阅功能来展示这个示例,

先看我们需要的依赖, 其实只需要引入spring-boot-starter-data-redis 就够了,另外再写一个接口来触发消息发布。


   org.springframework.boot
   spring-boot-starter-data-redis
  
  
   org.springframework.boot
   spring-boot-starter-webflux
  

Spring Data 为 Redis 提供了专用的消息传递集成,其功能和命名与 Spring Framework 中的 JMS 集成类似。

Redis 消息传递大致可分为两个功能领域:

  • 消息的发布或制作
  • 消息的订阅或消费

其中主要的类都在这两个包下面,感兴趣的小伙伴可以去看看,原理就先不讲了,下期再安排吧。

org.springframework.data.redis.connection
org.springframework.data.redis.listener

发布消息

发布消息我们可以直接使用RedisTemplate的 convertAndSend , 这个方法有两个参数,分别是channel, 还有消息内容。

public Long convertAndSend(String channel, Object message) {
        Assert.hasText(channel, "a non-empty channel is required");
        byte[] rawChannel = this.rawString(channel);
        byte[] rawMessage = this.rawValue(message);
        return (Long)this.execute((connection) -> {
            return connection.publish(rawChannel, rawMessage);
        }, true);
    }

本次我们使用如下类来发布消息。作为示例就要简单粗暴。

public interface MessagePublisher {
    void publish(String message);
}


import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;

public class RedisMessagePublisher implements MessagePublisher {

    private RedisTemplate redisTemplate;
    private ChannelTopic topic;

    public RedisMessagePublisher() {
    }

    public RedisMessagePublisher(
            RedisTemplate redisTemplate, ChannelTopic topic) {
        this.redisTemplate = redisTemplate;
        this.topic = topic;
    }

    public void publish(String message) {
        redisTemplate.convertAndSend(topic.getTopic(), message);
    }
}

订阅消息

订阅消息需要实现MessageListener的接口 ,onMessage的方法是收到消息后的消费方法。

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Service;

@Service
public class RedisMessageSubscriber implements MessageListener {
    
    public void onMessage(Message message, byte[] pattern) {
        System.*out*.println("Message received: " + message.toString());
    }

}

// 消息订阅2
@Service("redisMessageSubscriber2")
public class RedisMessageSubscriber2 implements MessageListener {

    public void onMessage(Message message, byte[] pattern) {
        System.out.println("Message received2: " + message.toString());
    }

}

消息监听容器和适配器

另外就是订阅方订阅发布者,SpringDataRedis这里使用了一个消息监听容器和适配器来处理。我们直接贴出代码:

import com.north.redis.message.MessagePublisher;
import com.north.redis.message.RedisMessagePublisher;
import com.north.redis.message.RedisMessageSubscriber;
import jakarta.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    @Resource
    MessageListener redisMessageSubscriber2;
    
    @Bean
    public RedisTemplate redisTemplate() {
        RedisTemplate template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        // 使用StringRedisSerializer来序列化和反序列化redis的key值
        template.setKeySerializer(new StringRedisSerializer());
        // 使用GenericJackson2JsonRedisSerializer来序列化和反序列化redis的value值
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.afterPropertiesSet();
        return template;
    }

    @Bean
    MessageListenerAdapter messageListener() {
        return new MessageListenerAdapter(new RedisMessageSubscriber());
    }

    
    @Bean
    RedisMessageListenerContainer redisContainer() {
        RedisMessageListenerContainer container
                = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        container.addMessageListener(messageListener(), topic());
        container.addMessageListener(redisMessageSubscriber2, topic());
        return container;
    }

    @Bean
    MessagePublisher redisPublisher() {
        return new RedisMessagePublisher(redisTemplate(), topic());
    }

    @Bean
    ChannelTopic topic() {
        return new ChannelTopic("northQueue");
    }

}

以上代码中有几个点:

  1. 创建适配器时,这里面我们使用了MessageListener的实现类,简单容易理解。
  2. 使用消息容器来订阅消息队列,其中addMessageListener中可以订阅多个队列,其中第二个参数可以传入队列名数组。而且可以添加多个订阅方。

RedisMessageListenerContainer 是处理消费者和发布者的关系的类 ,使用起来也比较简单。

测试

下面我们做一个小测试:

写一个接口来出发消息发布,使用多个订阅者

@RestController
public class TestController {
    @Resource
    private MessagePublisher redisMessagePublisher;

    @GetMapping("/hello")
    public Flux hello(@RequestParam String message) {
        redisMessagePublisher.publish(message);
        return Flux.*just*("Hello", "Webflux");
    }
}

启动SpringBoot项目后我们发送消息测试:

图片图片

两个消费者都接到了消息:

图片图片


相关内容

热门资讯

PHP新手之PHP入门 PHP是一种易于学习和使用的服务器端脚本语言。只需要很少的编程知识你就能使用PHP建立一个真正交互的...
网络中立的未来 网络中立性是什... 《牛津词典》中对“网络中立”的解释是“电信运营商应秉持的一种原则,即不考虑来源地提供所有内容和应用的...
各种千兆交换机的数据接口类型详... 千兆交换机有很多值得学习的地方,这里我们主要介绍各种千兆交换机的数据接口类型,作为局域网的主要连接设...
什么是大数据安全 什么是大数据... 在《为什么需要大数据安全分析》一文中,我们已经阐述了一个重要观点,即:安全要素信息呈现出大数据的特征...
如何允许远程连接到MySQL数... [[277004]]【51CTO.com快译】默认情况下,MySQL服务器仅侦听来自localhos...
如何利用交换机和端口设置来管理... 在网络管理中,总是有些人让管理员头疼。下面我们就将介绍一下一个网管员利用交换机以及端口设置等来进行D...
P2P的自白|我不生产内容,我... 现在一提起P2P,人们就会联想到正在被有关部门“围剿”的互联网理财服务。×租宝事件使得劳...
Intel将Moblin社区控... 本周二,非营利机构Linux基金会宣布,他们将担负起Moblin社区的管理工作,而这之前,Mobli...
施耐德电气数据中心整体解决方案... 近日,全球能效管理专家施耐德电气正式启动大型体验活动“能效中国行——2012卡车巡展”,作为该活动的...
Windows恶意软件20年“... 在Windows的早期年代,病毒游走于系统之间,偶尔删除文件(但被删除的文件几乎都是可恢复的),并弹...