SpringBoot 基于同步线程池实现文件上传功能。
你猜
阅读:727
2021-03-31 16:58:00
评论:0
业务场景要求:进行大文件上传,验证大文件在传输过程中是否发送改变。
业务分析:前端将大文件进行分包,按顺序传递给后台接口,每次包接收完毕需要验证包在传输过程中是否发送改变,发送改变,移除该包,再将相关信息返回前端,让前端重新发送。当所有包发送完毕,且验证都是正确的包,发起包合并请求,生成大文件。
相关代码:
分包块实体对象定义:
import java.io.Serializable;
import org.springframework.web.multipart.MultipartFile;
/**
* 文件上传实体对象
* @author zzg
*
*/
@SuppressWarnings("serial")
public class ChunkInfoModel implements Serializable {
// 文件块sid
private String sid;
// 文件块编号
private Integer number;
// 默认文件块大小
private Long size;
// 实际文件块大小
private Long currentSize;
// 文件总大小
private Long totalSize;
// 文件标识符
private String identifier;
// 文件名称
private String filename;
// 文件类型
private String type;
// 文件块总数
private Integer totalChunks;
// 上传文件
private MultipartFile file;
public String getSid() {
return sid;
}
public void setSid(String sid) {
this.sid = sid;
}
public Integer getNumber() {
return number;
}
public void setNumber(Integer number) {
this.number = number;
}
public Long getSize() {
return size;
}
public void setSize(Long size) {
this.size = size;
}
public Long getCurrentSize() {
return currentSize;
}
public void setCurrentSize(Long currentSize) {
this.currentSize = currentSize;
}
public Long getTotalSize() {
return totalSize;
}
public void setTotalSize(Long totalSize) {
this.totalSize = totalSize;
}
public String getIdentifier() {
return identifier;
}
public void setIdentifier(String identifier) {
this.identifier = identifier;
}
public String getFilename() {
return filename;
}
public void setFilename(String filename) {
this.filename = filename;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public Integer getTotalChunks() {
return totalChunks;
}
public void setTotalChunks(Integer totalChunks) {
this.totalChunks = totalChunks;
}
public MultipartFile getFile() {
return file;
}
public void setFile(MultipartFile file) {
this.file = file;
}
}
/**
* 文件上传配置参数
* @author zzg
*
*/
public class UploadFileConfig {
// 文件上传存储目录
private String location;
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
}
后台文件包合并后,判断与前端生成的文件标识符一致的工具类:
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
/**
* 上传文件校验工具类
* @author zzg
*
*/
public class UploadFileUtil {
// 密码提花
private static final char[] hexCode = "0123456789ABCDEF".toCharArray();
// 文件类取MD5
public static String calcMD5(File file){
try (InputStream stream = Files.newInputStream(file.toPath(), StandardOpenOption.READ)) {
return calcMD5(stream);
}catch (IOException e) {
e.printStackTrace();
return "";
}
}
// 输入流取MD5
public static String calcMD5(InputStream stream) {
try {
MessageDigest digest = MessageDigest.getInstance("MD5");
byte[] buf = new byte[8192];
int len;
while ((len = stream.read(buf)) > 0) {
digest.update(buf, 0, len);
}
return toHexString(digest.digest());
} catch (IOException e) {
e.printStackTrace();
return "";
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
return "";
}
}
public static String toHexString(byte[] data) {
StringBuilder r = new StringBuilder(data.length * 2);
for (byte b : data) {
r.append(hexCode[(b >> 4) & 0xF]);
r.append(hexCode[(b & 0xF)]);
}
return r.toString();
}
}
文件包上传和文件包合并核心代码类:
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.concurrent.Callable;
import org.apache.commons.io.FileUtils;
import com.***.common.model.Result;
@SuppressWarnings("rawtypes")
public class UploadFileCallback implements Callable {
// 文件上传实体对象
private ChunkInfoModel model;
// 文件操作类型
private String type;
// 文件根路径
private String location;
// set 和 get 方法
public ChunkInfoModel getModel() {
return model;
}
public void setModel(ChunkInfoModel model) {
this.model = model;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
// 构造函数
public UploadFileCallback(ChunkInfoModel model, String type, String localtion) {
super();
this.model = model;
this.type = type;
this.location = localtion;
}
// 核心功能方法
@Override
public Object call() throws Exception {
if(this.type.equalsIgnoreCase("upload")){
// 临时目录用来存放所有分片文件
String tempFileDir = this.location + this.model.getIdentifier();
File parentFileDir = new File(tempFileDir);
if (!parentFileDir.exists()) {
parentFileDir.mkdirs();
}
// 分片处理时,前台会多次调用上传接口,每次都会上传文件的一部分到后台
File tempPartFile = new File(parentFileDir, this.model.getNumber() + ".part");
try {
FileUtils.copyInputStreamToFile(this.model.getFile().getInputStream(), tempPartFile);
} catch (IOException e) {
// TODO Auto-generated catch block
// 移除错误文件块
tempPartFile.delete();
// 打印堆栈信息
e.printStackTrace();
return Result.error("文件块上传异常").setDatas("chunk", this.model);
}
// 校验文件是否上传成功
long size = FileUtils.sizeOf(tempPartFile);
boolean target = this.model.getCurrentSize().equals(size);
if (target) {
return Result.ok("文件块上传成功").setDatas("chunk", this.model);
} else {
// 移除错误文件块
tempPartFile.delete();
return Result.error("文件块上传异常").setDatas("chunk", this.model);
}
}
if(this.type.equalsIgnoreCase("merge")){
File parentFileDir = new File(this.location + this.model.getIdentifier());
if (parentFileDir.isDirectory()) {
File destTempFile = new File(this.location + "/merge", this.model.getFilename());
if (!destTempFile.exists()) {
if(!destTempFile.getParentFile().exists()){
// 先得到文件的上级目录,并创建上级目录,在创建文件,
destTempFile.getParentFile().mkdir();
}
try {
// 创建文件
destTempFile.createNewFile(); // 上级目录没有创建,这里会报错
} catch (IOException e) {
// 输出堆栈信息
e.printStackTrace();
return Result.error("文件合并异常").setDatas("file", this.model);
}
}
for (int i = 0; i < parentFileDir.listFiles().length; i++) {
File partFile = new File(parentFileDir, i + ".part");
FileOutputStream destTempfos = new FileOutputStream(destTempFile, true);
// 遍历"所有分片文件"到"最终文件"中
FileUtils.copyFile(partFile, destTempfos);
destTempfos.close();
}
// 删除临时目录中的分片文件
FileUtils.deleteDirectory(parentFileDir);
// 校验文件是否完整
String marker = this.model.getIdentifier();
String md5 = UploadFileUtil.calcMD5(new File(this.location + "/merge/" + this.model.getFilename()));
if(md5.equalsIgnoreCase(marker)){
return Result.ok("文件上传成功").setDatas("file", this.model);
} else {
if(destTempFile.exists()){
// 移除合并文件
destTempFile.delete();
}
return Result.error("文件上传异常").setDatas("file", this.model);
}
}
}
return Result.error("文件无法处理");
}
}
SpringBoot 配置同步线程池
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 同步线程池配置对象
* @author zzg
*
*/
@Configuration
public class ThreadConfig{
// 日志记录
public static final Logger log = LoggerFactory.getLogger(ThreadConfig.class);
// 文件上传初始化线程池大小
@Bean(value="uploadExecutor")
public ExecutorService getExecutor() {
return Executors.newFixedThreadPool(20);
}
}
SpringBoot 配置文件上传
import javax.servlet.MultipartConfigElement;
import org.springframework.boot.web.servlet.MultipartConfigFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportResource;
/**
* 文件上传配置参数
* @author zzg
*
*/
@Configuration
@ImportResource({"classpath:uploadFile.xml" })
public class UploadConfig {
/**
* 文件上传配置
* @return
*/
@Bean
public MultipartConfigElement multipartConfigElement() {
MultipartConfigFactory factory = new MultipartConfigFactory();
//文件最大
factory.setMaxFileSize("50MB"); //KB,MB
/// 设置总上传数据总大小
factory.setMaxRequestSize("60MB");
return factory.createMultipartConfig();
}
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- 文件上传配置参数 -->
<bean id="uploadFileConfig" class="com.***.common.upload.file.UploadFileConfig">
<property name="location" value="C:/data/upload_file/"/>
</bean>
</beans>
Controller 层代码:
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import com.***.common.model.Result;
import com.***.common.upload.file.ChunkInfoModel;
import com.***.common.upload.file.UploadFileCallback;
import com.***.common.upload.file.UploadFileConfig;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
@Controller
@RequestMapping("/api/file")
@Api(value = "文件上传Controlle", tags = "文件上传操作服务")
public class UploadFileController {
// 日志记录
public static final Logger log = LoggerFactory.getLogger(UploadFileController.class);
// 公共文件存储目录
@Autowired
private UploadFileConfig config;
// 文件上传线程池
@Autowired
private ExecutorService uploadExecutor;
@ApiOperation(httpMethod = "POST", value = "文件块上传")
@RequestMapping(value = "/upload", method = { RequestMethod.POST })
@ResponseBody
public Result upload(
ChunkInfoModel entity,
HttpServletRequest request, HttpServletResponse response) {
UploadFileCallback callback = new UploadFileCallback(entity, "upload", config.getLocation());
Future<Result> result = this.uploadExecutor.submit(callback);
try {
return result.get();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
log.error(e.getMessage());
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
log.error(e.getMessage());
e.printStackTrace();
}
return Result.ok("文件块上传成功");
}
@ApiOperation(httpMethod = "POST", value = "文件合并")
@RequestMapping(value = "/merge", method = { RequestMethod.POST })
@ResponseBody
public Result merge(ChunkInfoModel entity) {
UploadFileCallback callback = new UploadFileCallback(entity, "merge", config.getLocation());
Future<Result> result = this.uploadExecutor.submit(callback);
try {
return result.get();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
log.error(e.getMessage());
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
log.error(e.getMessage());
e.printStackTrace();
}
return Result.ok("文件上传成功");
}
@ApiOperation(httpMethod = "POST", value = "文件上传记录")
@RequestMapping(value = "/find", method = { RequestMethod.POST })
@ResponseBody
public Result find(
@RequestBody @ApiParam(name = "文件上传对象", value = "json格式对象", required = true) ChunkInfoModel entity) {
File parentFileDir = new File(config.getLocation() + entity.getIdentifier());
// 已经上传文件块信息
List<String> numbers = new ArrayList<String>();
// 已经上传文件大小
long uploadFileSize = 0;
// 判断文件目录是否存储
if(parentFileDir.exists() && parentFileDir.isDirectory()){
// 指定文件已经存在,用户已经上传指定文件,但是文件未上传完毕
File[] files = parentFileDir.listFiles();
if(files != null && files.length > 0){
for (int i = 0; i < files.length; i++) {
File file = files[i];
String number = file.getName().split(".")[0];
numbers.add(number);
uploadFileSize = uploadFileSize + file.length();
}
}
}
return Result.ok("文件上传记录").setDatas("numbers", numbers).setDatas("uploadFileSize", uploadFileSize);
}
}
声明
1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。