首页 > 其他分享 >【Azure 存储服务】使用 AppendBlobClient 对象实现对Blob进行追加内容操作

【Azure 存储服务】使用 AppendBlobClient 对象实现对Blob进行追加内容操作

时间:2023-05-13 22:31:59浏览次数:43  
标签:core java reactor publisher AppendBlobClient Blob Azure data blob

问题描述

在Azure Blob的官方示例中,都是对文件进行上传到Blob操作,没有实现对已创建的Blob进行追加的操作。如果想要实现对一个文件的多次追加操作,每一次写入的时候,只传入新的内容?

 

问题解答

Azure Storage Blob 有三种类型: Block Blob, Append Blob 和 Page Blob。其中,只有Append Blob类型支持追加(Append)操作。并且Blob类型在创建时就已经确定,无法后期修改。

【Azure 存储服务】使用 AppendBlobClient 对象实现对Blob进行追加内容操作_Storage

 

在查看Java Storage SDK后,发现可以使用AppendBlobClient来实现。

/**
     * Creates a new {@link AppendBlobClient} associated with this blob.
     *
     * @return A {@link AppendBlobClient} associated with this blob.
     */
    public AppendBlobClient getAppendBlobClient() {
        return new SpecializedBlobClientBuilder()
            .blobClient(this)
            .buildAppendBlobClient();
    }

在 AppendBlobClient 类,有 appendBlock 和 appendBlockWithResponse 等多种方法来实现追加。方法定义源码如下:

【Azure 存储服务】使用 AppendBlobClient 对象实现对Blob进行追加内容操作_Storage_02

【Azure 存储服务】使用 AppendBlobClient 对象实现对Blob进行追加内容操作_java_03

/**
     * Commits a new block of data to the end of the existing append blob.
     * <p>
     * Note that the data passed must be replayable if retries are enabled (the default). In other words, the
     * {@code Flux} must produce the same data each time it is subscribed to.
     *
     * <p><strong>Code Samples</strong></p>
     *
     * {@codesnippet com.azure.storage.blob.specialized.AppendBlobClient.appendBlock#InputStream-long}
     *
     * @param data The data to write to the blob. The data must be markable. This is in order to support retries. If
     * the data is not markable, consider using {@link #getBlobOutputStream()} and writing to the returned OutputStream.
     * Alternatively, consider wrapping your data source in a {@link java.io.BufferedInputStream} to add mark support.
     * @param length The exact length of the data. It is important that this value match precisely the length of the
     * data emitted by the {@code Flux}.
     * @return The information of the append blob operation.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public AppendBlobItem appendBlock(InputStream data, long length) {
        return appendBlockWithResponse(data, length, null, null, null, Context.NONE).getValue();
    }

    /**
     * Commits a new block of data to the end of the existing append blob.
     * <p>
     * Note that the data passed must be replayable if retries are enabled (the default). In other words, the
     * {@code Flux} must produce the same data each time it is subscribed to.
     *
     * <p><strong>Code Samples</strong></p>
     *
     * {@codesnippet com.azure.storage.blob.specialized.AppendBlobClient.appendBlockWithResponse#InputStream-long-byte-AppendBlobRequestConditions-Duration-Context}
     *
     * @param data The data to write to the blob. The data must be markable. This is in order to support retries. If
     * the data is not markable, consider using {@link #getBlobOutputStream()} and writing to the returned OutputStream.
     * Alternatively, consider wrapping your data source in a {@link java.io.BufferedInputStream} to add mark support.
     * @param length The exact length of the data. It is important that this value match precisely the length of the
     * data emitted by the {@code Flux}.
     * @param contentMd5 An MD5 hash of the block content. This hash is used to verify the integrity of the block during
     * transport. When this header is specified, the storage service compares the hash of the content that has arrived
     * with this header value. Note that this MD5 hash is not stored with the blob. If the two hashes do not match, the
     * operation will fail.
     * @param appendBlobRequestConditions {@link AppendBlobRequestConditions}
     * @param timeout An optional timeout value beyond which a {@link RuntimeException} will be raised.
     * @param context Additional context that is passed through the Http pipeline during the service call.
     * @return A {@link Response} whose {@link Response#getValue() value} contains the append blob operation.
     * @throws UnexpectedLengthException when the length of data does not match the input {@code length}.
     * @throws NullPointerException if the input data is null.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Response<AppendBlobItem> appendBlockWithResponse(InputStream data, long length, byte[] contentMd5,
        AppendBlobRequestConditions appendBlobRequestConditions, Duration timeout, Context context) {
        Objects.requireNonNull(data, "'data' cannot be null.");
        Flux<ByteBuffer> fbb = Utility.convertStreamToByteBuffer(data, length, MAX_APPEND_BLOCK_BYTES, true);
        Mono<Response<AppendBlobItem>> response = appendBlobAsyncClient.appendBlockWithResponse(
            fbb.subscribeOn(Schedulers.elastic()), length, contentMd5, appendBlobRequestConditions, context);
        return StorageImplUtils.blockWithOptionalTimeout(response, timeout);
    }

View Code

代码实现

第一步: 在Java项目 pom.xml 中引入Azure Storage Blob依赖

<dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-storage-blob</artifactId>
      <version>12.13.0</version>
    </dependency>

第二步: 引入必要的 Storage 类

import java.io.ByteArrayInputStream; 
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException; 
import java.time.LocalTime; 
import com.azure.core.http.rest.Response; 
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.models.AppendBlobItem;
import com.azure.storage.blob.models.AppendBlobRequestConditions; 
import com.azure.storage.blob.specialized.AppendBlobClient;

第三步:创建 AppendBlobClient 对象,使用 BlobServiceClient 及连接字符串(Connection String)

String storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=*****;AccountKey=*******;EndpointSuffix=core.chinacloudapi.cn";

                String containerName = "appendblob";
                String fileName = "test.txt";
                // Create a BlobServiceClient object which will be used to create a container
                System.out.println("\nCreate a BlobServiceClient Object to Connect Storage Account");
                BlobServiceClient blobServiceClient = new BlobServiceClientBuilder()
                                .connectionString(storageConnectionString)
                                .buildClient();

                BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(containerName);
                if (!containerClient.exists())
                        containerClient.create();

                // Get a reference to a blob
                AppendBlobClient appendBlobClient = containerClient.getBlobClient(fileName).getAppendBlobClient();

 

第四步:调用 appendBlockWithResponse 方法追加内容,并根据返回状态码判断是否追加成功

boolean overwrite = true; // Default value
                if (!appendBlobClient.exists())
                        System.out.printf("Created AppendBlob at %s%n",
                                        appendBlobClient.create(overwrite).getLastModified());


                String data = "Test to append new content into exists blob! by blogs lu bian liang zhan deng @"
                                + LocalTime.now().toString() + "\n";
                InputStream inputStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
                byte[] md5 = MessageDigest.getInstance("MD5").digest(data.getBytes(StandardCharsets.UTF_8));
                AppendBlobRequestConditions requestConditions = new AppendBlobRequestConditions();

                // Context context = new Context("key", "value");
                long length = data.getBytes().length;

                Response<AppendBlobItem> rsp = appendBlobClient.appendBlockWithResponse(inputStream, length, md5,
                                requestConditions, null, null);

                if (rsp.getStatusCode() == 201) {
                        System.out.println("append content successful........");
                }

运行结果展示

【Azure 存储服务】使用 AppendBlobClient 对象实现对Blob进行追加内容操作_java_04

 

但如果操作的Blob类型不是Append Blob,就会遇见错误 Status code 409 ---- The blob type is invalid for this operation 错误

Exception in thread "main" com.azure.storage.blob.models.BlobStorageException: Status code 409, "
<?xml version="1.0" encoding="utf-8"?><Error>><Code>InvalidBlobType</Code>
<Message>The blob type is invalid for this operation.
RequestId:501ee0b9-301e-0003-4f7b-829ca6000000
Time:2023-05-09T13:37:17.7509942Z</Message></Error>"
        at java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:67)
        at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
        at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:484)
        at com.azure.core.http.rest.RestProxy.instantiateUnexpectedException(RestProxy.java:343)
        at com.azure.core.http.rest.RestProxy.lambda$ensureExpectedStatus$5(RestProxy.java:382)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
        at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.signalCached(MonoCacheTime.java:337)
        at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onNext(MonoCacheTime.java:354)
        at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2397)
        at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onSubscribe(MonoCacheTime.java:293)
        at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:192)
        at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
        at reactor.core.publisher.MonoCacheTime.subscribeOrReturn(MonoCacheTime.java:143)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
        at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
        at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:118)
        at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
        at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
        at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:184)
        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
        at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:128)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:259)
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
        at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:401)
        at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:416)
        at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:470)
        at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:685)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:1589)
        Suppressed: java.lang.Exception: #block terminated with an error
                at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
                at reactor.core.publisher.Mono.block(Mono.java:1703)
                at com.azure.storage.common.implementation.StorageImplUtils.blockWithOptionalTimeout(StorageImplUtils.java:128)
                at com.azure.storage.blob.specialized.AppendBlobClient.appendBlockWithResponse(AppendBlobClient.java:259)
                at test.App.AppendBlobContent(App.java:68)
                at test.App.main(App.java:31)

 

参考资料

appendBlockWithResponse : https://learn.microsoft.com/en-us/java/api/com.azure.storage.blob.specialized.appendblobclient?view=azure-java-stable#com-azure-storage-blob-specialized-appendblobclient-appendblockwithresponse(java-io-inputstream-long-byte()-com-azure-storage-blob-models-appendblobrequestconditions-java-time-duration-com-azure-core-util-context)

 Blob(对象)存储简介  : https://docs.azure.cn/zh-cn/storage/blobs/storage-blobs-introduction

当在复杂的环境中面临问题,格物之道需:浊而静之徐清,安以动之徐生。 云中,恰是如此!



标签:core,java,reactor,publisher,AppendBlobClient,Blob,Azure,data,blob
From: https://blog.51cto.com/u_13773780/6273950

相关文章

  • Azure创建虚拟机过程解决报错
    Azure创建虚拟机过程解决报错背景​ 当我使用Azure学生教育福利开通虚拟机时(具体步骤建议参考:Azure(az100)和az200建免费虚拟机避抗教程-尘遇(chenyu.me)),在最后的下载个人私钥环节,遇到了以下报错:无法下载SSH私钥。未注册订阅,无法使用命名空间“Microsoft.Compute"。有关如何注......
  • 【Azure 媒体服务】Media Service的编码示例 -- 创建缩略图子画面的.NET代码调试问题
    问题描述在中国区Azure上,使用MediaService服务,想要使用.NET的代码来对上传视频创建缩略图(Thumbnail)。通过官网文档(https://docs.azure.cn/zh-cn/media-services/latest/samples/samples-encoding-reference#create-a-thumbnail-sprite)下载.NET示例,配置appsettings.json......
  • blob转string,同步调用
    问题背景通过接口下载文件的时候,后端设置的responseHeadercontent-disposition:attachment;filename=文件名.xlsxcontent-type:application/vnd.ms-excel;charset=utf-8前端接口请求的时候,设置responseType:'blob',后端接口直接返回的是文件流。然后当下载文件异常的情况......
  • 【Azure 存储服务】使用 AppendBlobClient 对象实现对Blob进行追加内容操作
    问题描述在AzureBlob的官方示例中,都是对文件进行上传到Blob操作,没有实现对已创建的Blob进行追加的操作。如果想要实现对一个文件的多次追加操作,每一次写入的时候,只传入新的内容? 问题解答AzureStorageBlob有三种类型:BlockBlob,AppendBlob和PageBlob。其中,只有Appen......
  • 【Azure 存储服务】Java Storage SDK 调用 uploadWithResponse 代码示例(询问ChatGTP
    问题描述查看JavaStorageSDK,想找一个 uploadWithResponse 的示例代码,但是通过全网搜索,结果没有任何有帮助的代码。使用最近ChatGPT来寻求答案,得到非常有格式的内容:问:javaazurestorageaccounttouseuploadWithResponse答:TousetheuploadWithResponsemethodw......
  • Blob/DataURL/canvas/image的相互转换
    /*-----------------------------------------------------------------------*///canvas转dataURL:canvas对象、转换格式、图像品质functioncanvasToDataURL(canvas,format,quality){returncanvas.toDataURL(format||'image/jpeg',quality||1.0);}//DataURL转can......
  • 【Issues】axios如何获取responseType为blob的请求的错误信息
    问题背景axios请求下载文件时会设置responseType:'blob’来处理,此时如果响应数据错误,则无法下载文件且同时没有把相关的错误信息提示处理。因为返回的是Blob对象,无法获取到普通对象中的错误信息。例如:{"code":450002,"data":null,"msg":"下载出错"}这就需要在下载文件时做......
  • 【Azure 应用服务】Azure JS Function 异步方法中执行SQL查询后,Callback函数中日志无
    问题描述开发AzureJSFunction(NodeJS),使用mssql组件操作数据库。当SQL语句执行完成后,在Callback函数中执行日志输出 context.log("..."),遇见如下错误:Warning:Unexpectedcallto'log'onthecontextobjectafterfunctionexecutionhascompleted.Pleasecheck......
  • Azure DevOps(三)Azure Pipeline 自动化将程序包上传到 Azure Bolb Storage
    一,引言结合前几篇文章,我们了解到AzurePipeline完美的解决了持续集成,自动编译。同时也兼顾了Sonarqube作为代码扫描工具。接下来另外一个问题出现了,AzureDevOps由于有人员限制,项目上不能给非开发人员或者外包成员开权限,这个时候就需要将编译好的程序包上传到公共网盘或......
  • 下载文件(Excel)功能,后端返回blob字节流,前端怎么处理?
    在做大屏数据项目有个报表下载的功能,根据用户选择的时间下载对应时间的报表,后端返回的是文件流,前端需要怎么去处理呢?实现的功能效果: 后端返回的数据:需要我们处理的乱码:前端代码:1exportExcel(){2axios({3methods:"xxxx",4url:"xxx/xxxx/xx......