Flink原理与实践PPT完整全套教学课件_第1页
Flink原理与实践PPT完整全套教学课件_第2页
Flink原理与实践PPT完整全套教学课件_第3页
Flink原理与实践PPT完整全套教学课件_第4页
Flink原理与实践PPT完整全套教学课件_第5页
已阅读5页,还剩274页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

第一章

大数据技术概述全套ppt课件大数据的5个VVolume:数据量大Velocity:数据产生速度快Variety:数据类型繁多Veracity:数据真实性Value:数据价值大数据单台计算机无法处理所有数据,使用多台计算机组成集群,进行分布式计算。分而治之:将原始问题分解为多个子问题多个子问题分别在多台计算机上求解将子结果汇总比较经典的模式和框架:MPIMapReduce大数据分而治之MPI:MessagePassingInterface消息传递接口使用分治法将问题分解成子问题,在不同节点上分而治之地求解。MPI提供数据发送和数据接收操作:将本进程中某些数据发送给其他进程接收其他进程的数据自行设计分治算法,将复杂问题分解为子问题优势:以很细的粒度控制数据的通信劣势:难度大,开发调试时间成本高MPI程序员只需要定义两个操作:Map和Reduce案例:三明治制作Map阶段将原材料在不同的节点上分别进行处理Shuffle/Group阶段将不同的中间食材进行组合Reduce阶段最终将一组中间食材组合成三明治成品学习门槛比MPI低MapReduce单条数据被称为事件(Event)或者被称为一条数据或一个元素。事件按照时序排列会形成一个数据流(Data

Stream)。数据流一般是无界(Unbounded)的,某段有界数据流(BoundedDataStream)可以组成一个数据集。数据与数据流批处理(BatchProcessing):对一批数据进行处理案例:微信运动统计步数,银行信用卡账单统计…

数据总量大,计算非常耗时流处理数据本质上是流,流处理(StreamProcessing)对数据流进行处理案例:查看电商实时销售业绩、股票交易…批处理与流处理流处理一般使用生产者-消费者模型股票交易案例:辅助人工决策实现消费者侧代码,以10秒为一个时间窗口,统计窗口内的交易情况可扩展性:随着数据不断增多,能否保证我们的程序能够快速扩展到更多的节点上。数据倾斜:数据没有均匀分布到分布式系统各个节点上。容错性:系统崩溃重启后,之前的那些计算如何恢复。时序错乱:数据到达的时间和实际发生的时间是不一致的,有一定的延迟,需要设计等待策略。Flink:为流处理而生。流处理框架必要性生产者-消费者模型MapReduce编程模型的一种实现,逐渐形成了一整套生态圈。主要组件:HadoopMapReduce:数据处理模型,面向批处理。HDFS:分布式文件系统,提供存储支持。YARN:资源调度器,分配计算资源。其他著名组件:Hive:SQL-on-HadoopHbase:基于HDFS的分布式数据库,毫秒级实时查询Kafka:消息队列ZooKeeper:分布式环境的协调HadoopHadoop生态圈Spark初衷:改良HadoopMapReduce的编程模型,提高运行速度,优化机器学习性能。易用性:比MapReduce更好用,提供了多种编程语言API,支持SQL、机器学习和图计算。速度快:尽量将计算放在内存中。完美融入进Hadoop生态圈。流处理:SparkStreaming,mini-batch思想,将输入数据流拆分成多个批次。Spark是一个批流一体的计算框架。SparkSpark生态圈Spark

mini-batch流处理消息队列:数据集成和系统解耦,某个应用系统专注于一个目标。企业将各个子系统独立出来,子系统之间通过消息队列来发送数据。Kafka

Kafka可以连接多个组件和系统主要面向流处理流处理框架经历了三代演进StormSpark

StreamingFlink事件投递保障:Exactly-Once:一条数据只影响一次最终结果毫秒级的延迟FlinkLambda架构:批处理层、流处理层、在线服务层批处理层:等待一个批次数据,使用批处理框架计算,得到一个非实时的结果。比如,凌晨0点开始统计前一天所有商品的计算次数,计算需要几个小时。流处理层:使用流处理框架生层结果。早期的流处理框架不成熟,结果近似准确。在线服务层:将来自批处理层准确但有延迟的预处理结果和流处理层实时但不够准确的预处理结果做融合。程序员需要维护批处理和流处理两套业务逻辑。Lambda架构Lambda架构Kafka等消息队列可以保存更长时间的历史数据,它不仅起到消息队列的作用,也可以存储数据,替代数据仓库。Flink流处理框架解决了事件乱序下计算结果的准确性问题。程序员只维护一套流处理层,维护成本低。Kappa架构Kappa架构延迟:一个事件被系统处理的总时间。案例:自助食堂,一位用餐者从进入食堂到离开食堂的总耗时。高峰时,总耗时会增加。分位延迟更能反映系统的性能。吞吐:系统最大能处理多少事件。与系统本身设计有关,也与数据源的数据量有关。延迟与吞吐相互影响,一起反映了系统的性能。优化方式:优化单节点内的计算速度,使用并行策略,分而治之地处理数据。延迟和吞吐延迟和吞吐更直观的表现:用户是否排队。滚动窗口(TumblingWindow):定义一个固定的窗口长度,长度是一个时间间隔。滑动窗口(SlidingWindow):定义一个固定的窗口长度和一个滑动长度。会话窗口(SessionWindow):窗口长度不固定,根据会话间隔(SessionGap)确定窗口,两个事件之间的间隔大于SessionGap,则两个事件被划分到不同的窗口中。窗口三种时间窗口EventTime:事件实际发生的时间事件发生时,EventTime就已经确定ProcessingTime:事件被流处理框架处理的时间不同节点、系统内不同模块、同一数据不同次处理都会产生不同的ProcessingTime案例:手机游戏,用户需要与服务器实时交互,游戏根据实时数据计分。信号丢失,部分数据上传有延迟,使用事件的Event

Time更准确。时间Watermark是插入到数据流的元素。Watermark元素到达,假设不会有比这个时间点更晚的上报数据。可以设置不同的Watermark策略,是一种折中方案:Watermark等待时间短,保证低延迟,数据准确性下降。Watermark等待时间长,数据更准确,延迟高,维护难度大。Watermark无状态:流处理中,不需要额外信息,给定一个输入数据,直接得到输出。将英文单词转化为小写。有状态:根据历史信息,处理新流入数据。统计一分钟内单词出现次数,需要保存已经进入系统的历史。使用检查点(Checkpoint)技术,将状态数据保存下来,用于故障后的恢复。状态与检查点有状态计算和无状态计算如果发生故障,数据是否被成功处理?At-Most-Once:每个事件最多被处理一次。有些数据被丢弃,最不安全。At-Least-Once:每个事件至少被处理一次,有些事件可能被处理多次。部分数据被处理多次,可能不准确。Exactly-Once:每个事件只被处理一次。事件不丢不重。实现难度最大。数据一致性保障Java企业级编程语言有很多开源包大数据必备Scala函数式编程有一定学习门槛Flink目前绝大多数代码和功能均由Java实现编程语言的选择Python简单易用PyFlinkSQL上手门槛很低第二章

大数据必备编程知识案例:动物类(Animal)和鱼类(Fish)继承关系保证所有动物子类都具有动物类的属性和方法子类有自己的属性和方法。除了动物,还有很多其他事物也会移动,使用接口(interface)来抽象“移动”。继承Java的继承:继承类extends

实现接口implements继承publicclassClassAimplementsMove{@Overridepublicvoidmove(){...}}实现接口publicclassDogextendsAnimal

{

privateStringdogData;

publicDog(StringmyName,StringmyDescription,StringmyDogData){=myName;this.description=myDescription;this.dogData=myDogData}

}

继承类interfaceclass重写:子类和父类都定义同名方法,子类的方法会覆盖父类中已有的方法。重载:多个同名方法,这些方法名字相同、参数不同、返回类型不同。重写与重载publicclassClassAimplementsMove{@Overridepublicvoidmove(){...}}@Override

:在子类中重写父类中的同名方法publicclassOverloading{

//无参数,返回值为int类型publicinttest(){System.out.println("test");return1;}

//有一个参数publicvoidtest(inta){System.out.println("test"+a);}

//有两个参数和一个返回值publicStringtest(inta,Strings){System.out.println("test"+a+""+s);returna+""+s;}}同名方法重载:一个类中多个方法都名为test,但是参数类型和返回值类型不同。案例:Java中的List和ArrayListArrayList是一个泛型类,List是一个泛型接口ArrayList泛型是一种集合容器,可以向这个集合容器中添加String、Double以及其他各类数据类型。没必要创建StringArrayList、DoubleArrayList等类。泛型ist<String>strList=newArrayList<String>();List<Double>doubleList=newLinkedList<Double>();类名后面加上<T>

类内部的一些属性和方法都可以使用泛型T泛型规范:T代表一般的任何类。E代表元素(Element)或异常(Exception)。 K或KEY代表键(Key)。 V代表值(Value),通常与K一起配合使用。Java泛型类publicclassMyArrayList<T>{

privateintsize;

T[]elements;

publicMyArrayList(intcapacity){this.size=capacity;this.elements=(T[])newObject[capacity];}

publicvoidset(Telement,intposition){elements[position]=element;}

@OverridepublicStringtoString(){Stringresult="";for(inti=0;i<size;i++){result+=elements[i].toString();}returnresult;}

}

与泛型类类似,使用<>符号可以继承并实现这个接口Java泛型接口publicinterfaceList<E>{...publicList<E>subList(intfromIndex,inttoIndex);}

publicclassArrayList<E>implementsList<E>{...publicList<E>subList(intfromIndex,inttoIndex){...//返回一个List<E>类型值}}

要实现的子类是泛型的publicclassDoubleListimplementsList<Double>{...publicList<Double>subList(intfromIndex,inttoIndex){...//返回一个List<Double>类型值}}要实现的子类不是泛型的,而是有确定类型的泛型方法可以存在于泛型类中,也可以存在于普通的类中。泛型方法的类型E和泛型类中的类型T可以不一样。泛型方法是泛型类的一个成员,泛型方法既可以继续使用类的类型T,也可以自己定义新的类型E。Java泛型方法publicclassMyArrayList<T>{...//public关键字后的<E>表明该方法是一个泛型方法//泛型方法中的类型E和泛型类中的类型T可以不一样

public<E>EprocessElement(Eelement){...returnE;}}

Java泛型信息只存在于代码编译阶段,当程序运行到JVM上时,与泛型相关的信息会被擦除。对于绝大多数应用系统开发者来说影响不太大,对于框架开发者来说,必须要注意。

类型擦除Class<?>strListClass=newArrayList<String>().getClass();Class<?>intListClass=newArrayList<Integer>().getClass();//输出:classjava.util.ArrayListSystem.out.println(strListClass);//输出:classjava.util.ArrayListSystem.out.println(intListClass);//输出:trueSystem.out.println(strListClass.equals(intListClass));泛型擦除:无法区别strListClass和intListClass这两个类型适合进行并行计算的一种编程范式非函数式编程:创建中间变量,分步执行函数式编程

:与数学表达式更相似实现单个函数,将零到多个输入转换成零到多个输出。比如,add()

将两个输入转化为一个输出。将多个函数连接起来,实现所需业务逻辑。比如,将add()、multiply()连接到一起。函数式编程

addResult=x+yresult=addResult*z非函数式编程result=add(x,y).multiply(z)函数式编程Lambda表达式被一些编程语言用来实现函数式编程。一个箭头符号

->

,两边连接着输入参数和函数体。Lambda表达式(parameters)->{body}Java的Lambda表达式的语法规则//接收2个int类型参数,返回它们的和(intx,inty)->x+y//接收1个String类型参数,将其输出到控制台,不返回任何值(Strings)->{System.out.print(s);}

//参数为圆半径,返回圆面积,返回值为double类型(doubler)->{doublepi=3.1415;returnr*r*pi;}几个Java

Lambda表达式案例输入参数:接收零到多个输入参数程序员可以提供输入类型,也可以不提供类型,让代码根据上下文去推断参数可以放在圆括号()中,多个参数通过英文逗号,隔开函数体:可以有一到多行语句函数体有多行内容,必须使用花括号{}

输出的类型与所需要的类型相匹配Java

Lambda表达式Lambda表达式本质是一种接口,它要实现一个函数式接口(FunctionalInterface)中的虚方法函数式接口是一种接口,并且它只有一个虚方法。@FunctionalInterface

注解函数式接口@FunctionalInterfaceinterfaceAddInterface<T>{Tadd(Ta,Tb);}

publicstaticclassMyAddimplementsAddInterface<Double>{@OverridepublicDoubleadd(Doublea,Doubleb){returna+b;}}如果没有Lambda表达式(Integera,Integerb)->a+b;

使用Lambda表达式Java

8之后推出的,专注于对集合(Collection)对象的操作。右侧案例:数据先经过stream()方法被转换为一个Stream类型,后经过filter()、map()、collect()等处理逻辑,生成我们所需的输出。各个操作之间使用英文点号.来连接,这种方式被称作链式调用(MethodChaining)。链式调用:将多个函数连接起来。Flink的API是面向数据集或数据流的操作。这些操作分布在大数据集群的多个节点上,并行地分布式执行。Java

Stream

APIList<String>strings=Arrays.asList("abc","","bc","12345","efg","abcd","","jkl");

List<Integer>lengths=strings.stream().filter(string->!string.isEmpty()).map(s->s.length()).collect(Collectors.toList());

lengths.forEach((s)->System.out.println(s));第三章

Flink的设计与运行原理相关概念方法(Method):Java或Scala语言中的方法,有输入参数和返回值。函数(Function):Flink提供给开发者的接口flatMap()、keyBy()等算子(Operator):在执行层面,算子对数据进行操作,一般一到多个函数对应一个算子。Source、Transformation和Sink数据流图从代码到逻辑视图逻辑视图中圆圈表示算子,箭头表示数据流可以在Flink

Web

UI中查看一个作业的逻辑视图大数据框架的算子对计算做了抽象,方便用户进行并行计算、横向扩展和故障恢复逻辑视图分布式环境下并行化物理执行数据流被切分到多个分区(Partition)算子被切分为算子子任务(Operator

Subtask),又被称为算子实例物理执行的基本单元并行度(Parallelism):衡量并行切分的多少物理执行数据在不同算子子任务之间数据交换常见四种数据交换策略:前向传播(Forward)按Key分组(Keyed-Based)广播(Broadcast)随机(Random)数据交换策略Master协调管理DispatcherResourceManagerJobManagerTaskManager拥有CPU、内存等计算资源Flink作业被分发到多个TaskManager上并行执行主从架构启动一个Flink集群,TaskManager进程启动后会将自己注册给Master的ResourceManagerClient提交作业(Application)Master的Dispatcher接收作业,启动JobManagerJobManager向ResourceManager申请资源,ResourceManager会将闲置资源分配给JobManager作业转化为物理执行图,计算任务分发部署到多个TaskManager上作业提交过程作业提交流程ClientFlink主目录下的bin目录中的命令行工具将用户作业转换为JobGraphDispatcher接收多个作业,为每个作业分配一个JobManagerJobManager单个作业的协调者,每个作业有一个JobManager将JobGraph转化为物理执行图ExecutionGraph向ResourceManager申请资源管理TaskManager,将具体计算任务分发部署到多个TaskManager上Flink核心组件介绍ResourceManager统一处理资源分配上的问题获取计算资源、分配给具体计算作业TaskManager负责具体计算任务的执行提供一定量的任务槽位(TaskSlot,简称Slot),Flink作业运行在这些Slot上Slot会注册到ResourceManager上,ResourceManager分配这些Slot给具体的作业部署层Local、Cluster、Cloud运行时层分布式运行时API层流处理-

DataStream

API批处理–

DataSet

API上层工具基于DataStream/DataSet

API的上层工具Flink组件栈StreamGraph根据用户代码生成的图JobGraphStreamGraph优化之后生成JobGraph算子链ExecutionGraphJobGraph的分布式并行版本物理执行图部署到TaskManager上的具体计算任务再谈逻辑视图到物理执行图算子链将相近的算子子任务链接在一起链接后形成任务(Task)Task以线程的形式被TaskManager调度可以降低算子子任务之间的传输开销任务、算子子任务与算子链上图中,Source和FlatMap链接到了一起,其他算子发生了跨分区数据交换,无法链接到一起。Task

SlotTaskManager下有多个Task

Slot每个Task

Slot中运行着某些TaskSlot之间的内存相互隔离Slot内部共享TCP连接、心跳等允许用户设置TaskManager中的Slot的数目建议将TaskManager下Slot数设置为CPU核心数任务槽位与计算资源Slot与TaskManager多个Task共享一个Slot数据交换成本更低右图中,Source和FlatMap计算量不大,WindowAggregation计算量较大,资源互补增加并行度后,在同样的计算资源基础上,可以部署更多算子实例,处理的数据量更大槽位共享将并行度由2改为6槽位共享后,多个Task共享一个Slot可以增大并行度,有限的资源上处理更多数据并行度逻辑视图并行切分为多个算子子任务每个算子子任务处理输入数据的一部分输入数据量增大时,可适当增大并行度槽位数目针对TaskManager设置资源切分粒度并行度与槽位数目第四章

DataStream

API的介绍和使用Flink程序的骨架结构初始化运行环境读取一到多个Source数据源根据业务逻辑对数据流进行Transformation转换将结果输出到Sink调用作业执行函数执行环境是作业与集群交互的入口设置并行度关闭算子链时间、Checkpoint…流处理和批处理的执行环境不一样Java、Scala两套API设置执行环境//创建Flink执行环境

StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.disableOperatorChaining();Source、Transformation和SinkSource读取数据源统称为Source文件系统、消息队列、数据库等Transformation使用Flink提供的各类函数,进行有状态的计算数据流的分组、窗口和聚合操作等Sink将计算结果输出到外部系统,统称为Sink目的地可以是文件系统、消息队列、数据库等Flink是延迟执行(LazyEvaluation)的调用execute()方法,Flink才会真正执行否则无法得到计算结果字符串参数为当前作业名执行//execute

env.execute("kafkastreamingwordcount");单数据流转换基于Key的分组转换多数据流转换数据重分布转换DataStream<T>泛型T为数据流中每个元素的类型四类Tranformation转换每个输入元素对应一个输出元素重写MapFunction或RichMapFunctionMapFunction<T,O>

T为输入类型O为输出类型实现其中的map()虚方法主逻辑中调用该函数单数据流转换-

map@FunctionalInterfacepublic

interface

MapFunction<T,O>extends

Function,Serializable{//调用这个API就是继承并实现这个虚函数

Omap(Tvalue)

throwsException;}//第一个泛型是输入类型,第二个泛型是输出类型

public

static

class

DoubleMapFunction

implements

MapFunction<Integer,String>{@OverridepublicStringmap(Integerinput)

{ return

"functioninput:"+input+",output:"+(input*2);}}DataStream<String>functionDataStream=dataStream.map(newDoubleMapFunction());MapFunction源代码一个MapFunction的实现直接继承接口类并实现map虚方法上页所示使用匿名类使用Lambda表达式单数据流转换-

map//匿名类

DataStream<String>anonymousDataStream=dataStream.map(newMapFunction<Integer,String>(){@OverridepublicStringmap(Integerinput)

throwsException{ return

"anonymousfunctioninput:"+input+",output:"+(input*2);}});//使用Lambda表达式

DataStream<String>lambdaStream=dataStream .map(input->"lambdainput:"+input+",output:"+(input*2));匿名类实现MapFunctionLambda表达式实现MapFunction对输入元素进行过滤继承并实现FilterFunction或RichFilterFunction重写filter虚方法True

–保留False

–过滤单数据流转换-

filterDataStream<Integer>dataStream=senv.fromElements(1,2,-3,0,5,-9,8);//使用->构造Lambda表达式

DataStream<Integer>lambda=dataStream.filter(input->input>0);public

static

class

MyFilterFunction

extends

RichFilterFunction<Integer>{//limit参数可以从外部传入

privateIntegerlimit;public

MyFilterFunction(Integerlimit)

{this.limit=limit;}@Overridepublic

boolean

filter(Integerinput)

{ returninput>this.limit;}}Lambda表达式实现FilterFunction实现FilterFunction与map()相似输出零个、一个或多个元素可对列表结果展平单数据流转换-

flatMap{苹果,梨,香蕉}.map(去皮){去皮苹果,去皮梨,去皮香蕉}mapflatMap{苹果,梨,香蕉}.flatMap(切碎){[苹果碎片1,苹果碎片2],[梨碎片1,梨碎片2,梨碎片3],[香蕉碎片1]}{苹果碎片1,苹果碎片2,梨碎片1,梨碎片2,梨碎片3,香蕉碎片1}使用Lambda表达式Collector用来收集元素flatMap()虚方法中不使用return返回数据,使用Collector收集返回数据Collector<String>中的泛型String为返回数据类型将flatMap()看做map()和filter()更一般的形式map()和filter()的语义更明确单数据流转换-

flatMapDataStream<String>dataStream=senv.fromElements("HelloWorld","HellothisisFlink");

//split函数的输入为"HelloWorld"输出为"Hello"和"World"组成的列表["Hello","World"]

//flatMap将列表中每个元素提取出来

//最后输出为["Hello","World","Hello","this","is","Flink"]

DataStream<String>words=dataStream.flatMap((Stringinput,Collector<String>collector)->{

for(Stringword:input.split("")){

collector.collect(word);

}}).returns(Types.STRING);数据分组后可进行聚合操作keyBy()将一个DataStream转化为一个KeyedStream聚合操作将KeyedStream转化为DataStreamKeyedStream继承自DataStream基于Key的分组转换根据某种属性或数据的某些字段对数据进行分组对一个分组内的数据进行处理股票:相同股票代号的数据分组到一起相同Key的数据被分配到同一算子实例上需要指定Key数字位置字段名KeySelector基于Key的分组转换-

keyByDataStream<Tuple2<Integer,Double>>dataStream=senv.fromElements( Tuple2.of(1,1.0),Tuple2.of(2,3.2), Tuple2.of(1,5.5),Tuple2.of(3,10.0),Tuple2.of(3,12.5));//使用数字位置定义Key按照第一个字段进行分组

DataStream<Tuple2<Integer,Double>>keyedStream=dataStream.keyBy(0).sum(1);KeySelector重写getKey()方法单数据流转换-

keyBy//IN为数据流元素,KEY为所选择的Key

@FunctionalInterfacepublic

interface

KeySelector<IN,KEY>extends

Function,Serializable

{//选择一个字段作为Key

KEYgetKey(INvalue)

throwsException;}public

class

Word{publicStringword;public

intcount;}//使用KeySelector

DataStream<Word>keySelectorStream=wordStream.keyBy(newKeySelector<Word,String>(){@OverridepublicStringgetKey(Wordin)

{returnin.word;}}).sum("count");KeySelector源码一个KeySelector的实现sum()、max()、min()等指定字段,对该字段进行聚合KeySelector流数据上的聚合实时不断输出到下游状态存储中间数据单数据流转换–

Aggregations将某个字段加和结果保存到该字段上不关心其他字段的计算结果单数据流转换–

sumDataStream<Tuple3<Integer,Integer,Integer>>tupleStream=

senv.fromElements( Tuple3.of(0,0,0),Tuple3.of(0,1,1), Tuple3.of(0,2,2),Tuple3.of(1,0,6), Tuple3.of(1,1,7),Tuple3.of(1,0,8));//按第一个字段分组,对第二个字段求和,打印出来的结果如下:

//(0,0,0)

//(0,1,0)

//(0,3,0)

//(1,0,6)

//(1,1,6)

//(1,1,6)

DataStream<Tuple3<Integer,Integer,Integer>>sumStream=tupleStream.keyBy(0).sum(1);max()对该字段求最大值结果保存到该字段上不保证其他字段的计算结果maxBy()对该字段求最大值其他字段保留最大值元素的值单数据流转换–

max

/

maxByDataStream<Tuple3<Integer,Integer,Integer>>tupleStream=

senv.fromElements( Tuple3.of(0,0,0),Tuple3.of(0,1,1), Tuple3.of(0,2,2),Tuple3.of(1,0,6), Tuple3.of(1,1,7),Tuple3.of(1,0,8));//按第一个字段分组,对第三个字段求最大值max,打印出来的结果如下:

//(0,0,0)

//(0,0,1)

//(0,0,2)

//(1,0,6)

//(1,0,7)

//(1,0,8)

DataStream<Tuple3<Integer,Integer,Integer>>maxStream=tupleStream.keyBy(0).max(2);//按第一个字段分组,对第三个字段求最大值maxBy,打印出来的结果如下:

//(0,0,0)

//(0,1,1)

//(0,2,2)

//(1,0,6)

//(1,1,7)

//(1,0,8)

DataStream<Tuple3<Integer,Integer,Integer>>maxByStream=tupleStream.keyBy(0).maxBy(2);比Aggregation更通用在KeyedStream上生效接受两个输入,生成一个输出两两合一地汇总操作基于Key的分组转换-

reduce实现ReduceFunction基于Key的分组转换-

reducepublic

static

class

MyReduceFunction

implements

ReduceFunction<Score>{@OverridepublicScorereduce(Scores1,Scores2)

{ returnScore.of(,"Sum",s1.score+s2.score);}}DataStream<Score>dataStream=senv.fromElements( Score.of("Li","English",90),Score.of("Wang","English",88), Score.of("Li","Math",85),Score.of("Wang","Math",92), Score.of("Liu","Math",91),Score.of("Liu","English",87));//实现ReduceFunction

DataStream<Score>sumReduceFunctionStream=dataStream.keyBy("name").reduce(newMyReduceFunction());//使用Lambda表达式

DataStream<Score>sumLambdaStream=dataStream .keyBy("name")

.reduce((s1,s2)->Score.of(,"Sum",s1.score+s2.score));将多个同类型的DataStream<T>合并为一个DataStream<T>数据按照先进先出(FIFO)合并多数据流转换-

unionDataStream<StockPrice>shenzhenStockStream=...DataStream<StockPrice>hongkongStockStream=...DataStream<StockPrice>shanghaiStockStream=...DataStream<StockPrice>unionStockStream=shenzhenStockStream.union(hongkongStockStream,shanghaiStockStream);只能连接两个DataStream数据流两个数据流类型可以不一致两个DataStream经过connect()之后转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态应用场景为:使用一个控制流对另一个数据流进行控制多数据流转换-

connect重写CoMapFunction或CoFlatMapFunction三个泛型,分别对应第一个输入流的数据类型、第二个输入流的数据类型和输出流的数据类型对于CoFlatMapFunction,flatMap1()方法处理第一个流的数据,flatMap2()方法处理第二个流的数据可以做到类似SQL

Join的效果多数据流转换-

connect//IN1为第一个输入流的数据类型

//IN2为第二个输入流的数据类型

//OUT为输出类型

public

interface

CoFlatMapFunction<IN1,IN2,OUT>extends

Function,Serializable

{//处理第一个流的数据

void

flatMap1(IN1value,Collector<OUT>out)

throwsException;//处理第二个流的数据

void

flatMap2(IN2value,Collector<OUT>out)

throwsException;}//CoMapFunction三个泛型分别对应第一个流的输入、第二个流的输入,map之后的输出

public

static

class

MyCoMapFunction

implements

CoMapFunction<Integer,String,String>{@OverridepublicStringmap1(Integerinput1)

{ returninput1.toString();}@OverridepublicStringmap2(Stringinput2)

{ returninput2;}}CoFlatMapFunction源代码一个CoFlatMapFunction实现并行度逻辑视图中的算子被切分为多个算子子任务每个算子子任务处理一部分数据可以在整个作业的执行环境层面设置也可以对某个算子单独设置并行度StreamExecutionEnvironmentsenv=StreamExecutionEnvironment.getExecutionEnvironment();//获取当前执行环境的默认并行度

intdefaultParalleism=senv.getParallelism();//设置所有算子的并行度为4,表示所有算子的并行执行的实例数为4

senv.setParallelism(4);在执行环境中设置并行度:对某个算子单独设置:dataStream.map(newMyMapper()).setParallelism(defaultParallelism*2);默认情况下,数据自动分布到多个实例(或者称之为分区)上手动在多个实例上进行数据分配避免数据倾斜输入是DataStream,输出也是DataStream数据重分布dataStream.shuffle();基于正态分布,将数据随机分配到下游各算子实例上:dataStream.broadcast();数据会被复制并广播发送给下游的所有实例上:dataStream.global();将所有数据发送给下游算子的第一个实例上:

rebalance()使用Round-Ribon思想将数据均匀分配到各实例上rescale()就近发送给下游每个实例数据重分布rebalance()将数据轮询式地分布到下游子任务上当上游有2个子任务、下游有4个子任务时使用rescale()partitionCustom()自定义数据重分布逻辑Partitioner[K]中泛型K为根据哪个字段进行分区对一个Score类型数据流重分布,希望按照id均匀分配到下游各实例,那么泛型K就为id的数据类型Long重写partition()方法数据重分布@FunctionalInterfacepublic

interface

Partitioner<K>extends

java.io.Serializable,Function

{//根据key决定该数据分配到下游第几个分区(实例)

int

partition(Kkey,intnumPartitions);}/**

*Partitioner<T>其中泛型T为指定的字段类型*重写partiton函数,并根据T字段对数据流中的所有元素进行数据重分配**/

public

static

class

MyPartitioner

implements

Partitioner<String>{privateRandomrand=newRandom();privatePatternpattern=Ppile(".*\\d+.*");/**

*key泛型T即根据哪个字段进行数据重分配,本例中是Tuple2(Int,String)中的String

*numPartitons为当前有多少个并行实例*函数返回值是一个Int为该元素将被发送给下游第几个实例**/

@Overridepublic

int

partition(Stringkey,intnumPartitions)

{intrandomNum=rand.nextInt(numPartitions/2);Matcherm=pattern.matcher(key);if(m.matches()){returnrandomNum;}else{returnrandomNum+numPartitions/2;}}}//对(Int,String)中的第二个字段使用MyPartitioner中的重分布逻辑

DataStream<Tuple2<Integer,String>>partitioned=dataStream.partitionCustom(newMyPartitioner(),1);Partitioner源码

一个Partitioner的实现数据传输、持久化序列化:将内存对象转换成二进制串、网络可传输或可持久化反序列化:将二进制串转换为内存对象,可直接在编程语言中读写和操作常见序列化方式:JSONJava、Kryo、Avro、Thrift、ProtobufFlink开发了自己的序列化框架更早地完成类型检查节省数据存储空间序列化和反序列化基础类型Java、Scala基础数据类型数组复合类型Scala

case

classJava

POJOTuple辅助类型Option、List、Map泛型和其他类型GenericFlink支持的数据类型TypeInformaton用来表示数据类型,创建序列化器每种数据类型都对应一个TypeInfomationTupleTypeInfo、PojoTypeInfo

…TypeInformationFlink会自动推断类型,调用对应的序列化器,对数据进行序列化和反序列化类型推断和序列化packagemon.typeinfo;public

class

Types{//java.lang.Void

public

static

finalTypeInformation<Void>VOID=BasicTypeInfo.VOID_TYPE_INFO;//java.lang.String

public

static

finalTypeInformation<String>STRING=BasicTypeInfo.STRING_TYPE_INFO;//java.lang.Boolean

public

static

finalTypeInformation<Boolean>BOOLEAN=BasicTypeInfo.BOOLEAN_TYPE_INFO;//java.lang.Integer

public

static

finalTypeInformation<Integer>INT=BasicTypeInfo.INT_TYPE_INFO;//java.lang.Long

public

static

finalTypeInformation<Long>LONG=BasicTypeInfo.LONG_TYPE_INFO;...}一些基础类型的TypeInformation:Types.STRING是用来表示java.lang.String的TypeInformationTypes.STRING被定义为BasicTypeInfo.STRING_TYPE_INFOSTRING_TYPE_INFO:使用何种序列化器和比较器类型推断和序列化public

static

finalBasicTypeInfo<String> STRING_TYPE_INFO= newBasicTypeInfo<>( String.class, newClass<?>[]{}, StringSerializer.INSTANCE, StringComparator.class);STRING_TYPE_INFO定义使用何种序列化器和比较器:在声明式文件中定义Schema使用工具将Schema转换为Java可用的类Avro

Specific生成的类与POJO类似有getter、setter方法在Flink中可以像使用POJO一样使用Avro

Specific模式Avro

Generic不生成具体的类用GenericRecord封装所有用户定义的数据结构必须给Flink提供Schema信息Avro{"namespace":"org.apache.flink.tutorials.avro","type":"record","name":"MyPojo","fields":[ {"name":"id","type":"int"}, {"name":"name","type":"string"}]}Avro声明式文件:Kryo是大数据领域经常使用的序列化框架Flink无法推断出数据类型时,将该数据类型定义为GenericTypeInfo,使用Kryo作为后备选项进行序列化最好实现自己的序列化器,并对数据类型和序列化器进行注册Kryo在有些场景效率不高env.getConfig.disableGenericTypes()禁用Kryo,可以定位到具体哪个类型无法被Flink自动推断,然后针对该类型创建更高效的序列化器Kryo注册数据类型和序列化器://将MyCustomType类进行注册

env.getConfig().registerKryoType(MyCustomType.class);//或者使用下面的方式并且实现自定义序列化器

env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class,MyCustomSerializer.class);static

class

MyClassSerializer

extends

Serializer<MyCustomType>implements

Serializable

{private

static

final

longserialVersionUID=...@Overridepublic

void

write(Kryokryo,Outputoutput,MyCustomTypemyCustomType)

{...}@OverridepublicMyCustomTyperead(Kryokryo,Inputinput,Class<MyCustomType>type)

{...}}与Avro

Specific模式相似,使用声明式语言定义Schema,使用工具将声明式语言转化为Java类有人已经实现好Kryo的序列化器案例:MyCustomType是使用Thrift工具生成的Java类,TBaseSerializer是com.twitter:chill-thrift包中别人实现好的序列化器,该序列化器基于Kryo的Serializer。注意在pom.xml中添加相应的依赖Thrift、Protobuf//GoogleProtobuf

//MyCustomType类是使用Protobuf生成的Java类

//ProtobufSerializer是别人实现好的序列化器

env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class,ProtobufSerializer.class);//ApacheThrift

//MyCustomType是使用Thrift生成的Java类

//TBaseSerializer是别人实现好的序列化器

env.getConfig().addDefaultKryoSerializer(MyCustomType.class,TBaseSerializer.class);Flink的数据类型:Java、Scala、Table

API分别有自己的数据类型体系绝大多数情况下,程序员不需要关心使用何种TypeInformation,只需要使用自己所需的数据类型Flink会做类型推断、选择对应的序列化器当自动类型推断失效,用户需要关注TypeInformation数据类型选择:需要考虑:上下游的数据结构、序列化器的性能、状态数据的持续迭代能力POJO和Tuple等内置类型性能更好Avro、Thrift和Protobuf对上下游数据的兼容性更好,不需要在Flink应用中重新设计一套POJOPOJO和Avro对Flink状态数据的持续迭代更友好数据类型小结用户自定义函数的三种方式:继承并实现函数类使用Lambda表达式继承并实现Rich函数类用户自定义函数对于map()、flatMap()、reduce()等函数,我们可以实现MapFunction、FlatMapFunction、ReduceFunction等interface接口。以FlatMapFunction函数式接口为例:继承了Flink的Function函数式接口函数在运行过程中要发送到各个实例上,发送前后要进行序列化和反序列化,一定要保证函数内的所有内容都可以被序列化两个泛型T和O,T是输入,O是输出,要设置好输入和输出数据类型,否则会报错重写虚方法flatMap()Collector收集输出数据函数类packagemon.functions;@FunctionalInterfacepublicinterfaceFlatMapFunction<T,O>extendsFunction,Serializable{voidflatMap(Tvalue,Collector<O>out)throwsException;}//使用FlatMapFunction实现过滤逻辑,只对字符串长度大于limit的内容进行词频统计

publicstaticclass

WordSplitFlatMap

implements

FlatMapFunction<String,String>

{privateIntegerlimit;publicWordSplitFlatMap(Integerlimit){this.limit=limit;}@OverridepublicvoidflatMap(Stringinput,Collector<String>collector)throwsException{if(input.length()>limit){for(Stringword:input.split(""))

collector.collect(word);}}}DataStream<String>dataStream=senv.fromElements("HelloWorld","HellothisisFlink");DataStream<String>functionStream=dataStream.flatMap(newWordSplitFlatMap(10));FlatMapFunction源码一个FlatMapFunction实现简洁紧凑Scala对Lambda表达式支持更好Java

8之后也开始支持Lambda表达式,有类型擦除问题使用returns提供类型信息Lambda表达式DataStream<String>words=dataStream.flatMap((Stringinput,Collector<String>collector)->{for(Stringword:input.split("")){collector.collect(word);}})//提供类型信息以解决类型擦除问题

.returns(Types.STRING);vallambda=dataStream.flatMap{(value:String,out:Collector[String])=>{if(value.size>10){value.split("").foreach(out.collect)}}}Scala:Java:RichMapFunction、RichFlatMapFunction、RichReduceFunction增加了更多功能:open()方法:初始化close()方法:算子最后执行这个方法,可以释放一些资源getRuntimeContext()方法:获取算子子任务的运行时上下文累加器例子:分布式计算环境下,计算是分布在多台节点上的,每个节点处理一部分数据,使用for循环无法满足累加器功能Rich函数类//实现RichFlatMapFunction类

//添加了累加器Accumulator

public

static

class

WordSplitRichFlatMap

extends

RichFlatMapFunction<String,String>{private

intlimit;//创建一个累加器

privateIntCounternumOfLines=newIntCounter(0);public

WordSplitRichFlatMap(Integerlimit)

{this.limit=limi

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

最新文档

评论

0/150

提交评论