首页 > 其他分享 >.NET分布式Orleans - 7 - Streaming

.NET分布式Orleans - 7 - Streaming

时间:2024-03-29 11:44:06浏览次数:24  
标签:Orleans Task Streaming var NET public rid

概念

在Orleans中,Streaming是一组API和功能集,它提供了一种构建、发布和消费数据流的方式。

这些流可以是任何类型的数据,从简单的消息到复杂的事件或数据记录。Streaming API允许你定义、发布和消费这些流,而无需关心底层的传输机制或数据存储。

每个流都有一个唯一的标识符,称为StreamId,用于区分不同的流。流可以是持久的,也可以是临时的,具体取决于所使用的流提供者(Stream Provider)。流提供者负责处理流的存储、传输和故障恢复。

作用

Streaming在Orleans中起到了至关重要的作用,主要体现在以下几个方面:

  1. 解耦:Streaming允许将数据的产生者和消费者解耦。生产者可以发布数据到流中,而消费者可以独立地订阅这些流并处理数据。这种解耦使得系统更加灵活和可扩展。

  2. 实时性:通过Streaming,你可以实时地处理和响应数据流。这对于需要实时分析、监控或响应的场景非常有用。

  3. 故障恢复:Orleans的Streaming机制具有强大的故障恢复能力。即使在出现网络分区或节点故障的情况下,流提供者也能够确保数据的可靠性和一致性。

应用场景

  1. 实时日志分析:你可以将应用程序的日志消息发布到流中,并使用专门的消费者来分析这些日志。这允许你实时地监控和响应应用程序的行为。

  2. 事件驱动架构:在事件驱动架构中,你可以使用Streaming来发布事件,并由多个消费者来处理这些事件。这有助于构建松耦合、可扩展和响应式的系统。

  3. 分布式协作:Streaming也可以用于实现分布式系统中的协作和通信。例如,多个节点可以发布状态更新到流中,其他节点可以订阅这些流以获取最新的状态信息。

示例

安装nuget包

<PackageReference Include="Microsoft.Orleans.Streaming" Version="8.0.0" />

配置Streaming

siloHostBuilder.AddMemoryStreams("StreamProvider").AddMemoryGrainStorage("PubSubStore");

定义一个Grain生成事件

public interface ISender : IGrainWithStringKey
{
    Task Send(Guid rid);
}

public class SenderGrain : Grain, ISender
{
    public Task Send(Guid rid)
    {
        var streamProvider = this.GetStreamProvider("StreamProvider");
        var streamId = StreamId.Create("RANDOMDATA", rid);
        var stream = streamProvider.GetStream<int>(streamId);
        RegisterTimer(_ =>
        {
            return stream.OnNextAsync(Random.Shared.Next());
        }, null, TimeSpan.FromMilliseconds(1_000), TimeSpan.FromMilliseconds(1_000));
        return Task.CompletedTask;
    }
}

再定义一个Grain订阅事件

public interface IRandomReceiver : IGrainWithGuidKey
{
    Task Receive();
}

[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver
{
    public override async Task OnActivateAsync(CancellationToken cancellationToken)
    {
        var streamProvider = this.GetStreamProvider("StreamProvider");
        var rid = this.GetPrimaryKey();
        var streamId = StreamId.Create("RANDOMDATA", rid);
        var stream = streamProvider.GetStream<int>(streamId);

        await stream.SubscribeAsync<int>(
            async (data, token) =>
            {
                Console.WriteLine(data);
                await Task.CompletedTask;
            });
        base.OnActivateAsync(cancellationToken);
    }
    public async Task Receive()
    {
            
    }
}

然后即可测试

var rid = Guid.NewGuid();
var sender1 = client.GetGrain<ISender>("sender1");
await sender1.Send(rid);
var reciver1 = client.GetGrain<IRandomReceiver>(new Guid());
await reciver1.Receive();

流提供程序

提供程序可以通过在nuget种搜索Orleans.Streaming,也可以通过PersistentStreamProvider 与 IQueueAdapter 重写来自定义Provider

 

标签:Orleans,Task,Streaming,var,NET,public,rid
From: https://www.cnblogs.com/chenyishi/p/18103492

相关文章

  • 基于 Kubernetes 的容器化物联网平台
    鱼弦:公众号【红尘灯塔】,CSDN内容合伙人、CSDN新星导师、全栈领域优质创作者、51CTO(Top红人+专家博主)、github开源爱好者(go-zero源码二次开发、游戏后端架构https://github.com/Peakchen)基于Kubernetes的容器化物联网平台1.简介基于Kubernetes的容器化物联网平......
  • Vue+.Net6部署日记
    一.准备工作vue编译后以dist文件夹在iis新建一个网站,.Net6发布后同样的建站但是要注意把应用程序池设置为无托管模式二.配置反向代理IIS给前端方向代理需要以下两个组件:1.ARR2.Url重写;这两个组件都可以在www.iis.net搜到,在搜索栏输入关键词然后一个个找就好,先确保下......
  • 探秘Kubernetes:在本地环境中玩转容器技术
    在云计算时代,Kubernetes已成为云原生技术的真正基石。它是应用程序容器的编排动力源,可跨多个集群自动部署、扩展和运行容器。Kubernetes不仅仅是一个流行词,它还是一种模式转变,是现代软件可扩展性和敏捷性的基础。 虽然Kubernetes经常与云原生联系在一起,但它对本地基础设施......
  • DotNetty客户端获取未编码的16进制数据
    publicoverridevoidChannelRead(IChannelHandlerContextcontext,objectmessage){varbuffer=messageasIByteBuffer;Console.WriteLine($"收到消息{buffer}");if(buffer!=null){//这里可以处理接收到的数据byte[]b......
  • 解决Debian服务器使用NetworkManager出现的DNS自动清除问题
    解决Debian服务器使用NetworkManager出现的DNS自动清除问题使用vim编辑/etc/NetworkManager/NetworkManager.conf中的内容:sudovim/etc/NetworkManager/NetworkManager.conf在该文件的[main]下方加上下面这一行:dns=none然后重启NetworkManager服务:sudosystemctlrestart......
  • 自定义的基于System.Net.Http.HttpClient的WebClient,可以作为微信支付宝的发起请求时
    个人编写的,自己用于自己的微信api的请求的实现当中,源码公开,大家可以查看反编译源码。以下是使用方法:第一步搜索和安装zmjtool第二步发起请求1/**引入命名空间*/2usingZmjTool;34/**发起Get请求*/5using(varcl=newZmjTool.WebClient())6{7cl.......
  • 论文笔记 SimpleNet A Simple Network for Image Anomaly Detection and Localization
    背景对于工业场景上的异常检测和定位任务,由于零件的异常情况具有多样性和随机性,所以很难用有监督的方式来解决;目前用的最多的是用无监督的方式,在训练过程中只使用正常样本进行训练,目前无监督解决异常检测任务的三个趋势是基于重建的方法,基于合成的方法以及基于嵌入的......
  • .NET C#导出解决方案的NuGet依赖关系
    前言公司项目需要写DS设计文档,文档需要标识出来你的解决方案文件下的所有项目都使用了NuGet哪些第三方依赖,我们都知道sln下面的所有.csproj文件中的节点下会标识出对应的依赖,但一个一个对比又太麻烦(主要是懒),有时候一个sln能有10几个project项目,能不能写脚本一键导出这些依赖关......
  • 低版本的.netFramework项目调用高版本的.net framework的dll的解决方法
    如果你的项目引用了一个针对较高版本的.NETFramework的程序集,而你的项目当前的目标框架版本较低,你可以采取以下措施来解决这个问题:更改目标框架版本:在VisualStudio中,打开你的项目。在“解决方案资源管理器”中,右键单击项目并选择“属性”。在“应用程序”选项卡中,选......
  • 列举和删除.NET的版本
    删除前dotnet--list-sdksdotnet--list-runtimesdotnet--info从工具的发布页面下载.NET卸载工具仅删除标记为预览版的.NETSDK(最高预览版除外)。dotnet-core-uninstallremove--all-previews-but-latest--sdk因为没有符合条件的,所有并未产生删除。......