
Hi everyone, I'm 一安~
Introduction
I recently picked up a requirement to support disaster recovery across two data centers. Both data centers carry live traffic, so MongoDB in each of them must accept writes. The user table has to be updatable from either side, and it holds the full data set of both data centers. The tool described here provides bidirectional synchronization: updates made in data center A/B are propagated to data center B/A in a timely manner.
Architecture diagram: (image omitted)
Approach
In MongoDB, local.oplog.rs is a special collection that records every write operation applied to the database. Its role:
Recording database operations: local.oplog.rs stores a record of every write executed against the database, including inserts, updates, and deletes.
Data synchronization: in a replica set, secondary nodes read the primary's write records from local.oplog.rs and replay them locally, keeping themselves consistent with the primary. Because the oplog holds the history of writes, a secondary can catch up with the primary's state no matter when it joins the replica set.
Failure recovery: if the primary fails, a secondary can replay the operations recorded in local.oplog.rs to restore its data to a state consistent with the primary. This is essential to how MongoDB replica sets work.
Tracing history: querying local.oplog.rs lets you trace the database's historical operations, which is useful for data recovery, monitoring, and debugging.
Fields of a local.oplog.rs document (a representative sample entry is shown after this list):
op: the operation type: "i" for insert, "u" for update, "d" for delete
ns: the namespace the operation applies to, normally the database name plus the collection name
o: the document involved in the operation, e.g. the inserted document for an insert
o2: the query condition of an update, identifying the document to update
ts: the operation's timestamp, a value composed of seconds plus an incrementing ordinal, used to order operations
t: the replica-set election term
v: the version of the MongoDB replication protocol
prevOpTime: the optime of the preceding operation
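Since the original screenshot isn't reproduced, here is an illustrative update entry in the $set shape the code below expects; every value is made up:
{
  "ts": Timestamp(1667883664, 1),
  "t": NumberLong(5),
  "v": 2,
  "op": "u",
  "ns": "test.onlineuser",
  "o2": { "_id": ObjectId("6368b890ab12cd34ef567890") },
  "o": { "$v": 1, "$set": { "logintime": NumberLong(1667883660) } },
  "prevOpTime": { "ts": Timestamp(1667883600, 2), "t": NumberLong(5) }
}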
So this tool likewise uses local.oplog.rs to synchronize data between the two data centers.
Implementation
Dependencies:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.8.1</version>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>19.0</version>
</dependency>
Configuration:
mongodb:
  config:
    # Source Mongo URI; oplog update operations are copied from this source into destMongo.
    # Note: the format is mongodb://user:password@ip:port,ip:port/ with no extra parameters at the end
    sourceMongoUri: mongodb://admin:admin@192.168.56.100:27017,192.168.56.101:27017/
    # Name of the database on the source Mongo to copy from
    sourceDatabase: local
    # Name of the collection on the source Mongo to copy from
    sourceCollection: oplog.rs
    # Maximum number of entries to read from the source per pass (used in the limit clause)
    sourceLimitSize: 10000
    # Whether to enable the second-layer, field-based filter that breaks replication loops.
    # If enabled, deduplicateCol names the field used for loop detection; it must be a time or timestamp field
    deduplicateFilterd: false
    deduplicateCol: logintime
    # If the dedup field is more than maxTime seconds behind the current time, the oplog entry is ignored.
    # Unit: seconds. Used to break circular updates
    maxTime: 180
    # Destination Mongo URI; oplog update operations are copied from sourceMongo into this destination
    destMongoUri: mongodb://admin:admin@192.168.56.102:27017,192.168.56.103:27017/
    # Name of the database on the destination to update. Leave empty to sync every database in the instance.
    # Note: a database of the same name must already exist on the destination instance
    destDatabase: test
    # Names of the collections on the destination to update. Leave empty to sync all collections.
    # Note: to sync specific collections, destDatabase and destCollection must both be set
    destCollection: onlineuser,accessuser
    # authSource database used to authenticate against the destination
    destAuthSource: admin
    # Checkpoint file recording how far into the oplog processing has progressed.
    # File format: seconds-level timestamp,ordinal — for example: 1667883664,1
    # On startup you may set the timestamp you want to sync from (an ordinal of 1 is fine);
    # if unset, syncing starts from the seconds-level timestamp of five minutes ago
    recordFile: D:/testExample/mongodbdemo/src/main/resources/record.txt
    # Lock file indicating whether a sync process is currently running
    runningFile: D:/testExample/mongodbdemo/src/main/resources/.running
    # Updates are applied to the destination in bulk; this is the batch size
    batchSize: 1000
    # Maximum number of entries kept in the local cache
    cacheCapacity: 3000
    # Local cache expiry, in seconds
    timeout: 180
Loading the configuration:
@Data
@Configuration
@ConfigurationProperties("mongodb.config")
public class MongoConfig {
    private String sourceMongoUri;
    private String sourceDatabase;
    private String sourceCollection;
    private Integer sourceLimitSize;
    private Boolean deduplicateFilterd;
    private String deduplicateCol;
    private Integer maxTime;
    private String destMongoUri;
    private String destDatabase;
    private String destCollection;
    private String destAuthSource;
    private String recordFile;
    private String runningFile;
    private Integer batchSize;
    private Integer cacheCapacity;
    private Integer timeout;
}
Common constants:
public class CommonConstants {
    public static final String NS_FILTER_FORMT = "{ns : \"%s\", ts : { $gt : Timestamp(%s, %s)}}";
    public static final String TIMESTAMP_FILTER_FORMT = "{ts : { $gt : Timestamp(%s, %s)}}";
    public static final String GENERAL_NS_FILTER_FORMT = "{$or: [ %s ]}";
    public static final String COLUMN_NS = "ns";
    public static final String COLUMN_TS = "ts";
    public static final String COLUMN_OP = "op";
    public static final String COLUMN_O = "o";
    public static final String COLUMN_O2 = "o2";
    public static final String COLUMN_O_SET = "$set";
    public static final String COLUMN_O_V = "$v";
    public static final String SEPERATOR_RECORD = ",";
    public static final String CHAR_DOT = ".";
    public static final String FIRST_LINE = "1";
    public static final String OPERATION_INSERT = "i";
    public static final String OPERATION_UPDATE = "u";
    public static final String OPERATION_DELETE = "d";
    public static final String URI_SEPERATOR = "/";
    public static final String AUTH_SOURCE = "?authSource=";
    public static final String DEFAULT_ID_COLUMN = "_id";
    public static final String CACHE_SUFFIX_OPERATION_INSERT = "_insert";
    public static final String CACHE_SUFFIX_OPERATION_UPDATE = "_update";
}
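To make these format strings concrete: with destDatabase: test, destCollection: onlineuser,accessuser, and a checkpoint of 1667883664,1 (values from the sample config above), the core logic below would render the query as:

{$or: [ {ns : "test.onlineuser", ts : { $gt : Timestamp(1667883664, 1)}},{ns : "test.accessuser", ts : { $gt : Timestamp(1667883664, 1)}} ]}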
Cache helper class:
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.bson.Document;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@Service
public class CacheService {
    // _id -> operation-time entries, keyed by collection name
    private LoadingCache<String, Map<String, Long>> idOperationCache = null;
    // (update filter) -> (update body), used to suppress replayed updates
    private LoadingCache<Document, Document> updateCache = null;
    // _id -> deleted flag entries, keyed by collection name
    private LoadingCache<String, Map<String, Boolean>> deleteCache = null;
    @Value("${mongodb.config.cacheCapacity}")
    private Integer cacheCapacity;
    @Value("${mongodb.config.timeout}")
    private Integer timeout;

    @PostConstruct
    public void init() {
        // The loaders return null, so get() throws on a cache miss;
        // the accessors below catch that and treat it as "not cached".
        idOperationCache = CacheBuilder.newBuilder().initialCapacity(cacheCapacity).maximumSize(cacheCapacity)
                .expireAfterWrite(timeout, TimeUnit.SECONDS)
                .build(new CacheLoader<String, Map<String, Long>>() {
                    @Override
                    public Map<String, Long> load(String s) throws Exception {
                        return null;
                    }
                });
        updateCache = CacheBuilder.newBuilder().initialCapacity(cacheCapacity).maximumSize(cacheCapacity)
                .expireAfterWrite(timeout, TimeUnit.SECONDS)
                .build(new CacheLoader<Document, Document>() {
                    @Override
                    public Document load(Document s) throws Exception {
                        return null;
                    }
                });
        deleteCache = CacheBuilder.newBuilder().initialCapacity(cacheCapacity).maximumSize(cacheCapacity)
                .expireAfterWrite(timeout, TimeUnit.SECONDS)
                .build(new CacheLoader<String, Map<String, Boolean>>() {
                    @Override
                    public Map<String, Boolean> load(String s) throws Exception {
                        return null;
                    }
                });
    }

    public void setIDOpertionCache(String key, Map<String, Long> value) {
        idOperationCache.put(key, value);
    }

    public Long getIDOpertionValue(String key, String id) {
        try {
            Map<String, Long> data = idOperationCache.get(key);
            return data == null ? null : data.get(id);
        } catch (Exception e) {
            return null;
        }
    }

    public boolean hasUpdated(Document query, Document update) {
        try {
            Document doc = updateCache.get(query);
            return doc != null && doc.equals(update);
        } catch (Exception e) {
            return false;
        }
    }

    public void setUpdateCache(Document query, Document update) {
        updateCache.put(query, update);
    }

    public void setDeleteCache(String key, Map<String, Boolean> value) {
        deleteCache.put(key, value);
    }

    public Boolean getDeleteValue(String key, String id) {
        try {
            Map<String, Boolean> map = deleteCache.get(key);
            return map == null ? null : map.get(id);
        } catch (Exception e) {
            return null;
        }
    }
}
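To see how updateCache breaks the A→B→A ping-pong, consider this sketch (values are made up; cacheService is the bean above): the first time a (filter, update) pair is seen it is forwarded and cached, and when the same pair echoes back from the other data center within the configured timeout it is recognized and skipped.

// First sighting of this (filter, update) pair: not cached yet, so it gets forwarded
Document query = new Document("_id", "6368b890ab12cd34ef567890");
Document update = new Document("$set", new Document("logintime", 1667883660L));
boolean seen = cacheService.hasUpdated(query, update);   // false -> forward the update
cacheService.setUpdateCache(query, update);
// The same operation echoed back from the other data center within `timeout` seconds:
seen = cacheService.hasUpdated(query, update);           // true -> skip it, breaking the loop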
Core logic:
private void replicateOplogOperation(MongoCollection<Document> sourceCollection) throws Exception {
    // 1. Read the checkpoint (record) file
    File recordFile = new File(mongoConfig.getRecordFile());
    if(!recordFile.exists()) {
        recordFile.createNewFile();
    }
    BufferedReader bufferedReader = new BufferedReader(new FileReader(recordFile));
    String line = bufferedReader.readLine();
    bufferedReader.close();
    String[] vals = null;
    if(StringUtils.isNotEmpty(line)) {
        vals = line.split(CommonConstants.SEPERATOR_RECORD);
    } else {
        vals = buildInitialRecord().split(CommonConstants.SEPERATOR_RECORD);
    }
    String filter = null;
    List<String> desColls = null;
    BsonTimestamp preTimestamp = new BsonTimestamp(Integer.valueOf(vals[0]), Integer.valueOf(vals[1]));
    // 2. Build the filter for the namespaces that need syncing, per configuration
    if(StringUtils.isNotEmpty(mongoConfig.getDestCollection())) {
        String[] collections = mongoConfig.getDestCollection().split(CommonConstants.SEPERATOR_RECORD);
        desColls = Arrays.asList(collections);
        if(collections.length > 1) {
            StringBuilder sb = new StringBuilder();
            for(String coll : collections) {
                String item = String.format(CommonConstants.NS_FILTER_FORMT, getNsName(mongoConfig.getDestDatabase(), coll), vals[0], vals[1]);
                sb.append(item);
                sb.append(CommonConstants.SEPERATOR_RECORD);
            }
            sb.deleteCharAt(sb.length() - 1);
            filter = String.format(CommonConstants.GENERAL_NS_FILTER_FORMT, sb.toString());
        } else {
            filter = String.format(CommonConstants.NS_FILTER_FORMT, getNsName(mongoConfig.getDestDatabase(), mongoConfig.getDestCollection()), vals[0], vals[1]);
        }
    } else {
        filter = String.format(CommonConstants.TIMESTAMP_FILTER_FORMT, vals[0], vals[1]);
    }
    // 3. Run the query and iterate over the matched oplog entries
    FindIterable<Document> iter = sourceCollection.find(BasicDBObject.parse(filter)).limit(mongoConfig.getSourceLimitSize());
    MongoCursor<Document> cursor = iter.cursor();
    Map<String, List<WriteModel<Document>>> deduplicateMap = new HashMap<>();
    Map<String, List<WriteModel<Document>>> deleteModels = new HashMap<>();
    Document doc = null;
    int count = 0;
    while(cursor.hasNext()) {
        doc = cursor.next();
        // bsonTimestamp is a class-level field tracking the last processed optime
        bsonTimestamp = (BsonTimestamp) doc.get(CommonConstants.COLUMN_TS);
        String ns = (String) doc.get(CommonConstants.COLUMN_NS);
        if(StringUtils.isNotEmpty(mongoConfig.getDestDatabase()) && !ns.contains(mongoConfig.getDestDatabase())) {
            continue;
        }
        String db = ns.substring(0, ns.indexOf(CommonConstants.CHAR_DOT));
        String col = ns.substring(ns.indexOf(CommonConstants.CHAR_DOT) + 1);
        if(StringUtils.isNotEmpty(mongoConfig.getDestDatabase()) && !mongoConfig.getDestDatabase().equals(db) ||
                StringUtils.isNotEmpty(mongoConfig.getDestDatabase()) && StringUtils.isNotEmpty(mongoConfig.getDestCollection()) && !desColls.contains(col) ||
                StringUtils.isEmpty(mongoConfig.getDestDatabase()) && StringUtils.isNotEmpty(mongoConfig.getDestCollection())) {
            continue;
        }
        String op = (String) doc.get(CommonConstants.COLUMN_OP);
        switch (op) {
            case CommonConstants.OPERATION_INSERT:
                Document insertDoc = (Document) doc.get(CommonConstants.COLUMN_O);
                // Inserts are replayed as upserts so that re-delivered entries stay idempotent
                UpdateOneModel<Document> insertOneModel = new UpdateOneModel<>(new Document(CommonConstants.DEFAULT_ID_COLUMN, insertDoc.get(CommonConstants.DEFAULT_ID_COLUMN)),
                        new Document(CommonConstants.COLUMN_O_SET, insertDoc), new UpdateOptions().upsert(true));
                Long cacheTime = cacheService.getIDOpertionValue(col, insertDoc.get(CommonConstants.DEFAULT_ID_COLUMN).toString() + CommonConstants.CACHE_SUFFIX_OPERATION_INSERT);
                if((Boolean.TRUE.equals(mongoConfig.getDeduplicateFilterd()) && insertDoc.get(mongoConfig.getDeduplicateCol()) == null) || cacheTime != null) {
                    continue;
                }
                if(Boolean.TRUE.equals(mongoConfig.getDeduplicateFilterd())) {
                    long time = Long.valueOf(insertDoc.get(mongoConfig.getDeduplicateCol()).toString());
                    // To break the loop caused by bidirectional replication, oplog entries older than the configured window are filtered out
                    if(cacheTime == null && System.currentTimeMillis()/1000 - time <= mongoConfig.getMaxTime().intValue()) {
                        cacheService.setIDOpertionCache(col, buildCacheData(insertDoc.get(CommonConstants.DEFAULT_ID_COLUMN).toString() + CommonConstants.CACHE_SUFFIX_OPERATION_INSERT, time));
                        cacheTime = time;
                    }
                    if(System.currentTimeMillis()/1000 - time <= mongoConfig.getMaxTime().intValue() && cacheTime != null && time >= cacheTime.longValue()) {
                        List<WriteModel<Document>> models = deduplicateMap.get(col);
                        if(models == null) {
                            deduplicateMap.put(col, new ArrayList<>());
                            models = deduplicateMap.get(col);
                        }
                        models.add(insertOneModel);
                    } else {
                        count++;
                    }
                } else {
                    List<WriteModel<Document>> models = deduplicateMap.get(col);
                    if(models == null) {
                        deduplicateMap.put(col, new ArrayList<>());
                        models = deduplicateMap.get(col);
                    }
                    models.add(insertOneModel);
                    cacheService.setIDOpertionCache(col, buildCacheData(insertDoc.get(CommonConstants.DEFAULT_ID_COLUMN).toString() + CommonConstants.CACHE_SUFFIX_OPERATION_INSERT, System.currentTimeMillis()));
                }
                break;
            case CommonConstants.OPERATION_UPDATE:
                Document filterDoc = (Document) doc.get(CommonConstants.COLUMN_O2);
                Document updateDoc = (Document) doc.get(CommonConstants.COLUMN_O);
                if(Boolean.TRUE.equals(mongoConfig.getDeduplicateFilterd()) && ((Document) updateDoc.get(CommonConstants.COLUMN_O_SET)).get(mongoConfig.getDeduplicateCol()) == null) {
                    continue;
                }
                updateDoc.remove(CommonConstants.COLUMN_O_V);
                if(cacheService.hasUpdated(filterDoc, updateDoc)) {
                    continue;
                }
                cacheService.setUpdateCache(filterDoc, updateDoc);
                UpdateOneModel<Document> updateOneModel = new UpdateOneModel<>(filterDoc, updateDoc);
                if(Boolean.TRUE.equals(mongoConfig.getDeduplicateFilterd())) {
                    long updateTime = Long.valueOf(((Document) updateDoc.get(CommonConstants.COLUMN_O_SET)).get(mongoConfig.getDeduplicateCol()).toString());
                    Long ucacheTime = cacheService.getIDOpertionValue(col, filterDoc.get(CommonConstants.DEFAULT_ID_COLUMN).toString() + CommonConstants.CACHE_SUFFIX_OPERATION_UPDATE);
                    // To break the loop caused by bidirectional replication, oplog entries older than the configured window are filtered out
                    if(ucacheTime == null && System.currentTimeMillis()/1000 - updateTime <= mongoConfig.getMaxTime().intValue()) {
                        cacheService.setIDOpertionCache(col, buildCacheData(filterDoc.get(CommonConstants.DEFAULT_ID_COLUMN).toString() + CommonConstants.CACHE_SUFFIX_OPERATION_UPDATE, updateTime));
                        ucacheTime = updateTime;
                    }
                    if(System.currentTimeMillis()/1000 - updateTime <= mongoConfig.getMaxTime().intValue() && ucacheTime != null && updateTime >= ucacheTime.longValue()) {
                        List<WriteModel<Document>> models = deduplicateMap.get(col);
                        if(models == null) {
                            deduplicateMap.put(col, new ArrayList<>());
                            models = deduplicateMap.get(col);
                        }
                        models.add(updateOneModel);
                    } else {
                        count++;
                    }
                } else {
                    List<WriteModel<Document>> models = deduplicateMap.get(col);
                    if(models == null) {
                        deduplicateMap.put(col, new ArrayList<>());
                        models = deduplicateMap.get(col);
                    }
                    models.add(updateOneModel);
                }
                break;
            case CommonConstants.OPERATION_DELETE:
                Document deleteDoc = (Document) doc.get(CommonConstants.COLUMN_O);
                Boolean b = cacheService.getDeleteValue(col, deleteDoc.get(CommonConstants.DEFAULT_ID_COLUMN).toString());
                if(b == null) {
                    cacheService.setDeleteCache(col, buildDeletedCacheData(deleteDoc.get(CommonConstants.DEFAULT_ID_COLUMN).toString(), Boolean.TRUE));
                    DeleteOneModel<Document> deleteOneModel = new DeleteOneModel<>(deleteDoc);
                    List<WriteModel<Document>> models = deleteModels.get(col);
                    if(models == null) {
                        deleteModels.put(col, new ArrayList<>());
                        models = deleteModels.get(col);
                    }
                    models.add(deleteOneModel);
                } else {
                    count++;
                }
                break;
        }
    }
    if(count > 0) {
        log.info("Number of oplog entries filtered out by the timeout window: {}", count);
    }
    // Apply upserts/updates first (merging same-collection deletes), then any remaining delete-only collections
    updateMongodb(deduplicateMap, deleteModels, false);
    updateMongodb(deduplicateMap, deleteModels, true);
    // Persist the checkpoint only when something was processed or the timestamp moved forward
    if(bsonTimestamp != null && (hasValidData(deduplicateMap, deleteModels) || !preTimestamp.equals(bsonTimestamp))) {
        updateRecordFile(buildRecord(bsonTimestamp));
    }
}
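The method above calls several helpers that the original post doesn't show (getNsName, buildInitialRecord, buildRecord, updateRecordFile, buildCacheData, buildDeletedCacheData, hasValidData). Below are minimal sketches, living in the same class, consistent with how they are called and with the record-file format described in the configuration — these are my assumptions, not the original code:

private String getNsName(String db, String coll) {
    // ns is "database.collection"
    return db + CommonConstants.CHAR_DOT + coll;
}

private String buildInitialRecord() {
    // Default checkpoint: five minutes ago, ordinal 1 (see the recordFile comment in the config)
    long fiveMinutesAgo = System.currentTimeMillis() / 1000 - 5 * 60;
    return fiveMinutesAgo + CommonConstants.SEPERATOR_RECORD + CommonConstants.FIRST_LINE;
}

private String buildRecord(BsonTimestamp ts) {
    // Persist "seconds,ordinal", e.g. 1667883664,1
    return ts.getTime() + CommonConstants.SEPERATOR_RECORD + ts.getInc();
}

private void updateRecordFile(String record) throws IOException {
    // Overwrite the checkpoint file with the latest position
    try (BufferedWriter writer = new BufferedWriter(new FileWriter(mongoConfig.getRecordFile()))) {
        writer.write(record);
    }
}

private Map<String, Long> buildCacheData(String key, long time) {
    Map<String, Long> data = new HashMap<>();
    data.put(key, time);
    return data;
}

private Map<String, Boolean> buildDeletedCacheData(String key, Boolean deleted) {
    Map<String, Boolean> data = new HashMap<>();
    data.put(key, deleted);
    return data;
}

private boolean hasValidData(Map<String, List<WriteModel<Document>>> deduplicateMap, Map<String, List<WriteModel<Document>>> deleteModels) {
    return !deduplicateMap.isEmpty() || !deleteModels.isEmpty();
}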
private void updateMongodb(Map<String, List<WriteModel<Document>>> deduplicateMap, Map<String, List<WriteModel<Document>>> deleteModels, boolean onlyDelete) {
    Set<Map.Entry<String, List<WriteModel<Document>>>> entries = onlyDelete ? deleteModels.entrySet() : deduplicateMap.entrySet();
    Iterator<Map.Entry<String, List<WriteModel<Document>>>> iterator = entries.iterator();
    while (iterator.hasNext()) {
        Map.Entry<String, List<WriteModel<Document>>> entry = iterator.next();
        String collectionName = entry.getKey();
        List<WriteModel<Document>> values = entry.getValue();
        // On the first (non-delete) pass, merge the same collection's deletes into the batch
        if(!onlyDelete && deleteModels.get(collectionName) != null) {
            values.addAll(deleteModels.get(collectionName));
        }
        if(!CollectionUtils.isEmpty(values)) {
            log.info("Collection {} has pending write models: {}", collectionName, values);
        }
        int page = (values.size()%mongoConfig.getBatchSize() == 0) ? (values.size()/mongoConfig.getBatchSize()) : (values.size()/mongoConfig.getBatchSize() + 1);
        MongoCollection<Document> mongoCollection = mongopTemplateMap.get(mongoConfig.getDestDatabase()).getCollection(collectionName);
        for(int i=0; i<page; i++) {
            BulkWriteResult bulkWriteResult = mongoCollection.bulkWrite(values.subList(i*mongoConfig.getBatchSize(), Math.min((i+1)*mongoConfig.getBatchSize(), values.size())));
            log.info("MongoDB bulk write result: {}", bulkWriteResult);
        }
        // Collections handled on this pass must not be replayed on the delete-only pass
        if(!onlyDelete) {
            deleteModels.remove(collectionName);
        }
    }
}
Running the task:
public void processOplogOperation() throws Exception {
    File file = new File(mongoConfig.getRunningFile());
    if(file.exists()) {
        // Another sync run is still in progress
        return;
    }
    // Build the destination template map on first use
    if(mongopTemplateMap == null) {
        mongopTemplateMap = mongopTemplateMap();
    }
    file.createNewFile();
    MongoClient sourceMongoClient = getSourceMongoClient();
    try {
        MongoDatabase sourceDatabase = sourceMongoClient.getDatabase(mongoConfig.getSourceDatabase());
        MongoCollection<Document> sourceCollection = sourceDatabase.getCollection(mongoConfig.getSourceCollection());
        replicateOplogOperation(sourceCollection);
    } finally {
        // Release the source connection and the lock file so the next run can proceed
        sourceMongoClient.close();
        file.delete();
    }
}
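The getSourceMongoClient() and mongopTemplateMap() helpers, and the trigger that invokes processOplogOperation(), are likewise not shown in the original post. A minimal sketch under my own assumptions: a Map<String, MongoDatabase> satisfies the getCollection calls made in updateMongodb (the field name hints the author may have used MongoTemplate instead), and the polling interval is arbitrary.

private MongoClient getSourceMongoClient() {
    // The source URI already carries its credentials, so connect directly
    return MongoClients.create(mongoConfig.getSourceMongoUri());
}

private Map<String, MongoDatabase> mongopTemplateMap() {
    // Append authSource so authentication runs against the configured database
    String uri = mongoConfig.getDestMongoUri() + CommonConstants.AUTH_SOURCE + mongoConfig.getDestAuthSource();
    MongoClient destClient = MongoClients.create(uri);
    Map<String, MongoDatabase> map = new HashMap<>();
    map.put(mongoConfig.getDestDatabase(), destClient.getDatabase(mongoConfig.getDestDatabase()));
    return map;
}

// Hypothetical trigger: poll the source oplog every five seconds
@Scheduled(fixedDelay = 5000)
public void syncTask() throws Exception {
    processOplogOperation();
}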
If this article helped or inspired you, please share, bookmark, like, or hit "Looking" — your support is what keeps me going!
