什么是断点续传

简单来说断点续传指的是文件在上传或下载的过程中,由于网络差断开了,那么下次上传或下载时应该从断点处开始。

怎么实现

  1. 前端对文件进行分块
  2. 前端使用多线程一块一块上传,上传前给服务端发一个消息检验该分块是否上传,如果已上传则不再上传。
  3. 等所有分块上传完毕,服务端合并所有分块,校验文件的完整性。(
    因为分块全部上传到了服务器,服务器将所有分块按顺序进行合并,就是写每个分块文件内容按顺序依次写入一个文件中。)
  4. 前端给服务传一个md5值,服务端合并文件后计算合并后的文件的md5是否一样,一样说明完整,否则不完整,需要重新上传。

此外针对文件上传一半不传了,之前上传到的minio 分块文件需要清理
1. 在数据库中记录minio存储的文件信息的文件表
2. 文件开始上传时写入文件表,状态为上传中,上传成功更新状态为上传完成
3. 当一个文件传了一半不在上传了说明该文件没有上传完成,会有定时任务去查询文件表中的记录,如果文件未上传完成则删除minio 中没有上传成功的文件目录。

需要注意的是 minio 合并 每一分块文件最少是5MB 否则会报错,此外上传分块的文件如果大于1MB,SpringBoot不允许 默认是1MB 因此通过以下配置修改为50MB。

1
2
3
4
5
6
#文件上传大小默认1M 改为50M
spring:
servlet:
multipart:
max-file-size: 50MB
maxrequest-size: 50MB

大文件分块 合并 本地单元测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package com.jhj.media;

import io.minio.ComposeObjectArgs;
import io.minio.ComposeSource;
import io.minio.MinioClient;
import io.minio.UploadObjectArgs;
import io.minio.errors.*;
import org.apache.commons.codec.digest.DigestUtils;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* @author jhj
* @version 1.0.0
* @ClassName BigFileTest.java
* @Description TODO
* @createTime 2024年06月23日 00:35:00
*/
public class BigFileTest {
MinioClient minioClient = MinioClient
.builder()
.endpoint("http://192.168.56.200:49160")
.credentials("minio", "12345678")
.build();

//分块测试
@Test
public void testChunk() throws IOException {
//源文件
File sourceFile = new File("G:\\1.mp4");
//分块文件存储路径
String chunkFilePath = "G:\\chunk\\";
//分块文件大小
int chunkSize = 1024 * 1024 * 5;
//分块文件的个数
int chunkNum = (int) Math.ceil(sourceFile.length() * 1.0 / chunkSize);
//使用流对源文件读数据,向分块文件中写数据
RandomAccessFile raf_r = new RandomAccessFile(sourceFile, "r");
//缓存区 临时存文件数据的
byte[] bytes = new byte[1024];
for (int i = 0; i < chunkNum; i++) {
File chunkFile = new File(chunkFilePath + i);
//分块文件的写入流
RandomAccessFile raf_rw = new RandomAccessFile(chunkFile, "rw");
int len = -1;
while ((len = raf_r.read(bytes)) != -1) {
raf_rw.write(bytes, 0, len);
if (chunkFile.length() >= chunkSize) {
break;
}
}
raf_rw.close();
}
raf_r.close();
}

@Test
public void testMerge() throws IOException {
//分块路径
String chunkFilePath = "G:\\chunk\\";
File chunkFolder = new File(chunkFilePath);
//源文件
File sourceFile = new File("G:\\1.mp4");
//合并后的文件
File mergeFile = new File("G:\\2.mp4");
//取出所有的分块文件
File[] files = chunkFolder.listFiles();
//将数组转为list
List<File> fileList = Arrays.asList(files);
Collections.sort(fileList, new Comparator<File>() {
@Override
public int compare(File o1, File o2) {
return Integer.parseInt(o1.getName()) -
Integer.parseInt(o2.getName());
}
});
RandomAccessFile raf_rw = new RandomAccessFile(mergeFile, "rw");
byte[] bytes = new byte[1024];
for (File file : fileList) {
//读
RandomAccessFile raf_r = new RandomAccessFile(file, "r");
int len = -1;
while ((len = raf_r.read(bytes)) != -1) {
raf_rw.write(bytes, 0, len);
}
raf_r.close();
}
raf_rw.close();

//合并文件完成,进行校验,判断有没有丢包
String s = DigestUtils.md5Hex(new FileInputStream(mergeFile));
String s1 = DigestUtils.md5Hex(new FileInputStream(sourceFile));
if (s.equals(s1)) {
System.out.println("合并成功");
}
}

//将分块文件上传到minio
@Test
public void uploadChunk() throws IOException, ServerException, InsufficientDataException, ErrorResponseException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException {
for (int i = 0; i <= 254; i++) {
minioClient.uploadObject(
UploadObjectArgs.builder()
.bucket("testbucket")
.filename("G:/chunk/" + i)//指定本地文件路径
.object("chunk/" + i) //桶下路径
.build()
);
System.out.println("上传分块" + i + "成功");
}
}

//调用minio 接口合并分块
@Test
public void testMergeFile() throws ServerException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException {
//指定分块信息
List<ComposeSource> sources = Stream.iterate(0, i -> ++i).limit(255).map(i -> {
return ComposeSource.builder().bucket("testbucket").object("chunk/" + i).build();
}).collect(Collectors.toList());

//合并信息 minio默认合并大小为5M
ComposeObjectArgs testbucket = ComposeObjectArgs.builder().bucket("testbucket")
.object("merge01.mp4")
.sources(sources)
.build();
//合并文件
minioClient.composeObject(testbucket);
}

}

按照业务要求 前端分块 检查分块是否上传 上传分块 合并分块 检验完整性 清除分块 service 实现断点续传

保证文件只被上传一次利用 加密流的md5作为唯一值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
package com.jhj.media.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.j256.simplemagic.ContentInfo;
import com.j256.simplemagic.ContentInfoUtil;
import com.jhj.base.exception.UltimateException;
import com.jhj.base.model.PageParams;
import com.jhj.base.model.PageResult;
import com.jhj.base.model.RestResponse;
import com.jhj.media.mapper.MediaFilesMapper;
import com.jhj.media.model.dto.QueryMediaParamsDto;
import com.jhj.media.model.dto.UploadFileParamsDto;
import com.jhj.media.model.dto.UploadFileResultDto;
import com.jhj.media.model.po.MediaFiles;
import com.jhj.media.service.MediaFileService;
import io.minio.*;
import io.minio.messages.DeleteError;
import io.minio.messages.DeleteObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.compress.utils.IOUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.io.*;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* @author jhj
* @version 1.0
* @description TODO
* @date 2022/9/10 8:58
*/
@Service
@Slf4j
public class MediaFileServiceImpl implements MediaFileService {
@Autowired
MediaFilesMapper mediaFilesMapper;

@Autowired
MinioClient minioClient;

@Autowired
@Lazy
MediaFileService currentProxy;

//存储普通文件
@Value("${minio.bucket.files}")
private String bucket_mediafiles;

//存储视频
@Value("${minio.bucket.videofiles}")
private String bucket_video;


@Override
public PageResult<MediaFiles> queryMediaFiels(Long companyId, PageParams pageParams, QueryMediaParamsDto queryMediaParamsDto) {

//构建查询条件对象
LambdaQueryWrapper<MediaFiles> queryWrapper = new LambdaQueryWrapper<>();

//分页对象
Page<MediaFiles> page = new Page<>(pageParams.getPageNo(), pageParams.getPageSize());
// 查询数据内容获得结果
Page<MediaFiles> pageResult = mediaFilesMapper.selectPage(page, queryWrapper);
// 获取数据列表
List<MediaFiles> list = pageResult.getRecords();
// 获取数据总数
long total = pageResult.getTotal();
// 构建结果集
PageResult<MediaFiles> mediaListResult = new PageResult<>(list, total, pageParams.getPageNo(), pageParams.getPageSize());
return mediaListResult;

}

@Override
//为什么不在这里加事务 是因为防止minio 网络连接时间长而导致数据库连接超时 占用数据库事务资源
public UploadFileResultDto uploadFile(Long companyId, UploadFileParamsDto uploadFileParamsDto, String localFilePath) {
//文件名
String filename = uploadFileParamsDto.getFilename();
//拿到扩展名
String extension = filename.substring(filename.lastIndexOf("."));
//拿到mimeType
String mimeType = getMimeType(extension);
//文件的md5
String fileMd5 = getFileMd5(new File(localFilePath));
//目录
String defaultFolderPath = getDefaultFolderPath();
String objectName = defaultFolderPath + fileMd5 + extension;
//将文件上传Minio
boolean result = addMediaFilesToMinio(localFilePath, mimeType, bucket_mediafiles, objectName);
if(!result){
UltimateException.cast("上传文件失败");
}

//将文件信息保存到数据库
MediaFiles mediaFiles = currentProxy.addMediaFilesToDb(companyId, fileMd5, uploadFileParamsDto, bucket_mediafiles, objectName);
if(mediaFiles==null){
UltimateException.cast("文件上传后,保存信息失败");
}

//返回对象
UploadFileResultDto uploadFileResultDto = new UploadFileResultDto();
BeanUtils.copyProperties(mediaFiles,uploadFileResultDto);
return uploadFileResultDto;
}

//根据扩展名获取mimeType
private String getMimeType(String extension) {
if (extension == null) {
extension = "";
}
ContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(extension);
String mimeType = MediaType.APPLICATION_OCTET_STREAM_VALUE; //通用mimeType
if (extensionMatch != null) {
mimeType = extensionMatch.getMimeType();
}
return mimeType;
}

/**
* 将文件上传到minio
* @param localFilePath 本地路径
* @param mimeType 媒体类型
* @param bucket 桶
* @param objectName 文件名
* @return
*/
private boolean addMediaFilesToMinio(String localFilePath, String mimeType, String bucket, String objectName) {
try {
minioClient.uploadObject(
UploadObjectArgs.builder()
.bucket(bucket)
.filename(localFilePath)
.object(objectName)
.contentType(mimeType)
.bucket(bucket).build()
);
log.debug("上传文件到minio成功,bucket:{},objectName:{}",bucket,objectName);
return true;
} catch (Exception e) {
e.printStackTrace();
log.error("上传文件出错,bucket:{},objectName:{},错误信息:{}",bucket,objectName,e.getMessage());
}
return false;
}

private String getDefaultFolderPath(){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
String folder = sdf.format(new Date()).replace("-", "/") + "/";
return folder;
}

private String getFileMd5(File file){
try(FileInputStream fileInputStream=new FileInputStream(file)){
String fileMd5 = DigestUtils.md5Hex(fileInputStream);
return fileMd5;
}catch (Exception e){
e.printStackTrace();
return null;
}
}

/**
* 将文件信息保存到数据库
* @param companyId 机构id
* @param fileMd5 md5
* @param uploadFileParamsDto 文件信息
* @param bucket 桶信息
* @param objectName 路径
* @return
*/
@Transactional
//事务只能用于public 方法 原理是Aop jdk cglib 两种方式 访问不了private
//注意看是不是被代理对象调用 如果不是 则事务不会生效
//同serive 调用同类的事务方法 事务是无法控制的 因为用的是this 并不是代理对象
//注入进去的都是代理对象 this就是对象本身 解决办法 把service 在本类中进行注入
public MediaFiles addMediaFilesToDb(Long companyId, String fileMd5, UploadFileParamsDto uploadFileParamsDto,String bucket,String objectName){
MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMd5);
if (mediaFiles == null){
mediaFiles=new MediaFiles();
BeanUtils.copyProperties(uploadFileParamsDto,mediaFiles);
//文件id
mediaFiles.setId(fileMd5);
mediaFiles.setFileId(fileMd5);
//机构id
mediaFiles.setCompanyId(companyId);
//桶
mediaFiles.setBucket(bucket);
//file_path
mediaFiles.setFilePath(objectName);
//url
mediaFiles.setUrl("/"+bucket+"/"+objectName);
//上传时间
mediaFiles.setCreateDate(LocalDateTime.now());
//状态
mediaFiles.setStatus("1");
//审核状态
mediaFiles.setAuditStatus("002003");
//插入数据库
int insert = mediaFilesMapper.insert(mediaFiles);
int i=1/0;
if(insert<0){
log.error("向数据库保存文件失败,bucket:{},objectName:{}",bucket,objectName);
return null;
}
return mediaFiles;
}
return mediaFiles;
}

@Override
public RestResponse<Boolean> checkFile(String fileMd5) {
//先查询数据库
MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMd5);
//如果数据库存在再查询 minio
if(mediaFiles!=null){
String bucket = mediaFiles.getBucket();
String filePath = mediaFiles.getFilePath();
GetObjectArgs getObjectArgs = GetObjectArgs.builder()
.bucket(bucket)
.object(filePath)
.build();

try {
FilterInputStream inputStream = minioClient.getObject(getObjectArgs);
if(inputStream!=null){
//文件已经存在了
return RestResponse.success(true);
}
} catch (Exception e) {
e.printStackTrace();
}
}
return RestResponse.success(false);
}

@Override
public RestResponse<Boolean> checkChunk(String fileMd5, int chunkIndex) {
String chunkFileFolderPath = getChunkFileFolderPath(fileMd5)+chunkIndex;
//md5前两位为两个目录,chunk存储分块文件
GetObjectArgs getObjectArgs = GetObjectArgs.builder()
.bucket(bucket_video)
.object(chunkFileFolderPath)
.build();
try {
FilterInputStream inputStream = minioClient.getObject(getObjectArgs);
if(inputStream!=null){
//文件已经存在了
return RestResponse.success(true);
}
} catch (Exception e) {
e.printStackTrace();
}
return RestResponse.success(false);
}

@Override
public RestResponse uploadChunk(String fileMd5, int chunk, String localChunkFilePath) {
String mimeType = getMimeType(null);
boolean b = addMediaFilesToMinio(localChunkFilePath, mimeType, bucket_video, getChunkFileFolderPath(fileMd5) + chunk);
if(!b){
return RestResponse.validfail("上传分块文件失败",false);
}
return RestResponse.success(true);
}

@Override
public RestResponse mergechunks(Long companyId, String fileMd5, int chunkTotal, UploadFileParamsDto uploadFileParamsDto) {
// 找到分块文件调用minio的sdk进行文件合并
// 分块文件所在目录
String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);
// 找到所有的分块文件
List<ComposeSource> sources = Stream.iterate(0, i -> ++i).limit(chunkTotal).map(i -> {
return ComposeSource.builder().bucket(bucket_video).object(chunkFileFolderPath + i).build();
}).collect(Collectors.toList());

String filename = uploadFileParamsDto.getFilename();
String extension = filename.substring(filename.lastIndexOf("."));
//合并后的文件名
String filePathByMd5 = getFilePathByMd5(fileMd5, extension);
// 合并信息 minio默认合并大小为5M
ComposeObjectArgs testbucket = ComposeObjectArgs.builder().bucket(bucket_video)
.object(filePathByMd5) //合并后的文件
.sources(sources)
.build();
//合并文件
try {
minioClient.composeObject(testbucket);
} catch (Exception e) {
e.printStackTrace();
log.error("合并文件出错,bucket:{},onjectName:{},错误信息:{}",bucket_video,filePathByMd5,e.getMessage());
return RestResponse.validfail("合并文件异常",false);
}
//校验合并后的和源文件是否一致,视频上传才成功
File file = downloadFileFromMinIo(bucket_video, filePathByMd5);
try(FileInputStream fileInputStream = new FileInputStream(file)){
String mergeFile_md5 = DigestUtils.md5Hex(fileInputStream);
//比较原始md5和合并后的
if(!fileMd5.equals(mergeFile_md5)){
log.error("校验合并文件md5值不一致,原始文件:{},合并文件:{}",fileMd5,mergeFile_md5);
return RestResponse.validfail("文件校验失败",false);
}
//文件大小
uploadFileParamsDto.setFileSize(file.length());
}catch (Exception e){
return RestResponse.validfail("文件校验失败",false);
}
//将文件信息入库
MediaFiles mediaFiles = currentProxy.addMediaFilesToDb(companyId, fileMd5, uploadFileParamsDto, bucket_video, filePathByMd5);
if(mediaFiles==null){
return RestResponse.validfail("文件入库失败",false);
}
//清理分块文件
clearChunkFiles(chunkFileFolderPath,chunkTotal);

return RestResponse.success(true);
}

//得到分块目录
private String getChunkFileFolderPath(String fileMd5){
return fileMd5.substring(0,1)+"/"+fileMd5.substring(1,2)+"/"+fileMd5+"/"+"chunk"+"/";
}

//得到合并后的文件名
private String getFilePathByMd5(String fileMd5,String fileExt){
return fileMd5.substring(0,1)+"/"+fileMd5.substring(1,2)+"/"+fileMd5+"/"+fileMd5+fileExt;
}

//下载minio 文件
public File downloadFileFromMinIo(String bucket,String objectName){
File minioFile=null;
FileOutputStream outputStream=null;
try {
InputStream inputStream = minioClient.getObject(GetObjectArgs.builder().bucket(bucket).object(objectName).build());
minioFile=File.createTempFile("minio",".merge");
outputStream = new FileOutputStream(minioFile);
IOUtils.copy(inputStream,outputStream);
return minioFile;
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return minioFile;
}

//清除分块文件
private void clearChunkFiles(String chunkFileFolderPath, int chunkTotal){
Iterable<DeleteObject> objects=Stream.iterate(0,i->++i).limit(chunkTotal).map(i->{
return new DeleteObject(chunkFileFolderPath+i);
}).collect(Collectors.toList());
RemoveObjectsArgs removeObjectArgs = RemoveObjectsArgs.builder().bucket(bucket_video).objects(objects).build();
Iterable<Result<DeleteError>> results = minioClient.removeObjects(removeObjectArgs);
//要想真正的删除
results.forEach(f->{
try {
DeleteError deleteError = f.get();
} catch (Exception e){
e.printStackTrace();
}
});
}
}

作者声明

1
如有问题,欢迎指正!