Administrator
Administrator
Published on 2025-01-06 / 0 Visits
0
0

Springboot+ RocketMQ+WebSocket实现的聊天室项目

Description

Description

技术介绍

WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket通信协议于2011年被IETF定为标准RFC 6455,并由RFC7936补充规范。WebSocket API也被W3C定为标准。
WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

很多网站为了实现推送技术,所用的技术都是轮询。轮询是在特定的时间间隔(如每1秒),由浏览器对服务器发出HTTP请求,然后由服务器返回最新的数据给客户端的浏览器。这种传统的模式带来很明显的缺点,即浏览器需要不断的向服务器发出请求,然而HTTP请求可能包含较长的头部,其中真正有效的数据可能只是很小的一部分,显然这样会浪费很多的带宽等资源。

在这种情况下,HTML5定义了WebSocket协议,能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。

服务端

第一步,添加相关依赖
              <!-- Web依赖  -->
           <dependency>
               <groupId>org.springframework.boot</groupId>
               <artifactId>spring-boot-starter-web</artifactId>
           </dependency>

        <!-- 消息队列依赖 -->
           <dependency>
               <groupId>org.apache.rocketmq</groupId>
               <artifactId>rocketmq-spring-boot-starter</artifactId>
               <version>2.2.2</version>
           </dependency>
        <!-- lombok依赖 -->
           <dependency>
               <groupId>org.projectlombok</groupId>
               <artifactId>lombok</artifactId>
               <scope>provided</scope>
           </dependency>

           <!-- json转换依赖 -->
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.51</version>
        </dependency>


        <!-- WebSocket依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
第二步,添加WebSocket配置


@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
第三步,封装WebSocket服务

@Slf4j
@Component
@ServerEndpoint("/websocket-chart/{userId}")
public class WebSocketServer {

    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;
    /**
     * 用户id
     */
    private String userId;
    /**
     * 用来存放每个客户端对应的MyWebSocket对象
     */
    private static CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<>();
    /**
     * 用来存在线连接用户信息
     */
    private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<String, Session>();

    /**
     * 链接成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "userId") String userId) {
        try {
            this.session = session;
            this.userId = userId;
            webSockets.add(this);
            sessionPool.put(userId, session);
            log.info("【websocket消息】有新的连接,总数为:" + webSockets.size());
        } catch (Exception e) {
        }
    }

    /**
     * 链接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        try {
            webSockets.remove(this);
            sessionPool.remove(this.userId);
            log.info("【websocket消息】连接断开,总数为:" + webSockets.size());
        } catch (Exception e) {
        }
    }

    /**
     * 收到客户端消息后调用的方法
     */
    @OnMessage
    public void onMessage(String message) {
        log.info("【websocket消息】收到客户端消息:" + message);

    }

    /**
     * 发送错误时的处理
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("用户错误,原因:" + error.getMessage());
        error.printStackTrace();
    }

    /**
     * 此为广播消息
     */
    public void sendAllMessage(String message) {
        log.info("【websocket消息】广播消息:" + message);
        for (WebSocketServer webSocket : webSockets) {
            try {
                if (webSocket.session.isOpen()) {
                    webSocket.session.getAsyncRemote().sendText(message);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 此为单点消息
     */
    public void sendOneMessage(String userId, String message) {
        Session session = sessionPool.get(userId);
        if (session != null && session.isOpen()) {
            try {
                log.info("【websocket消息】 单点消息:" + message);
                session.getAsyncRemote().sendText(message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 此为单点消息(多人)
     */
    public void sendMoreMessage(String[] userIds, String message) {
        for (String userId : userIds) {
            Session session = sessionPool.get(userId);
            if (session != null && session.isOpen()) {
                try {
                    log.info("【websocket消息】 单点消息:" + message);
                    session.getAsyncRemote().sendText(message);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

    }
}
第四步 编写MQ监听,监听客户端消息

注意:多写个消费者(建议四个消费者实例),否则消费太慢,延时太高

/**
 * @author songz
 * @version 1.0
 * @date 2024-09-01 0:22
 */
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "group1",topic ="chartA")
public class ChatAServiceListener implements RocketMQListener<String> {

    @Autowired
    WebSocketServer webSocketServer;

    @Override
    public void onMessage(String message) {
        log.info("【消息队列MQ消息】收到客户端消息:" + message);
        webSocketServer.sendAllMessage(message);

    }
}

  中间省略两个个..... BC 实例
/**
 * @author songz
 * @version 1.0
 * @date 2024-09-01 0:22
 */
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "group1",topic ="chartA")
public class ChatDServiceListener implements RocketMQListener<String> {

    @Autowired
    WebSocketServer webSocketServer;

    @Override
    public void onMessage(String message) {
        log.info("【消息队列MQ消息】收到客户端消息:" + message);
        webSocketServer.sendAllMessage(message);

    }
}

注意:四个消费者必须是同一Group和同一Topic

第五步 别忘记配置application.yml

rocketmq:
  name-server: localhost:9876
  producer:
    group: group1
第六步 编写消息生产者(消费发送接口)

注意:本项目基于RocketMq 生产消息,未使用websocket的客户端消息发送功能,只使用服务端websocket的消息推送功能。


@CrossOrigin
@RestController
public class ChatServiceController {



   @Autowired
   RocketMQTemplate rocketMQTemplate;


    /**
     * 消息发送接口(注意临时返回ok字符串 而不是json)
     * @param message
     * @return
     */
   @PostMapping("/send/{topic}")
    public  String  send(@PathVariable("topic") String topic, @RequestBody  String message){
        log.info("接口收到消息:{}",message);
        rocketMQTemplate.sendOneWay(topic,message);
        return  "ok";

    }
}



/**
 * 消息体封装
 */
@Data
public class Msg {

    /**
     * 主题
     */
    private String topic;
    /**
     * 内容
     */
    private String content;
    /**
     * 用户编号
     */
    private String userId;
    /**
     * 用户名称
     */
    private String nickName;


}

前端界面

<script setup>
import { ref } from 'vue';

//是否建立链接
const isOpen = ref(0)
//消息列表
const messageList = ref([]);
//即将发送的消息
const message = ref({
  topic: "chartA",
  content: "",
  userId: "陌生人",
  nickName: "涛哥"

})


/**
 * 建立链接
 */
const connect = () => {
  const websocket = new WebSocket(`ws://localhost:20000/websocket-chart/${message.value.userId}`);
  websocket.onopen = function (evt) {
    isOpen.value = 1;

  };
  websocket.onclose = function (evt) {
    isOpen.value = 0;
  };
  websocket.onmessage = function (evt) {
    receiveMessage(evt)
  };
  websocket.onerror = function (evt) {

  };
}

/**
 * 收到消息后
 * @param {} evt 
 */
const receiveMessage = (evt) => {
  const msg = JSON.parse(evt.data)
  messageList.value.push(`${msg.userId}:${msg.content}`)

}

/**
 * 发送消息方法
 */
const sendMessage = () => {

  fetch(`http://localhost:30000/send/${message.value.topic}`, {
    method: "POST",
    body: JSON.stringify(message.value),
    headers: {
      'Content-Type': "application/json"

    }

  }).then(res => res.text()).then((r) => {
    message.value.content = "";
    console.log(r)

  })



}


</script>

<template>
  <el-card style="max-width: 480px">
    <template #header>
      <div class="card-header">
        <span>恋爱房</span>
      </div>
    </template>
    <div>
      <el-image v-if="isOpen === 0"
        src="https://img.zcool.cn/community/011c875d019b7ea801205e4b7e1408.jpg@1280w_1l_2o_100sh.jpg" />
      <div v-else>
        <p v-for="msg  in   messageList"> {{ msg }}</p>
      </div>



    </div>
    <template #footer>
      <div v-if="isOpen === 0" style="display: flex;">
        <el-input v-model="message.userId" />
        <el-button type="primary" @click="connect">进入房间</el-button>
      </div>

      <div v-else style="display: flex;">
        <el-input v-model="message.content" />
        <el-button type="primary" @click="sendMessage">发送消息</el-button>
      </div>



    </template>
  </el-card>
</template>

<style scoped></style>



Comment