暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

No.8 - 时序数据库随笔 - InfluxDB 多条时序数据联合分析

孙金城 2021-04-06
2244

 如何进行多条时序数据的分析处理?本文用多种方式进行示例说明。


01


问题

 

正文本篇我们要解决 No6,No7提到的网友问题,如下:

简单说就是如何处理两条时间线的数值计算?上面例子是一个 “+” 加法。


02


数据准备


我们首先利用InfluxDB解决上述问题,首先进行数据准备,建立一个测试的bucket,建立之前先检查一下现有的bucket。

启动InfluxDB实例,如下:

启动之后,我们查看一下现有的bucket,如下:

    influxdb git:(master) bin/$(uname -s | tr '[:upper:]' '[:lower:]')/influx bucket list         
    ID Name Retention Shard group duration Organization ID
    98e86f05543f5866 _monitoring 168h0m0s 24h0m0s 56b35f89025991c8
    b9b9609ae3e08b97        _tasks          72h0m0s         24h0m0s                 56b35f89025991c8
    复制

    创建名为iot的bucket,如下命令:

      bin/$(uname -s | tr '[:upper:]' '[:lower:]')/influx setup \
      --username iot \
      --password 2021iotdb \
      --org org \
      --bucket 2021iotdb \
      --retention 1h \
      --token iot_test_token \
      --host http://localhost:8086 \
      --force
      复制

      执行成功之后会显示如下:

          influxdb git:(master) bin/$(uname -s | tr '[:upper:]' '[:lower:]')/influx setup \
        --username iot \
        --password 2021iotdb \
        --org org \
        --bucket 2021iotdb \
        --retention 1h \
        --token iot_test_token \
        --host http://localhost:8086 \
        --force
        Config default has been stored in Users/jincheng/.influxdbv2/configs.
        User Organization Bucket
        iot org 2021iotdb
        复制

        我们用命令查看一下:

            influxdb git:(master) bin/$(uname -s | tr '[:upper:]' '[:lower:]')/influx bucket list
          ID Name Retention Shard group duration Organization ID
          c05283f56bf9cead 2021iotdb 1h0m0s 1h0m0s 0b1ad4c0cd4db9ca
          e70f5bb2fdaa5dd2 _monitoring 168h0m0s 24h0m0s 0b1ad4c0cd4db9ca
          56241b01789c1a1b  _tasks    72h0m0s    24h0m0s      0b1ad4c0cd4db9ca
          复制

          插入两条时间线数据,如下:

            ➜  influxdb git:(master) bin/$(uname -s | tr '[:upper:]' '[:lower:]')/influx write --bucket 2021iotdb --precision s "m1 vm=3333 $(date +%s)"
            ➜ influxdb git:(master) bin/$(uname -s | tr '[:upper:]' '[:lower:]')/influx write --bucket 2021iotdb --precision s "m2 vn=4444 $(date +%s)"
            复制

            我们插入两条时间线数据,m1的vm=3333,m2的vn=4444,我们的需求是vm + vn。 


            03


            JOIN查询


            我们看一下JOIN的功能定义:


            The join() function merges two or more input streams, whose values are equal on a set of common columns, into a single output stream. Flux allows you to join on any columns common between two data streams and opens the door for operations such as cross-measurement joins and math across measurements.


            语法:join(tables: {key1: table1, key2: table2}, on: ["_time", "_field"], method: "inner")


            这个和我们标准数据库的JOIN语义基本一致们先查看一下用于测试的数据,我们既可以用influxCLI,如下:

            我们发现数据已经插入成功。也可以用fluxCLI,InlfuxDB社区更推进用flux,我们打开一个flux repl。细节可以查阅 前面一篇No6。我用IDE打开如下:

              > from(bucket:"2021iotdb") |> range(start:-1h)
              Result: _result
              Error: unauthorized access
              复制

              如图,我们在IDE里面执行查询时候,提示我们需要token,那么influx query为啥不需要呢,IDE没有默认去读取配置文件,我们可以配置环境变量也可以直接添加token,查询语句如下:

                > from(bucket:"2021iotdb", org:"org", token:"iot_test_token") |> range(start:-1h)
                Result: _result
                Table: keys: [_start, _stop, _field, _measurement]
                _start:time _stop:time _field:string _measurement:string _time:time _value:float
                ------------------------------ ------------------------------ ---------------------- ---------------------- ------------------------------ ----------------------------
                2021-04-06T05:36:50.079542000Z 2021-04-06T06:36:50.079542000Z vm m1 2021-04-06T06:23:16.000000000Z 3333
                Table: keys: [_start, _stop, _field, _measurement]
                _start:time _stop:time _field:string _measurement:string _time:time _value:float
                ------------------------------ ------------------------------ ---------------------- ---------------------- ------------------------------ ----------------------------
                2021-04-06T05:36:50.079542000Z  2021-04-06T06:36:50.079542000Z                      vn                      m2  2021-04-06T06:23:34.000000000Z                          4444  
                复制

                好的,一切都还算顺利,我们看看如果计算 vm + vn呢?如果我们把 m1和m2两个时间序列看成是两个流(表),那么我们要进行两个表的操作,第一想到的应该是两个表进行JOIN将两个表的数据合并成一个宽表,然后在进行列求值,如下:

                  tab1 = from(bucket:"2021iotdb", org:"org", token:"iot_test_token") 
                  |> range(start:-1h)
                  |> filter(fn:(r) => r._measurement == "m1")


                  tab2 = from(bucket:"2021iotdb", org:"org", token:"iot_test_token")
                  |> range(start:-1h)
                  |> filter(fn:(r) => r._measurement == "m2")
                  复制

                  得到两个表之后我们在进行JOIN操作,查询语句如下:

                    join(tables: {m1:tab1, m2:tab2},
                    on: ["_time"]
                    ) |> map(fn:(r) => ({_time: r._time,
                    _value: r._value_m1 + r._value_m2
                    }))
                    复制

                    上面的on表示JOIN的条件,但是我们发现,tab1和tab2中时间字段并不相同,如下:

                    所以我们需要再快速的插入两条数据,使得时间字段相同,我们才能拿到结果,插入之后数据如下:

                    这样我们再进行查询:

                      join(tables: {m1:tab1, m2:tab2},
                      on: ["_time"]
                      ) |> map(fn:(r) => ({_time: r._time,
                      _value: r._value_m1 + r._value_m2
                      }))
                      复制

                      如上我们完成了查询需求。哈哈,那是不是在InfluxDB里面进行这类查询都是用JOIN的方式吗?是否有更简单的方式?看下面部分:)



                      03


                      PIOVT查询

                      我们看一下PIVOT的功能定义:


                      The pivot() function collects values stored vertically (column-wise) in a table and aligns them horizontally (row-wise) into logical sets.


                      语法:pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")


                      其实在标准数据库里面也有PIVOT,在InfluxDB里面pivot可以将行转换为列,进而将两个时序数据值变成一个Table中的两个列,这个内置也可以为用户进行内部优化处理。我们看看如何操作:

                        > from(bucket:"2021iotdb", org:"org", token:"iot_test_token") 
                        |> range(start:-1h)
                        |> pivot(rowKey:["_time"], columnKey: ["_measurement","_field"],valueColumn: "_value")
                        复制

                        如上语句执行结果如下:

                        我们发现m1的vm和m2的vn都变成一个表的某一列了,这样pivot就完美的将两个时序数据合并成宽表的列了。我们再加上具体的过滤条件,如下:

                        接下来我们再进行计算,如下:

                          from(bucket:"2021iotdb", org:"org", token:"iot_test_token") 
                          |> range(start:-1h)
                          |> filter(fn:(r) => r._measurement == "m1" or r._measurement == "m2")
                          |> pivot(rowKey:["_time"], columnKey: ["_measurement","_field"],valueColumn: "_value")
                          |> map(fn:(r) => ({_time: r._time, _value:r.m1_vm + r.m2_vn}))
                          复制

                          OK, 大家是不是赶紧PIVOT非常方便?:)


                          04


                          问题


                          最后,留个问题给大家,大家知道标准数据库里面PIVOT和UNPIVOT的使用场景吗?或者Flink&Spark如何支持PIVOT?或者知道Apache IoTDB里面如何处理多条时序数据分析梳理吗?我们下一篇见:)

                          文章转载自孙金城,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                          评论