Kapacitor:通过加入计算两个流之间的差异
完全披露:我还发布了此问题的一个变体here。Kapacitor:通过加入计算两个流之间的差异
我有一个嵌入式设备作为加热系统的一部分,每隔5秒通过一个mosquitto MQTT代理发布两个温度值,每个温度值对应一个MQTT主题。 “mydevice/sensor1”是预热温度,“mydevice/sensor2”是预热温度。这些值几乎同时发布,因此两条消息之间的延迟时间通常不会超过半秒 - 但它们不会同步,正好是。
Telegraf订阅了同一家经纪商,并乐于将这些测量结果放入名为“telegraf.autogen”的InfluxDB数据库中。这些测量结果都出现在称为“mqtt_consumer”的单个测量中,其中有一个名为“值”的字段。在InfluxDB我可以通过与“主题”标签过滤主题的标记值区分:
SELECT mean("value") AS "mean_value" FROM "telegraf"."autogen"."mqtt_consumer" WHERE time > now() - 1m AND "topic"='mydevice/sensor1' GROUP BY time(5s)
这一切似乎正常工作。
我想要做的是计算这两个主题值之间的差异,为了计算温差,并最终计算加热系统传递的能量(流量是恒定的和已知的)。我试图在Grafana中使用InfluxDB查询来做到这一点,但看起来相当困难(我失败了),所以我想我会尝试使用TICKscript将我的过程分解成几个小步骤。
我已经组建了一个TICKscript计算基于此示例中的差异:
https://docs.influxdata.com/kapacitor/v1.3/guides/join_backfill/#stream-method
然而,在我来说,我没有这两个独立的测量。相反,我使用topic标记作为过滤器,从单个“mqtt_consumer”度量创建两个单独的流。然后,我尝试以1s的宽容度加入(价值总是足够及时地发布)。我使用httpOut
生成一个调试视图(除此之外:它每10秒钟只更新一次,即使流以5秒间隔运行 - 为什么会这样呢?我可以在新的db中看到值都是存在的)。
一旦我将它们加入,我会评估值的差异,并将其存储在一个名为“diff”的测量下的新数据库中。
这里是我的脚本至今:
var sensor1 = stream
|from()
.database('telegraf')
.retentionPolicy('autogen')
.measurement('mqtt_consumer')
.where(lambda: "topic" == 'mydevice/sensor1')
.groupBy(*)
|httpOut('sensor1')
var sensor2 = stream
|from()
.database('telegraf')
.retentionPolicy('autogen')
.measurement('mqtt_consumer')
.where(lambda: "topic" == 'mydevice/sensor2')
.groupBy(*)
|httpOut('sensor2')
sensor1
|join(sensor2)
.as('value1', 'value2')
.tolerance(1s)
|httpOut('join')
|eval(lambda: "sensor1.value1" - "sensor1.value2")
.as('diff')
|httpOut('diff')
|influxDBOut()
.create()
.database('mydb')
.retentionPolicy('myrp')
.measurement('diff')
不幸的是我的剧本未能通过join
节点传递任何物品。在kapacitor show
我可以看到,httpOut
节点都将项目传递给join
节点,但它没有传递任何信息。 kapacitor日志也没有显示任何明显的。一个HTTP GET的httpOut('join')
回报:
{"series":null}
我有两个问题:
- 是这种方法,使用Kapacitor与TICKscript基于单一测量两个值之间的差来计算能量,有效?还是有更好/更简单的方法来做到这一点?
- 为什么不是
join
节点产生任何输出?我能做些什么来进一步调试?
尝试添加|意味着节点,计算领域的平均值,在这两个传感器:
var sensor1 = stream
|from()
.database('telegraf')
.retentionPolicy('autogen')
.measurement('mqtt_consumer')
.where(lambda: "topic" == 'mydevice/sensor1')
.groupBy(*)
|mean('field1')
|httpOut('sensor1')
后加入,你应该使用新分配的名称流,也不是原有的:
sensor1
|join(sensor2)
.as('value1', 'value2')
.tolerance(1s)
|httpOut('join')
|eval(lambda: "value1.field1" - "value2.field2")
.as('diff')
|httpOut('diff')
|influxDBOut()
.create()
.database('mydb')
.retentionPolicy('myrp')
.measurement('diff')
其中mean字段是根据我以前的评论计算的字段。试试看!
此外,为了进一步调试,请尝试添加您希望放置眼球的日志节点。
希望这会有所帮助!问候