回顾上文

  作为单体程序,依赖的第三方服务虽不多,但是2C的程序还是有不少内容可讲; 作为一个常规互联网系统,无外乎就是接受请求、处理请求,输出响应。

由于业务渐渐增长,数据处理的过程会越来越复杂和冗长,能够快速解耦这些业务并清晰连贯的处理的数据   变得越来越重要。  .Net 提供了TPL  Dataflow组件帮助我们更容易实现 基于数据流和 流水线操作的代码。

下图是单体程序中 数据处理的用例图。

程序中用到的TPL Dataflow 组件,Dataflow是微软前几年给出的数据处理库, 是由不同的处理块组成,可将这些块组装成一个处理管道,"块"对应处理管道中的"阶段", 可类比AspNetCore 中Middleware 和pipeline.。

  • TPL Dataflow库为消息传递和并行化CPU密集型和I / O密集型应用程序提供了编程基础,这些应用程序具有高吞吐量和低延迟。它还可以让您明确控制数据的缓冲方式并在系统中移动。

  • 为了更好地理解数据流编程模型,请考虑从磁盘异步加载图像并创建这些图像的应用程序。
    • 传统的编程模型通常使用回调和同步对象(如锁)来协调任务和访问共享数据, 从宏观看传统模型: 任务是一步步紧接着完成的

    • 通过使用数据流编程模型,您可以创建在从磁盘读取图像时处理图像的数据流对象。在数据流模型下,您可以声明数据在可用时的处理方式以及数据之间的依赖关系。由于运行时管理数据之间的依赖关系,因此通常可以避免同步访问共享数据的要求。此外,由于运行时调度基于数据的异步到达而工作,因此数据流可以通过有效地管理底层线程来提高响应性和吞吐量。    也就是说: 你定义的是任务内容和任务之间的依赖,不关注数据什么时候流到这个任务 。

  • 需要注意的是:TPL Dataflow 非分布式数据流,消息在进程内传递,   使用nuget引用 System.Threading.Tasks.Dataflow 包。

TPL Dataflow 核心概念

1.  Buffer & Block

TPL Dataflow 内置的Block覆盖了常见的应用场景,当然如果内置块不能满足你的要求,你也可以自定“块”。

Block可以划分为下面3类:

  • Buffering Only    【Buffer不是缓存Cache的概念, 而是一个缓冲区的概念】

  • Execution

  • Grouping

使用以上块混搭处理管道, 大多数的块都会执行一个操作,有些时候需要将消息分发到不同Block,这时可使用特殊类型的缓冲块给管道“”分叉”。

2. Execution Block

  可执行的块有两个核心组件:
  • 输入、输出消息的缓冲区(一般称为Input,Output队列)

  • 在消息上执行动作的委托

  消息在输入和输出时能够被缓冲:当Func委托的运行速度比输入的消息速度慢时,后续消息将在到达时进行缓冲;当下一个块的输入缓冲区中没有容量时,将在输出时缓冲。

每个块我们可以配置:

  • 缓冲区的总容量, 默认无上限

  • 执行操作委托的并发度, 默认情况下块按照顺序处理消息,一次一个。

我们将块链接在一起形成一个处理管道,生产者将消息推向管道。

TPL Dataflow有一个基于pull的机制(使用Receive和TryReceive方法),但我们将在管道中使用块连接和推送机制。

  • TransformBlock(Execution category)-- 由输入输出缓冲区和一个Func<TInput, TOutput>委托组成,消费的每个消息,都会输出另外一个,你可以使用这个Block去执行输入消息的转换,或者转发输出的消息到另外一个Block。

  • TransformManyBlock (Execution category) -- 由输入输出缓冲区和一个Func<TInput, IEnumerable<TOutput>>委托组成, 它为输入的每个消息输出一个 IEnumerable<TOutput>

  • BroadcastBlock (Buffering category)-- 由只容纳1个消息的缓冲区和Func<T, T>委托组成。缓冲区被每个新传入的消息所覆盖,委托仅仅为了让你控制怎样克隆这个消息,不做消息转换。

            该块可以链接到多个块(管道的分叉),虽然它一次只缓冲一条消息,但它一定会在该消息被覆盖之前将该消息转发到链接块(链接块还有缓冲区)。

  • ActionBlock (Execution category)-- 由缓冲区和Action<T>委托组成,他们一般是管道的结尾,他们不再给其他块转发消息,他们只会处理输入的消息。

  • BatchBlock (Grouping category)-- 告诉它你想要的每个批处理的大小,它将累积消息,直到它达到那个大小,然后将它作为一组消息转发到下一个块。

  还有一下其他的Block类型:BufferBlock、WriteOnceBlock、JoinBlock、BatchedJoinBlock,我们暂时不会深入。

3. Buffers、Back-Pressure、Load-shedding

  当输入缓冲区达到上限容量,为其供货的上游块的输出缓冲区将开始填充,当输出缓冲区已满时,该块必须暂停处理,直到缓冲区有空间,这意味着一个阶段的瓶颈可能导致所有前面的块的缓冲区填满。

  但是不是所有的块变满时,都会暂停,BroadcastBlock 有允许1个消息的缓冲区,每个消息都会被覆盖, 因此如果这个广播块不能将消息转发到下游,则在下个消息到达的时候消息将丢失,这在某种意义上是一种限流(比较生硬).

编程实践

  将按照上图实现TPL Dataflow

①  定义Dataflow  pipeline
        public EqidPairHandler(IHttpClientFactory httpClientFactory, RedisDatabase redisCache, IConfiguration con, LogConfig logConfig, ILoggerFactory loggerFactory)
        {
            _httpClient = httpClientFactory.CreateClient("bce-request");
            _redisDB0 = redisCache[];
            _redisDB = redisCache;
            _logger = loggerFactory.CreateLogger(nameof(EqidPairHandler));
            var option = new DataflowLinkOptions { PropagateCompletion = true };

            publisher = _redisDB.RedisConnection.GetSubscriber();
            _eqid2ModelTransformBlock = new TransformBlock<EqidPair, EqidModel>
              (
                   // redis piublih 没有做在TransformBlock fun里面, 因为publih失败可能影响后续的block传递
                   eqidPair => EqidResolverAsync(eqidPair),
                   new ExecutionDataflowBlockOptions
                   {
                       MaxDegreeOfParallelism = con.GetValue<int>("MaxDegreeOfParallelism")
                   }
              );
            // https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/walkthrough-creating-a-dataflow-pipeline
            _logBatchBlock = new LogBatchBlock<EqidModel>(logConfig, loggerFactory);
            _logPublishBlock = new ActionBlock<EqidModel>(x => PublishAsync(x) );

            _broadcastBlock = new BroadcastBlock<EqidModel>(x => x); // 由只容纳一个消息的缓存区和拷贝函数组成
            _broadcastBlock.LinkTo(_logBatchBlock.InputBlock, option);
            _broadcastBlock.LinkTo(_logPublishBlock, option);
            _eqid2ModelTransformBlock.LinkTo(_broadcastBlock, option);
        }
public class LogBatchBlock<T> : ILogDestination<T> where T : IModelBase
    {
        private readonly string _dirPath;
        private readonly Timer _triggerBatchTimer;
        private readonly Timer _openFileTimer;
        private DateTime? _nextCheckpoint;
        private TextWriter _currentWriter;
        private readonly LogHead _logHead;
        private readonly object _syncRoot = new object();
        private readonly ILogger _logger;
        private readonly BatchBlock<T> _packer;
        private readonly ActionBlock<T[]> batchWriterBlock;
        private readonly TimeSpan _logFileIntervalTimeSpan;

        /// <summary>
        /// Generate  request log file.
        /// </summary>
        public LogBatchBlock(LogConfig logConfig, ILoggerFactory loggerFactory)
        {
            _logger = loggerFactory.CreateLogger<LogBatchBlock<T>>();

            _dirPath = logConfig.DirPath;
            if (!Directory.Exists(_dirPath))
            {
                Directory.CreateDirectory(_dirPath);
            }
            _logHead = logConfig.LogHead;

            _packer = new BatchBlock<T>(logConfig.BatchSize);
            batchWriterBlock = new ActionBlock<T[]>(models => WriteToFile(models));
            _packer.LinkTo(batchWriterBlock, new DataflowLinkOptions { PropagateCompletion = true });

            _triggerBatchTimer = new Timer(state =>
            {
                _packer.TriggerBatch();
            }, null, TimeSpan.Zero, TimeSpan.FromSeconds(logConfig.Period));

            _logFileIntervalTimeSpan = TimeSpan.Parse(logConfig.LogFileInterval);
            _openFileTimer = new Timer(state =>
            {
                AlignCurrentFileTo(DateTime.Now);
            }, null, TimeSpan.Zero, _logFileIntervalTimeSpan);
        }

        public ITargetBlock<T> InputBlock => _packer;

        private void AlignCurrentFileTo(DateTime dt)
        {
            if (!_nextCheckpoint.HasValue)
            {
                OpenFile(dt);
            }
            if (dt >= _nextCheckpoint.Value)
            {
                CloseFile();
                OpenFile(dt);
            }
        }

        private void OpenFile(DateTime now, string fileSuffix = null)
        {
            string filePath = null;
            try
            {
                var currentHour = now.Date.AddHours(now.Hour);
                _nextCheckpoint = currentHour.Add(_logFileIntervalTimeSpan);
                int hourConfiguration = _logFileIntervalTimeSpan.Hours;
                int minuteConfiguration = _logFileIntervalTimeSpan.Minutes;
                filePath = $"{_dirPath}/u_ex{now.ToString("yyMMddHH")}{fileSuffix}.log";

                var appendHead = !File.Exists(filePath);
                if (filePath != null)
                {
                    var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write);
                    var sw = new StreamWriter(stream, Encoding.Default);
                    if (appendHead)
                    {
                        sw.Write(GenerateHead());
                    }
                    _currentWriter = sw;
                    _logger.LogDebug($"{currentHour} TextWriter has been created.");
                }

            }
            catch (UnauthorizedAccessException ex)
            {
                _logger.LogWarning("I/O error or specific type of scecurity error,{0}", ex);
                throw;
            }
            catch (Exception e)
            {
                if (fileSuffix == null)
                {
                    _logger.LogWarning($"OpenFile failed:{e.StackTrace.ToString()}:{e.Message}.", e.StackTrace);
                    OpenFile(now, $"-{Guid.NewGuid()}");
                }
                else
                {
                    _logger.LogError($"OpenFile failed after retry: {filePath}", e);
                    throw;
                }
            }
        }

        private void CloseFile()
        {
            if (_currentWriter != null)
            {
                _currentWriter.Flush();
                _currentWriter.Dispose();
                _currentWriter = null;
                _logger.LogDebug($"{DateTime.Now} TextWriter has been disposed.");
            }
            _nextCheckpoint = null;
        }

        private string GenerateHead()
        {
            StringBuilder head = new StringBuilder();
            head.AppendLine("#Software: " + _logHead.Software)
                .AppendLine("#Version: " + _logHead.Version)
                .AppendLine($"#Date: {DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss")}")
                .AppendLine("#Fields: " + _logHead.Fields);
            return head.ToString();
        }

        private void WriteToFile(T[] models)
        {
            try
            {
                lock (_syncRoot)
                {
                    var flag = false;
                    foreach (var model in models)
                    {
                        if (model == null)
                            continue;
                        flag = true;
                        AlignCurrentFileTo(model.ServerLocalTime);
                        _currentWriter.WriteLine(model.ToString());
                    }
                    if (flag)
                        _currentWriter.Flush();
                }
            }
            catch (Exception ex)
            {
                _logger.LogError("WriteToFile Error : {0}", ex.Message);
            }
        }

        public bool AcceptLogModel(T model)
        {
            return _packer.Post(model);
        }

        public string GetDirPath()
        {
            return _dirPath;
        }

        public async Task CompleteAsync()
        {
            _triggerBatchTimer.Dispose();
            _openFileTimer.Dispose();
            _packer.TriggerBatch();
            _packer.Complete();
            await InputBlock.Completion;
            lock (_syncRoot)
            {
                CloseFile();
            }
        }
    }

仿IIS日志存储代码

② 异常处理

  上述程序在部署时就遇到相关的坑位,在测试环境_eqid2ModelTransformBlock 内Func委托稳定执行,程序并未出现异样;

  部署到生产之后, 该Pipeline会运行一段时间就停止工作,一直很困惑, 后来通过监测_eqid2ModelTransformBlock.Completion 属性,提前进入“完成态”的原因是 程序在执行某次Func委托时报错,Block进入Fault状态

TransfomrBlock.Completion 一个Task对象,当TPL Dataflow不再处理消息并且能保证不再处理消息的时候,就被定义为已完成, Task对象的TaskStatus枚举值将标记此Block进入完成态的真实情况

- TaskStatus.RanToCompletion       根据Block定义的任务成功完成

- TaskStatus.Fault                            因为未处理的异常 导致"过早的完成"

- TaskStatus.Cancled                       因为取消操作 导致 "过早的完成"

  我们需要小心处理异常, 一般情况下我们使用try、catch包含所有的执行代码以确保所有的异常都被处理。

  可将TPL Dataflow 做为进程内消息队列,本文只是一个入门参考,更多复杂用法还是看官网, 你需要记住的是, 这是一个.Net 进程内数据流组件, 能让你专注于流程。

作者:JulianHuang

感谢您的认真阅读,如有问题请大胆斧正;觉得有用,请下方或加关注。

本文欢迎转载,但请保留此段声明,且在文章页面明显位置注明本文的作者及原文链接。

TPL DataFlow .Net 数据流组件,了解一下的更多相关文章

  1. SSIS自定义数据流组件开发(血路)

    由于特殊的原因(怎么特殊不解释),需要开发自定义数据流组件处理. 查了很多资料,用了不同的版本,发现各种各样的问题没有找到最终的解决方案. 遇到的问题如下: 用VS2015编译出来的插件,在SSDTB ...

  2. TPL DataFlow初探(一)

    属性TPL Dataflow是微软面向高并发应用而推出的一个类库.借助于异步消息传递与管道,它可以提供比线程池更好的控制,也比手工线程方式具备更好的性能.我们常常可以消息传递,生产-消费模式或Acto ...

  3. 一个使用C#的TPL Dataflow Library的例子:分析文本文件中词频

    博客搬到了fresky.github.io - Dawei XU,请各位看官挪步.最新的一篇是:一个使用C#的TPL Dataflow Library的例子:分析文本文件中词频.

  4. FluentDataflow - Fluent Style TPL Dataflow

    我的新英文博客文章: FluentDataflow - Fluent Style TPL Dataflow 介绍了本人最新发布的一个开源类库:FluentDataflow--Fluent风格的TPL ...

  5. TPL DataFlow初探(二)

    上一篇简单的介绍了TDF提供的一些Block,通过对这些Block配置和组合,可以满足很多的数据处理的场景.这一篇将继续介绍与这些Block配置的相关类,和挖掘一些高级功能. 在一些Block的构造函 ...

  6. .Net Core中利用TPL(任务并行库)构建Pipeline处理Dataflow

    在学习的过程中,看一些一线的技术文档很吃力,而且考虑到国内那些技术牛人英语都不差的,要向他们看齐,所以每天下班都在疯狂地背单词,博客有些日子没有更新了,见谅见谅 什么是TPL? Task Parall ...

  7. Akka(17): Stream:数据流基础组件-Source,Flow,Sink简介

    在大数据程序流行的今天,许多程序都面临着共同的难题:程序输入数据趋于无限大,抵达时间又不确定.一般的解决方法是采用回调函数(callback-function)来实现的,但这样的解决方案很容易造成“回 ...

  8. Python高级编程之生成器(Generator)与coroutine(三):coroutine与pipeline(管道)和Dataflow(数据流_

    原创作品,转载请注明出处:点我 在前两篇文章中,我们介绍了什么是Generator和coroutine,在这一篇文章中,我们会介绍coroutine在模拟pipeline(管道 )和控制Dataflo ...

  9. Pipeline处理Dataflow

    Pipeline处理Dataflow https://www.cnblogs.com/CoderAyu/p/9757389.html .Net Core中利用TPL(任务并行库)构建Pipeline处 ...

  10. HowTo:使用数据流读写消息

      本文主要演示使用TPL 数据流库从数据流块(dataflow block)读写消息. 提供了同步方法和异步方法. 主要使用BufferBlock,其既能作为message source,有能作为m ...

随机推荐

  1. SQL servcer 时间日期函数、数据类型转换

    1.时间日期函数 2.数据类型转换 3.习题 建立两个表,一个部门表,一个人员表.部门:部门的编号,部门的名称,部门的职责.人员:人员的编号,姓名,年龄,性别,cid所属部门

  2. Android判断用户是平板还是手机的方法

    public boolean isTabletDevice() {        TelephonyManager telephony = (TelephonyManager) mContext.ge ...

  3. Sharepoint 移动客户端 Rshare的特点

    1.随时随地快速访问SharePoint,和同事高效合作,实时浏览日历信息,完整日程安排.查看联系人信息.浏览公告,文档和图片等. 添加图片到相册,通过Email和他人分享. 2.新建.上传:新建日历 ...

  4. Asp.net MVC razor语法参考

    Razor语法的快捷参考http://haacked.com/archive/2011/01/06/razor-syntax-quick-reference.aspx/ 只是copy下来便于查阅! I ...

  5. TCP/IP详解学习笔记 这位仁兄写得太好了

      TCP/IP详解学习笔记(1)-基本概念 为什么会有TCP/IP协议 在世界上各地,各种各样的电脑运行着各自不同的操作系统为大家服务,这些电脑在表达同一种信息的时候所使用的方法是千差万别.就好像圣 ...

  6. Linux 下开启ssh服务(转)

    二.SSH SSH 为 Secure Shell 的缩写,由 IETF 的网络工作小组(Network Working Group)所制定:SSH 为建立在应用层和传输层基础上的安全协议.SSH 是目 ...

  7. logback读取src/test/resource下的配置文件

    import java.io.File; import java.net.URISyntaxException; import java.util.Map; import java.util.Prop ...

  8. kali中的webshell工具--webacoo

    webacoo webshell其实就是放置在服务器上的一段代码 kali中生成webshell的工具 WeBaCoo(Web Backdoor Cookie) 特点及使用方法 类终端的shell 编 ...

  9. linux svn权限

    svnserve -d -r /opt/svn                      //启动 创建仓库 svnadmin create /u02/svn/davesvn              ...

  10. JVM参数以及用法

    工作以后,发觉真的几乎没有像大学那样空闲的时间,坐下来看看书写写博客了.最近的一篇博客距离现在已经近一个多月了,最近也在复习Java的东西,准备校招,看了看JVM的东西,就当作记笔记. (一)JVM参 ...