聊一聊 gRPC 的四种通信模式
admin
2024-05-24 02:28:29
0

温馨提示:本文需要结合上一篇 gRPC 文章一起食用,否则可能看不懂。

前面一篇文章松哥和大家聊了 gRPC 的基本用法,今天我们再来稍微深入一点点,来看下 gRPC 中四种不同的通信模式。

gRPC 中四种不同的通信模式分别是:

  1. 一元 RPC
  2. 服务端流 RPC
  3. 客户端流 RPC
  4. 双向流 RPC

接下来松哥就通过四个完整的案例,来分别和向伙伴们演示这四种不同的通信模式。

1. 准备工作

关于 gRPC 的基础知识我们就不啰嗦了,咱们直接来看我今天的 proto 文件,如下:

这次我新建了一个名为 book.proto 的文件,这里主要定义了一些图书相关的方法,如下:

syntax = "proto3";option java_multiple_files = true;
option java_package = "org.javaboy.grpc.demo";
option java_outer_classname = "BookServiceProto";
import "google/protobuf/wrappers.proto";package book;service BookService {rpc addBook(Book) returns (google.protobuf.StringValue);rpc getBook(google.protobuf.StringValue) returns (Book);rpc searchBooks(google.protobuf.StringValue) returns (stream Book);rpc updateBooks(stream Book) returns (google.protobuf.StringValue);rpc processBooks(stream google.protobuf.StringValue) returns (stream BookSet);
}message Book {string id = 1;repeated string tags = 2;string name = 3;float price = 4;string author = 5;
}message BookSet {string id = 1;repeated Book bookList = 3;
}

这个文件中,有一些内容我们在上篇文章中都讲过了,讲过的我就不再重复了,我说一些上篇文章没有涉及到的东西:

  1. 由于我们在这个文件中,引用了 Google 提供的 StringValue(google.protobuf.StringValue),所以这个文件上面我们首先用 import 导入相关的文件,导入之后,才可以使用。
  2. 在方法参数和返回值中出现的 stream,就表示这个方法的参数或者返回值是流的形式(其实就是数据可以多次传输)。
  3. message 中出现了一个上篇文章没有的关键字 repeated,这个表示这个字段可以重复,可以简单理解为这就是我们 Java 中的数组。

好了,和上篇文章相比,本文主要就是这几个地方不一样。

proto 文件写好之后,按照上篇文章介绍的方法进行编译,生成对应的代码,这里就不再重复了。

2. 一元 RPC

一元 RPC 是一种比较简单的 RPC 模式,其实说白了我们上篇文章和大家介绍的就是一种一元 RPC,也就是客户端发起一个请求,服务端给出一个响应,然后请求结束。

上面我们定义的五个方法中,addBook 和 getBook 都算是一种一元 RPC。

2.1 addBook

先来看 addBook 方法,这个方法的逻辑很简单,我们提前在服务端准备一个 Map 用来保存 Book,addBook 调用的时候,就把 book 对象存入到 Map 中,并且将 book 的 ID 返回,大家就这样一件事,来看看服务端的代码:

public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {private Map bookMap = new HashMap<>();public BookServiceImpl() {Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();bookMap.put("1", b1);bookMap.put("2", b2);bookMap.put("3", b3);}@Overridepublic void addBook(Book request, StreamObserver responseObserver) {bookMap.put(request.getId(), request);responseObserver.onNext(StringValue.newBuilder().setValue(request.getId()).build());responseObserver.onCompleted();}
}

看过上篇文章的小伙伴,我觉得这段代码应该很好理解。

客户端调用方式如下:

public class BookServiceClient {public static void main(String[] args) throws InterruptedException {ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);addBook(stub);}private static void addBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);stub.addBook(Book.newBuilder().setPrice(99).setId("100").setName("java").setAuthor("javaboy").build(), new StreamObserver() {@Overridepublic void onNext(StringValue stringValue) {System.out.println("stringValue.getValue() = " + stringValue.getValue());}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onCompleted() {countDownLatch.countDown();System.out.println("添加完毕");}});countDownLatch.await();}
}

这里我使用了 CountDownLatch 来实现线程等待,等服务端给出响应之后,客户端再结束。这里在回调的 onNext 方法中,我们就可以拿到服务端的返回值。

2.2 getBook

getBook 跟上面的 addBook 类似,先来看服务端代码,如下:

public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {private Map bookMap = new HashMap<>();public BookServiceImpl() {Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();bookMap.put("1", b1);bookMap.put("2", b2);bookMap.put("3", b3);}@Overridepublic void getBook(StringValue request, StreamObserver responseObserver) {String id = request.getValue();Book book = bookMap.get(id);if (book != null) {responseObserver.onNext(book);responseObserver.onCompleted();} else {responseObserver.onCompleted();}}
}

这个 getBook 就是根据客户端传来的 id,从 Map 中查询到一个 Book 并返回。

客户端调用代码如下:

public class BookServiceClient {public static void main(String[] args) throws InterruptedException {ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);getBook(stub);}private static void getBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);stub.getBook(StringValue.newBuilder().setValue("2").build(), new StreamObserver() {@Overridepublic void onNext(Book book) {System.out.println("book = " + book);}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onCompleted() {countDownLatch.countDown();System.out.println("查询完毕");}});countDownLatch.await();}
}

小伙伴们大概也能看出来,addBook 和 getBook 基本上操作套路是一模一样的。

3. 服务端流 RPC

前面的一元 RPC,客户端发起一个请求,服务端给出一个响应,请求就结束了。服务端流则是客户端发起一个请求,服务端给一个响应序列,这个响应序列组成一个流。

上面我们给出的 searchBook 就是这样一个例子,searchBook 是传递图书的 tags 参数,然后在服务端查询哪些书的 tags 满足条件,将满足条件的书全部都返回去。

我们来看下服务端的代码:

public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {private Map bookMap = new HashMap<>();public BookServiceImpl() {Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();bookMap.put("1", b1);bookMap.put("2", b2);bookMap.put("3", b3);}@Overridepublic void searchBooks(StringValue request, StreamObserver responseObserver) {Set keySet = bookMap.keySet();String tags = request.getValue();for (String key : keySet) {Book book = bookMap.get(key);int tagsCount = book.getTagsCount();for (int i = 0; i < tagsCount; i++) {String t = book.getTags(i);if (t.equals(tags)) {responseObserver.onNext(book);break;}}}responseObserver.onCompleted();}
}

小伙伴们看下,这段 Java 代码应该很好理解:

  1. 首先从 request 中提取客户端传来的 tags 参数。
  2. 遍历 bookMap,查看每一本书的 tags 是否等于客户端传来的 tags,如果相等,说明添加匹配,则通过 responseObserver.onNext(book); 将这本书写回到客户端。
  3. 等所有操作都完成后,执行 responseObserver.onCompleted();,表示服务端的响应序列结束了,这样客户端也就知道请求结束了。

我们来看看客户端的代码,如下:

public class BookServiceClient {public static void main(String[] args) throws InterruptedException {ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);searchBook(stub);}private static void searchBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);stub.searchBooks(StringValue.newBuilder().setValue("明清小说").build(), new StreamObserver() {@Overridepublic void onNext(Book book) {System.out.println(book);}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onCompleted() {countDownLatch.countDown();System.out.println("查询完毕!");}});countDownLatch.await();}
}

客户端的代码好理解,搜索的关键字是 明清小说,每当服务端返回一次数据的时候,客户端回调的 onNext 方法就会被触发一次,当服务端之行了 responseObserver.onCompleted(); 之后,客户端的 onCompleted 方法也会被触发。

这个就是服务端流,客户端发起一个请求,服务端通过 onNext 可以多次写回数据。

4. 客户端流 RPC

客户端流则是客户端发起多个请求,服务端只给出一个响应。

上面的 updateBooks 就是一个客户端流的案例,客户端想要修改图书,可以发起多个请求修改多本书,服务端则收集多次修改的结果,将之汇总然后一次性返回给客户端。

我们先来看看服务端的代码:

public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {private Map bookMap = new HashMap<>();public BookServiceImpl() {Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();bookMap.put("1", b1);bookMap.put("2", b2);bookMap.put("3", b3);}@Overridepublic StreamObserver updateBooks(StreamObserver responseObserver) {StringBuilder sb = new StringBuilder("更新的图书 ID 为:");return new StreamObserver() {@Overridepublic void onNext(Book book) {bookMap.put(book.getId(), book);sb.append(book.getId()).append(",");}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onCompleted() {responseObserver.onNext(StringValue.newBuilder().setValue(sb.toString()).build());responseObserver.onCompleted();}};}
}

客户端每发送一本书来,就会触发服务端的 onNext 方法,然后我们在这方法中进行图书的更新操作,并记录更新结果。最后,我们在 onCompleted 方法中,将更新结果汇总返回给客户端,基本上就是这样一个流程。

我们再来看看客户端的代码:

public class BookServiceClient {public static void main(String[] args) throws InterruptedException {ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);updateBook(stub);}private static void updateBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);StreamObserver request = stub.updateBooks(new StreamObserver() {@Overridepublic void onNext(StringValue stringValue) {System.out.println("stringValue.getValue() = " + stringValue.getValue());}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onCompleted() {System.out.println("更新完毕");countDownLatch.countDown();}});request.onNext(Book.newBuilder().setId("1").setName("a").setAuthor("b").build());request.onNext(Book.newBuilder().setId("2").setName("c").setAuthor("d").build());request.onCompleted();countDownLatch.await();}
}

在客户端这块,updateBooks 方法会返回一个 StreamObserver 对象,调用该对象的 onNext 方法就是给服务端传递数据了,可以传递多个数据,调用该对象的 onCompleted 方法就是告诉服务端数据传递结束了,此时也会触发服务端的 onCompleted 方法,服务端的 onCompleted 方法执行之后,进而触发了客户端的 onCompleted 方法。

5. 双向流 RPC

双向流其实就是 3、4 小节的合体。即客户端多次发送数据,服务端也多次响应数据。

我们先来看下服务端的代码:

public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {private Map bookMap = new HashMap<>();private List books = new ArrayList<>();public BookServiceImpl() {Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();bookMap.put("1", b1);bookMap.put("2", b2);bookMap.put("3", b3);}@Overridepublic StreamObserver processBooks(StreamObserver responseObserver) {return new StreamObserver() {@Overridepublic void onNext(StringValue stringValue) {Book b = Book.newBuilder().setId(stringValue.getValue()).build();books.add(b);if (books.size() == 3) {BookSet bookSet = BookSet.newBuilder().addAllBookList(books).build();responseObserver.onNext(bookSet);books.clear();}}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onCompleted() {BookSet bookSet = BookSet.newBuilder().addAllBookList(books).build();responseObserver.onNext(bookSet);books.clear();responseObserver.onCompleted();}};}
}

这段代码没有实际意义,单纯为了给小伙伴们演示双向流,我的操作逻辑是客户端传递多个 ID 到服务端,然后服务端根据这些 ID 构建对应的 Book 对象,然后三个三个一组,再返回给客户端。客户端每次发送一个请求,都会触发服务端的 onNext 方法,我们在这个方法中对请求分组返回。最后如果还有剩余的请求,我们在 onCompleted() 方法中返回。

再来看看客户端的代码:

public class BookServiceClient {public static void main(String[] args) throws InterruptedException {ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);processBook(stub);}private static void processBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);StreamObserver request = stub.processBooks(new StreamObserver() {@Overridepublic void onNext(BookSet bookSet) {System.out.println("bookSet = " + bookSet);System.out.println("=============");}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onCompleted() {System.out.println("处理完毕!");countDownLatch.countDown();}});request.onNext(StringValue.newBuilder().setValue("a").build());request.onNext(StringValue.newBuilder().setValue("b").build());request.onNext(StringValue.newBuilder().setValue("c").build());request.onNext(StringValue.newBuilder().setValue("d").build());request.onCompleted();countDownLatch.await();}
}

这个客户端的代码跟第四小节一模一样,不再赘述了。

好啦,这就是松哥和小伙伴们介绍的 gRPC 的四种不同的通信模式,文章中只给出了一些关键代码,如果小伙伴们没看明白,建议结合上篇文章一起阅读就懂啦~

相关内容

热门资讯

探秘通灵大峡谷(广西)——与大... 通灵大峡谷,位于广西壮族自治区的一个绝美旅游胜地,风景如画,是游客与大自然亲密接触的理想之地。无论是...
探秘霞浦杨家溪:福建最美水乡的... 霞浦,位于福建省东北部,是一个以美丽的海滨景观和丰富的渔业资源而闻名的地方。这里的海岸线悠长,沙滩细...
淮南豆腐竟藏这般故事?美味背后... 淮南豆腐源远流长,内涵深厚,不仅是一道美味的菜肴,还蕴含着当地深厚的地域文化和民俗特色,对于塑造淮南...
清甜鲜香秘诀!手把手教你炖出湖... 木瓜鲫鱼汤:清甜鲜香滋补 木瓜鲫鱼汤是一道经典的养生汤品,汤汁奶白浓郁,木瓜清甜,鲫鱼鲜美,兼具美...
金钟湖落日余晖,湖面金光天花板... 金钟湖位于我国东南部的一个小城郊外,这片湖泊以其秀丽的自然风光和深厚的文化底蕴吸引了无数游客。它距离...
一篇图文读懂天下名茶——六安瓜... 一盏春茶 漫山春绿 年华欢欣 “且将新火试新茶,诗酒趁年华” 39岁的苏轼,置身暮春的烟雨之中,留下...
紫阳茶会:益阳茶馆 一叶春茶系... 紫阳茶会:益阳茶馆 一叶春茶系流年 5月10日益阳市茶业协会茶馆分会举办“一叶春茶系流年”活动,这是...
晚餐就吃这4道“掉秤餐”,低卡... 晚餐是一天中的重要一餐,但对于很多想要控制体重的人来说,如何选择既能满足口腹之欲,又能避免摄入过多热...
南海丹灶仙湖度假区,湖光山色绝... 在南海丹灶的怀抱中,有一颗璀璨的明珠——仙湖度假区。这里湖光山色相映成趣,仿佛一幅天然画卷,等待着每...
特呈岛海上秋千打卡,荡向海平面... 在湛蓝的天空与碧绿的海水之间,特呈岛像一颗镶嵌在南海之上的明珠,散发着迷人的光彩。这里不仅有原生态的...
探秘海螺沟:四川的冰川奇境与自... 位于四川省甘孜藏族自治州的海螺沟,是一片神奇的土地,它汇集了冰川、森林、温泉等自然景观,呈现出一幅震...
汉口江滩游玩攻略:武汉市区最美... 汉口江滩,作为武汉市最具标志性的滨江公园之一,集江滩景观、文化建筑、休闲娱乐于一体,是武汉市民和外地...
束河古镇最全游玩攻略,带你体验... 束河古镇位于云南省丽江市,是纳西族文化的发源地之一。与丽江大研古城的繁华不同,束河古镇更显宁静和原生...
东北自由行攻略:必游景点与特色... 冰雪王国里的极致浪漫,东北的冬天宛如童话世界。哈尔滨冰雪大世界每年12月下旬盛大开园,巨型冰雕在LE...
海丰热门旅游景点推荐 嘿,朋友们!今天咱们来聊聊海丰那些好玩的旅游景点吧!你是不是一到假期就愁没地方去玩?想找一个既有美景...
淮阴十大热门旅游景点推荐一览 淮阴,位于江苏省北部,历史文化底蕴深厚,自然与人文景观交相辉映,是苏北地区不可错过的旅行目的地。以下...
原创 邱... 近日,57岁的邱淑贞和她的两个女儿在西安的旅游照片在网络上引发了热议!大女儿沈月晒出的几张母女三人一...
濮阳风味手抓饼酥脆夹心丰富的特... 濮阳,位于华北平原的中心地带,以其独特的地域文化和美食闻名于世。近日,当地的一种小吃——手抓饼,以其...
农村饭桌上的"刮油黄... 还记得小时候跟着奶奶去田间采野菜的日子吗? 那些被我们踩在脚下的马齿苋,其实是老祖宗留给我们的"刮油...