https://www.huangdf.xyz/categories/study-notes
南风
南风
发布于 2025-06-20 / 0 阅读
0
0

消息推送--eventsource

1、是什么

1.1、官方定义

EventSource - Web API | MDN

EventSource是web内容和服务器发送事件通信的接口。一个EventSource实例会对HTTP开启一个持久化连接,以text/event-stream的格式发送事件。这个链接将会一直保持直到手动调用eventSource.close()。

1.2、优缺点

1.2.1、优点

1、基于http,因此支持大多数浏览器

2、属于轻量级服务,使用简单

3、支持自定义消息发送的类型

1.2.2、缺点

1、只支持单向通信

2、只支持文本传输,二进制数据需要编码后传输

2、怎么用

这里以java距离,可能是java封装的原因导致没办法做到由前端调用了close才关闭,而是存在一个默认连接时间,这里尝试过了两种方案,一种是DeferredResult,另一种是SseEmitter。

前者不设置过期时间的话,是立即关闭,后置不设置过期时间的话是跟随mvc底层的超时时间。

不多说直接上代码

2.1、controller

package com.hwb.security.controller;

import com.hwb.security.service.impl.MsgUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

/**
 * @author admin
 * @date 2025/6/19
 */
@Slf4j
@RestController
@RequestMapping("/event")
@RequiredArgsConstructor
public class EventMessageController extends BaseController {

    @CrossOrigin(origins = "*")
    @GetMapping(value = "/getMsg/{id}", produces = "text/event-stream;charset=UTF-8")
    public DeferredResult<String> getMsg(HttpServletResponse response, @PathVariable("id") String id) throws IOException {
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("UTF-8");
        response.setStatus(200);
        MsgUtil.removeErrorResponse();
        MsgUtil.add(response, id);
        if (!response.getWriter().checkError()) {
            response.getWriter().write("data:hello--"+System.currentTimeMillis()+"\n\n");
            response.getWriter().flush();
        }
        return new DeferredResult<>(3000L);
    }
    @GetMapping(value = "/flush/{id}")
    public void flush(@PathVariable("id") String id) {
        MsgUtil.run("fUck off22", id);
    }
}

这里是采用的DeferredResult,通过resposne设置数据格式,通过writer向页面上输出内容。DeferredResult的作用是

2.2、工具类

在工具类中进行消息的发送。

package com.hwb.security.service.impl;

import com.hwb.security.exception.GlobalException;
import lombok.extern.slf4j.Slf4j;

import javax.servlet.http.HttpServletResponse;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author 南风
 */
@Slf4j
public class MsgService {

    final static ConcurrentHashMap<String, HttpServletResponse> map_res = new ConcurrentHashMap<>();

    public static void add(HttpServletResponse response, String id) {
        map_res.put(id, response);
    }

    public static void run(String msg, String id) {
        removeErrorResponse();
        HttpServletResponse it = map_res.get(id);
        PrintWriter pw = null;
        try {
            pw = it.getWriter();
            if (pw == null || pw.checkError()) {
                throw new GlobalException("pw null");
            }
            pw.write("data:" + msg + "\n\n");
            pw.flush();
        } catch (Exception e) {
            throw new GlobalException("msg error");
        }
    }
    public synchronized static void removeErrorResponse() {
        Iterator<Map.Entry<String, HttpServletResponse>> iterator = map_res.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, HttpServletResponse> next = iterator.next();
            HttpServletResponse response = next.getValue();
            PrintWriter pw = null;
            try {
                pw = response.getWriter();
                if (pw == null) {
                    iterator.remove();
                } else if (pw.checkError()) {
                    pw.close();
                    iterator.remove();
                }
            } catch (Exception e) {
                iterator.remove();
            }
        }
    }
}

在每次发送消息前会检查一遍是否存在失效的连接以及存在问题的连接。如果存在就删除,反之则那根据id获取对应的输出流输出内容。移除过期的连接则是通过迭代器进行。正常的集合应该是不支持一边遍历一遍删除内容的,会抛出异常。

为了防止输出流混乱,这里维护了一个map,用来存放不同用户的输出流。输出也是同样的根据不同的id来进行匹配。

最基础的内容就这两个就够了,额外的拓展的话,可以结合具体的业务再拓展。

2.3、关于DeferredResult

这个类是spring freamwork提供的一个异步处理类。主要用于实现异步http请求,尤其是长时间运行的操作,或者需要等待外部事件触发的场景。

它允许将请求延迟到另一个线程中完成,而不会阻塞容器的工作线程。当异步操作完成后,通过预设的结果触发响应。

它本身提供了包括设置超时回调,超时时间,检查状态,错误回调等api来操作这个对象。常用的是设置超时时间,毕竟一秒结束还是太短了。上面的范例是3秒结束。

使用他的话还有个需要注意的点,发送的消息必须是以"data:"开头,“\n\n”结尾,不然无法识别。

2.4、关于SseEmitter

SseEmitter和def类似,但是具体的操作上有些许差别。一是消息的发送是通过api不通过响应流。

SseEmitter sseEmitter = new SseEmitter(5000L); //超时时间5秒
sseEmitter.send(SseEmitter.event().data("see")); //发送消息为see

格式方面也不需要遵循特定的格式,event除了data还有name、comment、id、reconnectTime这些属性,data还能枚举类指定数据类型(例如,MediaType.TEXT_EVENT_STREAM)。从代码的精简程度上更推荐这个,retry这个属性不知道有什么用,没试出来,文档也没有写。

3、一些心得

在使用过程中发现并不是像官网说的那样,是一个持久化的连接,直到前端主动close才会关闭。而是存在一个默认超时时间,DeferredResult的默认时间非常短,几乎是一瞬间就关闭了。

在关闭之后还会抛出一个异常,异步任务超时。如果不想看到控制台满屏幕的都是这个异常的信息,可以再加一个全局异常处理来处理这个异常。

    @ExceptionHandler({
            HttpMessageNotWritableException.class,
            AsyncRequestTimeoutException.class
    })
    public void handleMsg(Exception e, HttpServletResponse response) {
        if(e instanceof AsyncRequestTimeoutException){
            log.debug("消息发送异步请求连接超时");
        }else{
            log.debug("http输出流不可用");
        }
    }

下面贴一下前端demo,ai生成的html文件。我给他改了一下,改成连接超时之后自己重新连接,因为后端map根据键去刷新值,每次重连都是最新的值。

<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>EventSource 示例</title>
    <style>
        body {
            font-family: Arial, sans-serif;
            max-width: 800px;
            margin: 0 auto;
            padding: 20px;
        }
        h1 {
            color: #333;
        }
        #status {
            margin: 10px 0;
            padding: 10px;
            background-color: #f0f0f0;
            border-radius: 4px;
        }
        #messages {
            border: 1px solid #ddd;
            padding: 10px;
            height: 300px;
            overflow-y: auto;
            margin-bottom: 10px;
            border-radius: 4px;
        }
        .message {
            margin: 5px 0;
            padding: 8px;
            background-color: #e9f7fe;
            border-radius: 4px;
        }
        .error {
            background-color: #ffe9e9;
        }
        button {
            padding: 8px 16px;
            background-color: #4CAF50;
            color: white;
            border: none;
            border-radius: 4px;
            cursor: pointer;
        }
        button:hover {
            background-color: #45a049;
        }
        button:disabled {
            background-color: #cccccc;
            cursor: not-allowed;
        }
    </style>
</head>
<body>
    <h1>EventSource 服务器发送事件示例</h1>
    
    <div id="status">状态: 未连接</div>
    
    <div id="messages"></div>
    
    <button id="connectBtn">连接</button>
    <button id="disconnectBtn" disabled>断开连接</button>
    
    <script>
        const statusEl = document.getElementById('status');
        const messagesEl = document.getElementById('messages');
        const connectBtn = document.getElementById('connectBtn');
        const disconnectBtn = document.getElementById('disconnectBtn');
        
        let eventSource = null;
        
        function connect() {
            // 创建EventSource对象,连接到后台接口
            eventSource = new EventSource('http://127.0.0.1:8080/event/getMsg/2');
            
            // 连接打开时的处理
            eventSource.onopen = function() {
                statusEl.textContent = '状态: 已连接';
                //addMessage('连接已建立');
                connectBtn.disabled = true;
                disconnectBtn.disabled = false;
            };
            
            // 接收消息的处理
            eventSource.onmessage = function(event) {
                addMessage(`收到消息: ${event.data}`);
            };
            
            // 错误处理
            eventSource.onerror = function(error) {
                //statusEl.textContent = '状态: 连接错误';
                //addMessage('连接错误或服务器关闭连接', true);
                disconnect();
				connect();
            };
            
            // 可以监听自定义事件
            eventSource.addEventListener('custom-event', function(event) {
                addMessage(`收到自定义事件: ${event.data}`);
            });
        }
        
        function disconnect() {
            if (eventSource) {
                eventSource.close();
                eventSource = null;
                statusEl.textContent = '状态: 已断开连接';
                //addMessage('连接已关闭');
                connectBtn.disabled = false;
                disconnectBtn.disabled = true;
            }
        }
        
        function addMessage(message, isError = false) {
            const messageEl = document.createElement('div');
            messageEl.className = isError ? 'message error' : 'message';
            messageEl.textContent = message;
            messagesEl.appendChild(messageEl);
            // 滚动到底部
            messagesEl.scrollTop = messagesEl.scrollHeight;
        }
        
        // 绑定按钮事件
        connectBtn.addEventListener('click', connect);
        disconnectBtn.addEventListener('click', disconnect);
    </script>
</body>
</html>

预览图

设置的三秒超时,每一条收到消息都是一个新建链接。


评论