多线程并发如何高效实现生产者/消费者?

前言

无需引入第三方消息队列组件,我们如何利用内置C#语法高效实现生产者/消费者对数据进行处理呢?在.NET Core共享框架(Share Framework)引入了通道(Channel),也就是说无需额外通过NuGet包安装,若为.NET Framework则需通过NuGet安装,前提是版本必须是4.6+(包含4.6),查询网上资料少的可怜,估计也有部分童鞋都没听说这玩意,所以接下来将通过几篇文章详细介绍其使用和底层具体实现原理

生产者/消费者概念

生产者/消费者这一概念,相信我们大家都不陌生,在日常生活无处不在、随处可见,其本质可用一句话概括:具有多个连续步骤的工作流程。比如美团外卖、再比如工厂里面的流水作业线、又比如线下实体快餐店等等。整个过程如同一条链,在这个链中每个步骤必须被完全隔离执行,生产者产生“东西”,然后对其交由下一步骤进行处理,最终到达消费者。

 

上述叙述为一切抽象,我们回到软件领域,在软件中每一块都在对应的线程中执行,以确保数据能得到正确处理,当然,这也就包括跨线程共享数据可能引起的并发问题。未出现该库之前,我们可利用内置BlockingCollection实现生产者/消费者机制,但依然无法解决我们所面临的两个问题:其一:阻塞问题,其二:无任何基于Task的异步APi执行异步操作。通过引入System.Threading.Channel库则可以完美解决生产者/消费者问题,毫无疑问,线程安全是前提,性能测试有保证,异步提高吞吐量,配置选项够灵活。目前来看,利用通道可能将是实现生产者/消费者的最终手段

通道(Channel)概念

名为通道还是比较形象,如同管道一样,说到底就是线程安全的队列,既然是队列,那么势必涉及边界问题,通道类型分为有界通道和无界通道。

有界通道(Bounded Channel):对传入数据具有指定容量,这也就意味着,若生产者产生的数据一旦达到容量空间,将不得不等待消费者执行完为生产者推送数据腾出额外可用空间

无界通道:(Unbounded Channel):对传入数据无上限,这也就意味着生产者可以持续不断发布数据,以此希望消费者能跟上生产者的节奏

到这里我们完全可得出一结论:因通道提供有界和无界选项,所以内置不可能利用并发队列来实现,一定是通过链表数据结构实现队列机制。那么问题来了,全部指定为无界通道岂不万事大吉,这个问题想想就有问题,虽说无界通道为毫无上限,但计算机的系统内存不是,无论是有界通道抑或是无界通道都会通过缓存区来存储数据。所以选择正确的通道类型,取决于业务上下文。那么问题又来了,若创建有界通道,一旦达到容量限制,通道应该如何处理呢?别担心,这个事情则交由我们根据实际业务情况来处理,边界通道容量满模式(BoundedChannelFullMode)枚举

💡 Wait: 等待可用空间以完成写操作

💡 DropNewest: 直接删除并忽略通道中的最新数据,以便为待写入数据腾出空间

💡 DropOldest: 直接删除并忽略通道中的最旧数据,以便为待写入数据腾出空间

💡 DropWrite: 直接删除要写入的数据

我们通过如下简单3个步骤实现生产者/消费者

创建通道类型

//创建通道类型
public static class Channel
{
    //有界通道(指定容量)
    public static Channel<T> CreateBounded<T>(int capacity);

    //有界通道(指定容量、配置通道满模式选项、配置读(是否单个读取)、写(是否单个写入)、是否允许延续同步操作)
    public static Channel<T> CreateBounded<T>(BoundedChannelOptions options);

    //无界通道
    public static Channel<T> CreateUnbounded<T>();

    //无界通道(配置读(是否单个读取)、写(是否单个写入)、是否允许延续同步操作)
    public static Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options);
}

创建生产者

//向通道写入数据(生产者)
public abstract class ChannelWriter<T>
{  
    protected ChannelWriter();  

    //标识写入通道完成,不再有数据写入
    public void Complete(Exception error = null);  

    //尝试向通道写入数据,若被写入则返回true,否则为false
    public abstract bool TryWrite(T item);

    //异步返回通道是否有可写入空间
    public abstract ValueTask<bool> WaitToWriteAsync(CancellationToken cancellationToken = default);

    //异步写入数据到通道
    public virtual ValueTask WriteAsync(T item, CancellationToken cancellationToken = default);
}

创建消费者

//从通道读取数据(消费者)
public abstract class ChannelReader<T>
{
    protected ChannelReader();

    public virtual Task Completion { get; }

    //异步读取通道所有数据
    public virtual IAsyncEnumerable<T> ReadAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default);

    //异步读取通道每一项数据
    public virtual ValueTask<T> ReadAsync(CancellationToken cancellationToken = default);

    //尝试向通道读取数据
    public abstract bool TryRead(out T item);

    //异步返回通道是否有可读取数据
    public abstract ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default);
}

有界通道(Channel)示例

一切已就绪,接下来我们通过示例重点演示有界通道,然后无界通道只不过是通道类型不同,额外增加选项配置而已。首先我们创建消息数据类

public class Message
{
    public Message(string data)
    {
      Data = data;
    }

 public string Data { get; }
}

然后为方便观察生产者和消费者数据打印情况,在控制台中通过不同字体颜色来进行区分,简单来个日志类

public static class Logger
{
    private static readonly object obj = new object();
    public static void Log(string text, ConsoleColor color = ConsoleColor.White)
    {
      lock (obj)
      {
        Console.ForegroundColor = color;
        Console.WriteLine($"[{DateTime.Now:yyyy-MM-dd hh:mm:ss.ff}] - {text}");
      }
    }
}

接下来定义生产者发布数据

public class Producer
{
    private readonly ChannelWriter<Message> _writer;
    private readonly int _msgId;

    public Producer(ChannelWriter<Message> writer, int msgId)
    {
      _writer = writer;
      _msgId = msgId;
    }

    public async Task PublishAsync(Message message, CancellationToken cancellationToken = default)
    {
      await _writer.WriteAsync(message, cancellationToken);

      Logger.Log($"生产者 {_msgId} > 发布消息 【{message.Data}】", ConsoleColor.Yellow);
    }
}

消费者接收数据,为模拟演示,延迟50毫秒作为消息处理时间

public class Consumer
{
    private readonly ChannelReader<Message> _reader;
    private readonly int _msgId;

    public Consumer(ChannelReader<Message> reader, int msgId)
    {
      _reader = reader;
      _msgId = msgId;
    }

    public async Task BeginConsumeAsync(CancellationToken cancellationToken = default)
    {
      Logger.Log($"消费者 {_msgId} > 等待处理消息", ConsoleColor.Green);

      try
      {
        await foreach (var message in _reader.ReadAllAsync(cancellationToken))
        {
          Logger.Log($"消费者 ({_msgId})> 接收消息: 【{message.Data}】", ConsoleColor.Green);

          await Task.Delay(50, cancellationToken);
        }
      }
      catch (Exception ex)
      {
        Logger.Log($"消费者 {_msgId} > 被强迫停止:{ex}", ConsoleColor.Green);
      }

      Logger.Log($"消费者 {_msgId} > 完成处理消息", ConsoleColor.Green);
    }
}

然后定义启动初始化生产者和消费者任务数量

//启动指定数量的消费者
private static Task[] StartConsumers(Channel<Message> channel, int consumersCount, CancellationToken cancellationToken)
{
    var consumerTasks = Enumerable.Range(1, consumersCount)
      .Select(i => new Consumer(channel.Reader, i).BeginConsumeAsync(cancellationToken))
      .ToArray();

    return consumerTasks;
}

//启动指定数量的生产者
private static async Task ProduceAsync(Channel<Message> channel,
 int messagesCount,
 int producersCount,
 CancellationTokenSource tokenSource)
{
    var producers = Enumerable.Range(1, producersCount)
      .Select(i => new Producer(channel.Writer, i))
      .ToArray();

    int index = 0;

    var tasks = Enumerable.Range(1, messagesCount)
      .Select(i =>
      {
        index = ++index % producersCount;
        var producer = producers[index];
        var msg = new Message($"{i}");
        return producer.PublishAsync(msg, tokenSource.Token);
      }).ToArray();

    await Task.WhenAll(tasks);

    Logger.Log("生产者发布消息完成,结束写入");
    channel.Writer.Complete();

    Logger.Log("等待消费者处理");
    await channel.Reader.Completion;

    Logger.Log("消费者正在处理");
}

最后一步则是创建通道类型(有界通道),启动生产者和消费者线程任务并运行

private static async Task Run(int maxMessagesToBuffer, int messagesToSend, int producersCount, int consumersCount)
{
    Logger.Log("*** 开始执行 ***");
    Logger.Log($"生产者数量 #: {producersCount}, 容量大小: {maxMessagesToBuffer}, 消息数量: {messagesToSend}, 消费者数量 #: {consumersCount}");

    var channel = Channel.CreateBounded<Message>(maxMessagesToBuffer);

    var tokenSource = new CancellationTokenSource();
    var cancellationToken = tokenSource.Token;

    var tasks = new List<Task>(StartConsumers(channel, consumersCount, cancellationToken))
    {
      ProduceAsync(channel, messagesToSend, producersCount, tokenSource)
    };

    await Task.WhenAll(tasks);

    Logger.Log("*** 执行完成 ***");
}

接下来我们在主方法中调用上述Run方法,指定有界通道容量为100,消费数量为10,生产者和消费者数量各为1,如下:

static async Task Main(string[] args)
{
    await Run(100, 10, 1, 1);

    Console.ReadLine();
}

 

根据业务上下文我们可指定有界通道满模式以及其他对应参数

var channel = Channel.CreateBounded<Message>(new BoundedChannelOptions(maxMessagesToBuffer)
{
    FullMode = BoundedChannelFullMode.Wait,
    SingleReader = true,
    SingleWriter = true,
    AllowSynchronousContinuations = false
});

关于无界通道没啥太多要讲解的地方,配置选项如下:

var channel = Channel.CreateUnbounded<Message>(new UnboundedChannelOptions()
{
    SingleReader = true,
    SingleWriter = true,
    AllowSynchronousContinuations = false
});

总结

相比阻塞模型,通道提供异步支持以及灵活配置,更适合在实际业务场景中使用。关于通道大概就讲解这么多,后续我们将分析通道实现原理,更详细介绍请参看外链:https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/

原文链接:www.cnblogs.com
点击下面的标签,发现更多相似文章
HTTP c# 数据结构 框架 组件

上一篇:李宏毅强化学习完整笔记!开源项目《LeeDeepRL-Notes》发布
下一篇:Netty源码解析 -- 对象池Recycler实现原理

相关推荐

  • (六)多线程之 GIL全局解释器锁

    一、引言 定义: ''' 定义: In CPython, the global interpreter lock, or GIL, is a mutex that prevents multi...

    4 个月前
  • (八)多线程之queue

    一、线程queue 1,定义: queue队列 :使用 import queue,用法与进程 Queue一样。 queue is especially useful in threaded prog...

    3 个月前
  • 高并发,性能压力测试,你知道怎么做吗?

    一、 Jmeter简介 Apache JMeter是一款纯java编写负载功能测试和性能测试开源工具软件。相比Loadrunner而言,JMeter小巧轻便且免费,逐渐成为了主流的性能测试工具,是每个...

    3 个月前
  • 高并发量服务器架构

    高并发量服务器架构   转自: https://blog.csdn.net/daogla/article/details/72877153               ...

    6 个月前
  • 高并发计算QPS和测试

    日网站带宽 = PV/统计时间(换算到秒)*平均页面大小(单位KB)* 8 QPS是每秒HTTP请求数量如一个网页会同时请求多个页面元素,并发是同时访问问一个接口请求 (总PV数*80%)/(6小时秒...

    5 个月前
  • 高并发环境下如何优化Tomcat性能

    Tomcat运行模式 Tomcat的运行模式有3种。 1.bio模式 默认的模式,性能非常低下,没有经过任何优化处理和支持。 2.nio模式 利用java的异步io护理技术,noblocking IO...

    6 个月前
  • 高并发分布式架构的演进之路

    1.概述     本文以淘宝作为例子,介绍从一百个到千万级并发情况下服务端的架构的演进过程,同时列举出每个演进阶段会遇到的相关技术,让大家对架构的演进有一个整体的认知,文章最后汇总了一些架构设计...

    3 个月前
  • 高并发IO底层理解

    IO读写基础 应用层在进行read,write系统调用时,不是物理级别的读写,而是缓存的复制,进程缓冲区同内核缓冲区的缓存复制,底层数据交换是有由操作系统内核完成,控制内核缓冲与硬件(物理设备)之间数...

    4 个月前
  • 面试官问我:什么是高并发下的请求合并?

    这是why哥的第 76 篇原创文章从一道面试题说起前段时间一个在深圳的,两年经验的小伙伴出去面试了一圈,收割了几个大厂 offer 的同时,还总结了一下面试的过程中遇到的面试题,面试题有很多,文末的时...

    2 天前
  • 针对前端开发可重用组件并发布到NPM

    翻译:疯狂的技术宅原文:https://www.smashingmagazine.... 本文首发微信公众号:jingchengyideng欢迎关注,每天都给你推送新鲜的前端技术文章 摘要:本文...

    2 年前