1. WebSocket 简介

WebSocket 是一种全双工通信协议,允许客户端与服务器之间建立持久连接,并进行实时数据交换。它的优势在于相比传统的 HTTP 协议,它无需多次建立连接,数据的传输效率更高,适用于聊天室、股票行情推送等实时应用。

Java 提供了标准的 WebSocket API(JSR 356),使得开发者能够方便地创建 WebSocket 服务器和客户端。

2. WebSocketUtils 工具类

WebSocketUtils 是一个基于 Java WebSocket API 的工具类,包含了发送消息、广播消息、连接管理等功能。

WebSocketUtils 工具类代码
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * WebSocketUtils - WebSocket 通信工具类
 * 用于管理 WebSocket 连接,发送消息,广播消息等操作
 */
@ServerEndpoint("/ws")
public class WebSocketUtils {

    // 记录当前在线的连接数量
    private static final AtomicInteger onlineCount = new AtomicInteger(0);

    // 用于存储每个客户端的 WebSocketUtils 实例
    private static final Set<WebSocketUtils> webSocketSet = new CopyOnWriteArraySet<>();

    // 当前会话,代表与某个客户端的连接会话,必须通过它来发送数据
    private Session session;

    /**
     * 连接建立成功时调用的方法
     * @param session WebSocket 会话
     */
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        webSocketSet.add(this);  // 将当前实例加入到集合中
        int count = onlineCount.incrementAndGet();  // 在线数 +1
        System.out.println("有新连接加入,当前在线人数为:" + count);
        sendMessage("连接成功");
    }

    /**
     * 连接关闭时调用的方法
     */
    @OnClose
    public void onClose() {
        webSocketSet.remove(this);  // 从集合中移除当前实例
        int count = onlineCount.decrementAndGet();  // 在线数 -1
        System.out.println("有连接关闭,当前在线人数为:" + count);
    }

    /**
     * 收到客户端消息时调用的方法
     * @param message 客户端发送的消息
     * @param session 当前会话
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println("收到来自客户端的消息:" + message);
        // 这里可以自定义处理逻辑,比如将消息广播给其他客户端
        broadcast(message);
    }

    /**
     * 发生错误时调用的方法
     * @param session 当前会话
     * @param error   错误信息
     */
    @OnError
    public void onError(Session session, Throwable error) {
        System.err.println("发生错误:" + error.getMessage());
        error.printStackTrace();
    }

    /**
     * 发送消息到当前客户端
     * @param message 要发送的消息
     */
    public void sendMessage(String message) {
        try {
            this.session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            System.err.println("发送消息失败:" + e.getMessage());
        }
    }

    /**
     * 广播消息给所有客户端
     * @param message 要广播的消息
     */
    public static void broadcast(String message) {
        for (WebSocketUtils item : webSocketSet) {
            try {
                item.sendMessage(message);
            } catch (Exception e) {
                System.err.println("广播消息失败:" + e.getMessage());
            }
        }
    }

    /**
     * 获取当前在线连接数
     * @return 在线连接数
     */
    public static int getOnlineCount() {
        return onlineCount.get();
    }

    /**
     * 获取当前所有的 WebSocket 连接
     * @return WebSocket 连接集合
     */
    public static Set<WebSocketUtils> getWebSocketSet() {
        return webSocketSet;
    }
}

3. 功能详解

3.1. WebSocket 注解介绍
  • @ServerEndpoint:用来标记一个类作为 WebSocket 端点,并指定 WebSocket 服务的 URI(这里是 /ws)。
  • @OnOpen:标记在客户端连接成功时触发的方法。
  • @OnMessage:用于接收客户端消息。
  • @OnClose:客户端断开连接时触发。
  • @OnError:当发生错误时触发。
3.2. 连接管理
  • SessionSession 对象代表了与某个客户端的会话,通过它可以实现消息发送。每个客户端的 WebSocket 连接都有唯一的 Session
  • onlineCount:使用 AtomicInteger 来记录当前在线的 WebSocket 连接数。AtomicInteger 是线程安全的,在多线程环境下可以安全地递增或递减计数。
  • webSocketSet:存储当前所有活跃的 WebSocket 连接,使用 CopyOnWriteArraySet,这是线程安全的集合,支持并发的读写操作。
3.3. 核心方法
onOpen(Session session)

当客户端成功连接时,会触发此方法:

  • 新连接的 Session 对象会保存在工具类的实例中。
  • 将当前连接的实例加入到 webSocketSet 中,用于后续消息的广播。
  • 在线人数通过 onlineCount 递增,并将新连接成功的消息发送给客户端。
onClose()

当客户端断开连接时,触发此方法:

  • webSocketSet 中移除断开连接的实例。
  • 在线人数通过 onlineCount 递减。
onMessage(String message, Session session)

当服务器收到客户端消息时触发:

  • 输出客户端发送的消息。
  • 调用 broadcast 方法,将消息广播给所有已连接的客户端。
onError(Session session, Throwable error)

当连接发生错误时触发:

  • 输出错误信息,方便调试和排查问题。
sendMessage(String message)

用于发送消息给当前连接的客户端:

  • 通过 Session.getBasicRemote().sendText 方法实现消息发送。
  • 捕获 IOException 异常并输出错误日志。
broadcast(String message)

用于广播消息给所有客户端:

  • 遍历 webSocketSet 集合,调用每个实例的 sendMessage 方法,将消息发送给所有已连接的客户端。
  • 捕获发送过程中可能的异常,防止个别连接的失败影响整体功能。
getOnlineCount()

用于获取当前在线的连接数。

getWebSocketSet()

返回当前所有的 WebSocket 连接实例。

4. 示例代码 - 客户端

为了测试 WebSocketUtils,我们可以编写一个简单的客户端连接到服务器。Java 提供了 javax.websocket API 来实现客户端。

import javax.websocket.*;
import java.net.URI;

@ClientEndpoint
public class WebSocketClient {
    
    @OnOpen
    public void onOpen(Session session) {
        System.out.println("连接服务器成功!");
        try {
            session.getBasicRemote().sendText("Hello Server!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @OnMessage
    public void onMessage(String message) {
        System.out.println("收到来自服务器的消息:" + message);
    }

    @OnClose
    public void onClose() {
        System.out.println("连接关闭");
    }

    @OnError
    public void onError(Throwable error) {
        System.err.println("发生错误:" + error.getMessage());
    }

    public static void main(String[] args) {
        try {
            WebSocketContainer container = ContainerProvider.getWebSocketContainer();
            URI uri = new URI("ws://localhost:8080/ws");
            container.connectToServer(WebSocketClient.class, uri);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

5. 扩展功能

WebSocketUtils 可以根据实际需求进行扩展,例如:

  • 心跳检测:定时发送心跳消息,检测连接是否保持活跃。
  • 身份验证:在建立连接时,验证客户端的身份合法性。
  • 消息格式:可以通过 JSON 格式进行通信,方便扩展复杂的消息结构。

6. 总结

WebSocketUtils 是一个简单且功能完善的 WebSocket 工具类,它利用 Java WebSocket API 实现了客户端与服务器之间的实时通信。该工具类支持消息广播、连接管理、错误处理等功能,适用于需要实时数据交互的场景,如在线聊天室、股票实时行情推送等。

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐