Kapacitor:通过加入计算两个流之间的差异

问题描述:

完全披露:我还发布了此问题的一个变体hereKapacitor:通过加入计算两个流之间的差异

我有一个嵌入式设备作为加热系统的一部分,每隔5秒通过一个mosquitto MQTT代理发布两个温度值,每个温度值对应一个MQTT主题。 “mydevice/sensor1”是预热温度,“mydevice/sensor2”是预热温度。这些值几乎同时发布,因此两条消息之间的延迟时间通常不会超过半秒 - 但它们不会同步,正好是

Telegraf订阅了同一家经纪商,并乐于将这些测量结果放入名为“telegraf.autogen”的InfluxDB数据库中。这些测量结果都出现在称为“mqtt_consumer”的单个测量中,其中有一个名为“值”的字段。在InfluxDB我可以通过与“主题”标签过滤主题的标记值区分:

SELECT mean("value") AS "mean_value" FROM "telegraf"."autogen"."mqtt_consumer" WHERE time > now() - 1m AND "topic"='mydevice/sensor1' GROUP BY time(5s) 

这一切似乎正常工作。

我想要做的是计算这两个主题值之间的差异,为了计算温差,并最终计算加热系统传递的能量(流量是恒定的和已知的)。我试图在Grafana中使用InfluxDB查询来做到这一点,但看起来相当困难(我失败了),所以我想我会尝试使用TICKscript将我的过程分解成几个小步骤。

我已经组建了一个TICKscript计算基于此示例中的差异:

https://docs.influxdata.com/kapacitor/v1.3/guides/join_backfill/#stream-method

然而,在我来说,我没有这两个独立的测量。相反,我使用topic标记作为过滤器,从单个“mqtt_consumer”度量创建两个单独的流。然后,我尝试以1s的宽容度加入(价值总是足够及时地发布)。我使用httpOut生成一个调试视图(除此之外:它每10秒钟只更新一次,即使流以5秒间隔运行 - 为什么会这样呢?我可以在新的db中看到值都是存在的)。

一旦我将它们加入,我会评估值的差异,并将其存储在一个名为“diff”的测量下的新数据库中。

这里是我的脚本至今:

var sensor1 = stream 
    |from() 
     .database('telegraf') 
     .retentionPolicy('autogen') 
     .measurement('mqtt_consumer') 
     .where(lambda: "topic" == 'mydevice/sensor1') 
     .groupBy(*) 
    |httpOut('sensor1') 

var sensor2 = stream 
    |from() 
     .database('telegraf') 
     .retentionPolicy('autogen') 
     .measurement('mqtt_consumer') 
     .where(lambda: "topic" == 'mydevice/sensor2') 
     .groupBy(*) 
    |httpOut('sensor2') 

sensor1 
    |join(sensor2) 
     .as('value1', 'value2') 
     .tolerance(1s) 
    |httpOut('join') 
    |eval(lambda: "sensor1.value1" - "sensor1.value2") 
     .as('diff') 
    |httpOut('diff') 
    |influxDBOut() 
     .create() 
     .database('mydb') 
     .retentionPolicy('myrp') 
     .measurement('diff') 

不幸的是我的剧本未能通过join节点传递任何物品。在kapacitor show我可以看到,httpOut节点都将项目传递给join节点,但它没有传递任何信息。 kapacitor日志也没有显示任何明显的。一个HTTP GET的httpOut('join')回报:

{"series":null} 

我有两个问题:

  1. 是这种方法,使用Kapacitor与TICKscript基于单一测量两个值之间的差来计算能量,有效?还是有更好/更简单的方法来做到这一点?
  2. 为什么不是join节点产生任何输出?我能做些什么来进一步调试?

尝试添加|意味着节点,计算领域的平均值,在这两个传感器:

var sensor1 = stream 
    |from() 
     .database('telegraf') 
     .retentionPolicy('autogen') 
     .measurement('mqtt_consumer') 
     .where(lambda: "topic" == 'mydevice/sensor1') 
     .groupBy(*) 
    |mean('field1') 
    |httpOut('sensor1') 

后加入,你应该使用新分配的名称流,也不是原有的:

sensor1 
    |join(sensor2) 
     .as('value1', 'value2') 
     .tolerance(1s) 
    |httpOut('join') 
    |eval(lambda: "value1.field1" - "value2.field2") 
     .as('diff') 
    |httpOut('diff') 
    |influxDBOut() 
     .create() 
     .database('mydb') 
     .retentionPolicy('myrp') 
     .measurement('diff') 

其中mean字段是根据我以前的评论计算的字段。试试看!

此外,为了进一步调试,请尝试添加您希望放置眼球的日志节点。

希望这会有所帮助!问候