第八章:企业级解决方案
8.1 企业级功能概述
SOD框架提供了多种企业级解决方案,满足复杂业务场景的需求:
| 解决方案 | 应用场景 |
|---|---|
| 内存数据库 | 高频读取、缓存热数据 |
| 事务日志复制 | 主从同步、数据备份 |
| 数据同步 | 分布式系统数据一致性 |
| 分布式事务 | 跨库事务 |
| 分布式ID | 分布式系统唯一标识 |
| 命令管道 | 批量操作优化 |
| 热缓存 | 高频数据缓存 |
8.2 内存数据库
8.2.1 概述
SOD框架的内存数据库是一个轻量级的内存数据存储解决方案,适用于:
- 高频读取场景:减少数据库访问
- 临时数据存储:会话数据、计算中间结果
- 缓存层:作为数据库和应用之间的缓存
8.2.2 基本使用
using PWMIS.MemoryDatabase;
// 创建内存数据库实例
MemoryDatabase memDb = new MemoryDatabase();
// 创建实体对象
UserEntity user = new UserEntity
{
ID = 1,
Name = "张三",
Email = "zhangsan@example.com"
};
// 添加数据
memDb.Add(user);
// 查询数据
var query = memDb.GetEntities<UserEntity>();
var result = query.Where(u => u.ID == 1).FirstOrDefault();
// 更新数据
result.Name = "张三(已更新)";
memDb.Update(result);
// 删除数据
memDb.Remove(user);
8.2.3 批量操作
// 批量添加
List<UserEntity> users = new List<UserEntity>
{
new UserEntity { ID = 1, Name = "用户1" },
new UserEntity { ID = 2, Name = "用户2" },
new UserEntity { ID = 3, Name = "用户3" }
};
memDb.AddRange(users);
// 查询
var activeUsers = memDb.GetEntities<UserEntity>()
.Where(u => u.Status == 1)
.ToList();
// 批量删除
memDb.RemoveRange(users);
8.2.4 持久化与加载
// 将内存数据持久化到文件
memDb.SaveToFile("memdb.dat");
// 从文件加载
MemoryDatabase memDb2 = new MemoryDatabase();
memDb2.LoadFromFile("memdb.dat");
// 从数据库加载数据到内存
AdoHelper db = MyDB.GetDBHelper();
var oql = OQL.From(new UserEntity()).Select().END;
var users = EntityQuery<UserEntity>.QueryList(oql, db);
memDb.AddRange(users);
8.2.5 与数据库同步
public class CachedUserService
{
private readonly MemoryDatabase _memDb;
private readonly AdoHelper _db;
private DateTime _lastSyncTime;
public CachedUserService()
{
_memDb = new MemoryDatabase();
_db = MyDB.GetDBHelper();
SyncFromDatabase();
}
// 从数据库同步到内存
public void SyncFromDatabase()
{
var oql = OQL.From(new UserEntity())
.Select()
.Where(cmp => cmp.Property(new UserEntity().Status) == 1)
.END;
var users = EntityQuery<UserEntity>.QueryList(oql, _db);
_memDb.Clear<UserEntity>();
_memDb.AddRange(users);
_lastSyncTime = DateTime.Now;
}
// 从内存读取
public UserEntity GetUser(int id)
{
return _memDb.GetEntities<UserEntity>()
.FirstOrDefault(u => u.ID == id);
}
// 查询列表
public List<UserEntity> GetUsers(Func<UserEntity, bool> predicate)
{
return _memDb.GetEntities<UserEntity>()
.Where(predicate)
.ToList();
}
// 写入时同时更新内存和数据库
public void AddUser(UserEntity user)
{
EntityQuery<UserEntity>.Instance.Insert(user, _db);
_memDb.Add(user);
}
public void UpdateUser(UserEntity user)
{
EntityQuery<UserEntity>.Instance.Update(user, _db);
_memDb.Update(user);
}
}
8.3 事务日志数据复制
8.3.1 概述
事务日志数据复制是一种数据同步机制,通过记录数据变更日志,实现:
- 主从数据同步:主库写入,从库复制
- 数据审计:记录所有数据变更历史
- 增量备份:只备份变更的数据
8.3.2 工作原理
主数据库 → 变更操作 → 记录事务日志 → 日志队列
↓
从数据库 ← 回放日志 ← 读取事务日志 ← 日志消费者
8.3.3 事务日志实体
public class TransactionLog : EntityBase
{
public TransactionLog()
{
TableName = "TbTransactionLog";
IdentityName = "LogId";
PrimaryKeys.Add("LogId");
}
public long LogId { get; set; }
public string TableName { get; set; }
public string PrimaryKeyValue { get; set; }
public string OperationType { get; set; } // INSERT/UPDATE/DELETE
public string ChangedData { get; set; } // JSON格式的变更数据
public DateTime CreateTime { get; set; }
public bool IsReplicated { get; set; } // 是否已复制
}
8.3.4 记录事务日志
public class TransactionLogService
{
private readonly AdoHelper _db;
public TransactionLogService(AdoHelper db)
{
_db = db;
}
// 记录日志
public void LogChange<T>(T entity, string operationType) where T : EntityBase
{
var log = new TransactionLog
{
TableName = entity.TableName,
PrimaryKeyValue = GetPrimaryKeyValue(entity),
OperationType = operationType,
ChangedData = SerializeEntity(entity),
CreateTime = DateTime.Now,
IsReplicated = false
};
EntityQuery<TransactionLog>.Instance.Insert(log, _db);
}
private string GetPrimaryKeyValue<T>(T entity) where T : EntityBase
{
var values = new List<string>();
foreach (var pk in entity.PrimaryKeys)
{
values.Add(entity[pk]?.ToString());
}
return string.Join(",", values);
}
private string SerializeEntity<T>(T entity) where T : EntityBase
{
var dict = new Dictionary<string, object>();
foreach (var propName in entity.PropertyNames)
{
dict[propName] = entity[propName];
}
return JsonConvert.SerializeObject(dict);
}
}
8.3.5 复制日志
public class LogReplicationService
{
private readonly AdoHelper _masterDb;
private readonly AdoHelper _slaveDb;
public LogReplicationService(AdoHelper masterDb, AdoHelper slaveDb)
{
_masterDb = masterDb;
_slaveDb = slaveDb;
}
// 复制待处理的日志
public void Replicate()
{
var log = new TransactionLog();
var oql = OQL.From(log)
.Select()
.Where(cmp => cmp.Property(log.IsReplicated) == false)
.OrderBy(log.LogId)
.END;
oql.TopCount = 100; // 每次处理100条
var logs = EntityQuery<TransactionLog>.QueryList(oql, _masterDb);
foreach (var logItem in logs)
{
try
{
ApplyLog(logItem);
MarkAsReplicated(logItem);
}
catch (Exception ex)
{
// 记录错误,继续处理下一条
LogError(logItem, ex);
}
}
}
private void ApplyLog(TransactionLog log)
{
var data = JsonConvert.DeserializeObject<Dictionary<string, object>>(log.ChangedData);
switch (log.OperationType)
{
case "INSERT":
ApplyInsert(log.TableName, data);
break;
case "UPDATE":
ApplyUpdate(log.TableName, log.PrimaryKeyValue, data);
break;
case "DELETE":
ApplyDelete(log.TableName, log.PrimaryKeyValue);
break;
}
}
private void MarkAsReplicated(TransactionLog log)
{
log.IsReplicated = true;
EntityQuery<TransactionLog>.Instance.Update(log, _masterDb);
}
}
8.4 分布式事务
8.4.1 概述
当业务操作涉及多个数据库时,需要使用分布式事务来保证数据一致性。SOD框架支持以下分布式事务模式:
- 两阶段提交(2PC):强一致性
- 最终一致性:基于消息队列
- 补偿事务(TCC):Try-Confirm-Cancel
8.4.2 简单的分布式事务实现
public class DistributedTransactionManager
{
private readonly List<TransactionScope> _scopes = new List<TransactionScope>();
private readonly List<AdoHelper> _databases = new List<AdoHelper>();
public void Enlist(AdoHelper db)
{
_databases.Add(db);
db.OpenSession();
db.BeginTransaction();
}
public void Commit()
{
try
{
// 提交所有数据库
foreach (var db in _databases)
{
db.Commit();
}
}
catch
{
Rollback();
throw;
}
finally
{
CloseAll();
}
}
public void Rollback()
{
foreach (var db in _databases)
{
try
{
db.Rollback();
}
catch { }
}
CloseAll();
}
private void CloseAll()
{
foreach (var db in _databases)
{
try
{
db.CloseSession();
}
catch { }
}
_databases.Clear();
}
}
8.4.3 使用示例
public class OrderService
{
private readonly AdoHelper _orderDb;
private readonly AdoHelper _inventoryDb;
public OrderService()
{
_orderDb = MyDB.GetDBHelperByConnectionName("OrderDB");
_inventoryDb = MyDB.GetDBHelperByConnectionName("InventoryDB");
}
public void CreateOrder(OrderEntity order, List<OrderItemEntity> items)
{
var txManager = new DistributedTransactionManager();
try
{
txManager.Enlist(_orderDb);
txManager.Enlist(_inventoryDb);
// 1. 创建订单
EntityQuery<OrderEntity>.Instance.Insert(order, _orderDb);
foreach (var item in items)
{
item.OrderId = order.ID;
EntityQuery<OrderItemEntity>.Instance.Insert(item, _orderDb);
// 2. 扣减库存
DeductInventory(item.ProductId, item.Quantity);
}
txManager.Commit();
}
catch
{
txManager.Rollback();
throw;
}
}
private void DeductInventory(int productId, int quantity)
{
var sql = "UPDATE Inventory SET Quantity = Quantity - @Qty WHERE ProductId = @ProductId AND Quantity >= @Qty";
var paras = new[]
{
_inventoryDb.GetParameter("@Qty", quantity),
_inventoryDb.GetParameter("@ProductId", productId)
};
int affected = _inventoryDb.ExecuteNonQuery(sql, CommandType.Text, paras);
if (affected == 0)
{
throw new Exception("库存不足!");
}
}
}
8.5 分布式ID
8.5.1 概述
在分布式系统中,需要生成全局唯一的ID。SOD框架提供了分布式ID生成方案。
8.5.2 CommonUtil.NewSequenceGUID
using PWMIS.Common;
// 生成序列GUID(基于时间戳,有序)
long id = CommonUtil.NewSequenceGUID();
Console.WriteLine($"生成的ID: {id}");
// 特点:
// 1. 全局唯一
// 2. 基于时间戳,大致有序
// 3. 适合作为主键
8.5.3 雪花算法实现
public class SnowflakeIdGenerator
{
private static readonly DateTime Epoch = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);
private const int WorkerIdBits = 5;
private const int DatacenterIdBits = 5;
private const int SequenceBits = 12;
private readonly int _workerId;
private readonly int _datacenterId;
private long _sequence = 0L;
private long _lastTimestamp = -1L;
private readonly object _lock = new object();
public SnowflakeIdGenerator(int workerId, int datacenterId)
{
_workerId = workerId;
_datacenterId = datacenterId;
}
public long NextId()
{
lock (_lock)
{
var timestamp = GetTimestamp();
if (timestamp == _lastTimestamp)
{
_sequence = (_sequence + 1) & ((1 << SequenceBits) - 1);
if (_sequence == 0)
{
timestamp = WaitNextMillis(_lastTimestamp);
}
}
else
{
_sequence = 0L;
}
_lastTimestamp = timestamp;
return ((timestamp - GetEpochMillis()) << (WorkerIdBits + DatacenterIdBits + SequenceBits))
| ((long)_datacenterId << (WorkerIdBits + SequenceBits))
| ((long)_workerId << SequenceBits)
| _sequence;
}
}
private long GetTimestamp() => DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
private long GetEpochMillis() => new DateTimeOffset(Epoch).ToUnixTimeMilliseconds();
private long WaitNextMillis(long lastTimestamp)
{
var timestamp = GetTimestamp();
while (timestamp <= lastTimestamp)
{
timestamp = GetTimestamp();
}
return timestamp;
}
}
// 使用
var generator = new SnowflakeIdGenerator(workerId: 1, datacenterId: 1);
long id = generator.NextId();
8.6 命令管道
8.6.1 概述
命令管道用于批量执行数据库操作,减少数据库往返次数,提高性能。
8.6.2 使用示例
using PWMIS.DataProvider.Data;
public void BatchInsertUsers(List<UserEntity> users)
{
AdoHelper db = MyDB.GetDBHelper();
// 创建命令管道
CommandPipeline pipeline = new CommandPipeline(db);
foreach (var user in users)
{
// 添加命令到管道
pipeline.AddCommand(() =>
{
EntityQuery<UserEntity>.Instance.Insert(user, db);
});
}
// 执行所有命令
pipeline.Execute();
}
8.6.3 事务管道
public void BatchUpdateWithTransaction(List<UserEntity> users)
{
AdoHelper db = MyDB.GetDBHelper();
db.OpenSession();
db.BeginTransaction();
try
{
CommandPipeline pipeline = new CommandPipeline(db);
foreach (var user in users)
{
pipeline.AddCommand(() =>
{
EntityQuery<UserEntity>.Instance.Update(user, db);
});
}
pipeline.Execute();
db.Commit();
}
catch
{
db.Rollback();
throw;
}
finally
{
db.CloseSession();
}
}
8.7 热缓存
8.7.1 概述
热缓存(Hot Use Cache)用于缓存最常用的数据,减少数据库访问。
8.7.2 实现示例
public class HotCache<TKey, TEntity> where TEntity : EntityBase
{
private readonly Dictionary<TKey, CacheItem<TEntity>> _cache;
private readonly int _maxSize;
private readonly TimeSpan _expiration;
private readonly object _lock = new object();
public HotCache(int maxSize = 1000, int expirationMinutes = 30)
{
_cache = new Dictionary<TKey, CacheItem<TEntity>>();
_maxSize = maxSize;
_expiration = TimeSpan.FromMinutes(expirationMinutes);
}
public TEntity Get(TKey key)
{
lock (_lock)
{
if (_cache.TryGetValue(key, out var item))
{
if (!item.IsExpired)
{
item.HitCount++;
item.LastAccessTime = DateTime.Now;
return item.Value;
}
else
{
_cache.Remove(key);
}
}
return null;
}
}
public void Set(TKey key, TEntity value)
{
lock (_lock)
{
// 超出容量时移除最少使用的项
if (_cache.Count >= _maxSize)
{
var leastUsed = _cache.OrderBy(x => x.Value.HitCount)
.ThenBy(x => x.Value.LastAccessTime)
.First();
_cache.Remove(leastUsed.Key);
}
_cache[key] = new CacheItem<TEntity>
{
Value = value,
CreateTime = DateTime.Now,
LastAccessTime = DateTime.Now,
HitCount = 0,
Expiration = _expiration
};
}
}
public void Remove(TKey key)
{
lock (_lock)
{
_cache.Remove(key);
}
}
public void Clear()
{
lock (_lock)
{
_cache.Clear();
}
}
}
public class CacheItem<T>
{
public T Value { get; set; }
public DateTime CreateTime { get; set; }
public DateTime LastAccessTime { get; set; }
public int HitCount { get; set; }
public TimeSpan Expiration { get; set; }
public bool IsExpired => DateTime.Now - CreateTime > Expiration;
}
8.7.3 使用示例
public class UserCacheService
{
private readonly HotCache<int, UserEntity> _cache;
private readonly AdoHelper _db;
public UserCacheService()
{
_cache = new HotCache<int, UserEntity>(maxSize: 1000, expirationMinutes: 30);
_db = MyDB.GetDBHelper();
}
public UserEntity GetUser(int id)
{
// 先从缓存获取
var user = _cache.Get(id);
if (user != null)
{
return user;
}
// 缓存未命中,从数据库查询
var entity = new UserEntity { ID = id };
var oql = OQL.From(entity).Select().Where(entity.ID).END;
user = EntityQuery<UserEntity>.QueryObject(oql, _db);
// 放入缓存
if (user != null)
{
_cache.Set(id, user);
}
return user;
}
public void UpdateUser(UserEntity user)
{
EntityQuery<UserEntity>.Instance.Update(user, _db);
// 更新缓存
_cache.Set(user.ID, user);
}
public void DeleteUser(int id)
{
var user = new UserEntity { ID = id };
EntityQuery<UserEntity>.Instance.Delete(user, _db);
// 移除缓存
_cache.Remove(id);
}
}
8.8 分表分库支持
8.8.1 分表策略
public class ShardingStrategy
{
// 按ID取模分表
public static string GetTableByModulo(string baseTableName, long id, int tableCount)
{
int index = (int)(id % tableCount);
return $"{baseTableName}_{index}";
}
// 按时间分表
public static string GetTableByMonth(string baseTableName, DateTime date)
{
return $"{baseTableName}_{date:yyyyMM}";
}
// 按范围分表
public static string GetTableByRange(string baseTableName, long id)
{
if (id < 1000000)
return $"{baseTableName}_0";
else if (id < 2000000)
return $"{baseTableName}_1";
else
return $"{baseTableName}_2";
}
}
8.8.2 在实体类中使用
public class OrderEntity : EntityBase
{
private long _orderId;
public OrderEntity() : this(0) { }
public OrderEntity(long orderId)
{
_orderId = orderId;
TableName = ShardingStrategy.GetTableByModulo("TbOrder", orderId, 10);
IdentityName = "OrderId";
PrimaryKeys.Add("OrderId");
}
public long OrderId
{
get { return getProperty<long>(nameof(OrderId)); }
set
{
setProperty(nameof(OrderId), value);
// 更新表名
TableName = ShardingStrategy.GetTableByModulo("TbOrder", value, 10);
}
}
}
// 使用
long orderId = 12345;
var order = new OrderEntity(orderId); // 自动选择表 TbOrder_5
var oql = OQL.From(order).Select().Where(order.OrderId).END;
var result = EntityQuery<OrderEntity>.QueryObject(oql);
8.9 本章小结
本章介绍了SOD框架的企业级解决方案:
- 内存数据库:高频数据的内存存储和查询
- 事务日志复制:基于日志的数据复制机制
- 分布式事务:跨库事务的处理方式
- 分布式ID:全局唯一标识生成方案
- 命令管道:批量操作优化
- 热缓存:高频数据的缓存策略
- 分表分库:数据水平拆分支持
这些企业级特性使SOD框架能够应对复杂的业务场景和高并发需求。
下一章预告:第九章将介绍SOD框架的高级特性与扩展,包括多数据库支持、自定义数据提供程序等。