如何使用python生成大量数据写入es数据库并查询操作_第1页
如何使用python生成大量数据写入es数据库并查询操作_第2页
如何使用python生成大量数据写入es数据库并查询操作_第3页
如何使用python生成大量数据写入es数据库并查询操作_第4页
如何使用python生成大量数据写入es数据库并查询操作_第5页
已阅读5页,还剩12页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

第如何使用python生成大量数据写入es数据库并查询操作模拟学生成绩信息写入es数据库,包括姓名、性别、科目、成绩。

示例代码1:【一次性写入10000*1000条数据】【本人亲测耗时5100秒】

fromelasticsearchimportElasticsearch

fromelasticsearchimporthelpers

importrandom

importtime

es=Elasticsearch(hosts=':9200')

#print(es)

names=['刘一','陈二','张三','李四','王五','赵六','孙七','周八','吴九','郑十']

sexs=['男','女']

subjects=['语文','数学','英语','生物','地理']

grades=[85,77,96,74,85,69,84,59,67,69,86,96,77,78,79,80,81,82,83,84,85,86]

datas=[]

start=time.time()

#开始批量写入es数据库

#批量写入数据

forjinrange(1000):

print(j)

action=[

"_index":"grade",

"_type":"doc",

"_id":i,

"_source":{

"id":i,

"name":random.choice(names),

"sex":random.choice(sexs),

"subject":random.choice(subjects),

"grade":random.choice(grades)

}foriinrange(10000*j,10000*j+10000)

helpers.bulk(es,action)

end=time.time()

print('花费时间:',end-start)

elasticsearch-head中显示:

示例代码2:【一次性写入10000*5000条数据】【本人亲测耗时23000秒】

fromelasticsearchimportElasticsearch

fromelasticsearchimporthelpers

importrandom

importtime

es=Elasticsearch(hosts=':9200')

#print(es)

names=['刘一','陈二','张三','李四','王五','赵六','孙七','周八','吴九','郑十']

sexs=['男','女']

subjects=['语文','数学','英语','生物','地理']

grades=[85,77,96,74,85,69,84,59,67,69,86,96,77,78,79,80,81,82,83,84,85,86]

datas=[]

start=time.time()

#开始批量写入es数据库

#批量写入数据

forjinrange(5000):

print(j)

action=[

"_index":"grade3",

"_type":"doc",

"_id":i,

"_source":{

"id":i,

"name":random.choice(names),

"sex":random.choice(sexs),

"subject":random.choice(subjects),

"grade":random.choice(grades)

}foriinrange(10000*j,10000*j+10000)

helpers.bulk(es,action)

end=time.time()

print('花费时间:',end-start)

示例代码3:【一次性写入10000*9205条数据】【耗时过长】

fromelasticsearchimportElasticsearch

fromelasticsearchimporthelpers

importrandom

importtime

es=Elasticsearch(hosts=':9200')

names=['刘一','陈二','张三','李四','王五','赵六','孙七','周八','吴九','郑十']

sexs=['男','女']

subjects=['语文','数学','英语','生物','地理']

grades=[85,77,96,74,85,69,84,59,67,69,86,96,77,78,79,80,81,82,83,84,85,86]

datas=[]

start=time.time()

#开始批量写入es数据库

#批量写入数据

forjinrange(9205):

print(j)

action=[

"_index":"grade2",

"_type":"doc",

"_id":i,

"_source":{

"id":i,

"name":random.choice(names),

"sex":random.choice(sexs),

"subject":random.choice(subjects),

"grade":random.choice(grades)

}foriinrange(10000*j,10000*j+10000)

helpers.bulk(es,action)

end=time.time()

print('花费时间:',end-start)

查询数据并计算各种方式的成绩总分。

示例代码4:【一次性获取所有的数据,在程序中分别计算所耗的时间】

fromelasticsearchimportElasticsearch

importtime

defsearch_data(es,size=10):

query={

"query":{

"match_all":{}

res=es.search(index='grade',body=query,size=size)

#print(res)

returnres

if__name__=='__main__':

start=time.time()

es=Elasticsearch(hosts=':9200')

#print(es)

size=10000

res=search_data(es,size)

#print(type(res))

#total=res['hits']['total']['value']

#print(total)

all_source=[]

foriinrange(size):

source=res['hits']['hits'][i]['_source']

all_source.append(source)

#print(source)

#统计查询出来的所有学生的所有课程的所有成绩的总成绩

start1=time.time()

all_grade=0

fordatainall_source:

all_grade+=int(data['grade'])

print('所有学生总成绩之和:',all_grade)

end1=time.time()

print("耗时:",end1-start1)

#统计查询出来的每个学生的所有课程的所有成绩的总成绩

start2=time.time()

names1=[]

all_name_grade={}

fordatainall_source:

ifdata['name']innames1:

all_name_grade[data['name']]+=data['grade']

else:

names1.append(data['name'])

all_name_grade[data['name']]=data['grade']

print(all_name_grade)

end2=time.time()

print("耗时:",end2-start2)

#统计查询出来的每个学生的每门课程的所有成绩的总成绩

start3=time.time()

names2=[]

subjects=[]

all_name_all_subject_grade={}

fordatainall_source:

ifdata['name']innames2:

ifall_name_all_subject_grade[data['name']].get(data['subject']):

all_name_all_subject_grade[data['name']][data['subject']]+=data['grade']

else:

all_name_all_subject_grade[data['name']][data['subject']]=data['grade']

else:

names2.append(data['name'])

all_name_all_subject_grade[data['name']]={}

all_name_all_subject_grade[data['name']][data['subject']]=data['grade']

print(all_name_all_subject_grade)

end3=time.time()

print("耗时:",end3-start3)

end=time.time()

print('总耗时:',end-start)

运行结果:

在示例代码4中当把size由10000改为2000000时,运行效果如下所示:

在项目中一般不用上述代码4中所统计成绩的方法,面对大量的数据是比较耗时的,要使用es中的聚合查询。计算数据中所有成绩之和。

示例代码5:【使用普通计算方法和聚类方法做对比验证】

fromelasticsearchimportElasticsearch

importtime

defsearch_data(es,size=10):

query={

"query":{

"match_all":{}

res=es.search(index='grade',body=query,size=size)

#print(res)

returnres

defsearch_data2(es,size=10):

query={

"aggs":{

"all_grade":{

"terms":{

"field":"grade",

"size":1000

res=es.search(index='grade',body=query,size=size)

#print(res)

returnres

if__name__=='__main__':

start=time.time()

es=Elasticsearch(hosts=':9200')

size=2000000

res=search_data(es,size)

all_source=[]

foriinrange(size):

source=res['hits']['hits'][i]['_source']

all_source.append(source)

#print(source)

#统计查询出来的所有学生的所有课程的所有成绩的总成绩

start1=time.time()

all_grade=0

fordatainall_source:

all_grade+=int(data['grade'])

print('200万数据所有学生总成绩之和:',all_grade)

end1=time.time()

print("耗时:",end1-start1)

end=time.time()

print('200万数据总耗时:',end-start)

#聚合操作

start_aggs=time.time()

es=Elasticsearch(hosts=':9200')

#size=2000000

size=0

res=search_data2(es,size)

#print(res)

aggs=res['aggregations']['all_grade']['buckets']

print(aggs)

sum=0

foragginaggs:

sum+=(agg['key']*agg['doc_count'])

print('1000万数据总成绩之和:',sum)

end_aggs=time.time()

print('1000万数据总耗时:',end_aggs-start_aggs)

运行结果:

计算数据中每个同学的各科总成绩之和。

示例代码6:【子聚合】【先分组,再计算】

fromelasticsearchimportElasticsearch

importtime

defsearch_data(es,size=10):

query={

"query":{

"match_all":{}

res=es.search(index='grade',body=query,size=size)

#print(res)

returnres

defsearch_data2(es):

query={

"size":0,

"aggs":{

"all_names":{

"terms":{

"field":"name.keyword",

"size":10

"aggs":{

"total_grade":{

"sum":{

"field":"grade"

res=es.search(index='grade',body=query)

#print(res)

returnres

if__name__=='__main__':

start=time.time()

es=Elasticsearch(hosts=':9200')

size=2000000

res=search_data(es,size)

all_source=[]

foriinrange(size):

source=res['hits']['hits'][i]['_source']

all_source.append(source)

#print(source)

#统计查询出来的每个学生的所有课程的所有成绩的总成绩

start2=time.time()

names1=[]

all_name_grade={}

fordatainall_source:

ifdata['name']innames1:

all_name_grade[data['name']]+=data['grade']

else:

names1.append(data['name'])

all_name_grade[data['name']]=data['grade']

print(all_name_grade)

end2=time.time()

print("200万数据耗时:",end2-start2)

end=time.time()

print('200万数据总耗时:',end-start)

#聚合操作

start_aggs=time.time()

es=Elasticsearch(hosts=':9200')

res=search_data2(es)

#print(res)

aggs=res['aggregations']['all_names']['buckets']

#print(aggs)

dic={}

foragginaggs:

dic[agg['key']]=agg['total_grade']['value']

print('1000万数据:',dic)

end_aggs=time.time()

print('1000万数据总耗时:',end_aggs-start_aggs)

运行结果:

计算数据中每个同学的每科成绩之和。

示例代码7:

fromelasticsearchimportElasticsearch

importtime

defsearch_data(es,size=10):

query={

"query":{

"match_all":{}

res=es.search(index='grade',body=query,size=size)

#print(res)

returnres

defsearch_data2(es):

query={

"size":0,

"aggs":{

"all_names":{

"terms":{

"field":"name.keyword",

"size":10

"aggs":{

"all_subjects":{

"terms":{

"field":"subject.keyword",

"size":5

"aggs":{

"total_grade":{

"sum":{

"field":"grade"

res=es.search(index='grade',body=query)

#print(res)

returnres

if__name__=='__main__':

start=time.time()

es=Elasticsearch(hosts=':9200')

size=2000000

res=search_data(es,size)

all_source=[]

foriinrange(size):

source=res['hits']['hits'][i]['_source']

all_source.append(source)

#print(source)

#统计查询出来的每个学生的每门课程的所有成绩的总成绩

start3=time.time()

names2=[]

subjects=[]

all_name_all_subject_grade={}

fordatainall_source:

ifdata['name']innames2:

ifall_name_all_subject_grade[data['name']].get(data['subject']):

all_name_all_subject_grade[data['name']][data['subject']]+=data['grade']

else:

all_name_all_subject_grade[data['name']][data['subject']]=data['grade']

else:

names2.append(data['name'])

all_name_all_subject_grade[data['name']]={}

all_name_all_subject_grade[data['name']][data['subject']]=data['grade']

print('200万数据:',all_name_all_subject_grade)

end3=time.time()

print("耗时:",end3-start3)

end=time.time()

print('200万数据总耗时:',end-start)

#聚合操作

start_aggs=time.time()

es=Elasticsearch(hosts=':9200')

res=search_data2(es)

#print(res)

aggs=res['aggregations']['all_names']['buckets']

#print(aggs)

dic={}

foragginaggs:

dic[agg['key']]={}

forsubinagg['all_subjects']['buckets']:

dic[agg['key']][sub['key']]=sub['total_grade']['value']

print('1000万数据:',dic)

end_aggs=time.time()

print('1000万数据总耗时:',end_aggs-start_aggs)

运行结果:

在上面查询计算示例代码中,

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论