【原创】StreamInsight查询系列(十)——基本查询操作之联接

上篇博文介绍了StreamInsight基础查询操作中的决胜排序部分。这篇文章将主要介绍如何StreamInsight基本查询的最后一篇——联接。

测试数据准备

为了方便测试查询,我们首先准备一些静态的测试数据源。如下input1和input2是一个包含时间戳和[1,5]区间数字的复杂事件流:

// 创建一个包含[1,5]区间连续数字的事件流
var now = DateTime.Now;
var input1 = Enumerable.Range(1, 5).ToPointStream(Application, e =>
    PointEvent.CreateInsert(now.AddSeconds(e), new { Value = e }),
    AdvanceTimeSettings.StrictlyIncreasingStartTime);

var input2 = Enumerable.Range(1, 5).ToPointStream(Application, e =>
    PointEvent.CreateInsert(now.AddSeconds(e).AddMilliseconds(1), new { Value = e }),
    AdvanceTimeSettings.StrictlyIncreasingStartTime);

使用下面的语句分别输出input1和input2:

// 注意input1和input2的起始时间并不相同
(from i in input1.ToPointEnumerable()
 where i.EventKind == EventKind.Insert
 select new 
 { StartTime = i.StartTime.ToString("hh:mm:ss.fff"), Value = i.Payload.Value })
 .Dump("Input Stream 1");

(from i in input2.ToPointEnumerable()
 where i.EventKind == EventKind.Insert
 select new 
 { StartTime = i.StartTime.ToString("hh:mm:ss.fff"), Value = i.Payload.Value })
 .Dump("Input Stream 2");

在LINQPad中可以看到如下结果(由于数据与机器当前时间相关,读者在本机运行的结果可能与下图不一样):

【原创】StreamInsight查询系列(十)——基本查询操作之联接

联接

StreamInsight的联接(join)操作和LINQ to Object中的稍有不同。一般来说,联接两个StreamInsight事件流需要满足以下两个条件:

  • 时间重叠
  • 满足联接条件

问题1:如何联接两个事件流?

让我们看看联接input1和input2后的结果:

// 由Value相同连接input1和input2(尽管两者在时间上没有重叠)
var joinQuery1 = from i in input1
                 join j in input2
                 on i.Value equals j.Value
                 select i;

将joinQuery1结果导出:

(from p in joinQuery1.ToPointEnumerable()
 where p.EventKind == EventKind.Insert
 select new
 { StartTime = p.StartTime.ToString("hh:mm:ss.fff"), Value = p.Payload.Value })
 .Dump();

你会发现没有任何结果输出。这是因为这个联接操作并没有满足时间重叠的条件:input2的每一个事件都对应比input1慢上1毫秒。那么怎么做才能得到想要的结果呢?

本质上我们只要能够让input1的时间与input2的事件重叠就应当能够正确输出结果,因此可以使用AlterEventDuration延伸input1事件的生命周期,将joinQuery1修改如下就可以看到联接结果了:

var joinQuery1 = from i in input1.AlterEventDuration(e => TimeSpan.FromMilliseconds(5))
                 join j in input2
                 on i.Value equals j.Value
                 select i;

问题2:如何联接多个事件流?

问题1中展示了如何联接两个流,下面让我们看看联接3个事件流。

先创建第3个复杂事件流:

var moreData = new[] 
{
    new { Key = 1, Description = "One Tag Event" },
    new { Key = 2, Description = "Two Tag Event" }, 
    new { Key = 3, Description = "Three Tag Event" }, 
    new { Key = 4, Description = "Four Tag Event" }, 
    new { Key = 5, Description = "Five Tag Event" },    
};
var moreStream = moreData.ToIntervalStream(Application, ev =>
    IntervalEvent.CreateInsert(now, now.AddDays(1), ev),
    AdvanceTimeSettings.IncreasingStartTime);

将input1和input2的联接结果joinQuery1与moreStream再进行联接:

var joinQuery2 = from i1 in joinQuery1
                 join i2 in moreStream on i1.Value equals i2.Key
                 select new
                 {
                     Value = i1.Value,
                     Description = i2.Description
                 };

将joinQuery2结果导出如下(由于数据与机器当前时间相关,读者在本机运行的结果可能与下图不一样):

【原创】StreamInsight查询系列(十)——基本查询操作之联接

下一篇将开始介绍StreamInsight查询模式的第1篇——窗口对齐。

转载于:https://www.cnblogs.com/StreamInsight/archive/2011/08/26/StreamInsight-Query-Series-Part10-Basic-Queries-Joins.html