尚硅谷 Big Data Project: Real-time Project 2




Chapter 1: Real-time Processing Module

1.1 Module Setup
Add the Scala framework to the module.

1.2 Code Approach
1) Consume the data in Kafka;
2) Use Redis to filter out devices already counted toward today's DAU;
3) Save each batch's newly added DAU records to HBase or ES;
4) Query the data back out of ES and publish it as a data interface for the visualization project to call.

1.3 Code Development 1: Consuming Kafka

1.3.1 Configuration

config.properties:

# Kafka configuration
kafka.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092
# Redis configuration
redis.host=hadoop102
redis.port=6379

pom.xml:

<dependencies>
    <dependency>
        <groupId>com.atguigu.gmall2019.dw</groupId>
        <artifactId>dw-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version></version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.9.0</version>
    </dependency>
    <dependency>
        <groupId>io.searchbox</groupId>
        <artifactId>jest</artifactId>
        <version>5.3.3</version>
    </dependency>
    <dependency>
        <groupId>net.java.dev.jna</groupId>
        <artifactId>jna</artifactId>
        <version>4.5.2</version>
    </dependency>
    <dependency>
        <groupId>org.codehaus.janino</groupId>
        <artifactId>commons-compiler</artifactId>
        <version>2.7.8</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <!-- This plugin compiles the Scala code into class files -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <!-- Bind to Maven's compile phase -->
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

1.3.2 Utility Classes

1) MyKafkaUtil

package com.atguigu.utils

import java.util.Properties
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

object MyKafkaUtil {
  def getKafkaStream(ssc: StreamingContext, topics: Set[String]): InputDStream[(String, String)] = {
    val properties: Properties = PropertiesUtil.load("config.properties")
    val kafkaPara = Map(
      "bootstrap.servers" -> properties.getProperty("kafka.broker.list"),
      "group.id" -> "bigdata0408"
    )
    // Consume the Kafka data with the Direct approach
    val kafkaDStream: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaPara, topics)
    // Return the stream
    kafkaDStream
  }
}

2) PropertiesUtil

import java.io.InputStreamReader
import java.util.Properties

object PropertiesUtil {
  def load(propertieName: String): Properties = {
    val prop = new Properties()
    prop.load(new InputStreamReader(
      Thread.currentThread().getContextClassLoader.getResourceAsStream(propertieName), "UTF-8"))
    prop
  }
}

3) RedisUtil

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object RedisUtil {
  var jedisPool: JedisPool = null

  def getJedisClient: Jedis = {
    if (jedisPool == null) {
      println("Creating the connection pool")
      val config = PropertiesUtil.load("config.properties")
      val host = config.getProperty("redis.host")
      val port = config.getProperty("redis.port")
      val jedisPoolConfig = new JedisPoolConfig()
      jedisPoolConfig.setMaxTotal(100)            // maximum total connections
      jedisPoolConfig.setMaxIdle(20)              // maximum idle connections
      jedisPoolConfig.setMinIdle(20)              // minimum idle connections
      jedisPoolConfig.setBlockWhenExhausted(true) // block when the pool is exhausted
      jedisPoolConfig.setMaxWaitMillis(500)       // maximum wait time (ms) when busy
      jedisPoolConfig.setTestOnBorrow(true)       // test each connection on borrow
      jedisPool = new JedisPool(jedisPoolConfig, host, port.toInt)
    }
    // println(s"jedisPool.getNumActive = ${jedisPool.getNumActive}")
    // println("Borrowing a connection")
    jedisPool.getResource
  }
}

1.3.3 Case Class: StartUpLog

case class StartUpLog(mid: String,
                      uid: String,
                      appid: String,
                      area: String,
                      os: String,
                      ch: String,
                      logType: String,
                      vs: String,
                      var logDate: String,
                      var logHour: String,
                      var ts: Long)
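A quick way to see how the fields map is to parse one record by hand. The sketch below feeds a hypothetical startup-log record (field names follow the case class above; the payload emitted by the course's mock generator may differ) through the same fastjson call the business classes use:

import com.alibaba.fastjson.JSON

object StartUpLogParseDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical sample record, for illustration only
    val log = """{"mid":"mid_123","uid":"u_88","appid":"gmall2019","area":"beijing","os":"android","ch":"xiaomi","logType":"startup","vs":"1.2.0","ts":1567958400000}"""
    val startUpLog: StartUpLog = JSON.parseObject(log, classOf[StartUpLog])
    println(startUpLog.mid)     // mid_123
    println(startUpLog.logDate) // null here; DauApp later derives it from ts
  }
}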
1.3.4 Business Class: Consuming Kafka

import com.alibaba.fastjson.JSON
import com.atguigu.gmall.constant.GmallConstants
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.phoenix.spark._

object RealtimeStartupApp {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("gmall2019")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(10))

    val startupStream: InputDStream[ConsumerRecord[String, String]] =
      MyKafkaUtil.getKafkaStream(ssc, Set(GmallConstants.KAFKA_TOPIC_STARTUP))

    // startupStream.map(_.value()).foreachRDD { rdd =>
    //   println(rdd.collect().mkString("\n"))
    // }

    val startupLogDstream: DStream[StartUpLog] = startupStream.map(_.value()).map { log =>
      // println(s"log = ${log}")
      val startUpLog: StartUpLog = JSON.parseObject(log, classOf[StartUpLog])
      startUpLog
    }
  }
}

Note: as printed, MyKafkaUtil (built on spark-streaming-kafka-0-8) returns InputDStream[(String, String)], while this class and DauApp below type the stream as InputDStream[ConsumerRecord[String, String]]. The two must be aligned (for example by moving MyKafkaUtil to the kafka-0-10 integration) before the code compiles.
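The deduplication in section 1.4 leans entirely on Redis set semantics. A minimal sketch of those semantics, assuming a reachable Redis as configured in config.properties and the dau:yyyy-MM-dd key convention used below:

object RedisDedupDemo {
  def main(args: Array[String]): Unit = {
    val jedis = RedisUtil.getJedisClient
    val key = "dau:2019-01-22" // one set per day; members are device ids (mid)
    println(jedis.sadd(key, "mid_123"))      // 1: first sighting today
    println(jedis.sadd(key, "mid_123"))      // 0: already present, i.e. a duplicate
    println(jedis.sismember(key, "mid_123")) // true
    println(jedis.smembers(key))             // the full mid set that DauApp broadcasts
    jedis.close()
  }
}

Note that DauApp reads the whole set once per batch on the driver and broadcasts it to the executors, rather than issuing a SISMEMBER per record from the executors; this trades a little driver memory for far fewer Redis round trips.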

1.4 Code Development 2: Deduplication

1.4.1 Flow Chart
(flow-chart figure)

1.4.2 Redis KV Design
key: dau:2019-01-22 (one set per day)
value: device id (mid)

1.4.3 Business Code

import java.util
import java.text.SimpleDateFormat
import java.util.Date

import com.alibaba.fastjson.JSON
import com.atguigu.gmall.constant.GmallConstants
import com.atguigu.gmall2019.realtime.bean.StartupLog
import com.atguigu.gmall2019.realtime.util.{MyKafkaUtil, RedisUtil}
import org.apache.hadoop.conf.Configuration
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis
import org.apache.phoenix.spark._

object DauApp {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("dau_app")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // 1. Consume Kafka
    val inputDstream: InputDStream[ConsumerRecord[String, String]] =
      MyKafkaUtil.getKafkaStream(ssc, Set(GmallConstants.KAFKA_TOPIC_STARTUP))

    // 2. Convert the stream into the case class and fill in the two time fields
    val startuplogDstream: DStream[StartupLog] = inputDstream.map { record =>
      val jsonStr: String = record.value()
      val startupLog: StartupLog = JSON.parseObject(jsonStr, classOf[StartupLog])
      val dateTimeStr: String = new SimpleDateFormat("yyyy-MM-dd HH").format(new Date(startupLog.ts))
      val dateArr: Array[String] = dateTimeStr.split(" ")
      startupLog.logDate = dateArr(0)
      startupLog.logHour = dateArr(1)
      startupLog
    }
    startuplogDstream.cache()

    // 3. Filter against the visited-device list in Redis: keep only records whose mid is not yet in the list
    val filteredDstream: DStream[StartupLog] = startuplogDstream.transform { rdd =>
      val jedis: Jedis = RedisUtil.getJedisClient // driver, executed once per batch interval
      val dateStr: String = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
      val key = "dau:" + dateStr
      val dauMidSet: util.Set[String] = jedis.smembers(key)
      jedis.close()
      val dauMidBC: Broadcast[util.Set[String]] = ssc.sparkContext.broadcast(dauMidSet)
      println("before filtering: " + rdd.count())
      val filteredRDD: RDD[StartupLog] = rdd.filter { startuplog =>
        // executor
        val dauMidSet: util.Set[String] = dauMidBC.value
        !dauMidSet.contains(startuplog.mid)
      }
      println("after filtering: " + filteredRDD.count())
      filteredRDD
    }

    // 4. Deduplicate within the batch: group by mid and take the first record of each group
    val groupbyMidDstream: DStream[(String, Iterable[StartupLog])] =
      filteredDstream.map(startuplog => (startuplog.mid, startuplog)).groupByKey()
    val distictDstream: DStream[StartupLog] = groupbyMidDstream.flatMap { case (mid, startupLogItr) =>
      startupLogItr.toList.take(1)
    }

    // 5. Save today's visited-device (mid) list to Redis
    //    1) key type: set  2) key: dau:2019-xx-xx  3) value: mid
    distictDstream.foreachRDD { rdd =>
      // driver
      rdd.foreachPartition { startuplogItr =>
        val jedis: Jedis = RedisUtil.getJedisClient // executor
        for (startuplog <- startuplogItr) {
          val key = "dau:" + startuplog.logDate
          jedis.sadd(key, startuplog.mid)
          println(startuplog)
        }
        jedis.close()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

1.5 Code Development 3: Saving to HBase

Phoenix is the SQL layer over HBase; for the technical details see 《尚硅谷大数据技术之Phoenix》.

1.5.2 Create the Data Table with Phoenix

create table gmall190408_dau (
    mid varchar,
    uid varchar,
    appid varchar,
    area varchar,
    os varchar,
    ch varchar,
    type varchar,
    vs varchar,
    logDate varchar,
    logHour varchar,
    ts bigint
    CONSTRAINT dau_pk PRIMARY KEY (mid, logDate));

Note: the save code in 1.5.4 writes to GMALL2019_DAU; the table name created here must match whichever name is actually used.

Add the dependencies to pom.xml:

<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-spark</artifactId>
    <version>4.14.2-HBase-1.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
</dependency>

1.5.4 Save Code

// Write the data to HBase through Phoenix
distictDstream.foreachRDD { rdd =>
  rdd.saveToPhoenix(
    "GMALL2019_DAU",
    Seq("MID", "UID", "APPID", "AREA", "OS", "CH", "TYPE", "VS", "LOGDATE", "LOGHOUR", "TS"),
    new Configuration,
    Some("hadoop102,hadoop103,hadoop104:2181"))
}
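Once a few batches have run, the saved rows can be checked directly over Phoenix's JDBC interface. A minimal sketch, reusing the driver class and URL the publisher app is configured with in section 2.4 (the date literal is illustrative):

import java.sql.DriverManager

object PhoenixQueryDemo {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")
    val conn = DriverManager.getConnection("jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181")
    val stmt = conn.createStatement()
    // Count the DAU rows saved for one day
    val rs = stmt.executeQuery("select count(*) from GMALL2019_DAU where LOGDATE = '2019-09-06'")
    while (rs.next()) println("dau = " + rs.getLong(1))
    rs.close(); stmt.close(); conn.close()
  }
}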

Chapter 2: DAU Query Interface

2.1 Access Paths

Total:
http://localhost:8070/realtime-total?date=2019-09-06
Hourly breakdown:
http://localhost:8070/realtime-hours?id=dau&date=2019-09-06

2.2 Required Data Formats

Total:
[{"id":"dau","name":"新增日活","value":1200},
 {"id":"new_mid","name":"新增设备","value":233}]

Hourly breakdown:
{"yesterday":{"11":383,"12":123,"17":88,"19":200},
 "today":{"12":38,"13":1233,"17":123,"19":688}}

2.3 Create the Publisher Project

(Spring Initializr screenshot: Maven project, Java, Spring Boot 1.5.x; Group: com.atguigu.gmall2019.dw; Artifact: dw-publisher; Package: com.atguigu.gmall2019.dw.publisher; selected dependencies: Lombok, Spring Web Starter, JDBC API, MyBatis Framework)

2.4 Configuration Files

pom.xml:

<properties>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- /artifact/io.searchbox/jest -->
    <dependency>
        <groupId>com.atguigu.gmall2019.dw</groupId>
        <artifactId>dw-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>1.3.4</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-core</artifactId>
        <version>4.14.2-HBase-1.3</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>20.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

application.properties:

server.port=8070
logging.level.root=error
spring.datasource.driver-class-name=org.apache.phoenix.jdbc.PhoenixDriver
spring.datasource.url=jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181
spring.datasource.data-username=
spring.datasource.data-password=
#mybatis
#mybatis.typeAliasesPackage=com.example.phoenix.entity
mybatis.mapperLocations=classpath:mapper/*.xml
mybatis.configuration.map-underscore-to-camel-case=true

2.5 Code Implementation

Controller layer: PublisherController, publishes the web endpoints
Service layer: PublisherService, the data-query interface; PublisherServiceImpl, its implementation
Data layer: DauMapper, the data-query interface; DauMapper.xml, the query implementation/configuration
Main application: GmallPublisherApplication, add mapper package scanning

2.5.1 GmallPublisherApplication: Add Mapper Scanning

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@MapperScan(basePackages = "com.atguigu.gmallXXXXXXX.publisher.mapper")
public class Gmall2019PublisherApplication {
    public static void main(String[] args) {
        SpringApplication.run(Gmall2019PublisherApplication.class, args);
    }
}
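DauMapper.xml itself is not included in this excerpt, but the hourly map that getDauHours returns reduces to a GROUP BY on LOGHOUR over the table from 1.5.2. A hypothetical sketch of the equivalent query over Phoenix JDBC (the real project issues it through MyBatis; the table and column names are from 1.5.2):

import java.sql.DriverManager
import scala.collection.mutable

object DauHoursQueryDemo {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181")
    val ps = conn.prepareStatement(
      "select LOGHOUR LH, count(*) CT from GMALL2019_DAU where LOGDATE = ? group by LOGHOUR")
    ps.setString(1, "2019-09-06")
    val rs = ps.executeQuery()
    // Shape the result like the realtime-hours payload: {"11": 383, "12": 123, ...}
    val hourMap = mutable.LinkedHashMap[String, Long]()
    while (rs.next()) hourMap += rs.getString("LH") -> rs.getLong("CT")
    println(hourMap)
    rs.close(); ps.close(); conn.close()
  }
}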
Controller layer: PublisherController

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall2019.dw.publisher.service.PublisherService;
import org.apache.commons.lang.time.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

@RestController
public class PublisherController {

    @Autowired
    PublisherService publisherService;

    @GetMapping("realtime-total")
    public String realtimeHourDate(@RequestParam("date") String date) {
        List<Map> list = new ArrayList<Map>();
        // total DAU
        int dauTotal = publisherService.getDauTotal(date);
        Map dauMap = new HashMap<String, Object>();
        dauMap.put("id", "dau");
        dauMap.put("name", "新增日活");
        dauMap.put("value", dauTotal);
        list.add(dauMap);
        // new devices
        int newMidTotal = publisherService.getNewMidTotal(date);
        Map newMidMap = new HashMap<String, Object>();
        newMidMap.put("id", "new_mid");
        newMidMap.put("name", "新增用户");
        newMidMap.put("value", newMidTotal);
        list.add(newMidMap);
        return JSON.toJSONString(list);
    }

    @GetMapping("realtime-hours")
    public String realtimeHourDate(@RequestParam("id") String id, @RequestParam("date") String date) {
        if ("dau".equals(id)) {
            Map dauHoursToday = publisherService.getDauHours(date);
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("today", dauHoursToday);
            String yesterdayDateString = "";
            try {
                Date dateToday = new SimpleDateFormat("yyyy-MM-dd").parse(date);
                Date dateYesterday = DateUtils.addDays(dateToday, -1);
                yesterdayDateString = new SimpleDateFormat("yyyy-MM-dd").format(dateYesterday);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            Map dauHoursYesterday = publisherService.getDauHours(yesterdayDateString);
            jsonObject.put("yesterday", dauHoursYesterday);
            return jsonObject.toJSONString();
        }
        if ("new_order_totalamount".equals(id)) {
            String newOrderTotalamountJson = publisherService.getNewOrderTotalAmount
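With the publisher app running on port 8070 (section 2.4), the two endpoints from 2.1 can be smoke-tested from any HTTP client. A minimal sketch (dates illustrative; the responses should match the shapes given in 2.2):

import scala.io.Source

object PublisherSmokeTest {
  def main(args: Array[String]): Unit = {
    // Expect: [{"id":"dau","name":"新增日活","value":...}, {"id":"new_mid",...}]
    println(Source.fromURL("http://localhost:8070/realtime-total?date=2019-09-06", "UTF-8").mkString)
    // Expect: {"yesterday":{"11":383,...},"today":{"12":38,...}}
    println(Source.fromURL("http://localhost:8070/realtime-hours?id=dau&date=2019-09-06", "UTF-8").mkString)
  }
}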

