




版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
阿里云开发者社区ALIBABACLOUDDEVELOPERCOMMU从入门到实践从技术基础到实践应用首月599元试用阿里云开发者“藏经阁”湖Lakehouse以及部分spark相关应用。书的应用环境/product/bigdata/spark基础篇一、Databricks数据洞察一企业级全托管Spark大数据分析平台 1二、DeltaLake的演进历程和现状优势 三、深度解析数据湖存储方案Lakehouse架构 四、DeltaLake数据湖基础介绍(开源版) 五、DeltaLake数据湖基础介绍(商业版) 应用篇 55七、使用DDI+Confluent进行实时数据采集入湖和分析 八、使用Databricks进行零售业需求预测的应用实践 九、使用Databricks进行营销效果归因分析的应用实践 十、使用Databricks和MLflow进行机器学习模型训练和部署的应用实践 87分析平台作者:棕泽Databricks数据洞察产品介绍1)ApacheSpark创始公司,也是Spark的最大代码贡献者,Spark技术生态背后的商业公司。在2013年,由加州大学伯克利分校AMPLab的创始团队ApacheSpark的创建者所成立。ApacheSpark、DeltaLake、Koalas、MLFlow、OneLakehousePlat3)公司定位⑩DatabricksistheData+Alcompany,为客户提供数据分析、数据工程、数据科学和一体化分析平台4)市场地位⑩科技独角兽,行业标杆,领导Spark整体技术生态的走向及风向标⑩2021年最受期待的科技上市公司(来源Databricks官网)企业级全托管Spark大数据分析平台及案例分析21)2019年10月G轮,估值$6.2Billion2)2021年2月初F轮,估值$28Billion350亿美元,甚至是高达500亿美元Databricks从成立之初的融资/估值历史F轮62亿美元Sep20133.Databricks和阿里云联手打造的高品质Spark大数据分析平台企业级全托管Spark大数据分析平台及案例分析3NICHEPLAYERSDataiku●4.Databricks+阿里云=Databricks数据洞察1)产品核心2)产品引擎与服务⑩100%兼容开源Spark,经阿里云与Databricks联合研发性能优化企业级全托管Spark大数据分析平台及案例分析4DeltaEngine(超高性能)容ApacheSpark)pythonspok4)产品关键信息与优势免运维:无需关注底层资源情况,降低运维成本,聚高品质支持:阿里云和Databricks提供覆盖Spark高性能分析:性能提升3-50倍提升,轻松处理十亿级无缝架构升级:无缝替代Lambda架构,轻松实现批先进的存储技术:支持CRUD操作、TimeTravel、学工作环境。先进的ML工作流程(待发布):基于MLfow和WorkspacDatabricks驱动的超高性能阿里云数据平台SuperChargedAliCloudDataPlatformpoweredbyDatabricks◎全托管◎全托管高性能架构先进企业级Spark交互协作流式数据结构化数据半结构化数据Aspak·100%兼容ApacheSpark02TPC-DS10TB数据,Runtime最快50倍提升0企业级全托管Spark大数据分析平台及案例分析6opómumublizationComputeoplimizaliondurmngoAutoscalingcomputeHighavailabityforcdusteSchemaevolution&enforcSreamingDataSink/S两者详细比较:htps:/helpalbyuncomdocamentdeta175748存算一体=》计算存储分离存算一体=》计算存储分离三NetworkNetworkNetwork按月标价(元)(元/GB/月)hps://priceproductHecs/d(元/GB/月)计算存储分离=》计算存储分离=》JindoFS实现架构方案力耄⑩不同作业类型工作流混合调度企业级全托管Spark大数据分析平台及案例分析8三记录8.丰富的数据源支持支持常见数据文件格式支持常见数据文件格式支持各种数据源(内置Connector)9.元数据管理全托管高性能引擎存算分离场景四:场景四:交互式ML训练Databricks帮助客户快速构建先进的LakeHouse平台,帮助客户降低运维和调优成本、构建批流一体的处理场景一:数仓迁移或向数据湖演进 数据校验难)场景二:构建LakeHouse方案场景三:能力,聚集挖掘数据价值·Recommendation·Risk,Fraud,&Intrusion·CustomerA·loT&Predictive低质量、不可靠的数据差强人意的性能在这样的背景下应运而生了DeltaLake场景4.大数据发展进入LakeHouse时代▲驱动力1.数据模型变化难2.不支持大规模非结构化数据驱动力1.不支持事物2.不支持数据质量校验Structured,Semi-Structured,andUnstru伞3.不支持计算存分类,存储成本高3.不支持批流一体5.使用DDI构建批流一体数仓,简化复杂构架8KafkaStremKafkaooKatfkaKatfka企业级全托管Spark大数据分析平台及案例分析11冒乖④世婪日□回7.Databricks数据洞察典型构架DDI与阿里云产品深度集成(典型场景)1)数据获取3)BI报表数据分析&交互式分析4)AI数据探索⑩支持机器学习,Mllib等Spark生态Al场景。5)上下游网络打通⑩如上游对接Kafka、OSS、EMRHDFS等等,下游承接Elasticsearch、RDS、OSS存储等。典型场景客户案例介绍⑩数据收集/存储:接收实时产生的流式数据和外部云存储上批量数据⑩数据ETL:持续高效地处理增量数据,支持数据的回滚和删改,提供ACID事务性保障⑩BI数据分析&交互式分析:支持查询,Notebook可视化分析,无缝对接多种BI分析工具⑩数据科学:支持机器学习/深度学习OSS存储等◎jupyterDataScience↑Structured,Semi-structured&UnstructuredData↑Structured,Semi-structured&UnstructuredData数仓的架构,再到最近两年的Lakehouse架构。大数据平台架构演进-数据仓库 同时,基于它自身的诸多优化特性,数仓架构对分析型场景能够提供非常好的支持。但是支持的场景比较有限,基本局限于常用的分析场景。而在大数据时代,随着数据规模的逐渐增加,企业对于数据分析的场景要求越来越多,逐渐产生了一些高级的分析场景需求,比如数据科学类或者机器学习类的场景,而数据仓库对此类需求难以支持。另外,数据仓库也无法支持半结构化以及非结构化的数据。阿里云大数据平台架构演进-数据湖+数据仓库阿里云基于Parquet等开放数据格式借助低成本的云对象存储,支持结构和非结构类型数据存储;SchemaSchema-On-Read设计,下游数据治理复杂度高;缺少数据库管理特性及性能优化手段32003年前后,Hadoop面市。伴随着数据规模体量的爆炸式增长,我们对低成本存储的需求也愈发迫切。于是第二代大数据平台架构雏形初现。它以数据湖为基础,能够支持对结构化、非结构化以及半结构化数据的存储。与数据仓库相比,它是一种Schema-on-read的设计,数据能够比较高效地存入数据湖,但是会给下游的分析提供较高的负担。因为数据在写入之前没有做校验,随着时间的推移,数据湖里的数据会变得越来越脏乱,数据治理的复杂度非常高。同时因为数据湖底层是以开放的数据格式存储在云对象存储上,云对象存储的一些特性会导致数据湖架构缺少像数仓一样的数据管理特性。另外因为云对象存储在大数据查询场景上的性能上不足,导致很多场景下都无法很好地体现数据湖的优势。DeltaLake的演进历程和现状优势15大数据平台架构演进-大数据平台架构演进-Lakehouse基于云对象存储的数据湖之上提供ACID事务管理层,同时保证了数据质量和数据访问性能;支持大数据时代各种分析处理场景AmaznS于是第三代大数据平台架构——Lakehouse应运而生。它在数据湖之上抽象出了事务管理层,能够提供传统数仓的一些数据管理特性,还可以针对云对象存储中的数据做一些数据的性能优化。从而能够针对大数据时代各种复杂的分析场景提供支持,且对于流批两种场景也能够提供统一的处理方式。有了Lakehouse架构的背景之后,DeltaLake也应运而生,它是由Databricks开源的一个数据库解决方案,架构清晰简洁,能够提供比较可靠的保障。上图呈现了Deltalake的Multi-hopMedallion架构,即通过多个表结构来提供不同分析场景的支持。数据可以通过streaming的方式流入DeltaLake,也可以使用batch方式DeltaLake里的表可以分为三类:⑩第一类:数据最先导入的表称为bronzetable。direct事务的特table中数据的可靠性,因此它是整个数据湖的sourceoftruth(事实表)。某些场景可以直接读取bronzetable。DeltaLake的演进历程和现状优势16阿里云阿里云底层基于事务日志机制实现可串行化的隔离级别,提供ACID事务,保证读写数据的一致性;在ACID事务基础上,提供了更多数据管理及性能优化特性时间回溯时间回溯支持制约束及自动演化缓存及索引在大数据的场景下,元数据管理本身可能会成为一种负担,因为对于较大的表来说,元数据本身就能成为大数据。所以如何高效地支持元数据管理,也是对架构挑战。此外,在Deltalake的商业版本里,还提供了数据库中的数据布局自动优化的能力,同时实现了传统数仓数据库一系列性能优化特性,比如缓存、索引等优化能力。0.1版本0.2~0.4版本0.5版本0.6版本0.7版本0.8版本1.0版本+Deltalake项目最早开源在2019年4月,事务、流批一体等最核心的功能在0.1版本都社区第一次在Spark之外提供引擎的支持,也是Deltalake开放性目标的一部分;同时0,5版本还提供了一些优化的特性,以及通过SQL的方式直接将parque转成⑩0.6版本:DeltaLake做了一些了进一步优化,对比如describehistory的命令提供了更多metrics信息。元数据本身是transactionog事务日志的一部分,所以有了Hivemetastore的支了对dml的支持。支持了VACUUM的并发删除能力。纵观Deltalake的发展历程,可以清晰地看出,它一直坚定地朝着Everywhere——支持更多元、更开放的生态发展。Support上图展示了Deltalake1.0的一些核心特性。阿里云阿里云1……本身是时间戳,但是数据中并没有eventDate⑩方案1:直接使用时间戳的字段做分区。时间戳是一个比较细粒度的字段,使用它来做会产生大量的分区,对于查询性能会造成非常大的影响。因此,此方案被排除。⑩方案2:数据写入表之前手动维护额外字段。比如从eventTime字段中抽取得到eventDate。但这需要人工维护字段,而但凡涉及到人工,就容易引入错误,尤其是多种数据源同时写入的情况,要求对多种数据源同时做转换,极易出现差错。因此DeltaLake1.0提供了generatedcolumns,它是一种特殊类型的列,它的值可以根据用户指定的函数自动生成。阿里云阿里云eventDateGENERATEDALWAYSAS)eventDateGENERATEDALWAYSAS)1…PARTITIONEDBY(eventDate)可以通过如上图简单的SQL语法,将eventTime转换成date类型,从而生成eventDate字段。整个过程自动完成的,用户只需要在最开始创建表的时候提供这个语法即可。Deltalake1.0提供的第二个重要特性是Standalone。它的目标是可以在Spark之外对接更多引擎,但是诸如Presto、Flink等引擎本身并不需要依赖Spark,如果Deltalake只能强绑定Spark就违背了Deltalake开放性的目标。DeltaLake的演进历程和现状优势20√√√√√√√7√√√√0InIn[2]:fromdeltalake['part-80000-087ecfo⁴-ee64-49b-bebb-f6c5d6c944F8-c200,snapp'part-0000-02ea⁸B³d-c9b5-4461-a¹e0-526595f1a10-c⁰0.snappy.pa'part-00090-049b⁸14d-10a8-4aab-bf³b-cd⁸aelace⁰91-cee⁰.snappy.p'part-0098-06a6346b-c533-4cda-83a8-bc514b749761-cee0.snappy.pa最早的DeltaLake是Spark的一个子项目,因此DeltaLake对Spark引擎的兼容性做得非常好。同时,由于Spark社区发展迅速,能够第一时间兼容Spark也是DeltaLake社区的首要目标。所以在1.0版本,DeltaLake首先兼容了Spark3.1,并对其提供的一些特性进行优化,以便第一时间在DeltaLake里投入使用。很多企业在使用DeltaLake的时候,一个常用场景是使用单一集群去访问/关联一个存储系统。有DeltaLake1.0对此提供了delegatinglogstore功能,通过logstore的方式来支持不同云厂商的对象存储系统,以便能够支持混合云部署的场景,同时也可以避免对单一云服务商产生locking绑定的情况。未来,DeltaLake社区会朝着一个更加开放的方向发展。染基于数据布局的优化-OPTIMIZE&ZOREDERBY这两个特性在Databricks目前开Spark产品-Databricks数据洞察,除了此处列出的Optimize及Z-Ordering,可以在DeltaLake的演进历程和现状优势23阿里云Databricks数据洞察产品架构⑨⑨全托管Spark平台spankDatabricks数据洞察产品是基Databricks的引擎提供的全托管数据平台,它最核心的部引擎。相比于开源的Spark和Delta,商业版在性能上有非常大的提升。DeltaDeltaLake当前应用情况最后,看一下DeltaLake当前在全球范围内的应用情况,越来越多的企业已经开始使用DeltaLake来构建Lakehouse。从Databricks给的数据看,目前已有超过3000+客户在生产环境中部署了DeltaLake,每天处理Exabytes级别数据量,其中超过75%的数据已流式分析BI结构化,半结构化及非结构化数据⑪数据科学机器学习数据仓库数据仓库优势·便于BI和报表系统接入·数据管控能力强劣势·不支持非结构化数据·成本高,灵活度低潮目片、文档等的支持是非常有限的,因此它不适用于类似于机器学习的应用场景。而且一般情况下,数据仓库都是专有系统,使用成本比较高,数据迁移和同步的灵活性比较低。因此,为了解决上述问题,数据湖的架构应运而生。8综上,不论是数据仓库还是数据湖,都无法完全满足用户的需求。复杂的组合型数据系统数据仓库数据工程流式处理HadooAgecheAunArushekafaApacheSpApsheFinkArazenKiaAneSremAndytiesGoger3十3十)8警8因此,在很多实际使用场景中,用户会将两者组合起来使用,但是这导致需要构建很多不同的技术栈来支持所有场景。比如对于数据分析,需要将结构化的数据输入到数据仓库,然后建立数据市场,对数据分析和BI提供支持;对于数据科学和机器学习的场景,需要把不同的数据比如结构化、半结构化以及非结构化的数据存储到数据湖中,经过数据清理,用来支持机器学习和数据科学等场景;而对于流式数据源,需要通过流式数据引擎存储到实时数据库中,同时还需要对数据湖里的数据进行ETL提取、转换和加载,来保证数据的质量。而且,由于所有技术栈都是互相独立的,导致了维护和使用这些系统的团队也是分散的。比如,数据分析师主要使用数据仓库系统,而数据科学家主要使用数据湖系统,同时数据工程师也需要维护整个系统的不同团队,沟通成本比较高。此外,系统中维护了很多不同格式的数据副本,没有统一的管理数据模型,不同团队的数据很有可能会产生差异。①①数据科学机器学习6:流式分析BI解决的办法是综合数据湖与数据仓库的能力——基于数据湖使用的对象存储,构建数据仓库拥有的数据管控能力。而这一切的关键就是其中的结构化事务层。流式分析BI此前,数据湖主要存在以下几个痛点:务性地被同时读到或者同时没有读到,而这是难以实现的,因为在分布式的对象存储上写多个文件,设置一个文件,数据的一致性都是不能完全被保证的。2)数据的修改。由于安全合规等原因,用户会有强制性地修改已有数据的需求,特别是有时候需要根据过滤结果细粒度地修改某些数据。由于数据湖在数据管控能力上的不足,在数据湖上实现此需求往往需要使用全部扫描再重写的方式,成本比较高,速度也比较慢。3)如果一个作业中途失败,而它产生的部分数据已经存入到数据库中,这也会导致数据的损坏。4)批流混合输入。由于数据在批和流系统中都存在,可能会造成数据在两套系统中不一致,导致读取结果不一致。5)存数据历史。有些用户需要保证数据查询的可重复性,方案之一是为了这个需求做很多重复的数据快照,但这会导致数据的存储和计算成本都大幅上升。6)处理海量的元数据。大型数据湖元数据的数据量非常大,经常能够达到大数据的级别。很多数据湖采用的数据目录系统无法支持如此大量的元数据,这也限制了数据湖的扩展性。7)大量小文件的问题。在数据不断输入的过程中,数据湖内会产生大量小文件,随着时间的推移,小文件的数量可能会越来越多,这会严重影响数据湖的读取性能。8)性能问题。在数据湖上达到高性能不是一件容易的事。有的时候为了达到一定的性能要求,用户需要手动做一些性能的优化,比如数据分区等,而这些手动的操作又比较容易出错。9)数据的查询管控。由于数据湖的开放性,确保查询权限合规也是需要解决的问题。10)质量问题。前面很多点都会导致数据质量的问题。在大数据场景下,如何确保数据的正确性也是一个普遍的问题。而DeltaLake能够为Lakehouse带来数据质量、可靠性以及查询性能的提升。ACID事务事物化所有操作·ACID事务事物化所有操作·每一个操作,要么整体成功,要么整体失败g5.保存数据历史上述前五个问题都是关于数据可靠性,它们都可以通过DeltaLake的acid事务能力来解决。在DeltaLake上,每一个操作都是事务的,即每一个操作都是一个整体,要么整体成功,要么整体失败。如果一个操作在中途失败,DeltaLake会负责将其写入的不完整数据清理干净。具体的实现方式是DeltaLake维护了包含所有操作的一个事务日志,能够保证数据与事务日志的一致性。如上图,某次写操作在某个表中添加了很多数据,这些数据被转换成了parquet格式的两个文件file¹和file2。有了事务日志,读操作的时候就能够保证要么读不到这条日志,要么同时读到这两条记录,这样就保证了读取的一致性,解决了读写并行的问题。事物化所有操作·每一个操作,要么整体成功,要么整体g5.保存数据历史此外,有了事务日志后也可以对已有数据做细粒度的修改。比如下一次写操作对表中的某些数据进行修改,在事务日志中就会出现删除原有文件file1和添加修改后文件file3这样两条记录。同样,在读取的时候,这两条记录也会被同时读到或者忽略,使读取的一致性得到保证。针对第三点中途失败的作业,DeltaLake写入的事务性能够保证不完整的数据不会被成功写入。对于批流混合的输入数据,由于Spark天然支持批流一体,在写入时可以将批和流的数据写入到同一张表,避免了数据冗余及不一致性。由于事务日志保存了所有操作的历史记录,我们可以对之前某个时间点的历史数据进行查询。具体实现方法是:DeltaLake可以查到历史某个时间点对应的事务日志,并且根据历史的事务日志进行数据重放,得到该时间点的数据状态。这个能力被称为“时间旅行”。那么,DeltaLake是怎样处理海量元数据的呢?答案很简单,使用Spark来处理。所有DeltaLake的元数据均以开源parquet的格式存储,数据与元数据总是相伴相生,无需进行同步。使用Spark处理元数据,使得DeltaLake的元数据可以在理论上进行无限的扩展。索引机制数据的自动优化·索引机制数据的自动优化·跳过数据扫描:分区,布隆过滤器等·Z-ordering:优化多个列的存储布局DeltaLake还采用索引的机制来优化性能,它采用分区和不同过滤器等的机制,可以跳过数据的扫描。还采用了Z-ordering的机制,可以在对某个列进行优化的同时,使其他列性能牺牲最小化。为了解决大量小文件的问题,DeltaLake还可以在后台定期对数据布局进行自动优化。如果存储的小文件过多,会自动的将他们合并成大文件,这解决了数据湖中小文件越来越多的问题。表级别的权限控制表级别的权限控制·提供权限设置的API·根据用户权限,动态地对视图进行脱敏8.性能问题对于数据查询的管控,DeltaLake实现了表级别的权限控制,也提供了权限设置API,可以根据用户的权限动态对视图进行脱敏。SchemaSchema验证和演化·所有表中的数据必须严格符合schema●可以在数据写人时进行schema演化最后,DeltaLake实现了schema的验证功能来保证数据质量。存在DeltaLake表中的所有数据都必须严格符合其对应的schema,它还支持在数据写入时做schema的合并演化。当输入数据的schema发生变化的时候,DeltaLake可以自动对表的schema进行相应的演化。流式分析BI数据科学机器学习原始输入与数据历史记录业务层聚合原始输入与数据历史记录业务层聚合清理、过滤、扩展后的数据BronzeSilverBronzeSilver新,还可以减少数据冗余,从而优化存储和计算的开销。3)开放的数据格式可以让数据在不同系统之间的迁移更加顺畅。5)批流一体。与lambda架构不同,Lakehouse能够做到真正的批流一体,从而简化SSSpo☆@n⑨MachineLearningDatabricks独家优化了databricksruntime引擎,也可以理解为ApacheSpark的加强好的结合,提供了全托管高性能的企业级Spark平台,能够同时支持企业的商业洞察分析以及机器学习训练等。DeltaLake数据湖基础介绍(开源版)34四、DeltaLake数据湖基础介绍(开源版)作者:王晓龙(筱龙)1.大数据平台架构演进①1)第一代:数仓架构2)第二代:数据湖+数仓架构3)第三代:Lakehouse架构2.DeltaLake-运行在数据湖之上的可靠存储层DeltaLake作为可靠的数据存储中间层,为构建Lakehouse提供了核心支撑。适用于所有场景适用于所有场景ACID存储层提供高可靠性及性能存储所有类型数据的数据湖DeltaLake的核心特性是对ACID事务支持,并且基于事务日志机制,实现可串行化的隔离级别,提供ACID事务,保证读写数据的一致性。2)DeltaLakeStarter-DML准备通过PySparkDataframAPI创建一张Delta表,表的名称是random_num,表中只包>>>spark.sql(>>>spark.sql("CREATET\"}\"".format('/tmp/delta_course/delta_tab接下来往表中执行几条简单的修改操作语句:>>>spark.sql>>>spark.sql("insertin>>>spark.sql("deletefrom3)DeltaLake文件系统目录结构DeltaLake表的物理存储目录下,既包括自身的表数据文件,也包括记录表Schema及表变化的DeltaLogs:_delta_log_delta_log000000000000000010.checkpoin_last_checkpointpart-00000-05518637-0919-4a45-a95c-8066fa910681-c000.spart-00000-1eef41d1-febd-40fe-849c-d9b19bee3761-c000.spart-00000-258b6649-3b08-4e97-9fa⁰-871be15edc1a-c000.spart-00000-467f6d26-902c-4180-937c-29e⁰6a3e7b4e-c000.snpart-0000-60add56e-1078-40c9-a80b-5d86dffe50d5-c⁰00.snpart-00000-687cc4bf-4442-476b-a9a4-fec8ca89429f-c000.snpart-0000-6b042c36-59e6-44Zc-95bd-c15e45529353-c⁰00.snappy.parqpart-00000-85f4a3c9-12e5-4e61-b48a-bfdd6aac7496-c000.snpart-00000-9dd1e⁰91-fd67-4dec-887d-1b566bdcd9af-c000.spart-00000-b8c55888-fd23-46d1-bdbf-fbf0ec014c70-c000.spart-00000-c8dd38c8-7da3-472f-8171-707acab189eb-c000.snpart-00002-61073ab1-be89-4357-a3d9-d1235d475c8c-c000.spart-00004-d2f4a646-a97b-434a-bab5-6a375e41476a-c000.spart-00007-2339a4cd-24ae-4e94-a2a9-1773e29dbe61-c000.spart-00009-eb0782ff-81d2-4a44-9700-af6b8582f6fe-c000.part-00011-e708e4c4-9e2f-43a6-9950-1c9baf04c657-c000.sTransactionLog(事务日志,也称DeltaLog)是一种有序记录集,顺序表从初始创建以来的所有事务操作。__delta_log00000000000000000010.checkp_last_checkpoint1)TransactionLog的整体设计目标,是实现单一信息源(SingleSourceofTruth),通过跟踪记录用户所有的表操作,从而为用户提供在任意时刻准确的数据视图。2)同时,因为DeltaLake是基于ApacheSpark构建的,依托Spark来管理维护事务日志,所以相比通过Metastore使用单一的数据库管理元数据,DeltaLake具备高可扩展的元数据处理能力,能够支持上百PB的Delta表读写访问。3)除此之外,DeltaLake的事务日志也是其它重要数据管理特性实现的基础,比如数据版本回溯(TimeTravel)等。在DeltaLake中,Transaction被称为Commit。每个Commit代表一个事务操作,同时也代表了一个数据版本,对应_delta_log目录下的一个json文件。_delta_log_delta_log00000000000000000010.checkpo_last_checkpoint{"remove":{"path":"part-00004-d2f4a646-a97b-434a-bab5-6a3c000.snappy.parquet","deletionTimestamp":1645945987771,"dataChange":true,"extendedFrue,"partitionValue√事务日志的最后一行是关于commit的详细信息,包括了时间戳、操作名等元数据。DeltaLake当前定义的Action动作包括:涉及数据文件增加和删除(Addfile/Remove事务及协议相关的变更操作(Settransaction、·Updatemetadata-Updatesthetable'smetad·Settransactifromwhereandatwhatti⑩通过Spark获取到表的最新状态DeltaLake定义的Commit维护的是变更操作的过程记录,当针对Delta表执行查询语句时,可以通过Spark获取到表的最新状态。Spark会对事务日志做聚合,检查事务日志经历了哪些事务操作,并基于事务日志构建出可靠准确的Delta表状态。0000000000000001json在变更操作较多的场景,比如CDC,deltalog下会生成大量json小文件,对处理性能会造成较大影响。为了解决上文提到的小文件问题,DeltaLog引入Checkpoint机制。开始到当前时刻所有变更记录(默认每10次Commit创建一个Checkpoint文件)。①Checkpoint文件给Spark提供了一种捷径来重构表状态,避免低效地处理可能上千条的json格式的小文件。__delta_log0000000000000000010.checkpoint.parquet"/tmp/delta_course/delta_table/_delta_log/00000000000000000010.checkpoi-9d0b-4e...InullIDeltaLake数据湖基础介绍(开源版)42借助checkpoint,Spark可以快只需要基于版本10及随后的11和12两次commit构建表的状态,从而大大提升了元数3)乐观并发控制DeltaLake数据湖基础介绍(开源版)43√传统数据库的锁机制其实都是基于悲观并发控制的观点进行实现的;√对比悲观并发控制,乐观并发控制可以提供更好的性能;√示例中,用户A和用户B都拿到版本号为0的commit,排他锁(mutualexclusion)决定了只能有一个用户能够创建版本号为1的commit,假设接受了UserA的commit,操作语义,在版本1基础上完成UserB的写入。ACCONSISTENCYID如上文介绍,TransactionLog将事务抽象成一个个Commit(文件),Commits里可以包含不同类型的Action,但是每个Commit是原子的。MartinKleppman在DDIA书中对原子性的定义:t隔离性是针对并发事务的处理方式,并发事务不应该相互干扰。在DeltaLake中,隔离性是通过OCC+排他锁方式实现的,并且实现了读写的串行化。MartinKleppman在DDIA书中对隔离性的定义:TransactionLog写入分布式磁盘中,在事务处理结束后,对数据的修改就是永久的,即便系统故障也不会丢失。MartinKleppman在DDIA书中对持久性的定义4)一致性Consistency是数据库的属性,应用程序可能依赖数据库的原子性和隔离属性来实现一致性,但这并不取决于数据库本身,但一致性是由应用来决定的。ACIDsense)isapropertyoftheapplication.Theapplicationmayrelyonthedatabase'satomicityandisoThus,theletterCdoesn'treally扩展的元数据处理等特性。DeltaLake的特性,敬请关注。DeltaLake数据湖基础介绍(商业版)46五、DeltaLake数据湖基础介绍(商业版)作者:李洁杏数据管理系统从早期的数据仓库(DataWarehouse),已经发展到今天的Lakehouse。科学和机器分析的场景。个Structured,Semi-Str数据仓库架构可以完全控制数据的存储和查询,因此可以同时设计查询系统,以及适应查询系统的数据存储结构,以达到优越的查询性能;而在Lakehouse架构下,数据是用开放存储结构存储的,如Parquet格式,以便更多系统可以便捷的访问数据,但是开放的存储格式并不一定适合查询操作,查询系统也没有足够的统计数据来实现高效查询。那么,Lakehouse如何以开放的存储格式达到高效的查询性能?DeltaLake数据湖基础介绍(商业版)47如图所示,在Databricks中用SSD作为缓存,可以将数据读取速度提高3倍以上;采用Delta引擎作为缓存,则可以将数据读取速度提高7倍以上。2.建立辅助数据结构示例一:Parquet文件中的DataSkip查询的信息只会存在于file3,因此可以跳过file1和file2的读取。WHEREyear=2020示例二:在Parquet文件上建立索引如下图,如果查询条件是type=“DELETE_ACCOUT”,可以利用在type上建立的索引直接跳到对应的数据上,从而避免读取无关数据。=“DELETE_ACCOUNT”示例三:Parquet文件上建立BloomFilter可以为每一个文件建立BloomFilter,BloomFilter可以快速判断表文件中是否包含需要查询的数据,如果不包含则快速跳过该文件,从而减少扫描数据量,提升查询性能。=“DELETE_ACCOUNT”1)小文件问题在DeltaLake中频繁执行MERGE,UPDATE,INSERT操作,可能会产生大量的小文件。大量的小文件,一方面会降低系统读取性能,同时也会提高元数据操作的开销。Lakehouse中使用了不同的技术来减少小文件的产生:个表文件进行写入,最终会导致一个partition中产生很Databricks对Delta表的写入过程进行了优化,对每个partition使用一个专门的executor来合并其它executor对该partition的写入,从而避免了小文件的产生。在每次向Delta表中写入数据之后,会检查Delta表中的表文件数量,如果Delta表中的小文件(size<128MB则视为小文件)数量达到阈值,则会执行一次小文件合并,将Delta表中的小文件合并为一个新的大文件。除了自动合并,Databricks还提供Opitmize命令,使用户可以手动合并小文件,优化表结构,使得表文件的结构更加紧凑。2)查询时间问题查询运行时间主要取决于访问的数据量,即使使用Parquet格式,也可以通过优化表内的数据布局以减少运行时间。⑩表文件数据排序DeltaLake数据湖基础介绍(商业版)50将表文件存储数据排序,在每个表文件中存储一定量的数据,如下图中file1存储uid=0.….1000,file2存储uid=1001..2000,这样在查询时就可以根据需要跳过无关的表文件,减少文件扫描数量。 在实际查询中,有些查询需要看colomn1在某个范围内的数据,有些查询需要看colomn2在某个范围内的数据,或者更多,这时候仅仅对colomn1进行排序显然是不够的。Z-Ordering可以在多个维度上(如下图的col1-4)将关联的信息存储到同一组文件中,来减少不必要的文件读取。ZZ络4WHEREi_i查询说明:将store_sales与item两个表连起来,条件是当item_sk值相等且item_id等于一个固定值。DeltaLake数据湖基础介绍(商业版)51如果不开启DFP,从上图可以看出,查询会先对store_sales进行全滤后的item表的行进行join,虽然结果仅有4中的86多亿条数据。DeltaLake数据湖基础介绍(商业版)52万条store_sales中的数据,比未启用DFP时减少了近99%。从结果上看,启动DFP后,该条查询实现了10倍的性能提升。针对该特性在TPC-DS上进行测试(见下图),测试发现启用DFP后,TPC-DS的查询速度达到4.5倍到8倍的提升。且通过数据布局优化,建立辅助数据结构减少对非缓存数据读取的I/O,实现了01.什么是克隆两种克隆方式:shallow(浅克隆),deep(深克隆)。1)深克隆语句2)深克隆的特性⑩与源表相比,克隆表有独立的历史记录;⑩在克隆过程中、或之后发生的对源表的任何更改,都不会反映在克隆表中;1)浅克隆语句与深克隆语句类似,只是在SQL中加入SHALLOWCLONE语句;在Python和Scala中2CREATETABLE3.forName(4.clone("path/to/copy",isShallow=tr2)浅克隆的特性⑩浅克隆不是自包含的,即自身不是数据源,如果源文件数据被删除,则浅克隆数据可能会不可用;⑩浅克隆不复制流事务或COPYINTO相关的元数据;4.克隆的使用场景克隆的适用场景有很多,比如:数据存储、短期实验、数据分享和灾难恢复,其中除了短期实验使用浅克隆,其它场景都需要使用深克隆。如何使用DeltaLake构建批流一体数据仓库55六、如何使用DeltaLake构建批流一体数据仓库作者:李元健、冯加亮相信大家在构建数仓处理数据方面都很有经验,而产业界也耗费了大量的资源来构建相关的系统。分别以不同的处理形式为用户提供服务。那么我们期望的理想的系统是什么样的?①更一体化或更加聚焦,让更专业的人干更专业的事情⑩有同时处理流式和批量的能力如何使用DeltaLake构建批流一体数据仓库56理想与现实理想与现实低质、不可靠数据2.想要解决的问题1)历史查询如何使用DeltaLake构建批流一体数据仓库57spark第一条处理流比较简单,比如通过ApachSpark直接使用StreamingAnalytics打通实时与此同时,需要离线流时,历史查询可以使用Lambda架构对应的方式。ApachSpark提供了很好的抽象设计,我们可以通过一种代码或API来完成流和实时的入架构设计。通过历史数据的查询,我们可以进一步使用Spark进行SQL分析,以及用SparkSQL的作业的形式来产生AI技术的能力。2)数据校验A-arch①接下来我们需要面对的第一个问题就是数据的校验。我们的流式数据和批量数据,假设以Lambda架构的形式存在时,如何确认我们在某一如何使用DeltaLake构建批流一体数据仓库58个时间点查出来的数据是对的?到底流式的数据和批量的数据差多少?我们的批量数据什么时候该与流式数据进行同步?所以Lambda架构还需要引入Validation,这需要我们予以确认。尤其是像报表系统面向用户的这种精确的数据分析系统,Validation这一步骤不可或缺。因此,也许我们需要一支旁支来解决流式和批量之间的同步问题,以及对应的验证问题。3)数据修复spork假设如上问题解得到了解决,在系统上了一段时间我们会发现,如我们对应的某个Partitioned数据出了问题,当天的脏数据在若干天之后需要修正。此时我们需要怎么办?通常,我们需要停掉线上的查询后再修复数据,修复完数据后重新恢复线上的任务。如此折腾的过程,实际无形的给系统架构又增加了一个修复以及过去版本回复的能力。因此,Reprocessing诞生了。4)数据更新如何使用DeltaLake构建批流一体数据仓库59A假设解决完了Reprocessing问题,我们在AI和Reporting最终的出口端,可以看到有新的一系列的需求。比如有一天业务部门或者上级部门、合作部门提出能否Schema5)理想中的DeltaLake如何使用DeltaLake构建批流一体数据仓库601.DeltaLake具备的能力→→→→→→4.具备在线处理历史数据的能力3)历史数据以及脏数据的回滚。我们需要有TimeTravel的能力来回溯到某一个时间点5)可以在不阻断下游作业的前提下处理迟到的数据,可以直接入表。如何使用DeltaLake构建批流一体数据仓库61以上5点完全解决之后,我们就可以用DeltaLake来替代Lambda架构,或者说我们一系列批流分制的架构设计可以使用DeltaLake架构。基于基于DeltaLake的架构设计统一批量、流式的持续数据流模型什么是基于DelatLake的架构设计?DelatLake的架构设计中一系列的元数据或者最低的级别就是表。可以将我们的数据一层一层的分成基础数据表,中间数据表以及最终的高质量数据表。所有的一切只需要关注的就是表的上游和下游,它们之间的依赖关系是不是变得更加的简单和干净。我们只需要关注业务层面的数据组织,所以DelatLake是统一批量、流式的持续数据流的模型。以下通过Demo的形式演示如何在Databricks数据洞察里搭建批流一体数据仓库的操作,解决生产环境的问题。Demo演示视频:/live/248826七、使用DDI+Confluent进行实时数据采集入湖和分析本文将介绍使用Python脚本周期性地向KafkaBrokers发ComectksqiDBumlflow上图最左侧为lOT设备和移动应用,负责采集设备或者应用的运行数据,发布至Kafka本示例中模拟实时数据的数据集为纽约市出租车数据,存储在CSv文件内。数据中第一列下车的经纬度以及乘客的数量。Q进入Confluent管控台,Jinxi-DDl-test为本次使用的confluent集群。使用DDI+Confluent进行实时数据采集入湖和分析643登录ControlCenter,可以查看本集群的详细信息,比如broker数量、broker产生和消费的吞吐量、topic数量、partition数量等信息。创建一个用于接收数据的topic,名为nyc_taxi_data,partition数量设置为3,与broker数量一致。92Python脚本里需要提供集群ID,ControlCenter的用户名、密码以及topic名称。创建一个名为conf的dict,指定bootstrapserver的地址,此地址需要根据ClusterlD拼接而成,其他都为样板代码。生成一个producer,其来源于confluentKafka库,因此用户需要安装此库。打开train文件,读取CSV文件里的数据并发送到confluent集群。G进入ControlCenter,可以看到已经开始生产消息。使用DDI+Confluent进行实时数据采集入湖和分析66Aipoa9.o/Pwt(ny:20186-51:2La26n“,"fareim4,5.,“aickupatetta3844-2517:24;ure,auranlagjita进入topic,点击message,可以直接跳转到offset0,即最开始的消息,可以查看消息至此,数据的采集和发布链路已经打通,下一步需要到DDINotebook连接到Kafkatreaming-Data-Proces使用DDI+Confluent进行实时数据采集入湖和分析67--这两个属性是Databricks数据洞察相对开源版deltalake特有的,Spark流式数据入湖时,--会产生大量的小文件,过多的小文件使得delta表的元数据变得不可扩展,且使得查询速度下降--我们提供的这两个特性可以自动合并数据入湖过程中产生的小文件,避免出现查询性能下降和元数据扩展性的问题(delta.autoOptimizerodelta.autoOptimize.autoCompact此外还设置了表的属性,主要与DDI引擎的特性相关。第一个frompyspark.sql.functionsinportfron_json,col,decode,explodeconfluent_serverf"rb-{confcontrol_center_password-os.environ[L'pcheckpoint_Locatibn="oss://databricks-delta-demo/taxi_data/chectaxi_data_delta_lake="oss://databricks-delta-demo/taxi_data/data".option("kafka.bootstrap.servers".option("startingoffs.option("kafka.security.pr.option("kafka.saslf"""org.apache,mon,security.plain.PlainLoginMorequiredusername="{control_center_username}"password-"{control_center_passwschema=(StructType().add('key',TimestampT.add('fare_amount',FLoatType().add('pickup_datetime',TimestampType()).add('pickup_latitude.add('dropoff_longitude',.add('passenger_count',IntegerType()))lines-(lines.withColumn('data',fron_json(col('value').cast('string'),schemquery=(lines.writeStreon.fornat('deltaquery=(lines.writeStreon.fornat('delta.option('path',taxi_data_d集群进行消费。消费逻辑与发送逻辑较为相近,需要给记录消费Kafka数据时的offset,如果作业中断或异常停止,重启后可以从对应使用DDI+Confluent进行实时数据采集入湖和分析68在value里,因此需要创建一列data,负责从value里取值。value为二进制形式,需◎设置◎设置关于LTASKWScuUxPCq誓子用户oss//databricks-deta-demo/taxi_data/check类型/大小最后修改时间Select若干数据进行查看。在OSS的browser里可以看到注入的数据以及过程中产生m三使用DDI+Confluent进行实时数据采集入从交易量的走势图可以看出,3、4、5、6月的交易量较高,8月达到低谷。selectdayofweek('pickup_datetime)asgroupbydayofweek('43254325Took3sec.LastupdatedbyJinxiat统计在每周内从周一到周日的交易量变化,如上图。fook3sec.Lastupdatedby统计一天中每个时间段的交易量,如上图。可以看出每天中午和下午为交易量的低谷。统计每个时间段打车的费用,如上图。可以看出中午打车费用较高。因此可以得出结论:由于打车价格攀升导致了交易量萎靡。.统计每个年份打车费用的均值,如上图左,显示为打车费用连年上升。再统计每个年份的交易量,如上图右,可以得出结论:交易量和打车费用成反比关系。--统计交易金额的分布where'fare_amount>0and'fare_amoungroupbycastC'fare_amount'3count(1)132,147统计交易金额的分布,如上图。可以看出,纽约市打车价格主要分布在3-20美元之间。在以上查询分析的过程中,流式处理作业一直处于运行状态,与批式作业没有冲突,可以并行运行。费KafkaBrokers中的数据,实现了实时数据入湖,并且使用DDI的Zeppelin基于入湖的实时数据做一些简单分析。使用Databricks进行零售业需求预测的应作者:李锦桂本文从零售业需求预测痛点、商店商品模型预测的实践演示,介绍Databricks如何助力零售商进行需求、库存预测,实现成本把控和营收增长。首先,需求预测对零售商至关重要。如果商店的商品过多,货架和仓库的空间紧张,产品容易过期,财务资源被库存束缚。零售商无法利用制造商带来的新机会,从而错过消费模式的转变。由于商店内商品过少,客户无法从上商店内买到需要的商品。不但会造成零售商的收入损失,而且随着时间的推移,消费者的失望情绪,会驱使消费者转向竞争对手。综上所述,预测消费者需求的准确性和及时性,对零售商非常重要。下面我们使用零售数据模拟如何使用DDI的notebook和Facebookprophet来对消费者的需求进行建模和预测。现在我们需要的数据已经上传到了OSS的Bucket里面,接下来,开始对消费者的需求进行建模和预测。当数据上传到OSS上之后,可以在DDI的Notebook上对数据进行分析和建模。本次使用的数据集是2012年到2017年,10个商店中的50商品销售数据。数据包含四列。第一列是日期;第二列是商店的ID(1-10);第三列是商品的ID(1-50);第四列是当日商品的销售量。实验目的是预测未来三个月,这些商品在各个商店的销量,对商店未来的库存备货提供指导。使用Databricks进行零售业需求预测的应用实践73spark.shuffle.service.enaspark.dynamicAllocation.enabledspark.dynamicAllocation.minExecut[train_data_path="oss://time-series-forcTook0sec.LastupdatedbyJinxi(train_df)使用Databricks进行零售业需求预测的应用实践74读取DataFrame之后,通过熟悉的SQL语句对数据进行分析,可以使用dataframe的createOrReplaceTempView方法,创建一个临时的视图。杰201420152016Took1sec.LastupdatedbyJinxiatApril102022,11:56:04PM.创建view之后,对dataframe中的数据进行分析。首先分析销售数据随年份的走势。从图表可以看出,在过去几年,商店的销售额稳步增长,总体呈现线性增长的趋势。在预测下一年的销售额时,可以参考过去几年的增长率。3.基于DDI建立预测单个商店-商品模型与此同时,商品销售往往有很强的季节性,特别是服装行业。T恤在夏季的销售额肯定高于羽绒服的销售额。因此,在预测商品的销售额时,季节性是不可忽略的因素。2013-01-012012016-05-012017-03-012017-Took1sec.LastupdatedbyJi如上图所示,从2013年到2017年,商品销量不断上涨。一年之中,商品的销售额呈现很强的周期性。在12月或1月时,商品销量到达波谷,随着月份不断攀升,7月销量到达波峰。所以在进行建模时,月份是很
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 金属成形机床行业工业机器人应用与编程考核试卷
- 谷物真空包装与保鲜技术优化应用考核试卷
- 软木制品在医疗设备领域的应用考核试卷
- 如何评估嵌入式系统的安全性试题及答案
- 茶叶店品牌战略规划考核试卷
- 行政组织理论的评估指标与绩效监控研究试题及答案
- 葡萄酒酿造过程中的酿造产业链优化与协同创新考核试卷
- 国网公司物资管理制度
- 工会会员会员管理制度
- 员工异地办公管理制度
- 双减背景下高中语文优化作业设计实践与研究
- 《企业财务现状的杜邦分析-以大疆科技为例》开题报告(含提纲)2400字
- 道德与法治六年级下册7《多元文化 多样魅力》(课件)
- 中医治疗颈椎病课件完整版
- KJ251煤矿人员定位系统-设计方案
- 消防接警调度岗位理论知识考试题库汇总-上(单选题)
- YS/T 778-2011真空脱脂烧结炉
- GB/T 15256-1994硫化橡胶低温脆性的测定(多试样法)
- GB/T 10294-2008绝热材料稳态热阻及有关特性的测定防护热板法
- 车库车位检查验收记录表
- spss之统计挖掘第7章-定性资料统计推断课件
评论
0/150
提交评论