MySQL: how to query and update a single 70-million-row table with multiple processes
The table has more than 70 million rows, the province field contains a lot of dirty data and has no index, so a query with where reg_province is null inevitably does a full table scan and is painfully slow.
Implementation idea
Split the work by primary key so that every query hits it: the where clause becomes primary_key >= a and primary_key < b and ... (sketched below).
Supports both full and incremental runs, and supports launching several tasks across multiple processes.
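A minimal sketch of the splitting step, assuming that split_list (the helper used in the full code below) simply cuts the id range into fixed-size half-open intervals:

# a minimal sketch, assuming split_list(start, end, batch) cuts [start, end)
# into half-open intervals of at most `batch` ids
def split_list(start, end, batch):
    ranges = []
    a = start
    while a < end:
        b = min(a + batch, end)
        ranges.append((a, b))
        a = b
    return ranges

# each interval becomes one query that hits the primary key:
#   ... where id >= a and id < b and <extra condition>
print(split_list(0, 95000, 30000))
# [(0, 30000), (30000, 60000), (60000, 90000), (90000, 95000)]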
Code walkthrough
Scheduler class
1.1 Reads the configuration file.
1.2 Determines the task type (full or incremental) from the command-line arguments.
1.3 Gets the minimum and maximum id, splits that range into a list of batches of the per-query size, and, as the master, assigns each process its own start-end intervals: for example process 0 reads 0-10000, process 1 reads 10000-20000, and so on.
1.4 Creates the child processes, passes the parent's configuration down to them, and blocks the main process while they run.
1.5 Collects every child's task result in the main process; a failed process automatically raises an exception, so check the logs.
ChildProcess class
2.1 Loops over the task list handed down by the parent process and executes the concrete logic for each interval: calls the API, takes the returned data, and writes it to the database.
2.2 Returns the task result to the parent process.
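Before the full listing, here is a stripped-down sketch of that master/worker pattern; the names and payloads are simplified for illustration and are not the production code:

from multiprocessing import Process, Queue

def worker(process_id, start_ends, queue):
    # each child loops over its own (start, end) intervals
    status = 0
    try:
        for start, end in start_ends:
            pass  # query the interval, call the API, write results back
        status = 1
    finally:
        # report back to the master: which process, success or failure
        queue.put([process_id, status])

if __name__ == '__main__':
    process_num = 3
    intervals = [(0, 10000), (10000, 20000), (20000, 30000)]
    queue = Queue(process_num)
    procs = [Process(target=worker, args=(i, [intervals[i]], queue))
             for i in range(process_num)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()  # block the master until every child is done
    ok = []
    while not queue.empty():
        pid, status = queue.get()
        if status == 1:
            ok.append(pid)
    failed = [i for i in range(process_num) if i not in ok]
    print('ok:', ok, 'failed:', failed)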
Implementation:
import os
import argparse
import copy
import time
import requests
from multiprocessing import Process,Queue
from zeronetools.config import Config
from zeronetools.log import LoggerRouter
from zeronetools.util import (
get_str_yestoday,
get_str_tomorrow,
get_secure_raw_config,
get_now,
get_table_min_max_sql,
split_list,
get_process_datas
)
from zeronetools.backends import RedisClient,MysqlClient
from zeronetools.roboter import Roboter
from zeronetools.task import BaseTask
from zeronetools.decorators import log_timed
from zeronetools.transfer import REDIS_NOID_HASH,REDIS_ENTITY_NAME_HASH,REDIS_FROMAPP_HASH
# and (reg_province is null or reg_province like '%A%' or reg_province like '%z%')
BASE_QUERY = """\
select id,entity_id,entity_name,reg_address,reg_province
from base_entity_basic_info
where id>={start} and id<{end} and {condition} and reg_address is not null
"""
FIELDS = ["entity_id","entity_name","reg_address","reg_province","derive_reg_province","province_code","province_name","city_name","city_code","area_name","area_code"]
BASE_INSERT_ZO = """\
insert into zo_base_entity_area({str_fields})values({fmt_fields})
on duplicate key update {update_duplicate}
"""
INSERT_ZO = BASE_INSERT_ZO.format(
str_fields=",".join(FIELDS),
fmt_fields = ','.join(['%s' for i in FIELDS]),
update_duplicate = ','.join(['{k}=values({k})'.format(k=i) for i in FIELDS])
)
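# Rendered, INSERT_ZO is roughly:
#   insert into zo_base_entity_area(entity_id,entity_name,...,area_code)
#   values(%s,%s,...,%s)
#   on duplicate key update entity_id=values(entity_id),entity_name=values(entity_name),...
# so re-running an id range simply overwrites previously derived codes.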
INSERT_TEMP_REG_PROVINCE = """\
insert into temp_reg_province(reg_province,reg_province_num)values(%s,%s)
"""
class ChildProcessFailedException(Exception):
pass
class ChildProcess(object):
def __init__(self,task_name,config,logger,log_name,start_ends,process_id,queue):
self.starttime = get_now()
self.task_name = task_name
self.config = config
self.logger = logger
self.log_name = log_name
self.start_ends = start_ends
self.env = self.config['env']
self.reader = MysqlClient.from_settings(self.config,self.logger)
self.roboter = Roboter.from_settings(self.config,self.logger)
self.process_id = process_id
self.queue = queue
self.task_status = 0
self._condition = self.config.pop('_condition')
self.session = requests.Session()
self.api = self.config["area_api"]
self.error_codes = {}
self.headers = {
'Content-Type': 'text/plain;charset=utf-8',
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9'
}
def close(self):
self.reader.close()
def get_code(self,reg_address):
"""
{
'province': [{'code': '530000', 'name': '云南省'}],
'city': [{'code': '530100', 'name': '昆明市', 'provinceCode': '530000'}],
'area': [{'code': '530103', 'name': '盘龙区', 'cityCode': '530100', 'provinceCode': '530000'}],
'detail': ['沣源路与北京路交叉口东南侧俊发名城N-15地块A幢1-2号商铺']}
"""
def tran_json(data):
#"province_code","province_name","city_name","city_code"
if len(data.get("area",[]))>0:
_result = data["area"][0]
item = {
'derive_reg_province':_result['code'],
"province_code":data["province"][0]['code'],
"province_name":data["province"][0]['name'],
"city_code":data["city"][0]['code'],
"city_name":data["city"][0]['name'],
"area_name":_result['name'],
"area_code":_result['code'],
}
elif len(data.get("city",[]))>0:
_result = data["city"][0]
item = {
'derive_reg_province':_result['code'],
"province_code":data["province"][0]['code'],
"province_name":data["province"][0]['name'],
"city_code":data["city"][0]['code'],
"city_name":data["city"][0]['name'],
}
elif len(data.get("province",[]))>0:
_result = data["province"][0]
item = {
'derive_reg_province':_result['code'],
"province_code":data["province"][0]['code'],
"province_name":data["province"][0]['name'],
}
else:
item = None
return item
url = self.api.format(reg_address)
try:
data = self.session.get(url,headers=self.headers).json()
if not data:
return None
item = tran_json(data)
return item
except Exception as e:
return None
def render_sql(self,base_sql,start,end,condition=''):
return base_sql.format(start=start,end=end,condition=condition)
def execute_sql(self,base_sql,start,end,log_msg,condition=''):
sql = self.render_sql(base_sql,start,end,condition)
self.logger.info('%s:\n\n%s\n',log_msg,sql)
datas = self.reader.query(sql)['data']
l = []
batch = 100
while datas:
data = datas.pop()
reg_address = data['reg_address']
reg_province = data['reg_province']
# skip rows whose reg_province is purely numeric
if reg_province and reg_province.isdigit():
continue
if reg_province in self.error_codes:
self.error_codes[reg_province] += 1
else:
self.error_codes[reg_province] = 1
json_code = self.get_code(reg_address)
if json_code is None:
continue
self.logger.info("reg_address is %s,reg_province is %s,api-result is:%s",reg_address,reg_province,json_code)
data.update(json_code)
del data['id']
values = [data.get(k,None) for k in FIELDS]
l.append(values)
if len(l) == batch:
self.reader.commits(INSERT_ZO,l)
l.clear()
if l:
self.reader.commits(INSERT_ZO,l)
l.clear()
def process_data(self):
total_executes = len(self.start_ends)
batch_commit = 1000
batch_lst = []
while self.start_ends:
start,end = self.start_ends.pop()
# update this id range
self.execute_sql(BASE_QUERY,start,end,log_msg="entity_basic_area",condition=self._condition)
if self.error_codes:
values = [[k,v]for k,v in self.error_codes.items()]
self.reader.commits(INSERT_TEMP_REG_PROVINCE,values)
@log_timed
def run(self):
try:
self.logger.info('child task %s starts processing %d id ranges',self.task_name,len(self.start_ends))
self.process_data()
self.close()
self.task_status = 1
except Exception as e:
self.logger.error("task_name %s error %s",self.task_name,str(e))
self.roboter.send_msg("task_name %s error %s" % (self.task_name,str(e)))
self.queue.put([self.process_id,self.task_status])
class Scheduler(BaseTask):
def __init__(self,env,task_name,starttime,logger,log_name,**kwargs):
super(Scheduler,self).__init__(env,task_name,starttime,logger,log_name,**kwargs)
self.config = kwargs.pop('config')
self.reader = MysqlClient.from_settings(self.config,logger)
self.logger.info('reader-config:%s',get_secure_raw_config(self.reader.config))
self.roboter = Roboter.from_settings(self.config,logger)
self.startdate = self.config.pop('startdate')
self.enddate = self.config.pop('enddate')
self.condition = self.config.pop('condition')
if self.condition != '':
self._condition = '{condition}>="{startdate}" and {condition}<="{enddate}"'.format(
condition=self.condition,
startdate=self.startdate,
enddate=self.enddate
)
else:
self._condition = ' 1=1 '
self.config['_condition'] = self._condition
self.process_num = self.config.pop('process_num',10)
if isinstance(self.process_num,str):
self.process_num = int(self.process_num)
if self.process_num == 0:
self.process_num = 10
def start(self):
# for a full run, condition = ''
pk_column = 'id'
db = 'xx'
table = 'xx2'
min_sql = get_table_min_max_sql(table,pk_column,db,sql_type="min",condition=self._condition)
max_sql = get_table_min_max_sql(table,pk_column,db,sql_type="max",condition=self._condition)
self.logger.info("min and max_sql is:\n\n %s \n\n %s \n",min_sql,max_sql)
starts = self.reader.query(min_sql)["data"]
if not starts:
return
start = starts[0][pk_column] -1
end = self.reader.query(max_sql)["data"][0][pk_column]
self.logger.info("start:%s end %s",start,end)
batch = 30000
split_datas = split_list(start,end,batch)
total_executes = len(split_datas)
self.logger.info("need execute %d sql",total_executes)
total,process_data = get_process_datas(split_datas,self.process_num)
processs = []
queue = Queue(self.process_num)
for i in range(self.process_num):
process_task_name = self.task_name + '_' + str(i)
process_config = copy.deepcopy(self.config)
process_logger,process_log_name = LoggerRouter.get_logger_from_task_name(process_task_name,**process_config.get('log'))
process_start_ends = process_data[i]
p_obj = ChildProcess(process_task_name,process_config,process_logger,process_log_name,process_start_ends,i,queue)
p = Process(target=p_obj.run,args=())
processs.append(p)
for p in processs:
p.start()
for p in processs:
p.join()
self.logger.info("child processes finished, main process collecting results")
pids = []
while not queue.empty():
p_id,status = queue.get(block=True)
if status == 1:
pids.append(p_id)
failed_p_objs = [i for i in range(self.process_num) if i not in pids]
self.logger.info("successful process ids:%s",pids)
self.logger.info("failed process ids:%s",failed_p_objs)
self.reader.close()
if failed_p_objs:
raise ChildProcessFailedException(",".join(map(str,failed_p_objs)))
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="zo_area")
parser.add_argument('-c', '--config_path', default='./config',help='path to the JSON config file')
parser.add_argument('-e', '--env', default='handle_history_name',help='runtime environment: test/dev/prd')
parser.add_argument('-is_used_env', '--is_used_env', default=False,help='whether to read config from environment variables')
parser.add_argument('-task_name', '--task_name', default='update_zo_area',help='task name (province code update)')
parser.add_argument('-p', '--process_num', default=10,help='number of worker processes')
parser.add_argument('-condition', '--condition', default='z_update_time',help='date condition column; when empty, a full run is performed')
parser.add_argument('-startdate', '--startdate', default=get_str_yestoday(format='%Y-%m-%d'),help='start date')
parser.add_argument('-enddate', '--enddate', default=get_str_tomorrow(format='%Y-%m-%d'),help='end date')
starttime = get_now()
cmd_args = parser.parse_args()
cmd_kwargs = vars(cmd_args)
py_file = os.path.abspath(__file__)
cmd = 'python3 ' + py_file + ' ' + ' '.join(['--' + k + ' ' + str(cmd_kwargs[k]) for k in cmd_kwargs.keys()])
is_used_env = False if str(cmd_kwargs['is_used_env']).lower() == 'false' else True
env = cmd_args.env
task_name = cmd_args.task_name
config_path = cmd_args.config_path
conf_obj = Config.from_settings(env,config_path)
# load the configuration file contents
config = conf_obj.get_config(is_used_env)
config.update(cmd_kwargs)
config['is_used_env'] = is_used_env
config['cmd'] = cmd
# build the logger
logger,log_name = LoggerRouter.get_logger_from_task_name(task_name,**config.get('log'))
logger.info('log_name:%s',log_name)
logger.info('config:\n%s',get_secure_raw_config(config))
scheduler = Scheduler(env,task_name,starttime,logger,log_name,config=config)
scheduler.run()
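For reference, an incremental run and a full run could be launched roughly as follows; the script file name is a placeholder and the env value depends on your config, while every flag comes from the argparse definitions above.

Incremental (only rows whose z_update_time falls between startdate and enddate):
python3 update_zo_area.py --config_path ./config --env prd --process_num 10 --condition z_update_time --startdate 2024-01-01 --enddate 2024-01-02

Full run (an empty condition makes the scheduler fall back to 1=1):
python3 update_zo_area.py --config_path ./config --env prd --process_num 10 --condition ""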