暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

利用MongoDB的复制机制实现双机房数据同步的策略与实践

一安未来 2023-11-07
57

大家好,我是一安~

介绍

最近接到一个支持双机房备灾的需求,每个机房都承载业务,故双机房mongodb
需要双写。AAA
需要对用户表进行更新,且用户表中为两个机房的全量数据。此工具为双机房数据支持双向同步功能,即机房A/B
的数据更新及时同步到机房B/A

架构图:

思路

MongoDB
中,local.oplog.rs
是一个特殊的集合,用于记录数据库中的所有操作。它的作用和意义如下:

  • 记录数据库操作:local.oplog.rs
    集合存储了对数据库执行的所有更新操作的记录,包括插入、更新和删除等操作。
  • 实现数据同步:在复制集或副本集中,从节点可以通过读取local.oplog.rs
    集合来获取主节点的写操作记录,并在自己节点上重放这些操作,实现主从的数据同步。这使得从节点可以随时通过读取oplog
    来与主节点的数据状态达到一致,无论从节点何时加入复制集。
  • 支持故障恢复:如果主节点出现故障,从节点可以通过读取local.oplog.rs
    集合中的操作记录,将数据恢复到与主节点一致的状态。这对于MongoDB
    复制集的工作至关重要,它提供了数据库中的所有历史写操作的记录,使得从节点可以随时与主节点的数据状态保持一致。
  • 追踪数据库历史操作:通过查询local.oplog.rs
    集合,可以追踪数据库的历史操作,这对于数据恢复、监控和调试等场景非常有用。

local.oplog.rs
集合样例:

  • op
    : 表示操作的类型:"i" 表示插入,"u" 表示更新,"u" 表示更新
  • ns
    : 包含了操作所涉及的命名空间,通常由数据库名称和集合名称组成
  • o
    : 包含了执行操作的文档,例如插入操作中的文档数据
  • o2
    : 包含更新操作的查询条件,以确定要更新的文档
  • ts
    : 表示操作的时间戳,它通常是一个由秒和递增的序号组成的值,用于确定操作的时间顺序
  • t
    : 事务ID
  • v
    : 表示 MongoDB
    复制协议的版本号
  • prevOpTime
    : 之前的操作时间

所以,这里也是利用local.oplog.rs
实现双机房数据同步。

实现

  1. 依赖引入:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.8.1</version>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>19.0</version>
</dependency>

  1. 配置说明:
mongodb:
  config:
    #源Mongo地址,将从源mongo向destMongo中拷贝oplog更新操作。备注:格式mongodb://用户名:密码@ip:port,ip:port/  末尾不要加其他参数
    sourceMongoUri: mongodb://admin:admin@192.168.56.100:27017,192.168.56.101:27017/
    #源Mongo中需要拷贝数据的database名称
    sourceDatabase: local
    #源Mongo中需要拷贝数据的collection名称
    sourceCollection: oplog.rs
    #从源mongodb最多读取多少条数据,用在limit操作中
    sourceLimitSize: 10000
    #是否通过字段来作为去除循环依赖开启对循环依赖的双层过滤,如果开启双层过滤,可以用来判断去除循环依赖的字段名,此字段必须为时间或者时间戳
    deduplicateFilterd: false
    deduplicateCol: logintime
    #判断去重字段距离当下时间超过maxTime秒,则忽略相关oplog日志,单位秒。用来去除循环更新操作
    maxTime: 180
    #目的Mongo地址,将从sourceMongo向目的mongo中拷贝oplog更新操作
    destMongoUri: mongodb://admin:admin@192.168.56.102:27017,192.168.56.103:27017/
    #目的Mongo中需要更新数据的database名称,如果不需要指定同步特定数据库,则不需要填写,将对mongodb实例下的所有数据局进行数据同步。备注:目标mongodb实例中必须提前创建好同名数据库
    destDatabase: test
    #目的Mongo中需要更新数据的collection名称,如果不需要指定同步特定数据表,则不需要填写。备注:如果需要同步特定数据表,需要同时填写好destDatabase和destCollection
    destCollection: onlineuser,accessuser
    #目的mongo中验证的用户名
    destAuthSource: admin
    #断点记录文件,记录当前mongodb处理到oplog的位置
    #record文件中内容格式: 秒级时间戳,个数    实例:1667883664,1
    #启动时可以设置想要开始同步的时间戳,个数设置为1即可,如果启动时不设置,默认使用五分钟前的秒级时间戳,从五分钟前开始同步oplog操作
    recordFile: D:/testExample/mongodbdemo/src/main/resources/record.txt
    #判断当前是否有进程在执行同步方案
    runningFile: D:/testExample/mongodbdemo/src/main/resources/.running
    #更新操作以批量形式同步到目的mongo实例,此项为批量的大小
    batchSize: 1000
    #本地缓存保存最大数据量
    cacheCapacity: 3000
    #本地缓存超时时间,单位秒
    timeout: 180

  1. 加载配置:
@Data
@Configuration
@ConfigurationProperties("mongodb.config")
public class MongoConfig {

    private String sourceMongoUri;
    private String sourceDatabase;
    private String sourceCollection;
    private Integer sourceLimitSize;
    private Boolean deduplicateFilterd;
    private String deduplicateCol;
    private Integer maxTime;
    private String destMongoUri;
    private String destDatabase;
    private String destCollection;
    private String destAuthSource;
    private String recordFile;
    private String runningFile;
    private Integer batchSize;
    private Integer cacheCapacity;
    private Integer timeout;
}

  1. 公共类
public class CommonConstants {
    public static final String NS_FILTER_FORMT = "{ns : \"%s\", ts : { $gt : Timestamp(%s, %s)}}";
    public static final String TIMESTAMP_FILTER_FORMT = "{ts : { $gt : Timestamp(%s, %s)}}";
    public static final String GENERAL_NS_FILTER_FORMT = "{$or: [ %s ]}";
    public static final String COLUMN_NS = "ns";
    public static final String COLUMN_TS = "ts";
    public static final String COLUMN_OP = "op";
    public static final String COLUMN_O = "o";
    public static final String COLUMN_O2 = "o2";
    public static final String COLUMN_O_SET = "$set";
    public static final String COLUMN_O_V = "$v";
    public static final String SEPERATOR_RECORD = ",";
    public static final String CHAR_DOT = ".";
    public static final String FIRST_LINE = "1";
    public static final String OPERATION_INSERT = "i";
    public static final String OPERATION_UPDATE = "u";
    public static final String OPERATION_DELETE = "d";
    public static final String URI_SEPERATOR = "/";
    public static final String AUTH_SOURCE = "?authSource=";
    public static final String DEFAULT_ID_COLUMN = "_id";
    public static final String CACHE_SUFFIX_OPERATION_INSERT = "_insert";
    public static final String CACHE_SUFFIX_OPERATION_UPDATE = "_update";
}

  1. 缓存帮助类:
@Service
public class CacheService {

    private LoadingCache<String, Map<String, Long>> idOperationCache = null;
    private LoadingCache<Document, Document> updateCache = null;
    private LoadingCache<String, Map<String, Boolean>> deleteCache = null;

    @Value("${mongodb.config.cacheCapacity}")
    private Integer cacheCapacity;
    @Value(("${mongodb.config.timeout}"))
    private Integer timeout;


    @PostConstruct
    public void init() {
        idOperationCache = CacheBuilder.newBuilder().initialCapacity(cacheCapacity).maximumSize(cacheCapacity)
            .expireAfterWrite(timeout, TimeUnit.SECONDS)
            .build(new CacheLoader<String, Map<String, Long>>() {
                @Override
                public Map<String, Long> load(String s) throws Exception {
                    return null;
                }
            });
        updateCache = CacheBuilder.newBuilder().initialCapacity(cacheCapacity).maximumSize(cacheCapacity)
                .expireAfterWrite(timeout, TimeUnit.SECONDS)
                .build(new CacheLoader<Document, Document>() {
                    @Override
                    public Document load(Document s) throws Exception {
                        return null;
                    }
                });
        deleteCache = CacheBuilder.newBuilder().initialCapacity(cacheCapacity).maximumSize(cacheCapacity)
                .expireAfterWrite(timeout, TimeUnit.SECONDS)
                .build(new CacheLoader<String, Map<String, Boolean>>() {
                    @Override
                    public Map<String, Boolean> load(String s) throws Exception {
                        return null;
                    }
                });
    }

    public void setIDOpertionCache(String key, Map<String, Long> value) {
        idOperationCache.put(key, value);
    }

    public Long getIDOpertionValue(String key, String id) {
        try {
            Map<String, Long> data = idOperationCache.get(key);
            return data == null ? null : data.get(id);
        } catch (Exception e) {
            return null;
        }
    }

    public boolean hasUpdated(Document query, Document update) {
        try {
            Document doc = updateCache.get(query);
            if(doc != null && doc.equals(update)) {
                return true;
            }
            return false;
        } catch (Exception e) {
            return false;
        }
    }

    public void setUpdateCache(Document query, Document update) {
        updateCache.put(query, update);
    }

    public void setDeleteCache(String key, Map<String, Boolean> value) {
        deleteCache.put(key, value);
    }

    public Boolean getDeleteValue(String key, String id) {
        try {
            Map<String, Boolean> map = deleteCache.get(key);
            return map.get(id) == null ? null : map.get(id);
        } catch (Exception e) {
            return null;
        }
    }
}

  1. 核心逻辑:
private void replicateOplogOperation(MongoCollection sourceCollection) throws Exception {
        // 1. 读取记录文件
        File recordFile = new File(mongoConfig.getRecordFile());
        if(!recordFile.exists()) {
            recordFile.createNewFile();
        }
        BufferedReader bufferedReader = new BufferedReader(new FileReader(recordFile));
        String line = bufferedReader.readLine();
        bufferedReader.close();
        String[] vals = null;
        if(StringUtils.isNotEmpty(line)) {
            vals = line.split(CommonConstants.SEPERATOR_RECORD);
        } else {
            vals = buildInitialRecord().split(CommonConstants.SEPERATOR_RECORD);
        }

        String filter = null;
        List<String> desColls = null;
        BsonTimestamp preTimestamp = new BsonTimestamp(Integer.valueOf(vals[0]), Integer.valueOf(vals[1])); 
        
        //2. 根据配置过滤需要同步的集合
        if(StringUtils.isNotEmpty(mongoConfig.getDestCollection())) {
            String[] collections = mongoConfig.getDestCollection().split(CommonConstants.SEPERATOR_RECORD);
            desColls = Arrays.asList(collections);
            if(collections.length > 1) {
                StringBuilder sb = new StringBuilder();
                for(String coll : collections) {
                    String item = String.format(CommonConstants.NS_FILTER_FORMT, getNsName(mongoConfig.getDestDatabase(), coll), vals[0], vals[1]);
                    sb.append(item);
                    sb.append(CommonConstants.SEPERATOR_RECORD);
                }
                sb.deleteCharAt(sb.length() - 1);
                filter = String.format(CommonConstants.GENERAL_NS_FILTER_FORMT, sb.toString());
            } else {
                filter = String.format(CommonConstants.NS_FILTER_FORMT, getNsName(mongoConfig.getDestDatabase(), mongoConfig.getDestCollection()), vals[0], vals[1]);
            }
        } else {
            filter = String.format(CommonConstants.TIMESTAMP_FILTER_FORMT, vals[0], vals[1]);
        }

        //3. 执行过滤条件
        FindIterable iter = sourceCollection.find(BasicDBObject.parse(filter)).limit(mongoConfig.getSourceLimitSize());
        MongoCursor<Document> cursor = iter.cursor();
        Map<String, List<WriteModel<Document>>> deduplicateMap = new HashMap<>();
        Map<String, List<WriteModel<Document>>> deleteModels = new HashMap<>();
        Document doc = null;
        int count = 0;
        while(cursor.hasNext()) {
            doc = cursor.next();
            bsonTimestamp = (BsonTimestamp) doc.get(CommonConstants.COLUMN_TS);
            String ns = (String) doc.get(CommonConstants.COLUMN_NS);
            if(StringUtils.isNotEmpty(mongoConfig.getDestDatabase()) && !ns.contains(mongoConfig.getDestDatabase())) {
                continue;
            }
            String db = ns.substring(0, ns.indexOf(CommonConstants.CHAR_DOT));
            String col = ns.substring(ns.indexOf(CommonConstants.CHAR_DOT) + 1);
            if(StringUtils.isNotEmpty(mongoConfig.getDestDatabase()) && !mongoConfig.getDestDatabase().equals(db) ||
                    StringUtils.isNotEmpty(mongoConfig.getDestDatabase()) && StringUtils.isNotEmpty(mongoConfig.getDestCollection()) && !desColls.contains(col) ||
                    StringUtils.isEmpty(mongoConfig.getDestDatabase()) && StringUtils.isNotEmpty(mongoConfig.getDestCollection())) {
                continue;
            }
            String op = (String) doc.get(CommonConstants.COLUMN_OP);
            switch (op) {
                case CommonConstants.OPERATION_INSERT:
                    Document insertDoc = (Document) doc.get(CommonConstants.COLUMN_O);
                    UpdateOneModel insertOneModel = new UpdateOneModel(new Document(CommonConstants.DEFAULT_ID_COLUMN, insertDoc.get(CommonConstants.DEFAULT_ID_COLUMN)),
                            new Document(CommonConstants.COLUMN_O_SET, insertDoc), new UpdateOptions().upsert(true));
                    Long cacheTime = cacheService.getIDOpertionValue(col, insertDoc.get(CommonConstants.DEFAULT_ID_COLUMN).toString() + CommonConstants.CACHE_SUFFIX_OPERATION_INSERT);
                    if((Boolean.TRUE.equals(mongoConfig.getDeduplicateFilterd()) && insertDoc.get(mongoConfig.getDeduplicateCol()) == null) || cacheTime != null) {
                        continue;
                    }
                    if(Boolean.TRUE.equals(mongoConfig.getDeduplicateFilterd())) {
                        long time = Long.valueOf(insertDoc.get(mongoConfig.getDeduplicateCol()).toString());
                        //为解决双向复制导致的循环复制问题,超过配置时间范围的oplog将被过滤掉
                        if(cacheTime == null && System.currentTimeMillis()/1000 - time <= mongoConfig.getMaxTime().intValue()) {
                            cacheService.setIDOpertionCache(col, buildCacheData(insertDoc.get(CommonConstants.DEFAULT_ID_COLUMN).toString()  + CommonConstants.CACHE_SUFFIX_OPERATION_INSERT, time));
                            cacheTime = time;
                        }
                        if(System.currentTimeMillis()/1000 - time <= mongoConfig.getMaxTime().intValue() && cacheTime != null && time >= cacheTime.longValue()) {
                            List<WriteModel<Document>> models = deduplicateMap.get(col);
                            if(models == null) {
                                deduplicateMap.put(col, new ArrayList<>());
                                models = deduplicateMap.get(col);
                            }
                            models.add(insertOneModel);
                        } else {
                            count++;
                        }
                    } else {
                        List<WriteModel<Document>> models = deduplicateMap.get(col);
                        if(models == null) {
                            deduplicateMap.put(col, new ArrayList<>());
                            models = deduplicateMap.get(col);
                        }
                        models.add(insertOneModel);
                        cacheService.setIDOpertionCache(col, buildCacheData(insertDoc.get(CommonConstants.DEFAULT_ID_COLUMN).toString()  + CommonConstants.CACHE_SUFFIX_OPERATION_INSERT, System.currentTimeMillis()));
                    }

                    break;
                case CommonConstants.OPERATION_UPDATE:
                    Document filterDoc = (Document) doc.get(CommonConstants.COLUMN_O2);
                    Document updateDoc = (Document) doc.get(CommonConstants.COLUMN_O);
                    if(Boolean.TRUE.equals(mongoConfig.getDeduplicateFilterd()) && ((Document) updateDoc.get(CommonConstants.COLUMN_O_SET)).get(mongoConfig.getDeduplicateCol()) == null) {
                        continue;
                    }
                    updateDoc.remove(CommonConstants.COLUMN_O_V);
                    if(cacheService.hasUpdated(filterDoc, updateDoc)) {
                        continue;
                    }
                    cacheService.setUpdateCache(filterDoc, updateDoc);
                    UpdateOneModel updateOneModel = new UpdateOneModel(filterDoc, updateDoc);
                    if(Boolean.TRUE.equals(mongoConfig.getDeduplicateFilterd())) {
                        long updateTime = Long.valueOf(((Document) updateDoc.get(CommonConstants.COLUMN_O_SET)).get(mongoConfig.getDeduplicateCol()).toString());
                        Long ucacheTime = cacheService.getIDOpertionValue(col, filterDoc.get(CommonConstants.DEFAULT_ID_COLUMN).toString() + CommonConstants.CACHE_SUFFIX_OPERATION_UPDATE);
                        //为解决双向复制导致的循环复制问题,超过配置时间范围的oplog将被过滤掉
                        if(ucacheTime == null && System.currentTimeMillis()/1000 - updateTime <= mongoConfig.getMaxTime().intValue()) {
                            cacheService.setIDOpertionCache(col, buildCacheData(filterDoc.get(CommonConstants.DEFAULT_ID_COLUMN).toString() + CommonConstants.CACHE_SUFFIX_OPERATION_UPDATE, updateTime));
                            ucacheTime = updateTime;
                        }
                        if(System.currentTimeMillis()/1000 - updateTime <= mongoConfig.getMaxTime().intValue() && ucacheTime != null && updateTime >= ucacheTime.longValue()) {
                            List<WriteModel<Document>> models = deduplicateMap.get(col);
                            if(models == null) {
                                deduplicateMap.put(col, new ArrayList<>());
                                models = deduplicateMap.get(col);
                            }
                            models.add(updateOneModel);
                        } else {
                            count++;
                        }
                    } else {
                        List<WriteModel<Document>> models = deduplicateMap.get(col);
                        if(models == null) {
                            deduplicateMap.put(col, new ArrayList<>());
                            models = deduplicateMap.get(col);
                        }
                        models.add(updateOneModel);
                    }
                    break;
                case CommonConstants.OPERATION_DELETE:
                    Document deleteDoc = (Document) doc.get(CommonConstants.COLUMN_O);
                    Boolean b = cacheService.getDeleteValue(col, deleteDoc.get(CommonConstants.DEFAULT_ID_COLUMN).toString());
                    if(b == null) {
                        cacheService.setDeleteCache(col, buildDeletedCacheData(deleteDoc.get(CommonConstants.DEFAULT_ID_COLUMN).toString(), Boolean.TRUE));
                        DeleteOneModel deleteOneModel = new DeleteOneModel(deleteDoc);
                        List<WriteModel<Document>> models = deleteModels.get(col);
                        if(models == null) {
                            deleteModels.put(col, new ArrayList<>());
                            models = deleteModels.get(col);
                        }
                        models.add(deleteOneModel);
                    } else {
                        count++;
                    }
                    break;
            }
        }
        if(count > 0) {
            log.info("The count that filter by timeout is {}", count);
        }
        updateMongodb(deduplicateMap, deleteModels, false);
        updateMongodb(deduplicateMap, deleteModels, true);
        if(bsonTimestamp != null && (hasValidData(deduplicateMap, deleteModels) || !preTimestamp.equals(bsonTimestamp))) {
            updateRecordFile(buildRecord(bsonTimestamp));
        }
    }
    
    private void updateMongodb(Map<String, List<WriteModel<Document>>> deduplicateMap, Map<String, List<WriteModel<Document>>> deleteModels, boolean onlyDelete) {
        Set<Map.Entry<String, List<WriteModel<Document>>>> entries = onlyDelete ?  deleteModels.entrySet() : deduplicateMap.entrySet();
        Iterator<Map.Entry<String, List<WriteModel<Document>>>> iterator = entries.iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, List<WriteModel<Document>>> entry = iterator.next();
            String collectionName = entry.getKey();
            List<WriteModel<Document>> values = entry.getValue();
            if(!onlyDelete && deleteModels.get(collectionName) != null) {
                values.addAll(deleteModels.get(collectionName));
            }
            if(!CollectionUtils.isEmpty(values)) {
                log.info("表{}包含有效更新语句如下: {}", collectionName, values);
            }
            int page = (values.size()%mongoConfig.getBatchSize() == 0) ? (values.size()/mongoConfig.getBatchSize()) : (values.size()/mongoConfig.getBatchSize() + 1);
            MongoCollection<Document> mongoCollection = mongopTemplateMap.get(mongoConfig.getDestDatabase()).getCollection(collectionName);
            for(int i=0; i<page; i++) {
                BulkWriteResult bulkWriteResult = mongoCollection.bulkWrite(values.subList(i*mongoConfig.getBatchSize(), Math.min((i+1)*mongoConfig.getBatchSize(), values.size())));
                log.info("Mongodb批量更新结果为: {}", bulkWriteResult);
            }
            if(!onlyDelete) {
                deleteModels.remove(collectionName);
            }
        }
    }

  1. 执行任务:
    public void processOplogOperation() throws Exception {
        File file = new File(mongoConfig.getRunningFile());
        if(file.exists()) {
            return;
        }
        //获取目的地址 MongoTemplate
        if(mongopTemplateMap == null) {
            mongopTemplateMap = mongopTemplateMap();
        }

        file.createNewFile();

        MongoClient sourceMongoClient = getSourceMongoClient();
        MongoDatabase sourceDatabase = sourceMongoClient.getDatabase(mongoConfig.getSourceDatabase());
        MongoCollection sourceCollection = sourceDatabase.getCollection(mongoConfig.getSourceCollection());

        replicateOplogOperation(sourceCollection);
    }


如果这篇文章对你有所帮助,或者有所启发的话,帮忙 分享、收藏、点赞、在看,你的支持就是我坚持下去的最大动力!

敏感字段加密后还能进行模糊查询吗?该如何实现?


40亿个号码,限制1G内存,你如何实现存储?


分布式接口幂等性、分布式限流:Guava 、nginx和lua限流

文章转载自一安未来,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论