版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
1、Storm实时日志分析实战-编码实现LogParserBolt类package com.ibeifeng.bigdata.storm.weglog;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;impor
2、t backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import java.text.DateFormat;import java.text.SimpleDateFormat;import java.util.Date;import java.util.Map;import java.util.regex.Matcher;import java.util.regex.Pattern;import static com.ibeifeng.bigdata
3、.storm.weglog.WebLogConstants.*;/* * 日志解析类 * Created by ad on 2016/12/17. */public class LogParserBolt implements IRichBolt private Pattern pattern; private OutputCollector collector; Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) pattern = Ppile( *)
4、* * (d+*) * ( *) * d3 d+ (*) (*) *); this.collector = collector; Override public void execute(Tuple input) String webLog = input.getStringByField(str); / 解析 if(webLog!= null | !.equals(webLog) Matcher matcher = pattern.matcher(webLog); if(matcher.find() /matcher.group(0); String ip = matcher.group(1
5、); String serverTimeStr = matcher.group(2); / 处理时间 long timestamp = Long.parseLong(serverTimeStr); Date date = new Date(); date.setTime(timestamp); DateFormat df = new SimpleDateFormat(yyyyMMddHHmm); String dateStr = df.format(date); String day = dateStr.substring(0,8); String hour = dateStr.substri
6、ng(0,10); String minute = dateStr ; String requestUrl = matcher.group(3); String httpRefer = matcher.group(4); String userAgent = matcher.group(5); / 分流 this.collector.emit(IP_COUNT_STREAM, input,new Values(day, hour, minute, ip); this.collector.emit(URL_PARSER_STREAM, input,new Values(day, hour, mi
7、nute, requestUrl); this.collector.emit(HTTPREFER_PARSER_STREAM, input,new Values(day, hour, minute, httpRefer); this.collector.emit(USERAGENT_PARSER_STREAM, input,new Values(day, hour, minute, userAgent); this.collector.ack(input); Override public void cleanup() Override public void declareOutputFie
8、lds(OutputFieldsDeclarer declarer) declarer.declareStream(IP_COUNT_STREAM,new Fields(DAY, HOUR, MINUTE, IP); declarer.declareStream(URL_PARSER_STREAM,new Fields(DAY, HOUR, MINUTE, REQUEST_URL); declarer.declareStream(HTTPREFER_PARSER_STREAM,new Fields(DAY, HOUR, MINUTE, HTTP_REFER); declarer.declare
9、Stream(USERAGENT_PARSER_STREAM,new Fields(DAY, HOUR, MINUTE, USERAGENT); Override public Map getComponentConfiguration() return null; UserAgentParserBolt类package com.ibeifeng.bigdata.storm.weglog;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.st
10、orm.topology.IBasicBolt;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import com.ibeifeng.bigdata.storm.util.UserAgentUtil;import static com.ibeifeng
11、.bigdata.storm.weglog.WebLogConstants.*;import java.util.Map;/* * 解析UserAgent * Created by ad on 2016/12/18. */public class UserAgentParserBolt implements IRichBolt private OutputCollector _collector; Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) thi
12、s._collector = collector; Override public void execute(Tuple input) String day = input.getStringByField(DAY); String hour = input.getStringByField(HOUR); String minute = input.getStringByField(MINUTE); String userAgent = input.getStringByField(USERAGENT); / 解析userAgent if(userAgent != null & !.equal
13、s(userAgent) UserAgentUtil.UserAgentInfo userAgentInfo = UserAgentUtil.analyticUserAgent(userAgent); if(userAgentInfo!= null) String browserName = userAgentInfo.getBrowserName(); String browserVersion = userAgentInfo.getBrowserVersion(); if(browserName!=null & !.equals(browserName) / 只考虑浏览器的类型 this.
14、_collector.emit(BROWSER_COUNT_STREAM,input,new Values(day,hour,minute,browserName); if(browserVersion!=null & !.equals(browserVersion) this._collector.emit(BROWSER_COUNT_STREAM,input,new Values(day,hour,minute, browserName+_+browserVersion); String osName = userAgentInfo.getOsName(); String osVersio
15、n = userAgentInfo.getOsVersion(); if(osName!= null & !.equals(osName) this._collector.emit(OS_COUNT_STREAM,input,new Values(day,hour,minute, osName); if(osVersion != null & !.equals(osVersion) this._collector.emit(OS_COUNT_STREAM,input,new Values(day,hour,minute,osName+_+osVersion); this._collector.
16、ack(input); Override public void cleanup() Override public void declareOutputFields(OutputFieldsDeclarer declarer) declarer.declareStream(BROWSER_COUNT_STREAM,new Fields(DAY,HOUR,MINUTE,BROWSER); declarer.declareStream(OS_COUNT_STREAM,new Fields(DAY,HOUR,MINUTE,OS); Override public Map getComponentC
17、onfiguration() return null; CountKpiBolt类package com.ibeifeng.bigdata.storm.weglog;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;imp
18、ort backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import static com.ibeifeng.bigdata.storm.weglog.WebLogConstants.*;/* * 通用的计数Bolt * Created by ad on 2016/12/17. */public class CountKpiBolt implements IRichBolt p
19、rivate String kpiType; /TODO 优化 替换为内存数据库 redis private Map kpiCounts; private String currentDay = ; private OutputCollector _collector; public CountKpiBolt(String kpiType) this.kpiType = kpiType; Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) this.kpi
20、Counts = new HashMap(); this._collector = collector; Override public void execute(Tuple input) String day = input.getStringByField(day); String hour = input.getStringByField(hour); String minute = input.getStringByField(minute); String kpi = input.getString(3); String kpiByDay = day + _ + kpi; Strin
21、g kpiByHour = hour +_ + kpi; String kpiByMinute = minute + _ + kpi; / 隔天清理内存 if(!currentDay.equals(day) / 说明隔天了 IteratorMap.Entry iter = kpiCounts.entrySet().iterator(); while(iter.hasNext() Map.Entry entry = iter.next(); if(entry.getKey().startsWith(currentDay) iter.remove(); currentDay = day; int
22、kpiCountByDay = 0; int kpiCountByHour = 0; int kpiCountByMinute = 0; if(kpiCounts.containsKey(kpiByDay) kpiCountByDay = kpiCounts.get(kpiByDay); if(kpiCounts.containsKey(kpiByHour) kpiCountByHour = kpiCounts.get(kpiByHour); if(kpiCounts.containsKey(kpiByMinute) kpiCountByMinute = kpiCounts.get(kpiBy
23、Minute); kpiCountByDay +; kpiCountByHour +; kpiCountByMinute +; kpiCounts.put(kpiByDay, kpiCountByDay); kpiCounts.put(kpiByHour, kpiCountByHour); kpiCounts.put(kpiByMinute,kpiCountByMinute); this._collector.emit(input, new Values(kpiType+_ + kpiByDay, kpiCountByDay); this._collector.emit(input, new
24、Values(kpiType+_ + kpiByHour, kpiCountByHour); this._collector.emit(input, new Values(kpiType+_ + kpiByMinute, kpiCountByMinute); this._collector.ack(input); Override public void cleanup() Override public void declareOutputFields(OutputFieldsDeclarer declarer) declarer.declare(new Fields(SERVERTIME_
25、KPI, KPI_COUNTS); Override public Map getComponentConfiguration() return null; SavaBolt类package com.ibeifeng.bigdata.storm.weglog;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.IRichBolt;i
26、mport backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.HTable;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.cli
27、ent.RetriesExhaustedWithDetailsException;import org.apache.hadoop.hbase.util.Bytes;import java.io.IOException;import java.io.InterruptedIOException;import java.util.Map;import static com.ibeifeng.bigdata.storm.weglog.WebLogConstants.*;/* * 将数据存储到HBase数据库中 * Created by ad on 2016/12/17. */public clas
28、s SaveBolt implements IRichBolt private HTable table; private OutputCollector _collector; Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) Configuration configuration = HBaseConfiguration.create(); try table = new HTable(configuration,HBASE_TABLENAME);
29、catch (IOException e) e.printStackTrace(); throw new RuntimeException(e); this._collector = collector; Override public void execute(Tuple input) String serverTimeAndKpi = input.getStringByField(SERVERTIME_KPI); Integer kpiCounts = input.getIntegerByField(KPI_COUNTS); System.err.println(serverTimeAnd
30、Kpi= + serverTimeAndKpi + , kpiCounts= + kpiCounts); if(serverTimeAndKpi!= null & kpiCounts != null) Put put = new Put(Bytes.toBytes(serverTimeAndKpi); String columnQuelifier = serverTimeAndKpi.split(_)0; put.add(Bytes.toBtes(COLUMN_FAMILY), Bytes.toBytes(columnQuelifier),Bytes.toBytes(+kpiCounts);
31、try table.put(put); catch (IOException e) throw new RuntimeException(e); this._collector.ack(input); Override public void cleanup() if(table!= null) try table.close(); catch (IOException e) e.printStackTrace(); Override public void declareOutputFields(OutputFieldsDeclarer declarer) Override public M
32、ap getComponentConfiguration() return null; 常量类WebLogConstantspackage com.ibeifeng.bigdata.storm.weglog;/* * 常量类 * Created by ad on 2016/12/17. */public class WebLogConstants public static final String KAFKA_SPOUT_ID = kafkaSpoutId; public static final String WEB_LOG_PARSER_BOLT = webLogParserBolt;
33、public static final String COUNT_IP_BOLT = countIpBolt; public static final String COUNT_BROWSER_BOLT = countBrowserBolt; public static final String COUNT_OS_BOLT = countOsBolt; public static final String USER_AGENT_PARSER_BOLT = userAgentParserBolt; public static final String SAVE_BOLT = saveBolt;
34、/ 流ID public static final String IP_COUNT_STREAM = ipCountStream; public static final String URL_PARSER_STREAM = urlParserStream; public static final String HTTPREFER_PARSER_STREAM = httpReferParserStream; public static final String USERAGENT_PARSER_STREAM = userAgentParserStream; public static fina
35、l String BROWSER_COUNT_STREAM = browserCountStream; public static final String OS_COUNT_STREAM = osCountStream; / tuple key名称 public static final String DAY = day; public static final String HOUR = hour; public static final String MINUTE = minute; public static final String IP = ip; public static fi
36、nal String REQUEST_URL = requestUrl; public static final String HTTP_REFER = httpRefer; public static final String USERAGENT = userAgent; public static final String BROWSER = browser; public static final String OS = os; public static final String SERVERTIME_KPI = serverTimeAndKpi; public static fina
37、l String KPI_COUNTS = kpiCounts; / kpi类型 public static final String IP_KPI = I; public static final String URL_KPI = U; public static final String BROWSER_KPI = B; public static final String OS_KPI = O; / 表名称 public static final String HBASE_TABLENAME = weblogstatictis; public static final String CO
38、LUMN_FAMILY = info;测试类WebLogStatictispackage com.ibeifeng.bigdata.storm.weglog;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.StormSubmitter;import backtype.storm.generated.AlreadyAliveException;import backtype.storm.generated.InvalidTopologyException;import ba
39、cktype.storm.generated.StormTopology;import backtype.storm.spout.SchemeAsMultiScheme;import backtype.storm.topology.IRichSpout;import backtype.storm.topology.TopologyBuilder;import backtype.storm.tuple.Fields;import storm.kafka.*;import static com.ibeifeng.bigdata.storm.weglog.WebLogConstants.*;impo
40、rt java.util.UUID;/* * Created by ad on 2016/12/17. */public class WebLogStatictis public static void main(String args) WebLogStatictis webLogStatictis = new WebLogStatictis(); StormTopology topology = webLogStatictis.buildTopology(); Config conf = new Config(); /conf.setNumAckers(4); if(args = null
41、 | args.length = 0) / 本地执行 /conf.setMessageTimeoutSecs(1); / tuple发射超时时间 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology(webloganalyse, conf , topology); else / 提交到集群上执行 conf.setNumWorkers(4); / 指定使用多少个进程来执行该Topology try StormSubmitter.submitTopology(args0,conf, topology); catch (AlreadyAliveException e) e.printStackTrace(); catch (InvalidTopologyException e) e.printStackTrace(); /* * 构造一个kafkaspout * return */ private IRichSpout generate
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等。压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性,同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2026江苏宿迁市泗洪县招聘合同制和公益性岗位人员15人备考题库附答案详解【完整版】
- 2026春季深圳供电局有限公司校园招聘备考题库一套附答案详解
- 2026福建漳州市龙文区教育局招聘43人备考题库附答案详解【达标题】
- 2026中冶京诚工程技术有限公司春季校园招聘备考题库附答案详解【满分必刷】
- 2026吉林四平市双辽市公益性岗位招聘92人备考题库附答案详解【综合卷】
- 2026中国美术学院特殊专业技术岗位招聘19人备考题库(浙江)附答案详解(满分必刷)
- 2026天津市和平保育院招聘派遣制工作人员备考题库附参考答案详解(综合题)
- 哈药集团股份有限公司2026届春季校园招聘备考题库附答案详解(巩固)
- 2026汉江实验室三亚研究中心(三亚深海科学与工程研究所)招聘20人备考题库及参考答案详解【满分必刷】
- 2026浙江宁波东钱湖旅游度假区某国有企业招聘派遣制工作人员备考题库标准卷附答案详解
- 真空压力浸渍工艺-洞察及研究
- T/CAS 850-2024燃气用滚压螺纹热镀锌钢管技术规范
- 企业自行监测指南培训
- 2025中考英语作文复习:12个写作话题写作指导+满分范文
- 零基预算研究分析
- 郑州大学高层次人才考核工作实施办法
- 土壤氡浓度检测方案
- DBJT13-366-2021 建筑工程附着式升降脚手架应用技术标准
- 麻醉科应急预案及流程
- 上海市第一至十八届高一物理基础知识竞赛试题及答案
- 《皮肤性病学4》课程标准
评论
0/150
提交评论