是的 本文介绍java如何作为客户端 发起websocket请求
博主不做标题党 不会服务端客户端分不清就写个标题 乱写文章
为什么会使用java作为websocket客户端? 虽说websocket协议 本意是web与服务端之间的通讯协议,那假设有一天 我们的供应商 或者是甲方大爷 只提供了websocket接口呢?
如果直接让前端去对接,再把数据传给后端,那从前端对接到后端入库的步骤,万一出现数据丢失呢? 总之把数据处理放在后端,是相对可靠的, 我们可以借助netty来实现websocket客户端功能
长链接有两点值得注意的,一是心跳机制 二是重连机制
如果不发送心跳包,可能过会儿连接就断开了;
重新机制就比较好理解了,不管是服务端还是客户端的断开,作为客户端都需要能够重连
博主对长链接并不是特别熟练,但是什么代码是能正式用的,什么代码上不了生产只是个写着玩的demo 还是一眼能分辨出来的,代码主要参考 git@github.com:yimiancheng/netty-study.git
,写的很不错,代码优化空间很少了, 线程池博主是推荐用new ThreadPoolExecutor()创建,避免OOM的问题,除此之外应该是准生产环境级别代码了。
maven依赖
io.netty netty-all 4.1.35.Final
websocket客户端handle类 主要处理接收的消息、事件等,
事件触发会进入userEventTriggered方法
建立连接会进入channelActive方法
接收消息会进入channelRead0方法
出现异常会进入exceptionCaught方法
断开连接会进入channelInactive方法(本文中未重写该方法, 在channelInactive方法中重连也是没问题的)
import com.qiuhuanhen.springroot.interfaces.websocket.websocketclient.SocketClient;
import com.qiuhuanhen.springroot.interfaces.websocket.websocketclient.websocket.SendMsg;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** WebSocketClientFrameHandler** @date 2019/8/20 16:42*/
public class WebSocketClientFrameHandler extends SimpleChannelInboundHandler {private static final Logger LOG = LoggerFactory.getLogger(WebSocketClientFrameHandler.class);private SocketClient socketClient;private ChannelPromise channelPromise;@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {LOG.info("客户端接收到事件 " + (evt.getClass()) + " | " + evt.toString());if(WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE.equals(evt)) {LOG.info(ctx.channel().id().asShortText() + " 握手完成!");socketClient.CHANNEL_IS_READY.set(true);channelPromise.setSuccess();// SendMsg.send(ctx.channel(),"客户端握手完成消息 -》服务器时间: " + System.currentTimeMillis());}else if(evt instanceof IdleStateEvent){//ctx.channel().writeAndFlush(new PingWebSocketFrame());IdleStateEvent evtIdle = (IdleStateEvent) evt;switch(evtIdle.state()) {case WRITER_IDLE:// SendMsg.send(ctx.channel(),"客户端 ping 消息 -》服务器时间: " + System.currentTimeMillis());ctx.channel().writeAndFlush(new PingWebSocketFrame());case READER_IDLE:// SendMsg.send(ctx.channel(),"客户端 ping 消息 -》服务器时间: " + System.currentTimeMillis());ctx.channel().writeAndFlush(new PingWebSocketFrame());default:break;}}super.userEventTriggered(ctx, evt);}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();LOG.info("打开连接 handlerAdded SUCCESS. | name = " +channel.id().asShortText());super.handlerAdded(ctx);channelPromise = ctx.newPromise();}/*** Channel 已经被注册到了EventLoop*/@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();LOG.info("注册成功 channelRegistered SUCCESS. | name = " +channel.id().asShortText());super.channelRegistered(ctx);}/*** Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了* @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();LOG.info("活动状态 channelActive SUCCESS. | name = " +channel.id().asShortText());super.channelActive(ctx);}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, WebSocketFrame webSocketFrame) throwsException {if(webSocketFrame instanceof TextWebSocketFrame) {// String message = textWebSocketFrame.content().toString(Charset.forName("utf-8"));String message = ((TextWebSocketFrame) webSocketFrame).text();LOG.info("客户端接收到消息:" + message);}else {LOG.info("接收到消息类型" + (webSocketFrame.getClass().getName()));}SendMsg.sendPong(channelHandlerContext.channel());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {LOG.error("消息处理失败: " + cause.getMessage(), cause);ctx.close();}/*** Channel 已经被创建,但还未注册到EventLoop* 连接断开*/@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();LOG.info("连接断开 channelUnregistered SUCCESS. | name = " +channel.id().asShortText());super.channelUnregistered(ctx);channelPromise = null;}public SocketClient getSocketClient() {return socketClient;}public void setSocketClient(SocketClient socketClient) {this.socketClient = socketClient;}public ChannelPromise getChannelPromise() {return channelPromise;}
}
心跳任务类
import com.qiuhuanhen.springroot.interfaces.websocket.websocketclient.SocketClient;
import com.qiuhuanhen.springroot.interfaces.websocket.websocketclient.websocket.SendMsg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.TimerTask;/*** HeartBeatTimerTask** @date 2019/9/2 16:24*/
public class HeartBeatTimerTask extends TimerTask {private static final Logger LOG = LoggerFactory.getLogger(ReconnectTimerTask.class);private SocketClient socketClient;public HeartBeatTimerTask(SocketClient socketClient) {this.socketClient = socketClient;}@Overridepublic void run() {if(socketClient != null && socketClient.isValid()) {SendMsg.send(socketClient.getChannel(), "客户端心跳消息 => " + System.currentTimeMillis());}}
}
重连任务类
import com.qiuhuanhen.springroot.interfaces.websocket.websocketclient.SocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.TimerTask;/*** ReconnectTimerTask** @date 2019/9/2 16:03*/
public class ReconnectTimerTask extends TimerTask {private static final Logger LOG = LoggerFactory.getLogger(ReconnectTimerTask.class);private SocketClient socketClient;public ReconnectTimerTask(SocketClient socketClient) {this.socketClient = socketClient;}@Overridepublic void run() {if(socketClient != null && !socketClient.isValid()) {LOG.info("=== 客户端重连 " + System.currentTimeMillis());socketClient.connect();}}
}
自定义线程工厂类,主要是给线程重命名 便于维护、调试
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;public class NamedThreadFactory implements ThreadFactory {static final AtomicInteger poolNumber = new AtomicInteger(1);final AtomicInteger threadNumber = new AtomicInteger(1);final ThreadGroup group;final String prefix;final boolean isDaemon;final int priority;public NamedThreadFactory() {this("pool");}public NamedThreadFactory(String prefix) {this(prefix, false, Thread.NORM_PRIORITY);}public NamedThreadFactory(String prefix, boolean isDaemon, int priority) {SecurityManager sm = System.getSecurityManager();this.group = (sm != null) ? sm.getThreadGroup() : Thread.currentThread().getThreadGroup();this.prefix = prefix + "-" + poolNumber.getAndIncrement() + "-thread-";this.isDaemon = isDaemon;this.priority = priority;}public Thread newThread(Runnable r) {Thread thread = new Thread(group, r, prefix + threadNumber.getAndIncrement(), 0);thread.setDaemon(isDaemon);thread.setPriority(priority);return thread;}
}
消息监听类,主要用于发送消息(包括心跳包)是否成功
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;class CustomerChannelFutureListener implements ChannelFutureListener {private static final Logger LOG = LoggerFactory.getLogger(CustomerChannelFutureListener.class);public void operationComplete(ChannelFuture channelFuture) throws Exception {// LOG.info(JSON.toJSONString(channelFuture));if(channelFuture.isDone() && channelFuture.isSuccess()){// LOG.info("send success.");}else {channelFuture.channel().close();LOG.info("send error. cause = " + channelFuture.cause());channelFuture.cause().printStackTrace();}}}
简单封装的消息发送类
import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** SendMsg** @date 2019/8/30 18:02*/
public class SendMsg {private static final Logger LOG = LoggerFactory.getLogger(SendMsg.class);public static ConcurrentHashMap ALL_CHANNEL = new ConcurrentHashMap();public static void startSendMsg() {Thread thread = new Thread(new Runnable() {public void run() {while(true) {sendMsgTest();}}});thread.start();}public static void sendMsgTest() {try {Map map = Collections.unmodifiableMap(ALL_CHANNEL);LOG.info("map size = " + map.size());if(MapUtils.isEmpty(map)) {Thread.sleep(2000);return;}for(Map.Entry entry : map.entrySet()) {LOG.info("------------- key = " + entry.getKey());send(entry.getValue(), "服务端发送消息 " + entry.getKey() + " | " + System.currentTimeMillis());}Thread.sleep(10000);}catch(Exception ex) {ex.printStackTrace();}}public static void put(Channel channel) {ALL_CHANNEL.put(channel.id().asShortText(), channel);}public static void remove(Channel channel) {ALL_CHANNEL.remove(channel.id().asShortText());}public static void send(Channel channel, Object msg) {final String textMsg = JSON.toJSONString(msg);if(channel != null && channel.isActive()) {TextWebSocketFrame frame = new TextWebSocketFrame(textMsg);channel.writeAndFlush(frame).addListener(new CustomerChannelFutureListener());}else {LOG.error("消息发送失败! textMsg = " + textMsg);}}public static void sendPing(Channel channel) {if(channel != null && channel.isActive()) {channel.writeAndFlush(new PingWebSocketFrame()).addListener(new CustomerChannelFutureListener());}else {LOG.error("消息发送失败! ping");}}public static void sendPong(Channel channel) {if(channel != null && channel.isActive()) {channel.writeAndFlush(new PongWebSocketFrame()).addListener(new CustomerChannelFutureListener());}else {LOG.error("消息发送失败! pong");}}}
channel初始化
import com.qiuhuanhen.springroot.interfaces.websocket.websocketclient.handler.WebSocketClientFrameHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.timeout.IdleStateHandler;/*** ClientChannelInitializer** @date 2019/8/31 16:05*/
public class ClientChannelInitializer extends ChannelInitializer {private WebSocketClientHandshaker webSocketClientHandshaker;private WebSocketClientFrameHandler webSocketFrameHandler;public ClientChannelInitializer(WebSocketClientHandshaker webSocketClientHandshaker, WebSocketClientFrameHandlerwebSocketFrameHandler) {this.webSocketClientHandshaker = webSocketClientHandshaker;this.webSocketFrameHandler = webSocketFrameHandler;}@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast(new HttpClientCodec());//Http协议编码解码器pipeline.addLast(new HttpObjectAggregator(65536));//聚合 HttpRequestpipeline.addLast(new IdleStateHandler(5, 10, 0));//会处理ping pong close消息pipeline.addLast(new WebSocketClientProtocolHandler(webSocketClientHandshaker,true));pipeline.addLast(webSocketFrameHandler);}
}
客户端主类 : 添加工作组 , 启动websocket client , 建立连接等
import com.qiuhuanhen.springroot.interfaces.websocket.websocketclient.handler.WebSocketClientFrameHandler;
import com.qiuhuanhen.springroot.interfaces.websocket.websocketclient.task.HeartBeatTimerTask;
import com.qiuhuanhen.springroot.interfaces.websocket.websocketclient.task.ReconnectTimerTask;
import com.qiuhuanhen.springroot.interfaces.websocket.websocketclient.thread.NamedThreadFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;/*** SocketClient** @date 2019/8/20 17:53*/
public class SocketClient {private static final Logger LOG = LoggerFactory.getLogger(SocketClient.class);public static String CLIENT_VERSION = "client_version";private static final int DEFAULT_PORT = 80;/*** 长链重连间隔时间,单位s*/public static long RECONNECT_INTERVAL = 10;/*** 长链心跳时间,单位s*/public static long FETCH_PERIOD = 30;public static String host = "127.0.0.1";public static int port = 8585;private Channel channel = null;private NioEventLoopGroup nioEventLoopGroup;public AtomicBoolean CHANNEL_IS_READY = new AtomicBoolean(false);private ScheduledExecutorService RECONNECT_TIMER;private ScheduledExecutorService HEARTBEAT_TIMER;static final String URL = System.getProperty("url", "ws://127.0.0.1:8585/boot/imserver/1111");URI uri = new URI(URL);static {// RECONNECT_TIMER = Executors.newSingleThreadScheduledExecutor();}public SocketClient(URI uri) throws URISyntaxException {this.uri = uri;}private void start() {Bootstrap boot = new Bootstrap();nioEventLoopGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());try {HttpHeaders httpHeaders = new DefaultHttpHeaders();httpHeaders.add(CLIENT_VERSION, 1);WebSocketClientHandshaker webSocketClientHandshaker = WebSocketClientHandshakerFactory.newHandshaker(uri,WebSocketVersion.V13, null, false, httpHeaders);boot.group(nioEventLoopGroup).option(ChannelOption.TCP_NODELAY, true).channel(NioSocketChannel.class);WebSocketClientFrameHandler webSocketFrameHandler = new WebSocketClientFrameHandler();webSocketFrameHandler.setSocketClient(this);ClientChannelInitializer clientChannelInitializer =new ClientChannelInitializer(webSocketClientHandshaker, webSocketFrameHandler);boot.handler(new LoggingHandler(LogLevel.INFO));boot.handler(clientChannelInitializer);port = (uri.getPort() == -1) ? DEFAULT_PORT : uri.getPort();host = uri.getHost();channel = boot.connect(host, port).sync().channel();LOG.info("SocketClient has started. CHANNEL_IS_READY = " + CHANNEL_IS_READY.get());webSocketFrameHandler.getChannelPromise().sync();LOG.info("SocketClient has started full. CHANNEL_IS_READY = " + CHANNEL_IS_READY.get());}catch(Exception ex) {ex.printStackTrace();LOG.error("connect error. uri " + uri.toString());}finally {// nioEventLoopGroup.shutdownGracefully();}}/*** 客户端连接服务端*/public void connect() {stop();start();startReconnect();doHeartBeat();}/*** 开启线程-断开重连*/public void startReconnect() {RECONNECT_TIMER = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("reconnect-schedule-pool", Boolean.TRUE, Thread.NORM_PRIORITY));// https://www.jianshu.com/p/502f9952c09bRECONNECT_TIMER.scheduleAtFixedRate(new ReconnectTimerTask(this),RECONNECT_INTERVAL * 1000L, RECONNECT_INTERVAL * 1000L, TimeUnit.MILLISECONDS);}/*** 心跳*/private void doHeartBeat() {HEARTBEAT_TIMER = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("heartbeat-schedule-pool", Boolean.TRUE, Thread.NORM_PRIORITY));// https://www.jianshu.com/p/502f9952c09bHEARTBEAT_TIMER.scheduleAtFixedRate(new HeartBeatTimerTask(this),FETCH_PERIOD * 1000L, FETCH_PERIOD * 1000L, TimeUnit.MILLISECONDS);}/*** 客户端停止*/public void stop() {try {if(nioEventLoopGroup != null) {nioEventLoopGroup.shutdownGracefully();}if(channel != null) {channel.close();}if(RECONNECT_TIMER != null) {RECONNECT_TIMER.shutdown();RECONNECT_TIMER = null;}if(HEARTBEAT_TIMER != null) {HEARTBEAT_TIMER.shutdown();HEARTBEAT_TIMER = null;}}catch(Exception ex) {//do nothing.}}public boolean isValid() {if (channel != null && channel.isActive()) {return true;} else {return false;}}public Channel getChannel() {return channel;}public static void main(String[] args) {try {new SocketClient(new URI(URL)).connect();} catch (URISyntaxException e) {e.printStackTrace();}}
}
重连成功截图:
上一篇:单片机——IIC协议与24C02
下一篇:四月份云南旅游攻略