原文详见:

用 Java 编写 WebSocket 服务器 - Web API | MDN (mozilla.org)

首先,我们需要明白,WebSocket的本质是基于TCP的端到端双工通信协议,它是区别于Http的一种协议。所以接到客户端发来的websocket连接请求时,需要进行验证,保证双方都支持websocket协议。

当然,在之前,我们需要监听端口来接受请求:

//创建一个监听 8080 端口的套接字连接
ServerSocket server = new ServerSocket(8080);
//监听并接受与该套接字的连接。该方法会阻塞,直到建立连接
Socket accept = server.accept();
System.out.println("客户端已连接");

Socket 方法
  • java.net.Socket.getInputStream()

    返回该 Socket 的输入流。

  • java.net.Socket.getOutputStream()

    返回该 Socket 的输出流。

//该连接的输入流
InputStream inputStream = accept.getInputStream();
//该连接的输出流
OutputStream outputStream = accept.getOutputStream();
​
Scanner scanner = new Scanner(inputStream, "UTF-8");

握手

当客户端连接到服务器时,它会发送 GET 请求以从简单的 HTTP 请求升级到 WebSocket 的连接。这被称为握手。

        String data = scanner.useDelimiter("\\r\\n\\r\\n").next();
        //创建正则表达式模式,匹配data中以GET开始的字符串
        Pattern compile = Pattern.compile("^GET");
        Matcher get = compile.matcher(data);
        //拿到请求头中的信息,与客户端”握手“
        if(get.find()){
            //获取 Sec-WebSocket-Key 请求标头的值,去除头部和尾部的所有空格
            Matcher match = Pattern.compile("Sec-WebSocket-Key: (.*)").matcher(data);
            if(match.find()){ //健壮性判断
                //追加字符串“258EAFA5-E914-47DA-95CA-C5AB0DC85B11” ---》固定的,为了校验双方是否都支持websocket
                String s = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
                //计算 SHA-1 值及其 Base64 编码
                //将其作为 HTTP 响应的一部分写回 Sec-WebSocket-Accept 响应标头的值
                byte[] response = ("HTTP/1.1 101 Switching Protocols\r\n"
                        + "Connection: Upgrade\r\n"
                        + "Upgrade: websocket\r\n"
                        + "Sec-WebSocket-Accept: "
                        + Base64.getEncoder().encodeToString(MessageDigest.getInstance("SHA-1")
                        .digest((match.group(1) + s).getBytes("UTF-8")))
                        + "\r\n\r\n").getBytes("UTF-8");
                outputStream.write(response, 0, response.length);
                System.out.println("握手成功");

解码消息

解码字节 = 编码字节 XOR 密钥的第(编码字节位置 AND 0x3)个字节

//假如传入了数据:
byte[] decoded = new byte[6];
byte[] encoded = new byte[] { (byte) 198, (byte) 131, (byte) 130, (byte) 182, (byte) 194, (byte) 135 };
byte[] key = new byte[] { (byte) 167, (byte) 225, (byte) 225, (byte) 210 };
for (int i = 0; i < encoded.length; i++) {
    ecoded[i] = (byte) (encoded[i] ^ key[i & 0x3]);
}
System.out.println(Arrays.toString(decoded));

完整示例代码
package com.yuziqikk.websocketdemo.websocket_test;
​
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Base64;
import java.util.Scanner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
​
/**
 * @Package: com.yuziqikk.websocketdemo.websocket_test
 * @Author: YuZiQiKK
 * @Created: 22/5/2024 上午10:32
 **/
public class Test {
    public void test() throws IOException, NoSuchAlgorithmException {
        //创建一个监听 8080 端口的套接字连接
        ServerSocket server = new ServerSocket(8080);
        //监听并接受与该套接字的连接。该方法会阻塞,直到建立连接
        Socket accept = server.accept();
        System.out.println("客户端已连接");
        //该连接的输入流
        InputStream inputStream = accept.getInputStream();
        //该连接的输出流
        OutputStream outputStream = accept.getOutputStream();
​
        Scanner scanner = new Scanner(inputStream, "UTF-8");
​
        String data = scanner.useDelimiter("\\r\\n\\r\\n").next();
        //创建正则表达式模式,匹配data中以GET开始的字符串
        Pattern compile = Pattern.compile("^GET");
        Matcher get = compile.matcher(data);
        //拿到请求头中的信息,与客户端”握手“
        if(get.find()){
            //获取 Sec-WebSocket-Key 请求标头的值,去除头部和尾部的所有空格
            Matcher match = Pattern.compile("Sec-WebSocket-Key: (.*)").matcher(data);
            if(match.find()){ //健壮性判断
                //追加字符串“258EAFA5-E914-47DA-95CA-C5AB0DC85B11” ---》固定的,为了校验双方是否都支持websocket
                String s = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
                //计算 SHA-1 值及其 Base64 编码
                //将其作为 HTTP 响应的一部分写回 Sec-WebSocket-Accept 响应标头的值
                byte[] response = ("HTTP/1.1 101 Switching Protocols\r\n"
                        + "Connection: Upgrade\r\n"
                        + "Upgrade: websocket\r\n"
                        + "Sec-WebSocket-Accept: "
                        + Base64.getEncoder().encodeToString(MessageDigest.getInstance("SHA-1")
                        .digest((match.group(1) + s).getBytes("UTF-8")))
                        + "\r\n\r\n").getBytes("UTF-8");
                outputStream.write(response, 0, response.length);
                System.out.println("握手成功");
                //假如传入了数据:
                byte[] decoded = new byte[6];
                byte[] encoded = new byte[] { (byte) 198, (byte) 131, (byte) 130, (byte) 182, (byte) 194, (byte) 135 };
                byte[] key = new byte[] { (byte) 167, (byte) 225, (byte) 225, (byte) 210 };
                for (int i = 0; i < encoded.length; i++) {
                    decoded[i] = (byte) (encoded[i] ^ key[i & 0x3]);
                }
                //传输的消息为:
                System.out.println(Arrays.toString(decoded));
            }
        }
    }
}

SpringBoot整合WebSocket

引入依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

配置WebSocketConfig
@Configuration
public class WebSocketConfig {
    // 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint
    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }
}

编写核心代码
类注解

添加@Component注解将类纳入SpringBoot的管理中

添加@ServerEndpoint声明这是一个WebSocket服务器,路径参数则是处理该路径的请求(注意,这里使用/{user}是为了获得当前发起请求的用户名,后面也许这个信息应该放在请求体中?

@Component
@ServerEndpoint("/web/{user}")

成员变量
   /**
     *  与某个客户端的连接对话,需要通过它来给客户端发送消息
     */
    private Session session;
​
    /**
     * 标识当前连接客户端的用户名
     */
    private String user;
​
    /**
     * 线程安全的hashmap,存每个客户端的连接,key 即是 name, value这里是类名,
     当然,应该把session和user抽离出去作为一个单独的类,然后作为value
     */
    private static ConcurrentHashMap<String, WebSocket> webSocketSet = new ConcurrentHashMap<>();

@OnOpen
    /**
     * 新的连接
     * @param session
     * @param user (从路径参数中取的值)
     */
    @OnOpen
    public void OnOpen(Session session, @PathParam(value = "user") String user){
        //把用户信息(用户名)保存到Session
        session.getUserProperties().put("user", user);
        this.session = session;
        this.user = user;
        // name是用来表示唯一客户端,如果需要指定发送,需要指定发送通过user来区分
        webSocketSet.put(user, this);
        System.out.println("[WebSocket] 连接成功,当前连接人数为:={}" + webSocketSet.size());
    }

@OnMessage
    /**
     * 接收消息
     * @param msg
     */
    @OnMessage
    public void OnMessage(String msg){
        System.out.println("[WebSocket] 收到消息:{}" + msg);
​
        /*
        说明A要向B发送信息
        注意,WebSocket发送的信息本质上一串字符串,所以校验规则可以自定义
        这里是在发送的消息前带上了 user:{接受消息的用户名}
        */
        if(msg.indexOf("user:") == 0){
            //截取用户名
            String user = msg.substring("user:{".length(), msg.indexOf("}"));
            msg = msg.substring(msg.indexOf("}") + 1);
            appointSending(user, msg);
        }else{
            groupSending(msg);
        }
    }

私聊
    /**
     * 指定发送
     * @param name
     * @param message
     */
    public void appointSending(String name,String message){
        try {
            webSocketSet.get(name).session.getBasicRemote().sendText(message);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

群发
    /**
     * 群发
     * @param message
     */
    public void groupSending(String message){
        for (String name : webSocketSet.keySet()){
            //过滤掉自己的用户名,这样就不会给自己也发送消息
            if(name == session.getUserProperties().get("user")){
                continue;
            }
            try {
                webSocketSet.get(name).session.getBasicRemote().sendText(message);
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

@OnClose
    /**
     * 关闭连接
     */
    @OnClose
    public void OnClose(){
        webSocketSet.remove(this.user);
        System.out.println("[WebSocket] 退出成功,当前连接人数为:={}" + webSocketSet.size());
    }

完整示例
package com.yuziqikk.websocketdemo;
​
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.ConcurrentHashMap;
​
/**
 * @Package: com.yuziqikk.websocketdemo
 * @Author: YuZiQiKK
 * @Created: 22/5/2024 上午9:20
 **/
@Component
@ServerEndpoint("/web/{user}")
public class WebSocket {
​
    /**
     *  与某个客户端的连接对话,需要通过它来给客户端发送消息
     */
    private Session session;
​
    /**
     * 标识当前连接客户端的用户名
     */
    private String user;
​
    /**
     * 线程安全的hashmap,存每个客户端的连接,key 即是 name
     */
    private static ConcurrentHashMap<String, WebSocket> webSocketSet = new ConcurrentHashMap<>();
​
    /**
     * 新的连接
     * @param session
     * @param user
     */
    @OnOpen
    public void OnOpen(Session session, @PathParam(value = "user") String user){
        //把用户信息(用户名)保存到Session
        session.getUserProperties().put("user", user);
        this.session = session;
        this.user = user;
        // name是用来表示唯一客户端,如果需要指定发送,需要指定发送通过user来区分
        webSocketSet.put(user, this);
        System.out.println("[WebSocket] 连接成功,当前连接人数为:={}" + webSocketSet.size());
    }
​
    /**
     * 关闭连接
     */
    @OnClose
    public void OnClose(){
        webSocketSet.remove(this.user);
        System.out.println("[WebSocket] 退出成功,当前连接人数为:={}" + webSocketSet.size());
    }
​
    /**
     * 接受消息
     * @param msg
     */
    @OnMessage
    public void OnMessage(String msg){
        System.out.println("[WebSocket] 收到消息:{}" + msg);
​
        //说明A要向B发送信息
        if(msg.indexOf("user:") == 0){
            //截取用户名
            String user = msg.substring("user:{".length(), msg.indexOf("}"));
            msg = msg.substring(msg.indexOf("}") + 1);
            appointSending(user, msg);
        }else{
            groupSending(msg);
        }
    }
​
    /**
     * 群发
     * @param message
     */
    public void groupSending(String message){
        for (String name : webSocketSet.keySet()){
            if(name == session.getUserProperties().get("user")){
                continue;
            }
            try {
                webSocketSet.get(name).session.getBasicRemote().sendText(message);
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
​
    /**
     * 指定发送
     * @param name
     * @param message
     */
    public void appointSending(String name,String message){
        try {
            webSocketSet.get(name).session.getBasicRemote().sendText(message);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
​

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐