尚硅谷大数据项目之实时分析系统 - 日活 (Atguigu Big Data Project: Real-time Analysis System - Daily Active Users)

Author: 尚硅谷大数据研发部 (Atguigu Big Data R&D Department)    Version: V1.6

Chapter 1 Real-time Processing Module

1.1 Module Setup

Add Scala framework support to the module.

1.2 Code Approach

1) Consume the data from Kafka.
2) Use Redis to filter out devices that have already been counted in today's DAU.
3) Save each batch's newly added DAU records to HBase or ES.
4) Query the data from ES and publish it as a data interface for the visualization project to call.

1.3 Code Development 1 - Consuming Kafka

1.3.1 Configuration

1) config.properties

```properties
# Kafka configuration
kafka.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092

# Redis configuration
redis.host=hadoop102
redis.port=6379
```

2) pom.xml

```xml
<dependencies>
    <dependency>
        <groupId>com.atguigu.gmall2019.dw</groupId>
        <artifactId>dw-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.9.0</version>
    </dependency>
    <dependency>
        <groupId>io.searchbox</groupId>
        <artifactId>jest</artifactId>
        <version>5.3.3</version>
    </dependency>
    <dependency>
        <groupId>net.java.dev.jna</groupId>
        <artifactId>jna</artifactId>
        <version>4.5.2</version>
    </dependency>
    <dependency>
        <groupId>org.codehaus.janino</groupId>
        <artifactId>commons-compiler</artifactId>
        <version>2.7.8</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```

1.3.2 Utility Classes

1) MyKafkaUtil

```scala
package com.atguigu.utils

import java.util.Properties

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

object MyKafkaUtil {

  def getKafkaStream(ssc: StreamingContext, topics: Set[String]): InputDStream[(String, String)] = {

    val properties: Properties = PropertiesUtil.load("config.properties")

    val kafkaPara = Map(
      "bootstrap.servers" -> properties.getProperty("kafka.broker.list"),
      "group.id" -> "bigdata0408"
    )

    // Consume Kafka data with the Direct approach
    val kafkaDStream: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaPara, topics)

    // Return the stream
    kafkaDStream
  }
}
```

2) PropertiesUtil

```scala
import java.io.InputStreamReader
import java.util.Properties

object PropertiesUtil {

  def load(propertieName: String): Properties = {
    val prop = new Properties()
    prop.load(new InputStreamReader(
      Thread.currentThread().getContextClassLoader.getResourceAsStream(propertieName),
      "UTF-8"))
    prop
  }
}
```

3) RedisUtil

```scala
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object RedisUtil {

  var jedisPool: JedisPool = null

  def getJedisClient: Jedis = {
    if (jedisPool == null) {
      println("开辟一个连接池")
      val config = PropertiesUtil.load("config.properties")
      val host = config.getProperty("redis.host")
      val port = config.getProperty("redis.port")

      val jedisPoolConfig = new JedisPoolConfig()
      jedisPoolConfig.setMaxTotal(100)            // maximum number of connections
      jedisPoolConfig.setMaxIdle(20)              // maximum number of idle connections
      jedisPoolConfig.setMinIdle(20)              // minimum number of idle connections
      jedisPoolConfig.setBlockWhenExhausted(true) // whether to wait when the pool is exhausted
      jedisPoolConfig.setMaxWaitMillis(500)       // how long to wait when busy, in milliseconds
      jedisPoolConfig.setTestOnBorrow(true)       // test every connection on borrow

      jedisPool = new JedisPool(jedisPoolConfig, host, port.toInt)
    }
    // println(s"jedisPool.getNumActive = ${jedisPool.getNumActive}")
    // println("获得一个连接")
    jedisPool.getResource
  }
}
```

1.3.3 Case Class StartupLog

```scala
case class StartupLog(mid: String,
                      uid: String,
                      appid: String,
                      area: String,
                      os: String,
                      ch: String,
                      logType: String,
                      vs: String,
                      var logDate: String,
                      var logHour: String,
                      var ts: Long)
```

1.3.4 Business Class: Consuming Kafka

```scala
import com.alibaba.fastjson.JSON
import com.atguigu.gmall.constant.GmallConstants
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.phoenix.spark._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

object RealtimeStartupApp {

  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("gmall2019")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(10))

    val startupStream: InputDStream[ConsumerRecord[String, String]] =
      MyKafkaUtil.getKafkaStream(ssc, Set(GmallConstants.KAFKA_TOPIC_STARTUP))

    // startupStream.map(_.value()).foreachRDD { rdd =>
    //   println(rdd.collect().mkString("\n"))
    // }

    val startupLogDstream: DStream[StartupLog] = startupStream.map(_.value()).map { log =>
      // println(s"log = $log")
      val startUpLog: StartupLog = JSON.parseObject(log, classOf[StartupLog])
      startUpLog
    }
  }
}
```

1.4 Code Development 2 - Deduplication

1.4.1 Flow Chart

1.4.2 Designing the Redis Key-Value

key:   dau:2019-01-22
value: 设备id (device id, i.e. mid)

A short sketch of how this key behaves is shown after this section.
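Before reading the full job, the dedup semantics of this key can be tried in isolation. The snippet below is a minimal sketch, not part of the project code: the object name RedisDedupSketch is made up for illustration, and the host/port are the values assumed in config.properties above.

```scala
import java.text.SimpleDateFormat
import java.util.Date

import redis.clients.jedis.Jedis

// Illustrative check of the dau:yyyy-MM-dd key design from 1.4.2 (not project code).
object RedisDedupSketch {
  def main(args: Array[String]): Unit = {
    // Host and port assumed from config.properties (hadoop102:6379).
    val jedis = new Jedis("hadoop102", 6379)

    val dateStr = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
    val key = "dau:" + dateStr              // one Redis set per day, e.g. dau:2019-01-22

    // SADD returns 1 the first time a mid is added and 0 afterwards,
    // which is exactly the "already counted today" test used for dedup.
    println(jedis.sadd(key, "mid_101"))     // 1 -> first visit today, keep the record
    println(jedis.sadd(key, "mid_101"))     // 0 -> already in today's DAU, filter it out

    // The streaming job reads the whole set once per batch and broadcasts it.
    println(jedis.smembers(key))            // e.g. [mid_101]

    jedis.close()
  }
}
```

Note that the job in 1.4.3 does the membership test against a broadcast copy of smembers on the executors and only calls sadd when writing the new mids back, so Redis is queried once per batch rather than once per record.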
1.4.3 Business Code

```scala
import java.util
import java.text.SimpleDateFormat
import java.util.Date

import com.alibaba.fastjson.JSON
import com.atguigu.gmall.constant.GmallConstants
import com.atguigu.gmall2019.realtime.bean.StartupLog
import com.atguigu.gmall2019.realtime.util.{MyKafkaUtil, RedisUtil}
import org.apache.hadoop.conf.Configuration
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis
import org.apache.phoenix.spark._

object DauApp {

  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("dau_app")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // 1. Consume Kafka
    val inputDstream: InputDStream[ConsumerRecord[String, String]] =
      MyKafkaUtil.getKafkaStream(ssc, Set(GmallConstants.KAFKA_TOPIC_STARTUP))

    // 2. Transform the stream into the case class and fill in the two time fields
    val startuplogDstream: DStream[StartupLog] = inputDstream.map { record =>
      val jsonStr: String = record.value()
      val startupLog: StartupLog = JSON.parseObject(jsonStr, classOf[StartupLog])

      val dateTimeStr: String = new SimpleDateFormat("yyyy-MM-dd HH").format(new Date(startupLog.ts))
      val dateArr: Array[String] = dateTimeStr.split(" ")
      startupLog.logDate = dateArr(0)
      startupLog.logHour = dateArr(1)
      startupLog
    }

    startuplogDstream.cache()

    // 3. Filter with the visited-user list in Redis: keep only records whose mid is not in the list
    val filteredDstream: DStream[StartupLog] = startuplogDstream.transform { rdd =>
      val jedis: Jedis = RedisUtil.getJedisClient // driver, executed once per batch period
      val dateStr: String = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
      val key = "dau:" + dateStr
      val dauMidSet: util.Set[String] = jedis.smembers(key)
      jedis.close()

      val dauMidBC: Broadcast[util.Set[String]] = ssc.sparkContext.broadcast(dauMidSet)
      println("过滤前:" + rdd.count())

      val filteredRDD: RDD[StartupLog] = rdd.filter { startuplog => // executor
        val dauMidSet: util.Set[String] = dauMidBC.value
        !dauMidSet.contains(startuplog.mid)
      }
      println("过滤后:" + filteredRDD.count())
      filteredRDD
    }

    // 4. In-batch dedup: group by mid and take the first record of each group
    val groupbyMidDstream: DStream[(String, Iterable[StartupLog])] =
      filteredDstream.map(startuplog => (startuplog.mid, startuplog)).groupByKey()
    val distictDstream: DStream[StartupLog] = groupbyMidDstream.flatMap { case (mid, startupLogItr) =>
      startupLogItr.toList.take(1)
    }

    // 5. Save today's visited user (mid) list to Redis
    //    1) key type : set
    //    2) key      : dau:2019-xx-xx
    //    3) value    : mid
    distictDstream.foreachRDD { rdd =>
      // driver
      rdd.foreachPartition { startuplogItr =>
        val jedis: Jedis = RedisUtil.getJedisClient // executor
        for (startuplog <- startuplogItr) {
          val key = "dau:" + startuplog.logDate
          jedis.sadd(key, startuplog.mid)
          println(startuplog)
        }
        jedis.close()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

1.5 Code Implementation 3 - Saving to HBase

1.5.1 Phoenix - the SQL Layer on HBase

For technical details see 尚硅谷大数据技术之Phoenix (the Atguigu Phoenix course material).

1.5.2 Create the Table with Phoenix

```sql
create table gmall190408_dau (
    mid     varchar,
    uid     varchar,
    appid   varchar,
    area    varchar,
    os      varchar,
    ch      varchar,
    type    varchar,
    vs      varchar,
    logDate varchar,
    logHour varchar,
    ts      bigint
    CONSTRAINT dau_pk PRIMARY KEY (mid, logDate));
```

1.5.3 Add Dependencies to pom.xml

```xml
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-spark</artifactId>
    <version>4.14.2-HBase-1.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
</dependency>
```

Business code for saving:

```scala
// Write the data to HBase through Phoenix
distictDstream.foreachRDD { rdd =>
  rdd.saveToPhoenix(
    "GMALL2019_DAU",
    Seq("MID", "UID", "APPID", "AREA", "OS", "CH", "TYPE", "VS", "LOGDATE", "LOGHOUR", "TS"),
    new Configuration,
    Some("hadoop102,hadoop103,hadoop104:2181"))
}
```

Chapter 2 DAU Data Query Interface

2.1 Access Paths

Total:            http://localhost:8070/realtime-total?date=2019-09-06
Hourly breakdown: http://localhost:8070/realtime-hours?id=dau&date=2019-09-06

2.2 Required Data Format

Total:

```json
[{"id":"dau","name":"新增日活","value":1200},{"id":"new_mid","name":"新增设备","value":233}]
```

Hourly breakdown:

```json
{"yesterday":{"11":383,"12":123,"17":88,"19":200},"today":{"12":38,"13":1233,"17":123,"19":688}}
```

2.3 Setting Up the Publisher Project
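The publisher project reads these rows back through MyBatis and Phoenix's JDBC driver. Before wiring up the Spring layers in 2.4 and 2.5, the underlying queries can be sanity-checked with a plain JDBC session. The sketch below is illustrative only, not project code: the object name PhoenixQuerySketch is made up, and it assumes the GMALL2019_DAU table written in 1.5, the Zookeeper quorum used above, and a Phoenix client jar (phoenix-core) on the classpath.

```scala
import java.sql.DriverManager

// Illustrative Phoenix JDBC check of the data that Chapter 2 exposes (not project code).
object PhoenixQuerySketch {
  def main(args: Array[String]): Unit = {
    // Same JDBC URL that application.properties configures for the publisher app.
    val conn = DriverManager.getConnection("jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181")

    // Total DAU for one day -> feeds the realtime-total interface.
    val totalRs = conn.prepareStatement(
      "select count(*) from GMALL2019_DAU where LOGDATE = '2019-09-06'").executeQuery()
    if (totalRs.next()) println("dau total = " + totalRs.getLong(1))

    // DAU per hour -> feeds the realtime-hours interface ({"11":383, ...}).
    val hourRs = conn.prepareStatement(
      "select LOGHOUR, count(*) ct from GMALL2019_DAU where LOGDATE = '2019-09-06' group by LOGHOUR").executeQuery()
    while (hourRs.next()) println(hourRs.getString("LOGHOUR") + " -> " + hourRs.getLong("ct"))

    conn.close()
  }
}
```

The mapper layer described in 2.5 would express the same two queries through MyBatis; only the shape of the results matters for producing the JSON formats shown in 2.2.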
2.4 Configuration Files

2.4.1 pom.xml

```xml
<properties>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.atguigu.gmall2019.dw</groupId>
        <artifactId>dw-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>1.3.4</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-core</artifactId>
        <version>4.14.2-HBase-1.3</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>20.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>
```

2.4.2 application.properties

```properties
server.port=8070
logging.level.root=error

spring.datasource.driver-class-name=org.apache.phoenix.jdbc.PhoenixDriver
spring.datasource.url=jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181
spring.datasource.data-username=
spring.datasource.data-password=

# mybatis
#mybatis.typeAliasesPackage=com.example.phoenix.entity
mybatis.mapperLocations=classpath:mapper/*.xml
mybatis.configuration.map-underscore-to-camel-case=true
```

2.5 Code Implementation

Controller layer:  PublisherController       - publishes the interfaces over the web
Service layer:     PublisherService           - interface for the data business queries
                   PublisherServiceImpl       - implementation class of the business queries
Data layer:        DauMapper                  - interface for the data-layer queries
                   DauMapper.xml              - configuration implementing the data-layer queries
Main program:      GmallPublisherApplication  - add the mapper package scan

2.5.1 GmallPublisherApplication: Add the Package Scan

```java
@SpringBootApplication
@MapperScan(basePackages = "com.atguigu.gmallXXXXXXX.publisher.mapper")
public class Gmall2019PublisherApplication {

    public static void main(String[] args) {
        SpringApplication.run(Gmall2019PublisherApplication.class, args);
    }
}
```

2.5.2 Controller Layer

```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall2019.dw.publisher.service.PublisherService;
import org.apache.commons.lang.time.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

@RestController
public class PublisherController {

    @Autowired
    PublisherService publisherService;

    @GetMapping("realtime-total")
    public String realtimeHourDate(@RequestParam("date") String date) {
        List list = new ArrayList();

        // Total daily active users
        int dauTotal = publisherService.getDauTotal(date);
        Map dauMap = new HashMap();
        dauMap.put("id", "dau");
        dauMap.put("name", "新增日活");
        dauMap.put("value", dauTotal);
        list.add(dauMap);

        // New devices/users
        int newMidTotal = publisherService.getNewMidTotal(date);
        Map newMidMap = new HashMap();
        newMidMap.put("id", "new_mid");
        newMidMap.put("name", "新增用户");
        newMidMap.put("value", newMidTotal);
        list.add(newMidMap);

        return JSON.toJSONString(list);
    }

    @GetMapping("realtime-hours")
    public String realtimeHourDate(@RequestParam("id") String id, @RequestParam("date") String date) {
        if ("dau".equals(id)) {
            Map dauHoursToday = publisherService.getDauHours(date);
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("today", dauHoursToday);

            String yesterdayDateString = "";
            try {
                Date dateToday = new SimpleDateFormat("yyyy-MM-dd").parse(date);
                Date dateYesterday = DateUtils.addDays(dateToday, -1);
                yesterdayDateString = new SimpleDateFormat("yyyy-MM-dd").format(dateYesterday);
            } catch (ParseException e) {
                e.printStackTrace();
            }

            Map dauHoursYesterday = publisherService.getDauHours(yesterdayDateString);
            jsonObject.put("yesterday", dauHoursYesterday);

            return jsonObject.toJSONString();
        }
        if ("new_order_totalamount".equals(id)) {
            String newOrderTo
```
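The controller excerpt above breaks off inside the second if branch, but the two endpoints it already defines match the paths listed in 2.1 and can be smoke-tested once the application is running. The snippet below is a minimal sketch, not project code: the object name PublisherSmokeTest is made up, and it assumes the publisher app from 2.4.2 is running locally on port 8070 with the example date from 2.1.

```scala
import scala.io.Source
import com.alibaba.fastjson.JSON

// Illustrative smoke test of the publisher endpoints from 2.1 (not project code).
object PublisherSmokeTest {
  def main(args: Array[String]): Unit = {
    val date = "2019-09-06" // example date from 2.1

    // Expected shape (2.2): [{"id":"dau",...},{"id":"new_mid",...}]
    val total = Source.fromURL(s"http://localhost:8070/realtime-total?date=$date").mkString
    println(JSON.parseArray(total))

    // Expected shape (2.2): {"yesterday":{"11":383,...},"today":{"12":38,...}}
    val hours = Source.fromURL(s"http://localhost:8070/realtime-hours?id=dau&date=$date").mkString
    println(JSON.parseObject(hours))
  }
}
```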
