百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

Spring WebFlux基于反应式WebSocket的应用

zhezhongyun 2025-01-14 19:06 74 浏览

环境:Springboot2.4.13


WebSocket介绍

WebSocket协议RFC 6455提供了一种标准化的方式,通过一个TCP连接在客户端和服务器之间建立全双工、双向的通信通道。它是一个不同于HTTP的TCP协议,但设计为在HTTP之上工作,使用80和443端口,并允许重用现有的防火墙规则。

WebSocket交互开始于一个HTTP请求,使用HTTP Upgrade Header进行升级,在本例中是切换到WebSocket协议。下面的例子展示了这种交互:

GET /spring-websocket-portfolio/portfolio HTTP/1.1

Host: localhost:8080

Upgrade: websocket // The Upgrade header.
Connection: Upgrade // Using the Upgrade connection.

Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==

Sec-WebSocket-Protocol: v10.stomp, v11.stomp

Sec-WebSocket-Version: 13 Origin: http://localhost:8080

支持WebSocket的服务器会返回类似下面的输出,而不是通常的200状态码:

HTTP/1.1 101 Switching Protocols

Upgrade: websocket

Connection: Upgrade

Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=

Sec-WebSocket-Protocol: v10.stomp

握手成功后,HTTP upgrade请求的TCP套接字保持打开,客户端和服务器可以继续发送和接收消息。

对WebSockets工作原理的完整介绍超出了本文档的范围。请参阅RFC 6455、HTML5中有关WebSocket的章节,或者网上的任何介绍和教程。

注意,如果WebSocket服务器运行在web服务器(例如nginx)后面,你可能需要配置它来将 WebSocket升级请求传递给WebSocket服务器。

自定义HandlerMapping

自定义HandlerMapping是为了在项目中能够自动的失败0到N的不同请求的WebSocket连接

public class WebSocketHandlerMapping extends SimpleUrlHandlerMapping {
  
  @Override
  public void initApplicationContext() throws BeansException {
    Map<String, WebSocketHandler> handlers = new HashMap<>();
    ApplicationContext context = getApplicationContext() ;
    Map<String, WebSocketHandler> beans = context.getBeansOfType(WebSocketHandler.class) ;
    for (WebSocketHandler handler : beans.values()) {
      WebSocketMapping webSocketMapping = AnnotatedElementUtils.findMergedAnnotation(handler.getClass(), WebSocketMapping.class) ;
      if (webSocketMapping != null) {
        String value = webSocketMapping.value() ;
        if (StringUtils.hasLength(value)) {
          handlers.put(value, handler) ;
        }
      }
    }
    if (handlers.size() > 0) {
      this.setUrlMap(handlers) ;
      super.initApplicationContext();
    }
  }

  @Override
  public int getOrder() {
    return Ordered.HIGHEST_PRECEDENCE ;
  }
  
}

在这HandlerMapping中使用了自定义的Mapping注解

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface WebSocketMapping {
  /**请求路径*/
  String value() default "" ;
}

通过上面的HandlerMapping处理能够识别出当前环境下所有带有@WebSocketMapping注解的Bean,然后进行注册到当前的URL集合中。

@Component
@WebSocketMapping("/chat2/{name}")
public class ChatWebSocketHandler2 implements WebSocketHandler {
  private static final Logger logger = LoggerFactory.getLogger(ChatWebSocketHandler2.class) ;
  public static final Map<String, WebSocketWrapper> sessions = new ConcurrentHashMap<>() ;

  @Override
  public Mono<Void> handle(WebSocketSession session) {
    System.out.println(session) ;
    URI uri = session.getHandshakeInfo().getUri() ;
    String path = uri.getPath() ;
    String username = path.split("/")[2] ;
    logger.info("Client id: {} Connected, Request URI: {}", session.getId(), uri) ;
    HttpHeaders headers = session.getHandshakeInfo().getHeaders() ;
    logger.info("Request Headers: {}", headers) ;
    Mono<Void> receive = session.receive()
        .doOnNext(message -> {
          // 这里如果header中没有to,那么返回null,所以要做好判断,不然默认异常是不会被抛出的
          // 导致连接即关闭,只有加了下面的onErrorMap才能看到异常信息
          List<String> tos = headers.get("to") ;
          if (tos !=null && !tos.isEmpty()) {
            String to = tos.get(0) ;
            WebSocketWrapper wsw = sessions.get(to) ;
            if (wsw != null) {
              String msg = message.getPayloadAsText() ;
              logger.info("给 {} 发送消息: {}", tos, msg) ;
              wsw.send(msg) ;
            }
          } else {
            logger.info("Chat 接收到消息: {}", message.getPayloadAsText());
          }
        }).onErrorMap(ex -> {
          ex.printStackTrace();
          return ex ;
        }).then() ;
    Mono<Void> sender = session.send(Flux.create(sink -> sessions.put(username, new WebSocketWrapper(session, sink)))) ;
    return Mono.zip(receive, sender).doFinally(signalType -> {
            logger.info("Client id: {}, 断开连接. 信号: {}", session.getId(), signalType.name());
            sessions.remove(username) ;
            session.close() ;
          }).then() ;
  }
}

WebSocketWrapper

public class WebSocketWrapper {
  private WebSocketSession session ;
  private FluxSink<WebSocketMessage> sink ; 
  public void send(String payload) {
    this.sink.next(session.textMessage(payload)) ;
  }
}

测试:

点对点消息

完毕!!!

长期创作关注不迷路!!!

Spring WebFlux使用函数式编程之HandlerFunction(1)
Spring WebFlux使用函数式编程之RouterFunction(2)
Spring WebFlux中使用WebClient远程接口调用
一文带你彻底理解Spring WebFlux的工作原理
Spring WebFlux请求处理流程
SpringBoot WebFlux整合MongoDB实现CRUD及分页功能
Spring WebFlux核心处理组件DispatcherHandler
SpringBoot WebFlux整合R2DBC实现数据库反应式编程
Spring WebFlux使用函数式编程之Filtering Handler Functions
Spring WebFlux入门实例并整合数据库实现基本的增删改查

相关推荐

JavaScript做个贪吃蛇小游戏(过关-加速),无需网络直接玩。

JavaScript做个贪吃蛇小游戏(过关-则加速)在浏览器打开文件,无需网络直接玩。<!DOCTYPEhtml><htmllang="en"><...

大模型部署加速方法简单总结(大模型 ai)

以下对大模型部署、压缩、加速的方法做一个简单总结,为后续需要备查。llama.cppGithub:https://github.com/ggerganov/llama.cppLLaMA.cpp项...

安徽医大第一医院应用VitaFlow Liberty(R)Flex为患者焕然一“心”

近日,在安徽医科大学第一附属医院心血管内科负责人暨北京安贞医院安徽医院业务副院长喻荣辉教授的鼎力支持和卓越带领下,凭借着先进的VitaFlowLiberty(R)Flex经导管主动脉瓣可回收可...

300 多行代码搞定微信 8.0 的「炸」「裂」特效!

微信8.0更新的一大特色就是支持动画表情,如果发送的消息只有一个内置的表情图标,这个表情会有一段简单的动画,一些特殊的表情还有全屏特效,例如烟花表情有全屏放烟花的特效,炸弹表情有爆炸动画并且消息和...

让div填充屏幕剩余高度的方法(div填充20px)

技术背景在前端开发中,经常会遇到需要让某个div元素填充屏幕剩余高度的需求,比如创建具有固定头部和底部,中间内容区域自适应填充剩余空间的布局。随着CSS技术的发展,有多种方法可以实现这一需求。实现步骤...

css之div内容居中(css中div怎么居中)

div中的内容居中显示,包括水平和垂直2个方向。<html><head><styletype="text/css">...

使用uniapp开发小程序遇到的一些问题及解决方法

1、swiper组件自定义知识点swiper组件的指示点默认是圆圈,想要自己设置指示点,需要获得当前索引,然后赋给当前索引不同的样式,然后在做个动画就可以了。*关键点用change方法,然后通过e.d...

微信小程序主页面排版(怎样设置小程序的排版)

开发小程序的话首先要了解里面的每个文件的作用小程序没有DOM对象,一切基于组件化小程序的四个重要的文件*.js*.wxml--->view结构---->html*.wxss--...

Vue动态组件的实践与原理探究(vue动态组件component原理)

我司有一个工作台搭建产品,允许通过拖拽小部件的方式来搭建一个工作台页面,平台内置了一些常用小部件,另外也允许自行开发小部件上传使用,本文会从实践的角度来介绍其实现原理。ps.本文项目使用VueCLI...

【HarmonyOS Next之旅】兼容JS的类Web开发(四) -> tabs

目录1->创建Tabs2->设置Tabs方向3->设置样式4->显示页签索引5->场景示例编辑1->创建Tabs在pages/index目录...

CSS:前端必会的flex布局,我把布局代码全部展示出来了

进入我的主页,查看更多CSS的分享!首先呢,先去看文档,了解flex是什么,这里不做赘述。当然,可以看下面的代码示例,辅助你理解。一、row将子元素在水平方向进行布局:1.垂直方向靠顶部,水平方向靠...

【HarmonyOS Next之旅】兼容JS的类Web开发(四) -> swiper

目录1->创建Swiper组件2->添加属性3->设置样式4->绑定事件5->场景示例编辑1->创建Swiper组件在pages/index...

CSS:Flex布局,网页排版神器!(css3 flex布局)

还在为网页排版抓狂?别担心,CSS的flex布局来了,让你轻松玩转各种页面布局,实现网页设计自由!什么是Flex布局?Flex布局,也称为弹性布局,是CSS中的一种强大布局方式,它能够让你...

移动WEB开发之flex布局,附携程网首页案例制作

一、flex布局体验传统布局兼容性好布局繁琐局限性,不能再移动端很好的布局1.1flex弹性布局:操作方便,布局极为简单,移动端应用很广泛PC端浏览器支持情况较差IE11或更低版本,不支持或仅部...

2024最新升级–前端内功修炼 5大主流布局系统进阶(mk分享)

2024最新升级–前端内功修炼5大主流布局系统进阶(mk分享)获课》789it.top/14658/前端布局是网页设计中至关重要的一环,它决定了网页的结构和元素的排列方式。随着前端技术的不断发展,现...