大数据采集器培训课件_第1页
大数据采集器培训课件_第2页
大数据采集器培训课件_第3页
大数据采集器培训课件_第4页
大数据采集器培训课件_第5页
已阅读5页,还剩25页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

大数据采集器培训课件课程内容导航01大数据采集概述理解核心概念与应用场景02采集环境搭建配置Python与核心工具03网络数据采集技术掌握爬虫框架与实战04分布式消息系统Kafka高吞吐量数据流处理05日志采集系统Flume多源数据采集与同步06数据仓库与集成构建企业级数据中心07ETL工具Kettle实操数据转换与加载实践08数据清洗与预处理提升数据质量标准09采集器实战案例企业级应用场景剖析总结与答疑第一章大数据采集概述大数据的核心特征大数据采集是构建数据价值链的第一步。现代企业面临海量数据的挑战,这些数据呈现出Volume(体量大)、Velocity(速度快)、Variety(种类多)、Value(价值密度低)的4V特征。采集器的战略价值高效的数据采集系统能够从多样化的数据源中实时获取信息,为企业决策提供坚实基础。采集器不仅要保证数据完整性,还需要应对不同场景下的技术挑战。结构化数据关系型数据库、Excel表格等规范化数据半结构化数据JSON、XML、日志文件等带标签数据非结构化数据图片、视频、文本等无固定格式数据大数据采集的三大核心要点1数据源多样性管理现代企业数据来源复杂多样,包括关系型数据库(MySQL、Oracle)、NoSQL数据库(MongoDB、Redis)、API接口、物联网设备、社交媒体平台等。采集器需要具备跨平台、多协议的适配能力,实现统一的数据接入标准。结构化数据:传统数据库表格数据半结构化数据:日志文件、JSON、XML格式非结构化数据:文本、图像、音视频内容2实时性与批处理平衡根据业务场景选择合适的采集模式至关重要。实时采集适用于金融交易、监控告警等对时效性要求极高的场景,而批处理则更适合数据仓库的定期更新、历史数据分析等场景。混合模式能够兼顾两者优势。实时流处理:毫秒级响应,支持实时分析微批处理:秒级或分钟级数据汇总批量处理:小时级或天级大批量数据迁移3数据质量与安全保障高质量的数据是分析价值的基础。采集过程中需要实施严格的数据校验机制,包括格式验证、完整性检查、一致性校对等。同时必须遵守数据安全法规,实施数据加密、脱敏处理、访问控制等安全措施。数据验证:格式、范围、逻辑一致性检查安全加密:传输加密、存储加密、访问认证合规性:符合GDPR、数据安全法等法规大数据采集全流程架构完整的大数据采集系统由四个关键环节组成,形成从数据源到价值产出的完整链路。采集器在整个流程中扮演着数据入口的核心角色,其性能和稳定性直接影响后续所有环节的效果。数据采集从多源异构系统中获取原始数据,支持实时流采集和批量采集两种模式数据预处理进行数据清洗、格式转换、去重、异常值处理,确保数据质量数据存储根据数据特点选择合适的存储方案,如HDFS、Hive、HBase、ES等数据分析基于清洗后的数据进行挖掘分析,产生业务洞察和决策支持关键要点:采集器的选择需要综合考虑数据量级、实时性要求、系统兼容性、运维成本等多个维度。优秀的采集系统应具备高可用、可扩展、易监控的特性。第二章采集环境搭建指南构建稳定的实验环境搭建一套完整的大数据采集环境需要多个组件协同工作。我们将从Python基础环境开始,逐步安装配置Kafka、Flume、Kettle等核心工具,构建起功能完整的实验平台。1Python环境配置安装Python3.8+版本,配置pip镜像源,安装必要的数据处理库2依赖工具安装部署Kafka消息队列、Flume日志采集、KettleETL工具3环境验证测试运行示例程序,验证各组件连通性和功能完整性环境搭建注意事项确保JDK版本在1.8以上预留足够的磁盘空间(建议50GB+)配置防火墙规则开放必要端口使用虚拟环境隔离不同项目依赖第三章网络数据采集技术精讲网络爬虫是获取互联网公开数据的重要手段。本章将深入讲解爬虫的工作原理、常用框架的特点以及实战中的存储方案选择,帮助学员快速掌握网络数据采集的核心技能。爬虫基础原理HTTP协议解析、HTML/CSS选择器、JavaScript渲染处理、反爬虫策略应对主流框架对比Scrapy:功能强大的异步框架,适合大规模采集Requests+BeautifulSoup:轻量级组合,适合快速开发存储方案选型MongoDB:适合半结构化数据存储Redis:高性能缓存和队列管理MySQL:结构化数据持久化网络爬虫实战案例解析实战场景一:热搜数据采集通过爬取百度热搜榜,我们可以实时掌握社会热点话题。该案例涵盖了动态页面解析、定时任务调度、数据存储等核心技能点。importrequestsfrombs4importBeautifulSoupdefcrawl_hotlist():url=''response=requests.get(url)soup=BeautifulSoup(response.text)items=soup.select('.hot-item')return[item.textforiteminitems]实战场景二:新闻内容采集新闻网站的内容采集需要处理分页、提取正文、过滤广告等复杂情况。合理的去重策略和异常处理机制是保证数据质量的关键。数据去重策略基于URL的MD5哈希去重使用布隆过滤器提升效率内容相似度计算去重异常处理技巧网络超时重试机制4xx/5xx状态码处理动态代理IP池轮换性能优化方法异步并发请求连接池复用分布式爬虫架构第四章分布式消息系统KafkaKafka是LinkedIn开发的高性能分布式消息队列系统,现已成为大数据生态的核心组件。它能够处理每秒数百万级别的消息,为实时数据流处理提供可靠的基础设施。Topic(主题)消息的逻辑分类,类似数据库中的表,支持多订阅者模式Partition(分区)Topic的物理分片,实现并行处理和水平扩展能力Broker(代理)Kafka集群节点,负责消息存储和转发,支持高可用部署ConsumerGroup消费者组机制,实现负载均衡和故障转移Kafka的核心优势:高吞吐量(单机每秒数十万条消息)、低延迟(毫秒级)、持久化存储、水平扩展、容错性强,是构建实时数据管道的理想选择。Kafka数据采集实战操作生产者与消费者编程Kafka提供了简洁的API接口,支持多种编程语言。Python的kafka-python库是最常用的客户端之一,能够快速实现消息的生产和消费。#生产者示例fromkafkaimportKafkaProducerproducer=KafkaProducer(bootstrap_servers=['localhost:9092'])producer.send('test-topic',b'HelloKafka')producer.flush()#消费者示例fromkafkaimportKafkaConsumerconsumer=KafkaConsumer('test-topic',bootstrap_servers=['localhost:9092'])formsginconsumer:print(msg.value)数据存储集成方案Kafka可以无缝对接多种存储系统。通过KafkaConnect框架,可以实现与MongoDB、Redis、Elasticsearch等系统的双向数据同步,构建灵活的数据流转链路。性能调优调整batch.size、linger.ms等参数优化吞吐量监控工具KafkaManager、Burrow等开源监控方案可靠性保障配置副本机制、ACK确认、事务支持第五章日志采集系统Flume详解ApacheFlume是Cloudera开发的分布式日志采集系统,专门用于高效收集、聚合和移动大量日志数据。它具有高可靠性、高可用性和可扩展性,是Hadoop生态中重要的数据采集工具。1Source(数据源)负责接收数据,支持多种类型:AvroSource:接收Avro格式数据ExecSource:执行命令获取数据SpoolingDirectory:监控目录文件KafkaSource:从Kafka消费数据2Channel(通道)缓冲区,连接Source和Sink:MemoryChannel:内存队列,速度快FileChannel:磁盘持久化,可靠性高KafkaChannel:利用Kafka做缓冲3Sink(接收器)负责将数据写入目标系统:HDFSSink:写入HDFS分布式存储KafkaSink:发送到Kafka队列HiveSink:直接写入Hive表ElasticsearchSink:写入ES搜索引擎Flume多场景实战应用MySQL日志同步到Kafka使用Flume监听MySQL的binlog日志,实时捕获数据变更事件,通过KafkaSink发送到消息队列,实现数据库变更的实时同步和订阅。适用于数据仓库实时更新、缓存更新等场景。文件日志实时采集通过SpoolingDirectorySource监控应用服务器的日志目录,自动采集新生成的日志文件,经过数据清洗和格式化后,写入HDFS或Elasticsearch。支持日志轮转、断点续传、重复过滤等特性。多数据源整合采集构建复杂的Flume拓扑结构,从Web服务器、应用服务器、数据库等多个源头采集数据,通过多级Agent串联,实现数据的聚合、过滤、路由,最终统一存储到数据湖中。Flume配置最佳实践合理设置Channel容量,避免数据积压;启用事务机制保证数据不丢失;使用Interceptor实现数据过滤和转换;配置多个Agent实现高可用;定期监控采集性能和数据质量。第六章数据仓库与数据集成数据仓库的核心价值数据仓库是面向主题的、集成的、稳定的、反映历史变化的数据集合,用于支持企业决策分析。它将分散在不同业务系统中的数据整合到统一的存储平台,提供一致的数据视图。主题导向:按业务主题组织数据集成性:统一数据标准和格式时变性:保留历史数据变化轨迹非易失性:数据稳定不被修改数据集成关键技术数据集成是将来自不同源系统的数据进行抽取、转换和加载的过程。主要挑战包括异构数据源适配、数据质量保障、实时性要求、大规模数据迁移等。ETL流程设计与优化增量更新与全量同步数据血缘与影响分析元数据管理与治理Hive数据仓库基于Hadoop的SQL查询引擎,支持PB级数据分析,提供SQL接口HBase列式数据库分布式NoSQL数据库,支持实时读写,适合海量数据随机访问Impala实时查询MPP架构的SQL引擎,提供秒级查询响应,适合交互式分析数据集成工具实战应用1Sqoop数据迁移Sqoop是Hadoop与关系型数据库之间的数据传输工具,支持MySQL、Oracle、SQLServer等主流数据库。通过并行导入导出,可以高效完成TB级数据迁移。支持全量和增量同步模式。2DataX异构传输阿里开源的异构数据源离线同步工具,支持MySQL、Oracle、HDFS、Hive、HBase等20+种数据源。采用Framework+Plugin架构,易于扩展。提供限流、脏数据处理等企业级特性。3实战案例演示从MySQL业务数据库将订单、用户、商品等核心表数据导入Hadoop生态。首先使用Sqoop进行全量导入建立基线,然后配置定时任务实现每日增量更新。最终在Hive中构建多维分析模型。选型建议:Sqoop适合简单的RDBMS到Hadoop的数据迁移,性能稳定;DataX支持更丰富的数据源,配置灵活,适合复杂的异构数据集成场景。第七章ETL工具Kettle快速入门PentahoDataIntegration(简称Kettle)是开源的ETL工具,提供图形化界面进行数据抽取、转换和加载设计。它功能强大、易于使用,支持100+种数据源,是企业数据集成的首选工具之一。Kettle核心概念Transformation(转换):数据处理的基本单元,由Step组成的有向图Job(作业):工作流程序,控制Transformation的执行顺序Step(步骤):转换中的最小执行单元,如表输入、排序、过滤等Hop(跳):连接各个Step,定义数据流向安装与配置要点下载Kettle社区版或企业版配置Java环境变量(JDK1.8+)设置内存参数优化性能配置数据库连接和资源库安装必要的插件和驱动01数据抽取从源系统读取数据,支持数据库、文件、API等多种方式02数据转换进行字段映射、格式转换、计算、清洗、合并等操作03数据加载将处理后的数据写入目标系统,支持批量和实时模式Kettle高级功能与最佳实践复杂数据转换技巧Kettle提供丰富的转换组件处理复杂场景:数据清洗:空值处理、去重、字符串清洗、数据类型转换数据验证:正则表达式校验、范围检查、业务规则验证数据计算:JavaScript脚本、Java代码、公式计算器数据聚合:分组统计、排序、唯一值提取、行转列调度与自动化管理生产环境中的ETL任务需要可靠的调度机制:Kettle内置调度:使用Kitchen命令行执行JobCron定时任务:Linux系统的定时调度DolphinScheduler:大数据工作流调度平台集成监控告警:配置邮件通知、日志记录、执行状态跟踪性能优化策略提升ETL执行效率的关键措施:并行处理:调整Step副本数,充分利用多核CPU批量提交:设置合理的Commitsize减少IO次数索引优化:在数据库表上创建适当索引分区策理:对大表进行分区,分批处理数据缓存使用:利用内存缓存减少重复查询错误处理最佳实践配置错误处理Step捕获异常记录;使用日志记录Step跟踪关键节点;设置事务回滚保证数据一致性;建立数据质量监控指标;定期检查错误日志并优化。第八章数据清洗与预处理技术数据清洗是数据分析前的关键步骤,直接影响分析结果的准确性。真实世界的数据往往存在缺失值、异常值、重复记录、格式不一致等质量问题,需要系统化的清洗和预处理流程。数据质量评估完整性、准确性、一致性、时效性检查缺失值处理删除、填充、插值等策略选择异常值检测统计方法、机器学习方法识别异常去重与标准化记录去重、格式统一、编码转换数据转换归一化、离散化、特征工程数据预处理核心技术解析数据集成与变换来自不同源系统的数据需要进行整合和统一:Schema匹配:识别不同表之间的对应关系实体解析:识别指向同一实体的不同记录数据融合:解决数据冲突,合并多源数据格式转换:日期、数字、字符串格式统一数据脱敏与隐私保护在数据处理过程中保护敏感信息:遮蔽:部分字符替换为*号替换:用假值替换真实数据加密:使用加密算法保护数据泛化:将具体值替换为区间扰动:添加随机噪声保护隐私Pandas清洗实战使用Python的Pandas库是数据清洗的常用方法,提供了丰富的数据处理函数importpandasaspd#读取数据df=pd.read_csv('data.csv')#删除重复行df.drop_duplicates(inplace=True)#填充缺失值df.fillna(method='ffill',inplace=True)#异常值处理df=df[df['age']<120]第九章企业级采集器案例分享本章将介绍三个典型的企业级数据采集解决方案,涵盖日志采集、轻量级采集和大数据平台建设等不同场景,帮助学员了解实际生产环境中的采集器应用模式。阿里云LoongCollector阿里云推出的新一代可观测数据采集器,支持日志、Metric、Trace等多种数据类型。采用Golang开发,性能优异,资源占用低。提供丰富的采集插件和数据处理能力,支持云原生环境部署,与阿里云SLS无缝集成。ElasticBeats家族Elastic公司开源的轻量级数据采集器系列,包括Filebeat(日志)、Metricbeat(指标)、Packetbeat(网络)等。采用Go语言编写,占用资源少,部署简单。与Elasticsearch、Logstash、Kibana完美配合,构建完整的数据采集分析链路。敏捷大数据平台基于开源技术栈构建的企业级大数据平台,整合Kafka、Flume、Flink等组件。支持实时和离线两种采集模式,提供统一的数据接入、清洗、存储、计算、可视化能力。适用于构建企业数据中台和数据湖。实时数据采集平台设计剖析构建一个企业级的实时数据采集平台需要综合考虑性能、可靠性、可扩展性等多个维度。下面以一个典型的电商实时数据采集架构为例,详细分析各个环节的技术选型和设计要点。数据采集层Web/App埋点、服务器日志、业务数据库binlog采集消息队列层Kafka集群实现数据缓冲和分发实时计算层Flink/SparkStreaming进行实时处理存储层HBase/ClickHouse/ES多引擎存储应用层实时大屏、报表、告警等业务应用开源项目Wormhole由edp963开发的流式数据处理平台,提供统一的流式数据接入、处理和分发能力。支持多种数据源和目标存储,配置简单,适合快速构建数据管道。开源项目Davinci数据可视化平台,支持对接多种数据源,提供丰富的图表类型和交互能力。可以快速搭建数据大屏和报表系统,满足数据展示需求。开源项目Moonbox数据虚拟化引擎,提供统一的SQL接口访问异构数据源。支持联邦查询、数据缓存、权限控制等企业级特性,简化数据访问复杂度。采集器性能优化核心技巧资源占用优化监控CPU、内存、网络、磁盘使用情况调整JVM堆内存大小和GC策略控制并发线程数避免资源竞争使用异步IO提升处理效率合理设置缓冲区大小负载均衡策略确保集群各节点负载均衡使用一致性哈希分配数据动态感知节点状态调整分配配置多个采集器实例提高可用性实现故障自动转移机制可靠性保障确保数据不丢失、不重复启用ACK确认机制配置数据持久化和备份实现幂等性处理避免重复断点续传支持任务恢复容错机制设计网络异常时自动重试,设置最大重试次数和退避策略数据重试策略失败数据进入死信队列,人工介入处理或定期重新处理监控告警体系实时监控采集速率、延迟、错误率,异常时及时告警通知采集器安全与合规实践数据脱敏技术应用在数据采集和传输过程中保护敏感信息是法律和业务的双重要求。主要的脱敏技术包括:静态脱敏:在数据落盘前进行脱敏处理动态脱敏:查询时根据权限动态脱敏字段级加密:对敏感字段单独加密存储令牌化:用不可逆的令牌替换真实值访问控制与审计建立完善的权限管理和审计机制:身份认证:支持LDAP、OAuth等认证方式权限控制:基于角色的细粒度权限管理审计日志:记录所有数据访问操作合规报告:定期生成合规性审计报告行业标准与法规数据采集需要遵守《网络安全法》、《数据安全法》、《个人信息保护法》等法律法规,以及GDPR、CCPA等国际标准。采集个人信息需要明确告知用户并获得授权。安全最佳实践传输层使用TLS/SSL加密;存储层使用透明数据加密;定期进行安全漏洞扫描和渗透测试;建立应急响应预案;对员工进行数据安全培训。第十章大数据采集技术发展趋势随着技术的不断演进,大数据采集领域正在经历深刻变革。云原生、人工智能、多源融合等新技术正在重塑数据采集的架构和模式,为企业提供更加智能、高效、灵活的数据采集解决方案。1云原生采集器基于Kubernetes的容器化部署成为主流,采集器具备弹性伸缩、自动恢复、服务网格等云原生特性。Serverless采集模式降低运维成本,按需付费更加经济。支持多云、混合云环境的统一数据采集。2AI智能采集机器学习算法自动识别数据模式和质量问题,智能推荐数据清洗规则。自然语言处理技术实现非结构化文本的智能解析和信息抽取。异常检测算法实时发现数据质量问题和安全威胁。3多源融合物联网设备、边缘计算节点、5G网络等新型数据源不断涌现。数据湖架构支持结构化、半结构化、非结构化数据的统一存储。流批一体化架构实现实时和离线数据的融合处理。展望未来:数据采集将更加智能化、自动化、实时化。采集器不仅是数据搬运工,更是具备数据理解和处理能力的智能代理。企业需要及时跟进技术趋势,构建面向未来的数据基础设施。课堂互动:常见问题答疑如何选择合适的采集器?需要综合考虑数据源类型、数据量级、实时性要求、技术栈兼容性、团队技术能力等因素。小规模场景可选轻量级工具如Filebeat;大规模实时场景推荐Kafka+Flink;复杂ETL场景选择Kettle或DataX。环境搭建中的常见难点主要问题包括:版本兼容性冲突、网络配置错误、权限不足、端口被占用、依赖包缺失等。建议使用Docker容器化部署简化环境配置,参考官方文档逐步排查问题,必要时查看详细日志定位错误。实操中的技术瓶颈性能瓶颈:优化并发参数、增加资源配置、使用批量操作。数据质量问题:建立数据校验规则、实施数据清洗流程。运维困难:完善监控告警、自动化运维脚本、建立应急预案。更多疑问?欢迎在课后通过以下方式继续交流:加入课程专属技术交流群邮件咨询讲师获取详细解答参与在线直播答疑活动查阅课程配套的技术文档学习建议理论与实践相结合,多动手操作搭建本地实验环境反复练习阅读优秀开源项目源码关注技术社区最新动态参与实际项目积累经验课程核心内容总结本次培训系统讲解了大数据采集器的核心技术和实战应用,涵盖了从理论基础到工具使用、从环境搭建到生产实践的完整知识体系。希望学员能够掌握以下核心要点:理论基础理解大数据采集的核心概念、技术架构和应用场景工具掌握熟练使用Kafka、Flume、Kettle等主流采集工具编程能力掌握Python爬虫、数据清洗、ETL开发等技能架构设计能够设计企业级数据采集系统架构方案性能优化掌握采集器性能调优和故障排查方法安全合规了解数据安全和隐私保护的相关要求持续学习建议:大数据技术发展迅速,建议保持学习热情,关注行业动态,积极参与开源社区,在实践中不断提升技能水平。可以从小项目开始,逐步挑战更复杂的场景。参考资料与学习资源推荐权威书籍推荐《数据采集与预处理》-林子雨著系统介绍数据采集理论和实践技术《大数据技术原理与应用》-林子雨著Hadoop生态系统全面解析《Kafka权威指南》深入理解Kafka架构和最佳实践《数据密集型应用系统设计》分布式系统设计经典著作在线学习资源阿里云大学:采集器官方文档和视频教程Apache官网:Kafka、Flume等项目文档GitHub:优秀开源项目源码学习StackOverflow:技术问答社区掘金/CSDN:技术博客和实战案例开源项目链接Wormhole:/edp963/wormholeDavinci:/edp963/davinciMoonbox:/edp963/moonboxDataX:/alibaba/DataX技术社区Apache中文社区云栖社区大数据技术峰会各类技术交流群附录一:常用命令与配置Kafka核心命令速查#启动Zookeeperbin/zookeeper-server-start.shconfig/perties#启动Kafka服务bin/kafka-server-start.shconfig/perties#创建Topicbin/kafka-topics.sh--create--topictest--bootstrap-serverlocalhost:9092--partitions3--replication-factor1#查看Topic列表bin/kafka-topics.sh--list--bootstrap-serverlocalhost:9092#生产消息bin/kafka-console-producer.sh--topictest--bootstrap-serverlocalhost:9092#消费消息bin/kafka-console-consumer.sh--topictest--from-beginning--bootstrap-serverlocalhost:9092Flume配置示例#agent名称agent.sources=r1agent.channels=c1agent.sinks=k1#source配置agent.sources.r1.type=execmand=tail-F/var/log/app.log#channel配置agent.channels.c1.type=memoryagent.channels.c1.capacity=1000#sink配置agent.sinks.k1.type=hdfsagent.sinks.k1.hdfs.path=/data/logs#绑定关系agent.sources.r1.channels=c1agent.sinks.k1.channel=c1Kettle转换设计要点表输入:配置数据库连接和SQL字段选择:选择需要的字段值映射:字段名称和类型转换过滤记录:设置过滤条件排序记录:指定排序字段去除重复:配置唯一键字段表输出:配置目标表和写入方式附录二:实验环境安装指南Windows环境适合开发和学习使用Linux环境生产环境首选平台Windows安装步骤安装JDK:下载JDK1.8或11版本,配置JAVA_HOME环境变量安装Python:下载Python3.8+,勾选"AddtoPATH"选项安装Kafka:下载二进制包,解压到指定目录,修改配置文件安装Flume:解压Flume安装包,配置FLUME_HOME安装Kettle:解压Kettle(PDI)安装包,运行Spoon.bat启动Windows注意事项路径不要包含中文和空格;使用管理员权限运行;配置防火墙规则;注意路径分隔符使用反斜杠。Linux安装步骤更新系统:sudoapt-getupdate(Ubuntu)或yumupdate(CentOS)安装JDK:sudoapt-getinstallopenjdk-8-jdk安装Python:通常自带,或使用包管理器安装配置环境变量:编辑~/.bashrc添加JAVA_HOME等变量安装工具:解压各工具到/opt目录,配置启动脚本Linux注意事项检查端口占用情况;配置文件描述符限制;设置开机自启动;注意文件权限问题。常见问题解决方案端口冲突使用netstat命令查看端口占用,修改配置文件更改端口号内存不足调整JVM参数,减小堆内存大小或增加物理内存连接超时检查网络配置、防火墙设置、服务是否正常启动依赖缺失根据错误提示安装对应的依赖库或驱动程序附录三:实操代码片段精选Python爬虫完整示例importrequestsfrombs4importBeautifulSoupimportpymongoimporttimeclassNewsSpider:def__init__(self):self.session=requests.Session()self.client=pymongo.MongoClient('localhost',27017)self.db=self.client['news_db']self.collection=self.db['articles']

defcrawl_list(self,url):"""爬取列表页"""response=self.session.get(url,timeout=10)soup=BeautifulSoup(response.text,'html.parser')articles=soup.select('.article-item')

forarticleinarticles:title=article.select_one('.title').text.strip()link=article.select_one('a')['href']self.crawl_detail(link,title)time.sleep(1)#避免频率过高

defcrawl_detail(self,url,title):"""爬取详情页"""response=self.session.get(url,timeout=10)soup=BeautifulSoup(response.text,'html.parser')content=soup.select_one('.content').text.strip()

#保存到MongoDBself.collection.insert_one({'title':title,'url':url,'content':content,'crawl_time':time.time()})if__name__=='__main__':spider=NewsSpider()spider.crawl_list('/news')Kafka生产者示例fromkafkaimportKafkaProducerimportjsonproducer=KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambdav:json.dumps(v).encode('utf-8'),acks='all',#等待所有副本确认retries=3,#失败重试次数batch_size=16384#批量发送大小)#发送单条消息data={'user_id':123,'action':'click','timestamp':1234567890}producer.send('user-events',value=data)#批量发送foriinrange(100):producer.send('test-topic',value={'id':i,'msg':f'message{i}'})producer.flush()#确保所有消息发送完成producer.close()Kafka消费者示例fromkafkaimportKafkaConsumerimportjsonconsumer=KafkaConsumer('user-events',bootstrap_servers=['localhost:9092'],group_id='my-group',auto_offset_reset='earliest',value_deserializer=lambdam:json.loads(m.decode('utf-8')))#消费消息formessageinconsumer:data=message.valueprint(f"Received:{data}")

#处理业务逻辑process_data(data)

#手动提交offset(可选)mit()Flume多源

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

最新文档

评论

0/150

提交评论