暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

mysql 单表7000万数据如何多进程查询更新

codefan 2021-01-19
219

mysql 单表7000万数据如何多进程查询更新

一张7000多万行的表,省份字段脏数据较多且没有加索引,执行 where reg_province is null 的 SQL 时必然全表扫描,导致查询异常缓慢。

实现思想

  • 根据主键切分,命中主键,where 条件里使用 主键>=a and 主键<b and ...
  • 支持跑全量跑增量
  • 支持多进程起多个任务操作

代码分析

  • Scheduler类
    • 1.1 主要读取配置文件
    • 1.2 根据命令行参数 判断任务类别
    • 1.3 获取最小最大id,根据每次查询的batch去划分列表,作为master给不同的进程分配不同的任务执行区间 start-end 比如0进程读取0-10000,1进程读取10000-20000...
  • 1.4 创建子进程,将父进程的配置传递到子进程,主进程阻塞等待子进程执行完成
    • 1.5 主进程获取每个子进程的任务结果,失败进程自动抛出异常 查看日志
  • ChildProcess 类
    • 2.1 根据父进程传过来的任务列表 循环执行具体的逻辑 调接口获取返回的数据 入库
    • 2.2 将任务结果返回给父进程

具体实现:


import os
import argparse
import copy
import time
import requests
from multiprocessing import Process,Queue
from zeronetools.config import Config
from zeronetools.log import LoggerRouter
from zeronetools.util import (
            get_str_yestoday,
            get_str_tomorrow,
            get_secure_raw_config,
            get_now,
            get_table_min_max_sql,
            split_list,
            get_process_datas
)
from zeronetools.backends import RedisClient,MysqlClient
from zeronetools.roboter import Roboter
from zeronetools.task import BaseTask
from zeronetools.decorators import log_timed
from zeronetools.transfer import REDIS_NOID_HASH,REDIS_ENTITY_NAME_HASH,REDIS_FROMAPP_HASH



# Optional extra dirty-data filter kept for reference:
# and (reg_province is null or reg_province like '%A%'  or reg_province like '%z%') 

# Range query over the source table; {start}/{end} are primary-key bounds,
# {condition} is the incremental/full-run predicate rendered by the scheduler.
BASE_QUERY = """\
select id,entity_id,entity_name,reg_address,reg_province
from base_entity_basic_info 
where id>={start} and id<{end} and {condition} and reg_address is not null
"""

# Columns written to zo_base_entity_area, in insert order.
FIELDS = [
    "entity_id",
    "entity_name",
    "reg_address",
    "reg_province",
    "derive_reg_province",
    "province_code",
    "province_name",
    "city_name",
    "city_code",
    "area_name",
    "area_code",
]

# Upsert template: insert or, on duplicate key, refresh every column.
BASE_INSERT_ZO = """\
insert into zo_base_entity_area({str_fields})values({fmt_fields})
on duplicate key update {update_duplicate}
"""

INSERT_ZO = BASE_INSERT_ZO.format(
    str_fields=",".join(FIELDS),
    fmt_fields=",".join("%s" for _ in FIELDS),
    update_duplicate=",".join("{0}=values({0})".format(f) for f in FIELDS),
)

# Dump of the reg_province value histogram collected by the workers.
INSERT_TEMP_REG_PROVINCE = """\
insert into temp_reg_province(reg_province,reg_province_num)values(%s,%s)
"""


class ChildProcessFailedException(Exception):
    """Raised by the scheduler when one or more child processes report failure."""


class ChildProcess(object):
    """Worker run inside a spawned process.

    Receives a list of (start, end) primary-key ranges from the Scheduler,
    queries base_entity_basic_info for each range, resolves province / city /
    area codes for every reg_address through an HTTP API, and upserts the
    results into zo_base_entity_area. The final status is reported back to
    the parent through a multiprocessing queue as [process_id, task_status].
    """

    def __init__(self, task_name, config, logger, log_name, start_ends, process_id, queue):
        self.starttime = get_now()
        self.task_name = task_name
        self.config = config
        self.logger = logger
        self.log_name = log_name
        # List of (start, end) id ranges this worker is responsible for.
        self.start_ends = start_ends
        self.env = self.config['env']
        self.reader = MysqlClient.from_settings(self.config, self.logger)
        self.roboter = Roboter.from_settings(self.config, self.logger)
        self.process_id = process_id
        self.queue = queue
        # 0 = failed (default), 1 = success; sent back to the parent in run().
        self.task_status = 0
        # Rendered WHERE predicate prepared by the Scheduler.
        self._condition = self.config.pop('_condition')
        self.session = requests.Session()
        self.api = self.config["area_api"]
        # Histogram of raw reg_province values seen; flushed to
        # temp_reg_province at the end of process_data().
        self.error_codes = {}
        self.headers = {
            # BUG FIX: the original was missing the ':' after 'Content-Type',
            # which made this dict literal a syntax error.
            'Content-Type': 'text/plain;charset=utf-8',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9'
        }

    def close(self):
        """Release the MySQL connection held by this worker."""
        self.reader.close()

    def get_code(self, reg_address):
        """Resolve *reg_address* to administrative-division codes via the API.

        Example API payload:
        {
        'province': [{'code': '530000', 'name': '云南省'}],
        'city': [{'code': '530100', 'name': '昆明市', 'provinceCode': '530000'}],
        'area': [{'code': '530103', 'name': '盘龙区', 'cityCode': '530100', 'provinceCode': '530000'}],
        'detail': ['沣源路与北京路交叉口东南侧俊发名城N-15地块A幢1-2号商铺']}

        Returns a dict with the most specific keys available (area > city >
        province), or None when the address cannot be resolved or the request
        fails. Errors are deliberately swallowed (best effort) so one bad
        address does not abort the whole range.
        """

        def tran_json(data):
            # Pick the deepest administrative level present in the response.
            if len(data.get("area", [])) > 0:
                _result = data["area"][0]
                item = {
                    'derive_reg_province': _result['code'],
                    "province_code": data["province"][0]['code'],
                    "province_name": data["province"][0]['name'],
                    "city_code": data["city"][0]['code'],
                    "city_name": data["city"][0]['name'],
                    "area_name": _result['name'],
                    "area_code": _result['code'],
                }
            elif len(data.get("city", [])) > 0:
                _result = data["city"][0]
                item = {
                    'derive_reg_province': _result['code'],
                    "province_code": data["province"][0]['code'],
                    "province_name": data["province"][0]['name'],
                    "city_code": data["city"][0]['code'],
                    "city_name": data["city"][0]['name'],
                }
            elif len(data.get("province", [])) > 0:
                _result = data["province"][0]
                item = {
                    'derive_reg_province': _result['code'],
                    "province_code": data["province"][0]['code'],
                    "province_name": data["province"][0]['name'],
                }
            else:
                item = None
            return item

        url = self.api.format(reg_address)
        try:
            data = self.session.get(url, headers=self.headers).json()
            if not data:
                return None
            return tran_json(data)
        except Exception:
            # Best effort: network/JSON errors resolve to "unknown address".
            return None

    def render_sql(self, base_sql, start, end, condition=''):
        """Fill a SQL template's {start}/{end}/{condition} placeholders."""
        return base_sql.format(start=start, end=end, condition=condition)

    def execute_sql(self, base_sql, start, end, log_msg, condition=''):
        """Query one id range, enrich each row via the API, upsert in batches."""
        sql = self.render_sql(base_sql, start, end, condition)
        self.logger.info('%s:\n\n%s\n', log_msg, sql)
        datas = self.reader.query(sql)['data']
        l = []
        batch = 100
        while datas:
            data = datas.pop()
            reg_address = data['reg_address']
            reg_province = data['reg_province']
            # Skip rows whose reg_province is purely numeric (dirty data).
            if reg_province and reg_province.isdigit():
                continue
            # Count every raw reg_province value for later analysis.
            self.error_codes[reg_province] = self.error_codes.get(reg_province, 0) + 1

            json_code = self.get_code(reg_address)
            if json_code is None:
                continue
            self.logger.info("reg_address is %s,reg_province is %s,api-result is:%s", reg_address, reg_province, json_code)
            data.update(json_code)
            # `id` is not a column of the target table.
            del data['id']
            # BUG FIX: original read `[data.get(k,Nonefor k in FIELDS]`,
            # a syntax error; a closing paren was lost in the paste.
            values = [data.get(k, None) for k in FIELDS]
            l.append(values)
            if len(l) == batch:
                self.reader.commits(INSERT_ZO, l)
                l.clear()
        # Flush the final partial batch.
        if l:
            self.reader.commits(INSERT_ZO, l)
            l.clear()

    def process_data(self):
        """Run execute_sql for every assigned (start, end) range, then flush
        the reg_province histogram to temp_reg_province."""
        while self.start_ends:
            start, end = self.start_ends.pop()
            self.execute_sql(BASE_QUERY, start, end, log_msg="entity_basic_area", condition=self._condition)

        if self.error_codes:
            values = [[k, v] for k, v in self.error_codes.items()]
            self.reader.commits(INSERT_TEMP_REG_PROVINCE, values)

    @log_timed
    def run(self):
        """Entry point for the child process; always reports back to parent."""
        try:
            # BUG FIX: original logged len(self.entids), an attribute that is
            # never defined (AttributeError on every run).
            self.logger.info('子进程任务%s 开始处理 %d个id', self.task_name, len(self.start_ends))
            self.process_data()
            self.close()
            self.task_status = 1
        except Exception as e:
            self.logger.error("task_name %s error %s", self.task_name, str(e))
            self.roboter.send_msg("task_name %s error %s" % (self.task_name, str(e)))
        # Unconditional: parent counts a missing message as a failed process.
        self.queue.put([self.process_id, self.task_status])



class Scheduler(BaseTask):
    """Master task: computes the table's min/max id, splits the id range
    into fixed-size chunks, fans the chunks out to ChildProcess workers,
    and raises ChildProcessFailedException if any worker reports failure.
    """

    def __init__(self,env,task_name,starttime,logger,log_name,**kwargs):
        super(Scheduler,self).__init__(env,task_name,starttime,logger,log_name,**kwargs)
        self.config = kwargs.pop('config')
        self.reader = MysqlClient.from_settings(self.config,logger)
        self.logger.info('reader-config:%s',get_secure_raw_config(self.reader.config))
        self.roboter = Roboter.from_settings(self.config,logger)
        # Incremental-run date window; popped so children don't receive them.
        self.startdate = self.config.pop('startdate')
        self.enddate = self.config.pop('enddate')
        # Name of the date column used for incremental runs; '' means full run.
        self.condition = self.config.pop('condition')
        if self.condition != '':
            # Incremental run: restrict rows to the [startdate, enddate] window.
            self._condition = '{condition}>="{startdate}" and {condition}<="{enddate}"'.format(
                condition=self.condition,
                startdate=self.startdate,
                enddate=self.enddate
            )
        else:
            # Full run: always-true predicate keeps the WHERE clause valid.
            self._condition = ' 1=1 '
        # Children read the rendered predicate back out of the config.
        self.config['_condition'] = self._condition
        self.process_num = self.config.pop('process_num',10)
        if isinstance(self.process_num,str):
            self.process_num = int(self.process_num)
        if self.process_num == 0:
            self.process_num = 10
        

    def start(self):
        # When running a full pass, condition == '' (see __init__).
        pk_column = 'id'
        # NOTE(review): db/table look like placeholders ('xx'/'xx2') — confirm
        # the real names before running.
        db = 'xx'
        table = 'xx2'
        min_sql = get_table_min_max_sql(table,pk_column,db,sql_type="min",condition=self._condition)
        max_sql = get_table_min_max_sql(table,pk_column,db,sql_type="max",condition=self._condition)
        self.logger.info("min and max_sql is:\n\n %s \n\n %s \n",min_sql,max_sql)
        starts = self.reader.query(min_sql)["data"]
        # No rows matching the predicate: nothing to schedule.
        if not starts:
            return
        # Shift the lower bound down by one; exact boundary semantics depend
        # on split_list — TODO confirm against its implementation.
        start = starts[0][pk_column] -1
        end = self.reader.query(max_sql)["data"][0][pk_column]
        self.logger.info("start:%s end %s",start,end)
        # Number of ids covered by each generated SQL statement.
        batch = 30000
        split_datas = split_list(start,end,batch)
        total_executes = len(split_datas)
        self.logger.info("need execute %d sql",total_executes)
        # Distribute the (start, end) chunks over the worker processes.
        total,process_data = get_process_datas(split_datas,self.process_num)
        processs = []
        queue = Queue(self.process_num)
        for i in range(self.process_num):
            process_task_name = self.task_name + '_' + str(i)
            # Deep copy so each child mutates (pops keys from) its own config.
            process_config = copy.deepcopy(self.config)
            process_logger,process_log_name = LoggerRouter.get_logger_from_task_name(process_task_name,**process_config.get('log'))
            process_start_ends = process_data[i]
            p_obj = ChildProcess(process_task_name,process_config,process_logger,process_log_name,process_start_ends,i,queue)
            p = Process(target=p_obj.run,args=())
            processs.append(p)
        for p in processs:
            p.start()
        # Block until every worker has finished.
        for p in processs:
            p.join()
        self.logger.info("子进程处理完毕 主进程进行处理")
        # Drain [process_id, status] results; status 1 means success.
        pids = []
        while not queue.empty():
            p_id,status = queue.get(block=True)
            if status == 1:
                pids.append(p_id)
        failed_p_objs = [i for i in range(self.process_num) if i not in pids]
        self.logger.info("成功进程ids:%s",pids)
        self.logger.info("失败进程ids:%s",failed_p_objs)
        self.reader.close()
        if failed_p_objs:
            # Surface child failures so the whole run is marked as failed;
            # details are in the per-process log files.
            raise ChildProcessFailedException(",".join(map(str,failed_p_objs)))
            
        



if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="zo_area")
    parser.add_argument('-c''--config_path', default='./config',help='json配置文件路径')
    parser.add_argument('-e''--env', default='handle_history_name',help='任务运行环境 test/dev/prd')
    parser.add_argument('-is_used_env''--is_used_env', default=False,help='是否使用环境变量')
    parser.add_argument('-task_name''--task_name', default='update_zo_area',help='省份字段code更新')
    parser.add_argument('-p''--process_num', default=10,help='处理进程数')
    parser.add_argument('-condition''--condition', default='z_update_time',help='日期条件key,=空的时候 默认跑全量')
    parser.add_argument('-startdate''--startdate', default=get_str_yestoday(format='%Y-%m-%d'),help='起始日期')
    parser.add_argument('-enddate''--enddate', default=get_str_tomorrow(format='%Y-%m-%d'),help='起始日期')
    starttime = get_now()
    cmd_args = parser.parse_args()
    cmd_kwargs = vars(cmd_args)
    py_file = os.path.abspath(__file__)
    cmd = 'python3 ' + py_file + ' '.join(['--' + k + ' '+str(cmd_kwargs[k]) for k in cmd_kwargs.keys()])
    is_used_env = False if str(cmd_kwargs['is_used_env']).lower() == 'false' else True
    env = cmd_args.env
    task_name = cmd_args.task_name
    config_path = cmd_args.config_path
    conf_obj = Config.from_settings(env,config_path)
    #获取配置文件内容
    config = conf_obj.get_config(is_used_env)
    config.update(cmd_kwargs)
    config['is_used_env'] = is_used_env
    config['cmd'] = cmd
    #获取logger对象
    logger,log_name = LoggerRouter.get_logger_from_task_name(task_name,**config.get('log'))
    logger.info('log_name:%s',log_name)
    logger.info('config:\n%s',get_secure_raw_config(config))
    scheduler = Scheduler(env,task_name,starttime,logger,log_name,config=config)
    scheduler.run()


复制


文章转载自codefan,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论