分片上传及断点续传原理深入分析及示例Demo
分片上传、断点续传,这两个名词对于做过或者熟悉文件上传的朋友来说应该不会陌生。之所有有这边文章,还是因为自己在网上浏览了一些文章后发现没有找到一篇能瞬间明白原理和实现的,因此决定自己写一篇文章,方便有需要的朋友了解原理和实现。
分片上传,就是将所要上传的文件,按照一定的大小,将整个文件分隔成多个数据块(我们称之为Part)来进行分别上传,上传完之后再由服务端对所有上传的文件进行汇总整合成原始的文件。分片上传不仅可以避免因网络环境不好导致的一直需要从文件起始位置还是上传的问题,还能使用多线程对不同分块数据进行并发发送,提高发送效率,降低发送时间。
分片上传主要适用于以下几种场景:
- 网络环境不好:当出现上传失败的时候,可以对失败的Part进行独立的重试,而不需要重新上传其他的Part。
- 断点续传:中途暂停之后,可以从上次上传完成的Part的位置继续上传。
- 加速上传:要上传到OSS的本地文件很大的时候,可以并行上传多个Part以加快上传。
- 流式上传:可以在需要上传的文件大小还不确定的情况下开始上传。这种场景在视频监控等行业应用中比较常见。
- 文件较大:一般文件比较大时,默认情况下一般都会采用分片上传。
分片上传的整个流程大致如下:
- 将需要上传的文件按照一定的分割规则,分割成相同大小的数据块;
- 初始化一个分片上传任务,返回本次分片上传唯一标识;
- 按照一定的策略(串行或并行)发送各个分片数据块;
- 发送完成后,服务端根据判断数据上传是否完整,如果完整,则进行数据块合成得到原始文件。
在整个数据上传的过程中当然还涉及数据的签名校验,防止数据被恶意篡改。整个上传流程图如下所示
基于上面的原理介绍,下面写个简单的示例Demo来看下怎么发送上传。
单个小文件全量上传
在看分片上传之后,先来看下单个小文件可以怎么上传,示例代码
这里的文件发送使用了httpclient和httpmime,如果你使用maven,那么可以通过在pom.xml文件中引入下面的依赖进行使用。
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.6</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.5.6</version>
</dependency>
大文件分片上传
基于上面的分片原理,我们写代码看下怎么实现。考虑到只是个原理Demo,这里没有考虑性能问题,在具体实现时需要重点关注实现的方式与性能问题。
首先获取文件的字节大小,然后根据分片大小进行切割,判断最终需要的分片数。之后按照分片大小,循环读取出每个分片并生成一个临时文件。
然后将各个临时文件通过http请求发送出去。
最后将每个分片数据按照打个小文件的方式发送给后台,需要注意的是,每次请求需要携带分片状态的信息,如当前分片位置,总分片数,分片大小等。这里是将相关信息作为form数据传递的,一般情况下还可以将相关数据作为http自定义头进行传递。
这里我们写了个简单的接收代码用来测试一下:
package com.majing.learning.fileuploadserver;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.fileupload.FileItem;
import org.apache.commons.fileupload.disk.DiskFileItemFactory;
import org.apache.commons.fileupload.servlet.FileCleanerCleanup;
import org.apache.commons.fileupload.servlet.ServletFileUpload;
import org.apache.commons.io.FileCleaningTracker;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
public class UploadServlet extends HttpServlet {
private static final long serialVersionUID = 6610636816102284862L;
private static String fileUploadTempDir = "D:/tmp/fileuploaddir";
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
boolean isMultipart = ServletFileUpload.isMultipartContent(req);
if(isMultipart){
ServletContext servletContext = this.getServletConfig().getServletContext();
File repository = new File(fileUploadTempDir);
DiskFileItemFactory factory = newDiskFileItemFactory(servletContext, repository);
PrintWriter printWriter = resp.getWriter();
resp.setContentType("application/json");
resp.setCharacterEncoding("utf-8");
ServletFileUpload upload = new ServletFileUpload(factory);
JSONArray files = new JSONArray();
try{
List<FileItem> items = upload.parseRequest(req);
for (FileItem item : items) {
if (item.isFormField()) { //表单输入域
System.out.println("form表单字段:"+item.getFieldName());
} else { // 文件上传域
String name = item.getName();
if (!name.equals("")) {
int index = name.indexOf("\\");
File uploadedFile;
if (index == -1) {
uploadedFile = new File(fileUploadTempDir, File.separator + name);
} else {
uploadedFile = new File(fileUploadTempDir, File.separator + name.substring(name.lastIndexOf(File.separator) + 1));
}
item.write(uploadedFile);
JSONObject file = new JSONObject();
file.put("name", name);
file.put("size", item.getSize());
file.put("url", fileUploadTempDir + "/" + name);
files.add(file);
} else {
JSONObject file = new JSONObject();
file.put("result", "please choose a file");
files.add(file);
}
}
}
}catch(Exception e){
e.printStackTrace();
}finally{
JSONObject result = new JSONObject();
result.put("files", files);
printWriter.write(result.toString());
printWriter.close();
}
}
}
private DiskFileItemFactory newDiskFileItemFactory(ServletContext context, File repository) {
FileCleaningTracker fileCleaningTracker = FileCleanerCleanup.getFileCleaningTracker(context);
DiskFileItemFactory factory = new DiskFileItemFactory(DiskFileItemFactory.DEFAULT_SIZE_THRESHOLD, repository);
factory.setFileCleaningTracker(fileCleaningTracker);
return factory;
}
}
下面我们启动接收程序(上面是servlet应用程序),发送下面的一个简单示例文件,文件总大小54594字节。这里我们按照10K为一个分片数据块进行发送,运行结果如下:
从上图可以看出,待上传文件已经按照指定的大小限制上传到后台,接下来就是需要对这些分片数据进行合成。
分片合成
分片合成是对上传到后台的分片文件进行汇总合成原始数据文件的过程。下面是一个简单的分片合成的实现Demo。
package com.majing.learning.fileuploadserver;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.io.FileUtils;
/**
* @author admin
*
* 这里在进行合并时,如果遇到失败或者异常情况,那么需要删除临时文件,并且不能删除之前上传的文件
*/
public class MergeFile {
public static void main(String[] args) throws IOException {
String mergeFilesDir = "D:/tmp/fileuploaddir";
File dirFile = new File(mergeFilesDir);
if(!dirFile.exists()){
throw new RuntimeException("文件不存在!");
}
String targetFileName = "lx-lxAiui-plugin.zip";
//分片上传的文件已经位于同一个文件夹下,方便寻找和遍历
String[] fileNames = dirFile.list();
List<SequentialFile> files = new ArrayList<SequentialFile>();
for(String fileName: fileNames){
int index = Integer.parseInt(fileName.substring(fileName.lastIndexOf(".part")+5));
files.add(new SequentialFile(index, fileName));
}
Collections.sort(files);
File targetFile = new File(mergeFilesDir, targetFileName);
RandomAccessFile writeFile = new RandomAccessFile(targetFile, "rw");
int position = 0;
for(SequentialFile fileName: files){
File sourceFile = new File(mergeFilesDir, fileName.getFileName());
RandomAccessFile readFile = new RandomAccessFile(sourceFile, "rw");
int chunksize = 10240;
byte[] buf = new byte[chunksize];
writeFile.seek(position);
int byteCount = 0;
while((byteCount = readFile.read(buf))!=-1){
if(byteCount!=chunksize){
byte[] tempBytes = new byte[byteCount];
System.arraycopy(buf, 0, tempBytes, 0, byteCount);
buf = tempBytes;
}
writeFile.write(buf);
position = position + byteCount;
}
readFile.close();
FileUtils.deleteQuietly(sourceFile);
}
writeFile.close();
}
}
对上面的数据运行相应的合成代码后,合成的数据文件如下所示,和之前的文件一模一样。
至此关于分片上传的原理和简单实现Demo已经说完。
断点续传
由于分片上传的数据是永久性的,因此可以很容易的基于分片上传来实现断点续传。
在分片上传的过程中,如果因为系统崩溃或者网络中断等异常因素导致上传中断,这时候客户端需要记录上传的进度。在之后支持再次上传时,可以继续从上次上传中断的地方进行继续上传。
为了避免客户端在上传之后的进度数据被删除而导致重新开始从头上传的问题,服务端也可以提供相应的接口便于客户端对已经上传的分片数据进行查询,从而使客户端知道已经上传的分片数据,从而从下一个分片数据开始继续上传。
断点续传流程图大致如下所示:
文件的上传进度文件一般与上传文件同名,带有特定的后缀名(如.ucp,upload check point的简写)。
可以参考下阿里云OSS的具体实现,定义一个表示文件上传进度的实体类,如UploadCheckPoint,在这个类中去定义一些分片上传相关的状态信息,如下所示,可能包含上传文件名、上传唯一标识ID、每一分片的上传大小及上传状态、上传文件的md5签名、每个分片上传的结果等。
为避免上传中断后文件状态发生变化而导致的原分片数据不一致文件,一般还需要维护上传文件的状态(如原文件大小,最后修改时间等),确保在中断过程中待上传文件没有发生更新过。
对于每个分片的上传状态UploadPart一般包含分片的索引顺序、分片数据相对于原始文件的字节偏移量、分片数据的大小以及上传的完成情况,定义如下所示:
每个分片上传成功之后,服务端会返回一个关于本次分片数据上传的状态结果,针对每个分片返回一个eTag标识,在最后客户端向服务端发起分片数据合并时将该eTag携带过去,服务端对eTag进行校验,校验通过后进行文件合并。
如前面所说,在每个分片上传的过程中需要不断的及时将上传状态同步到进度文件ucp中,这里采用了对象流的方式,从文件中读一个对象,或者向一个文件中写入对象。具体如下:
从文件中读取进度:
将上传进度更新到进度文件:
在每个分片上传完之后,客户端既要更新内存中的上传进度,还要更新进度文件,如下所示:
至此,我们通过对阿里云OSS的实现源码阅读了解了断点续传的基本原理和实现方式。
相信通过上面的讲述,大家应该对分片上传和断点续传的原理和实现有了比较直观的认识和了解,可以自行加以实现。