znlgis 博客

GIS开发与技术分享

第八章:企业级解决方案

8.1 企业级功能概述

SOD框架提供了多种企业级解决方案,满足复杂业务场景的需求:

解决方案 应用场景
内存数据库 高频读取、缓存热数据
事务日志复制 主从同步、数据备份
数据同步 分布式系统数据一致性
分布式事务 跨库事务
分布式ID 分布式系统唯一标识
命令管道 批量操作优化
热缓存 高频数据缓存

8.2 内存数据库

8.2.1 概述

SOD框架的内存数据库是一个轻量级的内存数据存储解决方案,适用于:

  1. 高频读取场景:减少数据库访问
  2. 临时数据存储:会话数据、计算中间结果
  3. 缓存层:作为数据库和应用之间的缓存

8.2.2 基本使用

using PWMIS.MemoryDatabase;

// 创建内存数据库实例
MemoryDatabase memDb = new MemoryDatabase();

// 创建实体对象
UserEntity user = new UserEntity
{
    ID = 1,
    Name = "张三",
    Email = "zhangsan@example.com"
};

// 添加数据
memDb.Add(user);

// 查询数据
var query = memDb.GetEntities<UserEntity>();
var result = query.Where(u => u.ID == 1).FirstOrDefault();

// 更新数据
result.Name = "张三(已更新)";
memDb.Update(result);

// 删除数据
memDb.Remove(user);

8.2.3 批量操作

// 批量添加
List<UserEntity> users = new List<UserEntity>
{
    new UserEntity { ID = 1, Name = "用户1" },
    new UserEntity { ID = 2, Name = "用户2" },
    new UserEntity { ID = 3, Name = "用户3" }
};
memDb.AddRange(users);

// 查询
var activeUsers = memDb.GetEntities<UserEntity>()
    .Where(u => u.Status == 1)
    .ToList();

// 批量删除
memDb.RemoveRange(users);

8.2.4 持久化与加载

// 将内存数据持久化到文件
memDb.SaveToFile("memdb.dat");

// 从文件加载
MemoryDatabase memDb2 = new MemoryDatabase();
memDb2.LoadFromFile("memdb.dat");

// 从数据库加载数据到内存
AdoHelper db = MyDB.GetDBHelper();
var oql = OQL.From(new UserEntity()).Select().END;
var users = EntityQuery<UserEntity>.QueryList(oql, db);
memDb.AddRange(users);

8.2.5 与数据库同步

public class CachedUserService
{
    private readonly MemoryDatabase _memDb;
    private readonly AdoHelper _db;
    private DateTime _lastSyncTime;
    
    public CachedUserService()
    {
        _memDb = new MemoryDatabase();
        _db = MyDB.GetDBHelper();
        SyncFromDatabase();
    }
    
    // 从数据库同步到内存
    public void SyncFromDatabase()
    {
        var oql = OQL.From(new UserEntity())
            .Select()
            .Where(cmp => cmp.Property(new UserEntity().Status) == 1)
            .END;
        var users = EntityQuery<UserEntity>.QueryList(oql, _db);
        
        _memDb.Clear<UserEntity>();
        _memDb.AddRange(users);
        _lastSyncTime = DateTime.Now;
    }
    
    // 从内存读取
    public UserEntity GetUser(int id)
    {
        return _memDb.GetEntities<UserEntity>()
            .FirstOrDefault(u => u.ID == id);
    }
    
    // 查询列表
    public List<UserEntity> GetUsers(Func<UserEntity, bool> predicate)
    {
        return _memDb.GetEntities<UserEntity>()
            .Where(predicate)
            .ToList();
    }
    
    // 写入时同时更新内存和数据库
    public void AddUser(UserEntity user)
    {
        EntityQuery<UserEntity>.Instance.Insert(user, _db);
        _memDb.Add(user);
    }
    
    public void UpdateUser(UserEntity user)
    {
        EntityQuery<UserEntity>.Instance.Update(user, _db);
        _memDb.Update(user);
    }
}

8.3 事务日志数据复制

8.3.1 概述

事务日志数据复制是一种数据同步机制,通过记录数据变更日志,实现:

  1. 主从数据同步:主库写入,从库复制
  2. 数据审计:记录所有数据变更历史
  3. 增量备份:只备份变更的数据

8.3.2 工作原理

主数据库 → 变更操作 → 记录事务日志 → 日志队列
                                        ↓
从数据库 ← 回放日志 ← 读取事务日志 ← 日志消费者

8.3.3 事务日志实体

public class TransactionLog : EntityBase
{
    public TransactionLog()
    {
        TableName = "TbTransactionLog";
        IdentityName = "LogId";
        PrimaryKeys.Add("LogId");
    }
    
    public long LogId { get; set; }
    public string TableName { get; set; }
    public string PrimaryKeyValue { get; set; }
    public string OperationType { get; set; }  // INSERT/UPDATE/DELETE
    public string ChangedData { get; set; }    // JSON格式的变更数据
    public DateTime CreateTime { get; set; }
    public bool IsReplicated { get; set; }     // 是否已复制
}

8.3.4 记录事务日志

public class TransactionLogService
{
    private readonly AdoHelper _db;
    
    public TransactionLogService(AdoHelper db)
    {
        _db = db;
    }
    
    // 记录日志
    public void LogChange<T>(T entity, string operationType) where T : EntityBase
    {
        var log = new TransactionLog
        {
            TableName = entity.TableName,
            PrimaryKeyValue = GetPrimaryKeyValue(entity),
            OperationType = operationType,
            ChangedData = SerializeEntity(entity),
            CreateTime = DateTime.Now,
            IsReplicated = false
        };
        
        EntityQuery<TransactionLog>.Instance.Insert(log, _db);
    }
    
    private string GetPrimaryKeyValue<T>(T entity) where T : EntityBase
    {
        var values = new List<string>();
        foreach (var pk in entity.PrimaryKeys)
        {
            values.Add(entity[pk]?.ToString());
        }
        return string.Join(",", values);
    }
    
    private string SerializeEntity<T>(T entity) where T : EntityBase
    {
        var dict = new Dictionary<string, object>();
        foreach (var propName in entity.PropertyNames)
        {
            dict[propName] = entity[propName];
        }
        return JsonConvert.SerializeObject(dict);
    }
}

8.3.5 复制日志

public class LogReplicationService
{
    private readonly AdoHelper _masterDb;
    private readonly AdoHelper _slaveDb;
    
    public LogReplicationService(AdoHelper masterDb, AdoHelper slaveDb)
    {
        _masterDb = masterDb;
        _slaveDb = slaveDb;
    }
    
    // 复制待处理的日志
    public void Replicate()
    {
        var log = new TransactionLog();
        var oql = OQL.From(log)
            .Select()
            .Where(cmp => cmp.Property(log.IsReplicated) == false)
            .OrderBy(log.LogId)
            .END;
        oql.TopCount = 100;  // 每次处理100条
        
        var logs = EntityQuery<TransactionLog>.QueryList(oql, _masterDb);
        
        foreach (var logItem in logs)
        {
            try
            {
                ApplyLog(logItem);
                MarkAsReplicated(logItem);
            }
            catch (Exception ex)
            {
                // 记录错误,继续处理下一条
                LogError(logItem, ex);
            }
        }
    }
    
    private void ApplyLog(TransactionLog log)
    {
        var data = JsonConvert.DeserializeObject<Dictionary<string, object>>(log.ChangedData);
        
        switch (log.OperationType)
        {
            case "INSERT":
                ApplyInsert(log.TableName, data);
                break;
            case "UPDATE":
                ApplyUpdate(log.TableName, log.PrimaryKeyValue, data);
                break;
            case "DELETE":
                ApplyDelete(log.TableName, log.PrimaryKeyValue);
                break;
        }
    }
    
    private void MarkAsReplicated(TransactionLog log)
    {
        log.IsReplicated = true;
        EntityQuery<TransactionLog>.Instance.Update(log, _masterDb);
    }
}

8.4 分布式事务

8.4.1 概述

当业务操作涉及多个数据库时,需要使用分布式事务来保证数据一致性。SOD框架支持以下分布式事务模式:

  1. 两阶段提交(2PC):强一致性
  2. 最终一致性:基于消息队列
  3. 补偿事务(TCC):Try-Confirm-Cancel

8.4.2 简单的分布式事务实现

public class DistributedTransactionManager
{
    private readonly List<TransactionScope> _scopes = new List<TransactionScope>();
    private readonly List<AdoHelper> _databases = new List<AdoHelper>();
    
    public void Enlist(AdoHelper db)
    {
        _databases.Add(db);
        db.OpenSession();
        db.BeginTransaction();
    }
    
    public void Commit()
    {
        try
        {
            // 提交所有数据库
            foreach (var db in _databases)
            {
                db.Commit();
            }
        }
        catch
        {
            Rollback();
            throw;
        }
        finally
        {
            CloseAll();
        }
    }
    
    public void Rollback()
    {
        foreach (var db in _databases)
        {
            try
            {
                db.Rollback();
            }
            catch { }
        }
        CloseAll();
    }
    
    private void CloseAll()
    {
        foreach (var db in _databases)
        {
            try
            {
                db.CloseSession();
            }
            catch { }
        }
        _databases.Clear();
    }
}

8.4.3 使用示例

public class OrderService
{
    private readonly AdoHelper _orderDb;
    private readonly AdoHelper _inventoryDb;
    
    public OrderService()
    {
        _orderDb = MyDB.GetDBHelperByConnectionName("OrderDB");
        _inventoryDb = MyDB.GetDBHelperByConnectionName("InventoryDB");
    }
    
    public void CreateOrder(OrderEntity order, List<OrderItemEntity> items)
    {
        var txManager = new DistributedTransactionManager();
        
        try
        {
            txManager.Enlist(_orderDb);
            txManager.Enlist(_inventoryDb);
            
            // 1. 创建订单
            EntityQuery<OrderEntity>.Instance.Insert(order, _orderDb);
            
            foreach (var item in items)
            {
                item.OrderId = order.ID;
                EntityQuery<OrderItemEntity>.Instance.Insert(item, _orderDb);
                
                // 2. 扣减库存
                DeductInventory(item.ProductId, item.Quantity);
            }
            
            txManager.Commit();
        }
        catch
        {
            txManager.Rollback();
            throw;
        }
    }
    
    private void DeductInventory(int productId, int quantity)
    {
        var sql = "UPDATE Inventory SET Quantity = Quantity - @Qty WHERE ProductId = @ProductId AND Quantity >= @Qty";
        var paras = new[]
        {
            _inventoryDb.GetParameter("@Qty", quantity),
            _inventoryDb.GetParameter("@ProductId", productId)
        };
        
        int affected = _inventoryDb.ExecuteNonQuery(sql, CommandType.Text, paras);
        if (affected == 0)
        {
            throw new Exception("库存不足!");
        }
    }
}

8.5 分布式ID

8.5.1 概述

在分布式系统中,需要生成全局唯一的ID。SOD框架提供了分布式ID生成方案。

8.5.2 CommonUtil.NewSequenceGUID

using PWMIS.Common;

// 生成序列GUID(基于时间戳,有序)
long id = CommonUtil.NewSequenceGUID();
Console.WriteLine($"生成的ID: {id}");

// 特点:
// 1. 全局唯一
// 2. 基于时间戳,大致有序
// 3. 适合作为主键

8.5.3 雪花算法实现

public class SnowflakeIdGenerator
{
    private static readonly DateTime Epoch = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);
    private const int WorkerIdBits = 5;
    private const int DatacenterIdBits = 5;
    private const int SequenceBits = 12;
    
    private readonly int _workerId;
    private readonly int _datacenterId;
    private long _sequence = 0L;
    private long _lastTimestamp = -1L;
    private readonly object _lock = new object();
    
    public SnowflakeIdGenerator(int workerId, int datacenterId)
    {
        _workerId = workerId;
        _datacenterId = datacenterId;
    }
    
    public long NextId()
    {
        lock (_lock)
        {
            var timestamp = GetTimestamp();
            
            if (timestamp == _lastTimestamp)
            {
                _sequence = (_sequence + 1) & ((1 << SequenceBits) - 1);
                if (_sequence == 0)
                {
                    timestamp = WaitNextMillis(_lastTimestamp);
                }
            }
            else
            {
                _sequence = 0L;
            }
            
            _lastTimestamp = timestamp;
            
            return ((timestamp - GetEpochMillis()) << (WorkerIdBits + DatacenterIdBits + SequenceBits))
                   | ((long)_datacenterId << (WorkerIdBits + SequenceBits))
                   | ((long)_workerId << SequenceBits)
                   | _sequence;
        }
    }
    
    private long GetTimestamp() => DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
    private long GetEpochMillis() => new DateTimeOffset(Epoch).ToUnixTimeMilliseconds();
    
    private long WaitNextMillis(long lastTimestamp)
    {
        var timestamp = GetTimestamp();
        while (timestamp <= lastTimestamp)
        {
            timestamp = GetTimestamp();
        }
        return timestamp;
    }
}

// 使用
var generator = new SnowflakeIdGenerator(workerId: 1, datacenterId: 1);
long id = generator.NextId();

8.6 命令管道

8.6.1 概述

命令管道用于批量执行数据库操作,减少数据库往返次数,提高性能。

8.6.2 使用示例

using PWMIS.DataProvider.Data;

public void BatchInsertUsers(List<UserEntity> users)
{
    AdoHelper db = MyDB.GetDBHelper();
    
    // 创建命令管道
    CommandPipeline pipeline = new CommandPipeline(db);
    
    foreach (var user in users)
    {
        // 添加命令到管道
        pipeline.AddCommand(() =>
        {
            EntityQuery<UserEntity>.Instance.Insert(user, db);
        });
    }
    
    // 执行所有命令
    pipeline.Execute();
}

8.6.3 事务管道

public void BatchUpdateWithTransaction(List<UserEntity> users)
{
    AdoHelper db = MyDB.GetDBHelper();
    
    db.OpenSession();
    db.BeginTransaction();
    
    try
    {
        CommandPipeline pipeline = new CommandPipeline(db);
        
        foreach (var user in users)
        {
            pipeline.AddCommand(() =>
            {
                EntityQuery<UserEntity>.Instance.Update(user, db);
            });
        }
        
        pipeline.Execute();
        db.Commit();
    }
    catch
    {
        db.Rollback();
        throw;
    }
    finally
    {
        db.CloseSession();
    }
}

8.7 热缓存

8.7.1 概述

热缓存(Hot Use Cache)用于缓存最常用的数据,减少数据库访问。

8.7.2 实现示例

public class HotCache<TKey, TEntity> where TEntity : EntityBase
{
    private readonly Dictionary<TKey, CacheItem<TEntity>> _cache;
    private readonly int _maxSize;
    private readonly TimeSpan _expiration;
    private readonly object _lock = new object();
    
    public HotCache(int maxSize = 1000, int expirationMinutes = 30)
    {
        _cache = new Dictionary<TKey, CacheItem<TEntity>>();
        _maxSize = maxSize;
        _expiration = TimeSpan.FromMinutes(expirationMinutes);
    }
    
    public TEntity Get(TKey key)
    {
        lock (_lock)
        {
            if (_cache.TryGetValue(key, out var item))
            {
                if (!item.IsExpired)
                {
                    item.HitCount++;
                    item.LastAccessTime = DateTime.Now;
                    return item.Value;
                }
                else
                {
                    _cache.Remove(key);
                }
            }
            return null;
        }
    }
    
    public void Set(TKey key, TEntity value)
    {
        lock (_lock)
        {
            // 超出容量时移除最少使用的项
            if (_cache.Count >= _maxSize)
            {
                var leastUsed = _cache.OrderBy(x => x.Value.HitCount)
                    .ThenBy(x => x.Value.LastAccessTime)
                    .First();
                _cache.Remove(leastUsed.Key);
            }
            
            _cache[key] = new CacheItem<TEntity>
            {
                Value = value,
                CreateTime = DateTime.Now,
                LastAccessTime = DateTime.Now,
                HitCount = 0,
                Expiration = _expiration
            };
        }
    }
    
    public void Remove(TKey key)
    {
        lock (_lock)
        {
            _cache.Remove(key);
        }
    }
    
    public void Clear()
    {
        lock (_lock)
        {
            _cache.Clear();
        }
    }
}

public class CacheItem<T>
{
    public T Value { get; set; }
    public DateTime CreateTime { get; set; }
    public DateTime LastAccessTime { get; set; }
    public int HitCount { get; set; }
    public TimeSpan Expiration { get; set; }
    
    public bool IsExpired => DateTime.Now - CreateTime > Expiration;
}

8.7.3 使用示例

public class UserCacheService
{
    private readonly HotCache<int, UserEntity> _cache;
    private readonly AdoHelper _db;
    
    public UserCacheService()
    {
        _cache = new HotCache<int, UserEntity>(maxSize: 1000, expirationMinutes: 30);
        _db = MyDB.GetDBHelper();
    }
    
    public UserEntity GetUser(int id)
    {
        // 先从缓存获取
        var user = _cache.Get(id);
        if (user != null)
        {
            return user;
        }
        
        // 缓存未命中,从数据库查询
        var entity = new UserEntity { ID = id };
        var oql = OQL.From(entity).Select().Where(entity.ID).END;
        user = EntityQuery<UserEntity>.QueryObject(oql, _db);
        
        // 放入缓存
        if (user != null)
        {
            _cache.Set(id, user);
        }
        
        return user;
    }
    
    public void UpdateUser(UserEntity user)
    {
        EntityQuery<UserEntity>.Instance.Update(user, _db);
        
        // 更新缓存
        _cache.Set(user.ID, user);
    }
    
    public void DeleteUser(int id)
    {
        var user = new UserEntity { ID = id };
        EntityQuery<UserEntity>.Instance.Delete(user, _db);
        
        // 移除缓存
        _cache.Remove(id);
    }
}

8.8 分表分库支持

8.8.1 分表策略

public class ShardingStrategy
{
    // 按ID取模分表
    public static string GetTableByModulo(string baseTableName, long id, int tableCount)
    {
        int index = (int)(id % tableCount);
        return $"{baseTableName}_{index}";
    }
    
    // 按时间分表
    public static string GetTableByMonth(string baseTableName, DateTime date)
    {
        return $"{baseTableName}_{date:yyyyMM}";
    }
    
    // 按范围分表
    public static string GetTableByRange(string baseTableName, long id)
    {
        if (id < 1000000)
            return $"{baseTableName}_0";
        else if (id < 2000000)
            return $"{baseTableName}_1";
        else
            return $"{baseTableName}_2";
    }
}

8.8.2 在实体类中使用

public class OrderEntity : EntityBase
{
    private long _orderId;
    
    public OrderEntity() : this(0) { }
    
    public OrderEntity(long orderId)
    {
        _orderId = orderId;
        TableName = ShardingStrategy.GetTableByModulo("TbOrder", orderId, 10);
        IdentityName = "OrderId";
        PrimaryKeys.Add("OrderId");
    }
    
    public long OrderId
    {
        get { return getProperty<long>(nameof(OrderId)); }
        set
        {
            setProperty(nameof(OrderId), value);
            // 更新表名
            TableName = ShardingStrategy.GetTableByModulo("TbOrder", value, 10);
        }
    }
}

// 使用
long orderId = 12345;
var order = new OrderEntity(orderId);  // 自动选择表 TbOrder_5
var oql = OQL.From(order).Select().Where(order.OrderId).END;
var result = EntityQuery<OrderEntity>.QueryObject(oql);

8.9 本章小结

本章介绍了SOD框架的企业级解决方案:

  1. 内存数据库:高频数据的内存存储和查询
  2. 事务日志复制:基于日志的数据复制机制
  3. 分布式事务:跨库事务的处理方式
  4. 分布式ID:全局唯一标识生成方案
  5. 命令管道:批量操作优化
  6. 热缓存:高频数据的缓存策略
  7. 分表分库:数据水平拆分支持

这些企业级特性使SOD框架能够应对复杂的业务场景和高并发需求。


下一章预告:第九章将介绍SOD框架的高级特性与扩展,包括多数据库支持、自定义数据提供程序等。