java Nio使用NioSocket客户端与服务端交互实现方式
java Nio是jdk1.4新增的io方式—?nio(new IO),这种方式在目前来说算不算new,更合适的解释应该是non-block IO。
non-block是相对于传统的io方式来讲的。传统的Io方式是阻塞的,我们拿网络io来举例,传统的io模型如下:
服务端主线程负责不断地server.accept(),如果没有客户端请求主线程就会阻塞,当客户端请求时,主线程会通过线程池创建一个新的线程执行。
简单解释就是一个线程负责一个客户端的socket,当客户端因网络等原因传递速度慢的时候,服务端对应的客户端的线程就会等待,很浪费资源。
同时线程过少的话会影响服务的吞吐量,而线程过多的话由于上下文切换等原因会导致效率十分低下,传统的io方式并不适合如今的网络流量。
Nio的模型如下:
nio相比传统的io模型,最大的特点是优化了线程的使用。
nio通过selector可以使用一个线程去管理多个socket句柄,说是管理也不太合适,nio是采用的事件驱动模型,selector负责的是监控各个连接句柄的状态,不是去轮询每个句柄,而是在数据就绪后,将消息通知给selector,而具体的socket句柄管理则是采用多路复用的模型,交由操作系统来完成。
selector充当的是一个消息的监听者,负责监听channel在其注册的事件,这样就可以通过一个线程完成了大量连接的管理,当注册的事件发生后,再调用相应线程进行处理。
这样就不需要为每个连接都使用一个线程去维持长连接,减少了长连接的开销,同时减少了上下文的切换提高了系统的吞吐量。
java Nio的组成java Nio主要由三个核心部分组成:
- Buffer - Channel - Selector
所有的io的Nio都是从一个channel开始的,Channel有点类似于流,但是和流不同的是,channel是可以双向读写的。Channel有几种类型,主要包含文件io操作和网络io:
- FileChannel (文件io) - DatagramChannel (udp数据报) - SocketChannel (tcp客户端) - ServerSocketChannel (tcp服务端)
Buffer是一个中间缓存区,数据可以从channel读取到buffer,也可以从buffer写到channel中,在java中,传统方式与io的交互,需要将数据从堆内存读取到直接内存中,然后交由c语言来调用系统服务完成io的交互。
而使用Buffer可以直接在直接内存中开辟内存区域,减少了io复制的操作,从而提高了io操作的效率。
#基本数据类型的buffer - ByteBuffer - CharBuffer - DoubleBuffer - FloatBuffer - IntBuffer - LongBuffer - ShortBuffer#文件内存映射buffer - MappedByteBuffer#直接内存区buffer - DirectBuffer
Selector允许单个线程处理多个channel,可以将多个channel教给selector管理,并注册相应的事件,而selector则采用事件驱动的方式,当注册的事件就绪后,调用相应的相应的线程处理该时间,不用使用线程去维持长连接,减少了线程的开销。
Selector通过静态工厂的open方法建立,然后通过channel的register注册到Channel上。
注册后通过select方法等待请求,select请求有long类型参数,代表等待时间,如果等待时间内接受到操作请求,则返回可以操作请求的数量,否则超时往下走。
传入参数为零或者无参方法,则会采用阻塞模式知道有相应请求。
收到请求后调用selectedKeys返回SelectionKey的集合。
SelectionKey保存了处理当前请求的Channel和Selector,并且提供了不同的操作类型。
SelectionKey的操作有四种:
- SelectionKey.OP_CONNECT - SelectionKey.OP_ACCEPT - SelectionKey.OP_READ - SelectionKey.OP_WRITE
下面为一个客户端与服务端实用NioSocket交互的简单例子:
//对selectionKey事件的处理/** * description: * * @author wkGui */interface ServerHandlerBs { void handleAccept(SelectionKey selectionKey) throws IOException; String handleRead(SelectionKey selectionKey) throws IOException;}/** * description: * * @author wkGui */public class ServerHandlerImpl implements ServerHandlerBs { private int bufferSize = 1024; private String localCharset = 'UTF-8'; public ServerHandlerImpl() { } public ServerHandlerImpl(int bufferSize) {this(bufferSize, null); } public ServerHandlerImpl(String localCharset) {this(-1, localCharset); } public ServerHandlerImpl(int bufferSize, String localCharset) {this.bufferSize = bufferSize > 0 ? bufferSize : this.bufferSize;this.localCharset = localCharset == null ? this.localCharset : localCharset; } @Override public void handleAccept(SelectionKey selectionKey) throws IOException {//获取channelSocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept();//非阻塞socketChannel.configureBlocking(false);//注册selectorsocketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufferSize));System.out.println('建立请求......'); } @Override public String handleRead(SelectionKey selectionKey) throws IOException {SocketChannel socketChannel = (SocketChannel) selectionKey.channel();ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();String receivedStr = '';if (socketChannel.read(buffer) == -1) { //没读到内容关闭 socketChannel.shutdownOutput(); socketChannel.shutdownInput(); socketChannel.close(); System.out.println('连接断开......');} else { //将channel改为读取状态 buffer.flip(); //按照编码读取数据 receivedStr = Charset.forName(localCharset).newDecoder().decode(buffer).toString(); buffer.clear(); //返回数据给客户端 buffer = buffer.put(('received string : ' + receivedStr).getBytes(localCharset)); //读取模式 buffer.flip(); socketChannel.write(buffer); //注册selector 继续读取数据 socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufferSize));}return receivedStr; }}
//服务端server类/** * description: * * @author wkGui */public class NioSocketServer { private volatile byte flag = 1; public void setFlag(byte flag) {this.flag = flag; } public void start() {//创建serverSocketChannel,监听8888端口try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) { serverSocketChannel.socket().bind(new InetSocketAddress(8888)); //设置为非阻塞模式 serverSocketChannel.configureBlocking(false); //为serverChannel注册selector Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println('服务端开始工作:'); //创建消息处理器 ServerHandlerBs handler = new ServerHandlerImpl(1024); while (flag == 1) {selector.select();System.out.println('开始处理请求 : ');//获取selectionKeys并处理Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); try {//连接请求if (key.isAcceptable()) { handler.handleAccept(key);}//读请求if (key.isReadable()) { System.out.println(handler.handleRead(key));} } catch (IOException e) {e.printStackTrace(); } //处理完后移除当前使用的key keyIterator.remove();}System.out.println('完成请求处理。'); }} catch (IOException e) { e.printStackTrace();} }}//server端启动类/** * description: * * @author wkGui */public class ServerMain { public static void main(String[] args) {NioSocketServer server = new NioSocketServer();new Thread(() -> { try {Thread.sleep(10*60*1000); } catch (InterruptedException e) {e.printStackTrace(); }finally {server.setFlag((byte) 0); }}).start();server.start(); }}
//客户端client类/** * description: * * @author wkGui */public class NioSocketClient { public void start() {try (SocketChannel socketChannel = SocketChannel.open()) { //连接服务端socket SocketAddress socketAddress = new InetSocketAddress('localhost', 8888); socketChannel.connect(socketAddress); int sendCount = 0; ByteBuffer buffer = ByteBuffer.allocate(1024); //这里最好使用selector处理 这里只是为了写的简单 while (sendCount < 10) {buffer.clear();//向服务端发送消息buffer.put(('current time : ' + System.currentTimeMillis()).getBytes());//读取模式buffer.flip();socketChannel.write(buffer);buffer.clear();//从服务端读取消息int readLenth = socketChannel.read(buffer);//读取模式buffer.flip();byte[] bytes = new byte[readLenth];buffer.get(bytes);System.out.println(new String(bytes, 'UTF-8'));buffer.clear();sendCount++;try { Thread.sleep(1000);} catch (InterruptedException e) { e.printStackTrace();} }} catch (IOException e) { e.printStackTrace();} }}//client启动类/** * description: * * @author wkGui */public class ClientMain { public static void main(String[] args) {new NioSocketClient().start(); }}Java NIO 实现 WebSocket 协议WebSocket协议
WebSocket是一种在单个TCP连接上进行全双工通信的协议。 WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
WebSocket协议相比于Http协议来说,最大的特点就是可以实现服务端主动向客户端发送消息。在WebSocket出现之前,如果客户端想实时获取服务端的消息,就需要使用AJAX轮询,查询是否有消息,这样就很消耗服务器资源和带宽。但是用WebSocket就可以实现服务端主动向客户端发送数据,并且只需要占用一个TCP连接,节省了资源和带宽。
WebSocket连接建立过程为了建立一个WebSocket连接,客户端浏览器首先要向服务器发起一个HTTP请求,这个请求和通常的HTTP请求不同,包含了一些附加的头信息,其中附加头信息“Upgrade: WebSocket” 表明这是一个申请协议升级的HTTP请求。服务器端解析这些附加的信息头,然后生成应答消息返回给客户端,客户端和服务端的WebSocket连接就建立了。之后就可以使用WebSocket协议的格式来双向发送消息。
建立连接时发送的HTTP请求头:
返回的HTTP响应头:
在响应头中的 Sec-WebSocket-Accept 时通过Sec-WebSocket-Key构造出来的。首先在Sec-WebSocket-Key后接上一个258EAFA5-E914-47DA-95CA-C5AB0DC85B11,然后再进行SHA1摘要得到160位数据在,在使用BASE64进行编码,最后得到的就是Sec-WebSocket-Accept。
WebSocket数据发送过程WebSocket数据发送的帧格式如下所示:
FIN - 1bit
在数据发送的过程中,可能会分片发送,FIN表示是否为最后一个分片。如果发生了分片,则1表示时最后一个分片;不能再分片的情况下,这个标志总是为1。
RSV1 RSV2 RSV3 - 1bit each
用于扩展,不使用扩展时需要为全0;非零时通信双方必须协商好扩展。这里我们用不上。
OPCODE - 4bits
用于表示所传送数据的类型,也就是payload中的数据。
数值 含义 0x0 附加数据帧 0x1 文本数据帧 0x2 二进制数据帧 0x3-0x7 保留 0x8 关闭连接帧 0x9 ping帧 0xA pong帧 0xB-0xF 保留MASK - 1bit
用于表示payload是否被进行了掩码运算,1表示使用掩码,0表示不使用掩码。从客户端发送向服务端的数据帧必须使用掩码。
Payload length 7 bits,7+16 bits or 7+64 bits
用于表示payload的长度,有以下三种情况:
Payload length 表示的大小 payload的长度 0 - 125 Payload length 大小 126 之后的2个字节表示的无符号整数 127 之后的8个字节表示的无符号整数Masking-key - 0 or 4 bytes
32 bit长的掩码,如果MASK为1,则帧中就存在这一个字段,在解析payload时,需要进行使用32长掩码进行异或操作,之后才能得到正确结果。
Java NIO 实现利用Java NIO 来实现一个聊天室。部分代码如下。
NIO的常规代码:
selector.select(1000);Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> it = selectionKeys.iterator();while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); if (key.isAcceptable()) {handleAccept(key); } if (key.isReadable()) {handleRead(key); }}
接受连接:
public void handleAccept(SelectionKey key) { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc; try {sc = ssc.accept();sc.configureBlocking(false);sc.register(selector, SelectionKey.OP_READ);System.out.println(String.format('[server] -- client %s connected.', sc.getRemoteAddress().toString())); } catch (IOException e) {System.out.println(String.format('[server] -- error occur when accept: %s.', e.getMessage()));key.cancel(); }}
读取通道中的数据:
public void handleRead(SelectionKey key) { SocketChannel sc = (SocketChannel) key.channel(); Client client = (Client) key.attachment(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // 如果是第一次连接进来,就需要创建一个客户端对象,存储起来 if (client == null) {client = new Client(sc);clients.add(client);key.attach(client);byteBuffer.clear();// 如果连接还没有建立,就是要HTTP建立连接try { sc.read(byteBuffer); byteBuffer.flip(); String response = WebSocketHandler.getResponse(new String(byteBuffer.array())); byteBuffer.clear(); byteBuffer.put(response.getBytes()); byteBuffer.flip(); while (byteBuffer.hasRemaining()) {sc.write(byteBuffer); }} catch (IOException e) { System.out.println(String.format('[server] -- error occur when read: %s.', e.getMessage()));}String message = '[系统消息] ' + client.toString() + ' 加入了群聊';broadcast(message.getBytes(), client); } byteBuffer.clear(); int read = 0; try {read = sc.read(byteBuffer);if (read > 0) { byteBuffer.flip(); int opcode = byteBuffer.get() & 0x0f; // 8表示客户端关闭了连接 if (opcode == 8) {System.out.println(String.format('[server] -- client %s connection close.', sc.getRemoteAddress()));clients.remove(client);String message = '[系统消息] ' + client.toString() + ' 退出了群聊';broadcast(message.getBytes(), client);sc.close();key.cancel();return; } // 只考虑了最简单的payload长度情况。 int len = byteBuffer.get(); len &= 0x7f; byte[] mask = new byte[4]; byteBuffer.get(mask); byte[] payload = new byte[len]; byteBuffer.get(payload); for (int i = 0; i < payload.length; i++) {payload[i] ^= mask[i % 4]; } System.out.println(String .format('[server] -- client: [%s], send: [%s].', client.toString(), new String(payload))); String message = String.format('[%s]: %s', client.toString(), new String(payload)); broadcast(message.getBytes(), client);} else if (read == -1) { System.out.println(String.format('[server] -- client %s connection close.', sc.getRemoteAddress())); clients.remove(client); String message = '[系统消息] ' + client.toString() + ' 退出了群聊'; broadcast(message.getBytes(), client); sc.close(); key.cancel();} } catch (IOException e) {System.out.println(String.format('[server] -- error occur when read: %s.', e.getMessage())); }}
使用HTTP建立WebSocket连接。
public class WebSocketHandler { private static String APPEND_STRING = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'; static class Header {private Map<String, String> properties = new HashMap<>();public String get(String key) { return properties.get(key);} } private WebSocketHandler() {} private static Header phrase(String request) {Header header = new Header();String[] pros = request.split('rn');for (String pro : pros) { if (pro.contains(':')) {int index = pro.indexOf(':');String key = pro.substring(0, index).trim();String value = pro.substring(index + 1).trim();header.properties.put(key, value); }}return header; } public static String getResponse(String request) {Header header = phrase(request);String acceptKey = header.get('Sec-WebSocket-Key') + APPEND_STRING;MessageDigest sha1;try { sha1 = MessageDigest.getInstance('sha1'); sha1.update(acceptKey.getBytes()); acceptKey = new String(Base64.getEncoder().encode(sha1.digest()));} catch (NoSuchAlgorithmException e) { System.out.println('fail to encode ' + e.getMessage()); return null;}StringBuilder stringBuilder = new StringBuilder();stringBuilder.append('HTTP/1.1 101 Switching Protocolsrn').append('Upgrade: websocketrn') .append('Connection: Upgradern').append('Sec-WebSocket-Accept: ' + acceptKey + 'rn') .append('rn');return stringBuilder.toString(); }}
客户端对象
/** * @author XinHui Chen * @date 2020/2/8 19:20 */public class Client { private SocketChannel socketChannel = null; private String id = null; public SocketChannel getSocketChannel() {return socketChannel; } public String getId() {return id; } Client(SocketChannel socketChannel) {this.socketChannel = socketChannel;this.id = UUID.randomUUID().toString(); } @Override public String toString() {try { return id + ' ' + socketChannel.getRemoteAddress().toString();} catch (IOException e) { System.out.println(e.getMessage()); return null;} }}结果
使用网页和控制台与服务端建立WebSocket连接,发送数据。两个都能成功显示。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持好吧啦网。
相关文章: