Dapr實現.Net Grpc服務之間的發布和訂閱,并采用WebApi類似的事件訂閱方式

大家好,我是失業在家,正在找工作的博主Jerry,找工作之余,總結和整理以前的項目經驗 , 動手寫了個洋蔥架構(整潔架構)示例解決方案 OnionArch 。其目的是為了更好的實現基于DDD(領域驅動分析)和命令查詢職責分離(CQRS)的洋蔥架構 。
OnionArch 是用來實現單個微服務的 。它提供了Grpc接口和Dapr Side Car進行交互,通過Dapr來實現微服務之間的接口調用、事件發布訂閱等微服務特性 。但是,Dapr官方文檔上只有Go語言的Grpc的微服務調用示例 , 沒有事件發布和訂閱示例,更沒有基于Grpc通訊用.Net實現的事件訂閱和發布示例 。
一、實現目標為了方便大家寫代碼 , 本文旨在介紹如何通過Dapr實現.Net Grpc服務之間的發布和訂閱,并采用與WebApi類似的事件訂閱方式 。
【Dapr實現.Net Grpc服務之間的發布和訂閱,并采用WebApi類似的事件訂閱方式】如果是Dapr Side Car通過Web Api和微服務引用交互,在WebApi中實現事件訂閱非常簡單 , 只要在Action 上增加“[Topic("pubsub", "TestTopic")]” Attribute即可,可如果Dapr是通過Grpc和Grpc服務交互就不能這樣寫了 。
為了保持WebApi和Grpc事件訂閱代碼的一致性,本文就是要在Grpc通訊的情況下實現如下寫法來訂閱并處理事件 。
[Topic("pubsub", "TestTopic")]public override Task<HelloReply> TestTopicEvent(TestTopicEventRequest request, ServerCallContext context){string message = "TestTopicEvent" + request.EventData.Name;Console.WriteLine(message);return Task.FromResult(new HelloReply{Message = message});}二、實現方案Dapr實現.Net Grpc服務之間的發布和訂閱 , 根據官方文檔,需要重寫AppCallback.AppCallbackBase Grpc類的ListTopicSubscriptions方法和OnTopicEvent方法,ListTopicSubscriptions是給Dapr調用獲取該微服務已訂閱的事件,OnTopicEvent給Dapr調用以觸發事件到達處理邏輯 。但是這樣就需要在AppCallback.AppCallbackBase實現類中硬編碼已訂閱的事件和事件處理邏輯 。顯然不符合我們的實現目標 。
參考Dapr SDK中關于WebApi 訂閱查詢接口“http://localhost:<appPort>/dapr/subscribe”的實現代碼 , 可以在AppCallback.AppCallbackBase實現類的ListTopicSubscriptions方法中,采用相同的方式,在Grpc方法中查詢Topic Attribute的方式來搜索已訂閱的事件 。這樣就不用在ListTopicSubscriptions中硬編碼已訂閱的事件了 。
為了避免在OnTopicEvent方法中應編碼事件處理邏輯,就需要在接收到事件觸發后動態調用Grpc方法 。理論上 , 只要有proto文件就可以動態調用Grpc方法,而proto文件本來就在項目中 。但是,我沒找到.Net動態調用Grpc方法的相關資料,不知道大家有沒有?
我這里采用了另一種方式 , 根據我上一篇關于.Net 7.0 RC gRPC JSON 轉碼為 Swagger/OpenAPI文檔 。Grpc方法可以增加一個轉碼為Json的WebApi調用 。這樣就可以在OnTopicEvent方法中接收到事件觸發后,通過HttpClient post到對應的WebApi地址,曲線實現動態調用Grpc方法 。是不是有點脫褲子放屁的感覺?
三、代碼實現我的解決方案如下,GrpcServiceA發布事件,GrpcServiceB接收事件并處理 。

Dapr實現.Net Grpc服務之間的發布和訂閱,并采用WebApi類似的事件訂閱方式

文章插圖
實現事件發布GrpcServiceA發布事件比較簡單,和WebApi的方式是一樣一樣的 。
public async override Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context){//await _daprClient.SaveStateAsync("statestore", "testKey", request.Name);EventData eventData = https://www.huyubaike.com/biancheng/new EventData() { Id = 6, Name = request.Name, Description ="Looking for a job" };await _daprClient.PublishEventAsync<EventData>("pubsub", "TestTopic", eventData);return new HelloReply{Message = "Hello" + request.Name};}_daprClient怎么來的?我參考Dapr .Net SDK的代碼 , 給IGrpcServerBuilder 增加了擴展方法:
Dapr實現.Net Grpc服務之間的發布和訂閱,并采用WebApi類似的事件訂閱方式

文章插圖
Dapr實現.Net Grpc服務之間的發布和訂閱,并采用WebApi類似的事件訂閱方式

文章插圖
public static IGrpcServerBuilder AddDapr(this IGrpcServerBuilder builder, Action<DaprClientBuilder> configureClient = null){if (builder is null){throw new ArgumentNullException(nameof(builder));}// This pattern prevents registering services multiple times in the case AddDapr is called// by non-user-code.if (builder.Services.Any(s => s.ImplementationType == typeof(DaprMvcMarkerService))){return builder;}builder.Services.AddDaprClient(configureClient);builder.Services.AddSingleton<DaprMvcMarkerService>();return builder;}private class DaprMvcMarkerService{}

推薦閱讀