网络编程Netty的使用
admin
2024-03-07 04:57:41
0

使用之前首先需要Netty的Maven包:

		io.nettynetty-all4.1.84.Final

一、服务端

服务声明类,TcpServer.class

package com.supcon.supfusion.oms.tankinfo.service.closedpath.tcp;import cn.hutool.core.collection.CollectionUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;import java.util.Map;/*** @author ZhaoXu* @date 2022/11/22 16:42*/
@Slf4j
public class TcpServer {private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();private EventLoopGroup bossGroup;private EventLoopGroup workerGroup;private ServerBootstrap server;private ChannelFuture channelFuture;private Integer port;public TcpServer(Integer port) {this.port = port;// nio连接处理池this.bossGroup = new NioEventLoopGroup();// 处理事件池this.workerGroup = new NioEventLoopGroup();server = new ServerBootstrap();server.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 自定义处理类ch.pipeline().addLast(new TcpServerHandler());ch.pipeline().addLast(new NettyDecoder());ch.pipeline().addLast(new NettyEncoder());}});server.option(ChannelOption.SO_BACKLOG, 128);server.childOption(ChannelOption.SO_KEEPALIVE, true);}public synchronized void startListen() {try {// 绑定到指定端口channelFuture = server.bind(port).sync();log.info("netty服务器在[{}]端口启动监听", port);} catch (Exception e) {log.error("netty服务器在[{}]端口启动监听失败", port);e.printStackTrace();}}public void sendMessageToClient(String clientIp, Object msg) {Map channelMap = TcpServerHandler.channelSkipMap.get(port);Channel channel = channelMap.get(clientIp);String sendStr;try {sendStr = OBJECT_MAPPER.writeValueAsString(msg);} catch (JsonProcessingException e) {throw new RuntimeException(e);}try {log.info("向客户端 {} 发送消息内容:{}", clientIp, sendStr);channel.writeAndFlush(sendStr);} catch (Exception var4) {log.error("向客户端 {} 发送消息失败,消息内容:{}", clientIp, sendStr);throw new RuntimeException(var4);}}public void pushMessageToClients(Object msg) {Map channelMap = TcpServerHandler.channelSkipMap.get(port);if (CollectionUtil.isNotEmpty(channelMap)) {channelMap.forEach((k, v) -> sendMessageToClient(k, msg));}}
}

handler接收处理类,继承SimpleChannelInboundHandler

package com.supcon.supfusion.oms.tankinfo.service.closedpath.tcp;import cn.hutool.core.collection.CollectionUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;/*** @author ZhaoXu* @date 2022/11/22 16:43*/
@Slf4j
public class TcpServerHandler extends SimpleChannelInboundHandler {/*** 用跳表存储连接channel*/public static Map> channelSkipMap = new ConcurrentSkipListMap<>();@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("应用程序的监听通道异常!");cause.printStackTrace();}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();// 获取每个用户端连接的ipInetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress();String clientIp = ipSocket.getAddress().getHostAddress();InetSocketAddress localSocket = (InetSocketAddress) channel.localAddress();// 本地端口做键int localPort = localSocket.getPort();Map channelMap = channelSkipMap.get(localPort);if (CollectionUtil.isEmpty(channelMap)) {channelMap = new HashMap<>(4);}channelMap.put(clientIp, channel);channelSkipMap.put(localPort, channelMap);log.info("应用程序添加监听通道,与客户端:{} 建立连接成功!", clientIp);}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {// 获取每个用户端连接的ipChannel channel = ctx.channel();InetSocketAddress localSocket = (InetSocketAddress) channel.localAddress();int localPort = localSocket.getPort();InetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress();String clientIp = ipSocket.getAddress().getHostAddress();Map channelMap = channelSkipMap.get(localPort);channelMap.remove(clientIp);log.info("应用程序移除监听通道,与客户端:{} 断开连接!", clientIp);}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String result = new String(req, StandardCharsets.UTF_8);log.info("接收到应用数据:{}", result);}
}
 

编码器 NettyEncoder

package com.supcon.supfusion.oms.tankinfo.service.closedpath.tcp;import com.supcon.supfusion.oms.tankinfo.service.closedpath.ClosedPathConstants;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;/*** @author ZhaoXu* @date 2022/11/8 16:23*/
public class NettyEncoder extends MessageToByteEncoder {@Overrideprotected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {byte[] byteMsg = msg.getBytes(ClosedPathConstants.CHARSET_GBK);int msgLength = byteMsg.length;ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(4 + byteMsg.length);buf.writeInt(msgLength);buf.writeBytes(byteMsg);out.writeBytes(buf);buf.release();}
}

解码器 NettyDecoder

package com.supcon.supfusion.oms.tankinfo.service.closedpath.tcp;import com.supcon.supfusion.oms.tankinfo.service.closedpath.ClosedPathConstants;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;import java.util.List;/*** @author ZhaoXu* @date 2022/11/22 17:03*/
@Slf4j
public class NettyDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {int beginReader = in.readerIndex();int dataLength = in.readInt();if (in.readableBytes() < dataLength) {in.readerIndex(beginReader);} else {byte[] data = new byte[dataLength];in.readBytes(data);String str = new String(data, 0, dataLength, ClosedPathConstants.CHARSET_GBK);out.add(str);}}
}
 

使用的时候:

TcpServer tcpServer = new TcpServer(40004);
tcpServer.startListen();

二、客户端

TcpClient.class

package com.supcon.supfusion.dataforward.tcp;import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.supcon.supfusion.dataforward.constants.Constants;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;/*** @author ZhaoXu* @date 2022/11/8 13:39*/
@Slf4j
public class TcpClient {private EventLoopGroup group;private ChannelFuture channelFuture;private final String ip;private final Integer port;private final ObjectMapper objectMapper = new ObjectMapper();public Long lastUseTime = 0L;public TcpClient(String ip, Integer port) {this.ip = ip;this.port = port;}/*** 建立连接**/public synchronized void connectServer() {log.info("开始建立连接,ip:{}, port:{}", ip, port);// 生命nio连接池this.group = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();// 配置解码器以及消息处理类b.group(this.group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new TcpClientHandler());pipeline.addLast(new NettyEncoder());pipeline.addLast(new NettyDecoder());}});// 开始连接this.channelFuture = b.connect(ip, port).sync();} catch (Exception var4) {log.error("连接建立失败,ip:{}, port:{}", ip, port);this.group.shutdownGracefully();var4.printStackTrace();}}/*** 关闭连接*/public void close() {this.group.shutdownGracefully();}/*** 发送消息** @param msg*/public synchronized void sendCommonMsg(Object msg) {String sendStr;if (!Constants.CONNECTED_STATUS.equals(getConnectStatus())) {connectServer();}try {sendStr = objectMapper.writeValueAsString(msg);} catch (JsonProcessingException e) {throw new RuntimeException(e);}try {log.info("发送消息内容:{}", sendStr);this.channelFuture.channel().writeAndFlush(sendStr);} catch (Exception var4) {log.error("发送消息失败,消息内容:{}", sendStr);throw new RuntimeException(var4);}}/*** 获取当前连接状态*/public Boolean getConnectStatus() {return group != null && !group.isShutdown() && !group.isShuttingDown();}
}

客户端也需要使用编码器与解码器,将服务端的拷过来就可以,双向通信时也需要继承SimpleChannelInboundHandler处理类处理消息

使用:

TcpClient tcpClient = new TcpClient("192.168.89.138", 40004);tcpClient.connectServer();

三、重要的组件

Channel
Channel 是 Netty 网络操作抽象类,它除了包括基本的 I/O 操作,如 bind、connect、read、write 之外,还包括了 Netty 框架相关的一些功能,如获取该 Channe l的 EventLoop

ChannelFuture
Netty 为异步非阻塞,即所有的 I/O 操作都为异步的,因此,我们不能立刻得知消息是否已经被处理了。Netty 提供了 ChannelFuture 接口,通过该接口的 addListener() 方法注册一个 ChannelFutureListener,当操作执行成功或者失败时,监听就会自动触发返回结果

EventLoop
Netty 基于事件驱动模型,使用不同的事件来通知我们状态的改变或者操作状态的改变。它定义了在整个连接的生命周期里当有事件发生的时候处理的核心抽象
Channel 为Netty 网络操作抽象类,EventLoop 主要是为Channel 处理 I/O 操作,两者配合参与 I/O 操作
当一个连接到达时,Netty 就会注册一个 Channel,然后从 EventLoopGroup 中分配一个 EventLoop 绑定到这个Channel上,在该Channel的整个生命周期中都是有这个绑定的 EventLoop 来服务的
ChannelHandler
ChannelHandler 为 Netty 中最核心的组件,它充当了所有处理入站和出站数据的应用程序逻辑的容器。ChannelHandler 主要用来处理各种事件,这里的事件很广泛,比如可以是连接、数据接收、异常、数据转换等。
ChannelHandler 有两个核心子类 ChannelInboundHandler 和 ChannelOutboundHandler,其中 ChannelInboundHandler 用于接收、处理入站数据和事件,而 ChannelOutboundHandler 则相反
ChannelPipeline
ChannelPipeline 为 ChannelHandler 链提供了一个容器并定义了用于沿着链传播入站和出站事件流的 API

一个数据或者事件可能会被多个 Handler 处理,在这个过程中,数据或者事件经流 ChannelPipeline,由 ChannelHandler 处理。在这个处理过程中,一个 ChannelHandler 接收数据后处理完成后交给下一个 ChannelHandler,或者什么都不做直接交给下一个 ChannelHandler

相关内容

热门资讯

英语的八大语法结构是什么? 英语的八大语法结构是什么?1.一般现在时:主语+do/doese.gWecleantheroomev...
风趣机智的白羊座们也会做事常常... 风趣机智的白羊座们也会做事常常马虎不正经?机灵,风趣机智的白羊座们也会做事常常虎头虎尾?白羊座的人虽...
蜡烛的诗句 蜡烛的诗句秋夕 杜牧 银烛秋光冷画屏,轻罗小扇扑流萤。天阶夜色凉如水,坐看牵牛织女星.落红不是无情物...
天龙八部里面的虚竹最早是什么派 天龙八部里面的虚竹最早是什么派大概是 蛋黄派最早是少林寺的少林少林少林少林少林少林少林少林少林少...
明代古墓女尸光鲜如活人,究竟是... 明代古墓女尸光鲜如活人,究竟是怎么回事?可能是棺木的密封性很好,致使尸体迅速脱水,隔绝氧气,减缓腐化...
什么是龙娘客 什么是龙娘客一个人骂另一个人说是龙娘客一样的这个事青田话。。。。就是说他像中年妇女,婆婆妈妈
完美国际高手请来看看 完美国际高手请来看看用G时,最好用360保险箱保护起来……
语言 用韩文怎么说 语言 用韩文怎么说语言 用韩文怎么说【中文】:语言【韩文】:말/언어【罗马音标】:Mal /Eon ...
完美世界里修真是什么意思? 完美世界里修真是什么意思?完美世界离得修真是一个修行术语,说白了包括打怪升级、系统任务、交友探险等等...
一个少年的求助! 一个少年的求助!乐观二字最重要,积极对待生命!!!我太同情你和你的妈妈了....我觉得你应该如实告诉...
《临界·爵迹》出完了吗 《临界·爵迹》出完了吗临界爵迹是出完了,接下来是《爵迹·风津道》,都是爵迹系列的,有好几本临界爵迹完...
我似乎能看懂别人的心思 我似乎能看懂别人的心思你好,那说明你能通过一些细节(说话,动作,表情)慎扰链揣摩人意,观察比较敏锐,...
我听见别人咀嚼的声音就特别烦躁 我听见别人咀嚼的声音就特别烦躁是心理有什么问题吗就是咀嚼和吞咽的声音,我一听就巨烦.有什么解决的办法...
如何解落降头 如何解落降头看一些佛教之类的书,能对你有帮助要自己认真做事,不要管别人的评论,没有人向你施了降头。戴...
赛尔号时空密码 从那里来?还有... 赛尔号时空密码 从那里来?还有那个精元是什么?精细!!要打精灵王6-9次系统就会给你的,精元.......
苏的拼音是什么 苏的拼音是什么我问的字是不苏,但我也不知道为什么一写出来就成了苏了,那个字是一个更右边半包了一个生字...
皇马哪一年的阵容中有齐达内,罗... 皇马哪一年的阵容中有齐达内,罗纳尔多,贝克汉姆,劳尔和欧文?差不多是2002年吧!2004-2005...
现在已经拍成了电影的《哈利波特... 现在已经拍成了电影的《哈利波特》一共拍了几集,分别叫什么?电影啊,不是小说!谢谢!是不是有一部《哈利...
关于母爱父爱好句好段 关于母爱父爱好句好段我急需!帮帮忙,大哥哥大姐姐母爱是迷惘时苦口婆心的规劝;母爱是远行时一声殷切的叮...
那位书友给推荐几本好看的玄幻小... 那位书友给推荐几本好看的玄幻小说?最好不是穿越的。不要推荐那种主人物开始就无敌了,所有美女都喜欢他。...