创建读取 Excel 工作簿中各个 sheet 的函数:
# -*- coding: utf-8 -*-
# Read the daily sales workbook (sales_data_YYYY_MM_DD.xlsx) and split its
# sheets into separate CSV files (see split_sale_data below).
import pandas as pd
from pandas import ExcelWriter
from pandas import ExcelFile
import csv
import os
from datetime import datetime, timedelta
import sys

# Python 2 only: force the default codec to UTF-8 so the Chinese sheet names
# round-trip through implicit str/unicode conversions.  `reload` and
# `sys.setdefaultencoding` do not exist on Python 3, hence the version guard.
if sys.version_info[0] == 2:
    reload(sys)
    sys.setdefaultencoding("utf-8")

# Date stamp embedded in the incoming workbook's file name, e.g. "2019_04_15".
# strftime zero-pads month and day, replacing the original manual
# "< 10 -> prepend '0'" padding logic.
today = datetime.now().strftime('%Y_%m_%d')
print("today is:" + today)
# 1. read the xlsx with pandas
# file_directory = '/root/airflow/files/sale_data/'
def split_sale_data(file_directory):
    """Split today's sales workbook into one CSV file per sheet.

    Reads ``sales_data_<today>.xlsx`` from *file_directory* and writes each
    sheet of interest to ``<file_directory><csv_name>.csv``.  Most outputs use
    ``$`` as the field delimiter; the two CRM order sheets use ``^``
    (presumably because their cells may contain ``$`` — verify against the
    source data).

    NOTE(review): ``today`` is the module-level stamp computed at import
    time, so a long-running process keeps using the date it started on.

    :param file_directory: directory holding the workbook; must end with a
        path separator because paths are built by plain concatenation.
    """
    file_name = 'sales_data_' + today + '.xlsx'
    xls = pd.ExcelFile(file_directory + file_name)

    # (sheet name in the workbook, output CSV base name, CSV delimiter).
    # Commented-out sheets from the original ('产品名单', KPI, 数据中层) are
    # intentionally not exported.
    sheets = [
        ('合同明细', 'contract_detail', '$'),
        ('销售线索', 'sale_lead', '$'),
        ('消耗明细', 'order_used_detail', '$'),
        ('财务回款', 'finance_money_returned', '$'),
        ('CRM订单2(去重客户)', 'crm_order_with_no_dup_customer', '^'),
        ('CRM订单明细1', 'crm_order_detail', '^'),
        ('名单', 'employee_infos', '$'),
    ]
    for sheet_name, csv_name, sep in sheets:
        frame = pd.read_excel(xls, sheet_name)
        frame.to_csv(file_directory + csv_name + '.csv', sep=sep,
                     encoding="utf-8")
复制
通过 pgcsv 工具将 CSV 导入到 Greenplum:
# -*- coding: utf-8 -*-
# Airflow DAG "vp_sale_data_prepare": split the daily sales workbook into
# CSVs (PythonOperator), strip carriage returns from the two CRM CSVs, then
# bulk-load every CSV into Greenplum with the pgcsv3 CLI.  Start/success
# e-mails bracket the run.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.trigger_rule import TriggerRule
from sale_raw_xlsx_split import split_sale_data
import sys

# Python 2 only: force UTF-8 default codec for the Chinese file names.
# Guarded because reload/setdefaultencoding do not exist on Python 3.
if sys.version_info[0] == 2:
    reload(sys)
    sys.setdefaultencoding('utf8')

# Informational date stamps for the scheduler log only (not zero-padded,
# e.g. "2019-4-15"); they do not influence any task.
tt = datetime.now().timetuple()
tomorrow = datetime.now() + timedelta(days=1)
tt_tomorrow = tomorrow.timetuple()
print(tt_tomorrow)
today = str(tt.tm_year) + '-' + str(tt.tm_mon) + '-' + str(tt.tm_mday)
cur_date = str(tt_tomorrow.tm_year) + '-' + str(tt_tomorrow.tm_mon) + '-' + str(tt_tomorrow.tm_mday)
print("current date:" + cur_date)
print("today is:" + today)

default_args = {
    'owner': 'niuzaiwenli',
    'depends_on_past': False,
    'start_date': datetime(2019, 4, 15, 9, 50),
    'email': ['email1', 'email2'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 0,
    # 'retry_delay': timedelta(seconds=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

# Daily at 01:00.  NOTE: the trailing space in the cron string is sloppy but
# tolerated by the scheduler; kept as-is to avoid any behavior change.
dag = DAG('vp_sale_data_prepare', default_args=default_args, schedule_interval="0 1 * * * ")

# Success notification — fires only when every load task succeeded.
CRMSyncTaskSuccess = EmailOperator(
    dag=dag,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    task_id="SalesReportDataPrepared",
    to=["email1", "email2"],
    subject="SalesReportDataPrepared successfully",
    html_content='<h3>SalesReportDataPrepared successfully </h3>')

# Start notification — first task in the DAG.
CRMSyncTaskStart = EmailOperator(
    dag=dag,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    task_id="SalesReportDataStartToPrepare",
    to=["email1", "email2"],
    subject="SalesReportDataStartToPrepare at " + str(datetime.now()),
    html_content='<h3>SalesReportDataStartToPrepare at ' + str(datetime.now()) + ' </h3>')

work_dir = '/home/vphoto/'  # directory where split_sale_data writes the CSVs

# FIX: renamed from `split_sale_data` — the original assignment shadowed the
# imported callable of the same name (it only worked because the
# python_callable kwarg is evaluated before the rebinding).
split_sale_data_task = PythonOperator(
    task_id='split_sale_data',
    provide_context=False,
    python_callable=split_sale_data,
    op_kwargs={'file_directory': '/home/vphoto/'},
    dag=dag)

# NOTE(review): database credentials are hard-coded in these shell commands
# and end up in logs/process listings; consider an Airflow Connection.
write_contract_detail_2_db = BashOperator(
    task_id='pgcsv_contract_detail',
    bash_command='/usr/local/bin/pgcsv3 --encoding \'utf-8\' --delimiter \'$\' --drop --db postgresql://vphotos:wiech2UB@10.64.104.137:5432/vphotos tmp t_tmp_sale_contract ' + work_dir + 'contract_detail.csv',
    dag=dag)
write_lead_2_db = BashOperator(
    task_id='pgcsv_lead',
    bash_command='/usr/local/bin/pgcsv3 --encoding \'utf-8\' --delimiter \'$\' --drop --db postgresql://vphotos:wiech2UB@10.64.104.137:5432/vphotos tmp t_tmp_sale_lead ' + work_dir + 'sale_lead.csv',
    dag=dag)
write_order_used_2_db = BashOperator(
    task_id='pgcsv_order_used_detail',
    bash_command='/usr/local/bin/pgcsv3 --encoding \'utf-8\' --delimiter \'$\' --drop --db postgresql://vphotos:wiech2UB@10.64.104.137:5432/vphotos tmp t_tmp_sale_order_used ' + work_dir + 'order_used_detail.csv',
    dag=dag)
write_money_returned_2_db = BashOperator(
    task_id='pgcsv_money_return',
    bash_command='/usr/local/bin/pgcsv3 --encoding \'utf-8\' --delimiter \'$\' --drop --db postgresql://vphotos:wiech2UB@10.64.104.137:5432/vphotos tmp t_tmp_sale_finance_money_returned ' + work_dir + 'finance_money_returned.csv',
    dag=dag)
# The two CRM CSVs use '^' as delimiter (matching split_sale_data's output).
write_crm_order_with_no_dup_customer_2_db = BashOperator(
    task_id='pgcsv_crm_order_with_no_dup_customer',
    bash_command='/usr/local/bin/pgcsv3 --encoding \'utf-8\' --delimiter \'^\' --drop --db postgresql://vphotos:wiech2UB@10.64.104.137:5432/vphotos tmp t_tmp_sale_order_with_no_dup_customer ' + work_dir + 'crm_order_with_no_dup_customer.csv',
    dag=dag)
write_crm_order_detail_2_db = BashOperator(
    task_id='pgcsv_crm_order_detail',
    bash_command='/usr/local/bin/pgcsv3 --encoding \'utf-8\' --delimiter \'^\' --drop --db postgresql://vphotos:wiech2UB@10.64.104.137:5432/vphotos tmp t_tmp_sale_order_detail ' + work_dir + 'crm_order_detail.csv',
    dag=dag)
write_employee_infos_2_db = BashOperator(
    task_id='pgcsv_employee_infos',
    bash_command='/usr/local/bin/pgcsv3 --encoding \'utf-8\' --delimiter \'$\' --drop --db postgresql://vphotos:wiech2UB@10.64.104.137:5432/vphotos tmp t_tmp_sale_employees ' + work_dir + 'employee_infos.csv',
    dag=dag)
# write_kpi_month_2_db = BashOperator(
#     task_id='pgcsv_kpi_month',
#     bash_command='/usr/local/bin/pgcsv3 --encoding \'utf-8\' --delimiter \'$\' --drop --db postgresql://vphotos:wiech2UB@10.64.104.137:5432/vphotos tmp t_tmp_sale_kpi_month ' + work_dir + 'kpi_month.csv',
#     dag=dag)

# Strip \r from the CRM CSVs before loading (Excel export artifacts).
remove_linux_carriage_return_cus = BashOperator(
    task_id='remove_linux_carriage_return_cus',
    bash_command='sed -i \'s/\\r//g\' ' + work_dir + 'crm_order_with_no_dup_customer.csv',
    dag=dag)
remove_linux_carriage_return_order = BashOperator(
    task_id='remove_linux_carriage_return_order',
    bash_command='sed -i \'s/\\r//g\' ' + work_dir + 'crm_order_detail.csv',
    dag=dag)

# Wiring: start mail -> split -> (sed ->) per-table loads -> success mail.
split_sale_data_task.set_upstream(CRMSyncTaskStart)
write_contract_detail_2_db.set_upstream(split_sale_data_task)
write_lead_2_db.set_upstream(split_sale_data_task)
write_order_used_2_db.set_upstream(split_sale_data_task)
write_money_returned_2_db.set_upstream(split_sale_data_task)
remove_linux_carriage_return_cus.set_upstream(split_sale_data_task)
write_crm_order_with_no_dup_customer_2_db.set_upstream(remove_linux_carriage_return_cus)
remove_linux_carriage_return_order.set_upstream(split_sale_data_task)
write_crm_order_detail_2_db.set_upstream(remove_linux_carriage_return_order)
write_employee_infos_2_db.set_upstream(split_sale_data_task)
# write_kpi_month_2_db.set_upstream(split_sale_data_task)
CRMSyncTaskSuccess.set_upstream(write_contract_detail_2_db)
CRMSyncTaskSuccess.set_upstream(write_lead_2_db)
CRMSyncTaskSuccess.set_upstream(write_order_used_2_db)
CRMSyncTaskSuccess.set_upstream(write_money_returned_2_db)
CRMSyncTaskSuccess.set_upstream(write_crm_order_with_no_dup_customer_2_db)
CRMSyncTaskSuccess.set_upstream(write_crm_order_detail_2_db)
CRMSyncTaskSuccess.set_upstream(write_employee_infos_2_db)
# CRMSyncTaskSuccess.set_upstream(write_kpi_month_2_db)
复制
最终结果表计算和统计:
# -*- coding: utf-8 -*-
# Airflow DAG "vp_build_sale_middle_data": rebuild the sales "middle layer"
# aggregate tables (tmp.t_tmp_sale_middle_data_*) in Greenplum from the raw
# tmp.t_tmp_sale_* tables loaded by the vp_sale_data_prepare DAG.  Every
# metric follows the same pattern: a DROP TABLE task, then a SELECT ... INTO
# rebuild task, all fanning out from a start-mail task and fanning back into
# a success-mail task.
from datetime import datetime,timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.trigger_rule import TriggerRule
# NOTE(review): BashOperator, PythonOperator and split_sale_data are imported
# but never used in this DAG; kept in case other tooling relies on them.
from sale_raw_xlsx_split import split_sale_data
import sys
# Python 2 only hack: force UTF-8 default codec (reload/setdefaultencoding
# do not exist on Python 3).
reload(sys)
sys.setdefaultencoding('utf8')
# Informational date stamps for the scheduler log only (not zero-padded).
tt = datetime.now().timetuple()
tomorrow = datetime.now()+timedelta(days=1)
tt_tomorrow = tomorrow.timetuple()
print(tt_tomorrow)
today = str(tt.tm_year)+'-'+str(tt.tm_mon)+'-'+str(tt.tm_mday)
cur_date = str(tt_tomorrow.tm_year)+'-'+str(tt_tomorrow.tm_mon)+'-'+str(tt_tomorrow.tm_mday)
print("current date:"+cur_date)
print("today is:"+today)
default_args = {
'owner': 'niuzaiwenli',
'depends_on_past': False,
'start_date': datetime(2019, 4, 15, 9, 50),
'email': ['email1', 'email2'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 0,
# 'retry_delay': timedelta(seconds=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
# Daily at 02:00 — one hour after vp_sale_data_prepare loads the raw tables.
dag = DAG('vp_build_sale_middle_data', default_args=default_args, schedule_interval="0 2 * * * ")
# Success notification — fires only when every rebuild task succeeded.
MiddleDataTaskSuccess = EmailOperator(
dag=dag,
trigger_rule=TriggerRule.ALL_SUCCESS,
task_id="sale_middle_data_success",
to=["email1","email2"],
subject="sale_middle_data_success successfully",
html_content='<h3>sale_middle_data job successfully </h3>')
# Start notification — first task in the DAG.
MiddleDataTaskStart = EmailOperator(
dag=dag,
trigger_rule=TriggerRule.ALL_SUCCESS,
task_id="sale_middle_data_start",
to=["email1","email2"],
subject="sale_middle_data_start at " + str(datetime.now()),
html_content='<h3>sale_middle_data_start at ' + str(datetime.now()) +' </h3>')
# Middle layer - orders: order amount, camera-seat count and order count per
# year/quarter/month/week/date/region/team/person.
# NOTE(review): "COUNT (sod.COLUMN)" — COLUMN is an SQL keyword; this looks
# like a garbled/placeholder column name, verify against the real table.
sale_middle_data_order_clean_sql = "drop table if EXISTS tmp.t_tmp_sale_middle_data_order"
sale_middle_data_order_sql = "SELECT dd.YEAR,dd.quarter,dd.MONTH,dd.week_of_year,sod.\"[订单]生效日期\",emp.\"大区\",emp.\"团队\",emp.\"姓名\",SUM (CAST (sod.\"[订单]总金额\" AS FLOAT)) 订单金额,SUM (CAST (sod.\"[订单]机位总数(摄影师人数)\" AS NUMERIC)) 机位数,COUNT (sod.COLUMN) 订单数 into tmp.t_tmp_sale_middle_data_order FROM tmp.t_tmp_sale_order_detail sod INNER JOIN dw.t_ob_dim_dates dd ON to_char(dd.DATE,'yyyy-mm-dd')=sod.\"[订单]生效日期\" LEFT JOIN tmp.t_tmp_sale_employees emp ON emp.\"姓名\"=sod.\"[订单]订单所有人\" GROUP BY 1,2,3,4,5,6,7,8 ORDER BY 1,2,3,4,5,6,7"
sale_middle_data_order_clean = PostgresOperator(
task_id='sale_middle_data_order_tmp_clean',
sql= sale_middle_data_order_clean_sql,
postgres_conn_id='GP-DW',
dag=dag)
sale_middle_data_order = PostgresOperator(
task_id='sale_middle_data_order_tmp',
sql= sale_middle_data_order_sql,
postgres_conn_id='GP-DW',
dag=dag)
# Middle layer - new signings: amount / customer count / order count for
# customers with no consumption history ("有无消耗历史" = '无').
sale_middle_data_new_sign_clean_sql = "drop table if EXISTS tmp.t_tmp_sale_middle_data_new_sign"
sale_middle_data_new_sign_sql = "SELECT dd.YEAR,dd.quarter,dd.MONTH,dd.week_of_year,cus.\"[订单]生效日期\",emp.\"大区\",emp.\"团队\",emp.\"姓名\",SUM (CASE WHEN cus.\"有无消耗历史\"='无' THEN CAST (cus.\"[订单]总金额\" AS FLOAT) ELSE 0 END) 新签订单金额,SUM (CASE WHEN cus.\"有无消耗历史\"='无' THEN 1 ELSE 0 END) 新签客户数,SUM (CASE WHEN cus.\"有无消耗历史\"='无' THEN 1 ELSE 0 END) 新签订单数 into tmp.t_tmp_sale_middle_data_new_sign FROM tmp.t_tmp_sale_order_with_no_dup_customer cus INNER JOIN dw.t_ob_dim_dates dd ON to_char(dd.DATE,'yyyy-mm-dd')=cus.\"[订单]生效日期\" LEFT JOIN tmp.t_tmp_sale_employees emp ON emp.\"姓名\"=cus.\"[订单]订单所有人\" GROUP BY 1,2,3,4,5,6,7,8 ORDER BY 1,2,3,4,5,6,7"
sale_middle_data_new_sign_clean = PostgresOperator(
task_id='sale_middle_data_new_sign_tmp_clean',
sql= sale_middle_data_new_sign_clean_sql,
postgres_conn_id='GP-DW',
dag=dag)
sale_middle_data_new_sign = PostgresOperator(
task_id='sale_middle_data_new_sign_tmp',
sql= sale_middle_data_new_sign_sql,
postgres_conn_id='GP-DW',
dag=dag)
# Middle layer - order amount and order count split by product type
# (云摄影 "cloud photography" vs 云视频 "cloud video").
sale_middle_data_product_order_clean_sql = "drop table if EXISTS tmp.t_tmp_sale_middle_data_product_order"
sale_middle_data_product_order_sql = "SELECT dd.YEAR,dd.quarter,dd.MONTH,dd.week_of_year,sod.\"[订单]生效日期\",emp.\"大区\",emp.\"团队\",emp.\"姓名\",CASE WHEN sod.\"[产品]拍摄类型\"='云摄影' THEN '云摄影订单额' ELSE '云视频订单额' END \"产品\",SUM (CAST (sod.\"[订单]总金额\" AS FLOAT)) 订单金额,COUNT (sod.COLUMN) 订单数 INTO tmp.t_tmp_sale_middle_data_product_order FROM tmp.t_tmp_sale_order_detail sod INNER JOIN dw.t_ob_dim_dates dd ON to_char(dd.DATE,'yyyy-mm-dd')=sod.\"[订单]生效日期\" LEFT JOIN tmp.t_tmp_sale_employees emp ON emp.\"姓名\"=sod.\"[订单]订单所有人\" GROUP BY 1,2,3,4,5,6,7,8,9 ORDER BY 1,2,3,4,5,6,7,8"
sale_middle_data_product_order_clean = PostgresOperator(
task_id='sale_middle_data_product_order_tmp_clean',
sql= sale_middle_data_product_order_clean_sql,
postgres_conn_id='GP-DW',
dag=dag)
sale_middle_data_product_order = PostgresOperator(
task_id='sale_middle_data_product_order_tmp',
sql= sale_middle_data_product_order_sql,
postgres_conn_id='GP-DW',
dag=dag)
# Middle layer - customer count per product type (deduplicated customers).
sale_middle_data_product_client_clean_sql = "drop table if EXISTS tmp.t_tmp_sale_middle_data_product_client"
sale_middle_data_product_client_sql = "SELECT dd.YEAR,dd.quarter,dd.MONTH,dd.week_of_year,cus.\"[订单]生效日期\",emp.\"大区\",emp.\"团队\",emp.\"姓名\",CASE WHEN cus.\"[产品]拍摄类型\"='云摄影' THEN '云摄影订单额' ELSE '云视频订单额' END \"产品\",COUNT (1) 客户数 INTO tmp.t_tmp_sale_middle_data_product_client FROM tmp.t_tmp_sale_order_with_no_dup_customer cus INNER JOIN dw.t_ob_dim_dates dd ON to_char(dd.DATE,'yyyy-mm-dd')=cus.\"[订单]生效日期\" LEFT JOIN tmp.t_tmp_sale_employees emp ON emp.\"姓名\"=cus.\"[订单]订单所有人\" GROUP BY 1,2,3,4,5,6,7,8,9 ORDER BY 1,2,3,4,5,6,7,8"
sale_middle_data_product_client_clean = PostgresOperator(
task_id='sale_middle_data_product_client_tmp_clean',
sql= sale_middle_data_product_client_clean_sql,
postgres_conn_id='GP-DW',
dag=dag)
sale_middle_data_product_client = PostgresOperator(
task_id='sale_middle_data_product_client_tmp',
sql= sale_middle_data_product_client_sql,
postgres_conn_id='GP-DW',
dag=dag)
# Middle layer - contracts: customer count, contract count and amount.
sale_middle_data_contract_clean_sql = "drop table if EXISTS tmp.t_tmp_sale_middle_data_contract"
sale_middle_data_contract_sql = "SELECT dd.YEAR,dd.quarter,dd.MONTH,dd.week_of_year,con.\"创建日期\",emp.\"大区\",emp.\"团队\",emp.\"姓名\",COUNT (DISTINCT con.\"客户名称\") 合同客户数,COUNT (con.\"客户名称\") 合同个数,SUM (CAST (con.\"总金额\" AS FLOAT)) 合同金额 INTO tmp.t_tmp_sale_middle_data_contract FROM tmp.t_tmp_sale_contract con INNER JOIN dw.t_ob_dim_dates dd ON to_char(dd.DATE,'yyyy-mm-dd')=con.\"创建日期\" INNER JOIN tmp.t_tmp_sale_employees emp ON emp.\"姓名\"=con.\"合同所有人\" GROUP BY 1,2,3,4,5,6,7,8 ORDER BY 1,2,3,4,5,6,7,8"
sale_middle_data_contract_clean = PostgresOperator(
task_id='sale_middle_data_contract_tmp_clean',
sql= sale_middle_data_contract_clean_sql,
postgres_conn_id='GP-DW',
dag=dag)
sale_middle_data_contract = PostgresOperator(
task_id='sale_middle_data_contract_tmp',
sql = sale_middle_data_contract_sql,
postgres_conn_id='GP-DW',
dag=dag)
# Middle layer - payments received: total, and split by payment attribute
# (single/annual-meeting, monthly settlement, new annual frame), restricted
# to sales staff whose tenure month matches the payment month.
sale_middle_data_returned_clean_sql = "drop table if EXISTS tmp.t_tmp_sale_middle_data_returned"
sale_middle_data_returned_sql = "SELECT dd.YEAR,dd.quarter,dd.MONTH,dd.week_of_year,mr.\"日期\",emp.\"大区\",emp.\"团队\",emp.\"姓名\",SUM (CAST (mr.\"提成计算金额\" AS FLOAT)) 总来款金额,SUM (CASE WHEN mr.\"回款属性\"='单次/年会' THEN CAST (mr.\"提成计算金额\" AS FLOAT) ELSE 0 END) 回款单次,SUM (CASE WHEN mr.\"回款属性\"='月结' THEN CAST (mr.\"提成计算金额\" AS FLOAT) ELSE 0 END) 回款月结,SUM (CASE WHEN mr.\"回款属性\"='新年框' THEN CAST (mr.\"提成计算金额\" AS FLOAT) ELSE 0 END) 回款新年框 INTO tmp.t_tmp_sale_middle_data_returned FROM tmp.t_tmp_sale_finance_money_returned mr INNER JOIN dw.t_ob_dim_dates dd ON to_char(dd.DATE,'yyyy-mm-dd')=mr.\"日期\" INNER JOIN tmp.t_tmp_sale_employees emp ON emp.\"姓名\"=mr.\"销售\" and to_char(mr.\"日期\"::date,'yyyy-mm')=to_char(emp.\"司龄计算日期\"::date,'yyyy-mm') where emp.\"职位\"='销售' GROUP BY 1,2,3,4,5,6,7,8 ORDER BY 1,2,3,4,5,6,7,8"
sale_middle_data_returned_clean = PostgresOperator(
task_id='sale_middle_data_returned_tmp_clean',
sql= sale_middle_data_returned_clean_sql,
postgres_conn_id='GP-DW',
dag=dag)
sale_middle_data_returned = PostgresOperator(
task_id='sale_middle_data_returned_tmp',
sql = sale_middle_data_returned_sql,
postgres_conn_id='GP-DW',
dag=dag)
# Middle layer - payments received split by new/old customer ("新老").
sale_middle_data_returned_newclient_clean_sql = "drop table if EXISTS tmp.t_tmp_sale_middle_data_returned_newclient"
sale_middle_data_returned_newclient_sql = "SELECT dd.YEAR,dd.quarter,dd.MONTH,dd.week_of_year,mr.\"日期\",emp.\"大区\",emp.\"团队\",emp.\"姓名\",mr.\"新老\",SUM (CAST (mr.\"提成计算金额\" AS FLOAT)) 总来款金额,SUM (CASE WHEN mr.\"回款属性\"='单次/年会' THEN CAST (mr.\"提成计算金额\" AS FLOAT) ELSE 0 END) 回款单次,SUM (CASE WHEN mr.\"回款属性\"='月结' THEN CAST (mr.\"提成计算金额\" AS FLOAT) ELSE 0 END) 回款月结,SUM (CASE WHEN mr.\"回款属性\"='新年框' THEN CAST (mr.\"提成计算金额\" AS FLOAT) ELSE 0 END) 回款新年框 INTO tmp.t_tmp_sale_middle_data_returned_newclient FROM tmp.t_tmp_sale_finance_money_returned mr INNER JOIN dw.t_ob_dim_dates dd ON to_char(dd.DATE,'yyyy-mm-dd')=mr.\"日期\" INNER JOIN tmp.t_tmp_sale_employees emp ON emp.\"姓名\"=mr.\"销售\" GROUP BY 1,2,3,4,5,6,7,8,9 ORDER BY 1,2,3,4,5,6,7,8"
sale_middle_data_returned_newclient_clean = PostgresOperator(
task_id='sale_middle_data_returned_newclient_tmp_clean',
sql= sale_middle_data_returned_newclient_clean_sql,
postgres_conn_id='GP-DW',
dag=dag)
sale_middle_data_returned_newclient = PostgresOperator(
task_id='sale_middle_data_returned_newclient_tmp',
sql = sale_middle_data_returned_newclient_sql,
postgres_conn_id='GP-DW',
dag=dag)
# Middle layer - consumption, unconditional, grouped by package type.
sale_middle_data_used_no_cond_clean_sql = "drop table if EXISTS tmp.t_tmp_sale_middle_data_used_no_cond"
sale_middle_data_used_no_cond_sql = "SELECT dd.YEAR,dd.quarter,dd.MONTH,dd.week_of_year,ou.\"结束拍摄日期\",emp.\"大区\",emp.\"团队\",emp.\"姓名\",ou.\"套餐类型判断\",SUM (CAST (ou.\"订单总金额拆分\" AS FLOAT)) 消耗金额 INTO tmp.t_tmp_sale_middle_data_used_no_cond FROM tmp.t_tmp_sale_order_used ou INNER JOIN dw.t_ob_dim_dates dd ON to_char(dd.DATE,'yyyy-mm-dd')=ou.\"结束拍摄日期\" INNER JOIN tmp.t_tmp_sale_employees emp ON emp.\"姓名\"=ou.\"销售经理\" GROUP BY 1,2,3,4,5,6,7,8,9 ORDER BY 1,2,3,4,5,6,7,8"
sale_middle_data_used_no_cond_clean = PostgresOperator(
task_id='sale_middle_data_used_no_cond_tmp_clean',
sql= sale_middle_data_used_no_cond_clean_sql,
postgres_conn_id='GP-DW',
dag=dag)
sale_middle_data_used_no_cond = PostgresOperator(
task_id='sale_middle_data_used_no_cond_tmp',
sql = sale_middle_data_used_no_cond_sql,
postgres_conn_id='GP-DW',
dag=dag)
# Middle layer - consumption, conditional: only rows where
# ou."订单号重复行数" = '1' (one row per order number, i.e. de-duplicated).
sale_middle_data_used_cond_clean_sql = "drop table if EXISTS tmp.t_tmp_sale_middle_data_used_cond"
sale_middle_data_used_cond_sql = "SELECT dd.YEAR,dd.quarter,dd.MONTH,dd.week_of_year,ou.\"结束拍摄日期\",emp.\"大区\",emp.\"团队\",emp.\"姓名\",ou.\"套餐类型判断\",COUNT (ou.\"订单号\") 订单数,SUM (CAST (COALESCE (ou.\"摄影师数(订单表)\",'0') AS INTEGER)) 消耗机位数,COUNT (DISTINCT ou.\"公司全称(公司表中的公司名称)\") 客户数,COUNT (ou.\"拍摄主题\") 消耗活动数 INTO tmp.t_tmp_sale_middle_data_used_cond FROM tmp.t_tmp_sale_order_used ou INNER JOIN dw.t_ob_dim_dates dd ON to_char(dd.DATE,'yyyy-mm-dd')=ou.\"结束拍摄日期\" INNER JOIN tmp.t_tmp_sale_employees emp ON emp.\"姓名\"=ou.\"销售经理\" WHERE ou.\"订单号重复行数\"='1' GROUP BY 1,2,3,4,5,6,7,8,9 ORDER BY 1,2,3,4,5,6,7,8"
sale_middle_data_used_cond_clean = PostgresOperator(
task_id='sale_middle_data_used_cond_tmp_clean',
sql= sale_middle_data_used_cond_clean_sql,
postgres_conn_id='GP-DW',
dag=dag)
sale_middle_data_used_cond = PostgresOperator(
task_id='sale_middle_data_used_cond_tmp',
sql = sale_middle_data_used_cond_sql,
postgres_conn_id='GP-DW',
dag=dag)
# Middle layer - leads: lead count per rating and assignment date.
sale_middle_data_lead_clean_sql = "drop table if EXISTS tmp.t_tmp_sale_middle_data_lead"
sale_middle_data_lead_sql = "SELECT dd.YEAR,dd.quarter,dd.MONTH,dd.week_of_year,sl.\"分配时间\",emp.\"大区\",emp.\"团队\",emp.\"姓名\",sl.\"线索评级\",COUNT (sl.\"数据唯一编号\") 线索数 INTO tmp.t_tmp_sale_middle_data_lead FROM tmp.t_tmp_sale_lead sl INNER JOIN dw.t_ob_dim_dates dd ON to_char(dd.DATE,'yyyy-mm-dd')=sl.\"分配时间\" INNER JOIN tmp.t_tmp_sale_employees emp ON emp.\"姓名\"=sl.\"销售所有人\" GROUP BY 1,2,3,4,5,6,7,8,9 ORDER BY 1,2,3,4,5,6,7,8"
sale_middle_data_lead_clean = PostgresOperator(
task_id='sale_middle_data_lead_tmp_clean',
sql= sale_middle_data_lead_clean_sql,
postgres_conn_id='GP-DW',
dag=dag)
sale_middle_data_lead = PostgresOperator(
task_id='sale_middle_data_lead_tmp',
sql = sale_middle_data_lead_sql,
postgres_conn_id='GP-DW',
dag=dag)
# Dependency wiring: each clean task follows the start mail, each rebuild
# follows its clean task.
# orders
sale_middle_data_order_clean.set_upstream(MiddleDataTaskStart)
sale_middle_data_order.set_upstream(sale_middle_data_order_clean)
# new signings
sale_middle_data_new_sign_clean.set_upstream(MiddleDataTaskStart)
sale_middle_data_new_sign.set_upstream(sale_middle_data_new_sign_clean)
# product order amount / order count
sale_middle_data_product_order_clean.set_upstream(MiddleDataTaskStart)
sale_middle_data_product_order.set_upstream(sale_middle_data_product_order_clean)
# product customer count
sale_middle_data_product_client_clean.set_upstream(MiddleDataTaskStart)
sale_middle_data_product_client.set_upstream(sale_middle_data_product_client_clean)
# contracts: customer count / contract count / amount
sale_middle_data_contract_clean.set_upstream(MiddleDataTaskStart)
sale_middle_data_contract.set_upstream(sale_middle_data_contract_clean)
# payments received
sale_middle_data_returned_clean.set_upstream(MiddleDataTaskStart)
sale_middle_data_returned.set_upstream(sale_middle_data_returned_clean)
# consumption, unconditional
sale_middle_data_used_no_cond_clean.set_upstream(MiddleDataTaskStart)
sale_middle_data_used_no_cond.set_upstream(sale_middle_data_used_no_cond_clean)
# consumption, conditional
sale_middle_data_used_cond_clean.set_upstream(MiddleDataTaskStart)
sale_middle_data_used_cond.set_upstream(sale_middle_data_used_cond_clean)
# leads
sale_middle_data_lead_clean.set_upstream(MiddleDataTaskStart)
sale_middle_data_lead.set_upstream(sale_middle_data_lead_clean)
# payments received, new/old customer split
sale_middle_data_returned_newclient_clean.set_upstream(MiddleDataTaskStart)
sale_middle_data_returned_newclient.set_upstream(sale_middle_data_returned_newclient_clean)
# success mail depends on every rebuild task
MiddleDataTaskSuccess.set_upstream(sale_middle_data_order)
MiddleDataTaskSuccess.set_upstream(sale_middle_data_new_sign)
MiddleDataTaskSuccess.set_upstream(sale_middle_data_product_order)
MiddleDataTaskSuccess.set_upstream(sale_middle_data_product_client)
MiddleDataTaskSuccess.set_upstream(sale_middle_data_contract)
MiddleDataTaskSuccess.set_upstream(sale_middle_data_returned)
MiddleDataTaskSuccess.set_upstream(sale_middle_data_used_no_cond)
MiddleDataTaskSuccess.set_upstream(sale_middle_data_used_cond)
MiddleDataTaskSuccess.set_upstream(sale_middle_data_lead)
MiddleDataTaskSuccess.set_upstream(sale_middle_data_returned_newclient)
复制
代码通过git提交,进行自动获取。
文章转载自牛仔纹理,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。
评论
相关阅读
OceanBase 接入 MCP 架构:贯通数据孤岛,释放 AI 创新潜能
OceanBase数据库
399次阅读
2025-03-28 15:32:52
AI关键场景得到全面支持!OceanBase入选Forrester报告三大领域代表厂商
OceanBase数据库
231次阅读
2025-04-19 22:27:54
Apache Doris 2025 Roadmap:构建 GenAI 时代实时高效统一的数据底座
SelectDB
178次阅读
2025-04-03 17:41:08
定档!2025 OceanBase开发者大会,5月17日广州见!
OceanBase数据库
129次阅读
2025-04-09 16:48:47
瓜分 10 万奖金!OceanBase 首届 AI 黑客松等你来战
OceanBase数据库
107次阅读
2025-04-10 18:19:58
OceanBase首届生态伙伴大会圆满收官,开启生态建设2.0新征程
OceanBase数据库
84次阅读
2025-03-28 15:59:27
阿里云 AI 搜索开放平台新功能发布:大模型联网能力上线
阿里云大数据AI技术
39次阅读
2025-04-16 09:59:20
走进蚂蚁:推动应用创新,共话 AI 新未来
OceanBase数据库
30次阅读
2025-04-04 18:03:40
OceanBase即将亮相中国电子信息年会,共探Data+AI融合新边界
OceanBase数据库
29次阅读
2025-04-11 21:14:31
深度解析 OceanBase 向量能力:原理剖析与实战应用,助力打造 AI 时代数据核心引擎
OceanBase数据库
25次阅读
2025-04-08 16:47:49