暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

使用 C# 和 SQL Server 实现数据库的实时数据同步

技术老小子 2024-10-30
2

在现代应用程序中,及时更新不同数据库之间的数据至关重要。本文将介绍如何在 SQL Server 中使用 C# 实现数据的实时同步。我们将使用 SQLDependency
类来监听数据库表的变化,并将这些变化实时地同步到另一张表中。

前提条件

在开始之前,请确保已经设置好两个 SQL Server 数据库:

  • SourceDB
    : 包含你需要监听的表。

  • TargetDB
    : 目标数据库,用于同步数据。



配置 SQL Server

首先,需要启用 SQL Server 的查询通知服务,以便支持 SQLDependency
。请使用以下命令启用数据库服务代理:

查看

复制
    SELECT name, is_broker_enabled  
    FROM sys.databases;


    ALTER DATABASE SourceDB SET ENABLE_BROKER;
    复制
    复制

    编写 C# 程序

    下面的 C# 程序将使用 SQLDependency
    来监听 SourceDB
    中的 SourceTable
    表的变化。我们将在数据插入时同步到 TargetDB
    中的 TargetTable

    程序代码

      using System;
      using System.Data;
      using System.Data.SqlClient;
      using System.Configuration;


      class Program
      {
      private static bool _continueRunning = true;


      static void Main()
      {
      Console.WriteLine("数据同步程序已启动。按 'Q' 键退出。");


      // 设置连接字符串
      string sourceConnectionString = ConfigurationManager.ConnectionStrings["SourceDB"].ConnectionString;
      string targetConnectionString = ConfigurationManager.ConnectionStrings["TargetDB"].ConnectionString;


      // 启用 SQLDependency
      SqlDependency.Start(sourceConnectionString);


      try
      {
      while (_continueRunning)
      {
      try
      {
      using (SqlConnection sourceConnection = new SqlConnection(sourceConnectionString))
      {
      sourceConnection.Open();
      StartListening(sourceConnection);


      // 保持连接打开状态
      while (_continueRunning)
      {
      if (Console.KeyAvailable)
      {
      var key = Console.ReadKey(true).Key;
      if (key == ConsoleKey.Q)
      {
      _continueRunning = false;
      break;
      }
      }
      Thread.Sleep(100);
      }
      }
      }
      catch (Exception ex)
      {
      Console.WriteLine($"发生错误: {ex.Message}");
      Console.WriteLine("5秒后重试...");
      Thread.Sleep(5000);
      }
      }
      }
      finally
      {
      SqlDependency.Stop(sourceConnectionString);
      Console.WriteLine("数据同步程序已停止。");
      }
      }


      private static void StartListening(SqlConnection connection)
      {
      using (SqlCommand command = new SqlCommand("SELECT ID, Name, Value, Created_Time FROM dbo.t1", connection))
      {
      SqlDependency dependency = new SqlDependency(command);
      dependency.OnChange += new OnChangeEventHandler(OnDataChange);


      using (SqlDataReader reader = command.ExecuteReader())
      {
      // 初次加载数据处理
      }
      }
      }


      private static void OnDataChange(object sender, SqlNotificationEventArgs e)
      {
      if (e.Info == SqlNotificationInfo.Insert)
      {
      Console.WriteLine("数据已插入。事件类型: " + e.Info.ToString());
      SyncData();
      }


      // 重新启用监听
      string sourceConnectionString = ConfigurationManager.ConnectionStrings["SourceDB"].ConnectionString;
      using (SqlConnection sourceConnection = new SqlConnection(sourceConnectionString))
      {
      sourceConnection.Open();
      StartListening(sourceConnection);
      }
      }


      private static void SyncData()
      {
      string sourceConnectionString = ConfigurationManager.ConnectionStrings["SourceDB"].ConnectionString;
      string targetConnectionString = ConfigurationManager.ConnectionStrings["TargetDB"].ConnectionString;


      using (SqlConnection sourceConnection = new SqlConnection(sourceConnectionString))
      using (SqlConnection targetConnection = new SqlConnection(targetConnectionString))
      {
      sourceConnection.Open();
      targetConnection.Open();


      // 获取最新插入的数据
      SqlCommand fetchDataCommand = new SqlCommand("SELECT TOP 1 * FROM t1 ORDER BY Created_Time DESC", sourceConnection);
      using (SqlDataReader dataReader = fetchDataCommand.ExecuteReader())
      {
      if (dataReader.Read())
      {
      Guid id = (Guid)dataReader["ID"];
      string name = (string)dataReader["Name"];
      decimal value = (decimal)dataReader["Value"];
      DateTime created_time = (DateTime)dataReader["created_time"];


      // 将数据插入到 TargetTable
      SqlCommand insertCommand = new SqlCommand("INSERT INTO t1 (ID, Name, Value,Created_Time) VALUES (@ID, @Name, @Value,@Created_Time)", targetConnection);
      insertCommand.Parameters.AddWithValue("@ID", id);
      insertCommand.Parameters.AddWithValue("@Name", name);
      insertCommand.Parameters.AddWithValue("@Value", value);
      insertCommand.Parameters.AddWithValue("@Created_Time", created_time);


      insertCommand.ExecuteNonQuery();
      }
      }
      }
      }
      }
      复制

      增加更新后同步

        private static void SyncUpdatedData()
        {
        string sourceConnectionString = ConfigurationManager.ConnectionStrings["SourceDB"].ConnectionString;
        string targetConnectionString = ConfigurationManager.ConnectionStrings["TargetDB"].ConnectionString;


        using (SqlConnection sourceConnection = new SqlConnection(sourceConnectionString))
        using (SqlConnection targetConnection = new SqlConnection(targetConnectionString))
        {
        sourceConnection.Open();
        targetConnection.Open();


        // 获取最近更新的数据
        // 注意:这里假设你有一个 Last_Updated_Time 字段来跟踪更新时间
        SqlCommand fetchDataCommand = new SqlCommand("SELECT TOP 1 * FROM t1 ORDER BY Last_Updated_Time DESC", sourceConnection);
        using (SqlDataReader dataReader = fetchDataCommand.ExecuteReader())
        {
        if (dataReader.Read())
        {
        Guid id = (Guid)dataReader["ID"];
        string name = (string)dataReader["Name"];
        decimal value = (decimal)dataReader["Value"];
        DateTime last_updated_time = (DateTime)dataReader["Last_Updated_Time"];


        // 更新目标表中的数据
        SqlCommand updateCommand = new SqlCommand(
        "UPDATE t1 SET Name = @Name, Value = @Value, Last_Updated_Time = @Last_Updated_Time WHERE ID = @ID",
        targetConnection);
        updateCommand.Parameters.AddWithValue("@ID", id);
        updateCommand.Parameters.AddWithValue("@Name", name);
        updateCommand.Parameters.AddWithValue("@Value", value);
        updateCommand.Parameters.AddWithValue("@Last_Updated_Time", last_updated_time);


        int rowsAffected = updateCommand.ExecuteNonQuery();
        if (rowsAffected > 0)
        {
        Console.WriteLine($"已同步更新的数据: ID={id}, Name={name}, Value={value}, Created_Time={last_updated_time}");
        }
        else
        {
        Console.WriteLine($"未找到要更新的记录: ID={id}");
        }
        }
        }
        }
        }
        复制


        复制

        配置文件 (App.config
        )

        确保在你的项目中包含一个配置文件来管理数据库连接字符串。

          <?xml version="1.0" encoding="utf-8" ?>
          <configuration>
          <connectionStrings>
          <add name="SourceDB" connectionString="Data Source=your_source_server;Initial Catalog=SourceDB;Integrated Security=True" >
          <add name="TargetDB" connectionString="Data Source=your_target_server;Initial Catalog=TargetDB;Integrated Security=True" >
          </connectionStrings>
          </configuration>
          复制

          关键点说明

          • SQLDependency: 通过 SQLDependency
            监听数据表变化,允许我们对 SourceTable
            进行实时监听。当数据更改时自动触发 OnChange
            事件。

          • 重新开启监听: 数据变化后,必须重新启动监听,以确保程序在后续的变化中继续有效。


          注意事项

          • 确保在 SQL Server 上启用查询通知和服务代理。

          • SQLDependency
            适用于简单查询,不能包括复杂查询、联接或聚合。

          • 如果项目对性能和实时性要求较高,建议结合其他工具或技术方案,如 Change Tracking
            Change Data Capture
            等。


          通过以上步骤,你可以实现对 SQL 数据库变化的实时监听和数据同步,从而保持数据库之间的数据一致性和实时性。


          如果你正在从事上位机、自动化、机器视觉、物联网(IOT)项目或数字化转型方面的工作,欢迎加入我的微信圈子!在这里,我们不仅可以轻松畅聊最新技术动态和行业趋势,还能够在技术问题上互相帮助和支持。我会尽量利用我的知识和经验来帮助你解决问题,当然也期待从大家的专业见解中学习和成长。无论你是新手还是老鸟,期待与志同道合的朋友交流心得,一起进步!



          文章转载自技术老小子,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

          评论