技术介绍
WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket通信协议于2011年被IETF定为标准RFC 6455,并由RFC7936补充规范。WebSocket API也被W3C定为标准。
WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
很多网站为了实现推送技术,所用的技术都是轮询。轮询是在特定的时间间隔(如每1秒),由浏览器对服务器发出HTTP请求,然后由服务器返回最新的数据给客户端的浏览器。这种传统的模式带来很明显的缺点,即浏览器需要不断的向服务器发出请求,然而HTTP请求可能包含较长的头部,其中真正有效的数据可能只是很小的一部分,显然这样会浪费很多的带宽等资源。
在这种情况下,HTML5定义了WebSocket协议,能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。
服务端
第一步,添加相关依赖
<!-- Web依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 消息队列依赖 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<!-- lombok依赖 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<!-- json转换依赖 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.51</version>
</dependency>
<!-- WebSocket依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
第二步,添加WebSocket配置
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
第三步,封装WebSocket服务
@Slf4j
@Component
@ServerEndpoint("/websocket-chart/{userId}")
public class WebSocketServer {
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 用户id
*/
private String userId;
/**
* 用来存放每个客户端对应的MyWebSocket对象
*/
private static CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<>();
/**
* 用来存在线连接用户信息
*/
private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<String, Session>();
/**
* 链接成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam(value = "userId") String userId) {
try {
this.session = session;
this.userId = userId;
webSockets.add(this);
sessionPool.put(userId, session);
log.info("【websocket消息】有新的连接,总数为:" + webSockets.size());
} catch (Exception e) {
}
}
/**
* 链接关闭调用的方法
*/
@OnClose
public void onClose() {
try {
webSockets.remove(this);
sessionPool.remove(this.userId);
log.info("【websocket消息】连接断开,总数为:" + webSockets.size());
} catch (Exception e) {
}
}
/**
* 收到客户端消息后调用的方法
*/
@OnMessage
public void onMessage(String message) {
log.info("【websocket消息】收到客户端消息:" + message);
}
/**
* 发送错误时的处理
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误,原因:" + error.getMessage());
error.printStackTrace();
}
/**
* 此为广播消息
*/
public void sendAllMessage(String message) {
log.info("【websocket消息】广播消息:" + message);
for (WebSocketServer webSocket : webSockets) {
try {
if (webSocket.session.isOpen()) {
webSocket.session.getAsyncRemote().sendText(message);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 此为单点消息
*/
public void sendOneMessage(String userId, String message) {
Session session = sessionPool.get(userId);
if (session != null && session.isOpen()) {
try {
log.info("【websocket消息】 单点消息:" + message);
session.getAsyncRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 此为单点消息(多人)
*/
public void sendMoreMessage(String[] userIds, String message) {
for (String userId : userIds) {
Session session = sessionPool.get(userId);
if (session != null && session.isOpen()) {
try {
log.info("【websocket消息】 单点消息:" + message);
session.getAsyncRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
第四步 编写MQ监听,监听客户端消息
注意:多写个消费者(建议四个消费者实例),否则消费太慢,延时太高
/**
* @author songz
* @version 1.0
* @date 2024-09-01 0:22
*/
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "group1",topic ="chartA")
public class ChatAServiceListener implements RocketMQListener<String> {
@Autowired
WebSocketServer webSocketServer;
@Override
public void onMessage(String message) {
log.info("【消息队列MQ消息】收到客户端消息:" + message);
webSocketServer.sendAllMessage(message);
}
}
中间省略两个个..... BC 实例
/**
* @author songz
* @version 1.0
* @date 2024-09-01 0:22
*/
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "group1",topic ="chartA")
public class ChatDServiceListener implements RocketMQListener<String> {
@Autowired
WebSocketServer webSocketServer;
@Override
public void onMessage(String message) {
log.info("【消息队列MQ消息】收到客户端消息:" + message);
webSocketServer.sendAllMessage(message);
}
}
注意:四个消费者必须是同一Group和同一Topic
第五步 别忘记配置application.yml
rocketmq:
name-server: localhost:9876
producer:
group: group1
第六步 编写消息生产者(消费发送接口)
注意:本项目基于RocketMq 生产消息,未使用websocket的客户端消息发送功能,只使用服务端websocket的消息推送功能。
@CrossOrigin
@RestController
public class ChatServiceController {
@Autowired
RocketMQTemplate rocketMQTemplate;
/**
* 消息发送接口(注意临时返回ok字符串 而不是json)
* @param message
* @return
*/
@PostMapping("/send/{topic}")
public String send(@PathVariable("topic") String topic, @RequestBody String message){
log.info("接口收到消息:{}",message);
rocketMQTemplate.sendOneWay(topic,message);
return "ok";
}
}
/**
* 消息体封装
*/
@Data
public class Msg {
/**
* 主题
*/
private String topic;
/**
* 内容
*/
private String content;
/**
* 用户编号
*/
private String userId;
/**
* 用户名称
*/
private String nickName;
}
前端界面
<script setup>
import { ref } from 'vue';
//是否建立链接
const isOpen = ref(0)
//消息列表
const messageList = ref([]);
//即将发送的消息
const message = ref({
topic: "chartA",
content: "",
userId: "陌生人",
nickName: "涛哥"
})
/**
* 建立链接
*/
const connect = () => {
const websocket = new WebSocket(`ws://localhost:20000/websocket-chart/${message.value.userId}`);
websocket.onopen = function (evt) {
isOpen.value = 1;
};
websocket.onclose = function (evt) {
isOpen.value = 0;
};
websocket.onmessage = function (evt) {
receiveMessage(evt)
};
websocket.onerror = function (evt) {
};
}
/**
* 收到消息后
* @param {} evt
*/
const receiveMessage = (evt) => {
const msg = JSON.parse(evt.data)
messageList.value.push(`${msg.userId}:${msg.content}`)
}
/**
* 发送消息方法
*/
const sendMessage = () => {
fetch(`http://localhost:30000/send/${message.value.topic}`, {
method: "POST",
body: JSON.stringify(message.value),
headers: {
'Content-Type': "application/json"
}
}).then(res => res.text()).then((r) => {
message.value.content = "";
console.log(r)
})
}
</script>
<template>
<el-card style="max-width: 480px">
<template #header>
<div class="card-header">
<span>恋爱房</span>
</div>
</template>
<div>
<el-image v-if="isOpen === 0"
src="https://img.zcool.cn/community/011c875d019b7ea801205e4b7e1408.jpg@1280w_1l_2o_100sh.jpg" />
<div v-else>
<p v-for="msg in messageList"> {{ msg }}</p>
</div>
</div>
<template #footer>
<div v-if="isOpen === 0" style="display: flex;">
<el-input v-model="message.userId" />
<el-button type="primary" @click="connect">进入房间</el-button>
</div>
<div v-else style="display: flex;">
<el-input v-model="message.content" />
<el-button type="primary" @click="sendMessage">发送消息</el-button>
</div>
</template>
</el-card>
</template>
<style scoped></style>