Minio 入门系列【16】Minio 分片上传文件 putObject 接口流程源码分析


本文由 简悦 SimpRead 转码, 原文地址 blog.csdn.net

前言

为了更好的理解和优化 Minio 文件上传,本篇文档对 MInio 中上传文件 putObject 接口源码分析以下。

基于 Java 客户端 API

Controller 层上传文件接口:

@PostMapping("/upload")
    @ResponseBody
    public Object upload(MultipartFile file, String bucketName) throws IOException {
        return minioTemplate.putObject(file.getInputStream(), bucketName, file.getOriginalFilename());
    }

MinioTemplate 接口:

minioClient.putObject(
                    PutObjectArgs.builder().bucket(bucketName).object(uuidFileName).stream(
                            inputStream, inputStream.available(), -1)
                            .build());

源码分析

1. 进入 Controller 层接口

首先我在页面上上传了一个 9M 左右的文件:

文件上传,经过 Tomcat 服务器进行处理,然后到达我们的 Controller 层上传文件接口,我们使用的是 MultipartFile 对象来接受文件,可以看到当前 MultipartFile 对象存放了文件相关信息,而此时实际的文件是由 Tomcat 存放在硬盘临时目录的。

MultipartFile 实际的对象是 StandardMultipartHttpServletRequest 的实例,他包含了 ApplicationPart 对象,ApplicationPart 包含了图片中的文件信息。

接收到对象后,调用的就是 MinioTemplate,这里传入了各种参数:

minioTemplate.putObject(file.getInputStream(), bucketName, file.getOriginalFilename());

最重要的是传入了一个 InputStream,调用的是 MultipartFile 对象的 getInputStream() 方法。

getInputStream 获取输入流,是调用 ApplicationPart 中的 DiskFileItem 对象的 getInputStream() 方法。这个方法会将临时文件,直接转为 FileInputStream 并返回。

public InputStream getInputStream() throws IOException {
        if (!this.isInMemory()) {
            // 直接将临时文件转为输入流
            return new FileInputStream(this.dfos.getFile());
        } else {
            if (this.cachedContent == null) {
                this.cachedContent = this.dfos.getData();
            }

            return new ByteArrayInputStream(this.cachedContent);
        }
    }

2. 构建参数对象 PutObjectArgs(参数校验,分片)

InputStream 获取到了以后,接着就是调用 MinioTemplate 中的 putObject 方法了。

minioClient.putObject(
                    PutObjectArgs.builder().bucket(bucketName).object(uuidFileName).stream(
                            inputStream, inputStream.available(), -1)
                            .build());

putObject 方法实际调用的就是 MinioClient 的 putObject,调用之前会创建 PutObjectArgs 参数对象,使用的是建造者模式。

PutObjectArgs 首先会对存储桶名称进行校验,所以创建存储桶名称时,要格外注意。

protected void validateBucketName(String name) {
            // 非空校验
            this.validateNotNull(name, "bucket name");
            // 1. 校验长度,3-63之间
            if (name.length() >= 3 && name.length() <= 63) {
                String msg;
                // 2. 不能包含“..”
                if (name.contains("..")) {
                    msg = "bucket name cannot contain successive periods. For more information refer http://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html";
                    throw new IllegalArgumentException(name + " : " + msg);
                    // 3. 正则校验
                } else if (!name.matches("^[a-z0-9][a-z0-9\\.\\-]+[a-z0-9]$")) {
                    msg = "bucket name does not follow Amazon S3 standards. For more information refer http://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html";
                    throw new IllegalArgumentException(name + " : " + msg);
                }
            } else {
                throw new IllegalArgumentException(name + " : bucket name must be at least 3 and no more than 63 characters long");
            }
        }

然后对对象名称进行校验:

protected void validateObjectName(String name) {
            // 1. 非空和Null校验
            this.validateNotEmptyString(name, "object name");
            String[] var2 = name.split("/"); // 按照反斜杠分割为字符串数组
            int var3 = var2.length;
            //  2. 循环字符串数组,校验每个斜杠分割的字段不能是“.”或者“..”
            for(int var4 = 0; var4 < var3; ++var4) {
                String token = var2[var4];
                if (token.equals(".") || token.equals("..")) {
                    throw new IllegalArgumentException("object name with '.' or '..' path segment is not supported");
                }
            }
        }

最后对 InputStream 进行构建。

public PutObjectArgs.Builder stream(InputStream stream, long objectSize, long partSize) {            
            // 1. 非空
            this.validateNotNull(stream, "stream");
            // 2. 获取分片数,5M分割
            long[] partinfo = this.getPartInfo(objectSize, partSize);
            long pSize = partinfo[0]; // 分片大小 5M=5242880字节
            int pCount = (int)partinfo[1]; // 分片数,这里上传的9m文件,所以有两片
            // 3. 将FileInputStream=》BufferedInputStream
            BufferedInputStream bis = stream instanceof BufferedInputStream ? (BufferedInputStream)stream : new BufferedInputStream(stream);
            // 4. 将这些参数添加到PutObjectArgs对象中
            return this.setStream(bis, objectSize, pSize, pCount);
        }

在构建 InputStream 时,会进行分片操作,我们可以了解到上传文件大小的一些限制:

  • 分片大小不能小于 5MB,大于 5GB
  • 对象大小不能超过 5TiB
  • partSize 传入 - 1,默认按照 5MB 进行分割
  • 分片数量不能超过 10000

分片规则如下:

// 参数为 文件大小objectSize、分片大小partSize,分片数我们传入的是-1,表示使用默认配置
        protected long[] getPartInfo(long objectSize, long partSize) {
            // 1. 校验大小,如果设置的分片大小 小于5M或者大于5GB,报错不支持
            //  对象大小超过5TiB,报错不支持
            this.validateSizes(objectSize, partSize);
            if (objectSize < 0L) {
                return new long[]{partSize, -1L};
            } else {
                // 2. 没有设置分片数据大小,怎按照默认的5M进行分割
                if (partSize <= 0L) {
                    double dPartSize = Math.ceil((double)objectSize / 10000.0D);
                    dPartSize = Math.ceil(dPartSize / 5242880.0D) * 5242880.0D;
                    partSize = (long)dPartSize;
                }
                if (partSize > objectSize) {
                    partSize = objectSize;
                }

                long partCount = partSize > 0L ? (long)Math.ceil((double)objectSize / (double)partSize) : 1L;
                // 3. 分片数量不能超过10000
                if (partCount > 10000L) {
                    throw new IllegalArgumentException("object size " + objectSize + " and part size " + partSize + " make more than " + 10000 + "parts for upload");
                } else {
                    // 4. 返回一个数组,第一个值为分片数据大小,第二个为分片数量
                    return new long[]{partSize, partCount};
                }
            }
        }

最终构建的 PutObjectArgs 对象如下:

该对象包含了文件流、对象名、分片信息等重要数据。

3. 进入 MinioClient(上传分片、合并)

接着进入到 MinioClient 的 putObject 方法:

public ObjectWriteResponse putObject(PutObjectArgs args) throws ErrorResponseException, InsufficientDataException, InternalException, InvalidKeyException, InvalidResponseException, IOException, NoSuchAlgorithmException, ServerException, XmlParserException {
        // 1. 检查参数是否为Null
        this.checkArgs(args);
        // 2. 校验是否开启了SSE加密,如果开启了SSE,而不是Https请求则报错
        args.validateSse(this.baseUrl);
        // 3. 执行上传文件
        return this.putObject(args, args.stream(), args.objectSize(), args.partSize(), args.partCount(), args.contentType());
    }

接着调用重载的 putObject 方法,会进行分块创建 =》分块上传 =》合并文件流程操作。

protected ObjectWriteResponse putObject(PutObjectBaseArgs args, Object data, long objectSize, long partSize, int partCount, String contentType) throws ErrorResponseException, InsufficientDataException, InternalException, InvalidKeyException, InvalidResponseException, IOException, NoSuchAlgorithmException, ServerException, XmlParserException {
        // 1.设置消息头
        Multimap<String, String> headers = this.newMultimap(args.extraHeaders());
        headers.putAll(args.genHeaders());
        // 2. 设置 Content-Type
        if (!headers.containsKey("Content-Type")) {
            headers.put("Content-Type", contentType);
        }
        String uploadId = null;
        Part[] parts = null;
        // 3. 创建块读取对象
        PartReader partReader = this.newPartReader(data, objectSize, partSize, partCount);
        if (partReader == null) {
            throw new IllegalArgumentException("data must be RandomAccessFile or InputStream");
        } else {
            try {
                while(true) {
                    // 4. 分块操作,并返回块对象
                    PartSource partSource = partReader.getPart(!this.baseUrl.isHttps());
                    if (partSource == null) {
                        // 没有分片时,表示分片全部上传成功,执行合并文件操作。
                        return this.completeMultipartUpload(args.bucket(), args.region(), args.object(), uploadId, parts, (Multimap)null, (Multimap)null);
                    }
                    // 5. 如果对象只有一块,也就是5MB之内,执行上传
                    if (partReader.partCount() == 1) {
                        return this.putObject(args.bucket(), args.region(), args.object(), partSource, headers, args.extraQueryParams());
                    }
                    
                    if (uploadId == null) {
                        // 6. 执行分块上传请求,返回uploadId
                        CreateMultipartUploadResponse response = this.createMultipartUpload(args.bucket(), args.region(), args.object(), headers, args.extraQueryParams());
                        uploadId = response.result().uploadId();
                        parts = new Part[10000];
                    }

                    Map<String, String> ssecHeaders = null;
                    if (args.sse() != null && args.sse() instanceof ServerSideEncryptionCustomerKey) {
                        ssecHeaders = args.sse().headers();
                    }
                    // 7. 根据创建的请求,正式执行上传分片的操作
                    int partNumber = partSource.partNumber();
                    UploadPartResponse response = this.uploadPart(args.bucket(), args.region(), args.object(), partSource, partNumber, uploadId, ssecHeaders != null ? Multimaps.forMap(ssecHeaders) : null, (Multimap)null);
                    String etag = response.etag();
                    parts[partNumber - 1] = new Part(partNumber, etag);
                }
            } catch (RuntimeException var18) {
                if (uploadId != null) {
                    this.abortMultipartUpload(args.bucket(), args.region(), args.object(), uploadId, (Multimap)null, (Multimap)null);
                }

                throw var18;
            } catch (Exception var19) {
                if (uploadId != null) {
                    this.abortMultipartUpload(args.bucket(), args.region(), args.object(), uploadId, (Multimap)null, (Multimap)null);
                }

                throw var19;
            }
        }
    }

4. 创建分片

putObject 方法首先会创建 PartReader 块读取对象:

private PartReader newPartReader(Object data, long objectSize, long partSize, int partCount) {        
        // 1. 如果是RandomAccessFile(RandomAccessFile允许自由定义文件记录指针,
        // RandomAccessFile可以不从开始的地方开始输出,因此RandomAccessFile可以向已存在的文件后追加内容。)
        // 创建RandomAccessFile类型的PartReader 
        if (data instanceof RandomAccessFile) {
            return new PartReader((RandomAccessFile)data, objectSize, partSize, partCount);
        } else {
            // 2. 创建不同输入流的PartReader 对象
            return data instanceof InputStream ? new PartReader((InputStream)data, objectSize, partSize, partCount) : null;
        }
    }

PartReader 包含了文件数据流及分片信息。

接着进入一个死循环,PartReader 会获取 PartSource 块对象:

public PartSource getPart(boolean computeSha256) throws NoSuchAlgorithmException, IOException {
        if (this.partNumber == this.partCount) {
            return null;
        } else {
            // 1. 获取分片,从第一个开始获取
            ++this.partNumber;
            MessageDigest md5 = MessageDigest.getInstance("MD5"); // MD5 加密对象
            MessageDigest sha256 = computeSha256 ? MessageDigest.getInstance("SHA-256") : null; // SHA-256加密对象
            long partSize = this.partSize; // 分片大小 5MB
            if (this.partNumber == this.partCount) { // 判断当前分片是不是最后一个分片
                partSize = this.objectSize - this.totalDataRead;
            }
            // 2. 使用算法读取分块数据
            long bytesRead = this.read(partSize, md5, sha256);
            this.totalDataRead += bytesRead;
            if (this.objectSize < 0L && this.eof) {
                this.partCount = this.partNumber;
            }
            // 3. 加密
            String md5Hash = Base64.getEncoder().encodeToString(md5.digest());
            String sha256Hash = null;
            if (computeSha256) {
                sha256Hash = BaseEncoding.base16().encode(sha256.digest()).toLowerCase(Locale.US);
            }
            // 4. 返回PartSource对象
            return this.file != null ? new PartSource(this.partNumber, this.file, bytesRead, md5Hash, sha256Hash) : new PartSource(this.partNumber, this.buffers, bytesRead, md5Hash, sha256Hash);
        }
    }

每个 PartSource 对象,就对应一个块对象,其中包含了块数据和加密返回的签名。

5. 创建分片请求(获取 uploadId)

createMultipartUpload 方法会创建分块请求,根据对象名和存储桶名去 Minio 获取上传当前对象的 uploadId。

protected CreateMultipartUploadResponse createMultipartUpload(String bucketName, String region, String objectName, Multimap<String, String> headers, Multimap<String, String> extraQueryParams) throws NoSuchAlgorithmException, InsufficientDataException, IOException, InvalidKeyException, ServerException, XmlParserException, ErrorResponseException, InternalException, InvalidResponseException {
        // 构建请求参数
        Multimap<String, String> queryParams = this.newMultimap(extraQueryParams);
        queryParams.put("uploads", "");
        Multimap<String, String> headersCopy = this.newMultimap(headers);
        if (!headersCopy.containsKey("Content-Type")) {
            headersCopy.put("Content-Type", "application/octet-stream");
        }
        // 执行HTTP请求
        Response response = this.execute(Method.POST, bucketName, objectName, this.getRegion(bucketName, region), this.httpHeaders(headersCopy), queryParams, (Object)null, 0);
        Throwable var9 = null;

        CreateMultipartUploadResponse var11;
        try {
            // 解析返回结果
            InitiateMultipartUploadResult result = (InitiateMultipartUploadResult)Xml.unmarshal(InitiateMultipartUploadResult.class, response.body().charStream());
            var11 = new CreateMultipartUploadResponse(response.headers(), bucketName, region, objectName, result);
        }

uploadId 在循环中使用的都是同一个,说明分片上传的时候都会使用同一个 uploadId,最后合并同一个 uploadId 的文件。

6. 上传分片

获取到了 uploadId 以后,就会执行上传操作,调用 uploadPart 方法,uploadPart 最终也是调用 execute,可以看到该方法,是调用的 OkHttpClient 去执行的。

protected Response execute(Method method, String bucketName, String objectName, String region, Headers headers, Multimap<String, String> queryParamMap, Object body, int length) throws XmlParserException {
        //......
        // 构建URL :http://127.0.0.1:9000/pearl-buckent/files/2021-10-26/d9e9d6fc-73fc-4323-b317-b8b26b6b6fe0_apache-maven-3.6.2-bin.zip?uploadId=70174335-85ec-47c6-acaf-afa12c8add48&partNumber=2
        HttpUrl url = this.buildUrl(method, bucketName, objectName, region, queryParamMap);
        // 省略构建其他对象
        // 调用 httpClient执行上传文件
        Response response = httpClient.newCall(request).execute();
        // 处理响应,异常处理等。
        ResponseBody responseBody;
        if (response.isSuccessful()) {
           
                     // 省略大量代码.....
            }
        }
    }

合并文件

分片的数据都上传后,进入到 completeMultipartUpload 方法,在这个方法执行之前,在 Minio 控制台是看不到上传对象的。

这个方法传入了文件对象名,uploadID 等,

该方法最终也是执行的 execute,使用 httpclient 去调用的 Minio 服务器合并分片,最后完成了分片上传操作。之后 Tomcat 回调,完成清理临时文件等操作,最后返回信息给前端,也对应了整个 Servlet 请求响应的整个流程。

简单流程图


yg9538 2022年9月4日 09:11 1976 收藏文档