在現(xiàn)代應(yīng)用程序中,及時(shí)更新不同數(shù)據(jù)庫之間的數(shù)據(jù)至關(guān)重要。本文將介紹如何在 SQL Server 中使用 C# 實(shí)現(xiàn)數(shù)據(jù)的實(shí)時(shí)同步。我們將使用 SQLDependency
類來監(jiān)聽數(shù)據(jù)庫表的變化,并將這些變化實(shí)時(shí)地同步到另一張表中。
前提條件
在開始之前,請確保已經(jīng)設(shè)置好兩個(gè) SQL Server 數(shù)據(jù)庫:
配置 SQL Server
首先,需要啟用 SQL Server 的查詢通知服務(wù),以便支持 SQLDependency
。請使用以下命令啟用數(shù)據(jù)庫服務(wù)代理:
查看
SELECT name, is_broker_enabled
FROM sys.databases;
ALTER DATABASE SourceDB SET ENABLE_BROKER;
編寫 C# 程序
下面的 C# 程序?qū)⑹褂?nbsp;SQLDependency
來監(jiān)聽 SourceDB
中的 SourceTable
表的變化。我們將在數(shù)據(jù)插入時(shí)同步到 TargetDB
中的 TargetTable
。
程序代碼
using System;
using System.Data;
using System.Data.SqlClient;
using System.Configuration;
class Program
{
private static bool _continueRunning = true;
static void Main()
{
Console.WriteLine("數(shù)據(jù)同步程序已啟動(dòng)。按 'Q' 鍵退出。");
// 設(shè)置連接字符串
string sourceConnectionString = ConfigurationManager.ConnectionStrings["SourceDB"].ConnectionString;
string targetConnectionString = ConfigurationManager.ConnectionStrings["TargetDB"].ConnectionString;
// 啟用 SQLDependency
SqlDependency.Start(sourceConnectionString);
try
{
while (_continueRunning)
{
try
{
using (SqlConnection sourceConnection = new SqlConnection(sourceConnectionString))
{
sourceConnection.Open();
StartListening(sourceConnection);
// 保持連接打開狀態(tài)
while (_continueRunning)
{
if (Console.KeyAvailable)
{
var key = Console.ReadKey(true).Key;
if (key == ConsoleKey.Q)
{
_continueRunning = false;
break;
}
}
Thread.Sleep(100);
}
}
}
catch (Exception ex)
{
Console.WriteLine($"發(fā)生錯(cuò)誤: {ex.Message}");
Console.WriteLine("5秒后重試...");
Thread.Sleep(5000);
}
}
}
finally
{
SqlDependency.Stop(sourceConnectionString);
Console.WriteLine("數(shù)據(jù)同步程序已停止。");
}
}
private static void StartListening(SqlConnection connection)
{
using (SqlCommand command = new SqlCommand("SELECT ID, Name, Value, Created_Time FROM dbo.t1", connection))
{
SqlDependency dependency = new SqlDependency(command);
dependency.OnChange += new OnChangeEventHandler(OnDataChange);
using (SqlDataReader reader = command.ExecuteReader())
{
// 初次加載數(shù)據(jù)處理
}
}
}
private static void OnDataChange(object sender, SqlNotificationEventArgs e)
{
if (e.Info == SqlNotificationInfo.Insert)
{
Console.WriteLine("數(shù)據(jù)已插入。事件類型: " + e.Info.ToString());
SyncData();
}
// 重新啟用監(jiān)聽
string sourceConnectionString = ConfigurationManager.ConnectionStrings["SourceDB"].ConnectionString;
using (SqlConnection sourceConnection = new SqlConnection(sourceConnectionString))
{
sourceConnection.Open();
StartListening(sourceConnection);
}
}
private static void SyncData()
{
string sourceConnectionString = ConfigurationManager.ConnectionStrings["SourceDB"].ConnectionString;
string targetConnectionString = ConfigurationManager.ConnectionStrings["TargetDB"].ConnectionString;
using (SqlConnection sourceConnection = new SqlConnection(sourceConnectionString))
using (SqlConnection targetConnection = new SqlConnection(targetConnectionString))
{
sourceConnection.Open();
targetConnection.Open();
// 獲取最新插入的數(shù)據(jù)
SqlCommand fetchDataCommand = new SqlCommand("SELECT TOP 1 * FROM t1 ORDER BY Created_Time DESC", sourceConnection);
using (SqlDataReader dataReader = fetchDataCommand.ExecuteReader())
{
if (dataReader.Read())
{
Guid id = (Guid)dataReader["ID"];
string name = (string)dataReader["Name"];
decimal value = (decimal)dataReader["Value"];
DateTime created_time = (DateTime)dataReader["created_time"];
// 將數(shù)據(jù)插入到 TargetTable
SqlCommand insertCommand = new SqlCommand("INSERT INTO t1 (ID, Name, Value,Created_Time) VALUES (@ID, @Name, @Value,@Created_Time)", targetConnection);
insertCommand.Parameters.AddWithValue("@ID", id);
insertCommand.Parameters.AddWithValue("@Name", name);
insertCommand.Parameters.AddWithValue("@Value", value);
insertCommand.Parameters.AddWithValue("@Created_Time", created_time);
insertCommand.ExecuteNonQuery();
}
}
}
}
}
增加更新后同步
private static void SyncUpdatedData()
{
string sourceConnectionString = ConfigurationManager.ConnectionStrings["SourceDB"].ConnectionString;
string targetConnectionString = ConfigurationManager.ConnectionStrings["TargetDB"].ConnectionString;
using (SqlConnection sourceConnection = new SqlConnection(sourceConnectionString))
using (SqlConnection targetConnection = new SqlConnection(targetConnectionString))
{
sourceConnection.Open();
targetConnection.Open();
// 獲取最近更新的數(shù)據(jù)
// 注意:這里假設(shè)你有一個(gè) Last_Updated_Time 字段來跟蹤更新時(shí)間
SqlCommand fetchDataCommand = new SqlCommand("SELECT TOP 1 * FROM t1 ORDER BY Last_Updated_Time DESC", sourceConnection);
using (SqlDataReader dataReader = fetchDataCommand.ExecuteReader())
{
if (dataReader.Read())
{
Guid id = (Guid)dataReader["ID"];
string name = (string)dataReader["Name"];
decimal value = (decimal)dataReader["Value"];
DateTime last_updated_time = (DateTime)dataReader["Last_Updated_Time"];
// 更新目標(biāo)表中的數(shù)據(jù)
SqlCommand updateCommand = new SqlCommand(
"UPDATE t1 SET Name = @Name, Value = @Value, Last_Updated_Time = @Last_Updated_Time WHERE ID = @ID",
targetConnection);
updateCommand.Parameters.AddWithValue("@ID", id);
updateCommand.Parameters.AddWithValue("@Name", name);
updateCommand.Parameters.AddWithValue("@Value", value);
updateCommand.Parameters.AddWithValue("@Last_Updated_Time", last_updated_time);
int rowsAffected = updateCommand.ExecuteNonQuery();
if (rowsAffected > 0)
{
Console.WriteLine($"已同步更新的數(shù)據(jù): ID={id}, Name={name}, Value={value}, Created_Time={last_updated_time}");
}
else
{
Console.WriteLine($"未找到要更新的記錄: ID={id}");
}
}
}
}
}
配置文件 (App.config
)
確保在你的項(xiàng)目中包含一個(gè)配置文件來管理數(shù)據(jù)庫連接字符串。
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<connectionStrings>
<add name="SourceDB" connectionString="Data Source=your_source_server;Initial Catalog=SourceDB;Integrated Security=True" />
<add name="TargetDB" connectionString="Data Source=your_target_server;Initial Catalog=TargetDB;Integrated Security=True" />
</connectionStrings>
</configuration>
關(guān)鍵點(diǎn)說明
SQLDependency: 通過 SQLDependency
監(jiān)聽數(shù)據(jù)表變化,允許我們對(duì) SourceTable
進(jìn)行實(shí)時(shí)監(jiān)聽。當(dāng)數(shù)據(jù)更改時(shí)自動(dòng)觸發(fā) OnChange
事件。
重新開啟監(jiān)聽: 數(shù)據(jù)變化后,必須重新啟動(dòng)監(jiān)聽,以確保程序在后續(xù)的變化中繼續(xù)有效。
注意事項(xiàng)
確保在 SQL Server 上啟用查詢通知和服務(wù)代理。
SQLDependency
適用于簡單查詢,不能包括復(fù)雜查詢、聯(lián)接或聚合。
如果項(xiàng)目對(duì)性能和實(shí)時(shí)性要求較高,建議結(jié)合其他工具或技術(shù)方案,如 Change Tracking
或 Change Data Capture
等。
通過以上步驟,你可以實(shí)現(xiàn)對(duì) SQL 數(shù)據(jù)庫變化的實(shí)時(shí)監(jiān)聽和數(shù)據(jù)同步,從而保持?jǐn)?shù)據(jù)庫之間的數(shù)據(jù)一致性和實(shí)時(shí)性。
該文章在 2024/10/30 15:06:16 編輯過