RabbitMQ之路由模式

发布时间:2022-07-04 发布网站:脚本宝典
脚本宝典收集整理的这篇文章主要介绍了RabbitMQ之路由模式脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。

概念

简单来说就是控制消费者拿到特定条件的消息 比如一个情景:生产者生产日志消息 然后低级别的日志交给一号消费者处理 严重的交给二号消费者处理

RabbitMQ之路由模式

简单例子

生产者代码和之前的订阅模式 区别在于交换机模式改为DIRECT 同时要给出routekey 即判断的标准 然后生成消息发送时需要给出routekey

public class RoutePRoducer {
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.198.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            //创建连接
            connection = connectionFactory.newConnection("生产者");
            //获取通道
            channel = connection.createChannel();
            //创建交换机以及两个队列 同时绑定关系
            String exchangeName = "test_direct";
            channel.exchangeDeclare(exchangeName, BuiltinExchangeTyPE.DIRECT,false,false,false,null);
            String queue1Name = "test_direct_queue1";
            String queue2Name = "test_direct_queue2";
            channel.queueDeclare(queue1Name,false,false,false,null);
            channel.queueDeclare(queue2Name,false,false,false,null);
            //绑定关系 第三个参数为routingKey 绑定规则 fanout使用""
            channel.queueBind(queue1Name,exchangeName,"error");

            channel.queueBind(queue2Name,exchangeName,"info");
            channel.queueBind(queue2Name,exchangeName,"error");
            channel.queueBind(queue2Name,exchangeName,"warning");
            //测试发送两条信息 给出对应的routeKey
            String message = "error消息测试";
            channel.basicPublish(exchangeName,"error",null,message.getBytes(StandardCharsets.UTF_8));
            message = "warning消息测试";
            channel.basicPublish(exchangeName,"warning",null,message.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        finally {
            //关闭通道
            if(channel != null && channel.isOpen()){
                try {
                    channel.close();
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }

            if(connection != null && connection.isOpen()){
                try {
                    connection.close();
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }
}

消费者这边写法没改变 队列名字换一下即可

public class RouteConsumer {
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.198.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            //创建连接
            connection = connectionFactory.newConnection("消费者");
            //获取通道
            channel = connection.createChannel();
            //通过通道声明队列,创建交换机等一系列事情
            channel.basicConsume("test_direct_queue1", true, new DeliverCallback() {
                @override
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("1号消费者接受到的消息为 " + new String(delivery.getBody(), "UTF-8"));
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                    System.out.println("收取消息失败");
                }
            });
            //卡一下
            System.out.println("键盘输入关闭消费者");
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        finally {
            //关闭通道
            if(channel != null && channel.isOpen()){
                try {
                    channel.close();
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }

            if(connection != null && connection.isOpen()){
                try {
                    connection.close();
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }
}

运行程序: 可以看到消费者只能拿到对应routekey规则的信息

RabbitMQ之路由模式

RabbitMQ之路由模式

脚本宝典总结

以上是脚本宝典为你收集整理的RabbitMQ之路由模式全部内容,希望文章能够帮你解决RabbitMQ之路由模式所遇到的问题。

如果觉得脚本宝典网站内容还不错,欢迎将脚本宝典推荐好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。