百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

B 客户端流RPC (clientstream Client Stream)

zhezhongyun 2025-04-29 06:53 6 浏览

客户端编写一系列消息并将其发送到服务器,同样使用提供的流。一旦客户端写完消息,它就等待服务器读取消息并返回响应

gRPC再次保证了单个RPC调用中的消息排序

在客户端流 RPC 模式中,客户端会发送多个请求给服务器端,而不再是单个请求。服务器端则会发送一个响应给客户端。

但是,服务器端不一定要等到从客户端接收到所有消息后才发送响应。

基于这样的逻辑,我们可以在接收到流中的一条消息或几条消息之后就发送响应,也可以在读取完流中的所有消息之后再发送响应

假设希望在订单管理服务中添加新的 updateOrders 方法,从而更新一个订单集合,

如图所示。在这里,我们想以消息流的形式发送订单列表到服务器端,服务器端会处理这个流并发送一条带有已更新订单状态的消息给客户端。


// 服务端 推送流

rpc GetStream (StreamReqData) returns (stream StreamResData){}

目录结构

在proto文件夹新建clientstream.proto文件

syntax = "proto3";

option go_package= "./;clientstream";

package clientstream;

service Greeter {
  // 客户端 推送流
  rpc PutStream (stream StreamReqData) returns (StreamResData){}
}

// request
message StreamReqData {
   string data = 1;
}

// response
message StreamResData {
   string data = 1;
}

编译clientstream.proto

在上级目录新建clientstream文件夹 然后编译

protoc.exe --go_out=../clientstream --go-grpc_out=../clientstream clientstream.proto

会 在clientstream文件夹下生成: clientstream_grpc.pb.go clientstream.pb.go



实现GRPC服务端

clientstream文件夹下创建clientstreamservice.go 在这个文件中我们要实现GreeterServer接口的如下方法

这代表GRPC服务对象

// GreeterServer is the server API for Greeter service.
// All implementations must embed UnimplementedGreeterServer
// for forward compatibility
type GreeterServer interface {
    // 客户端 推送流
    PutStream(Greeter_PutStreamServer) error
    mustEmbedUnimplementedGreeterServer()
}

第二个方法不重要可以什么都不写

主要是实现第一个PutStream方法 : 参数是个Greeter_PutStreamServer接口 它有两个公开方法 以及它也继承了 grpc.ServerStream的接口

这里注意

RegisterGreeterServer(注册服务)的工厂方法 以及 var Greeter_ServiceDesc = grpc.ServiceDesc的服务结构

这两个就是把服务相关的结构关联了起来

func _Greeter_PutStream_Handler(srv interface{}, stream grpc.ServerStream) error {
    return srv.(GreeterServer).PutStream(&greeterPutStreamServer{stream})
}

type Greeter_PutStreamServer interface {
    SendAndClose(*StreamResData) error
    Recv() (*StreamReqData, error)
    grpc.ServerStream
}

type greeterPutStreamServer struct {
    grpc.ServerStream
}

func (x *greeterPutStreamServer) SendAndClose(m *StreamResData) error {
    return x.ServerStream.SendMsg(m)
}

func (x *greeterPutStreamServer) Recv() (*StreamReqData, error) {
    m := new(StreamReqData)
    if err := x.ServerStream.RecvMsg(m); err != nil {
        return nil, err
    }
    return m, nil
}

func RegisterGreeterServer(s grpc.ServiceRegistrar, srv GreeterServer) {
    s.RegisterService(&Greeter_ServiceDesc, srv)
}
// Greeter_ServiceDesc is the grpc.ServiceDesc for Greeter service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Greeter_ServiceDesc = grpc.ServiceDesc{
    ServiceName: "clientstream.Greeter",
    HandlerType: (*GreeterServer)(nil),
    Methods:     []grpc.MethodDesc{},
    Streams: []grpc.StreamDesc{
        {
            StreamName:    "PutStream",
            Handler:       _Greeter_PutStream_Handler,
            ClientStreams: true,
        },
    },
    Metadata: "clientstream.proto",
}


clientstreamservice.go内容如下

package clientstream

import "log"

type ClientstreamServer struct {
}

func (s *ClientstreamServer) mustEmbedUnimplementedGreeterServer() {}

// 客户端 单向流-- 服务器端接收客户端发送过来的流数据
func (s *ClientstreamServer) PutStream(cliStr Greeter_PutStreamServer) error {
    for {
        if tem, err := cliStr.Recv(); err == nil {
            log.Println(tem)
        } else {
            log.Println("break, err :", err)
            break
        }

    }
    return nil
}


在braceapi/grpc/test下创建clientstream文件夹

在braceapi/grpc/test/clientstream下面创建clientstreamService.go文件 用于启动服务端

clientstreamService.go内容如下

//go:build ignore

package main

import (
    "net"

    "braceapi.hgbaoxian.cn/braceapi/grpc/grpcserver/clientstream"
    "google.golang.org/grpc"
)

func main() {

    //创建一个grpc服务器
    s := grpc.NewServer()

    //注册GRPC服务
    clientstream.RegisterGreeterServer(s, &clientstream.ClientstreamServer{})

    //监听
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        return
    }
    //启动服务
    s.Serve(lis)
}

实现GRPC客户端

重要的接口或结构如下

type GreeterClient interface {
    // 客户端 推送流
    PutStream(ctx context.Context, opts ...grpc.CallOption) (Greeter_PutStreamClient, error)
}

type greeterClient struct {
    cc grpc.ClientConnInterface
}

func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient {
    return &greeterClient{cc}
}

func (c *greeterClient) PutStream(ctx context.Context, opts ...grpc.CallOption) (Greeter_PutStreamClient, error) {
    stream, err := c.cc.NewStream(ctx, &Greeter_ServiceDesc.Streams[0], Greeter_PutStream_FullMethodName, opts...)
    if err != nil {
        return nil, err
    }
    x := &greeterPutStreamClient{stream}
    return x, nil
}

type Greeter_PutStreamClient interface {
    Send(*StreamReqData) error
    CloseAndRecv() (*StreamResData, error)
    grpc.ClientStream
}

type greeterPutStreamClient struct {
    grpc.ClientStream
}

func (x *greeterPutStreamClient) Send(m *StreamReqData) error {
    return x.ClientStream.SendMsg(m)
}

func (x *greeterPutStreamClient) CloseAndRecv() (*StreamResData, error) {
    if err := x.ClientStream.CloseSend(); err != nil {
        return nil, err
    }
    m := new(StreamResData)
    if err := x.ClientStream.RecvMsg(m); err != nil {
        return nil, err
    }
    return m, nil
}


因为greeterClient这个结构已经实现了GreeterClient的接口

而这个greeterClient结构是未公开的 所以为了方便,这里有个NewGreeterClient工厂函数返回了这个结构的引用

所以客户端主要工作是:

通过NewGreeterClient方法实例化一个GRPC客户端对象greeterClient 然后调用greeterClient结构的PutStream方法

返回一个grpc.ClientStream客户端数据流对象 然后调用Send方法 向服务端推送数据

把Send方法放入for死循环中 可以不断地读取向服务端发送流数据

在braceapi/grpc/test/clientstream下面创建clientstreamClient.go文件 用于启动服务端


clientstreamClient.go内容如下

//go:build ignore

package main

import (
    "context"
    "time"

    "braceapi.hgbaoxian.cn/braceapi/grpc/grpcserver/clientstream"
    "google.golang.org/grpc"
)

func main() {
    // 通过grpc库,建立一个连接
    conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
    if err != nil {
        return
    }
    defer conn.Close()

    //通过建立的连接生成一个client对象
    c := clientstream.NewGreeterClient(conn)

    //客户端持续推送数据流到服务端
    putRes, _ := c.PutStream(context.Background())
    i := 1
    for {
        i++
        putRes.Send(&clientstream.StreamReqData{Data: "Psych"})
        time.Sleep(time.Second)
        if i > 10 {
            break
        }
    }
}

可以看出 客户端流和服务端流相反 , 客户端发送流数据 服务端接收流数据

服务端流则是 服务端发送流数据 客户端接收流数据

启动测试:首先启动服务端 然后启动客户端


相关推荐

JPA实体类注解,看这篇就全会了

基本注解@Entity标注于实体类声明语句之前,指出该Java类为实体类,将映射到指定的数据库表。name(可选):实体名称。缺省为实体类的非限定名称。该名称用于引用查询中的实体。不与@Tab...

Dify教程02 - Dify+Deepseek零代码赋能,普通人也能开发AI应用

开始今天的教程之前,先解决昨天遇到的一个问题,docker安装Dify的时候有个报错,进入Dify面板的时候会出现“InternalServerError”的提示,log日志报错:S3_USE_A...

用离散标记重塑人体姿态:VQ-VAE实现关键点组合关系编码

在人体姿态估计领域,传统方法通常将关键点作为基本处理单元,这些关键点在人体骨架结构上代表关节位置(如肘部、膝盖和头部)的空间坐标。现有模型对这些关键点的预测主要采用两种范式:直接通过坐标回归或间接通过...

B 客户端流RPC (clientstream Client Stream)

客户端编写一系列消息并将其发送到服务器,同样使用提供的流。一旦客户端写完消息,它就等待服务器读取消息并返回响应gRPC再次保证了单个RPC调用中的消息排序在客户端流RPC模式中,客户端会发送多个请...

我的模型我做主02——训练自己的大模型:简易入门指南

模型训练往往需要较高的配置,为了满足友友们的好奇心,这里我们不要内存,不要gpu,用最简单的方式,让大家感受一下什么是模型训练。基于你的硬件配置,我们可以设计一个完全在CPU上运行的简易模型训练方案。...

开源项目MessageNest打造个性化消息推送平台多种通知方式

今天介绍一个开源项目,MessageNest-可以打造个性化消息推送平台,整合邮件、钉钉、企业微信等多种通知方式。定制你的消息,让通知方式更灵活多样。开源地址:https://github.c...

使用投机规则API加快页面加载速度

当今的网络用户要求快速导航,从一个页面移动到另一个页面时应尽量减少延迟。投机规则应用程序接口(SpeculationRulesAPI)的出现改变了网络应用程序接口(WebAPI)领域的游戏规则。...

JSONP安全攻防技术

关于JSONPJSONP全称是JSONwithPadding,是基于JSON格式的为解决跨域请求资源而产生的解决方案。它的基本原理是利用HTML的元素标签,远程调用JSON文件来实现数据传递。如果...

大数据Doris(六):编译 Doris遇到的问题

编译Doris遇到的问题一、js_generator.cc:(.text+0xfc3c):undefinedreferenceto`well_known_types_js’查找Doris...

网页内嵌PDF获取的办法

最近女王大人为了通过某认证考试,交了2000RMB,官方居然没有给线下教材资料,直接给的是在线教材,教材是PDF的但是是内嵌在网页内,可惜却没有给具体的PDF地址,无法下载,看到女王大人一点点的截图保...

印度女孩被邻居家客人性骚扰,父亲上门警告,反被围殴致死

微信的规则进行了调整希望大家看完故事多点“在看”,喜欢的话也点个分享和赞这样事儿君的推送才能继续出现在你的订阅列表里才能继续跟大家分享每个开怀大笑或拍案惊奇的好故事啦~话说只要稍微关注新闻的人,应该...

下周重要财经数据日程一览 (1229-0103)

下周焦点全球制造业PMI美国消费者信心指数美国首申失业救济人数值得注意的是,下周一希腊还将举行第三轮总统选举需要谷歌日历同步及部分智能手机(安卓,iPhone)同步日历功能的朋友请点击此链接,数据公布...

PyTorch 深度学习实战(38):注意力机制全面解析

在上一篇文章中,我们探讨了分布式训练实战。本文将深入解析注意力机制的完整发展历程,从最初的Seq2Seq模型到革命性的Transformer架构。我们将使用PyTorch实现2个关键阶段的注意力机制变...

聊聊Spring AI的EmbeddingModel

序本文主要研究一下SpringAI的EmbeddingModelEmbeddingModelspring-ai-core/src/main/java/org/springframework/ai/e...

前端分享-少年了解过iframe么

iframe就像是HTML的「内嵌画布」,允许在页面中加载独立网页,如同在画布上叠加另一幅动态画卷。核心特性包括:独立上下文:每个iframe都拥有独立的DOM/CSS/JS环境(类似浏...