专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

NServiceBus+RabbitMQ开发分布式应用

前言

NServiceBus提供了8种传输管道组件,分别是Learning、MSMQ、Azure Service Bus、Azure Service Bus (Legacy)、Azure Storage Queues、SQL Server、RabbitMQ、Amazon SQS。前两篇我们主要用的是Learnning,这篇使用RabbitMQ,也可以直接用于生产。

安装RabbitMQ组件

RabbitMQ不在NServiceBus下,是一个单独的组件,需要单独安装。

53_1.png

设置RabbitMQ连接串

var transport = config.UseTransport<RabbitMQTransport>();
transport.ConnectionString("host=192.168.80.129;username=admin;password=admin");
transport.UseDirectRoutingTopology();

消息生产者(ClientUI)

 class Program
    {
        static ILog log = LogManager.GetLogger<Program>();
        static void Main(string[] args)
        {
            MainAsync().GetAwaiter().GetResult();
        }

        static async Task MainAsync()
        {
            Console.Title = "Sample.ClientUI";
            var config = new EndpointConfiguration("Sample.ClientUI");
            config.UseSerialization<NewtonsoftSerializer>();
            config.UsePersistence<InMemoryPersistence>();
            config.EnableInstallers();

            var transport = config.UseTransport<RabbitMQTransport>();
            transport.ConnectionString("host=192.168.80.129;username=admin;password=admin");
            transport.UseDirectRoutingTopology();

            var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false);

            await RunAsync(endpointInstance).ConfigureAwait(false);

            await endpointInstance.Stop().ConfigureAwait(false);
        }

        static async Task RunAsync(IEndpointInstance endpointInstance)
        {
            log.Info("Press 'P' to send an PlaceOrder Command");
            while (true)
            {
                var key = Console.ReadKey();
                Console.WriteLine();

                switch (key.Key)
                {
                    case ConsoleKey.P:
                        {
                            var command = new PlaceOrder
                            {
                                OrderId = Guid.NewGuid().ToString()
                            };

                            log.Info($"Sending PlaceOrder with OrderId:{command.OrderId}");
                            await endpointInstance.Send("Sample.Server",command).ConfigureAwait(false);
                            break;
                        }

                    case ConsoleKey.Q:
                        return;
                    default:
                        log.Info("Please try again");
                        break;
                }
            }
        }
    }

先启动ClientUI,启动后先发送几条数据,发送数据观察程序控制台数据是否发送,然后在RabbitMQ控制台里观察Queue的情况,没创建Queue时,这里会自动创建Queue,队列名称和端点名称相同。这里发送消息数据,数据会在Sample.Server队列里。

53_2.png

53_3.png

53_4.png这里能看到在队列Sample.Server里有三条未消费的数据,因为我还没有启动Sample.Server控制台程序。

消息消费者(Server)

public class Program
{
    static ILog log = LogManager.GetLogger<Program>();
    static void Main(string[] args)
    {
        MainAsync().GetAwaiter().GetResult();
    }

    static async Task MainAsync()
    {
        Console.Title = "Sample.Server";
        var config = new EndpointConfiguration("Sample.Server");
        config.UseSerialization<NewtonsoftSerializer>();
        config.UsePersistence<InMemoryPersistence>();
        config.EnableInstallers();

        var transport = config.UseTransport<RabbitMQTransport>();
        transport.ConnectionString("host=192.168.80.129;username=admin;password=admin");
        transport.UseDirectRoutingTopology();

        var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false);

        log.Info("Press any key to quit");
        Console.ReadKey();

        await endpointInstance.Stop().ConfigureAwait(false);
    }
}

public class PlaceOrderHandler : IHandleMessages<PlaceOrder>
{
    static ILog log = LogManager.GetLogger<PlaceOrderHandler>();
    public Task Handle(PlaceOrder message, IMessageHandlerContext context)
    {
        log.Info($"Received PlaceOrder with OrderId:{message.OrderId}");
        return Task.CompletedTask;
    }
}

启动Sample.Server控制台后,就会消费Queue里的数据,消费完成后观察程序控制台的提示和RabbitMQ控制台的提示。

53_5.png

53_6.png53_7.png这里能看到刚才堆积的三条数据已经被消费了。

#

总结

写完这个demo我就在想在程序里通过使用NServiceBus的RabbitMQ组件和直接在程序里使用RabbitMQ直接调用的区别,到底有没有必要通过NServiceBus调用,还需要进一步考量。

文章永久链接:https://tech.souyunku.com/25901

未经允许不得转载:搜云库技术团队 » NServiceBus+RabbitMQ开发分布式应用

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们