跳到主要内容

HTTP中的异步大文件流

一、引子

平常用爬虫抓取的数据太多,图片、视频之类的文件根本下载不完,那为什么不直接在服务器上搭建一个文件服务呢?

二、步骤

2.1 搭建maven项目

为了简单起见,这里直接新建一个 Spring Boot 项目,搭建过程就不赘述了。

2.2 添加依赖和打包插件

Netty 的依赖:

<!-- Netty (all modules in one artifact): provides the HTTP codec, ChunkedWriteHandler and event loops used by the file server below. -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.50.Final</version>
</dependency>

考虑到要部署到服务器上,所以在 Maven 中添加打 jar 包的插件:

<!-- Goes inside the <build> element of pom.xml; packages the project as a jar that can be copied to the server and started there. -->
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>

2.3 编写服务端代码

2.3.1 HttpStaticFileServer启动类

HttpStaticFileServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public final class HttpStaticFileServer {

    /** Port to listen on; override with {@code -Dport=NNNN} (defaults to 8081). */
    static final int PORT = Integer.parseInt(System.getProperty("port", "8081"));

    /**
     * Binds the static file server to {@link #PORT} and blocks until the server channel closes.
     * Both event loop groups are shut down on the way out, whether startup succeeded or not.
     *
     * @param args unused
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        // A single thread accepts connections; the worker group (default size) serves them.
        final EventLoopGroup acceptorGroup = new NioEventLoopGroup(1);
        final EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            final ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(acceptorGroup, ioGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new HttpStaticFileServerInitializer());

            final Channel serverChannel = bootstrap.bind(PORT).sync().channel();
            System.err.println(" 打开你的浏览器并导航到网址 http://127.0.0.1:" + PORT + '/');

            // Park the main thread until the listening socket is closed.
            serverChannel.closeFuture().sync();
        } finally {
            acceptorGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }
}

2.3.2 编写ChannelInitializer

HttpStaticFileServerInitializer

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;

public class HttpStaticFileServerInitializer extends ChannelInitializer<SocketChannel> {

    /** Largest aggregated request content accepted by {@link HttpObjectAggregator}, in bytes. */
    private static final int MAX_AGGREGATED_CONTENT = 65536;

    /**
     * Installs the per-connection handler chain, in order:
     * <ol>
     *   <li>HTTP request decoding / response encoding;</li>
     *   <li>aggregation of chunked messages into one {@code FullHttpRequest}, so the handler
     *       never sees individual chunks;</li>
     *   <li>support for writing large streams (e.g. {@code HttpChunkedInput}) piece by piece
     *       instead of buffering them in memory;</li>
     *   <li>the file-serving handler itself.</li>
     * </ol>
     */
    @Override
    public void initChannel(SocketChannel ch) {
        ch.pipeline()
                .addLast(new HttpServerCodec())
                .addLast(new HttpObjectAggregator(MAX_AGGREGATED_CONTENT))
                .addLast(new ChunkedWriteHandler())
                .addLast(new HttpStaticFileServerHandler());
    }
}

2.3.3 编写自定义的 ChannelInboundHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedFile;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.SystemPropertyUtil;

import javax.activation.MimetypesFileTypeMap;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.regex.Pattern;

import static io.netty.handler.codec.http.HttpMethod.GET;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_0;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

/**
 * Serves files and directory listings located under a root directory in response to HTTP GET.
 *
 * <p>The pipeline aggregates each request into a {@link FullHttpRequest} before it reaches this
 * handler. File bodies are sent with a zero-copy {@link DefaultFileRegion} on plain connections.
 * When an {@link SslHandler} is present, {@link HttpChunkedInput} is used instead, because the
 * bytes must pass through the encryption handler.
 *
 * <p>Not sharable between channels: it remembers the request being answered in a field.
 */
public class HttpStaticFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";
    public static final String HTTP_DATE_GMT_TIMEZONE = "GMT";
    public static final int HTTP_CACHE_SECONDS = 60;

    /**
     * Directory whose contents are shared. Defaults to the originally hard-coded location and can
     * be overridden with {@code -Dfileserver.root=/path/to/dir}.
     */
    private static final String ROOT_DIR =
            SystemPropertyUtil.get("fileserver.root", "/root/pythonWorkspace");

    /**
     * MIME type table, built once because its constructor reads the mime.types files.
     * NOTE(review): shared across event-loop threads; relies on getContentType being synchronized,
     * as it is in the JAF / Jakarta Activation implementations. Confirm this if the implementation
     * changes.
     */
    private static final MimetypesFileTypeMap MIME_TYPES_MAP = new MimetypesFileTypeMap();

    /** Characters that must not occur anywhere in a decoded request path (HTML metacharacters). */
    private static final Pattern INSECURE_URI = Pattern.compile("[<>&\"]");

    /** File names that may appear in a directory listing. */
    private static final Pattern ALLOWED_FILE_NAME = Pattern.compile("[^-\\._]?[^<>&\\\"]*");

    /** Request currently being answered; consulted for keep-alive handling of replies. */
    private FullHttpRequest request;

    /**
     * Answers one GET request with a file, a directory listing, a redirect, or an error status.
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        this.request = request;
        if (!request.decoderResult().isSuccess()) {
            sendError(ctx, BAD_REQUEST);
            return;
        }

        if (!GET.equals(request.method())) {
            sendError(ctx, METHOD_NOT_ALLOWED);
            return;
        }

        final boolean keepAlive = HttpUtil.isKeepAlive(request);
        // The query string is not part of the file path.
        final String uri = stripQuery(request.uri());
        // Map the URI under ROOT_DIR; null means the path is malformed or tries to escape the root.
        final String path = sanitizeUri(uri);
        if (path == null) {
            sendError(ctx, FORBIDDEN);
            return;
        }

        File file = new File(path);
        if (file.isHidden() || !file.exists()) {
            sendError(ctx, NOT_FOUND);
            return;
        }

        if (file.isDirectory()) {
            // Listing links are relative, so a directory URI must end with '/'.
            if (uri.endsWith("/")) {
                sendListing(ctx, file, uri);
            } else {
                sendRedirect(ctx, uri + '/');
            }
            return;
        }

        if (!file.isFile()) {
            sendError(ctx, FORBIDDEN);
            return;
        }

        // Cache validation.
        if (isNotModified(request, file)) {
            sendNotModified(ctx);
            return;
        }

        RandomAccessFile raf;
        try {
            raf = new RandomAccessFile(file, "r");
        } catch (FileNotFoundException ignore) {
            // The file could not be opened (e.g. it disappeared after the checks above).
            sendError(ctx, NOT_FOUND);
            return;
        }
        long fileLength = raf.length();

        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
        HttpUtil.setContentLength(response, fileLength);
        setContentTypeHeader(response, file);
        setDateAndCacheHeaders(response, file);

        if (!keepAlive) {
            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
        } else if (request.protocolVersion().equals(HTTP_1_0)) {
            // HTTP/1.0 connections only stay open when the response says so explicitly.
            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }

        // Status line and headers.
        ctx.write(response);

        // Body.
        ChannelFuture sendFileFuture;
        ChannelFuture lastContentFuture;
        if (ctx.pipeline().get(SslHandler.class) == null) {
            sendFileFuture =
                    ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
            // End-of-message marker.
            lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
        } else {
            sendFileFuture =
                    ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)),
                            ctx.newProgressivePromise());
            // HttpChunkedInput writes the end-of-message marker (LastHttpContent) itself.
            lastContentFuture = sendFileFuture;
        }

        sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
            @Override
            public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
                if (total < 0) { // total unknown
                    System.err.println(future.channel() + " Transfer progress: " + progress);
                } else {
                    System.err.println(future.channel() + " Transfer progress: " + progress + " / " + total);
                }
            }

            @Override
            public void operationComplete(ChannelProgressiveFuture future) {
                System.err.println(future.channel() + " Transfer complete.");
            }
        });

        if (!keepAlive) {
            // Close the connection once the whole body has been written.
            lastContentFuture.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * Logs the failure and, if the channel is still open, answers with 500.
     * NOTE(review): if the failure happens after the file response headers were written, the 500
     * response is appended to a partially sent message.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        if (ctx.channel().isActive()) {
            sendError(ctx, INTERNAL_SERVER_ERROR);
        }
    }

    /** Returns {@code uri} without its query string, if it has one. */
    private static String stripQuery(String uri) {
        int queryStart = uri.indexOf('?');
        return queryStart < 0 ? uri : uri.substring(0, queryStart);
    }

    /**
     * Decides whether the client's cached copy is still valid.
     *
     * @return true when the request carries an If-Modified-Since date that is not earlier than the
     *     file's last-modified time, compared at one-second precision. Returns false when the header
     *     is missing or malformed, so the full file is sent instead of failing the request.
     */
    private static boolean isNotModified(FullHttpRequest request, File file) {
        String ifModifiedSince = request.headers().get(HttpHeaderNames.IF_MODIFIED_SINCE);
        if (ifModifiedSince == null || ifModifiedSince.isEmpty()) {
            return false;
        }
        SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
        dateFormatter.setTimeZone(TimeZone.getTimeZone(HTTP_DATE_GMT_TIMEZONE));
        Date ifModifiedSinceDate;
        try {
            ifModifiedSinceDate = dateFormatter.parse(ifModifiedSince);
        } catch (java.text.ParseException malformed) {
            return false;
        }

        // Compare whole seconds only: the Last-Modified header we send has no milliseconds.
        long ifModifiedSinceDateSeconds = ifModifiedSinceDate.getTime() / 1000;
        long fileLastModifiedSeconds = file.lastModified() / 1000;
        return fileLastModifiedSeconds <= ifModifiedSinceDateSeconds;
    }

    /**
     * Decodes a request path and maps it onto the filesystem below {@code ROOT_DIR}.
     *
     * @param uri raw request path (without query string), expected to start with '/'
     * @return the filesystem path, or null if the path is malformed, not absolute, contains a
     *     segment starting or ending with '.' (this blocks ".." and hidden files), or contains HTML
     *     metacharacters. This is a simple check; production use warrants a stricter one.
     */
    private static String sanitizeUri(String uri) {
        try {
            // URLDecoder does form decoding, where '+' means space. In a URI path '+' is literal,
            // so escape it first; otherwise "a+b.jpg" would be looked up as "a b.jpg".
            uri = URLDecoder.decode(uri.replace("+", "%2B"), "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        } catch (IllegalArgumentException malformedEscape) {
            // e.g. a stray '%' or an invalid escape such as "%zz"
            return null;
        }

        if (uri.isEmpty() || uri.charAt(0) != '/') {
            return null;
        }

        // Convert to the platform's file separator.
        uri = uri.replace('/', File.separatorChar);

        if (uri.contains(File.separator + '.') ||
                uri.contains('.' + File.separator) ||
                uri.charAt(0) == '.' || uri.charAt(uri.length() - 1) == '.' ||
                INSECURE_URI.matcher(uri).find()) {
            return null;
        }

        // uri starts with a separator, so this yields ROOT_DIR/<path>.
        return ROOT_DIR + uri;
    }

    /**
     * Percent-encodes a file name so it can be used as a relative link. URLEncoder produces form
     * encoding, so its '+' for space is rewritten to "%20", which is what a path expects.
     */
    private static String encodePathSegment(String name) {
        try {
            return java.net.URLEncoder.encode(name, "UTF-8").replace("+", "%20");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    /**
     * Sends an HTML listing of the readable, non-hidden entries of {@code dir}.
     *
     * @param dirPath request path of the directory, shown in the title and heading. It is safe to
     *     embed because sanitizeUri rejected any path containing HTML metacharacters.
     */
    private void sendListing(ChannelHandlerContext ctx, File dir, String dirPath) {
        StringBuilder buf = new StringBuilder()
                .append("<!DOCTYPE html>\r\n")
                .append("<html><head><meta charset='utf-8' /><title>")
                .append("Listing of: ")
                .append(dirPath)
                .append("</title></head><body>\r\n")

                .append("<h3>Listing of: ")
                .append(dirPath)
                .append("</h3>\r\n")

                .append("<ul>")
                .append("<li><a href=\"../\">..</a></li>\r\n");

        File[] files = dir.listFiles();
        if (files != null) {
            for (File f : files) {
                if (f.isHidden() || !f.canRead()) {
                    continue;
                }

                String name = f.getName();
                if (!ALLOWED_FILE_NAME.matcher(name).matches()) {
                    continue;
                }

                // Encode the link target so names containing '%', '#', '?' or spaces still resolve.
                buf.append("<li><a href=\"")
                        .append(encodePathSegment(name))
                        .append("\">")
                        .append(name)
                        .append("</a></li>\r\n");
            }
        }

        buf.append("</ul></body></html>\r\n");

        ByteBuf buffer = ctx.alloc().buffer(buf.length());
        buffer.writeCharSequence(buf.toString(), CharsetUtil.UTF_8);

        FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, buffer);
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");

        sendAndCleanupConnection(ctx, response);
    }

    /** Sends a 302 redirect to {@code newUri}. */
    private void sendRedirect(ChannelHandlerContext ctx, String newUri) {
        FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, FOUND, Unpooled.EMPTY_BUFFER);
        response.headers().set(HttpHeaderNames.LOCATION, newUri);

        sendAndCleanupConnection(ctx, response);
    }

    /** Sends a plain-text error response with the given status. */
    private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
        FullHttpResponse response = new DefaultFullHttpResponse(
                HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8));
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");

        sendAndCleanupConnection(ctx, response);
    }

    /**
     * Sends "304 Not Modified" when the browser's cached copy is still current.
     *
     * @param ctx
     *            Context
     */
    private void sendNotModified(ChannelHandlerContext ctx) {
        FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, NOT_MODIFIED, Unpooled.EMPTY_BUFFER);
        setDateHeader(response);

        sendAndCleanupConnection(ctx, response);
    }

    /**
     * Writes {@code response} with its Content-Length set. If keep-alive is disabled, also adds
     * "Connection: close" and closes the connection once the response is flushed. When no request
     * has been read yet (e.g. an error reported from exceptionCaught), the connection is treated
     * as non-keep-alive.
     */
    private void sendAndCleanupConnection(ChannelHandlerContext ctx, FullHttpResponse response) {
        final FullHttpRequest request = this.request;
        final boolean keepAlive = request != null && HttpUtil.isKeepAlive(request);
        HttpUtil.setContentLength(response, response.content().readableBytes());
        if (!keepAlive) {
            // We close right after sending, so tell the client as well.
            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
        } else if (request.protocolVersion().equals(HTTP_1_0)) {
            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }

        ChannelFuture flushPromise = ctx.writeAndFlush(response);

        if (!keepAlive) {
            // Close the connection as soon as the response is sent.
            flushPromise.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * Sets the Date header for the HTTP response
     *
     * @param response
     *            HTTP response
     */
    private static void setDateHeader(FullHttpResponse response) {
        SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
        dateFormatter.setTimeZone(TimeZone.getTimeZone(HTTP_DATE_GMT_TIMEZONE));

        Calendar time = new GregorianCalendar();
        response.headers().set(HttpHeaderNames.DATE, dateFormatter.format(time.getTime()));
    }

    /**
     * Sets the Date, Expires, Cache-Control and Last-Modified headers of the response.
     * The content may be cached privately for {@link #HTTP_CACHE_SECONDS} seconds.
     *
     * @param response
     *            HTTP response
     * @param fileToCache
     *            file whose last-modified time is reported
     */
    private static void setDateAndCacheHeaders(HttpResponse response, File fileToCache) {
        SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
        dateFormatter.setTimeZone(TimeZone.getTimeZone(HTTP_DATE_GMT_TIMEZONE));

        // Date header
        Calendar time = new GregorianCalendar();
        response.headers().set(HttpHeaderNames.DATE, dateFormatter.format(time.getTime()));

        // Add cache headers
        time.add(Calendar.SECOND, HTTP_CACHE_SECONDS);
        response.headers().set(HttpHeaderNames.EXPIRES, dateFormatter.format(time.getTime()));
        response.headers().set(HttpHeaderNames.CACHE_CONTROL, "private, max-age=" + HTTP_CACHE_SECONDS);
        response.headers().set(
                HttpHeaderNames.LAST_MODIFIED, dateFormatter.format(new Date(fileToCache.lastModified())));
    }

    /**
     * Sets the Content-Type header of the response from the file name's extension.
     *
     * @param response
     *            HTTP response
     * @param file
     *            file to extract content type
     */
    private static void setContentTypeHeader(HttpResponse response, File file) {
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, MIME_TYPES_MAP.getContentType(file.getPath()));
    }
}

2.3.4 替换掉 Spring Boot 的启动类

import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Entry point of the packaged jar. Despite the {@code @SpringBootApplication} annotation, no Spring
 * context is started: {@link #main} delegates directly to the Netty server.
 */
@SpringBootApplication
public class FileserverApplication {

    /**
     * Runs {@link HttpStaticFileServer} until it shuts down.
     *
     * <p>If it fails (e.g. binding the port throws), the stack trace is printed and the JVM exits
     * with status 1. Previously the error was swallowed and the process exited with 0, so scripts
     * and process supervisors could not detect a failed start.
     *
     * @param args passed through to {@link HttpStaticFileServer#main}
     */
    public static void main(String[] args) {
        try {
            HttpStaticFileServer.main(args);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

}

三、测试

先把 ChannelInboundHandler 中的共享目录改成本地路径试一试 image.png

点击文件,看看能否正常下载 image.png

这样,一个简单的文件服务就完成了,虽然目前只能下载文件。如果需要部署到 Linux 上,只需把共享目录改为 Linux 上的路径,并开放相应的端口即可。