暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

5分钟内使用go对2张8000万表进行对比

codefan 2021-07-08
122

5分钟内使用go对2张8000万表进行对比

之前为了保证服务稳定,程序是运行在了预生产环境,更新以后将增量的数据发布到生产环境。今天这个例子,用到的表属于一张实体基本信息表,字段多,数据量大,大约在8000多万,数据56G,索引11G,之前分析过上游每天增量大约在几十万,每天上传还是挺快的。昨天上游修复上周反馈的bug,推送了2000多万的数据,早晨7点多看了下增量发布任务,没有执行完,看了下这个表的发布流程,一张全量表一张增量表,表结构完全一样。

数据情况
增量发布流程

由于发布时间过久,避免上班后线上使用出问题,我在调度平台暂停了上传任务,注意此时其实已经有数据被删除,但是可能还没写入到正式表。所以需要赶紧将数据发布,重新用go写了上传脚本,多协程从增量表读取数据切分,按先删除后写入的操作完成导入。到公司后,心想还是做个数据检验,对比一下哪些数据在预生产有生产环境无。这个表唯一索引是entity_id,是业务主键,为什么不用主键自增id去做校验,涉及到业务曾用名处理问题,有可能会报索引冲突,线上的数据其实会比pre多,因为pre环境会因为曾用名表更的情况删除数据,这部分数据在上次上传到生产环境后不会被删除,因为其他表不会再使用到这部分id,所以同步pre删除的相当于不做处理。

如何对2张8000多万表做对比

  • 任务切分的时候做了特殊处理,entity_id因为业务问题,之前数据类型是int(11) 后数据源扩充到了上游全库,改成了bigint(20),因此这个字段数据是倾斜的,并非是自增连续的 处理方法如下
func GetStartEnds(start, end int) [][]int {
 startEnds := make([][]int, 0)
 ls := make([]int, 2)
 ls[0] = start
 ls[1] = 100012000000
 startEnds = append(startEnds, ls)
 other_ls := make([]int, 2)
 other_ls[0] = 1000100000000000
 other_ls[1] = end
 startEnds = append(startEnds, other_ls)
 return startEnds
}


复制
  • 初始化2个数据库连接池

func init(){
 StartTime = time.Now()
 DbName = "z_pe"
 TableName = "base_entity_basic_info"
 PK = "entity_id"
 mc := &backends.MysqlConfig{
  Host:HOST,
  User:UserName,
  Password:Password,
  Db:DATABASE,
  Port:PORT,
  Charset:CHARSET,
 }
 mc.ConnUri = mc.GetConnUri()
 mysql = backends.NewMysqlClient(mc)
 pmc := &backends.MysqlConfig{
  Host:PHOST,
  User:PUserName,
  Password:PPassword,
  Db:DATABASE,
  Port:PORT,
  Charset:CHARSET,
 }
 pmc.ConnUri = pmc.GetConnUri()
 pmysql = backends.NewMysqlClient(pmc)
 
}

复制
  • 分发任务 开启协程
 start := mysql.GetMinId(DbName,TableName,PK)
 end := mysql.GetMaxId(DbName,TableName,PK)
 startEnds := GetStartEnds(start,end)
 JobsChan := make(chan *Job,0)
 ResultChan := make(chan *Result,0)
 workerNum := 20
 Batch := 10000
 for i:=1;i<=workerNum;i++{
  wg.Add(1)
  go worker(i,JobsChan,ResultChan)
 }

 go func(){
  for _, ls := range startEnds {
   start, end := ls[0], ls[1]
   log.Println("start,end ", start, end)
   for start < end{
    _end := start + Batch
    if _end > end{
     _end = end
    }
    p := make([]int,2)
    p[0] = start
    p[1] = _end
    JobsChan <- &Job{Start:start,End:_end}
    start = _end
   }
  }
  close(JobsChan)
 }()

复制
  • worker函数 使用了wg(sync.WaitGroup) 当JobsChan关闭后wg.Done() 退出协程
worker函数
  • GetNotExists函数 找到pre环境有生产环境没有的entity_id
GetNotExists函数
  • 汇总结果 使用了单独的协程处理入库所以无需加锁
  • 脚本发布到服务器跑一下结果,检测出2921条不存在生产环境,用时4m52s,查了下入库时间是今天pre环境通过binlog进来的数据明天自动发布,校验通过


文章转载自codefan,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论