上一篇我们介绍了OSS的简单文件上传,同时也了解了简单上传的适用场景与局限性,为了应对它所带来的局限性我们这一篇文章将带着大家介绍OSS的分片文件上传。
如果对OSS文件上传还不了解的,推荐先看我的上一篇博客:OSS文件上传(一):简单上传-CSDN博客
大文件解决方案:分片上传
分片上传(Multipart Upload)是 OSS 专为大文件设计的上传方式,核心思想是将大文件拆分为多个小分片(Part),分别上传后再合并为完整文件。
1. 分片上传的核心流程
分片上传(Multipart Upload)分为以下三个步骤:
初始化一个分片上传事件。
调用ossClient.initiateMultipartUpload方法返回OSS创建的全局唯一的uploadId。
上传分片。
调用ossClient.uploadPart方法上传分片数据。
说明
对于同一个uploadId,分片号(PartNumber)标识了该分片在整个文件内的相对位置。如果使用同一个分片号上传了新的数据,则OSS上该分片已有的数据将会被覆盖。
OSS将收到的分片数据的MD5值放在ETag头内返回给用户。
OSS计算上传数据的MD5值,并与SDK计算的MD5值比较,如果不一致则返回InvalidDigest错误码。
完成分片上传。
所有分片上传完成后,调用ossClient.completeMultipartUpload方法将所有分片合并成完整的文件。
2.分片上传实现(基础版)
(1)依赖配置(Maven)
首先在pom.xml
中添加 OSS SDK 相关依赖:
<dependency>
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
<version>3.17.4</version>
</dependency>
如果使用的是Java 9及以上的版本,则需要添加以下JAXB相关依赖:
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>javax.activation</groupId>
<artifactId>activation</artifactId>
<version>1.1.1</version>
</dependency>
<!-- no more than 2.3.3-->
<dependency>
<groupId>org.glassfish.jaxb</groupId>
<artifactId>jaxb-runtime</artifactId>
<version>2.3.3</version>
</dependency>
(2)OSS 配置
在application.yml
中配置 OSS 连接信息:
aliyun:
oss:
endpoint: oss-cn-beijing.aliyuncs.com # 地域Endpoint
access-key-id: your-access-key-id
access-key-secret: your-access-key-secret
bucket-name: your-bucket-name # 存储桶名称
创建配置类初始化 OSS 客户端:
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClient;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(prefix = "aliyun")
public class AliYunConfig {
private String accessKey; // 阿里云AccessKeyId
private String accessKeySecret; // 阿里云AccessKeySecret
private String ossBucket; // 存储桶名称
private String ossEndpoint; // OSS地域节点
@Bean
public OSS oSSClient() {
return new OSSClient(ossEndpoint, accessKey, accessKeySecret);
}
}
创建统一结果返回类:
package com.netflow.utils;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
/**
* 统一结果返回类
* @param <T> 返回数据的类型
*/
public class Msg<T> implements Serializable {
private static final long serialVersionUID = 1L;
// 状态码
private int code;
// 消息
private String message;
// 返回数据
private T data;
// 时间戳
private long timestamp;
// 附加数据(可选)
private Map<String, Object> extra;
// 构造方法
private Msg() {
this.timestamp = System.currentTimeMillis();
}
private Msg(int code, String message) {
this();
this.code = code;
this.message = message;
}
private Msg(int code, String message, T data) {
this(code, message);
this.data = data;
}
// 成功静态工厂方法
public static <T> Msg<T> success() {
return new Msg<>(200, "操作成功");
}
public static <T> Msg<T> success(String message) {
return new Msg<>(200, message);
}
public static <T> Msg<T> success(T data) {
return new Msg<>(200, "操作成功", data);
}
public static <T> Msg<T> success(String message, T data) {
return new Msg<>(200, message, data);
}
// 失败静态工厂方法
public static <T> Msg<T> fail() {
return new Msg<>(500, "操作失败");
}
public static <T> Msg<T> fail(String message) {
return new Msg<>(500, message);
}
public static <T> Msg<T> fail(int code, String message) {
return new Msg<>(code, message);
}
// 链式调用方法
public Msg<T> code(int code) {
this.code = code;
return this;
}
public Msg<T> message(String message) {
this.message = message;
return this;
}
public Msg<T> data(T data) {
this.data = data;
return this;
}
public Msg<T> extra(String key, Object value) {
if (this.extra == null) {
this.extra = new HashMap<>();
}
this.extra.put(key, value);
return this;
}
// Getter方法
public int getCode() {
return code;
}
public String getMessage() {
return message;
}
public T getData() {
return data;
}
public long getTimestamp() {
return timestamp;
}
public Map<String, Object> getExtra() {
return extra;
}
@Override
public String toString() {
return "Msg{" +
"code=" + code +
", message='" + message + '\'' +
", data=" + data +
", timestamp=" + timestamp +
", extra=" + extra +
'}';
}
}
(3)分片上传实现
/**
* 分片上传
* @param file
* @return
*/
@PostMapping("/uploads")
public Msg<String> uploadFiles(@RequestParam("file") MultipartFile file) {
String objectName = util(file);
try {
// 创建InitiateMultipartUploadRequest对象。
InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(aliYunConfig.getOssBucket(), objectName);
// 创建ObjectMetadata并设置Content-Type。
ObjectMetadata metadata = new ObjectMetadata();
if (metadata.getContentType() == null) {
metadata.setContentType(Mimetypes.getInstance().getMimetype(file.getInputStream().toString(), objectName));
}
System.out.println("Content-Type: " + metadata.getContentType());
// 将metadata绑定到上传请求中。
request.setObjectMetadata(metadata);
// 初始化分片。
InitiateMultipartUploadResult upresult = ossClient.initiateMultipartUpload(request);
// 返回uploadId。
String uploadId = upresult.getUploadId();
// partETags是PartETag的集合。PartETag由分片的ETag和分片号组成。
List<PartETag> partETags = new ArrayList<PartETag>();
// 每个分片的大小,用于计算文件有多少个分片。单位为字节。
// 分片最小值为100 KB,最大值为5 GB。最后一个分片的大小允许小于100 KB。
// 设置分片大小为 1 MB。
final long partSize = 1 * 1024 * 1024L;
// 根据上传的数据大小计算分片数。以本地文件为例,说明如何通过File.length()获取上传数据的大小。
final File sampleFile = new File(file.getContentType());
long fileLength = file.getSize();
int partCount = (int) (fileLength / partSize);
if (fileLength % partSize != 0) {
partCount++;
}
// 遍历分片上传。
for (int i = 0; i < partCount; i++) {
long startPos = i * partSize;
long curPartSize = (i + 1 == partCount) ? (fileLength - startPos) : partSize;
UploadPartRequest uploadPartRequest = new UploadPartRequest();
uploadPartRequest.setBucketName(aliYunConfig.getOssBucket());
uploadPartRequest.setKey(objectName);
uploadPartRequest.setUploadId(uploadId);
// 设置上传的分片流。
// 以本地文件为例说明如何创建FileInputStream,并通过InputStream.skip()方法跳过指定数据。
InputStream instream = file.getInputStream();
instream.skip(startPos);
uploadPartRequest.setInputStream(instream);
// 设置分片大小。
uploadPartRequest.setPartSize(curPartSize);
// 设置分片号。每一个上传的分片都有一个分片号,取值范围是1~10000,如果超出此范围,OSS将返回InvalidArgument错误码。
uploadPartRequest.setPartNumber(i + 1);
// 每个分片不需要按顺序上传,甚至可以在不同客户端上传,OSS会按照分片号排序组成完整的文件。
UploadPartResult uploadPartResult = ossClient.uploadPart(uploadPartRequest);
// 每次上传分片之后,OSS的返回结果包含PartETag。PartETag将被保存在partETags中。
partETags.add(uploadPartResult.getPartETag());
// 关闭流
instream.close();
}
// 创建CompleteMultipartUploadRequest对象。
// 在执行完成分片上传操作时,需要提供所有有效的partETags。OSS收到提交的partETags后,会逐一验证每个分片的有效性。当所有的数据分片验证通过后,OSS将把这些分片组合成一个完整的文件。
CompleteMultipartUploadRequest completeMultipartUploadRequest =
new CompleteMultipartUploadRequest(aliYunConfig.getOssBucket(), objectName, uploadId, partETags);
// 完成分片上传。
CompleteMultipartUploadResult completeMultipartUploadResult = ossClient.completeMultipartUpload(completeMultipartUploadRequest);
System.out.println("上传成功,ETag:" + completeMultipartUploadResult.getETag());
return Msg.success("分片上传成功");
} catch (OSSException oe) {
System.out.println("Caught an OSSException, which means your request made it to OSS, "
+ "but was rejected with an error response for some reason.");
System.out.println("Error Message:" + oe.getErrorMessage());
System.out.println("Error Code:" + oe.getErrorCode());
System.out.println("Request ID:" + oe.getRequestId());
System.out.println("Host ID:" + oe.getHostId());
} catch (ClientException | IOException ce) {
System.out.println("Caught a ClientException, which means the client encountered "
+ "a serious internal problem while trying to communicate with OSS, "
+ "such as not being able to access the network.");
System.out.println("Error Message:" + ce.getMessage());
} finally {
if (ossClient != null) {
ossClient.shutdown();
}
}
return Msg.fail("分片上传失败");
}
private String util(MultipartFile file){
//获取初始文件名
String fileName = file.getOriginalFilename();
//获取文件后缀
String suffixName = fileName.substring(fileName.lastIndexOf("."));
//UUID为前缀作为文件名
String newFileName = UUID.randomUUID().toString()+ "-"+ fileName + suffixName;
return newFileName;
}
3.元数据(Metadata)
在文件上传的上下文中,元数据(Metadata) 是关于文件本身的描述性信息,不包含文件的实际内容。它的作用类似于文件的 “标签” 或 “属性”,用于提供额外的上下文信息,帮助系统或用户更好地管理、检索和理解文件。
元数据的常见作用:
文件描述
- 存储文件名、大小、类型、创建时间等基本信息。
- 示例:
Content-Type: image/jpeg
、Content-Length: 123456
。
业务逻辑关联
- 记录与业务相关的属性,如用户 ID、部门、项目名称等。
- 示例:
user_id: 123
、project: marketing-campaign
。
权限控制
- 存储访问级别、加密信息或自定义权限规则。
- 示例:
access-level: confidential
、encryption: AES256
。
版本管理
- 记录文件版本、修订历史或 MD5 校验值。
- 示例:
version: 2.0
、md5: abc123...
。
在 initiateMultipartUpload
中的应用:
在文件上传 API 中,元数据通常作为参数传递,用于设置上传文件的属性。
总之:
元数据是文件管理的重要组成部分,它使系统能够:
- 自动化处理文件(如根据
Content-Type
选择解析器)。 - 实现高级搜索和过滤功能。
- 简化权限管理和审计。
- 优化缓存策略,提升性能。
4. 分片上传的优势
- 支持大文件:最大支持 48.8TB(单个分片最大 5GB,最多 10000 个分片);
- 并行上传:多线程同时上传多个分片,提高效率;
- 容错性:单个分片上传失败仅需重传该分片,无需整体重试。
5.分片上传的局限性:
虽然使用分片上传可以解决单个文件过大导致上传失败的问题,但同样它也存在一定的局限性:
1. 复杂度更高
- 实现成本:需要客户端和服务端协同管理分片,代码逻辑更复杂。
- 状态管理:需跟踪每个分片的上传状态(如已上传、失败、待重试)。
2. 额外资源消耗
- 存储开销:服务端需临时保存所有分片,直到合并完成。
- 元数据管理:需维护分片与文件的映射关系(如 Redis 或数据库)。
3. 最终一致性问题
- 原子性挑战:分片合并是异步操作,合并失败可能导致文件不完整。
- 垃圾回收:未完成的上传需定期清理分片,避免占用存储空间。
4. 网络依赖更强
- 重试成本:单个分片失败需单独重试,可能影响整体速度。
- 顺序性:部分实现要求按序上传或合并分片,增加等待时间。
5. 性能瓶颈
- 并发限制:客户端或服务端可能限制并发上传的分片数量。
- 合并延迟:大文件的分片合并可能耗时较长。
思考:
在文件分片上传的过程中,出现故障导致上传失败,在下次上传的过程中是从头开始重新进行一次分片上传,还是从出故障的地方继续进行上传?这涉及到断点续传的问题,感兴趣的朋友可以参考我的另一篇博客