暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

TDengine+MQTT:实现物联网设备数据采集

老王两点中 2025-03-10
8
随着物联网(IoT)技术的迅速发展,越来越多的设备接入互联网并产生大量的时间序列数据。在物联网领域,设备数据的实时上报采集与存储是至关重要的环节。为了有效地处理这些数据,选择合适的数据库和通信协议变得至关重要。TDengine作为一款高性能的时序数据库,与轻量级的消息传输协议MQTT相结合,能够高效地实现物联网设备数据的上报与存储。

一、技术架构概述

1. TDengine简介

TDengine是一款专为物联网、工业互联网、金融、IT运维监控等场景设计的高性能、可伸缩的时序数据库(TSDB)。

它具有以下特点:

  • 高性能:支持每秒百万级的写入速度,查询速度也极为迅速。

  • 高可用:支持集群部署,保障数据的高可用性和可靠性。

  • 易扩展:可以轻松地横向扩展,满足大规模数据存储与处理需求。

  • 丰富的功能:支持数据压缩、数据降采样、数据插值等时序数据处理功能。

2. MQTT简介

MQTT(Message Queuing Telemetry Transport)是一种轻量级的、基于发布/订阅模式的消息传输协议,非常适合资源受限和网络条件较差的环境。

它具有以下特点:

  • 轻量级:协议开销小,占用带宽少,适合物联网设备使用。

  • 低功耗:设备在发送和接收消息时消耗的电量低。

  • 高可靠性:提供多种服务质量(QoS)级别,确保消息的可靠传输。

二、实现方案

1. 系统架构

整个系统架构可以分为三个主要部分:物联网设备、MQTT代理服务器和TDengine数据库服务器。

  • 物联网设备:负责采集数据并通过MQTT协议将数据发送到MQTT代理服务器。

  • MQTT代理服务器:作为消息的中间代理,负责接收物联网设备发送的消息,并将其转发到TDengine数据库服务器。

  • TDengine数据库服务器:负责接收MQTT代理服务器转发的数据,并将其存储在数据库中,供后续的数据分析和处理使用。

2.2 数据上报流程

(1)设备数据采集:物联网设备通过传感器采集数据,如温度、湿度、压力等。

(2)数据格式化:将采集到的数据按照一定的格式进行封装,如JSON格式。

(3)MQTT连接建立:物联网设备通过MQTT客户端库与MQTT代理服务器建立连接。

(4)数据发送:物联网设备将格式化后的数据通过MQTT协议发送到MQTT代理服务器。

(5)数据转发:MQTT代理服务器接收到数据后,将其转发到TDengine数据库服务器。

(6)数据存储:TDengine数据库服务器接收到数据后,将其存储在相应的数据库表中。

三、Java示例代码

以下是使用Java实现物联网设备数据上报的示例代码,包括MQTT客户端的连接、数据发送和TDengine数据库的数据存储。

1. 添加依赖包

您可以使用Maven或Gradle来管理这些依赖项。以下是Maven的pom.xml文件中的依赖项片段:

    <dependencies>
        <!-- Paho MQTT Java Client -->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>
        <!-- TDengine JDBC Driver -->
        <dependency>
            <groupId>com.taosdata.jdbc</groupId>
            <artifactId>taos-jdbc</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>


    复制

    2.MQTT客户端连接与数据发送

      import org.eclipse.paho.client.mqttv3.MqttClient;
      import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
      import org.eclipse.paho.client.mqttv3.MqttMessage;
      import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
      public class MqttPublisher {
          // MQTT代理服务器地址
          private static final String SERVER_URI = "tcp://mqtt-broker-address:1883"; 
          // MQTT客户端ID
          private static final String CLIENT_ID = "mqtt-client-id"; 
         
          public void publishData(String topic, String data) {
              try {
                  // 创建MQTT客户端
                  MqttClient mqttClient = new MqttClient(SERVER_URI, CLIENT_ID, new MemoryPersistence());
                  
                  // 配置连接选项
                  MqttConnectOptions options = new MqttConnectOptions();
                  options.setCleanSession(true);
                  
                  // 连接MQTT代理服务器
                  mqttClient.connect(options);
                  
                  // 创建MQTT消息
                  MqttMessage message = new MqttMessage(data.getBytes());
                  message.setQos(1); // 设置QoS级别为1
                  
                  // 发布消息
                  mqttClient.publish(topic, message);
                  
                  // 断开连接
                  mqttClient.disconnect();
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      }
      复制

      3.TDengine数据库数据存储

        import com.taosdata.jdbc.TSDBDriver;
        import java.sql.Connection;
        import java.sql.DriverManager;
        import java.sql.PreparedStatement;
        import java.sql.SQLException;
        public class TDengineStorage {
            private static final String DB_URL = "jdbc:TAOS://tdengine-server-address:6030/iot_db"// TDengine数据库URL
            private static final String USER = "root"; // TDengine数据库用户名
            private static final String PASSWORD = "taosdata"; // TDengine数据库密码
            public void storeData(String deviceName, String data) {
                try {
                    // 加载TDengine JDBC驱动
                    Class.forName(TSDBDriver.class.getName());
                    
                    // 连接TDengine数据库
                    Connection conn = DriverManager.getConnection(DB_URL, USER, PASSWORD);
                    
                    // 创建SQL语句
                    String sql = "INSERT INTO " + deviceName + " (ts, data) VALUES (?, ?)";
                    PreparedStatement pstmt = conn.prepareStatement(sql);
                    
                    // 设置参数
                    pstmt.setTimestamp(1, new java.sql.Timestamp(System.currentTimeMillis()));
                    pstmt.setString(2, data);
                    
                    // 执行SQL语句
                    pstmt.executeUpdate();
                    
                    // 关闭连接
                    pstmt.close();
                    conn.close();
                } catch (ClassNotFoundException | SQLException e) {
                    e.printStackTrace();
                }
            }
        }
        复制

        通过TDengine和MQTT的结合,可以构建一个高效的物联网数据采集系统,可以实现物联网设备数据的高效上报与存储。TDengine提供了高性能的时序数据存储能力,而MQTT则为设备数据的传输提供了轻量级、可靠的通信方式。这种组合方案不仅能够满足物联网设备数据上报的需求,还能够为后续的数据分析和处理提供坚实的基础。

        文章转载自老王两点中,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

        评论