离线数仓ETL流程处理设计与实现
一.项目描述
外部数据经过数据同步程序实时将binlog从上游数据源写到消息队列,经过配置化平台自动同步到下游数据库以后,后续还需要加工,也就是常规说的etl处理
这里主要的更新方式是python连接数据库,采用sql批处理,sql处理不了的写脚本函数处理。
二.离线更新的逻辑
这块做离线任务而不做实时更新,是因为业务表的字段并非来自于同步过来的一张表,可能是多个表的交叉更新,受限于上游表的更新频率。
比如Table A有2个字段(address,website),address来自B,website来自C,B是实时同步,C在上游是一天更新一次。这时候可以实时更新address,但website字段实时更新就没有意义。
三.设想程序需要实现的功能
我觉得这点很重要,首先想好如何可以更方便的工作,上线以后如何可以更好维护,从目标出发,将开发目标逐步化解,最后可能就是将几个核心代码写好就搞定了。
数仓的实际开发中,我们业务表可能是很多很多,但逻辑整体还都是类似的,我设想的跑任务的方式是:
3.1 支持跑单个表的任务(便于调试) 3.2 支持按指定天(默认跑昨天增量,有遗漏某天的数据可以一次补全) 3.3 统一程序入口,新建的表,添加了这个表的任务,不会影响旧表的运行逻辑
我期望代码可以按这样的命令执行:
a.更新dw.example表昨天新增的数据
--startdate --enddate参数不传即可
main.py -env $env -table example
b.更新dw.example表(2020-01-01到2021-08-11)时间段的数据
main.py -env $env -table example --startdate 2020-01-01 --enddate 2021-08-11
当跑不同的表任务的时候 将-table参数换一下即可,因为程序将执行粒度控制到了表级别,在调度平台我可以控制多个表更新时任务的依赖以及任务的并发情况。
四.如何实现
4.1 需要解决的问题
4.1.1 每个表都有不同的sql,main.py(程序入口)如何匹配到各自表的逻辑 4.2.2 新增了表的任务 main.py(程序入口)如何感知到
4.2 如何解决
这两个问题本质上都是查找匹配问题
当执行:
--table A命令的时候 程序需要找到A的处理逻辑
--table B命令的时候 程序需要找到B的处理逻辑
如果有一个全局Map存储了每个table的规则,是不是迎刃而解?想象一下:
GLOBAL_MAPPING = {
"A":["sqla","sqlb"],
"B":["sqla","sqlb"]
}
当传入A的时候找到A对应的待执行sql列表,循环去执行。
当传入B的时候找到B对应的待执行sql列表,循环去执行。
这时候第一个问题解决,第二个却难以满足,因为新增了表的逻辑需要频繁修改GLOBAL_MAPPING。
所以这个变量还需要实现自动扩容。
这里我们引入web开发里常用的url路由和handler的概念,web开发时当客户端访问一个url "/orders",这个路由会匹配对应的handler,每个handler内部是各自的业务处理逻辑。借鉴到数仓更新里,我们更新表任务其实也是每个表是一个路由,每个表也有自己的一个Handler方法
因此我们设计代码目录的时候需要有2个目录,handlers和sqls,同时约定,每个表的更新都必须成对出现在这两个目录下,也就是: handlers/A.py和sqls/A.py
如何自动检测新增了表或者删除了表的更新任务呢?
使用python动态引用包模块的方式,检测这两个目录有多少个py脚本即可,脚本名称也就是表名称
import os
import importlib
handlers_path = os.path.dirname(os.path.abspath(__file__))
def get_all_handler_cls():
table_handler_cls = {}
table_handler_py_files = [i.replace(".py","") for i in os.listdir(handlers_path) if i.endswith(".py") and i not in ["__init__.py"]]
for py_file in table_handler_py_files:
module = "dw_update.handlers.{py_file}".format(py_file=py_file)
table_handler_cls[py_file] = importlib.import_module(module)
return table_handler_cls
#TABLE_HANDLER_CLS 也就是全局handlers变量
TABLE_HANDLER_CLS = get_all_handler_cls()
匹配问题解决了,如何调用并执行各自的handler和sql? 也就是main.py函数入口的处理
class Scheduler(BaseTask):
def __init__(self,env,task_name,starttime,logger,log_name,**kwargs):
super(Scheduler,self).__init__(env,task_name,starttime,logger,log_name,**kwargs)
self.config = kwargs.pop('config')
self.roboter = Roboter.from_settings(self.config,logger)
#不能pop handler 需要根据这个key 取sql对象的class
self.table = self.config["table"]
try:
#获取该table绑定的Handler类
handler_cls = TABLE_HANDLER_CLS[self.table]
except:
raise HandlerNotExistsException("dw_update.handlers not exists %s",self.table)
##实例Handler对象
self.handler_instance = handler_cls.Handler(env,task_name,starttime,logger,log_name,config=self.config)
def start(self):
#具体执行每个表的start方法
self.handler_instance.start()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="dw_update 启动命令行")
parser.add_argument('-c', '--config_path', default='./config',help='json配置文件路径')
parser.add_argument('-e', '--env', default='dev',help='任务运行环境 test/dev/prd')
parser.add_argument('-is_used_env', '--is_used_env', default=True,help='是否使用环境变量')
parser.add_argument('-task_name', '--task_name', default='dw_update',help='dw_update')
parser.add_argument('-table', '--table', default='A',help='A')
parser.add_argument('-startdate', '--startdate', default=get_str_yestoday(format='%Y-%m-%d'),help='>=某天天')
parser.add_argument('-enddate', '--enddate', default=get_str_tomorrow(format='%Y-%m-%d'),help='>某天')
parser.add_argument('-is_used_main', '--is_used_main', default=True,help='日期参数是否使用main.py传递过去的参数')
starttime = get_now()
cmd_args = parser.parse_args()
cmd_kwargs = vars(cmd_args)
py_file = os.path.abspath(__file__)
full_cmd = 'python3 ' + py_file + ' '.join(['--' + k + ' '+str(cmd_kwargs[k]) for k in cmd_kwargs.keys()])
is_used_env = False if str(cmd_kwargs['is_used_env']).lower() == 'false' else True
is_used_main = False if str(cmd_kwargs['used_main']).lower() == 'false' else True
env = cmd_args.env
task_name = cmd_args.task_name + "_" + cmd_args.table
config_path = cmd_args.config_path
conf_obj = Config.from_settings(env,config_path)
#获取配置文件内容
config = conf_obj.get_config(is_used_env)
config.update(cmd_kwargs)
config['is_used_env'] = is_used_env
config['full_cmd'] = full_cmd
config["is_used_main"] = is_used_main
#当前项目路径 main.py所在的目录
project_path = os.path.dirname(os.path.abspath(__file__))
log_config = config.get("log",{})
#服务器全路径执行bin目录下sh的时候 ./log会报找不到错 加一层判断
if log_config:
if log_config["log_path"] == "./log":
config["log"]["log_path"] = join_path(project_path,"log")
#获取logger对象
logger,log_name = LoggerRouter.get_logger_from_task_name(task_name,**config.get('log'))
logger.info('log_name:%s',log_name)
logger.info('config:\n%s',get_secure_raw_config(config))
st = Scheduler(env,task_name,starttime,logger,log_name,config=config)
st.run()
每个表的Handler如何写? 根据表逻辑实现start方法即可
##案例 handlers/A.py
from . import TABLE_SQL_CLS,SqlNotExistsException
#TABLE_SQL_CLS 和动态扩容handlers原理一样 扫描sqls目录下py脚本
class Handler(BaseTask):
def __init__(self,env,task_name,starttime,logger,log_name,**kwargs):
super(Handler,self).__init__(env,task_name,starttime,logger,log_name,**kwargs)
self.config = kwargs.pop("config")
self.logger.info("kwargs:%s",kwargs)
update_table = self.config["table"]
if self.config.get("is_used_main",True):
self.startdate = self.config["startdate"]
self.enddate = self.config["enddate"]
else:
self.startdate = self.yestorday
self.enddate = self.tomorrow
self.sql_params = {
"startdate":self.startdate,
"enddate":self.enddate
}
try:
sql_cls = TABLE_SQL_CLS[update_table]
except:
raise SqlNotExistsException("dw_update.sqls not exists %s",update_table)
self.sql_obj = sql_cls.Sql()
self.reader = MysqlClient.from_settings(self.config,logger,name="dw")
@property
def yestorday(self):
return get_str_yestoday(format='%Y-%m-%d')
@property
def today(self):
return get_str_today(format='%Y-%m-%d')
@property
def tomorrow(self):
return get_str_tomorrow(format='%Y-%m-%d')
def start(self):
self.logger.info("TABLE_SQL_CLS:%s",self.sql_obj)
for item in self.sql_obj.sqls:
comment = item["comment"]
sql = item["sql"]
if "{startdate}" in sql:
sql = sql.format(**self.sql_params)
self.logger.info("execute %s sql:\n %s",comment,sql)
result = self.reader.commit(sql)
self.logger.info("execute sql result: %s",result)
每个表的sql脚本如何写?
#sqls/A.py
UPDATE_ADDRESS = '''\
update A as m
join basedb.A as q on m.src_id=q.id
set m.address = q.address
'''
class Sql(object):
sqls = [
{
"comment":"sql描述"
"sql":UPDATE_ADDRESS
}
]
4.3 代码目录
➜ dw_update git:(test) ✗ tree -I "log|docs|js|css|html|.pyc|__pycache__" -L 3
.
├── README.md
├── bin
│ ├── start_dev.sh 开发环境启动脚本
│ ├── start_prod.sh 生产环境启动脚本
│ └── start_test.sh 测试环境启动脚本
├── config
│ ├── dev.json dev配置
│ ├── prod.json 生产配置 环境变量
│ └── test.json 测试配置环境变量
├── main.py 入口
├── requirements.txt 依赖包
├── test
└── dw_update 主要逻辑
├── __init__.py
├── exceptions.py 异常类
├── handlers 每个表一个Handler
│ ├── __init__.py
│ └── example.py example表的Handler
└── sqls 每个表一个sql文件
├── __init__.py
└── example.py example表的Sql
6 directories, 15 files
五. 代码启动脚本
任务调度使用开源dolphinscheduler(ds)调度平台
其中:
test环境:
bin/start_test.sh table名
===================================
prod环境:
bin/start_prod.sh table名




