Orleans 知多少 | 3. Hello Orleans

Orleans 3.0 Released

1. 引言

是的,Orleans v3.0.0 已經(jīng)發(fā)布了,并已經(jīng)完全支持 .NET Core 3.0。
所以,Orleans 系列是時(shí)候繼續(xù)了,抱歉,讓大家久等了。
萬(wàn)丈高樓平地起,這一節(jié)我們就先來(lái)了解下Orleans的基本使用。

2. 模板項(xiàng)目講解

Orleans 核心概念

在上一篇文章中,我們了解到Orleans 作為.NET 分布式框架,其主要包括三個(gè)部分:Client、Grains、Silo Host(Server)。因此,為了方便講解,創(chuàng)建如下的項(xiàng)目結(jié)構(gòu)進(jìn)行演示:


Hello.Orleans 項(xiàng)目結(jié)構(gòu)

這里有幾點(diǎn)需要說(shuō)明:

  1. Orleans.Grains: 類庫(kù)項(xiàng)目,用于定義Grain的接口以及實(shí)現(xiàn),需要引用Microsoft.Orleans.CodeGenerator.MSBuildMicrosoft.Orleans.Core.Abstractions NuGet包。
  2. Orleans.Server:控制臺(tái)項(xiàng)目,為 Silo 宿主提供宿主環(huán)境,需要引用Microsoft.Orleans.ServerMicrosoft.Extensions.Hosting NuGet包,以及Orleans.Grains 項(xiàng)目。
  3. Orleans.Client:控制臺(tái)項(xiàng)目,用于演示如何借助Orleans Client建立與Orleans Server的連接,需要引用Microsoft.Orleans.ClientMicrosoft.Extensions.Hosting NuGet包,同時(shí)添加Orleans.Grains項(xiàng)目引用。

3. 第一個(gè)Grain

Grain作為Orleans的第一公民,以及Virtual Actor的實(shí)際代言人,想吃透Orleans,那Grain就是第一道坎。
先看一個(gè)簡(jiǎn)單的Demo,我們來(lái)模擬統(tǒng)計(jì)網(wǎng)站的實(shí)時(shí)在線用戶。
Orlean s.Grains添加ISessionControl接口,主要用戶登錄狀態(tài)的管理。

public interface ISessionControlGrain : IGrainWithStringKey
{
    Task Login(string userId);
    Task Logout(string userId);
    Task<int> GetActiveUserCount();
}

可以看見Grain的定義很簡(jiǎn)單,只需要指定繼承自IGrain的接口就好。這里面繼承自IGrainWithStringKey,說(shuō)明該Grain 的Identity Key(身份標(biāo)識(shí))為string類型。同時(shí)需要注意的是
Grain 的方法申明,返回值必須是: Task、Task<T>、ValueTask<T>。
緊接著定義SessionControlGrain來(lái)實(shí)現(xiàn)ISessionControlGrain接口。

public class SessionControlGrain : Grain, ISessionControlGrain
{
    private List<string> LoginUsers { get; set; } = new List<string>();

    public Task Login(string userId)
    {
        //獲取當(dāng)前Grain的身份標(biāo)識(shí)(因?yàn)镮SessionControlGrain身份標(biāo)識(shí)為string類型,GetPrimaryKeyString()); 
        var appName = this.GetPrimaryKeyString();

        LoginUsers.Add(userId);

        Console.WriteLine($"Current active users count of {appName} is {LoginUsers.Count}");
        return Task.CompletedTask;
    }

    public Task Logout(string userId)
    {
        //獲取當(dāng)前Grain的身份標(biāo)識(shí)
        var appName = this.GetPrimaryKey();
        LoginUsers.Remove(userId);

        Console.WriteLine($"Current active users count of {appName} is {LoginUsers.Count}");
        return Task.CompletedTask;
    }

    public Task<int> GetActiveUserCount()
    {
        return Task.FromResult(LoginUsers.Count);
    }
}

實(shí)現(xiàn)也很簡(jiǎn)單,Grain的實(shí)現(xiàn)要繼承自Grain基類。代碼中我們定義了一個(gè)List<string>集合用于保存登錄用戶。

4. 第一個(gè)Silo Host(Server)

定義一個(gè)Silo用于暴露Grain提供的服務(wù),在Orleans.Server.Program中添加以下代碼用于啟動(dòng)Silo Host。

static Task Main(string[] args)
{
    Console.Title = typeof(Program).Namespace;

    // define the cluster configuration
    return Host.CreateDefaultBuilder()
        .UseOrleans((builder) =>
            {
                builder.UseLocalhostClustering()
                    .AddMemoryGrainStorageAsDefault()
                    .Configure<ClusterOptions>(options =>
                    {
                        options.ClusterId = "Hello.Orleans";
                        options.ServiceId = "Hello.Orleans";
                    })
                    .Configure<EndpointOptions>(options => options.AdvertisedIPAddress = IPAddress.Loopback)
                    .ConfigureApplicationParts(parts =>
                        parts.AddApplicationPart(typeof(ISessionControlGrain).Assembly).WithReferences());
            }
        )
        .ConfigureServices(services =>
        {
            services.Configure<ConsoleLifetimeOptions>(options =>
            {
                options.SuppressStatusMessages = true;
            });
        })
        .ConfigureLogging(builder => { builder.AddConsole(); })
        .RunConsoleAsync();
}
  1. Host.CreateDefaultBuilder():創(chuàng)建泛型主機(jī)提供宿主環(huán)境。
  2. UseOrleans:用來(lái)配置Oleans。
  3. UseLocalhostClustering() :用于在開發(fā)環(huán)境下指定連接到本地集群。
  4. Configure<ClusterOptions>:用于指定連接到那個(gè)集群。
  5. Configure<EndpointOptions>:用于配置silo與silo、silo與client之間的通信端點(diǎn)。開發(fā)環(huán)境下可僅指定回環(huán)地址作為集群間通信的IP地址。
  6. ConfigureApplicationParts():用于指定暴露哪些Grain服務(wù)。

以上就是開發(fā)環(huán)境下,Orleans Server的基本配置。對(duì)于詳細(xì)的配置也可以先參考Orleans Server Configuration。后續(xù)也會(huì)有專門的一篇文章來(lái)詳解。

5. 第一個(gè)Client

客戶端的定義也很簡(jiǎn)單,主要是創(chuàng)建IClusterClient對(duì)象建立于Orleans Server的連接。因?yàn)?code>IClusterClient最好能在程序啟動(dòng)之時(shí)就建立連接,所以可以通過(guò)繼承IHostedService來(lái)實(shí)現(xiàn)。
Orleans.Client中定義ClusterClientHostedService繼承自IHostedService。

public class ClusterClientHostedService : IHostedService
{
    public IClusterClient Client { get; }

    private readonly ILogger<ClusterClientHostedService> _logger;

    public ClusterClientHostedService(ILogger<ClusterClientHostedService> logger, ILoggerProvider loggerProvider)
    {
        _logger = logger;
        Client = new ClientBuilder()
            .UseLocalhostClustering()
            .Configure<ClusterOptions>(options =>
            {
                options.ClusterId = "Hello.Orleans";
                options.ServiceId = "Hello.Orleans";
            })
            .ConfigureLogging(builder => builder.AddProvider(loggerProvider))
            .Build();
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        var attempt = 0;
        var maxAttempts = 100;
        var delay = TimeSpan.FromSeconds(1);
        return Client.Connect(async error =>
        {
            if (cancellationToken.IsCancellationRequested)
            {
                return false;
            }

            if (++attempt < maxAttempts)
            {
                _logger.LogWarning(error,
                    "Failed to connect to Orleans cluster on attempt {@Attempt} of {@MaxAttempts}.",
                    attempt, maxAttempts);

                try
                {
                    await Task.Delay(delay, cancellationToken);
                }
                catch (OperationCanceledException)
                {
                    return false;
                }

                return true;
            }
            else
            {
                _logger.LogError(error,
                    "Failed to connect to Orleans cluster on attempt {@Attempt} of {@MaxAttempts}.",
                    attempt, maxAttempts);

                return false;
            }
        });
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        try
        {
            await Client.Close();
        }
        catch (OrleansException error)
        {
            _logger.LogWarning(error, "Error while gracefully disconnecting from Orleans cluster. Will ignore and continue to shutdown.");
        }
    }
}

代碼講解:

  1. 構(gòu)造函數(shù)中通過(guò)借助ClientBuilder() 來(lái)初始化IClusterClient。其中UseLocalhostClustering()用于連接到開發(fā)環(huán)境中的localhost 集群。并通過(guò)Configure<ClusterOptions>指定連接到哪個(gè)集群。(需要注意的是,這里的ClusterId必須與Orleans.Server中配置的保持一致。
Client = new ClientBuilder()
    .UseLocalhostClustering()
    .Configure<ClusterOptions>(options =>
    {
        options.ClusterId = "Hello.Orleans";
        options.ServiceId = "Hello.Orleans";
    })
    .ConfigureLogging(builder => builder.AddProvider(loggerProvider))
    .Build();
  1. StartAsync方法中通過(guò)調(diào)用Client.Connect建立與Orleans Server的連接。同時(shí)定義了一個(gè)重試機(jī)制。

緊接著我們需要將ClusterClientHostedService添加到Ioc容器,添加以下代碼到Orleans.Client.Program中:

static Task Main(string[] args)
{
    Console.Title = typeof(Program).Namespace;

    return Host.CreateDefaultBuilder()
        .ConfigureServices(services =>
        {
            services.AddSingleton<ClusterClientHostedService>();
            services.AddSingleton<IHostedService>(_ => _.GetService<ClusterClientHostedService>());
            services.AddSingleton(_ => _.GetService<ClusterClientHostedService>().Client);

            services.AddHostedService<HelloOrleansClientHostedService>();
            services.Configure<ConsoleLifetimeOptions>(options =>
            {
                options.SuppressStatusMessages = true;
            });
        })
        .ConfigureLogging(builder =>
        {
            builder.AddConsole();
        })
        .RunConsoleAsync();
}

對(duì)于ClusterClientHostedService,并沒(méi)有選擇直接通過(guò)services.AddHostedService<T>的方式注入,是因?yàn)槲覀冃枰⑷朐摲?wù)中提供的IClusterClient(單例),以供其他類去消費(fèi)。

緊接著,定義一個(gè)HelloOrleansClientHostedService用來(lái)消費(fèi)定義的ISessionControlGrain。

public class HelloOrleansClientHostedService : IHostedService
{
    private readonly IClusterClient _client;
    private readonly ILogger<HelloOrleansClientHostedService> _logger;

    public HelloOrleansClientHostedService(IClusterClient client, ILogger<HelloOrleansClientHostedService> logger)
    {
        _client = client;
        _logger = logger;
    }
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        // 模擬控制臺(tái)終端用戶登錄
       await MockLogin("Hello.Orleans.Console");
       // 模擬網(wǎng)頁(yè)終端用戶登錄
       await MockLogin("Hello.Orleans.Web");
    }

    /// <summary>
    /// 模擬指定應(yīng)用的登錄
    /// </summary>
    /// <param name="appName"></param>
    /// <returns></returns>
    public async Task MockLogin(string appName)
    {
        //假設(shè)我們需要支持不同端登錄用戶,則只需要將項(xiàng)目名稱作為身份標(biāo)識(shí)。
        //即可獲取一個(gè)代表用來(lái)維護(hù)當(dāng)前項(xiàng)目登錄狀態(tài)的的單例Grain。
        var sessionControl = _client.GetGrain<ISessionControlGrain>(appName);
        ParallelLoopResult result = Parallel.For(0, 10000, (index) =>
        {
            var userId = $"User-{index}";
            sessionControl.Login(userId);
        });

        if (result.IsCompleted)
        {
            //ParallelLoopResult.IsCompleted 只是返回所有循環(huán)創(chuàng)建完畢,并不保證循環(huán)的內(nèi)部任務(wù)創(chuàng)建并執(zhí)行完畢
            //所以,此處手動(dòng)延遲5秒后再去讀取活動(dòng)用戶數(shù)。
            await Task.Delay(TimeSpan.FromSeconds(5));
            var activeUserCount = await sessionControl.GetActiveUserCount();

            _logger.LogInformation($"The Active Users Count of {appName} is {activeUserCount}");
        }
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Closed!");

        return Task.CompletedTask; ;
    }
}

代碼講解:
這里定義了一個(gè)MockLogin用于模擬不同終端10000個(gè)用戶的并發(fā)登錄。

  1. 通過(guò)構(gòu)造函數(shù)注入需要的IClusterClient。
  2. 通過(guò)指定Grain接口以及身份標(biāo)識(shí),就可以通過(guò)Client 獲取對(duì)應(yīng)的Grain,進(jìn)而消費(fèi)Grain中暴露的方法。var sessionControl = _client.GetGrain<ISessionControlGrain>(appName); 這里需要注意的是,指定的身份標(biāo)識(shí)為終端應(yīng)用的名稱,那么在整個(gè)應(yīng)用生命周期內(nèi),將有且僅有一個(gè)代表這個(gè)終端應(yīng)用的Grain。
  3. 使用Parallel.For 模擬并發(fā)
  4. ParallelLoopResult.IsCompleted 只是返回所有循環(huán)任務(wù)創(chuàng)建完畢,并不代表循環(huán)的內(nèi)部任務(wù)執(zhí)行完畢。

6. 啟動(dòng)第一個(gè) Orleans 應(yīng)用

先啟動(dòng)Orleans.Server

Orleans Server Stared

再啟動(dòng)Orleans.Client
Orleans Client

Orleans Server log

從上面的運(yùn)行結(jié)果來(lái)看,模擬兩個(gè)終端10000個(gè)用戶的并發(fā)登錄,最終輸出的活動(dòng)用戶數(shù)量均為10000個(gè)。
回顧整個(gè)實(shí)現(xiàn),并沒(méi)有用到諸如鎖、并發(fā)集合等避免并發(fā)導(dǎo)致的線程安全問(wèn)題,但卻輸出正確的期望結(jié)果,這就正好說(shuō)明了Orleans強(qiáng)大的并發(fā)控制特性。

public class SessionControlGrain : Grain, ISessionControlGrain
{
    // 未使用并發(fā)集合
    private List<string> LoginUsers { get; set; } = new List<string>();

    public Task Login(string userId)
    {
        //獲取當(dāng)前Grain的身份標(biāo)識(shí)(因?yàn)镮SessionControlGrain身份標(biāo)識(shí)為string類型,GetPrimaryKeyString());
        var appName = this.GetPrimaryKeyString();
        
        LoginUsers.Add(userId);//未加鎖

        Console.WriteLine($"Current active users count of {appName} is {LoginUsers.Count}");
        return Task.CompletedTask;
    }
    ....
}

7. 小結(jié)

通過(guò)簡(jiǎn)單的演示,想必你對(duì)Orleans的編程實(shí)現(xiàn)有了基本的認(rèn)知,并體會(huì)到其并發(fā)控制的強(qiáng)大之處。
這只是簡(jiǎn)單的入門演練,Orleans很多強(qiáng)大的特性,后續(xù)再結(jié)合具體場(chǎng)景進(jìn)行詳細(xì)闡述。
源碼已上傳至GitHub:Hello.Orleans

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容