SpringBoot 基于同步线程池实现文件上传功能。分析

你猜 阅读:168 2021-03-31 16:58:00 评论:0

业务场景要求:进行大文件上传,并验证大文件在传输过程中是否发生改变。

业务分析:前端将大文件进行分包,按顺序传递给后台接口。后台每接收完一个包,都需要验证该包在传输过程中是否发生改变;如果发生改变,则移除该包,并将相关信息返回前端,让前端重新发送。当所有包发送完毕且全部验证通过后,前端发起包合并请求,由后台合并生成大文件。

 

相关代码:

分包块实体对象定义:

import java.io.Serializable; 
import org.springframework.web.multipart.MultipartFile; 
 
/**
 * Describes one chunk of a chunked file upload, together with the metadata of
 * the file the chunk belongs to.
 *
 * <p>The class is {@link Serializable}; the uploaded payload itself
 * ({@link #getFile()}) is excluded from Java serialization because
 * {@code MultipartFile} is not declared serializable.
 *
 * @author zzg
 */
@SuppressWarnings("serial")
public class ChunkInfoModel implements Serializable {

	// Chunk sid
	private String sid;

	// Sequence number of this chunk
	private Integer number;

	// Default (nominal) chunk size
	private Long size;

	// Actual size of this chunk
	private Long currentSize;

	// Total size of the whole file
	private Long totalSize;

	// Identifier of the whole file
	private String identifier;

	// File name
	private String filename;

	// File type
	private String type;

	// Total number of chunks the file was split into
	private Integer totalChunks;

	// Uploaded payload; transient so that Java serialization of this model
	// does not fail on the non-serializable MultipartFile.
	private transient MultipartFile file;

	/** @return the chunk sid */
	public String getSid() {
		return sid;
	}

	/** @param sid the chunk sid */
	public void setSid(String sid) {
		this.sid = sid;
	}

	/** @return the sequence number of this chunk */
	public Integer getNumber() {
		return number;
	}

	/** @param number the sequence number of this chunk */
	public void setNumber(Integer number) {
		this.number = number;
	}

	/** @return the default chunk size */
	public Long getSize() {
		return size;
	}

	/** @param size the default chunk size */
	public void setSize(Long size) {
		this.size = size;
	}

	/** @return the actual size of this chunk */
	public Long getCurrentSize() {
		return currentSize;
	}

	/** @param currentSize the actual size of this chunk */
	public void setCurrentSize(Long currentSize) {
		this.currentSize = currentSize;
	}

	/** @return the total size of the whole file */
	public Long getTotalSize() {
		return totalSize;
	}

	/** @param totalSize the total size of the whole file */
	public void setTotalSize(Long totalSize) {
		this.totalSize = totalSize;
	}

	/** @return the identifier of the whole file */
	public String getIdentifier() {
		return identifier;
	}

	/** @param identifier the identifier of the whole file */
	public void setIdentifier(String identifier) {
		this.identifier = identifier;
	}

	/** @return the file name */
	public String getFilename() {
		return filename;
	}

	/** @param filename the file name */
	public void setFilename(String filename) {
		this.filename = filename;
	}

	/** @return the file type */
	public String getType() {
		return type;
	}

	/** @param type the file type */
	public void setType(String type) {
		this.type = type;
	}

	/** @return the total number of chunks */
	public Integer getTotalChunks() {
		return totalChunks;
	}

	/** @param totalChunks the total number of chunks */
	public void setTotalChunks(Integer totalChunks) {
		this.totalChunks = totalChunks;
	}

	/** @return the uploaded payload, or {@code null} when none was bound */
	public MultipartFile getFile() {
		return file;
	}

	/** @param file the uploaded payload */
	public void setFile(MultipartFile file) {
		this.file = file;
	}
}
/**
 * Holder for the file-upload settings.
 *
 * @author zzg
 */
public class UploadFileConfig {

	/** Directory under which uploaded files are stored. */
	private String location;

	/**
	 * @return the configured storage directory, or {@code null} if none has
	 *         been set yet
	 */
	public String getLocation() {
		return this.location;
	}

	/**
	 * @param location the storage directory for uploaded files
	 */
	public void setLocation(String location) {
		this.location = location;
	}
}

后台文件包合并后,用于判断合并结果与前端生成的文件标识符(MD5)是否一致的工具类:

import java.io.File; 
import java.io.IOException; 
import java.io.InputStream; 
import java.nio.file.Files; 
import java.nio.file.StandardOpenOption; 
import java.security.MessageDigest; 
import java.security.NoSuchAlgorithmException; 
 
/**
 * Helpers for computing the MD5 checksum of uploaded content as an
 * upper-case hexadecimal string.
 *
 * <p>The checksum methods never throw for I/O problems: any failure is
 * reported by returning an empty string.
 *
 * @author zzg
 */
public class UploadFileUtil {
	// Upper-case hexadecimal digits, indexed by nibble value
	private static final char[] hexCode = "0123456789ABCDEF".toCharArray();

	/**
	 * Computes the MD5 checksum of a file.
	 *
	 * @param file the file to read
	 * @return the checksum as 32 upper-case hex characters, or an empty string
	 *         when {@code file} is {@code null} or cannot be read
	 */
	public static String calcMD5(File file) {
		if (file == null) {
			return "";
		}
		try (InputStream stream = Files.newInputStream(file.toPath(), StandardOpenOption.READ)) {
			return calcMD5(stream);
		} catch (IOException e) {
			e.printStackTrace();
			return "";
		}
	}

	/**
	 * Computes the MD5 checksum of everything remaining in a stream. The
	 * stream is read to its end but is not closed by this method.
	 *
	 * @param stream the stream to read
	 * @return the checksum as 32 upper-case hex characters, or an empty string
	 *         when {@code stream} is {@code null} or reading fails
	 */
	public static String calcMD5(InputStream stream) {
		if (stream == null) {
			return "";
		}
		try {
			MessageDigest digest = MessageDigest.getInstance("MD5");
			byte[] buf = new byte[8192];
			int len;
			// Only -1 marks end of stream; a read that returns 0 bytes must not
			// end the loop, otherwise the digest would cover a truncated prefix.
			while ((len = stream.read(buf)) != -1) {
				digest.update(buf, 0, len);
			}
			return toHexString(digest.digest());
		} catch (IOException | NoSuchAlgorithmException e) {
			e.printStackTrace();
			return "";
		}
	}

	/**
	 * Renders bytes as upper-case hexadecimal, two characters per byte.
	 *
	 * @param data the bytes to render
	 * @return the hex string; empty for an empty array
	 */
	public static String toHexString(byte[] data) {
		StringBuilder r = new StringBuilder(data.length * 2);
		for (byte b : data) {
			r.append(hexCode[(b >> 4) & 0xF]);
			r.append(hexCode[(b & 0xF)]);
		}
		return r.toString();
	}
}

文件包上传和文件包合并核心代码类:

import java.io.File; 
import java.io.FileOutputStream; 
import java.io.IOException; 
import java.util.concurrent.Callable; 
import org.apache.commons.io.FileUtils; 
import com.***.common.model.Result; 
 
/**
 * Executor task that performs one step of a chunked upload.
 *
 * <p>Depending on the operation type it either stores a single uploaded chunk
 * as {@code <location><identifier>/<number>.part} ("upload"), or concatenates
 * the stored chunks {@code 0.part, 1.part, ...} into
 * {@code <location>/merge/<filename>} and compares the MD5 of the merged file
 * with the file identifier ("merge").
 */
public class UploadFileCallback implements Callable<Result> {

	// Operation type: store one chunk
	private static final String TYPE_UPLOAD = "upload";
	// Operation type: merge all stored chunks
	private static final String TYPE_MERGE = "merge";

	// Chunk / file description received from the client
	private ChunkInfoModel model;
	// Operation type ("upload" or "merge", case-insensitive)
	private String type;
	// Root directory for stored chunks and merged files
	private String location;

	public ChunkInfoModel getModel() {
		return model;
	}

	public void setModel(ChunkInfoModel model) {
		this.model = model;
	}

	public String getType() {
		return type;
	}

	public void setType(String type) {
		this.type = type;
	}

	public String getLocation() {
		return location;
	}

	public void setLocation(String location) {
		this.location = location;
	}

	/**
	 * @param model    chunk / file description
	 * @param type     operation type, "upload" or "merge"
	 * @param location root directory for stored chunks and merged files
	 */
	public UploadFileCallback(ChunkInfoModel model, String type, String location) {
		super();
		this.model = model;
		this.type = type;
		this.location = location;
	}

	/**
	 * Runs the configured operation.
	 *
	 * @return the outcome of the operation; an error result when the type is
	 *         unknown or the request cannot be processed
	 * @throws Exception if removing the chunk directory after a merge fails
	 */
	@Override
	public Result call() throws Exception {
		if (TYPE_UPLOAD.equalsIgnoreCase(this.type)) {
			return storeChunk();
		}
		if (TYPE_MERGE.equalsIgnoreCase(this.type)) {
			return mergeChunks();
		}
		return Result.error("文件无法处理");
	}

	// Writes the uploaded chunk to its .part file and verifies its size.
	private Result storeChunk() {
		if (this.model == null || this.model.getFile() == null || this.model.getNumber() == null
				|| !isPlainName(this.model.getIdentifier())) {
			return Result.error("文件无法处理");
		}
		// Directory holding all chunks of this file
		File chunkDir = new File(this.location + this.model.getIdentifier());
		if (!chunkDir.isDirectory() && !chunkDir.mkdirs() && !chunkDir.isDirectory()) {
			return Result.error("文件块上传异常").setDatas("chunk", this.model);
		}
		File partFile = new File(chunkDir, this.model.getNumber() + ".part");
		try {
			FileUtils.copyInputStreamToFile(this.model.getFile().getInputStream(), partFile);
		} catch (IOException e) {
			// Drop the broken chunk so that the client can send it again
			partFile.delete();
			e.printStackTrace();
			return Result.error("文件块上传异常").setDatas("chunk", this.model);
		}

		// The chunk is accepted only when the stored size matches the size
		// announced by the client
		Long expectedSize = this.model.getCurrentSize();
		if (expectedSize != null && expectedSize.longValue() == partFile.length()) {
			return Result.ok("文件块上传成功").setDatas("chunk", this.model);
		}
		partFile.delete();
		return Result.error("文件块上传异常").setDatas("chunk", this.model);
	}

	// Concatenates all stored chunks into the final file and verifies its MD5.
	private Result mergeChunks() throws IOException {
		if (this.model == null || !isPlainName(this.model.getIdentifier())
				|| !isPlainName(this.model.getFilename())) {
			return Result.error("文件无法处理");
		}
		File chunkDir = new File(this.location + this.model.getIdentifier());
		File[] storedParts = chunkDir.isDirectory() ? chunkDir.listFiles() : null;
		if (storedParts == null) {
			return Result.error("文件无法处理");
		}

		File mergedFile = new File(this.location + "/merge", this.model.getFilename());
		File mergeDir = mergedFile.getParentFile();
		if (!mergeDir.isDirectory() && !mergeDir.mkdirs() && !mergeDir.isDirectory()) {
			return Result.error("文件合并异常").setDatas("file", this.model);
		}

		// Open the destination once, truncating any leftover from an earlier
		// attempt; appending to an existing file would corrupt the result.
		try (FileOutputStream mergedOut = new FileOutputStream(mergedFile, false)) {
			for (int i = 0; i < storedParts.length; i++) {
				FileUtils.copyFile(new File(chunkDir, i + ".part"), mergedOut);
			}
		} catch (IOException e) {
			// A part is missing or unreadable: discard the partial file but keep
			// the stored chunks so that the upload can be completed and retried
			e.printStackTrace();
			mergedFile.delete();
			return Result.error("文件合并异常").setDatas("file", this.model);
		}

		// The chunks are no longer needed once they have been concatenated
		FileUtils.deleteDirectory(chunkDir);

		// The file identifier is compared with the MD5 of the merged file
		String md5 = UploadFileUtil.calcMD5(mergedFile);
		if (md5.equalsIgnoreCase(this.model.getIdentifier())) {
			return Result.ok("文件上传成功").setDatas("file", this.model);
		}
		if (mergedFile.exists()) {
			mergedFile.delete();
		}
		return Result.error("文件上传异常").setDatas("file", this.model);
	}

	// True when the value is a single path segment (no separators, not "." or
	// ".."), so that client-supplied names cannot escape the storage directory.
	private static boolean isPlainName(String name) {
		if (name == null || name.isEmpty() || ".".equals(name) || "..".equals(name)) {
			return false;
		}
		return name.indexOf('/') < 0 && name.indexOf('\\') < 0 && name.indexOf('\0') < 0;
	}

}

SpringBoot 配置同步线程池

import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
/**
 * Spring configuration that provides the thread pool used for file uploads.
 *
 * @author zzg
 */
@Configuration
public class ThreadConfig {
	// Class logger
	public static final Logger log = LoggerFactory.getLogger(ThreadConfig.class);

	/**
	 * Creates the executor registered under the bean name
	 * {@code uploadExecutor}.
	 *
	 * @return a fixed-size thread pool with 20 threads
	 */
	@Bean(value = "uploadExecutor")
	public ExecutorService getExecutor() {
		final int poolSize = 20;
		ExecutorService uploadPool = Executors.newFixedThreadPool(poolSize);
		return uploadPool;
	}

}

SpringBoot 配置文件上传

import javax.servlet.MultipartConfigElement; 
import org.springframework.boot.web.servlet.MultipartConfigFactory; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
import org.springframework.context.annotation.ImportResource; 
/**
 * Multipart upload configuration. Also imports the bean definitions from
 * {@code classpath:uploadFile.xml}.
 *
 * @author zzg
 */
@Configuration
@ImportResource({"classpath:uploadFile.xml" })
public class UploadConfig {

	/**
	 * Builds the multipart limits: 50MB per file and 60MB per request.
	 *
	 * @return the multipart configuration element
	 */
	@Bean
	public MultipartConfigElement multipartConfigElement() {
		MultipartConfigFactory multipartFactory = new MultipartConfigFactory();
		// NOTE(review): these setters are called with size strings (KB/MB
		// suffixes); confirm the overloads exist in the Spring Boot version in use.
		// Upper bound for the whole request
		multipartFactory.setMaxRequestSize("60MB");
		// Upper bound for a single file
		multipartFactory.setMaxFileSize("50MB");
		MultipartConfigElement multipartConfig = multipartFactory.createMultipartConfig();
		return multipartConfig;
	}
}
<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xsi:schemaLocation="http://www.springframework.org/schema/beans 
	http://www.springframework.org/schema/beans/spring-beans.xsd"> 
  
	 
	<!-- File-upload settings bean; the "location" property is set to the upload directory path --> 
	<bean id="uploadFileConfig" class="com.***.common.upload.file.UploadFileConfig"> 
		<property name="location" value="C:/data/upload_file/"/> 
	</bean> 
	 
	 
</beans>

Controller 层代码:

import java.io.File; 
import java.util.ArrayList; 
import java.util.List; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Future; 
import javax.servlet.http.HttpServletRequest; 
import javax.servlet.http.HttpServletResponse; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.stereotype.Controller; 
import org.springframework.web.bind.annotation.RequestBody; 
import org.springframework.web.bind.annotation.RequestMapping; 
import org.springframework.web.bind.annotation.RequestMethod; 
import org.springframework.web.bind.annotation.ResponseBody; 
import com.***.common.model.Result; 
import com.***.common.upload.file.ChunkInfoModel; 
import com.***.common.upload.file.UploadFileCallback; 
import com.***.common.upload.file.UploadFileConfig; 
import io.swagger.annotations.Api; 
import io.swagger.annotations.ApiOperation; 
import io.swagger.annotations.ApiParam; 
 
@Controller 
@RequestMapping("/api/file") 
@Api(value = "文件上传Controlle", tags = "文件上传操作服务") 
public class UploadFileController { 
	// 日志记录 
	public static final Logger log = LoggerFactory.getLogger(UploadFileController.class); 
	 
	// 公共文件存储目录 
	@Autowired 
	private UploadFileConfig config; 
	// 文件上传线程池 
	@Autowired 
	private ExecutorService uploadExecutor; 
 
	@ApiOperation(httpMethod = "POST", value = "文件块上传") 
	@RequestMapping(value = "/upload", method = { RequestMethod.POST }) 
	@ResponseBody 
	public Result upload( 
			ChunkInfoModel entity, 
			HttpServletRequest request, HttpServletResponse response) { 
 
			UploadFileCallback callback = new UploadFileCallback(entity, "upload", config.getLocation()); 
			Future<Result> result = this.uploadExecutor.submit(callback); 
			try { 
				return result.get(); 
			} catch (InterruptedException e) { 
				// TODO Auto-generated catch block 
				log.error(e.getMessage()); 
				e.printStackTrace(); 
			} catch (ExecutionException e) { 
				// TODO Auto-generated catch block 
				log.error(e.getMessage()); 
				e.printStackTrace(); 
			} 
		return Result.ok("文件块上传成功"); 
	} 
 
	@ApiOperation(httpMethod = "POST", value = "文件合并") 
	@RequestMapping(value = "/merge", method = { RequestMethod.POST }) 
	@ResponseBody 
	public Result merge(ChunkInfoModel entity) { 
		UploadFileCallback callback = new UploadFileCallback(entity, "merge", config.getLocation()); 
		Future<Result> result = this.uploadExecutor.submit(callback); 
		try { 
			return result.get(); 
		} catch (InterruptedException e) { 
			// TODO Auto-generated catch block 
			log.error(e.getMessage()); 
			e.printStackTrace(); 
		} catch (ExecutionException e) { 
			// TODO Auto-generated catch block 
			log.error(e.getMessage()); 
			e.printStackTrace(); 
		} 
		return Result.ok("文件上传成功"); 
	} 
	 
	 
	@ApiOperation(httpMethod = "POST", value = "文件上传记录") 
	@RequestMapping(value = "/find", method = { RequestMethod.POST }) 
	@ResponseBody 
	public Result find( 
			@RequestBody @ApiParam(name = "文件上传对象", value = "json格式对象", required = true) ChunkInfoModel entity) { 
		File parentFileDir = new File(config.getLocation() + entity.getIdentifier()); 
		// 已经上传文件块信息 
		List<String> numbers = new ArrayList<String>(); 
		// 已经上传文件大小 
		long uploadFileSize = 0; 
		 
		// 判断文件目录是否存储 
		if(parentFileDir.exists() && parentFileDir.isDirectory()){ 
			// 指定文件已经存在,用户已经上传指定文件,但是文件未上传完毕 
			File[] files = parentFileDir.listFiles(); 
			if(files != null && files.length > 0){ 
				for (int i = 0; i < files.length; i++) { 
					File file = files[i]; 
					String number = file.getName().split(".")[0]; 
					numbers.add(number); 
					uploadFileSize = uploadFileSize + file.length(); 
				} 
			} 
			 
		}  
		return Result.ok("文件上传记录").setDatas("numbers", numbers).setDatas("uploadFileSize", uploadFileSize); 
	} 
	 
}

 

声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

发表评论
搜索
排行榜
关注我们

一个IT知识分享的公众号