data_cover数据转换例子

data_cover 是启动(入口)函数,其流程是逐个处理先前已经读取的数据表。

处理的数据表包括日期表、店铺表、商品数据、销售数据、销售目标数据、预测占比数据、计算销售占比数据、计算销售目标占比数据等

核心就是一个表对应一个函数。函数里面进行空值处理、异常数据过滤、数据转换、数据计算、数据修正、数据保存、标签保存等操作。

需要注意的是,各函数中写死的固定文件名必须与实际的数据文件名保持一致。

代码调试入口是data_cover






import logging
from typing import Tuple, Dict
import pandas as pd
import numpy as np
import os
import shutil
import datetime

import sys
sys.path.append('')
import yaml




# 空值统一转换成Unknown
def null_convert(x):
    """Normalize a missing or blank cell value to the label ``'Unknown'``.

    Intended to be used element-wise, e.g. ``series.map(null_convert)``.

    Args:
        x: A single scalar value.

    Returns:
        ``'Unknown'`` when ``x`` is ``None``, ``pd.NA``, a NaN float, or a
        string that is empty after stripping surrounding whitespace; the
        stripped string when ``x`` is a non-blank string; ``x`` unchanged for
        any other type.
    """
    if x is None or x is pd.NA:
        return 'Unknown'
    if isinstance(x, (float, np.floating)) and np.isnan(x):
        return 'Unknown'
    if not isinstance(x, str):
        # Non-string scalars (ints, non-NaN floats, ...) have no ``strip``;
        # calling it used to raise AttributeError, so pass them through.
        return x
    stripped = x.strip()
    return stripped if stripped else 'Unknown'
pass
def unknown_first(values):
    """Move the first ``'Unknown'`` entry of ``values`` to the front, in place.

    Args:
        values: A mutable list of labels.

    Returns:
        The same ``values`` object; it is left untouched when it does not
        contain ``'Unknown'``.
    """
    if 'Unknown' not in values:
        return values
    values.remove('Unknown')
    values.insert(0, 'Unknown')
    return values
pass
def to_yml(data, file_path):
    """Write ``data`` to a YAML file and return the dictionary that was written.

    Args:
        data: Either a dict, which is dumped as-is, or any other iterable of
            labels, which is turned into an index/name lookup structure.
        file_path: Destination path; the file is opened in UTF-8 write mode.

    Returns:
        ``data`` itself when it is a dict; otherwise a dict with the keys
        ``'count'``, ``'index2name'`` and ``'name2index'``.
    """
    if isinstance(data, dict):
        payload = data
    else:
        # Materialise once so the iterable is only consumed a single time.
        labels = list(data)
        forward = dict(enumerate(labels))
        backward = {label: position for position, label in enumerate(labels)}
        payload = {
            'count': len(forward),
            'index2name': forward,
            'name2index': backward,
        }
    with open(file_path, 'w', encoding='utf-8') as handle:
        yaml.dump(payload, handle)
    return payload
pass
class ShareArgs():
    """Process-wide holder for the shared configuration dictionary.

    All state lives in the class attribute ``args``. The accessors are static
    methods, so they work both as ``ShareArgs.get_args()`` and through an
    instance; previously they were plain functions without ``self`` and
    raised ``TypeError`` when called on an instance.
    """

    args = {
        "labels_dir":"./yongjian/shop_group/month_w_amt/data/labels", # label directory
        "labels_output_dir":"./yongjian/shop_group/month_w_amt/data/labels_output", # directory for labels exported by clustering
        "common_datas_dir":"./yongjian/data", # shared data directory (common ur_bi_dw data)
        "only_predict": False, # predict only, do not train
        "delete_model": True, # delete the model first; only used when training
        "export_excel": False, # export to excel
        "classes": 12, # number of clusters
        "batch_size": 16,
        "hidden_size": 32,
        "max_nrof_epochs": 100,
        "learning_rate": 0.0005,
        "loss_type": "categorical_crossentropy",
        "avg_model_num": 10,
        "steps_per_epoch": 4.0,
        "lr_callback_patience": 4,
        "lr_callback_cooldown": 1,
        "early_stopping_callback_patience": 6,
        "get_data": True,
    }

    @staticmethod
    def get_args():
        """Return the shared configuration dict (the live object, not a copy)."""
        return ShareArgs.args

    @staticmethod
    def set_args(args):
        """Replace the whole shared configuration dict with ``args``."""
        ShareArgs.args = args

    @staticmethod
    def set_args_value(key, value):
        """Set a single configuration entry."""
        ShareArgs.args[key] = value

    @staticmethod
    def get_args_value(key, default_value=None):
        """Return the value stored under ``key``, or ``default_value`` if absent."""
        return ShareArgs.args.get(key, default_value)

    @staticmethod
    def contain_key(key):
        """Return True when ``key`` is present in the shared configuration."""
        return key in ShareArgs.args

    @staticmethod
    def update(args):
        """Merge the entries of ``args`` into the shared configuration."""
        ShareArgs.args.update(args)
pass
def convert_date_data(ur_bi_dw_dir, labels_dir):
    """Load the date dimension table and normalise it.

    Reads ``ur_bi_dw.dim_date.csv`` from ``ur_bi_dw_dir``, strips the
    ``dim_date.`` prefix from the column names and replaces missing or blank
    ``season`` values with ``'Unknown'``.

    Column layout recorded by the original author (after prefix removal, the
    first column being the unnamed CSV index):
        date_key, date_name, date_name_en, day_of_year, day_of_month,
        day_of_week, year_id, year_name, quarter_id, quarter_name,
        quarter_name_en, quarter_of_year, quarter_of_year_name,
        quarter_of_year_name_en, month_id, month_name, month_name_full_en,
        month_name_short_en, month_of_year, month_of_year_name, week_id,
        week_id_name, week_seq, week_name, week_name_full_en,
        week_name_short_en, week_of_year, week_year_name, week_of_month,
        is_week_end, is_week_last, is_month_last, festival_key, festival,
        lunar_date, solar_term, season_year, season, big_season,
        goods_season_year, goods_season, big_goods_season, l_week_day,
        cr_week_day, y_week_day, cr_y_week_day, first_day_week,
        last_day_week, l_month_day, y_month_day, first_day_month,
        last_day_month, last_30_day, l_month_last_30_day,
        y_month_last_30_day, etl_date, holiday_status_code, holiday_status,
        year_id_pd

    Args:
        ur_bi_dw_dir: Directory that contains ``ur_bi_dw.dim_date.csv``.
        labels_dir: Not used by this function; kept so the signature stays
            parallel to the other ``convert_*`` functions.

    Returns:
        The cleaned date DataFrame. The function previously fell off the end
        and returned ``None``, discarding the table it had just built.
    """
    dim_date = pd.read_csv(
        os.path.join(ur_bi_dw_dir,'ur_bi_dw.dim_date.csv'),
        dtype={'dim_date.festival_key': str, 'dim_date.festival': str,'dim_date.holiday_status_code': str,'dim_date.holiday_status': str}
        ) # date data
    # Strip the table prefix from every column name.
    dim_date = dim_date.rename(
        columns={c: c.replace('dim_date.', '') for c in dim_date.columns}
    )

    # Replace missing / blank seasons with 'Unknown'.
    dim_date['season'] = dim_date['season'].map(null_convert)

    return dim_date

def convert_shop_data(
    common_ur_bi_dir,
    labels_dir,
    logger:logging.Logger
    ) -> pd.DataFrame:
    """Load, clean and filter the shop dimension table.

    Reads ``ur_bi_dw.dim_shop.csv`` from ``common_ur_bi_dir``, strips the
    ``dim_shop.`` column prefix, normalises the four municipality names,
    fills missing values, filters the shops, keeps a fixed set of columns and
    writes the shop-code -> shop-name mapping to ``shop_code2name.yml`` in
    ``labels_dir``.

    Raw CSV columns recorded by the original author (each prefixed with
    ``dim_shop.``; the first CSV column is the unnamed index):
        shop_no, shop_name, shop_name_en, brand, currency_code, shop_type,
        shop_type_desc, shop_sort, shop_sort_desc, stock_id, stock_no,
        region_no, region_name, region_name_en, region_manager_code,
        region_manager_name, subregion_no, subregion_name, subregion_name_en,
        subregion_manager_code, subregion_manager_name, bussinesscircle_code,
        bussinesscircle_name, bussinesscircle_name_en, country_code,
        country_name, country_name_en, province, province_name,
        province_name_en, city_code, city_name, city_name_en,
        city_level_code, city_level_name, city_level_name_en, district,
        shop_manager_code, shop_manager_name, shop_manager_name_en,
        open_date, new_open_date, close_date, store_email, shop_tel,
        shop_manager_tel, shop_property, is_new_shop, is_kids, is_same_shop,
        is_promote, is_streetstore, is_outrant, contract_size, shop_size,
        sales_size, shop_dis_level, sales_level, image_level, shop_level,
        ppsregion_no, ppsregion_name, temperature_zone, store_status, lon,
        lat, shop_address, etl_date, cost_currency_code, store_centercode,
        store_centername, data_source, bussinesscircle_id, project_id,
        project_code, project_name

    NOTE(review): an identical definition of this function appears again
    later in this file and shadows this one at import time.

    Args:
        common_ur_bi_dir: Directory that contains ``ur_bi_dw.dim_shop.csv``.
        labels_dir: Directory that receives ``shop_code2name.yml``.
        logger: Logger used to report the exported file.

    Returns:
        The filtered shop DataFrame restricted to the selected columns.
    """
    shop_data = pd.read_csv(os.path.join(common_ur_bi_dir,'ur_bi_dw.dim_shop.csv'), index_col=0)

    # Strip the table prefix from every column name.
    shop_data = shop_data.rename(
        columns={c: c.replace('dim_shop.', '') for c in shop_data.columns}
    )

    # Normalise municipality names. Only ``province_name`` is rewritten: the
    # previous ``shop_data[mask] = value`` form overwrote every column of the
    # matching rows with the province string.
    municipality_names = {
        '上海': '上海市',
        '北京': '北京市',
        '天津': '天津市',
        '重庆': '重庆市',
    }
    shop_data['province_name'] = shop_data['province_name'].replace(municipality_names)

    # Columns kept in the result, in output order.
    columns_filter = [
        'shop_no', # shop code
        'shop_name', # shop name
        'brand', # brand
        'currency_code', # currency
        'shop_type', # shop type
        'shop_type_desc', # shop type description
        'shop_sort', # shop classification
        'shop_sort_desc', # shop classification description
        'region_no', # region code
        'region_name', # region name
        'subregion_no', # sub-region code
        'subregion_name', # sub-region name
        'bussinesscircle_code', # business circle code
        'bussinesscircle_name', # business circle name
        'country_code', # country code
        'country_name', # country name
        'province', # province code
        'province_name', # province name
        'city_code', # city code
        'city_name', # city name
        'city_level_code', # city tier code
        'city_level_name', # city tier name
        'district', # city district
        'open_date', # opening date
        'new_open_date', # re-opening date
        'shop_property', # shop property
        'is_new_shop', # new / old shop flag
        'is_kids', # kids' wear shop flag
        'is_same_shop', # same-store flag
        'is_promote', # promotional sale flag
        'is_streetstore', # street shop flag
        'is_outrant', # externally rented warehouse flag
        'contract_size', # contract area
        'shop_size', # usable area
        'sales_size', # sales area
        'shop_dis_level', # allocation level
        'sales_level', # sales level
        'image_level', # image level
        'shop_level', # shop level
        'ppsregion_no', # planning region
        'ppsregion_name', # planning region name
        'temperature_zone', # temperature zone
        'store_status', # shop status
        'lon', # longitude
        'lat', # latitude
        'store_centercode', # branch code
        'store_centername', # branch name
    ]
    # Numeric columns get 0 for missing values; every other selected column
    # is treated as a label and goes through ``null_convert``.
    numeric_columns = {'contract_size', 'shop_size', 'sales_size', 'lon', 'lat'}
    for column in columns_filter:
        if column in numeric_columns:
            shop_data[column] = shop_data[column].fillna(0)
        else:
            shop_data[column] = shop_data[column].map(null_convert)
    shop_data = shop_data.sort_values(['country_code','province_name','city_name','district','bussinesscircle_code'])

    # Shop filter: values of ``shop_sort`` that are excluded.
    shop_sort_filter = ['E_COMMERCE_STORE']
    keep_rows = (
        (shop_data['is_new_shop']!='临时店')
        &(shop_data['country_code']=='CN')
        &(shop_data['brand'].isin(['URBAN REVIVO','ACC']))
        &(~shop_data['shop_sort'].isin(shop_sort_filter))
        &(~shop_data['shop_name'].str.contains('临时', na=False))
        &(~shop_data['shop_name'].str.contains('JGO', na=False))
        &(~shop_data['store_status'].isin(['项目暂停','关店']))
        # Keep a shop when at least one of province / city is known.
        &((shop_data['province_name']!='Unknown')|(shop_data['city_name']!='Unknown'))
    )
    shop_data_filter = shop_data[keep_rows][columns_filter]

    # Shop code -> shop name mapping (codes are stored as strings).
    shop_code2name_dictionary = dict(zip(
        shop_data_filter['shop_no'].map(str),
        shop_data_filter['shop_name'],
    ))
    yml_path = os.path.join(labels_dir,'shop_code2name.yml')
    shop_code2name_dictionary = to_yml(shop_code2name_dictionary, yml_path)
    logger.info('已导出: {}'.format(yml_path))
    return shop_data_filter
pass
def convert_shop_data(
    common_ur_bi_dir,
    labels_dir,
    logger:logging.Logger
    ) -> pd.DataFrame:
    """Load, clean and filter the shop dimension table.

    Reads ``ur_bi_dw.dim_shop.csv`` from ``common_ur_bi_dir``, strips the
    ``dim_shop.`` column prefix, normalises the four municipality names,
    fills missing values, filters the shops, keeps a fixed set of columns and
    writes the shop-code -> shop-name mapping to ``shop_code2name.yml`` in
    ``labels_dir``.

    Raw CSV columns recorded by the original author (each prefixed with
    ``dim_shop.``; the first CSV column is the unnamed index):
        shop_no, shop_name, shop_name_en, brand, currency_code, shop_type,
        shop_type_desc, shop_sort, shop_sort_desc, stock_id, stock_no,
        region_no, region_name, region_name_en, region_manager_code,
        region_manager_name, subregion_no, subregion_name, subregion_name_en,
        subregion_manager_code, subregion_manager_name, bussinesscircle_code,
        bussinesscircle_name, bussinesscircle_name_en, country_code,
        country_name, country_name_en, province, province_name,
        province_name_en, city_code, city_name, city_name_en,
        city_level_code, city_level_name, city_level_name_en, district,
        shop_manager_code, shop_manager_name, shop_manager_name_en,
        open_date, new_open_date, close_date, store_email, shop_tel,
        shop_manager_tel, shop_property, is_new_shop, is_kids, is_same_shop,
        is_promote, is_streetstore, is_outrant, contract_size, shop_size,
        sales_size, shop_dis_level, sales_level, image_level, shop_level,
        ppsregion_no, ppsregion_name, temperature_zone, store_status, lon,
        lat, shop_address, etl_date, cost_currency_code, store_centercode,
        store_centername, data_source, bussinesscircle_id, project_id,
        project_code, project_name

    NOTE(review): this redefines an identical function that appears earlier
    in this file; being the later definition, this is the one in effect.

    Args:
        common_ur_bi_dir: Directory that contains ``ur_bi_dw.dim_shop.csv``.
        labels_dir: Directory that receives ``shop_code2name.yml``.
        logger: Logger used to report the exported file.

    Returns:
        The filtered shop DataFrame restricted to the selected columns.
    """
    shop_data = pd.read_csv(os.path.join(common_ur_bi_dir,'ur_bi_dw.dim_shop.csv'), index_col=0)

    # Strip the table prefix from every column name.
    shop_data = shop_data.rename(
        columns={c: c.replace('dim_shop.', '') for c in shop_data.columns}
    )

    # Normalise municipality names. Only ``province_name`` is rewritten: the
    # previous ``shop_data[mask] = value`` form overwrote every column of the
    # matching rows with the province string.
    municipality_names = {
        '上海': '上海市',
        '北京': '北京市',
        '天津': '天津市',
        '重庆': '重庆市',
    }
    shop_data['province_name'] = shop_data['province_name'].replace(municipality_names)

    # Columns kept in the result, in output order.
    columns_filter = [
        'shop_no', # shop code
        'shop_name', # shop name
        'brand', # brand
        'currency_code', # currency
        'shop_type', # shop type
        'shop_type_desc', # shop type description
        'shop_sort', # shop classification
        'shop_sort_desc', # shop classification description
        'region_no', # region code
        'region_name', # region name
        'subregion_no', # sub-region code
        'subregion_name', # sub-region name
        'bussinesscircle_code', # business circle code
        'bussinesscircle_name', # business circle name
        'country_code', # country code
        'country_name', # country name
        'province', # province code
        'province_name', # province name
        'city_code', # city code
        'city_name', # city name
        'city_level_code', # city tier code
        'city_level_name', # city tier name
        'district', # city district
        'open_date', # opening date
        'new_open_date', # re-opening date
        'shop_property', # shop property
        'is_new_shop', # new / old shop flag
        'is_kids', # kids' wear shop flag
        'is_same_shop', # same-store flag
        'is_promote', # promotional sale flag
        'is_streetstore', # street shop flag
        'is_outrant', # externally rented warehouse flag
        'contract_size', # contract area
        'shop_size', # usable area
        'sales_size', # sales area
        'shop_dis_level', # allocation level
        'sales_level', # sales level
        'image_level', # image level
        'shop_level', # shop level
        'ppsregion_no', # planning region
        'ppsregion_name', # planning region name
        'temperature_zone', # temperature zone
        'store_status', # shop status
        'lon', # longitude
        'lat', # latitude
        'store_centercode', # branch code
        'store_centername', # branch name
    ]
    # Numeric columns get 0 for missing values; every other selected column
    # is treated as a label and goes through ``null_convert``.
    numeric_columns = {'contract_size', 'shop_size', 'sales_size', 'lon', 'lat'}
    for column in columns_filter:
        if column in numeric_columns:
            shop_data[column] = shop_data[column].fillna(0)
        else:
            shop_data[column] = shop_data[column].map(null_convert)
    shop_data = shop_data.sort_values(['country_code','province_name','city_name','district','bussinesscircle_code'])

    # Shop filter: values of ``shop_sort`` that are excluded.
    shop_sort_filter = ['E_COMMERCE_STORE']
    keep_rows = (
        (shop_data['is_new_shop']!='临时店')
        &(shop_data['country_code']=='CN')
        &(shop_data['brand'].isin(['URBAN REVIVO','ACC']))
        &(~shop_data['shop_sort'].isin(shop_sort_filter))
        &(~shop_data['shop_name'].str.contains('临时', na=False))
        &(~shop_data['shop_name'].str.contains('JGO', na=False))
        &(~shop_data['store_status'].isin(['项目暂停','关店']))
        # Keep a shop when at least one of province / city is known.
        &((shop_data['province_name']!='Unknown')|(shop_data['city_name']!='Unknown'))
    )
    shop_data_filter = shop_data[keep_rows][columns_filter]

    # Shop code -> shop name mapping (codes are stored as strings).
    shop_code2name_dictionary = dict(zip(
        shop_data_filter['shop_no'].map(str),
        shop_data_filter['shop_name'],
    ))
    yml_path = os.path.join(labels_dir,'shop_code2name.yml')
    shop_code2name_dictionary = to_yml(shop_code2name_dictionary, yml_path)
    logger.info('已导出: {}'.format(yml_path))
    return shop_data_filter
pass
def convert_goods_data(
    common_ur_bi_dir,
    labels_dir,
    logger:logging.Logger):
    """Load the goods (SKU) dimension table and export the serial mapping.

    Reads ``ur_bi_dw.dim_goods.csv`` from ``common_ur_bi_dir``, strips the
    ``dim_goods.`` column prefix, replaces missing ``serial`` /
    ``serial_name`` values with ``'Unknown'`` and writes the
    serial-code -> serial-name mapping to ``serial_code2name.yml`` in
    ``labels_dir``.

    Args:
        common_ur_bi_dir: Directory that contains ``ur_bi_dw.dim_goods.csv``.
        labels_dir: Directory that receives ``serial_code2name.yml``.
        logger: Logger used to report the exported file.

    Returns:
        A tuple ``(dim_goods, serial_code2name_dictionary)``.
    """
    # Columns that must be read as text rather than inferred as numbers.
    text_columns = [
        'dim_goods.year',
        'dim_goods.mat_code',
        'dim_goods.serial',
        'dim_goods.style',
        'dim_goods.category',
        'dim_goods.subcategory',
        'dim_goods.version_style',
        'dim_goods.big_season_name',
        'dim_goods.order_no',
        'dim_goods.order_type_name',
        'dim_goods.month_of_year',
    ]
    goods_csv = os.path.join(common_ur_bi_dir,'ur_bi_dw.dim_goods.csv')
    dim_goods = pd.read_csv(
        goods_csv,
        dtype={name: str for name in text_columns},
        index_col=0,
    )

    # Strip the table prefix from every column name.
    prefix_free = {name: name.replace('dim_goods.','') for name in list(dim_goods.columns)}
    dim_goods = dim_goods.rename(columns=prefix_free)

    # Missing values become 'Unknown'.
    for label_column in ('serial', 'serial_name'):
        dim_goods[label_column] = dim_goods[label_column].map(null_convert)

    # Distinct (serial, serial_name) pairs, ordered by code then name.
    serial_pairs = (
        dim_goods[['serial','serial_name']]
        .drop_duplicates()
        .sort_values(['serial','serial_name'])
    )

    # Serial code -> serial name; later rows win on duplicate codes.
    serial_code2name_dictionary = dict(zip(
        serial_pairs['serial'].map(str),
        serial_pairs['serial_name'],
    ))
    to_yml(serial_code2name_dictionary, os.path.join(labels_dir,'serial_code2name.yml'))
    logger.info('已导出: {}'.format(os.path.join(labels_dir,'serial_code2name.yml')))

    return dim_goods, serial_code2name_dictionary
pass
class DataHelper(object):
    """Namespace for data-loading helpers."""

    def __init__(
        self
        ):
        pass

    @staticmethod
    def load_split_data(
        data_dir,   # raw input files live here
        cache_file_dir,  # converted files are cached here
        logger:logging.Logger,
        convert_fun,  # conversion function
        delete_last_cache=True,
        delete_cache_indexes=(-1,),
        data_dtype=None,
        cache_dtype=None,
        filter_fun=None, # returns True to skip the current file
        **kwargs
        ) -> pd.DataFrame:
        """Convert a directory of CSV files one by one, with a per-file cache.

        Every file in ``data_dir`` (processed in sorted name order) is either
        read from ``cache_file_dir`` when a cache file of the same name
        exists, or read from ``data_dir``, passed through ``convert_fun`` and
        written to the cache. All per-file results are concatenated.

        Args:
            data_dir: Directory with the raw CSV files.
            cache_file_dir: Directory for the converted CSV files; created
                when missing.
            logger: Logger for progress messages.
            convert_fun: Called as ``convert_fun(frame, **kwargs)``; must
                return a DataFrame.
            delete_last_cache: When True and the cache directory already
                exists, the cache files selected by ``delete_cache_indexes``
                are treated as stale and regenerated. They are overwritten,
                not removed up front.
            delete_cache_indexes: Positions in the sorted cache file listing
                to regenerate. Positions outside the listing are ignored
                (they used to raise ``IndexError``).
            data_dtype: ``dtype`` argument for reading raw files.
            cache_dtype: ``dtype`` argument for reading cache files.
            filter_fun: Optional; called as ``filter_fun(file_name, **kwargs)``
                and the file is skipped when it returns a truthy value.
            **kwargs: Forwarded to both ``filter_fun`` and ``convert_fun``.

        Returns:
            The concatenated DataFrame, or ``None`` when no file was
            processed.
        """
        stale_cache_paths = set()
        if not os.path.exists(cache_file_dir):
            os.makedirs(cache_file_dir)
        elif delete_last_cache:
            cached_files = sorted(os.listdir(cache_file_dir))
            for i in delete_cache_indexes:
                if not -len(cached_files) <= i < len(cached_files):
                    continue
                # Mark the cache file for regeneration.
                stale_cache_paths.add(os.path.join(cache_file_dir, cached_files[i]))
                logger.info('删除:%s', cached_files[i])

        frames = []
        for file in sorted(os.listdir(data_dir)):
            if filter_fun is not None and filter_fun(file, **kwargs):
                continue
            file_path = os.path.join(data_dir, file)
            logger.info(file_path)
            cache_file_path = os.path.join(cache_file_dir, file)
            logger.info(cache_file_path)
            if not os.path.exists(cache_file_path) or cache_file_path in stale_cache_paths:
                logger.info('读取数据')
                now_data = pd.read_csv(file_path, dtype=data_dtype)
                now_data = convert_fun(now_data, **kwargs)
                now_data.to_csv(cache_file_path, index=False)
            else:
                logger.info('读取缓存')
                now_data = pd.read_csv(cache_file_path, dtype=cache_dtype)
            logger.info('now_data: %s', len(now_data))
            frames.append(now_data)

        if not frames:
            return None
        if len(frames) == 1:
            # A single frame is returned as-is, without re-indexing.
            return frames[0]
        # One concat at the end instead of re-copying the result per file.
        return pd.concat(frames, ignore_index=True)
pass
def convert_sales_data(
    labels_dir,
    common_ur_bi_dir,
    shop_data_filter:pd.DataFrame,
    dim_goods:pd.DataFrame,
    dim_date:pd.DataFrame,
    logger:logging.Logger):
    """Build actual sales per shop, serial and month.

    Daily sales files under ``<common_ur_bi_dir>/dwd_daily_sales_size_all``
    are converted file by file (cached under
    ``<labels_dir>/dwd_daily_sales_size_all``), then filtered and summed.

    Args:
        labels_dir: Directory that holds the per-file cache sub-directory.
        common_ur_bi_dir: Directory that holds the raw daily sales files.
        shop_data_filter: Shops to keep; only its ``shop_no`` column is used.
        dim_goods: Goods table providing ``sku_no`` and ``serial``.
        dim_date: Date table providing ``date_key`` and ``month_of_year``.
        logger: Logger handed to the loader.

    Returns:
        DataFrame with columns ``shop_no``, ``goods.serial``,
        ``month_of_year`` and ``sales_amt``; only serials A/M/W, only the
        given shops, and only strictly positive amounts.
    """
    source_dir = os.path.join(common_ur_bi_dir,'dwd_daily_sales_size_all')
    cache_dir = os.path.join(labels_dir,'dwd_daily_sales_size_all')
    if not os.path.exists(cache_dir):
        os.makedirs(cache_dir)

    def data_convert(
        data:pd.DataFrame,
        dim_goods:pd.DataFrame,
        dim_date:pd.DataFrame,
        **kwargs,
        ):
        """Aggregate one raw daily file to month / shop / serial totals."""
        month_lookup = dim_date[['date_key','month_of_year']]
        serial_lookup = dim_goods[['sku_no','serial']].rename(
            columns={'serial':'goods.serial'}
        )
        # Attach the month, then the serial; unmatched rows are dropped.
        joined = data.merge(month_lookup, on=['date_key'], how='inner')
        joined = joined.merge(serial_lookup, on=['sku_no'], how='inner')
        # Serial Y is folded into serial W.
        joined['goods.serial'] = joined['goods.serial'].map(
            lambda code: 'W' if code=='Y' else code
        )
        grouped = joined.groupby(['month_of_year','shop_no','goods.serial'])
        return grouped.agg({'sales_amt': 'sum'}).reset_index()

    raw_dtypes = {
        'date_key':int,
        'shop_no':str,
        'sku_no':str,
        'tag_price':float,
        'sales_qty':float,
        'sales_tag_amt':float,
        'sales_amt':float,
        'sales_count':int,
    }
    cached_dtypes = {
        'month_of_year':str,
        'shop_no':str,
        'goods.serial':str,
        'sales_amt':float,
    }
    sales_data = DataHelper.load_split_data(
        data_dir=source_dir,
        cache_file_dir=cache_dir,
        logger=logger,
        convert_fun=data_convert,
        delete_last_cache=True,
        data_dtype=raw_dtypes,
        cache_dtype=cached_dtypes,
        dim_date=dim_date,
        dim_goods=dim_goods,
    )

    # Missing serials become 'Unknown'.
    sales_data['goods.serial'] = sales_data['goods.serial'].map(null_convert)

    # Keep only the three modelled serials and the selected shops.
    wanted_serial = sales_data['goods.serial'].isin(['A','M','W'])
    wanted_shop = sales_data['shop_no'].isin(shop_data_filter['shop_no'])
    sales_data = sales_data[wanted_serial & wanted_shop]

    # Sum again across files: the same month can appear in several files.
    totals = sales_data.groupby(['shop_no','goods.serial','month_of_year'])['sales_amt'].sum()
    sales_data = totals.reset_index()

    # Drop zero and negative monthly totals: negatives break training and
    # zeros would yield all-zero shares.
    positive = sales_data['sales_amt']>0
    return sales_data[positive]
pass
def convert_sales_goal_data(
    ur_bi_dw_dir,
    shop_data_filter:pd.DataFrame,
    logger:logging.Logger):
    """Load monthly sales goals per shop and serial.

    Reads ``month_of_year_sales_goal_amt.csv`` from ``ur_bi_dw_dir``. The
    original author recorded its columns as: unnamed index,
    ``sales_goal.shop_no``, ``sales_goal.serial``, ``dates.month_of_year``,
    ``sales_goal_amt``.

    Args:
        ur_bi_dw_dir: Directory that contains the sales goal CSV.
        shop_data_filter: Shops to keep; only its ``shop_no`` column is used.
        logger: Not used by this function.

    Returns:
        DataFrame with columns ``sales_goal.shop_no``, ``goods.serial``,
        ``month_of_year`` and ``sales_goal_amt``; only serials A/M/W, only
        the given shops, and only strictly positive amounts.
    """
    goal_csv = os.path.join(ur_bi_dw_dir,'month_of_year_sales_goal_amt.csv')
    text_columns = {
        'sales_goal.serial':str,
        'dates.month_of_year':str,
    }
    sales_goal_data = pd.read_csv(goal_csv, index_col=0, dtype=text_columns)

    # Align the column names with the actual sales data.
    new_names = {
        'sales_goal.serial':'goods.serial',
        'dates.month_of_year':'month_of_year',
    }
    sales_goal_data = sales_goal_data.rename(columns=new_names)

    # Missing serials become 'Unknown'.
    sales_goal_data['goods.serial'] = sales_goal_data['goods.serial'].map(null_convert)

    # Keep only the three modelled serials and the selected shops.
    wanted_serial = sales_goal_data['goods.serial'].isin(['A','M','W'])
    wanted_shop = sales_goal_data['sales_goal.shop_no'].isin(shop_data_filter['shop_no'])
    sales_goal_data = sales_goal_data[wanted_serial & wanted_shop]

    # Total goal amount per shop / serial / month.
    group_keys = ['sales_goal.shop_no', 'goods.serial', 'month_of_year']
    sales_goal_data = sales_goal_data.groupby(group_keys)['sales_goal_amt'].sum().reset_index()

    # Drop zero and negative monthly totals: negatives break training and
    # zeros would yield all-zero shares.
    positive = sales_goal_data['sales_goal_amt']>0
    return sales_goal_data[positive]
pass
def convert_predict_data(
    ims_dir,
    ):
    """Load the predicted share data and add a ``month_of_year`` key.

    Reads ``ims_w_amt_pro.csv`` from ``ims_dir`` with its first column as the
    index. The original author recorded its columns as: unnamed index, id,
    year, plan_season, month, serial, shop_no, forecast_proportion,
    create_time, create_by, modify_time, modify_by.

    Args:
        ims_dir: Directory that contains ``ims_w_amt_pro.csv``.

    Returns:
        The loaded DataFrame with an extra ``month_of_year`` column: the
        zero-padded 4-digit year followed by the zero-padded 2-digit month.
    """
    csv_path = os.path.join(ims_dir,'ims_w_amt_pro.csv')
    predict_data = pd.read_csv(csv_path, index_col=0)
    year_text = predict_data['year'].map(lambda value: format(value, '04d'))
    month_text = predict_data['month'].map(lambda value: format(value, '02d'))
    predict_data['month_of_year'] = year_text + month_text
    return predict_data
pass
def convert_sales_percentage_data(
    labels_dir,
    sales_data:pd.DataFrame,
    logger:logging.Logger):
    """Compute each serial's share of a shop's monthly sales.

    Adds the columns ``sales_amt_sum`` (shop total for the month) and
    ``sales_amt_percentage`` (row amount divided by that total) to the
    ``sales_data`` frame that was passed in, keeps rows whose share lies
    strictly between 0 and 1, and writes the sorted distinct serials of the
    kept rows to ``serial.yml`` in ``labels_dir``.

    Args:
        labels_dir: Directory that receives ``serial.yml``.
        sales_data: Sales per ``shop_no`` / ``goods.serial`` /
            ``month_of_year`` with a ``sales_amt`` column. The two new
            columns are written onto this object.
        logger: Not used by this function.

    Returns:
        The filtered DataFrame including the two new columns.
    """
    # Total sales of each shop in each month, broadcast back to the rows.
    monthly_total = sales_data.groupby(['shop_no','month_of_year'])['sales_amt'].transform('sum')
    sales_data['sales_amt_sum'] = monthly_total
    # Share of the serial within the shop-month.
    sales_data['sales_amt_percentage'] = sales_data['sales_amt'] / sales_data['sales_amt_sum']

    # Shares of exactly 0 or 1 (and anything outside) are treated as abnormal.
    share = sales_data['sales_amt_percentage']
    sales_data = sales_data[(share>0) & (share<1)]

    # Distinct serials, sorted, exported as an index <-> name mapping.
    serial_labels = sales_data['goods.serial'].drop_duplicates().sort_values().values
    to_yml(serial_labels, os.path.join(labels_dir,'serial.yml'))

    return sales_data
pass
def convert_sales_goal_percentage_data(
    sales_goal_data:pd.DataFrame,
    logger:logging.Logger):
    """Compute each serial's share of a shop's monthly sales goal.

    Adds the columns ``sales_goal_amt_sum`` (shop total for the month) and
    ``sales_goal_amt_percentage`` (row amount divided by that total) to the
    ``sales_goal_data`` frame that was passed in, keeps rows whose share lies
    strictly between 0 and 1, and pivots the shares so that every serial
    becomes its own column.

    Args:
        sales_goal_data: Goals per ``sales_goal.shop_no`` / ``goods.serial``
            / ``month_of_year`` with a ``sales_goal_amt`` column. The two new
            columns are written onto this object.
        logger: Not used by this function.

    Returns:
        A tuple ``(sales_goal_data, sales_goal_percentage_data)``: the
        filtered long-format frame and the wide frame with one column per
        serial, missing combinations filled with ``0.0``.
    """
    # Total goal of each shop in each month, broadcast back to the rows.
    monthly_total = sales_goal_data.groupby(
        ['sales_goal.shop_no','month_of_year']
    )['sales_goal_amt'].transform('sum')
    sales_goal_data['sales_goal_amt_sum'] = monthly_total
    # Share of the serial within the shop-month.
    sales_goal_data['sales_goal_amt_percentage'] = (
        sales_goal_data['sales_goal_amt'] / sales_goal_data['sales_goal_amt_sum']
    )

    # Shares of exactly 0 or 1 (and anything outside) are treated as abnormal.
    share = sales_goal_data['sales_goal_amt_percentage']
    sales_goal_data = sales_goal_data[(share>0) & (share<1)]

    # Rows to columns: one share column per serial.
    sales_goal_percentage_data = convert_column(
        df=sales_goal_data,
        columns=['sales_goal.shop_no','month_of_year','goods.serial'],
        unstack_column='sales_goal_amt_percentage',
        default_value=0.0,
    )
    return sales_goal_data, sales_goal_percentage_data
pass
def convert_column(df:pd.DataFrame,columns:list,unstack_column:str,default_value=None):
    """Pivot rows into columns.

    ``columns`` is used as a multi-level index; its last level is unstacked
    so that each distinct value of that level becomes a column holding the
    values of ``unstack_column``.

    Args:
        df: Long-format input frame.
        columns: Key columns; the last one is spread into columns.
        unstack_column: Column whose values fill the new columns.
        default_value: When not ``None``, replaces missing cells in the
            result.

    Returns:
        The wide-format DataFrame with the remaining key columns restored as
        regular columns.
    """
    wide = df.set_index(columns)[unstack_column].unstack().reset_index()
    if default_value is None:
        return wide
    return wide.fillna(default_value)
pass

def shop_labels_convert(
    other_dir,
    labels_dir,
    logger:logging.Logger,
):
    '''
    Load the shop customer-label table and encode its categorical columns.

    Reads 'shop_customer_labels.csv' from `other_dir`. Every categorical
    label column goes through the same steps:
      1. cast to str (so missing cells become the text 'nan');
      2. replace the placeholders 'nan' and '0' with 'Unknown';
      3. write the sorted distinct labels to '<labels_dir>/<column>.yml'
         through to_yml;
      4. add '<column>_index' holding each row's position in that list.
    The text columns are dropped afterwards. In 'resident_amount' and
    'office_amount' the marker '无' is replaced by 0, remaining missing
    cells in the whole frame are filled with 0, and both amount columns
    are cast to int.

    Args:
        other_dir: directory containing shop_customer_labels.csv
        labels_dir: directory that receives one yml file per label column
        logger: not used inside this function

    Returns:
        DataFrame with the '<column>_index' columns in place of the text
        label columns and integer amount columns.
    '''
    # Categorical label columns, in processing order. The Chinese names are
    # taken from the original section headings.
    label_columns = (
        'shopping_center_type',      # shopping-centre type (购物中心类型)
        'shopping_center_position',  # shopping-centre positioning trend (商场定位趋势)
        'luxury_shopping_center',    # heavy-luxury flag (是否重奢)
        'fashion_type',              # shopping-centre fashion level (商场时尚度)
        'price_sensitivity_women',   # customer price sensitivity | WOMEN
        'price_sensitivity_men',     # customer price sensitivity | MEN
        'price_sensitivity_acc',     # customer price sensitivity | ACC
    )
    amount_columns = ('resident_amount', 'office_amount')

    customer_labels_data = pd.read_csv(os.path.join(other_dir, 'shop_customer_labels.csv'))
    customer_labels_data = customer_labels_data.astype({column: str for column in label_columns})

    for column in label_columns:
        # 'nan' (missing cell after the str cast) and '0' both mean "no label".
        is_placeholder = customer_labels_data[column].isin(['nan', '0'])
        customer_labels_data.loc[is_placeholder, column] = 'Unknown'

        # Sorted distinct labels define the index assigned to each label.
        distinct_labels = customer_labels_data[column].drop_duplicates().sort_values().values
        label_dictionary = to_yml(distinct_labels, os.path.join(labels_dir, column + '.yml'))

        # Label text -> index; every label is present because the dictionary
        # was built from this very column.
        name2index = label_dictionary['name2index']
        customer_labels_data[column + '_index'] = customer_labels_data[column].map(
            lambda label: name2index[label])

    # The text columns are no longer needed once the indices exist.
    customer_labels_data = customer_labels_data.drop(columns=list(label_columns))

    # Customer-volume columns: '无' ("none") counts as zero.
    for column in amount_columns:
        customer_labels_data.loc[customer_labels_data[column] == '无', column] = 0
    customer_labels_data = customer_labels_data.fillna(0).astype(
        {column: int for column in amount_columns})

    return customer_labels_data
pass
def convert_shop_features_data(
    shop_data_filter:pd.DataFrame,
    labels_dir,
    predict_date:datetime.datetime,
    logger:logging.Logger):
    '''
    Build the shop feature table with categorical columns index-encoded.

    Works on a copy of `shop_data_filter`. 'open_date' / 'new_open_date'
    values equal to 'Unknown' are replaced by `predict_date`, and the day
    counts between each date and `predict_date` are stored in 'open_days' /
    'new_open_days'. Every output column that is not passed through
    unchanged is encoded: its distinct values (with 'Unknown' moved to the
    front by unknown_first) are written to '<labels_dir>/<column>.yml'
    through to_yml, and the column is replaced by '<column>_index'.

    Args:
        shop_data_filter: shop table; it is not modified
        labels_dir: directory that receives one yml file per encoded column
        predict_date: reference date for the opening-duration columns
        logger: not used inside this function

    Returns:
        (feature frame restricted to the output columns,
         dict mapping each encoded column name to its to_yml dictionary)
    '''
    # Columns copied through unchanged (everything else is index-encoded).
    passthrough_columns = frozenset([
        'shop_no',        # shop code
        'open_days',      # days since opening
        'new_open_days',  # days since re-opening
        'contract_size',  # contract area
        'shop_size',      # usable area
        'sales_size',     # sales area
        'lon',            # longitude
        'lat',            # latitude
    ])
    # Output columns in their final order. Columns deliberately left out:
    # shop_name, brand, currency_code, shop_type, shop_type_desc,
    # shop_sort_desc, region_name, subregion_name, bussinesscircle_name,
    # country_code, country_name, province, city_code, city_level_code,
    # open_date, new_open_date, is_same_shop, is_promote, ppsregion_name,
    # store_centername.
    output_columns = [
        'shop_no',
        'shop_sort',             # shop category
        'region_no',             # region code
        'subregion_no',          # sub-region code
        'bussinesscircle_code',  # business-circle code
        'province_name',         # province name
        'city_name',             # city name
        'city_level_name',       # city tier name
        'district',              # city district
        'open_days',
        'new_open_days',
        'shop_property',         # shop property
        'is_new_shop',           # new / old shop flag
        'is_kids',               # kids' wear shop flag
        'is_streetstore',        # street store flag
        'is_outrant',            # externally rented warehouse flag
        'contract_size',
        'shop_size',
        'sales_size',
        'shop_dis_level',        # allocation level
        'sales_level',           # sales level
        'image_level',           # image level
        'shop_level',            # shop level
        'ppsregion_no',          # planning zone
        'temperature_zone',      # temperature zone
        'store_status',          # shop status
        'lon',
        'lat',
        'store_centercode',      # branch code
    ]

    features = shop_data_filter.copy()

    # Opening durations: an 'Unknown' date is treated as the predict date.
    predict_day = predict_date.strftime('%Y-%m-%d')
    for date_column, days_column in (('open_date', 'open_days'),
                                     ('new_open_date', 'new_open_days')):
        features.loc[features[date_column] == 'Unknown', date_column] = predict_day
        features[days_column] = (predict_date - pd.to_datetime(features[date_column])).dt.days

    selected = []
    dictionary_list = {}
    for column in output_columns:
        if column in passthrough_columns:
            selected.append(column)
            continue

        # Distinct values in order of first appearance, 'Unknown' first.
        distinct_values = unknown_first(list(features[column].drop_duplicates().values))
        # Persist the index dictionary and keep it for the caller.
        index_dictionary = to_yml(distinct_values, os.path.join(labels_dir, column + '.yml'))
        dictionary_list[column] = index_dictionary

        # Replace the raw value by its index.
        name2index = index_dictionary['name2index']
        encoded_column = column + '_index'
        features[encoded_column] = features[column].map(lambda value: name2index[value])
        selected.append(encoded_column)

    return features[selected], dictionary_list
pass
def data_convert(
    other_dir,
    labels_dir,
    serial_ur_bi_dw_dir,
    ims_dir,
    predict_date:datetime.datetime,
    logger:logging.Logger,
):
    '''
    Run every table conversion and export the results as csv files.

    Creates `labels_dir` and `other_dir` when missing, calls the individual
    convert_* functions in order, selects the rows of the predict month
    (month_of_year == predict_date formatted as '%Y%m') and writes eight csv
    files into `labels_dir`, logging each exported path.

    Args:
        other_dir: directory passed to shop_labels_convert
        labels_dir: directory for the yml dictionaries and exported csv files
        serial_ur_bi_dw_dir: directory passed to convert_sales_goal_data
        ims_dir: directory passed to convert_predict_data
        predict_date: date whose month selects the "current month" rows and
            which is passed on to convert_shop_features_data
        logger: logger for progress messages

    Returns:
        None
    '''
    logger.info('开始数据转换')

    # Make sure both working directories exist.
    for directory in (labels_dir, other_dir):
        if not os.path.exists(directory):
            os.makedirs(directory)

    common_ur_bi_dir = os.path.join(ShareArgs.get_args_value('common_datas_dir'), 'ur_bi_data')

    # Date dimension. Original note: column prefixes are stripped and the
    # season is converted.
    dim_date = convert_date_data(common_ur_bi_dir, labels_dir)

    # Shop table. Original note: prefixes stripped, data corrected, nulls
    # converted, rows filtered, columns selected, shop code/name dictionary
    # saved; the filtered table is returned.
    shop_data_filter = convert_shop_data(
        common_ur_bi_dir=common_ur_bi_dir,
        labels_dir=labels_dir,
        logger=logger
    )

    # SKU (goods) table; the second return value is not used here.
    dim_goods, _serial_code2name = convert_goods_data(
        common_ur_bi_dir=common_ur_bi_dir,
        labels_dir=labels_dir,
        logger=logger
    )

    # Sales data. Original note: read and converted in batches, then
    # returned and stored; a previous conversion is reused if present.
    sales_data = convert_sales_data(
        labels_dir=labels_dir,
        common_ur_bi_dir=common_ur_bi_dir,
        shop_data_filter=shop_data_filter,
        dim_goods=dim_goods,
        dim_date=dim_date,
        logger=logger
    )

    # Sales-goal data. Original note: monthly goals are read, nulls handled,
    # rows filtered and amounts summed per shop / month / series.
    sales_goal_data = convert_sales_goal_data(
        ur_bi_dw_dir=serial_ur_bi_dw_dir,
        shop_data_filter=shop_data_filter,
        logger=logger
    )

    # Predicted share data. Original note: year and month are joined into a
    # single year-month field.
    predict_data = convert_predict_data(
        ims_dir=ims_dir,
    )

    # Sales share per series within each shop's monthly total.
    sales_data = convert_sales_percentage_data(
        labels_dir=labels_dir,
        sales_data=sales_data,
        logger=logger
    )

    # Sales-goal share, long and wide form.
    sales_goal_data, sales_goal_percentage_data = convert_sales_goal_percentage_data(
        sales_goal_data=sales_goal_data,
        logger=logger
    )

    # Rows belonging to the predict month.
    predict_month = predict_date.strftime('%Y%m')
    now_sales_data = sales_data[sales_data['month_of_year'] == predict_month]
    now_sales_goal_data = sales_goal_data[sales_goal_data['month_of_year'] == predict_month]
    now_predict_data = predict_data[predict_data['month_of_year'] == predict_month]

    # Placeholders left by the original author, no code yet:
    # per-shop SKU count; per-shop approved allocation orders.

    # Shop features. Original note: text features become numeric codes so
    # that they can be embedded.
    shop_features_data, dictionary_list = convert_shop_features_data(
        shop_data_filter=shop_data_filter,
        labels_dir=labels_dir,
        predict_date=predict_date,
        logger=logger
    )

    # Shop / customer-group labels.
    customer_labels_data = shop_labels_convert(
        other_dir=other_dir,
        labels_dir=labels_dir,
        logger=logger
    )

    # Export. Original author's note: training and prediction both read
    # their data from these files.
    exports = (
        (shop_data_filter, 'shop_data_filter.csv'),
        (sales_data, 'sales_data.csv'),
        (sales_goal_percentage_data, 'sales_goal_percentage_data.csv'),
        (now_sales_data, 'now_sales_data.csv'),
        (now_sales_goal_data, 'now_sales_goal_data.csv'),
        (now_predict_data, 'now_predict_data.csv'),
        (shop_features_data, 'shop_features_data.csv'),
        (customer_labels_data, 'customer_labels_feature.csv'),
    )
    for frame, file_name in exports:
        target_path = os.path.join(labels_dir, file_name)
        frame.to_csv(target_path, index=False)
        logger.info('已导出: %s', target_path)

    logger.info('完成数据转换')
pass







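# data_convert(
#     other_dir=other_dir,  # extra data supplied by the business side, not read from the database
#     labels_dir=labels_dir,  # where the files produced by pre-processing are stored
#     serial_ur_bi_dw_dir=serial_ur_bi_dw_dir,  # path of the data needed by the share (percentage) model
#     ims_dir=ims_save_dir,  # files read from the ims database, i.e. the shares predicted by another model
#     predict_date=predict_date,  # date whose month selects the "current month" rows
#     logger=self.logger
# )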
Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐