如何使用Python读取Hive数据库_第1页
如何使用Python读取Hive数据库_第2页
如何使用Python读取Hive数据库_第3页
如何使用Python读取Hive数据库_第4页
如何使用Python读取Hive数据库_第5页
已阅读5页,还剩44页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

如何使用Python读取Hive数据库?

port=21051,

database=ur_AI_dw,

auth_mechanism=LDAP,

user=urbi,

password=Ur#730xd,

logger:logging.Logger=None

self.host=host

self.port=port

self.database=database

self.auth_mechanism=auth_mechanism

self.user=user

self.password=password

self.logger=logger

self.impala_conn=None

self.conn=None

self.cursor=None

self.engine=None

self.session=None

def create_table_code(self, file_name):
    """Generate table-class source code by running the ``sqlacodegen`` CLI.

    Args:
        file_name: Path of the file that receives the generated code.

    Returns:
        The current value of ``self.conn`` (unchanged by this call).
    """
    # NOTE(review): the extracted source had lost its quote and angle-bracket
    # characters; the ``>`` redirect is reconstructed -- confirm against the
    # original script.
    # NOTE(review): ``self.connection_str`` is not assigned anywhere in the
    # visible part of the class; it must be set before calling this method.
    # NOTE(review): both values are interpolated into a shell command line, so
    # they must never come from untrusted input.
    os.system(f"sqlacodegen {self.connection_str} > {file_name}")
    return self.conn

def get_conn(self):
    """Return the cached engine connection, opening it on first use.

    Returns:
        The object returned by ``self.get_engine().connect()``; the same
        object is handed back on every later call until it is closed.
    """
    if self.conn is None:
        self.conn = self.get_engine().connect()
    return self.conn

def get_impala_conn(self):
    """Return the cached DB-API connection, opening it on first use.

    The connection is created with the module-level ``connect`` callable
    using the host, port, database, auth mechanism, user and password stored
    on the instance.

    Returns:
        The cached connection object.
    """
    if self.impala_conn is None:
        self.impala_conn = connect(
            host=self.host,
            port=self.port,
            database=self.database,
            auth_mechanism=self.auth_mechanism,
            user=self.user,
            password=self.password,
        )
    return self.impala_conn

def get_engine(self):
    """Return the cached SQLAlchemy engine, creating it on first use.

    The engine is built with ``creator=self.get_impala_conn`` so that every
    connection it hands out comes from ``get_impala_conn``.

    Returns:
        The cached engine object.
    """
    if self.engine is None:
        self.engine = sqlalchemy.create_engine(
            "impala://", creator=self.get_impala_conn
        )
    return self.engine

def get_cursor(self):
    """Return the cached cursor, creating it on first use.

    The original dereferenced ``self.conn`` unconditionally and crashed with
    ``AttributeError`` when no connection had been opened yet. When
    ``self.conn`` is ``None`` the cursor is now taken from
    ``self.get_impala_conn()`` instead.

    Returns:
        The cached cursor object.
    """
    if self.cursor is None:
        conn = self.conn if self.conn is not None else self.get_impala_conn()
        self.cursor = conn.cursor()
    return self.cursor

def get_session(self) -> "sqlalchemy.orm.Session":
    """Return the cached ORM session, creating it on first use.

    The original annotated the return type as ``sessionmaker``, but the
    method returns the session produced by calling that factory.

    Returns:
        The cached session bound to ``self.get_engine()``.
    """
    if self.session is None:
        session_factory = sessionmaker(bind=self.get_engine())
        self.session = session_factory()
    return self.session

def close_conn(self):
    """Close the engine connection, then release the engine and raw connection.

    Safe to call when nothing is open: each step is skipped when the
    corresponding attribute is ``None``.
    """
    if self.conn is not None:
        self.conn.close()
        self.conn = None
    # NOTE(review): indentation was lost in the extracted source; the two
    # cleanup calls are assumed to run unconditionally (both are no-ops when
    # there is nothing to release).
    self.dispose_engine()
    self.close_impala_conn()

def close_impala_conn(self):
    """Close the cached DB-API connection, if any, and forget it."""
    if self.impala_conn is not None:
        self.impala_conn.close()
        self.impala_conn = None

def close_session(self):
    """Close the cached ORM session, if any, then release the engine."""
    if self.session is not None:
        self.session.close()
        self.session = None
    # NOTE(review): indentation was lost in the extracted source; disposing
    # the engine is assumed to be unconditional (it is a no-op without one).
    self.dispose_engine()

def dispose_engine(self):
    """Dispose of the cached engine, if any, and forget it."""
    if self.engine is not None:
        self.engine.dispose()
        self.engine = None

def close_cursor(self):
    """Close the cached cursor, if any, and forget it."""
    if self.cursor is not None:
        self.cursor.close()
        self.cursor = None

def get_data(self, sql, auto_close=True, retries=3, retry_interval=60) -> pd.DataFrame:
    """Run a query and return its result as a DataFrame, retrying on failure.

    Args:
        sql: Query text passed to ``pandas.read_sql``.
        auto_close: When true, ``self.close_conn()`` is called before
            returning, whether the query succeeded or not.
        retries: Total number of attempts (values below 1 are treated as 1).
            Defaults to the original hard-coded 3.
        retry_interval: Seconds to sleep between attempts. Defaults to the
            original hard-coded 60.

    Returns:
        The DataFrame produced by the first successful attempt.

    Raises:
        Exception: Whatever the last attempt raised, after it was logged.
    """
    conn = self.get_conn()
    attempts = max(1, int(retries))
    data = None
    try:
        for attempt in range(1, attempts + 1):
            try:
                data = pd.read_sql(sql, conn)
                break
            except Exception:
                if attempt == attempts:
                    raise  # out of attempts: let the outer handler log it
                time.sleep(retry_interval)
    except Exception as ex:
        # The logger is optional; fall back to a module logger so that a
        # missing logger does not mask the real error with AttributeError.
        log = self.logger if self.logger is not None else logging.getLogger(__name__)
        log.exception(ex)
        raise
    finally:
        if auto_close:
            self.close_conn()
    return data

class VarsHelper():
    """Small key/value store persisted to a single pickle file.

    Args:
        save_dir: Path of the pickle FILE (despite the name). Its parent
            directory is created when missing; if the file already exists
            its contents are loaded into ``values``.
        auto_save: When true, every ``set_value`` call rewrites the file.
    """

    def __init__(self, save_dir, auto_save=True):
        self.save_dir = save_dir
        self.auto_save = auto_save
        self.values = {}
        # A bare file name has an empty dirname; os.makedirs("") would raise.
        parent_dir = os.path.dirname(self.save_dir)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        if os.path.exists(self.save_dir):
            # NOTE: pickle executes arbitrary code on load; only point this
            # at files written by this class.
            with open(self.save_dir, "rb") as f:
                self.values = pickle.load(f)

    def set_value(self, key, value):
        """Store ``value`` under ``key`` and persist it when auto-save is on."""
        self.values[key] = value
        if self.auto_save:
            self.save_file()

    def get_value(self, key):
        """Return the value stored under ``key``; raises ``KeyError`` if absent."""
        return self.values[key]

    def has_key(self, key):
        """Return whether ``key`` is present in the store."""
        return key in self.values

    def save_file(self):
        """Write the whole store to the pickle file, replacing its contents."""
        with open(self.save_dir, "wb") as f:
            pickle.dump(self.values, f)

class GlobalShareArgs():
    """Process-wide settings shared through a single class-level dict.

    All accessors are static: they read or replace ``GlobalShareArgs.args``
    and work both on the class and on an instance.
    """

    args = {
        "debug": False,
    }

    @staticmethod
    def get_args():
        """Return the shared settings dict itself (not a copy)."""
        return GlobalShareArgs.args

    @staticmethod
    def set_args(args):
        """Replace the shared settings dict with ``args``."""
        GlobalShareArgs.args = args

    @staticmethod
    def set_args_value(key, value):
        """Set a single setting."""
        GlobalShareArgs.args[key] = value

    @staticmethod
    def get_args_value(key, default_value=None):
        """Return the setting ``key``, or ``default_value`` when it is absent."""
        return GlobalShareArgs.args.get(key, default_value)

    @staticmethod
    def contain_key(key):
        """Return whether the setting ``key`` exists."""
        return key in GlobalShareArgs.args

    @staticmethod
    def update(args):
        """Merge the mapping ``args`` into the shared settings."""
        GlobalShareArgs.args.update(args)

class ShareArgs():
    """Job-level settings shared through a single class-level dict.

    All accessors are static: they read or replace ``ShareArgs.args`` and
    work both on the class and on an instance.
    """

    args = {
        "labels_dir": "./hjx/shop_group/month_w_amt/data/labels",  # label directory
        "labels_output_dir": "./hjx/shop_group/month_w_amt/data/labels_output",  # exported cluster labels
        "common_datas_dir": "./hjx/data",  # shared data directory (ur_bi_dw common data)
        "only_predict": False,  # predict only, do not train
        "delete_model": True,  # delete the model first; only used when training
        "export_excel": False,  # export to Excel
        "classes": 12,  # number of clusters
        "batch_size": 16,
        "hidden_size": 32,
        "max_nrof_epochs": 100,
        "learning_rate": 0.0005,
        "loss_type": "categorical_crossentropy",
        "avg_model_num": 10,
        "steps_per_epoch": 4.0,
        "lr_callback_patience": 4,
        "lr_callback_cooldown": 1,
        "early_stopping_callback_patience": 6,
        "get_data": True,
    }

    @staticmethod
    def get_args():
        """Return the shared settings dict itself (not a copy)."""
        return ShareArgs.args

    @staticmethod
    def set_args(args):
        """Replace the shared settings dict with ``args``."""
        ShareArgs.args = args

    @staticmethod
    def set_args_value(key, value):
        """Set a single setting."""
        ShareArgs.args[key] = value

    @staticmethod
    def get_args_value(key, default_value=None):
        """Return the setting ``key``, or ``default_value`` when it is absent."""
        return ShareArgs.args.get(key, default_value)

    @staticmethod
    def contain_key(key):
        """Return whether the setting ``key`` exists."""
        return key in ShareArgs.args

    @staticmethod
    def update(args):
        """Merge the mapping ``args`` into the shared settings."""
        ShareArgs.args.update(args)

class UrBiGetDatasBase():
    """Shared plumbing for the data exporters.

    Holds a ``HiveHelper`` for queries plus three class-level registries,
    all keyed by absolute path:

    * ``lock_dict`` -- one ``threading.Lock`` per output path,
    * ``time_dict`` -- when each output was last refreshed,
    * ``get_data_timeout_dict`` -- result of the latest staleness check.
    """

    # One lock per output path; callers saving to the same path share it.
    lock_dict: Dict[str, threading.Lock] = {}
    # Last refresh time per output path, used for the staleness check.
    time_dict: Dict[str, datetime.datetime] = {}
    # Latest staleness verdict per path; read back by save_last_time().
    get_data_timeout_dict: Dict[str, bool] = {}

    def __init__(
        self,
        # NOTE(review): the host default looks truncated in the extracted
        # source ("2"); replace it with the real server address.
        host="2",
        port=21051,
        database="ur_ai_dw",
        auth_mechanism="LDAP",
        user="urbi",
        # NOTE(review): a credential is hard-coded as a default value; it is
        # kept for interface compatibility but should come from configuration.
        password="Ur#730xd",
        save_dir=None,
        logger: logging.Logger = None,
    ):
        """Create the query helper and make sure ``save_dir`` exists.

        Args:
            host, port, database, auth_mechanism, user, password: Passed
                through to ``HiveHelper``.
            save_dir: Output directory; created when given and missing.
            logger: Optional logger, also handed to ``HiveHelper``.
        """
        self.save_dir = save_dir
        self.logger = logger
        self.db_helper = HiveHelper(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            logger=logger,
        )
        # Create the output directory.
        if self.save_dir is not None:
            os.makedirs(self.save_dir, exist_ok=True)
        # Refresh times are persisted to disk only in debug mode.
        self.vars_helper = None
        if GlobalShareArgs.get_args_value("debug"):
            self.vars_helper = VarsHelper("./hjx/data/vars/UrBiGetDatas")

    def _log(self) -> logging.Logger:
        """Return the configured logger, or a module logger when none is set."""
        logger = getattr(self, "logger", None)
        return logger if logger is not None else logging.getLogger(__name__)

    def close(self):
        """Close the database connection held by the query helper."""
        self.db_helper.close_conn()

    def get_last_time(self, key_name) -> bool:
        """Report whether the data stored under ``key_name`` is stale.

        Data is stale when it has never been refreshed or when the last
        refresh is older than 12 hours (24 hours when the ``debug`` setting
        is on). The verdict is recorded in ``get_data_timeout_dict``.

        Args:
            key_name: Output path; converted to an absolute path so that the
                key is unique.

        Returns:
            True when the data must be queried again.
        """
        key_name = os.path.abspath(key_name)
        # NOTE(review): the storage key is a string literal in the original
        # (quotes were stripped by the extraction) -- confirm.
        if self.vars_helper is not None and self.vars_helper.has_key("UrBiGetDatasBase.time_list"):
            UrBiGetDatasBase.time_dict = self.vars_helper.get_value("UrBiGetDatasBase.time_list")
        timeout = 24 if GlobalShareArgs.get_args_value("debug") else 12  # hours
        last_time = UrBiGetDatasBase.time_dict.get(key_name)
        # NOTE(review): the comparison operator was stripped from the
        # extracted source; ">" is the only reading consistent with the
        # log messages below.
        get_data_timeout = (
            last_time is None
            or (datetime.datetime.today() - last_time).total_seconds() > timeout * 60 * 60
        )
        if get_data_timeout:
            self._log().info("超时%d小时,重新查数据:%s", timeout, key_name)
        else:
            self._log().info("未超时%d小时,跳过查数据:%s", timeout, key_name)
        UrBiGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout
        return get_data_timeout

    def save_last_time(self, key_name):
        """Record "now" as the refresh time of ``key_name``.

        The time is updated only when the latest staleness check for the key
        said a refresh was needed; a key that was never checked is treated as
        needing one (the original raised ``KeyError`` in that case). In debug
        mode the table of refresh times is also persisted.

        Args:
            key_name: Output path; converted to an absolute path.
        """
        key_name = os.path.abspath(key_name)
        if UrBiGetDatasBase.get_data_timeout_dict.get(key_name, True):
            UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today()
            if self.vars_helper is not None:
                self.vars_helper.set_value("UrBiGetDatasBase.time_list", UrBiGetDatasBase.time_dict)

    def get_lock(self, key_name) -> threading.Lock:
        """Return the lock associated with ``key_name``, creating it once.

        Args:
            key_name: Output path; converted to an absolute path.

        Returns:
            The same ``threading.Lock`` for every call with the same path.
        """
        key_name = os.path.abspath(key_name)
        # setdefault avoids the check-then-insert window in which two threads
        # could each install a different lock for the same path.
        return UrBiGetDatasBase.lock_dict.setdefault(key_name, threading.Lock())

    def get_data_of_date(
        self,
        save_dir,
        sql,
        sort_columns: List[str],
        del_index_list=(-1,),
        start_date=datetime.datetime(2017, 1, 1),
        offset=None,
        date_format_fun=lambda d: "%04d%02d01" % (d.year, d.month),
        filename_format_fun=lambda d: "%04d%02d.csv" % (d.year, d.month),
        stop_date="20700101",
        data_format_fun=None,
    ):
        """Export a table incrementally, one CSV file per date window.

        Starting at ``start_date`` the method walks forward in steps of
        ``offset``. A window whose CSV already exists is skipped; otherwise
        the window is queried with ``sql % (window_start, window_end)`` and
        non-empty results are sorted and written to the window's CSV. The
        walk ends at the first empty window that follows a written one, or
        when the helper returns ``None``.

        Args:
            save_dir: Directory receiving the CSV files (created if missing).
            sql: Query template with two ``%s`` placeholders (window start
                and end).
            sort_columns: Columns to sort each window by before writing.
            del_index_list: Indexes into the sorted list of existing files;
                those files are deleted first so they are re-queried (the
                newest windows may have been incomplete). Indexes that do
                not exist are ignored.
            start_date: First window start.
            offset: Window length; defaults to three months.
            date_format_fun: Formats a window boundary for the query.
            filename_format_fun: Formats a window start into a file name.
            stop_date: Upper bound, compared with the formatted window start.
            data_format_fun: Optional transform applied to each non-empty
                result before sorting.

        Raises:
            Exception: When no data has been written at all and the window
                start passes ``stop_date``.
        """
        if offset is None:
            offset = relativedelta(months=3)

        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        else:
            self._remove_files_at(save_dir, del_index_list)

        select_index = -1
        while True:
            end_date = start_date + offset
            start_date_str = date_format_fun(start_date)
            end_date_str = date_format_fun(end_date)
            self._log().info("date:%s-%s", start_date_str, end_date_str)
            file_path = os.path.join(save_dir, filename_format_fun(start_date))
            if not os.path.exists(file_path):
                data = self.db_helper.get_data(sql % (start_date_str, end_date_str))
                if data is None:
                    break
                self._log().info("data:%d", len(data))
                if len(data) > 0:
                    select_index += 1
                    if data_format_fun is not None:
                        data = data_format_fun(data)
                    data = data.sort_values(sort_columns)
                    data.to_csv(file_path)
                elif select_index != -1:
                    # First empty window after real data: caught up.
                    break
                elif stop_date < start_date_str:
                    raise Exception("读取数据异常,时间超出最大值!")
            start_date = end_date

    def _remove_files_at(self, save_dir, del_index_list):
        """Delete the files of ``save_dir`` found at the given sorted indexes."""
        file_list = sorted(os.listdir(save_dir))
        count = len(file_list)
        # Resolve every index first; out-of-range indexes are skipped and a
        # file selected twice is removed once.
        doomed = {file_list[i] for i in del_index_list if -count <= i < count}
        for file_name in sorted(doomed):
            os.remove(os.path.join(save_dir, file_name))
            self._log().info("删除最后一个文件:%s", file_name)

class UrBiGetDatas(UrBiGetDatasBase):
    """Exports the shared ``ur_bi_dw`` tables to CSV files under ``save_dir``.

    Every public ``get_*`` method follows the same protocol: take the lock
    for its output path, skip the work when the base class reports the data
    as still fresh, otherwise query, write and record the refresh time.
    Whole-table exports go through ``_export_table``; date-partitioned
    exports go through ``_export_partitioned``.
    """

    def __init__(
        self,
        # NOTE(review): the host default looks truncated in the extracted
        # source ("2"); replace it with the real server address.
        host="2",
        port=21051,
        database="ur_ai_dw",
        auth_mechanism="LDAP",
        user="urbi",
        # NOTE(review): hard-coded credential kept only for compatibility.
        password="Ur#730xd",
        save_dir="./hjx/data/ur_bi_dw_data",
        logger: logging.Logger = None,
    ):
        """Forward all settings to the base class (which stores them)."""
        super().__init__(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger,
        )

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _log_error(self, ex):
        """Log ``ex`` with traceback, tolerating a missing logger."""
        logger = self.logger if self.logger is not None else logging.getLogger(__name__)
        logger.exception(ex)

    @staticmethod
    def _prefixed(prefix):
        """Return a column renamer that prepends ``prefix``."""
        return lambda column: prefix + column

    @staticmethod
    def _stripped(text):
        """Return a column renamer that removes every occurrence of ``text``."""
        return lambda column: column.replace(text, "")

    def _export_table(self, file_name, sql, rename=None, sort_columns=None, index=True):
        """Query ``sql`` once and write the result to ``save_dir/file_name``.

        Args:
            file_name: Output CSV name, relative to ``self.save_dir``.
            sql: Query text.
            rename: Optional callable mapping each column name to a new one.
            sort_columns: Optional columns to sort by before writing.
            index: Passed to ``DataFrame.to_csv`` (whether to write the index).
        """
        file_path = os.path.join(self.save_dir, file_name)
        with self.get_lock(file_path):
            try:
                # Skip when the previous export is still fresh.
                if not self.get_last_time(file_path):
                    return
                data = self.db_helper.get_data(sql)
                if rename is not None:
                    data = data.rename(columns={c: rename(c) for c in list(data.columns)})
                if sort_columns:
                    data = data.sort_values(sort_columns)
                data.to_csv(file_path, index=index)
                # Record the refresh time.
                self.save_last_time(file_path)
            except Exception as ex:
                self._log_error(ex)
                raise

    def _export_partitioned(self, sub_dir_name, sql, sort_columns, **options):
        """Export a date-partitioned table into ``save_dir/sub_dir_name``.

        Args:
            sub_dir_name: Output directory name, relative to ``self.save_dir``.
            sql: Query template with two ``%s`` placeholders (window bounds).
            sort_columns: Columns to sort each window by.
            **options: Extra keyword arguments for ``get_data_of_date``.
        """
        sub_dir = os.path.join(self.save_dir, sub_dir_name)
        with self.get_lock(sub_dir):
            try:
                # Skip when the previous export is still fresh.
                if not self.get_last_time(sub_dir):
                    return
                self.get_data_of_date(
                    save_dir=sub_dir,
                    sql=sql,
                    sort_columns=sort_columns,
                    **options,
                )
                # Record the refresh time.
                self.save_last_time(sub_dir)
            except Exception as ex:
                self._log_error(ex)
                raise

    # ------------------------------------------------------------------
    # Whole-table exports
    # ------------------------------------------------------------------

    def get_dim_date(self):
        """Export the date dimension."""
        self._export_table(
            "ur_bi_dw.dim_date.csv",
            "SELECT * FROM ur_bi_dw.dim_date",
            rename=self._prefixed("dim_date."),
            sort_columns=["dim_date.date_key"],
        )

    def get_dim_shop(self):
        """Export the shop dimension."""
        self._export_table(
            "ur_bi_dw.dim_shop.csv",
            "SELECT * FROM ur_bi_dw.dim_shop",
            rename=self._prefixed("dim_shop."),
            sort_columns=["dim_shop.shop_no"],
        )

    def get_weather_city(self):
        """Export the weather-city dimension."""
        self._export_table(
            "ur_bi_dw.weather_city.csv",
            "SELECT * FROM ur_bi_dw.dim_weather_city as weather_city",
            rename=self._prefixed("weather_city."),
        )

    def get_dim_goods(self):
        """Export the goods dimension."""
        self._export_table(
            "ur_bi_dw.dim_goods.csv",
            "SELECT * FROM ur_bi_dw.dim_goods",
            rename=self._prefixed("dim_goods."),
        )

    def get_dim_goods_market_shop_date(self):
        """Export per-shop goods life-cycle data."""
        sql = """
            select shop_no, sku_no, shop_market_date, lifecycle_end_date, lifecycle_days
            FROM ur_bi_dw.dim_goods_market_shop_date
            where lifecycle_end_date is not null
            """
        self._export_table(
            "ur_bi_dw.dim_goods_market_shop_date.csv",
            sql,
            rename=self._stripped("lifecycle_end_date."),
            sort_columns=["shop_market_date"],
            index=False,
        )

    def get_dim_goods_market_date(self):
        """Export nationwide goods life-cycle data."""
        sql = """
            select * FROM ur_bi_dw.dim_goods_market_date
            """
        self._export_table(
            "ur_bi_dw.dim_goods_market_date.csv",
            sql,
            rename=self._prefixed("dim_goods_market_date."),
            sort_columns=["dim_goods_market_date.sku_no"],
            index=False,
        )

    def get_dim_goods_color_dev_sizes(self):
        """Export the developed sizes per goods colour."""
        self._export_table(
            "dim_goods_color_dev_sizes.csv",
            "SELECT * FROM ur_bi_dm.dim_goods_color_dev_sizes",
            rename=self._stripped("dim_goods_color_dev_sizes."),
            index=False,
        )

    def get_v_last_nation_sales_status(self):
        """Export the fast/slow-selling status of goods."""
        self._export_table(
            "v_last_nation_sales_status.csv",
            "SELECT * FROM ur_bi_dw.v_last_nation_sales_status",
            rename=self._stripped("v_last_nation_sales_status."),
            index=False,
        )

    def get_dwd_daily_finacial_goods(self):
        """Export the latest cost price per SKU and size (CNY / CN only)."""
        sql = """
            select t1.sku_no, t1.`size`, t1.cost_tax_incl from ur_bi_dw.dwd_daily_finacial_goods as t1
            inner join (
                select sku_no, `size`, max(date_key) as date_key
                from ur_bi_dw.dwd_daily_finacial_goods
                where currency_code='CNY' and country_code='CN'
                group by sku_no, `size`
            ) as t2
            on t2.sku_no=t1.sku_no
                and t2.`size`=t1.`size`
                and t2.date_key=t1.date_key
            where t1.currency_code='CNY' and t1.country_code='CN'
            """
        self._export_table(
            "dwd_daily_finacial_goods.csv",
            sql,
            rename=self._stripped("t1."),
            index=False,
        )

    def get_dim_size_group(self):
        """Export the size-mapping table."""
        self._export_table(
            "dim_size_group.csv",
            "select * from ur_bi_dw.dim_size_group",
            rename=self._stripped("dim_size_group."),
            index=False,
        )

    # ------------------------------------------------------------------
    # Date-partitioned exports
    # NOTE(review): the ">=" / "<" operators in the date filters were
    # stripped from the extracted source and are reconstructed; whether the
    # placeholders were quoted in the original cannot be told -- confirm
    # against the column type of date_key.
    # ------------------------------------------------------------------

    def get_dim_vip(self):
        """Export member data, one file per year of card creation."""
        sql = """SELECT dv.*, dd.date_key, dd.date_name2
            FROM ur_bi_dw.dim_vip as dv
            INNER JOIN ur_bi_dw.dim_date as dd
            ON dv.card_create_date=dd.date_name2
            where dd.date_key >= %s
            and dd.date_key < %s"""
        self._export_partitioned(
            "vip_no",
            sql,
            ["dv.vip_no"],
            start_date=datetime.datetime(2017, 1, 1),
            offset=relativedelta(years=1),
        )

    def get_weather(self):
        """Export weather data; the two newest files are always re-queried."""
        sql = """
            select weather.* from ur_bi_ods.ods_base_weather_data_1200 as weather
            where weather.date_key>=%s and weather.date_key<%s
            """
        prefix = self._prefixed("weather.")

        def data_format_fun(data):
            # Qualify every column with the table alias.
            return data.rename(columns={c: prefix(c) for c in list(data.columns)})

        self._export_partitioned(
            "weather",
            sql,
            ["weather.date_key", "weather.areaid"],
            del_index_list=[-2, -1],
            data_format_fun=data_format_fun,
        )

    def get_dwd_daily_sales_size(self):
        """Export actual sales aggregated by shop, SKU, day and size (CNY)."""
        sql = """
            select shop_no,sku_no,date_key,`size`,
                sum(tag_price) as `tag_price`,
                sum(sales_qty) as `sales_qty`,
                sum(sales_tag_amt) as `sales_tag_amt`,
                sum(sales_amt) as `sales_amt`,
                count(0) as `sales_count`
            from ur_bi_dw.dwd_daily_sales_size as sales
            where sales.date_key>=%s and sales.date_key<%s
                and sales.currency_code='CNY'
            group by shop_no,sku_no,date_key,`size`
            """
        self._export_partitioned(
            "dwd_daily_sales_size_all",
            sql,
            ["date_key", "shop_no", "sku_no"],
            start_date=datetime.datetime(2017, 1, 1),
        )

    def get_dwd_daily_delivery_size(self):
        """Export actual deliveries aggregated by shop, SKU, day and size (CNY)."""
        sql = """
            select shop_no,sku_no,date_key,`size`,
                sum(delivery.shop_distr_received_qty) as `shop_distr_received_qty`,
                sum(delivery.shop_distr_received_amt) as `shop_distr_received_amt`,
                sum(delivery.online_distr_received_qty) as `online_distr_received_qty`,
                sum(delivery.online_distr_received_amt) as `online_distr_received_amt`,
                sum(delivery.pr_received_qty) as `pr_received_qty`,
                count(0) as `delivery_count`
            from ur_bi_dw.dwd_daily_delivery_size as delivery
            where delivery.date_key>=%s and delivery.date_key<%s
                and delivery.currency_code='CNY'
            group by shop_no,sku_no,date_key,`size`
            """
        self._export_partitioned(
            "dwd_daily_delivery_size_all",
            sql,
            ["date_key", "shop_no", "sku_no"],
            start_date=datetime.datetime(2017, 1, 1),
        )

def get_common_datas(
        # NOTE(review): the host default looks truncated in the extracted
        # source ("2"); replace it with the real server address.
        host="2",
        port=21051,
        database="ur_ai_dw",
        auth_mechanism="LDAP",
        user="urbi",
        # NOTE(review): hard-coded credential kept only for compatibility.
        password="Ur#730xd",
        logger: logging.Logger = None):
    """Refresh the shared data sets under ``<common_datas_dir>/ur_bi_data``.

    Runs, in order: date, shop, weather, weather-city, goods and actual-sales
    exports. The connection is always closed at the end.

    Args:
        host, port, database, auth_mechanism, user, password: Connection
            settings forwarded to ``UrBiGetDatas``.
        logger: Optional logger; a module logger is used for this function's
            own messages when it is ``None`` (the original crashed with
            ``AttributeError`` in the error path).

    Raises:
        Exception: Re-raises whatever an export raised, after logging it.
    """
    log = logger if logger is not None else logging.getLogger(__name__)

    # Shared output location.
    common_datas_dir = ShareArgs.get_args_value("common_datas_dir")
    common_ur_bi_dir = os.path.join(common_datas_dir, "ur_bi_data")
    ur_bi_get_datas = UrBiGetDatas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=common_ur_bi_dir,
        logger=logger,
    )
    # (label used in the progress messages, export to run)
    steps = (
        ("日期", ur_bi_get_datas.get_dim_date),
        ("店铺", ur_bi_get_datas.get_dim_shop),
        ("天气", ur_bi_get_datas.get_weather),
        ("天气城市", ur_bi_get_datas.get_weather_city),
        ("货品", ur_bi_get_datas.get_dim_goods),
        ("实际销量", ur_bi_get_datas.get_dwd_daily_sales_size),
    )
    try:
        for label, export in steps:
            # NOTE(review): the callee of the progress messages was stripped
            # from the extracted source; logging at INFO level is assumed.
            log.info("正在查询%s数据...", label)
            export()
            log.info("查询%s数据完成!", label)
    except Exception as ex:
        log.exception(ex)
        raise
    finally:
        ur_bi_get_datas.close()

class CustomUrBiGetDatas(UrBiGetDatasBase):
    """Exports project-specific aggregates to CSV files under ``save_dir``.

    Each public method takes the lock for its output path, skips the work
    when the base class reports the data as still fresh, otherwise queries,
    renames, sorts, writes and records the refresh time.
    """

    def __init__(
        self,
        # NOTE(review): the host default looks truncated in the extracted
        # source ("2"); replace it with the real server address.
        host="2",
        port=21051,
        database="ur_ai_dw",
        auth_mechanism="LDAP",
        user="urbi",
        # NOTE(review): hard-coded credential kept only for compatibility.
        password="Ur#730xd",
        save_dir="./hjx/data/ur_bi_data",
        logger: logging.Logger = None,
    ):
        """Forward all settings to the base class (which stores them)."""
        super().__init__(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger,
        )

    def _export_aggregate(self, file_name, sql, renames, sort_columns):
        """Query ``sql`` and write the renamed, sorted result to a CSV file.

        Args:
            file_name: Output CSV name, relative to ``self.save_dir``.
            sql: Query text.
            renames: Mapping of result column names to output column names.
            sort_columns: Columns to sort by before writing.
        """
        file_path = os.path.join(self.save_dir, file_name)
        with self.get_lock(file_path):
            try:
                # Skip when the previous export is still fresh.
                if not self.get_last_time(file_path):
                    return
                data = self.db_helper.get_data(sql)
                data = data.rename(columns=renames)
                data = data.sort_values(sort_columns)
                data.to_csv(file_path)
                # Record the refresh time.
                self.save_last_time(file_path)
            except Exception as ex:
                # The logger is optional; do not let its absence mask ``ex``.
                logger = self.logger if self.logger is not None else logging.getLogger(__name__)
                logger.exception(ex)
                raise

    def get_sales_goal_amt(self):
        """Export the sales-goal amount per shop, series and month of year.

        Series ``Y`` is folded into series ``W`` by the query.
        """
        sql = """
            select sales_goal.shop_no,
                if(sales_goal.serial='Y','W',sales_goal.serial) as `sales_goal.serial`,
                dates.month_of_year,
                sum(sales_goal.sales_goal_amt) as sales_goal_amt
            from ur_bi_dw.dwd_sales_goal_west as sales_goal
            inner join ur_bi_dw.dim_date as dates
                on sales_goal.date_key = dates.date_key
            group by sales_goal.shop_no,
                if(sales_goal.serial='Y','W',sales_goal.serial),
                dates.month_of_year
            """
        self._export_aggregate(
            "month_of_year_sales_goal_amt.csv",
            sql,
            renames={
                "shop_no": "sales_goal.shop_no",
                "serial": "sales_goal.serial",
                "month_of_year": "dates.month_of_year",
            },
            sort_columns=["sales_goal.shop_no", "sales_goal.serial", "dates.month_of_year"],
        )

    def get_shop_serial_area(self):
        """Export the floor area per shop, series and month of year.

        Series ``Y`` is folded into series ``W`` by the query; rows without
        an area are excluded.
        """
        sql = """
            select shop_serial_area.shop_no,
                if(shop_serial_area.serial='Y','W',shop_serial_area.serial) as `shop_serial_area.serial`,
                shop_serial_area.month_of_year,
                sum(shop_serial_area.area) as `shop_serial_area.area`
            from ur_bi_dw.dwd_shop_serial_area as shop_serial_area
            where shop_serial_area.area is not null
            group by shop_serial_area.shop_no,if(shop_serial_area.serial='Y','W',shop_serial_area.serial),shop_serial_area.month_of_year
            """
        self._export_aggregate(
            "shop_serial_area.csv",
            sql,
            renames={
                "shop_no": "shop_serial_area.shop_no",
                "serial": "shop_serial_area.serial",
                "month_of_year": "shop_serial_area.month_of_year",
                "area": "shop_serial_area.area",
            },
            sort_columns=[
                "shop_serial_area.shop_no",
                "shop_serial_area.serial",
                "shop_serial_area.month_of_year",
            ],
        )

def get_datas(
        # NOTE(review): the host default looks truncated in the extracted
        # source ("2"); replace it with the real server address.
        host="2",
        port=21051,
        database="ur_ai_dw",
        auth_mechanism="LDAP",
        user="urbi",
        # NOTE(review): hard-coded credential kept only for compatibility.
        password="Ur#730xd",
        save_dir="./data/sales_forecast/ur_bi_dw_data",
        logger: logging.Logger = None):
    """Refresh the project-specific data sets (currently the sales goals).

    Args:
        host, port, database, auth_mechanism, user, password: Connection
            settings forwarded to ``CustomUrBiGetDatas``.
        save_dir: Output directory.
        logger: Optional logger; a module logger is used for this function's
            own messages when it is ``None`` (the original crashed with
            ``AttributeError`` in the error path).

    Raises:
        Exception: Re-raises whatever the export raised, after logging it.
    """
    log = logger if logger is not None else logging.getLogger(__name__)
    ur_bi_get_datas = CustomUrBiGetDatas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger,
    )
    try:
        # Shop, series, category, year-month, sales-goal amount.
        # NOTE(review): the callee of the progress messages was stripped from
        # the extracted source; logging at INFO level is assumed.
        log.info("正在查询年月销售目标金额数据...")
        ur_bi_get_datas.get_sales_goal_amt()
        log.info("查询年月销售目标金额数据完成!")
    except Exception as ex:
        log.exception(ex)
        raise
    finally:
        ur_bi_get_datas.close()

def getdata_ur_bi_dw(
        # NOTE(review): the host default looks truncated in the extracted
        # source ("2"); replace it with the real server address.
        host="2",
        port=21051,
        database="ur_ai_dw",
        auth_mechanism="LDAP",
        user="urbi",
        # NOTE(review): hard-coded credential kept only for compatibility.
        password="Ur#730xd",
        save_dir="./data/sales_forecast/ur_bi_dw_data",
        logger=None):
    """Entry point: refresh the shared data sets, then the project-specific ones.

    Args:
        host, port, database, auth_mechanism, user, password: Connection
            settings used for both stages.
        save_dir: Output directory of the project-specific stage only; the
            shared stage derives its own directory from ``ShareArgs``.
        logger: Optional logger forwarded to both stages.
    """
    connection = dict(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
    )
    get_common_datas(logger=logger, **connection)
    get_datas(save_dir=save_dir, logger=logger, **connection)

#代码入口

#getdata_ur_bi_dw(

#host=ur_bi_dw_host,

#port=ur_bi_dw_port,

#database=ur_bi_dw_database,

#auth_mechanism=ur_bi_dw_auth_mechanism,

#user=ur_bi_dw_user,

#password=ur_bi_dw_password,

#save_dir=ur_bi_dw_save_dir,

#logger=logger

#)

代码说明和领悟

每个类的具体作用说明,代码

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论