百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

Spring WebFlux基于反应式WebSocket的应用

zhezhongyun 2025-01-14 19:06 102 浏览

环境:Springboot2.4.13


WebSocket介绍

WebSocket协议RFC 6455提供了一种标准化的方式,通过一个TCP连接在客户端和服务器之间建立全双工、双向的通信通道。它是一个不同于HTTP的TCP协议,但设计为在HTTP之上工作,使用80和443端口,并允许重用现有的防火墙规则。

WebSocket交互开始于一个HTTP请求,使用HTTP Upgrade Header进行升级,在本例中是切换到WebSocket协议。下面的例子展示了这种交互:

GET /spring-websocket-portfolio/portfolio HTTP/1.1

Host: localhost:8080

Upgrade: websocket // The Upgrade header.
Connection: Upgrade // Using the Upgrade connection.

Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==

Sec-WebSocket-Protocol: v10.stomp, v11.stomp

Sec-WebSocket-Version: 13 Origin: http://localhost:8080

支持WebSocket的服务器会返回类似下面的输出,而不是通常的200状态码:

HTTP/1.1 101 Switching Protocols

Upgrade: websocket

Connection: Upgrade

Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=

Sec-WebSocket-Protocol: v10.stomp

握手成功后,HTTP upgrade请求的TCP套接字保持打开,客户端和服务器可以继续发送和接收消息。

对WebSockets工作原理的完整介绍超出了本文档的范围。请参阅RFC 6455、HTML5中有关WebSocket的章节,或者网上的任何介绍和教程。

注意,如果WebSocket服务器运行在web服务器(例如nginx)后面,你可能需要配置它来将 WebSocket升级请求传递给WebSocket服务器。

自定义HandlerMapping

自定义HandlerMapping是为了在项目中能够自动的失败0到N的不同请求的WebSocket连接

public class WebSocketHandlerMapping extends SimpleUrlHandlerMapping {
  
  @Override
  public void initApplicationContext() throws BeansException {
    Map<String, WebSocketHandler> handlers = new HashMap<>();
    ApplicationContext context = getApplicationContext() ;
    Map<String, WebSocketHandler> beans = context.getBeansOfType(WebSocketHandler.class) ;
    for (WebSocketHandler handler : beans.values()) {
      WebSocketMapping webSocketMapping = AnnotatedElementUtils.findMergedAnnotation(handler.getClass(), WebSocketMapping.class) ;
      if (webSocketMapping != null) {
        String value = webSocketMapping.value() ;
        if (StringUtils.hasLength(value)) {
          handlers.put(value, handler) ;
        }
      }
    }
    if (handlers.size() > 0) {
      this.setUrlMap(handlers) ;
      super.initApplicationContext();
    }
  }

  @Override
  public int getOrder() {
    return Ordered.HIGHEST_PRECEDENCE ;
  }
  
}

在这HandlerMapping中使用了自定义的Mapping注解

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface WebSocketMapping {
  /**请求路径*/
  String value() default "" ;
}

通过上面的HandlerMapping处理能够识别出当前环境下所有带有@WebSocketMapping注解的Bean,然后进行注册到当前的URL集合中。

@Component
@WebSocketMapping("/chat2/{name}")
public class ChatWebSocketHandler2 implements WebSocketHandler {
  private static final Logger logger = LoggerFactory.getLogger(ChatWebSocketHandler2.class) ;
  public static final Map<String, WebSocketWrapper> sessions = new ConcurrentHashMap<>() ;

  @Override
  public Mono<Void> handle(WebSocketSession session) {
    System.out.println(session) ;
    URI uri = session.getHandshakeInfo().getUri() ;
    String path = uri.getPath() ;
    String username = path.split("/")[2] ;
    logger.info("Client id: {} Connected, Request URI: {}", session.getId(), uri) ;
    HttpHeaders headers = session.getHandshakeInfo().getHeaders() ;
    logger.info("Request Headers: {}", headers) ;
    Mono<Void> receive = session.receive()
        .doOnNext(message -> {
          // 这里如果header中没有to,那么返回null,所以要做好判断,不然默认异常是不会被抛出的
          // 导致连接即关闭,只有加了下面的onErrorMap才能看到异常信息
          List<String> tos = headers.get("to") ;
          if (tos !=null && !tos.isEmpty()) {
            String to = tos.get(0) ;
            WebSocketWrapper wsw = sessions.get(to) ;
            if (wsw != null) {
              String msg = message.getPayloadAsText() ;
              logger.info("给 {} 发送消息: {}", tos, msg) ;
              wsw.send(msg) ;
            }
          } else {
            logger.info("Chat 接收到消息: {}", message.getPayloadAsText());
          }
        }).onErrorMap(ex -> {
          ex.printStackTrace();
          return ex ;
        }).then() ;
    Mono<Void> sender = session.send(Flux.create(sink -> sessions.put(username, new WebSocketWrapper(session, sink)))) ;
    return Mono.zip(receive, sender).doFinally(signalType -> {
            logger.info("Client id: {}, 断开连接. 信号: {}", session.getId(), signalType.name());
            sessions.remove(username) ;
            session.close() ;
          }).then() ;
  }
}

WebSocketWrapper

public class WebSocketWrapper {
  private WebSocketSession session ;
  private FluxSink<WebSocketMessage> sink ; 
  public void send(String payload) {
    this.sink.next(session.textMessage(payload)) ;
  }
}

测试:

点对点消息

完毕!!!

长期创作关注不迷路!!!

Spring WebFlux使用函数式编程之HandlerFunction(1)
Spring WebFlux使用函数式编程之RouterFunction(2)
Spring WebFlux中使用WebClient远程接口调用
一文带你彻底理解Spring WebFlux的工作原理
Spring WebFlux请求处理流程
SpringBoot WebFlux整合MongoDB实现CRUD及分页功能
Spring WebFlux核心处理组件DispatcherHandler
SpringBoot WebFlux整合R2DBC实现数据库反应式编程
Spring WebFlux使用函数式编程之Filtering Handler Functions
Spring WebFlux入门实例并整合数据库实现基本的增删改查

相关推荐

perl基础——循环控制_principle循环

在编程中,我们往往需要进行不同情况的判断,选择,重复操作。这些时候我们需要对简单语句来添加循环控制变量或者命令。if/unless我们需要在满足特定条件下再执行的语句,可以通过if/unle...

CHAPTER 2 The Antechamber of M de Treville 第二章 特雷维尔先生的前厅

CHAPTER1TheThreePresentsofD'ArtagnantheElderCHAPTER2TheAntechamber...

CHAPTER 5 The King&#39;S Musketeers and the Cardinal&#39;S Guards 第五章 国王的火枪手和红衣主教的卫士

CHAPTER3TheAudienceCHAPTER5TheKing'SMusketeersandtheCardinal'SGuard...

CHAPTER 3 The Audience 第三章 接见

CHAPTER3TheAudienceCHAPTER3TheAudience第三章接见M.DeTrévillewasatt...

别搞印象流!数据说明谁才是外线防守第一人!

来源:Reddit译者:@assholeeric编辑:伯伦WhoarethebestperimeterdefendersintheNBA?Here'sagraphofStea...

V-Day commemorations prove anti-China claims hollow

People'sLiberationArmyhonorguardstakepartinthemilitaryparademarkingthe80thanniversary...

EasyPoi使用_easypoi api

EasyPoi的主要特点:1.设计精巧,使用简单2.接口丰富,扩展简单3.默认值多,writelessdomore4.springmvc支持,web导出可以简单明了使用1.easypoi...

关于Oracle数据库12c 新特性总结_oracle数据库12514

概述今天主要简单介绍一下Oracle12c的一些新特性,仅供参考。参考:http://docs.oracle.com/database/121/NEWFT/chapter12102.htm#NEWFT...

【开发者成长】JAVA 线上故障排查完整套路!

线上故障主要会包括CPU、磁盘、内存以及网络问题,而大多数故障可能会包含不止一个层面的问题,所以进行排查时候尽量四个方面依次排查一遍。同时例如jstack、jmap等工具也是不囿于一个方面的问题...

使用 Python 向多个地址发送电子邮件

在本文中,我们将演示如何使用Python编程语言向使用不同电子邮件地址的不同收件人发送电子邮件。具体来说,我们将向许多不同的人发送电子邮件。使用Python向多个地址发送电子邮件Python...

提高工作效率的--Linux常用命令,能够决解95%以上的问题

点击上方关注,第一时间接受干货转发,点赞,收藏,不如一次关注评论区第一条注意查看回复:Linux命令获取linux常用命令大全pdf+Linux命令行大全pdf为什么要学习Linux命令?1、因为Li...

linux常用系统命令_linux操作系统常用命令

系统信息arch显示机器的处理器架构dmidecode-q显示硬件系统部件-(SMBIOS/DMI)hdparm-i/dev/hda罗列一个磁盘的架构特性hdparm-tT/dev/s...

小白入门必知必会-PostgreSQL-15.2源码编译安装

一PostgreSQL编译安装1.1下载源码包在PostgreSQL官方主页https://www.postgresql.org/ftp/source/下载区选择所需格式的源码包下载。cd/we...

Linux操作系统之常用命令_linux系统常用命令详解

Linux操作系统一、常用命令1.系统(1)系统信息arch显示机器的处理器架构uname-m显示机器的处理器架构uname-r显示正在使用的内核版本dmidecode-q显示硬件系...

linux网络命名空间简介_linux 网络相关命令

此篇会以例子的方式介绍下linux网络命名空间。此例中会创建两个networknamespace:nsa、nsb,一个网桥bridge0,nsa、nsb中添加网络设备veth,网络设备间...