RabbitMQ实践——搭建单人聊天服务

大纲

  • 创建Core交换器
  • 用户登录
  • 发起聊天邀请
  • 接受邀请
  • 聊天
  • 实验过程
  • 总结
  • 代码工程

经过之前的若干节的学习,我们基本掌握了Rabbitmq各个组件和功能。本文我们将使用之前的知识搭建一个简单的单人聊天服务。
基本结构如下。为了避免Server有太多连线导致杂乱,下图将Server画成两个模块,实则是一个服务。
在这里插入图片描述
该服务由两个核心交换器构成。
Core交换器是服务启动时创建的,它主要是为了向不同用户传递“系统通知型”消息。比如Jerry向Tom发起聊天邀请,则是通过上面黑色字体6-10的流程发给了Core交换器。然后Core交换器将该条消息告知Tom。
Fanout交换器是用来消息传递的。Jerry和Tom都向其发送消息,然后路由到两个队列。它们两各自订阅一个队列,就可以看到彼此的聊天内容了。

创建Core交换器

package com.rabbitmq.chat.service;

import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import jakarta.annotation.PostConstruct;
import reactor.core.publisher.Flux;

@Service
public class Core {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    private ConnectionFactory connectionFactory;

    final String exchangeName = "Core";

    @PostConstruct
    public void init() {
        connectionFactory = rabbitTemplate.getConnectionFactory();
        createExchange(exchangeName);
    }

    private void createExchange(String exchangeName) {
        rabbitTemplate.execute(channel -> {
            channel.exchangeDeclare(exchangeName, "direct", false, true, null);
            return null;
        });
    }

用户登录

用户登录后,我们会创建一个“系统通知”队列。然后用户就会通过长连接形式,持续等待系统发出通知。

    private final ReentrantLock lock = new ReentrantLock();
    final private Map<String, SimpleMessageListenerContainer> listeners = new java.util.HashMap<>();
    
    public Flux<String> Login(String username) {
        createExclusiveQueue(username);
        createBanding(exchangeName, username, username);
        return Flux.create(emitter -> {
           SimpleMessageListenerContainer container = getListener(username, (Message message) -> {
               String msg = new String(message.getBody());
               System.out.println("Received message: " + msg);
               emitter.next(msg);
           });
           container.start();
       });
    }
 
     private void createExchange(String exchangeName) {
        rabbitTemplate.execute(channel -> {
            channel.exchangeDeclare(exchangeName, "direct", false, true, null);
            return null;
        });
    }

    private void createBanding(String exchangeName, String queueName, String routingKey) {
        rabbitTemplate.execute(channel -> {
            channel.queueBind(queueName, exchangeName, routingKey);
            return null;
        });
    }
    
    private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {
        lock.lock();
        try {
            SimpleMessageListenerContainer listener = listeners.get(queueName);
            if (listener == null && messageListener != null) {
                listener = new SimpleMessageListenerContainer();
                listener.setConnectionFactory(connectionFactory);
                listener.setQueueNames(queueName);
                listener.setMessageListener(messageListener);
                listeners.put(queueName, listener);
            }
            return listener;
        } finally {
            lock.unlock();
        }
    }

Controller如下

package com.rabbitmq.chat.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.rabbitmq.chat.service.Core;

import reactor.core.publisher.Flux;

@RestController
@RequestMapping("/user")
public class UserController {
    @Autowired
    private Core core;

    @PostMapping(value = "/login", produces = "text/event-stream")
    public Flux<String> login(@RequestParam String username) {
        return core.Login(username);
    }
}

发起聊天邀请

发起聊天邀请时,系统会预先创建一个聊天室(ChatRoomInfo )。它包含上图中Fanout交换器、以及聊天双方需要订阅的消息队列。
这些创建完后,发起方就会等待对方发送的消息,也可以自己和自己聊天。因为消息队列已经创建好了,只是对方还没使用。

package com.rabbitmq.chat.service;

import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import jakarta.annotation.PostConstruct;
import lombok.Data;
import reactor.core.publisher.Flux;

@Service
public class ChatRoom {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    private ConnectionFactory connectionFactory;

    @Data
    private class ChatRoomInfo {
        private String exchange;
        private Map<String, String> usernameToQueuename;
    }

    private final Map<String, ChatRoomInfo> chatRooms = new java.util.HashMap<>();
    private final ReentrantLock lock = new ReentrantLock();   
    
    @PostConstruct
    public void init() {
        connectionFactory = rabbitTemplate.getConnectionFactory();
    }

    public Flux<String> invite(String fromUsername, String toUsername) {
        String chatRoomName = getChatRoomName(fromUsername, toUsername);
        ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);
        if (chatRoomInfo == null) {
            createChatRoom(fromUsername, toUsername);
        }
        return talk(chatRoomName, fromUsername);
    }
    
    private void createChatRoom(String fromUsername, String toUsername) {
        String chatRoomName = getChatRoomName(fromUsername, toUsername);
        String exchangeName = chatRoomName;
        String fromQueueName = "queue-" + fromUsername + "-" + toUsername;
        String toQueueName = "queue-" + toUsername + "-" + fromUsername;
        
        rabbitTemplate.execute(action -> {
            action.exchangeDeclare(exchangeName, "fanout", false, true, null);
            action.queueDeclare(fromQueueName, false, true, false, null);
            action.queueDeclare(toQueueName, false, true, false, null);
            action.queueBind(fromQueueName, exchangeName, "");
            action.queueBind(toQueueName, exchangeName, "");
            return null;
        });

        lock.lock();
        try {
            ChatRoomInfo chatRoomInfo = new ChatRoomInfo();
            chatRoomInfo.setExchange(exchangeName);
            chatRoomInfo.setUsernameToQueuename(Map.of(fromUsername, fromQueueName, toUsername, toQueueName));
            chatRooms.put(chatRoomName, chatRoomInfo);
        } finally {
            lock.unlock();
        }
    }

接受邀请

被邀请方通过Core交换器得知有人要和它聊天。
然后接受邀请的请求会寻找聊天室信息,然后订阅聊天记录队列。

    public Flux<String> accept(String fromUsername, String toUsername) {
        String chatRoomName = getChatRoomName(fromUsername, toUsername);
        return talk(chatRoomName, toUsername);
    }

    private Flux<String> talk(String chatRoomName, String username) {
        ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);
        if (chatRoomInfo == null) {
            throw new IllegalArgumentException("Chat room not found");
        }
        String queueName = chatRoomInfo.getUsernameToQueuename().get(username);
        return Flux.create(emitter -> {
            SimpleMessageListenerContainer listener = new SimpleMessageListenerContainer();
            listener.setConnectionFactory(connectionFactory);
            listener.setQueueNames(queueName);
            listener.setMessageListener((Message message) -> {
                String msg = new String(message.getBody());
                System.out.println(username + " received message: " + msg);
                emitter.next(msg);
            });
            listener.start();
        });
    }

聊天

聊天的逻辑就是找到聊天室信息,然后向交换器发送消息。

    public void chat(String fromUsername, String toUsername, String message) {
        String chatRoomName = getChatRoomName(fromUsername, toUsername);
        ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);
        if (chatRoomInfo == null) {
            chatRoomName = getChatRoomName(toUsername, fromUsername);
            chatRoomInfo = chatRooms.get(chatRoomName);
        }
        if (chatRoomInfo == null) {
            throw new IllegalArgumentException("Chat room not found");
        }
        rabbitTemplate.convertAndSend(chatRoomInfo.getExchange(), "", fromUsername + ": " + message);
    }
    
    private String getChatRoomName(String fromUsername, String toUsername) {
        return fromUsername + "-" + toUsername + "-chat-room";
    }

Controller侧代码

package com.rabbitmq.chat.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.rabbitmq.chat.service.ChatRoom;
import com.rabbitmq.chat.service.Core;

import reactor.core.publisher.Flux;

@RestController
@RequestMapping("/chat")
public class ChatController {
    @Autowired
    private Core core;

    @Autowired
    private ChatRoom chatRoom;

    @PutMapping(value = "/invite", produces = "text/event-stream")
    public Flux<String> invite(@RequestParam String fromUsername, @RequestParam String toUsername) {
        core.invite(fromUsername, toUsername);
        return chatRoom.invite(fromUsername, toUsername);
    }

    @PutMapping(value = "/accept", produces = "text/event-stream")
    public Flux<String> accept(@RequestParam String fromUsername, @RequestParam String toUsername) {
        core.accept(fromUsername, toUsername);
        return chatRoom.accept(fromUsername, toUsername);
    }

    @PostMapping("/send")
    public void send(@RequestParam String fromUsername, @RequestParam String toUsername, @RequestParam String message) {
        chatRoom.chat(fromUsername, toUsername, message);
    }
}

实验过程

在Postman中,我们先让tom登录,然后jerry登录。
在这里插入图片描述
在这里插入图片描述
在后台,我们看到创建两个队列
在这里插入图片描述
以及Core交换器的绑定关系也被更新
在这里插入图片描述
Jerry向Tom发起聊天邀请
在这里插入图片描述
可以看到Tom收到了邀请
在这里插入图片描述
同时新增了两个队列
在这里插入图片描述
以及一个交换器
在这里插入图片描述
在这里插入图片描述
Tom通过下面请求接受邀请
在这里插入图片描述
Jerry收到Tom接受了邀请的通知
在这里插入图片描述
后面它们就可以聊天了
在这里插入图片描述
在这里插入图片描述
它们的聊天窗口都收到了消息
在这里插入图片描述
在这里插入图片描述

总结

本文主要使用的知识点:

  • direct交换器以及其绑定规则
  • fanout交换器
  • 自动删除的交换器
  • 自动删除的队列
  • 只有一个消费者的队列
  • WebFlux响应式编程

代码工程

https://github.com/f304646673/RabbitMQDemo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/752605.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【MySQL基础篇】概述及SQL指令:DDL及DML

数据库是一个按照数据结构来组织、存储和管理数据的仓库。以下是对数据库概念的详细解释&#xff1a;定义与基本概念&#xff1a; 数据库是长期存储在计算机内的、有组织的、可共享的、统一管理的大量数据的集合。 数据库不仅仅是数据的简单堆积&#xff0c;而是遵循一定的规则…

可用的搜索引擎

presearchhttps://presearch.com/yandexhttps://yandex.com/ 以上&#xff0c;目前均不需科学上网。

GEOS学习笔记(一)

下载编译GEOS 从Download and Build | GEOS (libgeos.org)下载geos-3.10.6.tar.bz2 使用cmake-3.14.0版本配置VS2015编译 按默认配置生成VS工程文件 编译后生成geos.dll&#xff0c;geos_c.dll 后面学习使用C接口进行编程

PCB在工业领域的应用以及人工智能的影响。

什么是pcb呢? PCB,全称Printed Circuit Board,中文名称为印制电路板,也被称为印刷线路板或印制板1。这是一种重要的电子部件,主要由绝缘基板、连接导线和装配焊接电子元器件的焊盘组成。PCB的主要作用是作为电子元器件的支撑体和电气连接的载体,它能够简化电子产品的装配…

三分钟快速搭建基于FastAPI的AI Agent应用!

点击下方“JavaEdge”&#xff0c;选择“设为星标” 第一时间关注技术干货&#xff01; 免责声明~ 任何文章不要过度深思&#xff01; 万事万物都经不起审视&#xff0c;因为世上没有同样的成长环境&#xff0c;也没有同样的认知水平&#xff0c;更「没有适用于所有人的解决方案…

【鸿蒙学习笔记】页面和自定义组件生命周期

官方文档&#xff1a;页面和自定义组件生命周期 目录标题 [Q&A] 都谁有生命周期&#xff1f; [Q&A] 什么是组件生命周期&#xff1f; [Q&A] 什么是组件&#xff1f;组件生命周期 [Q&A] 什么是页面生命周期&#xff1f; [Q&A] 什么是页面&#xff1f;页面生…

代码随想录算法训练营第五十二天| [KC]100. 岛屿的最大面积、101. 孤岛的总面积、102. 沉没孤岛、103. 水流问题

[KamaCoder] 100. 岛屿的最大面积 [KamaCoder] 100. 岛屿的最大面积 文章解释 题目描述 给定一个由 1&#xff08;陆地&#xff09;和 0&#xff08;水&#xff09;组成的矩阵&#xff0c;计算岛屿的最大面积。岛屿面积的计算方式为组成岛屿的陆地的总数。岛屿由水平方向或垂直…

开放式耳机哪个牌子好?2024热门红榜开放式耳机测评真实篇!

当你跟朋友们聊天时&#xff0c;他们经常抱怨说长时间戴耳机会令耳朵感到不适,后台也有很多人来滴滴我&#xff0c;作为一位致力于开放式耳机的测评博主&#xff0c;在对比了多款开放式耳机之后&#xff0c;你开放式耳机在保护听力方面确实有用。开放式的设计有助于减轻耳道内的…

自适应蚁群算法优化的攀爬机器人的路径规划

大家好&#xff0c;我是带我去滑雪&#xff01; 攀爬机器人是一种能够在复杂环境中自主移动和攀爬的具有广阔应用前景的智能机器人&#xff0c;具有较强的应用潜力和广泛的研究价值。随着科技的不断发展&#xff0c;攀爬机器人在许多领域中的应用越来越广泛&#xff0c;例如建筑…

FastGPT 手动部署错误:MongooseServerSelectionError: getaddrinfo EAI_AGAIN mongo

在运行 FastGPT 时&#xff0c;mongodb 报如下错误&#xff1a; MongooseServerSelectionError: getaddrinfo EAI_AGAIN mongo 这是因为 mongo 没有解析出来&#xff0c;在 hosts 文件中添加如下信息&#xff1a; 127.0.0.1 mongo 重新运行 FastGPT 即可。 参考链接&#xff…

力扣随机一题 位运算/滑动窗口/数组

博客主页&#xff1a;誓则盟约系列专栏&#xff1a;IT竞赛 专栏关注博主&#xff0c;后期持续更新系列文章如果有错误感谢请大家批评指出&#xff0c;及时修改感谢大家点赞&#x1f44d;收藏⭐评论✍ 3191.使二进制数组全部等于1的最少操作次数I【中等】 题目&#xff1a; 给…

C语言力扣刷题7——删除排序链表中的重复元素 II——[快慢双指针法]

力扣刷题7——删除排序链表中的重复元素 II——[快慢双指针法] 一、博客声明二、题目描述三、解题思路1、思路说明 四、解题代码&#xff08;附注释&#xff09; 一、博客声明 找工作逃不过刷题&#xff0c;为了更好的督促自己学习以及理解力扣大佬们的解题思路&#xff0c;开辟…

STM32将外部SDRAM空间作为系统堆(Heap)空间

概述 stm32可以外扩很大的sram&#xff0c;常见外部sram的初始化函数一般是c语言写的&#xff0c;默认写在main函数里面。stm32初始化首先进入汇编代码startup_stm32f429xx.s&#xff0c;在汇编代码中Reset_Handler&#xff08;复位中断服务程序&#xff09;里面先调用了Syste…

光明致优尊耀呈现“柏林爱乐在上海”音乐会正式开幕,奏响盛夏狂热乐章

2024年6月26日&#xff0c;由光明致优尊耀呈现的中国上海国际艺术节特别项目“柏林爱乐在上海”音乐会正式开幕。暌违七年&#xff0c;世界顶级交响乐团——柏林爱乐乐团再度访沪&#xff0c;在首席指挥基里尔别特连科率领下&#xff0c;正式在中国上海国际艺术节登台演出&…

Talk|CityU 助理教授马佳葳: CVPR 2024, 基于多模态理解的混合数据专家模型

本期为TechBeat人工智能社区第604期线上Talk。 北京时间6月27日(周四)20:00&#xff0c;香港城市大学助理教授—马佳葳的Talk已经准时在TechBeat人工智能社区开播&#xff01; 他与大家分享的主题是: “基于多模态理解的混合数据专家模型”&#xff0c;他向大家介绍了混合数据专…

x86 平台实现一个原子加法操作

1&#xff0c;先上代码 #include <iostream> #include <omp.h>int atomicAdd(int* ptr, int value) {int result;asm volatile("lock xaddl %0, %1\n": "r" (result), "m" (*ptr): "0" (value), "m" (*ptr): &…

程序猿大战Python——Python与MySQL交互三

SQL注入 目标&#xff1a;了解什么是SQL注入&#xff1f; SQL注入指的是&#xff1a;恶意篡改或注入SQL条件。 当开发者的数据条件若被恶意篡改&#xff0c;那就达不到预期的查询效果。 为了了解SQL注入是怎么回事&#xff1f;通过一个案例来分析。 例如&#xff0c;使用命令…

综合布线实训室建设可行性报告

1、 建设综合布线实训室的目的和意义 1.1 响应国家职业教育政策 在国家对职业教育的高度重视和政策支持下&#xff0c;综合布线实训室的建设不仅是对国家教育方针的积极响应&#xff0c;也是对技术教育改革的有力推动。通过这一平台&#xff0c;我们旨在培育出一批具有强烈实…

ChatGPT智能对话绘画系统 带完整的安装源代码包以及搭建教程

系统概述 ChatGPT 智能对话绘画系统是一款集智能语言处理和绘画创作于一体的综合性系统。它利用了深度学习和自然语言处理技术&#xff0c;能够理解用户的意图和需求&#xff0c;并通过与用户的交互&#xff0c;生成富有创意的绘画作品。该系统的核心是一个强大的人工智能模型…

高考后的抉择:专业优先还是学校优先?

随着2024年高考的帷幕落下&#xff0c;高考生们面临的一个重要抉择再度浮上心头&#xff1a;在分数受限的情况下&#xff0c;是选择一个心仪的专业&#xff0c;还是选择一个知名度更高的学校&#xff1f;这是一个困扰了众多考生和家长的长期难题。在这个关键的时刻&#xff0c;…