暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Airflow 调度:将 Excel 数据写入 Greenplum

牛仔纹理 2021-04-28
1055

创建读取 Excel 中各个 sheet、并将其分别导出为 CSV 文件的函数:

    # -*- coding: utf-8 -*-


    # read hecang.xlsx and separate sheets
    import pandas as pd
    from pandas import ExcelWriter
    from pandas import ExcelFile
    import csv
    import os
    from datetime import datetime,timedelta


    import sys

    # Python 2 only: reload(sys) restores sys.setdefaultencoding(), which is then
    # used to make UTF-8 the process-wide default encoding (presumably so implicit
    # str/unicode conversions of the Chinese sheet data do not fail - confirm).
    # Python 3 has neither the reload() built-in nor sys.setdefaultencoding() and
    # already defaults to UTF-8, so the hack is skipped there instead of crashing.
    if sys.version_info[0] < 3:
        reload(sys)  # noqa: F821 - built-in on Python 2
        sys.setdefaultencoding("utf-8")


    # Today's local date formatted as YYYY_MM_DD, with month and day zero-padded
    # to two digits.  The value is computed once, when this module is imported.
    tt = datetime.now().timetuple()

    # '%02d' zero-pads single-digit values, replacing the manual "< 10" branches.
    month = '%02d' % tt.tm_mon
    day = '%02d' % tt.tm_mday

    today = str(tt.tm_year) + '_' + month + '_' + day
    print("today is:" + today)




    #1. read xlsx with pandas


    # file_directory = '/root/airflow/files/sale_data/'






    def split_sale_data(file_directory):
        """Split today's sales workbook into one delimited text file per sheet.

        Reads ``sales_data_<YYYY_MM_DD>.xlsx`` (dated with the current local day,
        evaluated when the function is called) from ``file_directory`` and writes
        every exported sheet into the same directory as a UTF-8 ``.csv`` file.
        The DataFrame index is written as the first, unnamed column because
        ``DataFrame.to_csv`` is called with its default ``index`` setting.

        All sheets are read before any file is written, so a missing sheet fails
        the call without leaving a partial set of output files behind.

        Args:
            file_directory: Directory that holds the workbook and receives the
                output files.  A trailing path separator is optional.

        Raises:
            Whatever ``pandas`` raises when the workbook cannot be opened or one
            of the expected sheets does not exist.
        """
        # (sheet name, output file name, field delimiter), in export order.
        # The two CRM order sheets use '^' as delimiter, all others use '$'.
        # The sheets '产品名单' and '数据中层' are deliberately not exported.
        sheet_exports = (
            ('合同明细', 'contract_detail.csv', '$'),
            ('销售线索', 'sale_lead.csv', '$'),
            ('消耗明细', 'order_used_detail.csv', '$'),
            ('财务回款', 'finance_money_returned.csv', '$'),
            ('CRM订单2(去重客户)', 'crm_order_with_no_dup_customer.csv', '^'),
            ('CRM订单明细1', 'crm_order_detail.csv', '^'),
            ('名单', 'employee_infos.csv', '$'),
        )

        # Resolve the date at call time so a long-lived process never reuses the
        # date of the day on which the module happened to be imported.
        date_suffix = datetime.now().strftime('%Y_%m_%d')
        workbook_path = os.path.join(
            file_directory, 'sales_data_' + date_suffix + '.xlsx')

        # The context manager closes the workbook even if a sheet is missing.
        with pd.ExcelFile(workbook_path) as xls:
            frames = [pd.read_excel(xls, sheet) for sheet, _, _ in sheet_exports]

        for frame, (_, csv_name, delimiter) in zip(frames, sheet_exports):
            frame.to_csv(
                os.path.join(file_directory, csv_name),
                sep=delimiter,
                encoding="utf-8",
            )
    复制

    通过 pgcsv 将生成的 CSV 文件导入到 Greenplum:

      # -*- coding: utf-8 -*-


      from datetime import datetime,timedelta


      from airflow import DAG
      from airflow.operators.bash_operator import BashOperator
      from airflow.operators.python_operator import PythonOperator
      from airflow.operators.email_operator import EmailOperator
      from airflow.operators.postgres_operator import PostgresOperator
      from airflow.utils.trigger_rule import TriggerRule


      from sale_raw_xlsx_split import split_sale_data
      import sys

      # Python 2 only: reload(sys) restores sys.setdefaultencoding(), which is
      # then used to make UTF-8 the process-wide default encoding.  Python 3 has
      # neither the reload() built-in nor sys.setdefaultencoding() and already
      # defaults to UTF-8, so the hack is skipped there instead of crashing.
      if sys.version_info[0] < 3:
          reload(sys)  # noqa: F821 - built-in on Python 2
          sys.setdefaultencoding('utf8')


      # Date strings built from the wall clock when this file is parsed.  They are
      # not zero-padded (e.g. "2021-4-28").  Note that cur_date holds TOMORROW's
      # date despite its name.
      tt = datetime.now().timetuple()
      tomorrow = datetime.now() + timedelta(days=1)
      tt_tomorrow = tomorrow.timetuple()
      print(tt_tomorrow)

      # The first three struct_time fields are (tm_year, tm_mon, tm_mday).
      today = '-'.join(map(str, tt[:3]))
      cur_date = '-'.join(map(str, tt_tomorrow[:3]))

      print("current date:" + cur_date)
      print("today is:" + today)




      # Arguments handed to the DAG as default_args.
      default_args = {
          'owner': 'niuzaiwenli',
          'depends_on_past': False,
          'start_date': datetime(2019, 4, 15, 9, 50),
          # Recipients of the failure / retry notifications enabled below.
          'email': ['email1', 'email2'],
          'email_on_failure': True,
          'email_on_retry': True,
          'retries': 0,
          # Optional settings that are currently disabled:
          # 'retry_delay': timedelta(seconds=5),
          # 'queue': 'bash_queue',
          # 'pool': 'backfill',
          # 'priority_weight': 10,
          # 'end_date': datetime(2016, 1, 1),
      }


      # DAG scheduled with the cron expression "0 1 * * *" (01:00 every day).
      dag = DAG(
          'vp_sale_data_prepare',
          default_args=default_args,
          schedule_interval="0 1 * * * ",
      )


      # Notification mail for the end of the pipeline; it runs only when all of
      # its upstream tasks succeeded (TriggerRule.ALL_SUCCESS).
      CRMSyncTaskSuccess = EmailOperator(
          task_id="SalesReportDataPrepared",
          to=["email1", "email2"],
          subject="SalesReportDataPrepared successfully",
          html_content='<h3>SalesReportDataPrepared successfully </h3>',
          trigger_rule=TriggerRule.ALL_SUCCESS,
          dag=dag,
      )


      # Notification mail for the start of the pipeline.
      # NOTE(review): datetime.now() is evaluated while this file is parsed, not
      # when the mail task executes, so the timestamp in the subject and body is
      # the parse time - confirm that this is acceptable.
      CRMSyncTaskStart = EmailOperator(
          task_id="SalesReportDataStartToPrepare",
          to=["email1", "email2"],
          subject="SalesReportDataStartToPrepare at " + str(datetime.now()),
          html_content='<h3>SalesReportDataStartToPrepare at ' + str(datetime.now()) + ' </h3>',
          trigger_rule=TriggerRule.ALL_SUCCESS,
          dag=dag,
      )


      # Directory that holds the daily workbook and receives the generated CSV
      # files; the load and clean-up commands of this DAG read from it as well.
      work_dir = '/home/vphoto/'


      # Task that runs the imported split_sale_data() function on work_dir.
      # The directory is passed via work_dir rather than a second copy of the
      # literal, so the splitter and the loaders cannot drift apart.
      # NOTE: the assignment rebinds the name split_sale_data from the imported
      # function to the operator; python_callable is evaluated before the
      # rebinding and therefore still receives the function.
      split_sale_data = PythonOperator(
          task_id='split_sale_data',
          python_callable=split_sale_data,
          op_kwargs={'file_directory': work_dir},
          provide_context=False,
          dag=dag,
      )


      # Target database of every pgcsv3 load below.
      # NOTE(review): this URL embeds the database user and password in the DAG
      # source.  It is kept byte-for-byte so the generated commands stay the
      # same, but it should be moved to a secret store / Airflow connection and
      # the exposed password rotated.
      _PGCSV_DB_URL = 'postgresql://vphotos:wiech2UB@10.64.104.137:5432/vphotos'


      def _pgcsv_command(delimiter, table, csv_name):
          """Return the pgcsv3 command line that loads ``work_dir + csv_name``.

          Args:
              delimiter: Field delimiter of the CSV file ('$' or '^' here).
              table: Name of the target table, passed after the ``tmp`` argument
                  (presumably the schema name - confirm against pgcsv usage).
              csv_name: File name of the CSV inside ``work_dir``.

          Returns:
              The shell command string for a BashOperator.
          """
          return (
              "/usr/local/bin/pgcsv3 --encoding 'utf-8' --delimiter '%s' --drop "
              "--db %s tmp %s %s%s"
              % (delimiter, _PGCSV_DB_URL, table, work_dir, csv_name)
          )


      # One load task per CSV file produced by the split step.
      write_contract_detail_2_db = BashOperator(
          task_id='pgcsv_contract_detail',
          bash_command=_pgcsv_command('$', 't_tmp_sale_contract', 'contract_detail.csv'),
          dag=dag)


      write_lead_2_db = BashOperator(
          task_id='pgcsv_lead',
          bash_command=_pgcsv_command('$', 't_tmp_sale_lead', 'sale_lead.csv'),
          dag=dag)


      write_order_used_2_db = BashOperator(
          task_id='pgcsv_order_used_detail',
          bash_command=_pgcsv_command('$', 't_tmp_sale_order_used', 'order_used_detail.csv'),
          dag=dag)


      write_money_returned_2_db = BashOperator(
          task_id='pgcsv_money_return',
          bash_command=_pgcsv_command(
              '$', 't_tmp_sale_finance_money_returned', 'finance_money_returned.csv'),
          dag=dag)


      # The two CRM order files are written with '^' as delimiter.
      write_crm_order_with_no_dup_customer_2_db = BashOperator(
          task_id='pgcsv_crm_order_with_no_dup_customer',
          bash_command=_pgcsv_command(
              '^', 't_tmp_sale_order_with_no_dup_customer',
              'crm_order_with_no_dup_customer.csv'),
          dag=dag)


      write_crm_order_detail_2_db = BashOperator(
          task_id='pgcsv_crm_order_detail',
          bash_command=_pgcsv_command('^', 't_tmp_sale_order_detail', 'crm_order_detail.csv'),
          dag=dag)


      write_employee_infos_2_db = BashOperator(
          task_id='pgcsv_employee_infos',
          bash_command=_pgcsv_command('$', 't_tmp_sale_employees', 'employee_infos.csv'),
          dag=dag)


      # write_kpi_month_2_db = BashOperator(
      # task_id='pgcsv_kpi_month',
      # bash_command='/usr/local/bin/pgcsv3 --encoding \'utf-8\' --delimiter \'$\' --drop --db postgresql://vphotos:wiech2UB@10.64.104.137:5432/vphotos tmp t_tmp_sale_kpi_month '+work_dir+'kpi_month.csv',
      # dag=dag)


      # In-place clean-up of the two CRM order CSV files: sed deletes every
      # carriage-return character (\r) from the file.
      remove_linux_carriage_return_cus = BashOperator(
          task_id='remove_linux_carriage_return_cus',
          bash_command="sed -i 's/\\r//g' " + work_dir + 'crm_order_with_no_dup_customer.csv',
          dag=dag,
      )


      remove_linux_carriage_return_order = BashOperator(
          task_id='remove_linux_carriage_return_order',
          bash_command="sed -i 's/\\r//g' " + work_dir + 'crm_order_detail.csv',
          dag=dag,
      )




      # Task graph: start mail -> split workbook -> per-file load -> success mail.
      split_sale_data.set_upstream(CRMSyncTaskStart)

      # Everything that runs directly after the split step.  The two CRM order
      # files first go through their carriage-return clean-up task.
      # (The KPI-month load is disabled and therefore not wired in.)
      split_sale_data.set_downstream([
          write_contract_detail_2_db,
          write_lead_2_db,
          write_order_used_2_db,
          write_money_returned_2_db,
          remove_linux_carriage_return_cus,
          remove_linux_carriage_return_order,
          write_employee_infos_2_db,
      ])

      write_crm_order_with_no_dup_customer_2_db.set_upstream(remove_linux_carriage_return_cus)
      write_crm_order_detail_2_db.set_upstream(remove_linux_carriage_return_order)

      # The success mail waits for every load task.
      CRMSyncTaskSuccess.set_upstream([
          write_contract_detail_2_db,
          write_lead_2_db,
          write_order_used_2_db,
          write_money_returned_2_db,
          write_crm_order_with_no_dup_customer_2_db,
          write_crm_order_detail_2_db,
          write_employee_infos_2_db,
      ])
      # CRMSyncTaskSuccess.set_upstream(write_kpi_month_2_db)
      复制

      最终结果表的计算和统计:

        # -*- coding: utf-8 -*-


        from datetime import datetime,timedelta


        from airflow import DAG
        from airflow.operators.bash_operator import BashOperator
        from airflow.operators.python_operator import PythonOperator
        from airflow.operators.email_operator import EmailOperator
        from airflow.operators.postgres_operator import PostgresOperator
        from airflow.utils.trigger_rule import TriggerRule


        from sale_raw_xlsx_split import split_sale_data
        import sys

        # Python 2 only: reload(sys) restores sys.setdefaultencoding(), which is
        # then used to make UTF-8 the process-wide default encoding.  Python 3
        # has neither the reload() built-in nor sys.setdefaultencoding() and
        # already defaults to UTF-8, so the hack is skipped there instead of
        # crashing.
        if sys.version_info[0] < 3:
            reload(sys)  # noqa: F821 - built-in on Python 2
            sys.setdefaultencoding('utf8')


        # Date strings built from the wall clock when this file is parsed.  They
        # are not zero-padded (e.g. "2021-4-28").  Note that cur_date holds
        # TOMORROW's date despite its name.
        tt = datetime.now().timetuple()
        tomorrow = datetime.now() + timedelta(days=1)
        tt_tomorrow = tomorrow.timetuple()
        print(tt_tomorrow)

        # The first three struct_time fields are (tm_year, tm_mon, tm_mday).
        today = '-'.join(map(str, tt[:3]))
        cur_date = '-'.join(map(str, tt_tomorrow[:3]))

        print("current date:" + cur_date)
        print("today is:" + today)




        # Arguments handed to the DAG as default_args.
        default_args = {
            'owner': 'niuzaiwenli',
            'depends_on_past': False,
            'start_date': datetime(2019, 4, 15, 9, 50),
            # Two separate recipients.  The comma matters: without it Python
            # joins the adjacent literals into the single string 'email1email2'.
            'email': ['email1', 'email2'],
            'email_on_failure': True,
            'email_on_retry': True,
            'retries': 0,
            # Optional settings that are currently disabled:
            # 'retry_delay': timedelta(seconds=5),
            # 'queue': 'bash_queue',
            # 'pool': 'backfill',
            # 'priority_weight': 10,
            # 'end_date': datetime(2016, 1, 1),
        }


        # DAG scheduled with the cron expression "0 2 * * *" (02:00 every day).
        dag = DAG(
            'vp_build_sale_middle_data',
            default_args=default_args,
            schedule_interval="0 2 * * * ",
        )


        # Notification mail for the end of the pipeline; it runs only when all
        # of its upstream tasks succeeded (TriggerRule.ALL_SUCCESS).
        MiddleDataTaskSuccess = EmailOperator(
            task_id="sale_middle_data_success",
            to=["email1", "email2"],
            subject="sale_middle_data_success successfully",
            html_content='<h3>sale_middle_data job successfully </h3>',
            trigger_rule=TriggerRule.ALL_SUCCESS,
            dag=dag,
        )


        # Notification mail for the start of the pipeline.
        # NOTE(review): datetime.now() is evaluated while this file is parsed,
        # not when the mail task executes, so the timestamp in the subject and
        # body is the parse time - confirm that this is acceptable.
        MiddleDataTaskStart = EmailOperator(
            task_id="sale_middle_data_start",
            to=["email1", "email2"],
            subject="sale_middle_data_start at " + str(datetime.now()),
            html_content='<h3>sale_middle_data_start at ' + str(datetime.now()) + ' </h3>',
            trigger_rule=TriggerRule.ALL_SUCCESS,
            dag=dag,
        )


        # Middle-layer data: orders.
        # Two statements, run as two tasks: drop tmp.t_tmp_sale_middle_data_order if
        # it exists, then rebuild it with SELECT ... INTO from
        # tmp.t_tmp_sale_order_detail.  Rows are grouped by the date-dimension
        # fields (year, quarter, month, week of year), the order effective date and
        # the employee's region / team / name, and carry the summed order amount,
        # the summed camera-position (photographer) count and the order count.
        # Employees are LEFT JOINed, so orders whose owner is not in the employee
        # table are kept.
        # NOTE(review): sod.COLUMN is presumably the name pgcsv gave to the unnamed
        # first CSV column - confirm against the loaded table.
        sale_middle_data_order_clean_sql = "drop table if EXISTS tmp.t_tmp_sale_middle_data_order"
        sale_middle_data_order_sql = "SELECT dd.YEAR,dd.quarter,dd.MONTH,dd.week_of_year,sod.\"[订单]生效日期\",emp.\"大区\",emp.\"团队\",emp.\"姓名\",SUM (CAST (sod.\"[订单]总金额\" AS FLOAT)) 订单金额,SUM (CAST (sod.\"[订单]机位总数(摄影师人数)\" AS NUMERIC)) 机位数,COUNT (sod.COLUMN) 订单数 into tmp.t_tmp_sale_middle_data_order FROM tmp.t_tmp_sale_order_detail sod INNER JOIN dw.t_ob_dim_dates dd ON to_char(dd.DATE,'yyyy-mm-dd')=sod.\"[订单]生效日期\" LEFT JOIN tmp.t_tmp_sale_employees emp ON emp.\"姓名\"=sod.\"[订单]订单所有人\" GROUP BY 1,2,3,4,5,6,7,8 ORDER BY 1,2,3,4,5,6,7"


        # Task that drops the previous result table.
        sale_middle_data_order_clean = PostgresOperator(
        task_id='sale_middle_data_order_tmp_clean',
        sql= sale_middle_data_order_clean_sql,
        postgres_conn_id='GP-DW',
        dag=dag)


        # Task that rebuilds the result table.
        sale_middle_data_order = PostgresOperator(
        task_id='sale_middle_data_order_tmp',
        sql= sale_middle_data_order_sql,
        postgres_conn_id='GP-DW',
        dag=dag)


        # Middle-layer data: newly signed business.
        # Drops and rebuilds tmp.t_tmp_sale_middle_data_new_sign from
        # tmp.t_tmp_sale_order_with_no_dup_customer.  Only rows whose
        # "有无消耗历史" column equals '无' contribute to the three metrics (order
        # amount, customer count, order count); other rows add 0.
        # NOTE(review): the customer-count and order-count columns are computed by
        # the same expression and will always be equal - confirm this is intended.
        sale_middle_data_new_sign_clean_sql = "drop table if EXISTS tmp.t_tmp_sale_middle_data_new_sign"
        sale_middle_data_new_sign_sql = "SELECT dd.YEAR,dd.quarter,dd.MONTH,dd.week_of_year,cus.\"[订单]生效日期\",emp.\"大区\",emp.\"团队\",emp.\"姓名\",SUM (CASE WHEN cus.\"有无消耗历史\"='无' THEN CAST (cus.\"[订单]总金额\" AS FLOAT) ELSE 0 END) 新签订单金额,SUM (CASE WHEN cus.\"有无消耗历史\"='无' THEN 1 ELSE 0 END) 新签客户数,SUM (CASE WHEN cus.\"有无消耗历史\"='无' THEN 1 ELSE 0 END) 新签订单数 into tmp.t_tmp_sale_middle_data_new_sign FROM tmp.t_tmp_sale_order_with_no_dup_customer cus INNER JOIN dw.t_ob_dim_dates dd ON to_char(dd.DATE,'yyyy-mm-dd')=cus.\"[订单]生效日期\" LEFT JOIN tmp.t_tmp_sale_employees emp ON emp.\"姓名\"=cus.\"[订单]订单所有人\" GROUP BY 1,2,3,4,5,6,7,8 ORDER BY 1,2,3,4,5,6,7"


        # Task that drops the previous result table.
        sale_middle_data_new_sign_clean = PostgresOperator(
        task_id='sale_middle_data_new_sign_tmp_clean',
        sql= sale_middle_data_new_sign_clean_sql,
        postgres_conn_id='GP-DW',
        dag=dag)


        # Task that rebuilds the result table.
        sale_middle_data_new_sign = PostgresOperator(
        task_id='sale_middle_data_new_sign_tmp',
        sql= sale_middle_data_new_sign_sql,
        postgres_conn_id='GP-DW',
        dag=dag)




        # Middle-layer data: order amount and order count per product.
        # Drops and rebuilds tmp.t_tmp_sale_middle_data_product_order from
        # tmp.t_tmp_sale_order_detail.  In addition to date / region / team / name,
        # rows are grouped by a product label: '云摄影订单额' when
        # "[产品]拍摄类型" equals '云摄影', otherwise '云视频订单额'.
        sale_middle_data_product_order_clean_sql = "drop table if EXISTS tmp.t_tmp_sale_middle_data_product_order"
        sale_middle_data_product_order_sql = "SELECT dd.YEAR,dd.quarter,dd.MONTH,dd.week_of_year,sod.\"[订单]生效日期\",emp.\"大区\",emp.\"团队\",emp.\"姓名\",CASE WHEN sod.\"[产品]拍摄类型\"='云摄影' THEN '云摄影订单额' ELSE '云视频订单额' END \"产品\",SUM (CAST (sod.\"[订单]总金额\" AS FLOAT)) 订单金额,COUNT (sod.COLUMN) 订单数 INTO tmp.t_tmp_sale_middle_data_product_order FROM tmp.t_tmp_sale_order_detail sod INNER JOIN dw.t_ob_dim_dates dd ON to_char(dd.DATE,'yyyy-mm-dd')=sod.\"[订单]生效日期\" LEFT JOIN tmp.t_tmp_sale_employees emp ON emp.\"姓名\"=sod.\"[订单]订单所有人\" GROUP BY 1,2,3,4,5,6,7,8,9 ORDER BY 1,2,3,4,5,6,7,8"


        # Task that drops the previous result table.
        sale_middle_data_product_order_clean = PostgresOperator(
        task_id='sale_middle_data_product_order_tmp_clean',
        sql= sale_middle_data_product_order_clean_sql,
        postgres_conn_id='GP-DW',
        dag=dag)


        # Task that rebuilds the result table.
        sale_middle_data_product_order = PostgresOperator(
        task_id='sale_middle_data_product_order_tmp',
        sql= sale_middle_data_product_order_sql,
        postgres_conn_id='GP-DW',
        dag=dag)


        # Middle-layer data: customer count per product.
        # Drops and rebuilds tmp.t_tmp_sale_middle_data_product_client from
        # tmp.t_tmp_sale_order_with_no_dup_customer; COUNT(1) per date / region /
        # team / name / product label is stored as the customer count.
        # NOTE(review): the product labels reuse the "...订单额" (order amount)
        # wording although the metric here is a customer count - confirm that
        # consumers of the table expect these label values.
        sale_middle_data_product_client_clean_sql = "drop table if EXISTS tmp.t_tmp_sale_middle_data_product_client"
        sale_middle_data_product_client_sql = "SELECT dd.YEAR,dd.quarter,dd.MONTH,dd.week_of_year,cus.\"[订单]生效日期\",emp.\"大区\",emp.\"团队\",emp.\"姓名\",CASE WHEN cus.\"[产品]拍摄类型\"='云摄影' THEN '云摄影订单额' ELSE '云视频订单额' END \"产品\",COUNT (1) 客户数 INTO tmp.t_tmp_sale_middle_data_product_client FROM tmp.t_tmp_sale_order_with_no_dup_customer cus INNER JOIN dw.t_ob_dim_dates dd ON to_char(dd.DATE,'yyyy-mm-dd')=cus.\"[订单]生效日期\" LEFT JOIN tmp.t_tmp_sale_employees emp ON emp.\"姓名\"=cus.\"[订单]订单所有人\" GROUP BY 1,2,3,4,5,6,7,8,9 ORDER BY 1,2,3,4,5,6,7,8"


        # Task that drops the previous result table.
        sale_middle_data_product_client_clean = PostgresOperator(
        task_id='sale_middle_data_product_client_tmp_clean',
        sql= sale_middle_data_product_client_clean_sql,
        postgres_conn_id='GP-DW',
        dag=dag)


        # Task that rebuilds the result table.
        sale_middle_data_product_client = PostgresOperator(
        task_id='sale_middle_data_product_client_tmp',
        sql= sale_middle_data_product_client_sql,
        postgres_conn_id='GP-DW',
        dag=dag)


        # Middle-layer data: contracts - customer count, contract count, amount.
        # Drops and rebuilds tmp.t_tmp_sale_middle_data_contract from
        # tmp.t_tmp_sale_contract, keyed by contract creation date and the contract
        # owner's region / team / name.  Employees are INNER JOINed here, so
        # contracts whose owner is not in the employee table are dropped.
        sale_middle_data_contract_clean_sql = "drop table if EXISTS tmp.t_tmp_sale_middle_data_contract"
        sale_middle_data_contract_sql = "SELECT dd.YEAR,dd.quarter,dd.MONTH,dd.week_of_year,con.\"创建日期\",emp.\"大区\",emp.\"团队\",emp.\"姓名\",COUNT (DISTINCT con.\"客户名称\") 合同客户数,COUNT (con.\"客户名称\") 合同个数,SUM (CAST (con.\"总金额\" AS FLOAT)) 合同金额 INTO tmp.t_tmp_sale_middle_data_contract FROM tmp.t_tmp_sale_contract con INNER JOIN dw.t_ob_dim_dates dd ON to_char(dd.DATE,'yyyy-mm-dd')=con.\"创建日期\" INNER JOIN tmp.t_tmp_sale_employees emp ON emp.\"姓名\"=con.\"合同所有人\" GROUP BY 1,2,3,4,5,6,7,8 ORDER BY 1,2,3,4,5,6,7,8"


        # Task that drops the previous result table.
        sale_middle_data_contract_clean = PostgresOperator(
        task_id='sale_middle_data_contract_tmp_clean',
        sql= sale_middle_data_contract_clean_sql,
        postgres_conn_id='GP-DW',
        dag=dag)


        # Task that rebuilds the result table.
        sale_middle_data_contract = PostgresOperator(
        task_id='sale_middle_data_contract_tmp',
        sql = sale_middle_data_contract_sql,
        postgres_conn_id='GP-DW',
        dag=dag)


        # Middle-layer data: payments received - total, and split by payment type
        # (single/annual-event, monthly settlement, new annual framework).
        # Drops and rebuilds tmp.t_tmp_sale_middle_data_returned from
        # tmp.t_tmp_sale_finance_money_returned.  The employee join additionally
        # requires the payment date and emp."司龄计算日期" to fall in the same
        # year-month, and only employees whose "职位" equals '销售' are kept.
        sale_middle_data_returned_clean_sql = "drop table if EXISTS tmp.t_tmp_sale_middle_data_returned"
        sale_middle_data_returned_sql = "SELECT dd.YEAR,dd.quarter,dd.MONTH,dd.week_of_year,mr.\"日期\",emp.\"大区\",emp.\"团队\",emp.\"姓名\",SUM (CAST (mr.\"提成计算金额\" AS FLOAT)) 总来款金额,SUM (CASE WHEN mr.\"回款属性\"='单次/年会' THEN CAST (mr.\"提成计算金额\" AS FLOAT) ELSE 0 END) 回款单次,SUM (CASE WHEN mr.\"回款属性\"='月结' THEN CAST (mr.\"提成计算金额\" AS FLOAT) ELSE 0 END) 回款月结,SUM (CASE WHEN mr.\"回款属性\"='新年框' THEN CAST (mr.\"提成计算金额\" AS FLOAT) ELSE 0 END) 回款新年框 INTO tmp.t_tmp_sale_middle_data_returned FROM tmp.t_tmp_sale_finance_money_returned mr INNER JOIN dw.t_ob_dim_dates dd ON to_char(dd.DATE,'yyyy-mm-dd')=mr.\"日期\" INNER JOIN tmp.t_tmp_sale_employees emp ON emp.\"姓名\"=mr.\"销售\" and to_char(mr.\"日期\"::date,'yyyy-mm')=to_char(emp.\"司龄计算日期\"::date,'yyyy-mm') where emp.\"职位\"='销售' GROUP BY 1,2,3,4,5,6,7,8 ORDER BY 1,2,3,4,5,6,7,8"


        # Task that drops the previous result table.
        sale_middle_data_returned_clean = PostgresOperator(
        task_id='sale_middle_data_returned_tmp_clean',
        sql= sale_middle_data_returned_clean_sql,
        postgres_conn_id='GP-DW',
        dag=dag)


        # Task that rebuilds the result table.
        sale_middle_data_returned = PostgresOperator(
        task_id='sale_middle_data_returned_tmp',
        sql = sale_middle_data_returned_sql,
        postgres_conn_id='GP-DW',
        dag=dag)




        # Middle-layer data: payments received, split by new vs. existing customer.
        # Drops and rebuilds tmp.t_tmp_sale_middle_data_returned_newclient.  Same
        # sums as the plain payments table, additionally grouped by mr."新老".
        # Unlike that query, the employee join here matches on the name only and
        # there is no filter on emp."职位".
        sale_middle_data_returned_newclient_clean_sql = "drop table if EXISTS tmp.t_tmp_sale_middle_data_returned_newclient"
        sale_middle_data_returned_newclient_sql = "SELECT dd.YEAR,dd.quarter,dd.MONTH,dd.week_of_year,mr.\"日期\",emp.\"大区\",emp.\"团队\",emp.\"姓名\",mr.\"新老\",SUM (CAST (mr.\"提成计算金额\" AS FLOAT)) 总来款金额,SUM (CASE WHEN mr.\"回款属性\"='单次/年会' THEN CAST (mr.\"提成计算金额\" AS FLOAT) ELSE 0 END) 回款单次,SUM (CASE WHEN mr.\"回款属性\"='月结' THEN CAST (mr.\"提成计算金额\" AS FLOAT) ELSE 0 END) 回款月结,SUM (CASE WHEN mr.\"回款属性\"='新年框' THEN CAST (mr.\"提成计算金额\" AS FLOAT) ELSE 0 END) 回款新年框 INTO tmp.t_tmp_sale_middle_data_returned_newclient FROM tmp.t_tmp_sale_finance_money_returned mr INNER JOIN dw.t_ob_dim_dates dd ON to_char(dd.DATE,'yyyy-mm-dd')=mr.\"日期\" INNER JOIN tmp.t_tmp_sale_employees emp ON emp.\"姓名\"=mr.\"销售\" GROUP BY 1,2,3,4,5,6,7,8,9 ORDER BY 1,2,3,4,5,6,7,8"


        # Task that drops the previous result table.
        sale_middle_data_returned_newclient_clean = PostgresOperator(
        task_id='sale_middle_data_returned_newclient_tmp_clean',
        sql= sale_middle_data_returned_newclient_clean_sql,
        postgres_conn_id='GP-DW',
        dag=dag)


        # Task that rebuilds the result table.
        sale_middle_data_returned_newclient = PostgresOperator(
        task_id='sale_middle_data_returned_newclient_tmp',
        sql = sale_middle_data_returned_newclient_sql,
        postgres_conn_id='GP-DW',
        dag=dag)


        # Middle-layer data: consumption, unfiltered, by package type.
        # Drops and rebuilds tmp.t_tmp_sale_middle_data_used_no_cond from
        # tmp.t_tmp_sale_order_used: the sum of "订单总金额拆分" per shooting end
        # date, sales manager's region / team / name and "套餐类型判断".
        sale_middle_data_used_no_cond_clean_sql = "drop table if EXISTS tmp.t_tmp_sale_middle_data_used_no_cond"
        sale_middle_data_used_no_cond_sql = "SELECT dd.YEAR,dd.quarter,dd.MONTH,dd.week_of_year,ou.\"结束拍摄日期\",emp.\"大区\",emp.\"团队\",emp.\"姓名\",ou.\"套餐类型判断\",SUM (CAST (ou.\"订单总金额拆分\" AS FLOAT)) 消耗金额 INTO tmp.t_tmp_sale_middle_data_used_no_cond FROM tmp.t_tmp_sale_order_used ou INNER JOIN dw.t_ob_dim_dates dd ON to_char(dd.DATE,'yyyy-mm-dd')=ou.\"结束拍摄日期\" INNER JOIN tmp.t_tmp_sale_employees emp ON emp.\"姓名\"=ou.\"销售经理\" GROUP BY 1,2,3,4,5,6,7,8,9 ORDER BY 1,2,3,4,5,6,7,8"


        # Task that drops the previous result table.
        sale_middle_data_used_no_cond_clean = PostgresOperator(
        task_id='sale_middle_data_used_no_cond_tmp_clean',
        sql= sale_middle_data_used_no_cond_clean_sql,
        postgres_conn_id='GP-DW',
        dag=dag)


        # Task that rebuilds the result table.
        sale_middle_data_used_no_cond = PostgresOperator(
        task_id='sale_middle_data_used_no_cond_tmp',
        sql = sale_middle_data_used_no_cond_sql,
        postgres_conn_id='GP-DW',
        dag=dag)


        # Middle-layer data: consumption, filtered to ou."订单号重复行数" = '1'.
        # Drops and rebuilds tmp.t_tmp_sale_middle_data_used_cond from
        # tmp.t_tmp_sale_order_used with, per group: order count, summed
        # photographer count (a NULL count is treated as '0' via COALESCE before the
        # INTEGER cast), distinct company count and the count of "拍摄主题" values.
        sale_middle_data_used_cond_clean_sql = "drop table if EXISTS tmp.t_tmp_sale_middle_data_used_cond"
        sale_middle_data_used_cond_sql = "SELECT dd.YEAR,dd.quarter,dd.MONTH,dd.week_of_year,ou.\"结束拍摄日期\",emp.\"大区\",emp.\"团队\",emp.\"姓名\",ou.\"套餐类型判断\",COUNT (ou.\"订单号\") 订单数,SUM (CAST (COALESCE (ou.\"摄影师数(订单表)\",'0') AS INTEGER)) 消耗机位数,COUNT (DISTINCT ou.\"公司全称(公司表中的公司名称)\") 客户数,COUNT (ou.\"拍摄主题\") 消耗活动数 INTO tmp.t_tmp_sale_middle_data_used_cond FROM tmp.t_tmp_sale_order_used ou INNER JOIN dw.t_ob_dim_dates dd ON to_char(dd.DATE,'yyyy-mm-dd')=ou.\"结束拍摄日期\" INNER JOIN tmp.t_tmp_sale_employees emp ON emp.\"姓名\"=ou.\"销售经理\" WHERE ou.\"订单号重复行数\"='1' GROUP BY 1,2,3,4,5,6,7,8,9 ORDER BY 1,2,3,4,5,6,7,8"


        # Task that drops the previous result table.
        sale_middle_data_used_cond_clean = PostgresOperator(
        task_id='sale_middle_data_used_cond_tmp_clean',
        sql= sale_middle_data_used_cond_clean_sql,
        postgres_conn_id='GP-DW',
        dag=dag)


        # Task that rebuilds the result table.
        sale_middle_data_used_cond = PostgresOperator(
        task_id='sale_middle_data_used_cond_tmp',
        sql = sale_middle_data_used_cond_sql,
        postgres_conn_id='GP-DW',
        dag=dag)




        # Middle-layer data: sales leads.
        # Drops and rebuilds tmp.t_tmp_sale_middle_data_lead from tmp.t_tmp_sale_lead:
        # the count of "数据唯一编号" per assignment date ("分配时间"), lead owner's
        # region / team / name and lead rating ("线索评级").
        sale_middle_data_lead_clean_sql = "drop table if EXISTS tmp.t_tmp_sale_middle_data_lead"
        sale_middle_data_lead_sql = "SELECT dd.YEAR,dd.quarter,dd.MONTH,dd.week_of_year,sl.\"分配时间\",emp.\"大区\",emp.\"团队\",emp.\"姓名\",sl.\"线索评级\",COUNT (sl.\"数据唯一编号\") 线索数 INTO tmp.t_tmp_sale_middle_data_lead FROM tmp.t_tmp_sale_lead sl INNER JOIN dw.t_ob_dim_dates dd ON to_char(dd.DATE,'yyyy-mm-dd')=sl.\"分配时间\" INNER JOIN tmp.t_tmp_sale_employees emp ON emp.\"姓名\"=sl.\"销售所有人\" GROUP BY 1,2,3,4,5,6,7,8,9 ORDER BY 1,2,3,4,5,6,7,8"


        # Task that drops the previous result table.
        sale_middle_data_lead_clean = PostgresOperator(
        task_id='sale_middle_data_lead_tmp_clean',
        sql= sale_middle_data_lead_clean_sql,
        postgres_conn_id='GP-DW',
        dag=dag)


        # Task that rebuilds the result table.
        sale_middle_data_lead = PostgresOperator(
        task_id='sale_middle_data_lead_tmp',
        sql = sale_middle_data_lead_sql,
        postgres_conn_id='GP-DW',
        dag=dag)




        #订单
        sale_middle_data_order_clean.set_upstream(MiddleDataTaskStart)
        sale_middle_data_order.set_upstream(sale_middle_data_order_clean)


        #新签
        sale_middle_data_new_sign_clean.set_upstream(MiddleDataTaskStart)
        sale_middle_data_new_sign.set_upstream(sale_middle_data_new_sign_clean)


        #产品订单金额,订单数
        sale_middle_data_product_order_clean.set_upstream(MiddleDataTaskStart)
        sale_middle_data_product_order.set_upstream(sale_middle_data_product_order_clean)


        #产品客户数
        sale_middle_data_product_client_clean.set_upstream(MiddleDataTaskStart)
        sale_middle_data_product_client.set_upstream(sale_middle_data_product_client_clean)


        #合同 客户数 个数 金额
        sale_middle_data_contract_clean.set_upstream(MiddleDataTaskStart)
        sale_middle_data_contract.set_upstream(sale_middle_data_contract_clean)


        #回款
        sale_middle_data_returned_clean.set_upstream(MiddleDataTaskStart)
        sale_middle_data_returned.set_upstream(sale_middle_data_returned_clean)


        #消耗无条件
        sale_middle_data_used_no_cond_clean.set_upstream(MiddleDataTaskStart)
        sale_middle_data_used_no_cond.set_upstream(sale_middle_data_used_no_cond_clean)


        #消耗有条件
        sale_middle_data_used_cond_clean.set_upstream(MiddleDataTaskStart)
        sale_middle_data_used_cond.set_upstream(sale_middle_data_used_cond_clean)


        #线索
        sale_middle_data_lead_clean.set_upstream(MiddleDataTaskStart)
        sale_middle_data_lead.set_upstream(sale_middle_data_lead_clean)


        #回款新客户
        sale_middle_data_returned_newclient_clean.set_upstream(MiddleDataTaskStart)
        sale_middle_data_returned_newclient.set_upstream(sale_middle_data_returned_newclient_clean)






        #成功依赖
        MiddleDataTaskSuccess.set_upstream(sale_middle_data_order)
        MiddleDataTaskSuccess.set_upstream(sale_middle_data_new_sign)
        MiddleDataTaskSuccess.set_upstream(sale_middle_data_product_order)
        MiddleDataTaskSuccess.set_upstream(sale_middle_data_product_client)
        MiddleDataTaskSuccess.set_upstream(sale_middle_data_contract)
        MiddleDataTaskSuccess.set_upstream(sale_middle_data_returned)
        MiddleDataTaskSuccess.set_upstream(sale_middle_data_used_no_cond)
        MiddleDataTaskSuccess.set_upstream(sale_middle_data_used_cond)
        MiddleDataTaskSuccess.set_upstream(sale_middle_data_lead)
        MiddleDataTaskSuccess.set_upstream(sale_middle_data_returned_newclient)
        复制

        代码通过 git 提交,并由部署环境自动拉取获取。

        文章转载自牛仔纹理,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

        评论