




已阅读5页,还剩77页未读, 继续免费阅读
版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
尚硅谷大数据技术之电信客服尚硅谷大数据技术之电信客服(作者:尚硅谷大数据研发部)版本:V1.1目录第1章 项目背景3第2章 项目架构3第3章 项目实现33.1 数据生产43.1.1 数据结构43.1.2 编写代码53.1.3 打包测试93.2 数据采集/消费(存储)93.2.1 数据采集103.2.2 数据消费113.2.3 数据查询方式一183.2.4 数据查询方式二233.2.5 数据消费方案优化273.2.6 协处理器293.2.7 数据消费测试293.3 数据分析303.3.1 需求分析313.3.2 Mysql表结构设计323.3.3 环境准备343.3.4 需求实现363.3.5 运行测试543.4 数据展示543.4.1 环境准备553.4.2 编写代码603.4.5 最终预览723.5 定时任务74第4章 项目总结74第1章 项目背景通信运营商每时每刻会产生大量的通信数据,例如通话记录,短信记录,彩信记录,第三方服务资费等等繁多信息。数据量如此巨大,除了要满足用户的实时查询和展示之外,还需要定时定期的对已有数据进行离线的分析处理。例如,当日话单,月度话单,季度话单,年度话单,通话详情,通话记录等等+。我们以此为背景,寻找一个切入点,学习其中的方法论。当前我们的需求是:统计每天、每月以及每年的每个人的通话次数及时长。第2章 项目架构项目架构如图1所示图1 项目架构第3章 项目实现系统环境:表1系统版本windows10 专业版linuxCentOS 6.8开发工具:表2工具版本idea2017.2.5旗舰版maven3.3.9JDK1.8+提示:idea2017.2.5必须使用maven3.3.9,不要使用maven3.5,有部分兼容性问题集群环境:表3框架版本hadoop2.7.2zookeeper3.4.10hbase1.3.1flume1.7.0kafka2.11-硬件环境:表4hadoop102hadoop103hadoop104内存4G2G2GCPU2核1核1核硬盘50G50G50G3.1 数据生产此情此景,对于该模块的业务,即数据生产过程,一般并不会让你来进行操作,数据生产是一套完整且严密的体系,这样可以保证数据的鲁棒性。但是如果涉及到项目的一体化方案的设计(数据的产生、存储、分析、展示),则必须清楚每一个环节是如何处理的,包括其中每个环境可能隐藏的问题;数据结构,数据内容可能出现的问题。3.1.1 数据结构我们将在HBase中存储两个电话号码,以及通话建立的时间和通话持续时间,最后再加上一个flag作为判断第一个电话号码是否为主叫。姓名字段的存储我们可以放置于另外一张表做关联查询,当然也可以插入到当前表中。表5列名解释举例call1第一个手机号all1_name第一个手机号码人姓名(非必须)李雁call2第二个手机号码19920860202call2_name第二个手机号码人姓名(非必须)卫艺date_time建立通话的时间20171017081520date_time_ts建立通话的时间(时间戳形式)duration通话持续时间(秒)06003.1.2 编写代码思路:a) 创建Java集合类存放模拟的电话号码和联系人;b) 随机选取两个手机号码当作“主叫”与“被叫”(注意判断两个手机号不能重复),产出call1与call2字段数据;c) 创建随机生成通话建立时间的方法,可指定随机范围,最后生成通话建立时间,产出date_time字段数据;d) 随机一个通话时长,单位:秒,产出duration字段数据;e)、将产出的一条数据拼接封装到一个字符串中;f)、使用IO操作将产出的一条通话数据写入到本地文件中;新建module项目:ct_producerpom.xml文件配置: junit junit 4.12 test 1) 随机输入一些手机号码以及联系人,保存于Java的集合中。新建类:ProductLog/存放联系人电话与姓名的映射public Map contacts = null;/存放联系人电话号码public List phoneList = null;public void initContacts() contacts = new HashMap(); phoneList = new ArrayList(); phoneList.add; phoneList.add(19920860202); phoneList.add; phoneList.add; phoneList.add; phoneList.add(19379884788); phoneList.add(19335715448); phoneList.add; phoneList.add; phoneList.add; phoneList.add; phoneList.add; phoneList.add(19877232369); phoneList.add; phoneList.add; phoneList.add; phoneList.add; phoneList.add(19879419704); phoneList.add(16480981069); phoneList.add; phoneList.add; phoneList.add; phoneList.add; phoneList.add; phoneList.add; contacts.put 李雁); contacts.put(19920860202, 卫艺); contacts.put 仰莉); contacts.put 陶欣悦); contacts.put 施梅梅); contacts.put(19379884788, 金虹霖); contacts.put(19335715448, 魏明艳); contacts.put 华贞); contacts.put 华啟倩); contacts.put 仲采绿); contacts.put 卫丹); contacts.put 戚丽红); contacts.put(19877232369, 何翠柔); contacts.put 钱溶艳); contacts.put 钱琳); contacts.put 缪静欣); contacts.put 焦秋菊); contacts.put(19879419704, 吕访琴); contacts.put(16480981069, 沈丹); contacts.put 褚美丽); contacts.put 孙怡); contacts.put 许婵); contacts.put 曹红恋); contacts.put 吕柔); contacts.put 冯怜云);2) 创建随机生成通话时间的方法:randomDate该时间生成后的格式为yyyy-MM-dd HH:mm:ss,并使之可以根据传入的起始时间和结束时间来随机生成。/随机生成通话建立时间private String randomDate(String startDate, String endDate) SimpleDateFormat simpleDateFormat = new SimpleDateFormat(yyyy-MM-dd); try Date start = simpleDateFormat.parse(startDate); Date end = simpleDateFormat.parse(endDate); if (start.getTime() end.getTime() return null; long resultTime = start.getTime() + (long) (Math.random() * (end.getTime() - start.getTime(); return resultTime + ; catch (ParseException e) e.printStackTrace(); return null;3) 创建生产日志一条日志的方法:productLog随机抽取两个电话号码,随机产生通话建立时间,随机通话时长,将这几个字段拼接成一个字符串,然后return,便可以产生一条通话的记录。需要注意的是,如果随机出的两个电话号码一样,需要重新随机(随机过程可优化,但并非此次重点)。通话时长的随机为20分钟以内,即:60秒 * 20,并格式化为4位数字,例如:0600(10分钟)。/拼接日志private String productLog() int call1Index = new Random().nextInt(phoneList.size(); int call2Index = -1; String call1 = phoneList.get(call1Index); String call2 = null; while (true) call2Index = new Random().nextInt(phoneList.size(); call2 = phoneList.get(call2Index); if (!call1.equals(call2) break; /随机生成通话时长(30分钟内_0600) int duration = new Random().nextInt(60 * 30) + 1; /格式化通话时间,使位数一致 String durationString = new DecimalFormat(0000).format(duration); /通话建立时间:yyyy-MM-dd,月份:011,天:131 String randomDate = randomDate(2017-01-01, 2018-01-01); String dateString = new SimpleDateFormat(yyyy-MM-dd HH:mm:ss).format(Long.parseLong(randomDate); /拼接log日志 StringBuilder logBuilder = new StringBuilder(); logBuilder.append(call1).append(,).append(call2).append(,).append(dateString).append(,) .append(durationString); System.out.println(logBuilder); try Thread.sleep(500); catch (InterruptedException e) e.printStackTrace(); return logBuilder.toString();4) 创建写入日志方法:writeLogproductLog每产生一条日志,便将日志写入到本地文件中,所以建立一个专门用于日志写入的方法,需要涉及到IO操作,需要注意的是,输出流每次写一条日之后需要flush,不然可能导致积攒多条数据才输出一次。最后需要将productLog方法放置于while死循环中执行。/将产生的日志写入到本地文件calllog中public void writeLog(String filePath, ProduceLog productLog) OutputStreamWriter outputStreamWriter = null; try outputStreamWriter = new OutputStreamWriter(new FileOutputStream(filePath, true), UTF-8); while (true) String log = productLductLog(); outputStreamWriter.write(log + n); outputStreamWriter.flush(); catch (Exception e) e.printStackTrace(); finally try assert outputStreamWriter != null; outputStreamWriter.flush(); outputStreamWriter.close(); catch (IOException e) e.printStackTrace(); 5) 在主函数中初始化以上逻辑,并测试:public static void main(String args) if(args = null | args.length flume - kafka - flume(根据情景增删该流程) - HDFS消费存储模块流程如图2所示:图2 消费存储模块流程图3.2.1 数据采集思路:a) 配置kafka,启动zookeeper和kafka集群;b) 创建kafka主题;c) 启动kafka控制台消费者(此消费者只用于测试使用);d) 配置flume,监控日志文件;e) 启动flume监控任务;f) 运行日志生产脚本;g) 观察测试。1)启动zookeeper,kafka集群$/opt/module/kafka/bin/kafka-server-start.sh /opt/module/kafka/config/perties2)创建kafka主题$ /opt/module/kafka/bin/kafka-topics.sh -zookeeper hadoop102:2181 -topic calllog -create -replication-factor 1 -partitions 3检查一下是否创建主题成功:$ /opt/module/kafka/bin/kafka-topics.sh -zookeeper hadoop102:2181 -list3)启动kafka控制台消费者,等待flume信息的输入$ /opt/module/kafka/bin/kafka-console-consumer.sh -zookeeper hadoop102:2181 -topic calllog -from-beginning4)配置flume(flume-kafka.conf)# definea1.sources = r1a1.sinks = k1a1.channels = c1# sourcea1.sources.r1.type = mand = tail -F -c +0 /home/atguigu/call/calllog.csva1.sources.r1.shell = /bin/bash -c# sinka1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092a1.sinks.k1.kafka.topic = callloga1.sinks.k1.kafka.flumeBatchSize = 20ducer.acks = 1ducer.linger.ms = 1# channela1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# binda1.sources.r1.channels = c1a1.sinks.k1.channel = c15)启动flume$ /opt/module/flume/bin/flume-ng agent -conf /opt/module/flume/conf/ -name a1 -conf-file /home/atguigu/calllog/flume2kafka.conf6)运行生产日志的任务脚本,观察kafka控制台消费者是否成功显示产生的数据$ sh /home/atguigu/calllog/productlog.sh3.2.2 数据消费如果以上操作均成功,则开始编写操作HBase的代码,用于消费数据,将产生的数据实时存储在HBase中。思路:a) 编写kafka消费者,读取kafka集群中缓存的消息,并打印到控制台以观察是否成功;b) 既然能够读取到kafka中的数据了,就可以将读取出来的数据写入到HBase中,所以编写调用HBaseAPI相关方法,将从Kafka中读取出来的数据写入到HBase;c) 以上两步已经足够完成消费数据,存储数据的任务,但是涉及到解耦,所以过程中需要将一些属性文件外部化,HBase通用性方法封装到某一个类中。创建新的module项目:ct_consumerpom.xml文件配置: junit junit 4.12 test org.apache.kafka kafka-clients org.apache.hbase hbase-client 1.3.1 org.apache.hbase hbase-server 1.3.1 1)新建类:HBaseConsumer该类主要用于读取kafka中缓存的数据,然后调用HBaseAPI,持久化数据。package com.atguigu.kafka;import java.util.HashMap;import java.util.List;import java.util.Map;import com.atguigu.dao.HBaseDAO;import com. atguigu.utils.PropertiesUtil;import kafka.consumer.Consumer;import kafka.consumer.ConsumerConfig;import kafka.consumer.ConsumerIterator;import kafka.consumer.KafkaStream;import kafka.javaapi.consumer.ConsumerConnector;import kafka.serializer.StringDecoder;import kafka.utils.VerifiableProperties;public class HBaseConsumer public static void main(String args) KafkaConsumer kafkaConsumer = newKafkaConsumer(PropertiesUperties);kafkaConsumer.subscribe(Collections.singletonList(PropertiesUtil.getProperty(kafka.topic);HBaseDAO hBaseDAO = new HBaseDAO();while (true) ConsumerRecords records = kafkaConsumer.poll(100); for (ConsumerRecord cr : records) System.out.println(cr.value(); hBaseDAO.put(cr.value(); 2)新建类:PropertiesUtil该类主要用于将常用的项目所需的参数外部化,解耦,方便配置。package com. atguigu.utils; import java.io.IOException;import java.io.InputStream;import java.util.Properties;public class PropertiesUtil public static Properties properties = null; static try / 加载配置属性 InputStream inputStream = ClassLoader.getSystemResourceAsStream(perties); properties = new Properties(); properties.load(inputStream); catch (IOException e) e.printStackTrace(); public static String getProperty(String key) return properties.getProperty(key); 3)创建perties文件,并放置于resources目录下bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092group.id=mit=erval.ms=30000key.deserializer=mon.serialization.StringDeserializervalue.deserializer=mon.serialization.StringDeserializer#设置kafka主题kafka.topics=space=ns_=ns_telecom:callloghbase.regions=64)将hdfs-site.xml、core-site.xml、hbase-site.xml、perties放置于resources目录5)新建类:HBaseUtil该类主要用于封装一些HBase的常用操作,比如创建命名空间,创建表等等。package com.atguigu.utils;import java.io.IOException;import java.text.DecimalFormat;import java.util.Iterator;import java.util.TreeSet;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HColumnDescriptor;import org.apache.hadoop.hbase.HTableDescriptor;import org.apache.hadoop.hbase.MasterNotRunningException;import org.apache.hadoop.hbase.NamespaceDescriptor;import org.apache.hadoop.hbase.TableName;import org.apache.hadoop.hbase.ZooKeeperConnectionException;import org.apache.hadoop.hbase.client.HBaseAdmin;import org.apache.hadoop.hbase.util.Bytes;public class HBaseUtil /* * 判断HBase表是否存在 * * throws IOException * throws ZooKeeperConnectionException * throws MasterNotRunningException */ public static boolean isExistTable(Configuration conf, String tableName) / 操作HBase表必须创建HBaseAdmin对象 HBaseAdmin admin = null; try admin = new HBaseAdmin(conf); return admin.tableExists(tableName); catch (IOException e) e.printStackTrace(); finally try if (admin != null) admin.close(); catch (IOException e) e.printStackTrace(); return false; /* * 初始化命名空间 */ public static void initNamespace(Configuration conf, String namespace) HBaseAdmin admin = null; try admin = new HBaseAdmin(conf); /命名空间类似于关系型数据库中的schema,可以想象成文件夹 NamespaceDescriptor ns = NamespaceDescriptor .create(namespace) .addConfiguration(creator, WHAlex) .addConfiguration(create_time, String.valueOf(System.currentTimeMillis() .build(); admin.createNamespace(ns); catch (MasterNotRunningException e) e.printStackTrace(); catch (ZooKeeperConnectionException e) e.printStackTrace(); catch (IOException e) e.printStackTrace(); finally if (null != admin) try admin.close(); catch (IOException e) e.printStackTrace(); /* * 创建表 * * param tableName * param columnFamily * throws IOException * throws ZooKeeperConnectionException * throws MasterNotRunningException */ public static void createTable(Configuration conf, String tableName, String. columnFamily) HBaseAdmin admin = null; try admin = new HBaseAdmin(conf); / 判断表是否存在 if (isExistTable(conf, tableName) / 存在 System.out.println(表已经存在: + tableName); System.exit(0); else / 不存在 / 通过表名实例化“表描述器” HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName); for (String cf : columnFamily) tableDescriptor.addFamily(new HColumnDescriptor(cf).setMaxVersions(3); /添加协处理器 /tableDescriptor.addCoprocessor(com.atguigu.coprocessor.CalleeWriteObserver); int regions = Integer.valueOf(PropertiesUtil.getProperty(hbase.regions.count); admin.createTable(tableDescriptor, getSplitKeys(regions); System.out.println(表创建成功: + tableName); catch (IOException e) e.printStackTrace(); finally try if (admin != null) admin.close(); catch (IOException e) e.printStackTrace(); /* * 预分区键 * 例如:00|, 01|, 02|, 03|, 04|, 05| * return */ private static byte getSplitKeys(int regions) String keys = new Stringregions; /这里默认不会超过两位数的分区,如果超过,需要变更设计,如果需要灵活操作,也需要变更设计 DecimalFormat df = new DecimalFormat(00); for(int i = 0; i regions; i+) /例如:如果regions = 6,则:00|, 01|, 02|, 03|, 04|, 05| keysi = df.format(i) + |; byte splitKeys = new bytekeys.length; TreeSet rows = new TreeSet(Bytes.BYTES_COMPARATOR);/ 升序排序 for (in
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 流程改进咨询创新创业项目商业计划书
- 红枣枸杞补血养颜酒行业跨境出海项目商业计划书
- 运动健身APP广告与赞助合作创新创业项目商业计划书
- 资本结构动态调整创新创业项目商业计划书
- 企业社会责任报告撰写示范文本
- 互联网公司产品策划方案
- 幼儿园春天主题儿歌导入与教学设计
- 快消品行业运营风险识别与管理
- 健身房会员管理流程标准
- 蛋鸡养殖户年度经济效益分析报告
- 2025年共青团入团积极分子结业考试题库及答案
- 2025年绩效管理自考试题和答案
- GB 16663-2025醇基液体燃料
- 高三试卷:山东省名校考试联盟2024-2025学年高三上学期期中考试化学+答案
- 广东上进联考2025-2026学年领航高中联盟2026届高三10月一轮复习阶段检测化学(含答案)
- 2024年全国统计师之初级统计基础理论及相关知识考试快速提分卷(附答案)
- 土地法律知识培训内容课件
- 2025年湖北省荆门市辅警考试题库(附答案)
- 2025西南证券股份有限公司校园招聘300人笔试历年参考题库附带答案详解
- 气象科研课题申报书
- 人工智能+开放共享城市安全监控数据共享分析报告
评论
0/150
提交评论