暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

使用 Tensorflow 在 PostgreSQL 中进行回归分析:第三部分 - 数据分析

飞象数据 2022-08-20
578

这个迷你系列博客的第一部分中,我们介绍了如何设置PostgreSQL,以便我们可以使用TensorFlowhttps://www.tensorflow.org/在数据库服务器中使用pl/python3过程语言对数据进行回归分析。https://www.enterprisedb.com/blog/regression-analysis-postgresql-tensorflow-part-1-getting-started

第二部分,我们了解了如何在训练神经网络之前理解和预处理数据。

https://www.enterprisedb.com/blog/regression-analysis-postgresql-tensorflow-part-2-data-pre-processing

在第三部分,我们将使用我们准备和优化的数据来训练一个网络,然后使用它来分析数据——所有这些都来自于一个PostgreSQL数据库!

拆分数据

当我们完成博客系列的最后一部分时,我们有一组数据,我们从中删除了具有异常值的行和与结果相关性较差的列。现在需要将该数据集拆分为训练、验证和测试数据集,每个数据集的结果(或输出特征)都将与输入特征分开。这是通过计算每个所需的行数,将输入和输出分割成单独的 Pandas 数据帧,然后相应地将它们逐行分割来实现的:

    # Figure out how many rows to use for training, validation and test
    test_rows = int((actual_rows/100) * test_pct)
    validation_rows = int(((actual_rows)/100) * validation_pct)
    training_rows = actual_rows - test_rows - validation_rows

    # Split the data into input and output
    input = data[columns[:-1]]
    output = data[columns[-1:]]

    # Split the input and output into training, validation and test sets
    training_input = input[:training_rows]
    training_output = output[:training_rows]
    validation_input = input[training_rows:training_rows+validation_rows]
    validation_output = output[training_rows:training_rows+validation_rows]
    test_input = input[training_rows+validation_rows:]
    test_output = output[training_rows+validation_rows:]

    # Make a note of the largest result value
    max_z = max(output[output.columns[0]])
    复制

    从代码中可以看出,三组数据中每一组的大小由 test_pct
    validation_pct
    变量决定,这两个变量决定了每个初始数据集使用的百分比。这里没有硬性规定——事实上,在我的测试代码中,这些值都被传递到 pl/python3 函数中,因此它们可以很容易地被覆盖。我给他们两个默认值 10%。您几乎肯定希望训练数据集比其他数据集大得多。

    创建模型

    模型或神经网络是一组具有可训练参数的过滤器层 - 您可能认为是神经元。这个模型有多层的事实意味着我们正在执行深度学习。

      # Define the model
      model = tf.keras.Sequential()

      model.add(tf.keras.layers.Dense(units=units,
      input_shape=(len(columns) - 1,),
      activation = 'relu'))

      for units in structure:
      model.add(tf.keras.layers.Dense(units=units, activation = 'relu'))

      model.add(tf.keras.layers.Dense(units=1, activation='linear'))

      # Compile it
      model.compile(loss=tf.keras.losses.MeanSquaredError(),
      optimizer='adam')

      summary = []
      model.summary(print_fn=lambda x: summary.append(x))
      plpy.notice('Model architecture:\n{}'.format('\n'.join(summary)))
      复制

      首先,我们创建一个空模型,并添加一个密集层,其中包含与我们在数据中具有输入特征的相同数量的过滤器。

      为了测试不同的模型配置,我的 pl/python3 函数将一维整数数组作为参数。对于数组中的每个元素,将添加指定数量的过滤器的额外密集层。

      要添加的最后一层是包含单个过滤器的密集层;对应于我们试图预测的单个标量结果值。当然,您可能想要预测多个值,在这种情况下,结果数据集和模型的最后一层会更宽。重要的是输入层的过滤器数量与输入特征的数量相匹配,输出层的过滤器数量与输出特征的数量相匹配。

      您会注意到每一层都指定了一个激活函数。在将值传递到下一层之前,将激活函数应用于过滤器的输出。ReLU(整流线性单元)用于允许对非线性函数进行建模,输出层上的简单线性函数用于生成最终输出。

      定义模型后,我们将对其进行编译以进行训练。我们指定用于衡量损失的指标(模型执行的好坏)和学习优化器。我更喜欢损失函数的均方误差(我经常将其转换为均方根损失来调试输出),并且 Adam 优化器运行良好,帮助网络在训练期间有效地为每个滤波器选择可调值。

      最后,我输出了一个模型摘要,用于调试目的(注意输入层不是 TensorFlow 显示的):

        NOTICE:  Model architecture:
        Model: "sequential"
        _________________________________________________________________
        Layer (type) Output Shape Param #
        =================================================================
        dense (Dense) (None, 13) 182
        _________________________________________________________________
        dense_1 (Dense) (None, 13) 182
        _________________________________________________________________
        dense_2 (Dense) (None, 1) 14
        =================================================================
        Total params: 378
        Trainable params: 378
        Non-trainable params: 0
        复制

        训练模型

        模型是在多个迭代或时代中训练的。在每个时期,调整滤波器的可调参数,目的是根据定义模型时选择的损失函数的输出实现最小损失。优化算法控制这些参数的调整方式,以使模型尽可能快地磨合到最佳配置。

        训练过度拟合时需要注意的一个问题。当网络被训练太多次,并且有效地学习了数据而不是连接输入和输出的算法时,就会发生这种情况。当输入是它以前见过的输入时,这将使模型非常擅长产生正确的输出,但对于以前看不见的输入却很差。

        您会记得我们将数据分成三组。我们将使用训练数据来训练模型,并在训练过程中使用验证数据对其进行验证。

          # Save a checkpoint each time our loss metric improves.
          checkpoint = ModelCheckpoint('{}/{}.h5'.format(output_path, output_name),
          monitor='loss',
          save_best_only=True,
          mode='min')

          # Use early stopping
          early_stopping = EarlyStopping(patience=50)

          # Display output
          logger = LambdaCallback(
          on_epoch_end=lambda epoch,
          logs: plpy.notice(
          'epoch: {}, training RMSE: {} ({}%), validation RMSE: {} ({}%)'.\
          format(
          epoch,
          sqrt(logs['loss']),
          round(100 / max_z * sqrt(logs['loss']), 5),
          sqrt(logs['val_loss']),
          round(100 / max_z * sqrt(logs['val_loss']), 5))))

          # Train it!
          history = model.fit(training_input,
          training_output,
          validation_data=(validation_input,
          validation_output),
          epochs=epochs,
          batch_size=50,
          callbacks=[logger, checkpoint, early_stopping])
          复制

          首先,我们设置了三个回调函数:

          • 检查点:这将在每个 epoch 之后保存模型,这将产生比之前所有 epoch 更好的结果。

          • early_stopping:这将在 50 个 epoch(耐力参数)后没有明显进展时停止训练,以防止过度拟合。

          • logger:通常 TensorFlow 会在每个 epoch 上向 stdout 输出一条状态消息。因为我们在 PostgreSQL 的 pl/python3 下运行,所以我们需要输出我们自己的作为 NOTICE。我们将输出 epoch 数、训练的平方根和验证损失(我们正在测量均方误差,因此平方根会将其转换回我们可以与实际输出进行比较的值),以及两者的百分比到原始数据中的最大输出值(max_z)。

          一旦设置了回调,我们就可以在模型上执行 model.fit() 方法,将训练和验证数据、时期数、要处理的记录的批量大小和回调列表传递给它。输出将如下所示:

            NOTICE:  epoch: 0, training RMSE: 34.744217425799796 (95.9785%), validation RMSE: 66.97870901217416 (185.02406%)
            NOTICE: epoch: 1, training RMSE: 20.016233341165083 (55.29346%), validation RMSE: 86.36728608372935 (238.58366%)
            NOTICE: epoch: 2, training RMSE: 20.81612147077206 (57.5031%), validation RMSE: 74.53097811795274 (205.88668%)
            NOTICE: epoch: 3, training RMSE: 15.706952561354747 (43.38937%), validation RMSE: 50.19628950897417 (138.66378%)
            ...
            NOTICE: epoch: 88, training RMSE: 4.051587516569147 (11.19223%), validation RMSE: 14.049098431858425 (38.80966%)
            NOTICE: epoch: 89, training RMSE: 4.043822798154733 (11.17078%), validation RMSE: 14.405585350211444 (39.79443%)
            NOTICE: epoch: 90, training RMSE: 4.039318438155652 (11.15834%), validation RMSE: 13.547520957602528 (37.42409%)
            NOTICE: epoch: 91, training RMSE: 4.0253103101067085 (11.11964%), validation RMSE: 14.17337045065995 (39.15296%)
            复制

            即使最后一个 epoch 的错误率不是最好的,checkpoint 回调也会为我们保存最有效的模型。

            我们可以很容易地绘制错误率与 epoch 的关系,以查看网络在训练时改进的速度:

            执行此操作的代码(使用 matplotlib)非常简单:

              # Graph the results
              training_loss = history.history['loss']
              validation_loss = history.history['val_loss']

              epochs_range = range(len(history.history['loss']))

              plt.figure(figsize=(12, 8))
              plt.grid(True)
              plt.plot(epochs_range,
              [x ** 0.5 for x in training_loss],
              label='Training')
              plt.plot(epochs_range,
              [x ** 0.5 for x in validation_loss],
              label='Validation')
              plt.xlabel('Epoch')
              plt.ylabel('Root Mean Squared Error')
              plt.legend(loc='upper right')
              plt.title('Training and Validation Root Mean Squared Error')
              plt.savefig('{}/{}_rmse.png'.format(output_path, output_name))
              plpy.notice('Created: {}/{}_rmse.png\n'.format(output_path,
              output_name))
              复制

              我们首先从model.fit()方法返回的历史对象中获取每个 epoch 的训练和验证损失值。我们找出沿 X 轴绘制的时期范围,设置创建一个基本图,然后添加两个系列(平方根,因此我们可以将数字与实际数据进行比较)。最后,我们设置图例和标签等并保存图像。

              此时,我们已经保存了一个模型,我们可以根据需要加载它以对新数据或假设数据进行分析。如果我们获得想要用来改进它的新数据,我们还可以在该模型的基础上重新训练。

              您会记得,我们之前除了训练和验证数据集之外,还创建了一组测试数据。在这个阶段,我们可以使用 model.evaluate() 方法,将测试输入和输出特征传递给它,以便进一步测试模型。它将在测试模式和回波损耗和指标值下运行预测,以指示模型的执行情况。

              执行分析

              实际上,使用我们创建的模型来执行分析非常简单,可以很容易地封装到通用 pl/python3 函数中:

                CREATE OR REPLACE FUNCTION public.tf_predict(
                input_values double precision[],
                model_path text)
                RETURNS double precision[]
                LANGUAGE 'plpython3u'
                COST 100
                VOLATILE PARALLEL UNSAFE
                AS $BODY$
                import tensorflow as tf

                # Reset everything
                tf.keras.backend.clear_session()
                tf.random.set_seed(42)

                # Load the model
                model = tf.keras.models.load_model(model_path)

                # Are we dealing with a single prediction, or a list of them?
                if not any(isinstance(sub, list) for sub in input_values):
                data = [input_values]
                else:
                data = input_values

                # Make the prediction(s)
                result = model.predict([data])[0]
                result = [ item for elem in result for item in elem]

                return result
                $BODY$;
                复制

                首先,我们定义函数的 SQL 接口,传入一组输入特征,以及保存模型的路径。

                在函数体内,我们导入 TensorFlow 库,然后确保一切都已重置。然后从给定的路径加载模型。

                我们可以使用该函数一次进行一个或多个预测,方法是传入输入特征的一维数组或包含多组特征的二维数组。TensorFlow 需要一个二维数组(实际上是一个列表或类似对象,例如 Numpy 数组或 TensorFlow 张量),因此如果我们只传入一个一维数组,我们需要将其更改为包含单个列表的列表特点。否则,我们保持原样,因为 PostgreSQL 已经为我们将二维数组转换为一列列表。

                然后使用model.predict()方法分析数据并进行预测,并返回一个 Numpy 的结果数组,我们将其转换为列表并返回给 PostgreSQL。该功能可以在下面看到:

                这个例子使用了我从 Boston Housing 数据集中随机选择的两行(因为我们知道它们的结果值应该是什么)。我删除了与medv列最松散相关的三列(chasdisb),然后训练了一个模型,除了输入和输出层外,还有四层 64 个过滤器,超过 5000 个 epoch(早期停止发生在 epoch 108)。

                结论

                在这个博客迷你系列中,我们探讨了如何设置 PostgreSQL 以便我们可以在我们的数据库中使用 TensorFlow,如何检查和优化数据以训练模型,以及如何执行训练并使用模型进行预测.

                很容易看出,这里展示的技术提供了大量的可能性;将 TensorFlow 与 pl/python3 一起使用几乎只是一个示例,尽管它是一个非常强大且有趣的示例,但当然类似的代码可以与其他机器学习库(例如PyTorch https://pytorch.org/ )一起使用,或者实际上,任何其他可以使用的库用 PostgreSQL 中的数据做有趣的事情。

                文章转载自飞象数据,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                评论