java netty and html 实现点对点 即时通讯 点对群及时通讯 实例代码 和jvm设置

网友投稿 271 2022-11-15

java netty and html 实现点对点 即时通讯 点对群及时通讯 实例代码 和jvm设置

目录

​​点对点​​

​​点对点反向​​

​​群发​​

​​maven 引入依赖​​

​​java netty 主启动类 代码​​

​​java netty 初始化类 代码​​

​​java netty 通道类 代码​​

​​java netty 通讯上下文管理类​​

​​html 网页代码​​

​​实例代码和jar包​​

​​附上一个netty 启动jar  jvm ​​

​​jvm常见配置汇总介绍​​

点对点

点对点反向

群发

maven 引入依赖

io.netty netty-all 4.1.32.Final

java netty 主启动类 代码

package com.superman.testnetty;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.Channel;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;/** * 程序的入口,负责启动应用 *Timely communication */public class TCNettyServer { private int port; public TCNettyServer(int port) { this.port = port; } public void run() { // 启用两个Reactor线程池【netty是基于NIO的,基于线程处理的】 // 用于接收Client端连接的 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 进行网络通信读写 EventLoopGroup workGroup = new NioEventLoopGroup(); try { // 创建一个辅助类Bootstrap,就是对我们的Server进行一系列的配置 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workGroup); // 指定使用NioServerSocketChannel这种类型的通道 b.channel(NioServerSocketChannel.class); /* * 使用 childHandler 去初始化服务器 添加handler,用来监听已经连接的客户端的Channel的动作和状态。 * * 被绑定的MyWebSocketChannelHandler()里面设置了服务端初始化参数以及 */ b.childHandler(new TCWebSocketChannelHandler()); System.out.println("netty服务端开启等待客户端连接...."); Channel ch = b.bind(port).sync().channel(); ch.closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { // 优雅的退出程序 bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } public static void main(String[] args) { new TCNettyServer(8888).run(); }}

springboot 类启动的时候调用 new TCNettyServer(8888).run();

java netty 初始化类 代码

package com.superman.testnetty;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.io.netty.handler.codec.io.netty.handler.stream.ChunkedWriteHandler;import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;/** * 初始化连接时候的各个组件 * Timely communication * */public class TCWebSocketChannelHandler extends ChannelInitializer { @Override protected void initChannel(SocketChannel e) throws Exception { e.pipeline().addLast("new HttpServerCodec()); e.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); e.pipeline().addLast("new ChunkedWriteHandler()); // 添加具体的处理器。可以addLast(或者addFirst)多个handler, // 第一个参数是名字,无具体要求,如果填写null,系统会自动命名。 e.pipeline().addLast("handler", new TCWebSocketHandler()); /**通过使用管道的ChannelPipeline方式来处理请求 * 第一个配置的管道先处理,然后移交给下一个管道来处理,在每个管道处理中 * 各个handler可以决定是否继续或中断 * ChannelPipeline和ChannelHandler机制类似于Servlet和Filter过滤器{@link ChannelPipeline} * Netty中的事件分为inbound事件和outbound事件。 * inbound事件通常由I/O线程触发,例如TCP链路建立事件、链路关闭事件、读事件、异常通知事件等。方法名以file开始{@link ChannelHandlerContext} * outbound事件类似于发送、刷新、断开连接、绑定本地地址等关闭channel */ }}

java netty 通道类 代码

package com.superman.testnetty;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.handler.codec.io.netty.handler.codec.io.netty.handler.codec.io.netty.handler.codec.io.netty.handler.codec.io.netty.handler.codec.io.netty.handler.codec.io.netty.handler.codec.io.netty.handler.codec.io.netty.handler.codec.io.netty.handler.codec.io.netty.handler.codec.io.netty.util.CharsetUtil;import java.io.IOException;import java.util.HashMap;import java.util.List;import java.util.Map;/** * 接收/处理/响应客户端websocket请求的核心业务处理类 * 通过添加hanlder,我们可以监听Channel的各种动作以及状态的改变,包括连接,绑定,接收消息等。 * * Timely communication */public class TCWebSocketHandler extends SimpleChannelInboundHandler { // 用于服务器端web套接字打开和关闭握手 private WebSocketServerHandshaker handshaker; private static final String WEB_SOCKET_URL = "/websocket"; //客户端与服务端创建连接的时候调用 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { TCChannelManage.group.add(ctx.channel()); System.out.println("客户端与服务端连接开启,客户端remoteAddress:" + ctx.channel().remoteAddress()); } //客户端与服务端断开连接的时候调用 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { TCChannelManage.group.remove(ctx.channel()); System.out.println("客户端与服务端连接关闭..."); } //服务端接收客户端发送过来的数据结束之后调用 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } //工程出现异常的时候调用 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } //服务端处理客户端websocket请求的核心方法 protected void messageReceived(ChannelHandlerContext context, Object msg) throws Exception { /* 传统的HTTP接入(采用 * 第一次握手请求消息由HTTP协议承载,所以它是一个HTTP消息, * 握手成功后,数据就直接从 TCP 通道传输,与 HTTP 无关了。 * 执行handleHttpRequest方法来处理WebSocket握手请求。 */ // FullHttpRequest是完整的 HTTP请求,协议头和Form数据是在一起的,不用分开读 if (msg instanceof FullHttpRequest) { handHttpRequest(context, (FullHttpRequest) msg); } /** * WebSocket接入(采用socket处理方式) * 提交请求消息给服务端, * WebSocketServerHandler接收到的是已经解码后的WebSocketFrame消息。 */ else if (msg instanceof WebSocketFrame) { handWebsocketFrame(context, (WebSocketFrame) msg); } /** * Websocket的数据传输是frame形式传输的,比如会将一条消息分为几个frame,按照先后顺序传输出去。这样做会有几个好处: * * 1)大数据的传输可以分片传输,不用考虑到数据大小导致的长度标志位不足够的情况。 * * 2)和 */ } /** * 处理客户端与服务端之前的websocket业务 * * @param ctx * @param frame */ private void handWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { //判断是否是关闭websocket的指令 if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); } //判断是否是ping消息 if (frame instanceof PingWebSocketFrame) { ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } if (frame instanceof TextWebSocketFrame) { // 返回应答消息 String requestMsg = ((TextWebSocketFrame) frame).text(); System.out.println("收到客户端" + ctx.channel().remoteAddress() + "的消息==》" + requestMsg); String[] array = requestMsg.split(","); // 先判断通道管理器中是否存在该通道,没有则添加进去 if (!TCChannelManage.hasChannel(array[0])) { TCChannelManage.userIdAndChannelMap.put(array[0], ctx.channel()); } if (array[0].length() != 0 && array[1].length() != 0) { TCChannelManage.send(array[0], array[1], array[2], ctx.channel()); } else if (array[0].length() != 0 && array[1].length() == 0) { //如果没有指定接收者表示群发array.length() = 2 System.out.println("用户" + array[0] + "群发了一条消息:" + array[2]); TCChannelManage.group.writeAndFlush(new TextWebSocketFrame("用户" + array[0] + "群发了一条消息:" + array[2])); } else { //如果没有指定发送者与接收者表示向服务端发送array.length() = 1 System.out.println("服务端接收用户" + ctx.channel().remoteAddress() + "消息,不再发送出去"); ctx.writeAndFlush(new TextWebSocketFrame("你向服务端发送了消息==》" + array[2])); } } } /** * 处理客户端向服务端发起 * * @param ctx * @param req */ private void handHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { System.out.println("处理+ req.getMethod() + ",+ req.getUri()); Map parmMap = new HashMap<>(); try { parmMap = parse(req); } catch (IOException e) { e.printStackTrace(); } // 如果不是WebSocket握手请求消息,那么就返回 HTTP 400 BAD REQUEST 响应给客户端。 if (!req.getDecoderResult().isSuccess() || !("websocket".equals(req.headers().get("Upgrade")))) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } //如果是握手请求,那么就进行握手 WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( WEB_SOCKET_URL, null, false); handshaker = wsFactory.newHandshaker(req); if (handshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel()); } else { // 通过它构造握手响应消息返回给客户端, // 同时将WebSocket相关的编码和解码类动态添加到ChannelPipeline中,用于WebSocket消息的编解码, // 添加WebSocketEncoder和WebSocketDecoder之后,服务端就可以自动对WebSocket消息进行编解码了 handshaker.handshake(ctx.channel(), req); } } /** * 服务端向客户端响应消息 * * @param ctx * @param req * @param res */ private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) { // 返回应答给客户端 if (res.getStatus().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); } // 如果是非Keep-Alive,关闭连接 ChannelFuture f = ctx.channel().writeAndFlush(res); if (res.getStatus().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } } /** * 解析GET、POST请求参数 * @return 包含所有请求参数的键值对, 如果没有参数, 则返回空Map * * @throws IOException */ public Map parse(FullHttpRequest fullReq) throws IOException { HttpMethod method = fullReq.getMethod(); Map parmMap = new HashMap<>(); if (HttpMethod.GET == method) { // 是GET请求 QueryStringDecoder decoder = new QueryStringDecoder(fullReq.getUri()); decoder.parameters().entrySet().forEach( entry -> { // entry.getValue()是一个List, 只取第一个元素 parmMap.put(entry.getKey(), entry.getValue().get(0)); }); } else if (HttpMethod.POST == method) { HttpPostRequestDecoder decoder = new HttpPostRequestDecoder( new DefaultHttpDataFactory(false), fullReq); List postData = decoder.getBodyHttpDatas(); for(InterfaceHttpData data:postData){ if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) { MemoryAttribute attribute = (MemoryAttribute) data; parmMap.put(attribute.getName(), attribute.getValue()); } } } else { // 不支持其它方法 System.out.println("不支持其他方法提交的参数"); } return parmMap; } @Override protected void channelRead0(ChannelHandlerContext arg0, Object arg1) throws Exception { messageReceived(arg0,arg1); }}

这个类主要处理通讯时候寻找对应的通道

java netty 通讯上下文管理类

package com.superman.testnetty;import io.netty.channel.Channel;import io.netty.channel.group.ChannelGroup;import io.netty.channel.group.DefaultChannelGroup;import io.netty.handler.codec.io.netty.util.concurrent.GlobalEventExecutor;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ConcurrentMap;import java.util.concurrent.locks.ReentrantReadWriteLock;/** * 存储整个工程的全局配置 * Timely communication * */public class TCChannelManage { /** * 存储每一个客户端接入进来时的channel对象 */ public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); // 读锁 private static ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true); public static ConcurrentMap userIdAndChannelMap = new ConcurrentHashMap<>(); public static void send(String senderId, String receiverId, String message, Channel senderChannel) { // 发送肯定是A要给B发,A就是发消息的对象,B可以是人,机器等对象 try { rwLock.readLock().lock(); // 1.寻找receiverId的channel Channel receiverChannel = userIdAndChannelMap.get(receiverId); if (receiverChannel == null) { // 使用发送者的通道告知发送者,你要发的那个人不在线 senderChannel.writeAndFlush(new TextWebSocketFrame(receiverId + "不在线")); return; } // 2.发送。A给B发,B若要收到消息,其实是通过B的channel给B发消息 receiverChannel.writeAndFlush(new TextWebSocketFrame(senderId + "发来消息===》" + message)); } catch (Exception e) { e.printStackTrace(); } finally { rwLock.readLock().unlock(); } } public static boolean hasChannel(String id) { Channel channel = userIdAndChannelMap.get(id); if (channel == null) { return false; } else { return true; } }}

通道存储和添加删除啥的用在这即可

html 网页代码

WebSocket客户端

及时唠嗑








客户端接收到服务端返回的应答消息

实例代码和jar包

​​javascript:void(0)​​

附上一个netty 启动jar  jvm

java -jar -server -Xms4G -Xmx4G -XX:NewSize=3584m -XX:PermSize=64m -XX:SurvivorRatio=1 -XX:+UseParallelGC -XX:-UseAdaptiveSizePolicy

java -jar -server -Xms4G:初始堆大小,设置JVM促使内存为4G。此值可以设置与-Xmx相同,以避免每次垃圾回收完成后JVM重新分配内存。 -Xmx4G  :最大堆大小设置JVM最大可用内存为4G。 -XX:NewSize=3584m :设置年轻代大小-XX:PermSize=64m :设置持久代大小为-XX:SurvivorRatio=1  :年轻代中Eden区与两个Survivor区的比值。注意Survivor区有两个。如:3,表示Eden:Survivor=3:2,一个Survivor区占整个年轻代的1/5-XX:+UseParallelGC  :设置并行收集器-XX:+UseAdaptiveSizePolicy :设置此选项后,并行收集器会自动选择年轻代区大小和相应的Survivor区比例,以达到目标系统规定的最低相应时间或者收集频率等,此值建议使用并行收集器时,一直打开。

jvm常见配置汇总介绍

堆设置

-Xms:初始堆大小-Xmx:最大堆大小-XX:NewSize=n:设置年轻代大小-XX:NewRatio=n:设置年轻代和年老代的比值。如:为3,表示年轻代与年老代比值为1:3,年轻代占整个年轻代年老代和的1/4-XX:SurvivorRatio=n:年轻代中Eden区与两个Survivor区的比值。注意Survivor区有两个。如:3,表示Eden:Survivor=3:2,一个Survivor区占整个年轻代的1/5-XX:MaxPermSize=n:设置持久代大小

收集器设置

-XX:+UseSerialGC:设置串行收集器-XX:+UseParallelGC:设置并行收集器-XX:+UseParalledlOldGC:设置并行年老代收集器-XX:+UseConcMarkSweepGC:设置并发收集器

垃圾回收统计信息

-XX:+PrintGC-XX:+PrintGCDetails-XX:+PrintGCTimeStamps-Xloggc:filename

并行收集器设置

-XX:ParallelGCThreads=n:设置并行收集器收集时使用的CPU数。并行收集线程数。-XX:MaxGCPauseMillis=n:设置并行收集最大暂停时间-XX:GCTimeRatio=n:设置垃圾回收时间占程序运行时间的百分比。公式为1/(1+n)

并发收集器设置

-XX:+CMSIncrementalMode:设置为增量模式。适用于单CPU情况。-XX:ParallelGCThreads=n:设置并发收集器年轻代收集方式为并行收集时,使用的CPU数。并行收集线程数。

轻松处理器支持1w往上

ok

持续更新

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:结合Mybatis聊聊对SQL注入的见解
下一篇:如何抓住QQ小游戏买量红利:休闲与内购小游戏买量优化方法分享
相关文章

 发表评论

暂时没有评论,来抢沙发吧~