




版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
如何使用Python读取Hive数据库?
port=21051,
database=ur_AI_dw,
auth_mechanism=LDAP,
user=urbi,
password=Ur#730xd,
logger:logging.Logger=None
self.host=host
self.port=port
self.database=database
self.auth_mechanism=auth_mechanism
self.user=user
self.password=password
self.logger=logger
self.impala_conn=None
self.conn=None
self.cursor=None
self.engine=None
self.session=None
def create_table_code(self, file_name):
    """Generate table-class source code by shelling out to ``sqlacodegen``.

    Args:
        file_name: Path of the file that receives the generated code.

    Returns:
        The current value of ``self.conn`` (may be ``None``).
    """
    # NOTE(review): `self.connection_str` is not assigned in the visible part
    # of the initializer -- confirm it is set elsewhere before calling this.
    # NOTE(review): the extracted text lost the token between the two
    # placeholders; a shell redirect (`>`) is assumed -- confirm.
    # NOTE(review): both values are interpolated into a shell command line;
    # only pass trusted values.
    os.system(f"sqlacodegen {self.connection_str} > {file_name}")
    return self.conn
def get_conn(self):
    """Return the cached engine connection, opening it on first use."""
    if self.conn is None:
        self.conn = self.get_engine().connect()
    return self.conn
def get_impala_conn(self):
    """Return the cached DBAPI connection, creating it on first use.

    The connection is built with ``connect`` from the settings stored on the
    instance (host, port, database, auth mechanism, user, password).
    """
    if self.impala_conn is None:
        self.impala_conn = connect(
            host=self.host,
            port=self.port,
            database=self.database,
            auth_mechanism=self.auth_mechanism,
            user=self.user,
            password=self.password,
        )
    return self.impala_conn
def get_engine(self):
    """Return the cached SQLAlchemy engine, creating it on first use.

    The engine obtains its DBAPI connections from ``self.get_impala_conn``.
    """
    if self.engine is None:
        self.engine = sqlalchemy.create_engine(
            "impala://", creator=self.get_impala_conn
        )
    return self.engine
def get_cursor(self):
    """Return the cached cursor, creating it on first use.

    The original read ``self.conn.cursor()``: ``self.conn`` is ``None`` until
    ``get_conn`` has been called, and it holds the object returned by
    ``engine.connect()`` rather than a DBAPI connection. The cursor is now
    taken from the DBAPI connection returned by ``get_impala_conn``.
    """
    if self.cursor is None:
        self.cursor = self.get_impala_conn().cursor()
    return self.cursor
def get_session(self) -> "Session":
    """Return the cached ORM session, creating it on first use.

    The session is produced by a ``sessionmaker`` bound to ``get_engine()``.
    The original annotation named ``sessionmaker``, but the value returned is
    the session instance the factory produces.
    """
    if self.session is None:
        session_factory = sessionmaker(bind=self.get_engine())
        self.session = session_factory()
    return self.session
def close_conn(self):
    """Close the engine connection, then dispose the engine and DBAPI connection."""
    if self.conn is not None:
        self.conn.close()
        self.conn = None
    # NOTE(review): indentation was lost in extraction; the two cleanup calls
    # are assumed to run unconditionally (both are no-ops when nothing is open).
    self.dispose_engine()
    self.close_impala_conn()
def close_impala_conn(self):
    """Close the DBAPI connection if one is open and forget it."""
    if self.impala_conn is not None:
        self.impala_conn.close()
        self.impala_conn = None
def close_session(self):
    """Close the ORM session if one is open, then dispose the engine."""
    if self.session is not None:
        self.session.close()
        self.session = None
    # NOTE(review): indentation was lost in extraction; disposing the engine is
    # assumed to be unconditional (it is a no-op when no engine exists).
    self.dispose_engine()
def dispose_engine(self):
    """Dispose the SQLAlchemy engine if one exists and forget it."""
    if self.engine is not None:
        self.engine.dispose()
        self.engine = None
def close_cursor(self):
    """Close the cached cursor if one exists and forget it."""
    if self.cursor is not None:
        self.cursor.close()
        self.cursor = None
defget_data(self,sql,auto_close=True)-pd.DataFrame:
查询数据
conn=self.get_conn()
data=None
try:
#异常重试3次
foriinrange(3):
try:
data=pd.read_sql(sql,conn)
break
exceptExceptionasex:
ifi==2:
raiseex#往外抛出异常
time.sleep(60)#一分钟后重试
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外抛出异常
finally:
ifauto_close:
self.close_conn()
returndata
class VarsHelper():
    """Small key/value store persisted to a single pickle file.

    NOTE(review): values are loaded with ``pickle.load``; only point this at
    files the application wrote itself, never at untrusted input.
    """

    def __init__(self, save_dir, auto_save=True):
        """Load existing values from ``save_dir`` if that file exists.

        Args:
            save_dir: Path of the pickle file (a file path, despite the name).
            auto_save: When true, every ``set_value`` rewrites the file.
        """
        self.save_dir = save_dir
        self.auto_save = auto_save
        self.values = {}
        # A bare file name has an empty dirname; os.makedirs('') would raise.
        parent_dir = os.path.dirname(self.save_dir)
        if parent_dir and not os.path.exists(parent_dir):
            os.makedirs(parent_dir)
        if os.path.exists(self.save_dir):
            with open(self.save_dir, "rb") as f:
                self.values = pickle.load(f)

    def set_value(self, key, value):
        """Store ``value`` under ``key`` and persist when auto-save is on."""
        self.values[key] = value
        if self.auto_save:
            self.save_file()

    def get_value(self, key):
        """Return the value stored under ``key``; raises ``KeyError`` if absent."""
        return self.values[key]

    def has_key(self, key):
        """Return whether ``key`` is present."""
        return key in self.values

    def save_file(self):
        """Write all values to the pickle file, replacing its contents."""
        with open(self.save_dir, "wb") as f:
            pickle.dump(self.values, f)
class GlobalShareArgs():
    """Process-wide settings dictionary shared through class-level state."""

    args = {
        "debug": False,
    }

    # The accessors take no `self`; @staticmethod keeps class-level calls
    # working and also makes calls on an instance work.
    @staticmethod
    def get_args():
        """Return the shared dictionary itself (not a copy)."""
        return GlobalShareArgs.args

    @staticmethod
    def set_args(args):
        """Replace the shared dictionary with ``args``."""
        GlobalShareArgs.args = args

    @staticmethod
    def set_args_value(key, value):
        """Set a single entry."""
        GlobalShareArgs.args[key] = value

    @staticmethod
    def get_args_value(key, default_value=None):
        """Return the entry for ``key``, or ``default_value`` when absent."""
        return GlobalShareArgs.args.get(key, default_value)

    @staticmethod
    def contain_key(key):
        """Return whether ``key`` is present."""
        return key in GlobalShareArgs.args

    @staticmethod
    def update(args):
        """Merge ``args`` into the shared dictionary."""
        GlobalShareArgs.args.update(args)
class ShareArgs():
    """Shared settings dictionary for this job, held in class-level state."""

    args = {
        "labels_dir": "./hjx/shop_group/month_w_amt/data/labels",  # label directory
        "labels_output_dir": "./hjx/shop_group/month_w_amt/data/labels_output",  # exported clustering labels
        "common_datas_dir": "./hjx/data",  # shared data directory (common ur_bi_dw data)
        "only_predict": False,  # predict only, do not train
        "delete_model": True,  # delete the model first; only used when training
        "export_excel": False,  # export to Excel
        "classes": 12,  # number of clusters
        "batch_size": 16,
        "hidden_size": 32,
        "max_nrof_epochs": 100,
        "learning_rate": 0.0005,
        "loss_type": "categorical_crossentropy",
        "avg_model_num": 10,
        "steps_per_epoch": 4.0,
        "lr_callback_patience": 4,
        "lr_callback_cooldown": 1,
        "early_stopping_callback_patience": 6,
        "get_data": True,
    }

    # The accessors take no `self`; @staticmethod keeps class-level calls
    # working and also makes calls on an instance work.
    @staticmethod
    def get_args():
        """Return the shared dictionary itself (not a copy)."""
        return ShareArgs.args

    @staticmethod
    def set_args(args):
        """Replace the shared dictionary with ``args``."""
        ShareArgs.args = args

    @staticmethod
    def set_args_value(key, value):
        """Set a single entry."""
        ShareArgs.args[key] = value

    @staticmethod
    def get_args_value(key, default_value=None):
        """Return the entry for ``key``, or ``default_value`` when absent."""
        return ShareArgs.args.get(key, default_value)

    @staticmethod
    def contain_key(key):
        """Return whether ``key`` is present."""
        return key in ShareArgs.args

    @staticmethod
    def update(args):
        """Merge ``args`` into the shared dictionary."""
        ShareArgs.args.update(args)
class UrBiGetDatasBase():
    """Base class for jobs that pull warehouse tables and cache them as CSV.

    It provides a per-path lock registry, a "last refreshed" timestamp
    registry (persisted through ``VarsHelper`` in debug mode) and a helper
    that downloads a date-partitioned query one time window per file.
    """

    # Locks keyed by absolute path; callers saving to the same path share one.
    lock_dict: Dict[str, threading.Lock] = {}
    # Last refresh time keyed by absolute path, used for the timeout check.
    time_dict: Dict[str, datetime.datetime] = {}
    # Result of the latest timeout check per path, read by save_last_time.
    get_data_timeout_dict: Dict[str, bool] = {}

    def __init__(
            self,
            # NOTE(review): the extracted text shows `2`; the host address
            # looks truncated by extraction -- confirm the real default.
            host=2,
            port=21051,
            database="ur_ai_dw",
            auth_mechanism="LDAP",
            user="urbi",
            # NOTE(review): a credential is hard-coded as a default; consider
            # loading it from configuration or the environment instead.
            password="Ur#730xd",
            save_dir=None,
            logger: logging.Logger = None,
    ):
        """Create the database helper and make sure ``save_dir`` exists."""
        self.save_dir = save_dir
        self.logger = logger
        self.db_helper = HiveHelper(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            logger=logger,
        )
        # Create the output directory.
        if self.save_dir is not None and not os.path.exists(self.save_dir):
            os.makedirs(self.save_dir)
        # Timestamps are persisted to disk only in debug mode.
        self.vars_helper = None
        if GlobalShareArgs.get_args_value("debug"):
            self.vars_helper = VarsHelper("./hjx/data/vars/UrBiGetDatas")

    def close(self):
        """Close the database connection."""
        self.db_helper.close_conn()

    def get_last_time(self, key_name) -> bool:
        """Return whether the cached data for ``key_name`` has timed out.

        The timeout is 12 hours, or 24 hours in debug mode. The result is also
        recorded in ``get_data_timeout_dict`` for ``save_last_time``.
        """
        # Absolute path so the key is unique.
        key_name = os.path.abspath(key_name)
        if self.vars_helper is not None and self.vars_helper.has_key("UrBiGetDatasBase.time_list"):
            UrBiGetDatasBase.time_dict = self.vars_helper.get_value("UrBiGetDatasBase.time_list")
        timeout = 12  # hours
        if GlobalShareArgs.get_args_value("debug"):
            timeout = 24  # hours
        last_time = UrBiGetDatasBase.time_dict.get(key_name)
        get_data_timeout = (
            last_time is None
            or (datetime.datetime.today() - last_time).total_seconds() > (timeout * 60 * 60)
        )
        # NOTE(review): the call name was stripped by extraction;
        # `logging.info` is assumed here and below -- confirm.
        if get_data_timeout:
            logging.info("超时%d小时,重新查数据:%s", timeout, key_name)
        else:
            logging.info("未超时%d小时,跳过查数据:%s", timeout, key_name)
        UrBiGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout
        return get_data_timeout

    def save_last_time(self, key_name):
        """Record "now" as the last refresh time for ``key_name``.

        Expects ``get_last_time`` to have been called for the same key first;
        otherwise the lookup below raises ``KeyError``.
        """
        # Absolute path so the key is unique.
        key_name = os.path.abspath(key_name)
        now = datetime.datetime.today()
        if UrBiGetDatasBase.get_data_timeout_dict[key_name]:
            UrBiGetDatasBase.time_dict[key_name] = now
        # NOTE(review): nesting was lost in extraction; this branch is assumed
        # to be independent of the one above.
        if self.vars_helper is not None:
            UrBiGetDatasBase.time_dict[key_name] = now
            self.vars_helper.set_value("UrBiGetDatasBase.time_list", UrBiGetDatasBase.time_dict)

    def get_lock(self, key_name) -> threading.Lock:
        """Return the lock shared by everyone using ``key_name``."""
        # Absolute path so the key is unique.
        key_name = os.path.abspath(key_name)
        # setdefault avoids the check-then-insert race in which two threads
        # could each install a different lock for the same path.
        return UrBiGetDatasBase.lock_dict.setdefault(key_name, threading.Lock())

    def get_data_of_date(
            self,
            save_dir,
            sql,
            sort_columns: List[str],
            del_index_list=(-1,),  # indices (in the sorted listing) of files to delete first
            start_date=datetime.datetime(2017, 1, 1),  # first window start
            offset=relativedelta(months=3),  # window length
            date_format_fun=lambda d: "%04d%02d01" % (d.year, d.month),  # formats the dates substituted into sql
            filename_format_fun=lambda d: "%04d%02d.csv" % (d.year, d.month),  # formats the per-window file name
            stop_date="20700101",  # give up once a window start passes this
            data_format_fun=None,  # optional DataFrame post-processing
    ):
        """Download a date-ranged query incrementally, one CSV per window.

        ``sql`` must contain two ``%s`` placeholders, filled with the formatted
        window start and end. Windows whose file already exists are skipped.
        Downloading stops at the first empty window after at least one
        non-empty one, or when a query returns ``None``.

        Raises:
            Exception: When no data has been found and the window start has
                passed ``stop_date``.
        """
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        else:
            # Delete the most recent file(s) so they get downloaded again.
            file_list = sorted(os.listdir(save_dir))
            # Ignore indices beyond the listing and collapse duplicates; the
            # original raised IndexError or tried to remove one file twice.
            doomed = []
            for del_index in del_index_list:
                if -len(file_list) <= del_index < len(file_list):
                    name = file_list[del_index]
                    if name not in doomed:
                        doomed.append(name)
            for name in doomed:
                os.remove(os.path.join(save_dir, name))
                print("删除最后一个文件:", name)
        select_index = -1  # stays -1 until a window has returned rows
        while True:
            end_date = start_date + offset
            start_date_str = date_format_fun(start_date)
            end_date_str = date_format_fun(end_date)
            # NOTE(review): call name stripped by extraction; assumed logging.info.
            logging.info("date:%s-%s", start_date_str, end_date_str)
            file_path = os.path.join(save_dir, filename_format_fun(start_date))
            if not os.path.exists(file_path):
                data: pd.DataFrame = self.db_helper.get_data(sql % (start_date_str, end_date_str))
                if data is None:
                    break
                logging.info("data:%d", len(data))
                if len(data) > 0:
                    select_index += 1
                    if data_format_fun is not None:
                        data = data_format_fun(data)
                    data = data.sort_values(sort_columns)
                    data.to_csv(file_path)
                elif select_index != -1:
                    # Empty window after data was seen: reached the end.
                    break
                # NOTE(review): the comparison operator was stripped by
                # extraction; `<` is assumed -- confirm.
                elif stop_date < start_date_str:
                    raise Exception("读取数据异常,时间超出最大值!")
            start_date = end_date
class UrBiGetDatas(UrBiGetDatasBase):
    """Downloads the shared ur_bi_dw tables into CSV files under ``save_dir``.

    Every public ``get_*`` method follows the same protocol: take the lock for
    its target path, skip when the cached copy has not timed out (see
    ``get_last_time``), otherwise query, write and record the refresh time.
    The protocol lives in two private helpers.
    """

    def __init__(
            self,
            # NOTE(review): the extracted text shows `2`; the host address
            # looks truncated by extraction -- confirm the real default.
            host=2,
            port=21051,
            database="ur_ai_dw",
            auth_mechanism="LDAP",
            user="urbi",
            # NOTE(review): hard-coded credential default; consider loading it
            # from configuration or the environment instead.
            password="Ur#730xd",
            save_dir="./hjx/data/ur_bi_dw_data",
            logger: logging.Logger = None
    ):
        """Store the settings and initialise the base class."""
        self.save_dir = save_dir
        self.logger = logger
        super().__init__(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger,
        )

    def _log_exception(self, ex):
        """Log ``ex`` when a logger is configured (the logger is optional)."""
        if self.logger is not None:
            self.logger.exception(ex)

    def _refresh_table(self, file_name, sql, rename, sort_columns=None, index=True):
        """Download a whole table to ``save_dir/file_name`` if it timed out.

        ``rename`` maps each original column name to its output name.
        """
        file_path = os.path.join(self.save_dir, file_name)
        with self.get_lock(file_path):
            try:
                # Only query again once the cached copy has timed out.
                if not self.get_last_time(file_path):
                    return
                data: pd.DataFrame = self.db_helper.get_data(sql)
                data = data.rename(columns={c: rename(c) for c in data.columns})
                if sort_columns:
                    data = data.sort_values(sort_columns)
                data.to_csv(file_path, index=index)
                # Record the refresh time.
                self.save_last_time(file_path)
            except Exception as ex:
                self._log_exception(ex)
                raise

    def _refresh_by_date(self, dir_name, sql, sort_columns, **kwargs):
        """Download a date-ranged query into ``save_dir/dir_name`` if it timed out.

        Extra keyword arguments are forwarded to ``get_data_of_date``.
        """
        sub_dir = os.path.join(self.save_dir, dir_name)
        with self.get_lock(sub_dir):
            try:
                # Only query again once the cached copy has timed out.
                if not self.get_last_time(sub_dir):
                    return
                self.get_data_of_date(
                    save_dir=sub_dir,
                    sql=sql,
                    sort_columns=sort_columns,
                    **kwargs
                )
                # Record the refresh time.
                self.save_last_time(sub_dir)
            except Exception as ex:
                self._log_exception(ex)
                raise

    @staticmethod
    def _add_prefix(prefix):
        """Return a renamer that prepends ``prefix`` to a column name."""
        return lambda column: prefix + column

    @staticmethod
    def _strip_token(token):
        """Return a renamer that removes ``token`` from a column name."""
        return lambda column: column.replace(token, "")

    def get_dim_date(self):
        """Date dimension."""
        self._refresh_table(
            "ur_bi_dw.dim_date.csv",
            "SELECT * FROM ur_bi_dw.dim_date",
            self._add_prefix("dim_date."),
            sort_columns=["dim_date.date_key"],
        )

    def get_dim_shop(self):
        """Shop dimension."""
        self._refresh_table(
            "ur_bi_dw.dim_shop.csv",
            "SELECT * FROM ur_bi_dw.dim_shop",
            self._add_prefix("dim_shop."),
            sort_columns=["dim_shop.shop_no"],
        )

    def get_dim_vip(self):
        """Member (VIP) data, downloaded one year per file."""
        sql = """SELECT dv.*, dd.date_key, dd.date_name2
            FROM ur_bi_dw.dim_vip as dv
            INNER JOIN ur_bi_dw.dim_date as dd
            ON dv.card_create_date = dd.date_name2
            where dd.date_key >= %s
            and dd.date_key < %s"""
        self._refresh_by_date(
            "vip_no",
            sql,
            ["dv.vip_no"],
            start_date=datetime.datetime(2017, 1, 1),  # first window start
            offset=relativedelta(years=1),
        )

    def get_weather(self):
        """Weather data; the two most recent files are always re-downloaded."""
        sql = """
            select weather.* from ur_bi_ods.ods_base_weather_data_1200 as weather
            where weather.date_key >= %s and weather.date_key < %s
            """

        def data_format_fun(data):
            return data.rename(columns={c: "weather." + c for c in data.columns})

        self._refresh_by_date(
            "weather",
            sql,
            ["weather.date_key", "weather.areaid"],
            del_index_list=[-2, -1],
            data_format_fun=data_format_fun,
        )

    def get_weather_city(self):
        """Weather city dimension."""
        self._refresh_table(
            "ur_bi_dw.weather_city.csv",
            "SELECT * FROM ur_bi_dw.dim_weather_city as weather_city",
            self._add_prefix("weather_city."),
        )

    def get_dim_goods(self):
        """Goods dimension."""
        self._refresh_table(
            "ur_bi_dw.dim_goods.csv",
            "SELECT * FROM ur_bi_dw.dim_goods",
            self._add_prefix("dim_goods."),
        )

    def get_dim_goods_market_shop_date(self):
        """Per-shop goods life-cycle data."""
        sql = """
            select shop_no, sku_no, shop_market_date, lifecycle_end_date, lifecycle_days
            FROM ur_bi_dw.dim_goods_market_shop_date
            where lifecycle_end_date is not null
            """
        self._refresh_table(
            "ur_bi_dw.dim_goods_market_shop_date.csv",
            sql,
            self._strip_token("lifecycle_end_date."),
            sort_columns=["shop_market_date"],
            index=False,
        )

    def get_dim_goods_market_date(self):
        """Nationwide goods life-cycle data."""
        sql = """
            select * FROM ur_bi_dw.dim_goods_market_date
            """
        self._refresh_table(
            "ur_bi_dw.dim_goods_market_date.csv",
            sql,
            self._add_prefix("dim_goods_market_date."),
            sort_columns=["dim_goods_market_date.sku_no"],
            index=False,
        )

    def get_dim_goods_color_dev_sizes(self):
        """Developed sizes per goods colour."""
        self._refresh_table(
            "dim_goods_color_dev_sizes.csv",
            "SELECT * FROM ur_bi_dm.dim_goods_color_dev_sizes",
            self._strip_token("dim_goods_color_dev_sizes."),
            index=False,
        )

    def get_dwd_daily_sales_size(self):
        """Actual sales amounts, aggregated per shop, SKU, day and size."""
        sql = """
            select shop_no, sku_no, date_key, `size`,
                sum(tag_price) as `tag_price`,
                sum(sales_qty) as `sales_qty`,
                sum(sales_tag_amt) as `sales_tag_amt`,
                sum(sales_amt) as `sales_amt`,
                count(0) as `sales_count`
            from ur_bi_dw.dwd_daily_sales_size as sales
            where sales.date_key >= %s and sales.date_key < %s
                and sales.currency_code = 'CNY'
            group by shop_no, sku_no, date_key, `size`
            """
        self._refresh_by_date(
            "dwd_daily_sales_size_all",
            sql,
            ["date_key", "shop_no", "sku_no"],
            start_date=datetime.datetime(2017, 1, 1),  # first window start
        )

    def get_dwd_daily_delivery_size(self):
        """Actual delivery amounts, aggregated per shop, SKU, day and size."""
        sql = """
            select shop_no, sku_no, date_key, `size`,
                sum(delivery.shop_distr_received_qty) as `shop_distr_received_qty`,
                sum(delivery.shop_distr_received_amt) as `shop_distr_received_amt`,
                sum(delivery.online_distr_received_qty) as `online_distr_received_qty`,
                sum(delivery.online_distr_received_amt) as `online_distr_received_amt`,
                sum(delivery.pr_received_qty) as `pr_received_qty`,
                count(0) as `delivery_count`
            from ur_bi_dw.dwd_daily_delivery_size as delivery
            where delivery.date_key >= %s and delivery.date_key < %s
                and delivery.currency_code = 'CNY'
            group by shop_no, sku_no, date_key, `size`
            """
        self._refresh_by_date(
            "dwd_daily_delivery_size_all",
            sql,
            ["date_key", "shop_no", "sku_no"],
            start_date=datetime.datetime(2017, 1, 1),  # first window start
        )

    def get_v_last_nation_sales_status(self):
        """Best-seller / slow-seller status per goods."""
        self._refresh_table(
            "v_last_nation_sales_status.csv",
            "SELECT * FROM ur_bi_dw.v_last_nation_sales_status",
            self._strip_token("v_last_nation_sales_status."),
            index=False,
        )

    def get_dwd_daily_finacial_goods(self):
        """Latest cost price per SKU and size (CNY, CN only)."""
        sql = """
            select t1.sku_no, t1.`size`, t1.cost_tax_incl from ur_bi_dw.dwd_daily_finacial_goods as t1
            inner join (
                select sku_no, `size`, max(date_key) as date_key
                from ur_bi_dw.dwd_daily_finacial_goods
                where currency_code = 'CNY' and country_code = 'CN'
                group by sku_no, `size`
            ) as t2
            on t2.sku_no = t1.sku_no
                and t2.`size` = t1.`size`
                and t2.date_key = t1.date_key
            where t1.currency_code = 'CNY' and t1.country_code = 'CN'
            """
        self._refresh_table(
            "dwd_daily_finacial_goods.csv",
            sql,
            self._strip_token("t1."),
            index=False,
        )

    def get_dim_size_group(self):
        """Size mapping data."""
        self._refresh_table(
            "dim_size_group.csv",
            "select * from ur_bi_dw.dim_size_group",
            self._strip_token("dim_size_group."),
            index=False,
        )
def get_common_datas(
        # NOTE(review): the extracted text shows `2`; the host address looks
        # truncated by extraction -- confirm the real default.
        host=2,
        port=21051,
        database="ur_ai_dw",
        auth_mechanism="LDAP",
        user="urbi",
        # NOTE(review): hard-coded credential default; consider loading it
        # from configuration or the environment instead.
        password="Ur#730xd",
        logger: logging.Logger = None):
    """Refresh the shared data sets under ``<common_datas_dir>/ur_bi_data``.

    Runs, in order: dates, shops, weather, weather cities, goods and actual
    sales. The database connection is always closed afterwards.

    Raises:
        Exception: Whatever a download step raised, after logging it (when a
            logger is given).
    """
    # Shared output location.
    common_datas_dir = ShareArgs.get_args_value("common_datas_dir")
    common_ur_bi_dir = os.path.join(common_datas_dir, "ur_bi_data")
    ur_bi_get_datas = UrBiGetDatas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=common_ur_bi_dir,
        logger=logger,
    )
    steps = (
        ("日期", ur_bi_get_datas.get_dim_date),
        ("店铺", ur_bi_get_datas.get_dim_shop),
        ("天气", ur_bi_get_datas.get_weather),
        ("天气城市", ur_bi_get_datas.get_weather_city),
        ("货品", ur_bi_get_datas.get_dim_goods),
        ("实际销量", ur_bi_get_datas.get_dwd_daily_sales_size),
    )
    try:
        for label, step in steps:
            # NOTE(review): the call name was stripped by extraction;
            # `logging.info` is assumed -- confirm.
            logging.info("正在查询%s数据...", label)
            step()
            logging.info("查询%s数据完成!", label)
    except Exception as ex:
        # `logger` defaults to None; calling it unguarded would replace the
        # real error with an AttributeError.
        if logger is not None:
            logger.exception(ex)
        raise
    finally:
        ur_bi_get_datas.close()
class CustomUrBiGetDatas(UrBiGetDatasBase):
    """Downloads the job-specific aggregates (sales goals, shop-series area)."""

    def __init__(
            self,
            # NOTE(review): the extracted text shows `2`; the host address
            # looks truncated by extraction -- confirm the real default.
            host=2,
            port=21051,
            database="ur_ai_dw",
            auth_mechanism="LDAP",
            user="urbi",
            # NOTE(review): hard-coded credential default; consider loading it
            # from configuration or the environment instead.
            password="Ur#730xd",
            save_dir="./hjx/data/ur_bi_data",
            logger: logging.Logger = None
    ):
        """Store the settings and initialise the base class."""
        self.save_dir = save_dir
        self.logger = logger
        super().__init__(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger,
        )

    def _refresh_query(self, file_name, sql, rename_map, sort_columns):
        """Run ``sql`` and write it to ``save_dir/file_name`` if it timed out.

        Holds the per-path lock for the whole operation; columns are renamed
        through ``rename_map`` and rows sorted by ``sort_columns``.
        """
        file_path = os.path.join(self.save_dir, file_name)
        with self.get_lock(file_path):
            try:
                # Only query again once the cached copy has timed out.
                if not self.get_last_time(file_path):
                    return
                data: pd.DataFrame = self.db_helper.get_data(sql)
                data = data.rename(columns=rename_map)
                data = data.sort_values(sort_columns)
                data.to_csv(file_path)
                # Record the refresh time.
                self.save_last_time(file_path)
            except Exception as ex:
                # The logger is optional.
                if self.logger is not None:
                    self.logger.exception(ex)
                raise

    def get_sales_goal_amt(self):
        """Sales goal amount per shop, series and month of year."""
        sql = """
            select sales_goal.shop_no,
                if(sales_goal.serial = 'Y', 'W', sales_goal.serial) as `sales_goal.serial`,
                dates.month_of_year,
                sum(sales_goal.sales_goal_amt) as sales_goal_amt
            from ur_bi_dw.dwd_sales_goal_west as sales_goal
            inner join ur_bi_dw.dim_date as dates
                on sales_goal.date_key = dates.date_key
            group by sales_goal.shop_no,
                if(sales_goal.serial = 'Y', 'W', sales_goal.serial),
                dates.month_of_year
            """
        self._refresh_query(
            "month_of_year_sales_goal_amt.csv",
            sql,
            {
                "shop_no": "sales_goal.shop_no",
                "serial": "sales_goal.serial",
                "month_of_year": "dates.month_of_year",
            },
            ["sales_goal.shop_no", "sales_goal.serial", "dates.month_of_year"],
        )

    def get_shop_serial_area(self):
        """Floor area per shop, series and month of year."""
        sql = """
            select shop_serial_area.shop_no,
                if(shop_serial_area.serial = 'Y', 'W', shop_serial_area.serial) as `shop_serial_area.serial`,
                shop_serial_area.month_of_year,
                sum(shop_serial_area.area) as `shop_serial_area.area`
            from ur_bi_dw.dwd_shop_serial_area as shop_serial_area
            where shop_serial_area.area is not null
            group by shop_serial_area.shop_no, if(shop_serial_area.serial = 'Y', 'W', shop_serial_area.serial), shop_serial_area.month_of_year
            """
        self._refresh_query(
            "shop_serial_area.csv",
            sql,
            {
                "shop_no": "shop_serial_area.shop_no",
                "serial": "shop_serial_area.serial",
                "month_of_year": "shop_serial_area.month_of_year",
                "area": "shop_serial_area.area",
            },
            ["shop_serial_area.shop_no", "shop_serial_area.serial", "shop_serial_area.month_of_year"],
        )
def get_datas(
        # NOTE(review): the extracted text shows `2`; the host address looks
        # truncated by extraction -- confirm the real default.
        host=2,
        port=21051,
        database="ur_ai_dw",
        auth_mechanism="LDAP",
        user="urbi",
        # NOTE(review): hard-coded credential default; consider loading it
        # from configuration or the environment instead.
        password="Ur#730xd",
        save_dir="./data/sales_forecast/ur_bi_dw_data",
        logger: logging.Logger = None):
    """Refresh the job-specific data (monthly sales goal amounts) in ``save_dir``.

    The database connection is always closed afterwards.

    Raises:
        Exception: Whatever the download raised, after logging it (when a
            logger is given).
    """
    ur_bi_get_datas = CustomUrBiGetDatas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger,
    )
    try:
        # Shop, series, category, year-month, sales goal amount.
        # NOTE(review): the call name was stripped by extraction;
        # `logging.info` is assumed -- confirm.
        logging.info("正在查询年月销售目标金额数据...")
        ur_bi_get_datas.get_sales_goal_amt()
        logging.info("查询年月销售目标金额数据完成!")
    except Exception as ex:
        # `logger` defaults to None; guard so the real error is not masked.
        if logger is not None:
            logger.exception(ex)
        raise
    finally:
        ur_bi_get_datas.close()
def getdata_ur_bi_dw(
        # NOTE(review): the extracted text shows `2`; the host address looks
        # truncated by extraction -- confirm the real default.
        host=2,
        port=21051,
        database="ur_ai_dw",
        auth_mechanism="LDAP",
        user="urbi",
        # NOTE(review): hard-coded credential default; consider loading it
        # from configuration or the environment instead.
        password="Ur#730xd",
        save_dir="./data/sales_forecast/ur_bi_dw_data",
        logger=None
):
    """Entry point: refresh the shared data sets, then the job-specific ones.

    The connection settings are passed unchanged to ``get_common_datas`` and
    ``get_datas``; ``save_dir`` applies only to the latter.
    """
    connection_settings = dict(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        logger=logger,
    )
    get_common_datas(**connection_settings)
    get_datas(save_dir=save_dir, **connection_settings)
#代码入口
#getdata_ur_bi_dw(
#host=ur_bi_dw_host,
#port=ur_bi_dw_port,
#database=ur_bi_dw_database,
#auth_mechanism=ur_bi_dw_auth_mechanism,
#user=ur_bi_dw_user,
#password=ur_bi_dw_password,
#save_dir=ur_bi_dw_save_dir,
#logger=logger
#)
代码说明和领悟
每个类的具体作用说明,代码
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 行政管理专业经济法认知试题及答案
- 毕业课程设计答辩要点解析
- 项目管理中的绩效激励机制试题及答案
- 冲刺抢分卷09 备战2025年高考考前仿真模拟卷冲刺抢分卷化学试题09 (辽宁、黑龙江、吉林、内蒙古专用) 含解析
- 市政工程领域全景式试题及答案
- 项目管理知识体系的结构试题及答案
- 合作社社区服务与农产品供应协议
- 核心考点解读2025年公共关系学试题及答案
- 中级经济师基础知识试题及答案解答
- 机械工程CAD绘图技巧与题解
- 2025展览馆装饰工程合同范本
- 2019-2025年房地产经纪协理之房地产经纪操作实务过关检测试卷B卷附答案
- 初中历史明清时期的科技与文化 课件 2024-2025学年统编版七年级历史下册
- 2025年上半年发展对象题库(含答案)
- 大连银行招聘笔试真题2024
- 输血管理制度
- 信息必刷卷04(广东省卷专用)2025年中考数学(原卷版)
- 膝关节韧带损伤护理查房
- GB/T 21196.2-2025纺织品马丁代尔法织物耐磨性的测定第2部分:试样破损的测定
- 中国传统文化-剪纸艺术知到课后答案智慧树章节测试答案2025年春石河子大学
- 重庆市2025年中考数学模拟试题(含答案)
评论
0/150
提交评论