




已阅读5页,还剩8页未读, 继续免费阅读
版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
Storm实时日志分析实战-编码实现LogParserBolt类package com.ibeifeng.bigdata.storm.weglog;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import java.text.DateFormat;import java.text.SimpleDateFormat;import java.util.Date;import java.util.Map;import java.util.regex.Matcher;import java.util.regex.Pattern;import static com.ibeifeng.bigdata.storm.weglog.WebLogConstants.*;/* * 日志解析类 * Created by ad on 2016/12/17. */public class LogParserBolt implements IRichBolt private Pattern pattern; private OutputCollector collector; Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) pattern = Ppile( *) * * (d+*) * ( *) * d3 d+ (*) (*) *); this.collector = collector; Override public void execute(Tuple input) String webLog = input.getStringByField(str); / 解析 if(webLog!= null | !.equals(webLog) Matcher matcher = pattern.matcher(webLog); if(matcher.find() /matcher.group(0); String ip = matcher.group(1); String serverTimeStr = matcher.group(2); / 处理时间 long timestamp = Long.parseLong(serverTimeStr); Date date = new Date(); date.setTime(timestamp); DateFormat df = new SimpleDateFormat(yyyyMMddHHmm); String dateStr = df.format(date); String day = dateStr.substring(0,8); String hour = dateStr.substring(0,10); String minute = dateStr ; String requestUrl = matcher.group(3); String httpRefer = matcher.group(4); String userAgent = matcher.group(5); / 分流 this.collector.emit(IP_COUNT_STREAM, input,new Values(day, hour, minute, ip); this.collector.emit(URL_PARSER_STREAM, input,new Values(day, hour, minute, requestUrl); this.collector.emit(HTTPREFER_PARSER_STREAM, input,new Values(day, hour, minute, httpRefer); this.collector.emit(USERAGENT_PARSER_STREAM, input,new Values(day, hour, minute, userAgent); this.collector.ack(input); Override public void cleanup() Override public void declareOutputFields(OutputFieldsDeclarer declarer) declarer.declareStream(IP_COUNT_STREAM,new Fields(DAY, HOUR, MINUTE, IP); declarer.declareStream(URL_PARSER_STREAM,new Fields(DAY, HOUR, MINUTE, REQUEST_URL); declarer.declareStream(HTTPREFER_PARSER_STREAM,new Fields(DAY, HOUR, MINUTE, HTTP_REFER); declarer.declareStream(USERAGENT_PARSER_STREAM,new Fields(DAY, HOUR, MINUTE, USERAGENT); Override public Map getComponentConfiguration() return null; UserAgentParserBolt类package com.ibeifeng.bigdata.storm.weglog;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import com.ibeifeng.bigdata.storm.util.UserAgentUtil;import static com.ibeifeng.bigdata.storm.weglog.WebLogConstants.*;import java.util.Map;/* * 解析UserAgent * Created by ad on 2016/12/18. */public class UserAgentParserBolt implements IRichBolt private OutputCollector _collector; Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) this._collector = collector; Override public void execute(Tuple input) String day = input.getStringByField(DAY); String hour = input.getStringByField(HOUR); String minute = input.getStringByField(MINUTE); String userAgent = input.getStringByField(USERAGENT); / 解析userAgent if(userAgent != null & !.equals(userAgent) UserAgentUtil.UserAgentInfo userAgentInfo = UserAgentUtil.analyticUserAgent(userAgent); if(userAgentInfo!= null) String browserName = userAgentInfo.getBrowserName(); String browserVersion = userAgentInfo.getBrowserVersion(); if(browserName!=null & !.equals(browserName) / 只考虑浏览器的类型 this._collector.emit(BROWSER_COUNT_STREAM,input,new Values(day,hour,minute,browserName); if(browserVersion!=null & !.equals(browserVersion) this._collector.emit(BROWSER_COUNT_STREAM,input,new Values(day,hour,minute, browserName+_+browserVersion); String osName = userAgentInfo.getOsName(); String osVersion = userAgentInfo.getOsVersion(); if(osName!= null & !.equals(osName) this._collector.emit(OS_COUNT_STREAM,input,new Values(day,hour,minute, osName); if(osVersion != null & !.equals(osVersion) this._collector.emit(OS_COUNT_STREAM,input,new Values(day,hour,minute,osName+_+osVersion); this._collector.ack(input); Override public void cleanup() Override public void declareOutputFields(OutputFieldsDeclarer declarer) declarer.declareStream(BROWSER_COUNT_STREAM,new Fields(DAY,HOUR,MINUTE,BROWSER); declarer.declareStream(OS_COUNT_STREAM,new Fields(DAY,HOUR,MINUTE,OS); Override public Map getComponentConfiguration() return null; CountKpiBolt类package com.ibeifeng.bigdata.storm.weglog;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import static com.ibeifeng.bigdata.storm.weglog.WebLogConstants.*;/* * 通用的计数Bolt * Created by ad on 2016/12/17. */public class CountKpiBolt implements IRichBolt private String kpiType; /TODO 优化 替换为内存数据库 redis private Map kpiCounts; private String currentDay = ; private OutputCollector _collector; public CountKpiBolt(String kpiType) this.kpiType = kpiType; Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) this.kpiCounts = new HashMap(); this._collector = collector; Override public void execute(Tuple input) String day = input.getStringByField(day); String hour = input.getStringByField(hour); String minute = input.getStringByField(minute); String kpi = input.getString(3); String kpiByDay = day + _ + kpi; String kpiByHour = hour +_ + kpi; String kpiByMinute = minute + _ + kpi; / 隔天清理内存 if(!currentDay.equals(day) / 说明隔天了 IteratorMap.Entry iter = kpiCounts.entrySet().iterator(); while(iter.hasNext() Map.Entry entry = iter.next(); if(entry.getKey().startsWith(currentDay) iter.remove(); currentDay = day; int kpiCountByDay = 0; int kpiCountByHour = 0; int kpiCountByMinute = 0; if(kpiCounts.containsKey(kpiByDay) kpiCountByDay = kpiCounts.get(kpiByDay); if(kpiCounts.containsKey(kpiByHour) kpiCountByHour = kpiCounts.get(kpiByHour); if(kpiCounts.containsKey(kpiByMinute) kpiCountByMinute = kpiCounts.get(kpiByMinute); kpiCountByDay +; kpiCountByHour +; kpiCountByMinute +; kpiCounts.put(kpiByDay, kpiCountByDay); kpiCounts.put(kpiByHour, kpiCountByHour); kpiCounts.put(kpiByMinute,kpiCountByMinute); this._collector.emit(input, new Values(kpiType+_ + kpiByDay, kpiCountByDay); this._collector.emit(input, new Values(kpiType+_ + kpiByHour, kpiCountByHour); this._collector.emit(input, new Values(kpiType+_ + kpiByMinute, kpiCountByMinute); this._collector.ack(input); Override public void cleanup() Override public void declareOutputFields(OutputFieldsDeclarer declarer) declarer.declare(new Fields(SERVERTIME_KPI, KPI_COUNTS); Override public Map getComponentConfiguration() return null; SavaBolt类package com.ibeifeng.bigdata.storm.weglog;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.HTable;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;import org.apache.hadoop.hbase.util.Bytes;import java.io.IOException;import java.io.InterruptedIOException;import java.util.Map;import static com.ibeifeng.bigdata.storm.weglog.WebLogConstants.*;/* * 将数据存储到HBase数据库中 * Created by ad on 2016/12/17. */public class SaveBolt implements IRichBolt private HTable table; private OutputCollector _collector; Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) Configuration configuration = HBaseConfiguration.create(); try table = new HTable(configuration,HBASE_TABLENAME); catch (IOException e) e.printStackTrace(); throw new RuntimeException(e); this._collector = collector; Override public void execute(Tuple input) String serverTimeAndKpi = input.getStringByField(SERVERTIME_KPI); Integer kpiCounts = input.getIntegerByField(KPI_COUNTS); System.err.println(serverTimeAndKpi= + serverTimeAndKpi + , kpiCounts= + kpiCounts); if(serverTimeAndKpi!= null & kpiCounts != null) Put put = new Put(Bytes.toBytes(serverTimeAndKpi); String columnQuelifier = serverTimeAndKpi.split(_)0; put.add(Bytes.toBtes(COLUMN_FAMILY), Bytes.toBytes(columnQuelifier),Bytes.toBytes(+kpiCounts); try table.put(put); catch (IOException e) throw new RuntimeException(e); this._collector.ack(input); Override public void cleanup() if(table!= null) try table.close(); catch (IOException e) e.printStackTrace(); Override public void declareOutputFields(OutputFieldsDeclarer declarer) Override public Map getComponentConfiguration() return null; 常量类WebLogConstantspackage com.ibeifeng.bigdata.storm.weglog;/* * 常量类 * Created by ad on 2016/12/17. */public class WebLogConstants public static final String KAFKA_SPOUT_ID = kafkaSpoutId; public static final String WEB_LOG_PARSER_BOLT = webLogParserBolt; public static final String COUNT_IP_BOLT = countIpBolt; public static final String COUNT_BROWSER_BOLT = countBrowserBolt; public static final String COUNT_OS_BOLT = countOsBolt; public static final String USER_AGENT_PARSER_BOLT = userAgentParserBolt; public static final String SAVE_BOLT = saveBolt; / 流ID public static final String IP_COUNT_STREAM = ipCountStream; public static final String URL_PARSER_STREAM = urlParserStream; public static final String HTTPREFER_PARSER_STREAM = httpReferParserStream; public static final String USERAGENT_PARSER_STREAM = userAgentParserStream; public static final String BROWSER_COUNT_STREAM = browserCountStream; public static final String OS_COUNT_STREAM = osCountStream; / tuple key名称 public static final String DAY = day; public static final String HOUR = hour; public static final String MINUTE = minute; public static final String IP = ip; public static final String REQUEST_URL = requestUrl; public static final String HTTP_REFER = httpRefer; public static final String USERAGENT = userAgent; public static final String BROWSER = browser; public static final String OS = os; public static final String SERVERTIME_KPI = serverTimeAndKpi; public static final String KPI_COUNTS = kpiCounts; / kpi类型 public static final String IP_KPI = I; public static final String URL_KPI = U; public static final String BROWSER_KPI = B; public static final String OS_KPI = O; / 表名称 public static final String HBASE_TABLENAME = weblogstatictis; public static final String COLUMN_FAMILY = info;测试类WebLogStatictispackage com.ibeifeng.bigdata.storm.weglog;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.StormSubmitter;import backtype.storm.generated.AlreadyAliveException;import backtype.storm.generated.InvalidTopologyException;import backtype.storm.generated.StormTopology;import backtype.storm.spout.SchemeAsMultiScheme;import backtype.storm.topology.IRichSpout;import backtype.storm.topology.TopologyBuilder;import backtype.storm.tuple.Fields;import storm.kafka.*;import static com.ibeifeng.bigdata.storm.weglog.WebLogConstants.*;import java.util.UUID;/* * Created by ad on 2016/12/17. */public class WebLogStatictis public static void main(String args) WebLogStatictis webLogStatictis = new WebLogStatictis(); StormTopology topology = webLogStatictis.buildTopology(); Config conf = new Config(); /conf.setNumAckers(4); if(args = null | args.length = 0) / 本地执行 /conf.setMessageTimeoutSecs(1); / tuple发射超时时间 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology(webloganalyse, conf , topology); else / 提交到集群上执行 conf.setNumWorkers(4); / 指定使用多少个进程来执行该Topology try StormSubmitter.submitTopology(args0,conf, topology); catch (AlreadyAliveException e) e.printStackTrace(); catch (InvalidTopologyException e) e.printStackTrace(); /* * 构造一个kafkaspout * return */ private IRichSpout generateSpout() BrokerHosts
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 台阶坡道改建方案范本
- 审计部门年度工作总结
- 情志护理与养生
- 单招综评政策解读
- 教官培训工作述职报告
- 2026届福建省龙岩市北城中学英语九年级第一学期期末调研试题含解析
- 早教教师述职报告
- 新人销售技巧培训
- 2026届四川省成都西蜀实验英语九上期末学业质量监测试题含解析
- 江苏省徐州市市区部分2026届化学九年级第一学期期中学业水平测试试题含解析
- 2025-2030滑雪培训行业市场发展分析及前景趋势预测与投资可行性评估报告
- 课堂高效学习的主阵地 教学设计-2023-2024学年高中上学期主题班会
- 2025年放射工作人员培训考试试题(附答案)
- 高考熟词生义解密(复习讲义)-2026年高考英语一轮复习(北京专用)挖空版
- 2025年北京市中考英语试卷(含答案与解析)
- 浙江名校协作体(G12)2025年9月2026届高三返校联考英语(含答案)
- 2025年环保法律法规基础知识考试卷及答案
- 2026届新人教版高考物理一轮复习讲义:静电场及其应用(含答案)
- 检测基础知识培训课件
- 采购管理大师谢勤龙讲义《供应链管理的问题多多与解决之道》
- 国企招聘笔试题及答案-投资专员、投资经理B卷
评论
0/150
提交评论