我们专注攀枝花网站设计 攀枝花网站制作 攀枝花网站建设
成都网站建设公司服务热线:400-028-6601

网站建设知识

十年网站开发经验 + 多家企业客户 + 靠谱的建站团队

量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决

RabbitMQ的广播模式是什么意思

本篇内容主要讲解“Rabbit MQ的广播模式是什么意思”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Rabbit MQ的广播模式是什么意思”吧!

为宁县等地区用户提供了全套网页设计制作服务,及宁县网站建设行业解决方案。主营业务为网站制作、成都网站制作、宁县网站设计,以传统方式定制建设网站,并提供域名空间备案等一条龙服务,秉承以专业、用心的态度为用户提供真诚的服务。我们深信只要达到每一位用户的要求,就会得到认可,从而选择与我们长期合作。这样,我们也可以走得更远!

        最近做了一个需求,需要一个接口,可以给3万多人发消息,因为人数较多,用一条线程同步发送肯定是不行的,如果是一条线程,同步发送的话,哪怕有一条数据出现了问题,都会导致本次发送的失败,所以,想到了用Rabbit MQ的广播模式来做。

        广播模式其实就是将你要发送的消息放到“交换机”中,然后对应的队列去交换机中获取消息给消费者(接收消息的一端)消费。如下图所示:

Rabbit MQ的广播模式是什么意思

(emmm 图片是我在网上找的,自己画的太丑了)

在上述过程中,主线程执行到第一步之后就可以返回了,这是一个异步的过程,不需要等待队列去交换机中取完数据,“主线程返回“和“队列取数据给消费者”这两个过程是同时进行的。这样的话,发送消息的时候就不必等到3万多条消息都发送完成才能继续进行下一步了。

功能大概介绍了一下,接下来就是代码了,我只做一个简单的记录

首先是一个配置类

package com.apps.rabbit.FanoutMessage;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TestExchangeConfiguration {
    /**
     * 创建广播形式的交换机
     *
     * @return 交换机实例
     */
    @Bean
    public FanoutExchange fanoutExchanges() {
        System.out.println("【【【交换机实例创建成功】】】");
        return new FanoutExchange(Constants.FANOUT_EXCHANGE_NAME);
    }

    /**
     * 测试队列一
     *
     * @return 队列实例
     */
    @Bean
    public Queue queue1() {
        System.out.println("【【【测试队列一实例创建成功】】】");
        return new Queue(Constants.TEST_QUEUE1_NAME);
    }



    /**
     * 绑定队列一到交换机
     *
     * @return 绑定对象
     */
    @Bean
    public Binding bingQueue1ToExchange() {
        System.out.println("【【【绑定队列一到交换机成功】】】");
        return BindingBuilder.bind(queue1()).to(fanoutExchanges());
    }



}

上边这个用来创建交换机和队列,并将队列和交换机进行绑定

下边这个是消息生产者

package com.apps.rabbit.FanoutMessage;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Sender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息
     *
     * @param message 消息内容
     * 说明: routingKey可以指定也可以不指定,这里我们给的是队列名
     */
    public void sendMessage(Object message) {
       System.out.println("【消息发送者】发送消息到fanout交换机,消息内容为: {}"+message);
        rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE_NAME, Constants.TEST_QUEUE1_NAME, message);
    }



}

再添加一个辅助类,存常量(别问为啥,大家都是这么做的哈哈哈哈)

package com.apps.rabbit.FanoutMessage;

public class Constants {


    /**
     * 交换机名称
     */
    public static final String FANOUT_EXCHANGE_NAME = "message.fanout.exchange";

    /**
     * 测试队列名称1
     */
    public static final String TEST_QUEUE1_NAME = "messageReceiverQueue";


}

最后是消息的消费者

package com.apps.rabbit.FanoutMessage;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.apps.bcodemsg.MsgResponse;
import com.apps.model.basic.message.InfoMessageUser;
import com.apps.model.basic.message.InfoMessageUserVO;
import com.apps.service.basic.message.InfoMessageUserService;
import com.rabbitmq.client.Channel;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

@Component

public class Receiver1 {

    @Autowired
    private InfoMessageUserService infoMessageUserService;

    private static String CONTENT_KEY="message";
    private static String MESSAGE_ID_KEY="messageId";
    private static String USER_ID_KEY="userId";

    @RabbitListener(queues = "messageReceiverQueue")
    @RabbitHandler
    public void receiveMessage(String messageInfo, Channel channel, Message message) throws IOException {
       
        doSomething();
    }


}

到此就完事了,只需要在你的业务中调用生产者就行了,消息会被队列自动接收,而接收到的消息需要怎么处理,就是业务逻辑的事了。

上边只是一个最简单的例子,也只用了一个队列,如果想要添加多个队列,需要在配置类中进行声明,并进行绑定,此外,在调用生产者的时候,需要将rabbitTemplate.convertAndSend()方法的第二个参数去掉。

ok!

到此,相信大家对“Rabbit MQ的广播模式是什么意思”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!


网页名称:RabbitMQ的广播模式是什么意思
本文链接:http://mswzjz.cn/article/jeeigi.html

其他资讯