Kaze
Kaze
Published on 2023-10-20 / 20 Visits
0
0

WebSocket

maven依赖

   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-websocket</artifactId>
  </dependency>

WebSocket配置类

@Configuration
public class WebSocketConfig {
    /**
     * 	注入ServerEndpointExporter,
     * 	这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
    
}

WebSocket操作类

通过该类WebSocket可以进行群推送以及单点推送

@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}")  // 接口路径 ws://localhost:8084/websocket/userId;
public class WebSocket {

    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;
    private String userId;

    //concurrent包的线程安全Map,用来存放每个客户端对应的WebSocket对象。
    private static ConcurrentHashMap<String,WebSocket> webSockets = new ConcurrentHashMap<>();

    // 使用线程安全的原子整数来存在线连接数
    private static AtomicInteger onlineCount = new AtomicInteger(0);

    //因为@ServerEndpoint不支持注入,所以使用SpringUtils获取IOC实例
    private RedisTemplate template = (RedisTemplate) ApplicationContextProvider.getBean("redisTemplateInit");

    private ObjectMapper mapper = ApplicationContextProvider.getBean(ObjectMapper.class);

    private ExchangeService exchangeService = ApplicationContextProvider.getBean(ExchangeServiceImpl.class);

    /**
     * 链接成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value="userId") String userId) {
        try {
            this.userId = userId;
            this.session = session;
            webSockets.put(userId,this);
            onlineCount.incrementAndGet();
            log.info("【websocket消息】有新的连接,当前连接总数为:"+onlineCount.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
        pullUnreadMessage(userId);
    }

    /**
     * 链接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        try {
            webSockets.remove(this.userId);
            onlineCount.decrementAndGet();
            log.info("【websocket消息】连接断开,当前连接总数为:"+onlineCount.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message
     * @param
     */
    @OnMessage
    public void onMessage(String message) throws JsonProcessingException {
        log.info("【websocket消息】收到客户端消息:"+message);
        messageHandler(message);
    }

    /** 发送错误时的处理
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误,原因:"+error.getMessage());
        error.printStackTrace();
    }

    // 发送单点消息
    public void sendOneMessage(Message message) {
        WebSocket webSocket = webSockets.get(message.getReceiver());
        if (webSocket != null && webSocket.session.isOpen()) {
            try {
                log.info("【websocket消息】 单点消息:"+message);
                webSocket.session.getAsyncRemote().sendText(mapper.writeValueAsString(message));
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            log.info("消息接收者还未建立WebSocket连接");
        }
        //将通知消息存储在redis中
        if (message.getContent() != null) {
            template.opsForHash().put(message.getReceiver(),message.getContent().getId(),message);
        }
    }

    //拉取未读消息
    public void pullUnreadMessage(String userId){
        //从redis中读取通知消息
        List<Message> list = template.opsForHash().values(userId);
        if (list != null && !list.isEmpty()) {
            for (Message message : list) {
                sendOneMessage(message);
            }
            log.info("拉取未读消息成功");
        } else {
            log.info("暂无未读消息");
        }
    }

    //处理收到的消息
    public void messageHandler(String message) throws JsonProcessingException {
        //收到此消息不做任何处理
        if (message.equals("1")) {
            return;
        }
        //反序列化消息
        Message value = mapper.readValue(message, Message.class);
        //若消息类型为notice则进行消息转发处理
        if (value.getType().equals("notice")) {
            //生成初始交换记录
            ExchangeRecord record = exchangeService.add(value.getContent());
            //将交换记录回设,作为消息内容发送
            value.setContent(record);
            sendOneMessage(value);
        }
        //若消息类型为agree则进行物品交换处理
        if (value.getType().equals("agree")) {
            //开始交换
            boolean exchange = exchangeService.exchange(value.getContent());

            Message message1 = new Message();
            message1.setReceiver(value.getSender());
            if (exchange) {
                //交换成功后更新交换记录状态为成功,并向前端发送success消息
                value.getContent().setStatus(ExchangeStatus.EXCHANGE_SUCCESS);
                message1.setType("success");
            } else {
                //交换失败则更新交换记录状态为失败,并向前端发送error消息
                value.getContent().setStatus(ExchangeStatus.EXCHANGE_FAILED);
                message1.setType("error");
            }
            sendOneMessage(message1);
            exchangeService.update(value.getContent());
            //删除已处理的消息
            template.opsForHash().delete(value.getSender(),value.getContent().getId());
        }
        //若消息类型为reject则更新交换记录状态为失败
        if (value.getType().equals("reject")) {
            value.getContent().setStatus(ExchangeStatus.EXCHANGE_FAILED);
            exchangeService.update(value.getContent());

            //删除已处理的消息
            template.opsForHash().delete(value.getSender(),value.getContent().getId());
        }


    }
}
因为@ServerEndpoint不支持注入,所以使用SpringUtils获取IOC实例

方法调用示例

注入我们的操作类

@Autowired
private WebSocket webSocket;

发送消息给前端

//创建业务消息信息
//单个用户发送
webSocket.sendOneMessage(message);		

Comment