
水平触发和边缘触发
水平触发Level_triggered
当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据一次性全部读完(如读写缓冲区太小),那么下次调用epoll_wait()时,它还会通知你在上没读写万的文件描述符上继续读写,当然如果你一直不去读写,它会一直通知你!!!如果系统中有大量你不需要读写的就绪文件描述符,而它们每次都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率。
边缘触发Edge_triggered
当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait()时,它不会通知你,也就是它只会通知你一次,知道该文件描述符上出现第二次可读写事件才会通知你!!!这种模式比水平触发效率高,系统中不会充斥大量你不关心的就绪文件描述符!!
select(),poll()模型都是水平触发模式,信号驱动IO是边缘触发模式,epoll()模型即支持水平触发,也支持边缘触发,默认是水平触发。
原生的JDK网络编程
BIO编程
BIO服务端编写
package run.runnable.xiangxue;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BioServer {
private static ServerSocket server ;
//线程池
private static ExecutorService executorService = Executors.newFixedThreadPool(5);
private static void start() throws IOException {
try{
server = new ServerSocket(8000);
System.out.println("服务已经启动,端口号:"+8000);
while (true){
Socket socket = server.accept();
System.out.println("有新的客户端连接");
executorService.execute(new BioServerhandler(socket));
}
} catch (IOException e) {
e.printStackTrace();
}finally {
if (server!=null){
server.close();
}
}
}
public static void main(String[] args) throws IOException {
start();
}
}
BIO服务端业务处理
package run.runnable.xiangxue;
import java.io.*;
import java.net.Socket;
import java.util.Date;
public class BioServerhandler extends Thread {
Socket socket ;
public BioServerhandler(Socket socket ) {
this.socket = socket;
}
@Override
public void run() {
try(BufferedReader in = new BufferedReader(
new InputStreamReader(socket.getInputStream()))){
PrintWriter out = new PrintWriter(socket.getOutputStream(),true);
String msg ;
String result;
while ((msg = in.readLine()) != null){
System.out.println("Server accept msg :"+msg);
result = "Hello ,"+msg+",Now is "+new Date(System.currentTimeMillis());
out.println(result);
}
} catch (IOException e) {
e.printStackTrace();
}finally {
if (socket != null){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
socket = null;
}
}
}
}
BIO客户端
package run.runnable.xiangxue;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Scanner;
public class BioClient {
public static void main(String[] args) throws IOException {
Socket socket = new Socket("127.0.0.1",8000);
System.out.println("请输入请求消息");
new ReadMsg(socket).start();
PrintWriter pw = null;
while (true){
pw = new PrintWriter(socket.getOutputStream());
pw.println(new Scanner(System.in).next());
pw.flush();
}
}
private static class ReadMsg extends Thread{
Socket socket;
public ReadMsg(Socket socket){
this.socket = socket;
}
@Override
public void run() {
try(BufferedReader br = new BufferedReader(
new InputStreamReader(socket.getInputStream())
)) {
String line = null;
while ((line=br.readLine())!=null){
System.out.printf("%S\n",line);
}
} catch (IOException e) {
e.printStackTrace();
}finally {
if (socket!=null){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
socket=null;
}
}
}
}

原生网络编程AIO

异步IO采用"订阅-通知"模式:即应用程序向操作系统注册IO监听,然后继续做自己的事情,当操作系统发生IO事件,并且准备好数据后,再主动通知应用程序,触发相应的函数。
当应用程序订阅操作系统的网络数据时,操作系统什么时候把数据传输过来,我们是不知道的,所以这里存在一个问题:应用程序怎么知道操作系统传输数据过来?
JDK中提供了一个接口CompletionHandler
,这个接口提供了2个方法
/**
* 在操作完成时调用
*
* @param result 表示这个操作返回的结果对象
比如接收服务器请求,那么返回的就是一个socket
如果是读操作,那么就是在网络上读取到的字节数
* The result of the I/O operation.
* @param attachment 发起IO操作时附加的参数
* The object attached to the I/O operation when it was initiated.
*/
void completed(V result, A attachment);
/**
* 在操作失败时调用
*
* @param exc
* The exception to indicate why the I/O operation failed
* @param attachment
* The object attached to the I/O operation when it was initiated.
*/
void failed(Throwable exc, A attachment);
}
方法的作用已经在注释上标注出来。那这和我们之前提到的问题有什么关系呢?关于就在于,当应用程序调用操作系统的数据时,应用程序对应的处理器一定要实现这个接口。
比如:两个Socket进行连接,当服务器端进行连接后,要新起一个socket处理客户端请求,如果是读数据,同样的在处理器中实现的completed方法中进行业务处理。
而falied就是当操作系统发现这个IO事件失败的时候,通知应用程序应该怎么处理,比如重新请求。
实现一个AIO处理
在实现之前我们要清楚应该实现哪几部分

首先从客户端开始
客户端的属性存在一个客户端处理器AioClientHandler
,打开客户端的时候,新建连接,然后
package run.runnable.AIO;
import java.io.IOException;
import java.util.Scanner;
public class AioClient {
private static AioClientHandler aioClientHandler;
public static void start() throws IOException {
if (aioClientHandler!=null){
return;
}
aioClientHandler = new AioClientHandler("127.0.0.1",8000);
new Thread(aioClientHandler,"client").start();
}
public static boolean sendMsg(String msg){
if (msg.equals("exit")){
return false;
}
aioClientHandler.sendMsg(msg);
return true;
}
public static void main(String[] args) throws IOException {
AioClient.start();
System.out.println("请输入请求消息:");
Scanner scanner = new Scanner(System.in);
while (AioClient.sendMsg(scanner.nextLine()));
}
}
AIO客户端处理器
package run.runnable.AIO;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
public class AioClientHandler implements CompletionHandler<Void,AioClientHandler>,Runnable {
private AsynchronousSocketChannel clientChannel;
private String host;
private int port;
private CountDownLatch latch;
public AioClientHandler(String host, int port) throws IOException {
this.host = host;
this.port = port;
//创建一个实际的客户端通道
clientChannel = AsynchronousSocketChannel.open();
}
/**
* 这个是当操作系统有数据后会执行的方法,通知我们已经连接到了服务器端
* @param result
* @param attachment
*/
@Override
public void completed(Void result, AioClientHandler attachment) {
System.out.println("已经连接到客户端。");
}
@Override
public void failed(Throwable exc, AioClientHandler attachment) {
System.out.println("连接失败");
latch.countDown();
try {
clientChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
latch = new CountDownLatch(1);
clientChannel.connect(new InetSocketAddress(host,port),this,this);
try {
latch.await();
clientChannel.close();
} catch (InterruptedException | IOException e) {
e.printStackTrace();
}
}
public void sendMsg(String msg){
//把msg变成在网络上可以传递的格式
byte[] bytes = msg.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
//异步写,也就是写操作也是异步的,写操作也需要传入一个实现了CompletionHandler接口的处理器
clientChannel.write(writeBuffer,writeBuffer,new AioClientWriteHandler(clientChannel,latch));
}
}
AIO客户端读处理器
package run.runnable.AIO;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
public class AioClientReadHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel clientChannel;
private CountDownLatch latch;
public AioClientReadHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) {
this.clientChannel = clientChannel;
this.latch = latch;
}
@Override
public void completed(Integer result, ByteBuffer buffer) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String msg;
try {
msg = new String(bytes,"UTF-8");
System.out.println("accept message:"+msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.err.println("数据读取失败");
try {
clientChannel.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
}
AIO客户端写处理器
package run.runnable.AIO;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
public class AioClientWriteHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel clientChannel;
private CountDownLatch latch;
public AioClientWriteHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) {
this.clientChannel = clientChannel;
this.latch = latch;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
if(attachment.hasRemaining()){
clientChannel.write(attachment,attachment,this);
}else {
//读取服务端传回的数据
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
//异步读
clientChannel.read(readBuffer,readBuffer,new AioClientReadHandler(clientChannel,latch));
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("发送数据失败。。。");
try{
clientChannel.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
}