实时数仓 之 Kafka-Flink-Hive集成原理和实战代码(原理+实战)_第1页
实时数仓 之 Kafka-Flink-Hive集成原理和实战代码(原理+实战)_第2页
实时数仓 之 Kafka-Flink-Hive集成原理和实战代码(原理+实战)_第3页
实时数仓 之 Kafka-Flink-Hive集成原理和实战代码(原理+实战)_第4页
实时数仓 之 Kafka-Flink-Hive集成原理和实战代码(原理+实战)_第5页
已阅读5页,还剩14页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

实时数仓之Kafka-Flink-Hive集成原理和实战代码

(原理+实战)

1、Flink・Hive理论

1.1>FlinkHive介绍

Flink1.11

版本中,社区新增了一大功能是实时数仓,可以通过

kafka

,将

kafkasink

端的数据实时写入到

Hive

中。

为实现这个功能、

Flinkl.ll

版本主要做了以下改变:

1.将FlieSystemStreamingSink重新修改,增加J’分区提交和滚动策略机

制。

2.ikHiveStreamingsink重新使用文件系统流作为接收器。

可以通过Flink社区,查看

FLIP-85FilesystemconnectorinTable

的设计思路。

1.2、FlinkHive集成原理

Flink与Hive集成原理图如下:

_______________

CLI|JDBC

Driver

主要包含三部分内容:

1.HiveDialecto

Flinkl.l

新引入了

Hive

方言,所以在FlinkSQL中可以编写Hive语法,即HiveDialect。

2.编写HiveSQL后,

FlinkSQLPlanner

会将

SQL

进行解析,验证,转换成逻辑计划,物理计划,最终变成

Dobgraph

O

3.HiveCatalogo

HiveCatalog

作为

Flink

Hive

的持久化介质,会将不同会话的

Flink

元数据存储到

HiveMetastore

中。

1.3、FlinkHive版本支持

Flink目前支持Hive的l.x、2.x、3.x,每个大的版本对于的Flink依赖如

下:

依赖

版本Maven依赖

1.0.0-1.2.2flink-sql-connector-hive-1.2.2

2.0.0-2.2.0flink-sql-connector-hive-2.2.0

2.3.0-2.3.6flink-sql-connector-hive-2.3.6

3.0.0-3.1.2flink-sql-connector-hive-3.1.2

1.4、FlinkSQL支持Hive语言

FlinkSQL支持两种SQL语言,分别为

default

hive

配置方式也包含两种,配置如下图所示:

1.通过客户端配置。

可以通过tabie.sql-dialect富性指定SQL方宫.因此,您可以configuraMon在yam仪件的部分中为SQL客户端设置鬟使用的初始方

宫.

execution:

planner:blink

type:batch

resuIt-mode:table

configuration:

table.sql-dialectshive

2.通过SQL配置。

您也可以在启动SQLClient后设置方套.

F”nkSQL>settable.sql-dialect=hive;一tousehivedialect

[INFO]Sessionpropertyhasbeenset.

FlinkSQL>settable.sql-dialect=default;--tousedefaultdialect

[INFO]Sessionpropertyhasbeenset.

2、kafka-Flink・Hive集群配置

需求:实时将kafka中的数据通过flinkSql计算存储到hive数据仓库

中。

2.1集群部署

配置信息如下:

1.Hadoop:hadoop2.6,4

2.Kafka:kafka_2.11-2.2.0

3.Flink:flinkl.13.0

4.Hive:hive-2.3.4-bin

5.Zookeeper:zookeeper-3.4.5

组件版本

Hadoophadoop-2.6.4

zookeeperzookeeper-3.4.5

Kafkakafka_2.11-2.2.0

Flinkflink-1.11.1

HiVPhivp-?.^.4-hin

2.2查询结果要求

1.希望FlinkSql查询kafka输入的数据的表结构如下:

userid______orderamount__logts

952413318.02020-08-24T10:23:15

952414319.02020-08-20T10:22:15

952415320.02020-08-20T10:24:15

952416321.02020-08-20T10:26:20

952456412.02020-08-24T10:28:15

2.希望FlinkSQL实时将kafka中的数据插入Hive查询的结果根据分区

查询如下:

userid______orderamount_______dt_____________hr

952413318.02020-08-241°

952414319.02020-08-2010

952415320.02020-08-2010

952416321.02020-08-20i1o0।

952456412.02020-08-24

2.3kafka启动命令

1.kafka启动

nohup./kafka-server-start.sh../config/server,properties&

2.查看kafkaTopic

./kafka-topics.sh--list--bootstrap-server192.168.244.161:9092〃查

看是否有需要用到的topic主题

3.仓犍kafkaTopic

kafka-topics.sh--create--bootstrap-server61:9092--

topictest--partitions1。--replication-factor1

4.启动kafka生产者让批量传输数据

kafka-console-producer.sh--broker-list61:9092--topictest

5.往kafka中批量传入的数据源

{•,userid";"l",,'order_amount,,:,,124.5",“■Log_ts“:”2020-08-2419:20:15”}

{"user_idM£22M^"order_amount":"38.4","log_ts":"2020-08-2411:20:15"}

{"user_id":"3","ordejamount":"176.9","log_ts":"2020-08-2513:20:15")

("userid":"4",•'orde^amount":"302","log_ts":"2020-08-2514:20:15")

{"user_id":"S","ordejamount”:"124.5","logjts":"2020-08寸614:26:15")

{“usejid":"6","ordejamount":"38.4","202。-08-2615:20:15”}

{"user_id":"7","order_amount":"176.9","log_ts":"2020-08-2716:20:15"}

{"user_id":"8","order_amount":"302","log_ts":"2020-08-2717:20:15")

{"user_id":"9",,,order_amount":"124.5","log_ts":"2020-08-2410:20:15")

Cueer_id工"10",J'order-amounf^'lZA.6","logits":"2020-08-2410:21:15"]

{"usejid":"ordejamount":“124.7","log_ts":"2020-08-2410:22:15")

{"usejid":"12","order_amount":"124.8:“log_ts":"2020-08-2410:23^5)

{"user_id":"13","order_amount":"124.9","log_ts":"2020-08-2410:24:15":

{"user_id":"14",,,order_amount":"125.5","log_ts":"2020-08-2410:25:15";

「user_id":"15","ordejamount":"126.5","log_ts":"2020-08-2410:26:15"]

2.4Hive集成Flink

1.hive安装十多改hive-env.sh

ft^SetHADOOP_HOMEtopointjtoaspecifichadoopinstalldirectory

HADOOP^OME^/root/sd/hadoop^2^6.4

#HiveConfigurationDirectorycanbecontrolledby:

exportHIVE_CONF_DIR=/root/sd/apache-hive-2.3.4-bin/conf

^Foldercontainingextralibrariesrequiredforhivecompilation/executi

oncanbecontrolledby:

次portH]VE_AUX_JARS_PATH=/root/sd/apache-hive-2.3.4-bin/lib

由于hive的文件本身就在hdfs中保存的,所以需要指定Hadoop.Home

的路径,同时指定配置文件路径和依赖包的路径。

2.修改hive-site.xml文件

<!--指定mysql数据库连接的database-

〈property〉

<name>javax.jdo.option.ConnectionURL</name>

<value>jdbc:mysql:〃192.168遂44.161:3306/hive?createDatabaseIfNotExist=tr

ue</value>

<description>3DBCconnectstringfora3DBCmetastore</description>

</property>

(property〉__________________________________________________________________________________________________________________

<name>javaxJdo.option.ConnectionDriverName</name^____________________

<value>com.mysql.jdbc.Driver</value>

<description>DriverclassnameforaODBCmetastore</description>

〈/property>___________________________________________________________________

〈property>____________________________________________________________________

<name>javax.jdo.option.ConnectionUserName</name>

<value>root</value>

<description>usernametouseagainstmetastoredatabase</description>

〈/property〉________________________________________________________________________________________________________________

<property>

<name>javax.jdo.option.ConnectionPassword</name>

<value>123456</value>

〈description>passkordtouseagainstmetastoredatabase</description>

〈/property)___________________________________________________________________

<property>

<name>hive.metastore.uris</name>

<value>thrift://hlinkl63:9083</value>

<descrlption>ThriftURIfortheremotemetastore.usedbymetastorec

Menttoconnecttoremotemetastore.〈/description>________________________

</property>

<property>

<2iame>datanucleus.schema.autoCreateAll<2name>__________________________

<value>true〈/value>______________________________________________________

〈/property>___________________________________________________________________

<property>

<name>hive.server?.logging.operation.log.location</name>

<value>/root/sd/apache-hive-2.3.4-bin/tmp/operation_logs〈/value>______

<description>TopJLeveldirectorywhereoperationlogsarestoredJ.f1

oggingfunctionalityisenabled</description>

</property>

<property>

<name>hive.exec.scratchdir</name>

<value>/root/sd/apache-hive-2.3.4-bin/tmp/hive</value>

______<description>HDFSrootlenatchdirforHj.vejobswhich^get^created

withwriteall(733)permission.Foreachconnectinguser,anHDFSscrat

chdir:${hive.exec.scratchdir}/<username>iscreated,with${hive.

scratch.dir.permission).</description>

〈/property〉

(property)

<name>hive.exec.local.scratchdir</name>

<value>/root/sd/apache-hive-2.3.4-bin/tmp/hive/local</value>

<description>LocalscratchspaceforHivejobs</description>

〈/property)

<property>

<name>hive.downloaded.resources.dir</name>

<value>/root/sd/apache-hive-2.3.4-bin/tmp/hive/resources</value>

<description>Temporarylocaldirectoryforaddedresourcesintherem

otefij.esystem.〈/description〉

〈/property〉

3.添加Flink与Hadoop的依赖在flink-conf.yam)中添加hadoop依赖.

env,hadoop,conf.dir:/root/sd/hadoop-2・6・4/etc/hadoop

env,hadoop・home.dir:/root/sd/hadoop-2.6.Q

2.5Hive集群启动

1.启动hive服务器

hive--servicemetastore〃端口号9083

可以使用命令查询一下,看是否启动成功

2.6Flink集群启动

1.启动FlinkSQL(在bin目录下)

./sql-client.sherrbedded-d../conf/sql-client-defaults.yaml

2.在flinksql下查看hive的catalogs

showcatalogs

结果如下:

FlinkSQL>showcatalogs;

defaultcatalog

myhive

FlinkSQL>口

3.使用myhivecatalog

usecatalogmyhive:

showtables;

3、kafka-Flink-HiveDDL

3.1、创建flink读取kafka的表(source)

SETtable.sql-dialect=default;

CREATETABLElog_kafka(

user_idSTRING,

order_amountDOLBLE,

log_tsTIMESTAMP(1),

WATERMARKFORlcg_tsASlog_ts-INTERVAL*5'SECOND

)WITH(

:connector:='kafka',

'topic'='test',

'properties.bootstrap.servers*='192.168.244^161:9092,,

'scan.startup.rrode'='earliest-offset',

'format'='jscn*,

___son.ignore二parse-errors'='true',

'json.fail-on-rrissing・field'='false',

properties.grcup.id1='flinkl'

)1

kafka消费的启动模式有'earliest-offset',latest-offset','group-

offsets','timestamp*,'specific-offsets'等

3.2、创建flink写入hive表(sink)

SET^table.sql-dialect=hive;

CREATETABLElog_hive(

user_idSTRING,

order_amountDOUBLE

)PARTITIONEDBY(dtSTRING,hrSTRING)STOREDASparquetTBLPROPERTIES(

'partition.time-extractor.timestamp-pattern'='$dt$hr:00:00,,

'sink.partitiorvcommit.trigger'='partition-time',

'sink.partitiorvcommit^.delay'='lmin',

'sink.semantic'='exactly-once',

'sink.rolling-policy.file-size'='128MB",

'sink.rolling-policy.rollover-interval'='lmin't

'sink.rolling2policy±check-interval'=2lmin',

'sink.partition-commit.policy.kind'='metastore,success-file'

);

配置解释:

1.'sink.partition-commit.trigger'='partition-time'

-使用partition中抽取时间,加上watermark决定partitoncommit

的时机

2.'partition.time-extractor.timestamp-pattern'='$dt$hour:00:00

-配置hour级别的partition时间抽取策略,这个例子中dt字段是

yyyy-MM-dd格式的天,hour是0-23的小时,timestamp-pattprn定

义了如何从这两个partition字段推出完整的timestamp

3.'sink.partition-commit.delay'='1min

-配置dalay为分钟级,当watermark>partition时间+1分钟,会

commit这个partition

4.'sink.partition-commit.policy.kind'='metastore,success-file'

-partitioncommit的策略是:先更新metastore(addPartition),再写

SUCCESS文件

3.3、将数据插入hive中

INSERTINTOTABLElog_hiveSELECTuser_id,order_amount,DATE_FORMAT(log_t

s,'yyyy-MM-dd'),DATE_FORMAT(log_ts,'HH')FROMlog_kafka;

3.4、查询结果

SELECT*FROMhive_tableWHEREdt="2020-08-25'andhr='16';

userid______orderamount_______dt___________hr

952413318.02020-08-2410

952414319.02020-08-2010

952415320.02020-08-2010

952416321.02020-08-2010

952456412.02020-08-2410

4、kafka-Flink-HiveTableAPI编写

4.1pom.xml配置

<?xmlversion="1.0"encoding="UTF-8”?>

<projectxmlns="http:〃maven.apache^org/POM/4.0.0"

xmlns:xsi="http://w\>jw./2001/XMLSchema-instance"

xsi;schenaLocation="http;///POM/4.0.0http;//mav

/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<groupld>comAlink</groupld>

<artifactld>flinkhive</artifactld〉

<packaging>jar</packaging>

<version>l.0-SNAPSHOT</version>

〈properties)

〈scala.bin.version〉2.11〈/scala.bin.version〉

<flink.version>l«13.0</flink»version>

<hadoop.version>2.6.4</hadoop.version>

<hive.version>2.3.4</hive.version>

〈/properties)

vdopendoncios)

<dependency>

<groupld>crg.apache.flink</groupld>

(artifact工d>flink-streaming-

scala_$(s^cala.bin.version}</artifact1d>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupld>org.apache.flink</groupld>

(artifact工d>flink^streaming-

java_$(s^cala.binary^,version)〈/artifactId>

<version>${flink.version)</version>

</dependency>

<dependency>

(group工d>crg.apache.flink</group工d>

cartifact工d>flink-clients_${scala.bin.version}〈/artifact工d>

<version>${flink.version}</version)

</dependency>

<dependency>

<groupld>org.apache.flink</groupld>

<artifactld>flink-table-common</artlfactld>

<version>${flink^version}</version>

〈/dependency)

<dependency>

<groupld>crg.apache.flink</groupld>

<artifact1d>flink-table-api-scala-___________________________

bridge_$(scala.bin.version}〈/artifactl:d>

<version〉${flink.version}“version)

</dependency>

<dependency>

<groupld>crg.apache.flink</groupld>

<artifactld>flink-tabl62Planner-

blink_$(scala.bin^version}</artifact:Id>

(version${flink1Version}</version>

</dependency>

(dependency)

<groupld>crg.apache.flink</groupld>

<artifactld>flink-connector-

hive_$(scala.bin.version}</artifactld>

<version>${flink.version}〈/version〉_______________________________________

</dependency>

<dependency>

<groupld>crg.apache.flink</groupld>

(artifact工d>f]jnk-sq1-connector2___________________________________

kafka_$(scala.bin.version}</artifact1d>____________________________________

(vorsion>${flink.version}"version)__________________________

</dependency>

<dependency>

(group工d>crg.apache.f1ink</group!d>________________________________

〈artifactMflink-jsorx/artifactld)

<version>${flink.version}</version>________________________________

</dependency>

<dependency>

<groupld>org.apache.hadoop</groupld>

〈artifact工d>hadooommon〈/artifact工d>_________________________

(version〉${hadoop.version}</version>___________________________

___</dependency>_____________________________________

<dependency>

(group工d>org.apache.hadoop</groupld>___________________________

〈artifact工d>hadoodfs</artifact工d>___________________________

<version>${hadoop.version}</version>___________________________

</dependency>

<dependency>

<groupid>org.apache.hadoop</groupid>___________________________

<artifact1d>hadoojj2yarn-c1ient〈/artifact1d>___________________

<version>${hadoop.version}</version>

</dependency>

(dependency>________________________________________________________

〈group工d>org.apache.hadoop</groupld>___________________________

<artifactId>hadoop・mapreduce-client-core〈/artifactld>

<version>${hadoop.version}</version>

</dependency>

<dependency>

<group1d>crg.apache.hive</groupld>_________________________________

<artifact1d>hive-exec</artifact1d>

<version>${hive^vers^ion}</version>_________________________________

</dependency>

</dependencies>

</project>

4.2代码存放路径截图

I-IProject。W—M

▼fe"flinkhiveG:\flink..-hive\flinkhive

►MJdea

▼・src

▼・main

・java

▼Blresources

星hdfs-site.xml

瑞hive-site.xml

sql-client-defaults.yaml

►圜scala

►■test

►target

他flinkhive.iml

Scala版本代码:

importjava.time.Duration

importorg.apache.flink.streaming.api.{CheckpointingMode,TimeCharacteris

tic}__________________________________________________________________________

impurLurg.dpache.flink.bLredinirig.dpi..ExeculiuiiCheckpuinlLigO

ptions

importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment

importorg.apache.flink.table.api.{Environmentsettings,SqlDialect)

importorg.apache.flink.table.api.bridge.scala.StreamTableEnvironment

importorg.apache.flink.table.catalog.hive.HiveCatalog

objectKafkaToHive(

defmain(args:Zirray[String])j_Unit={

valstreamEnv=StreamExecutionEnvironment,getExecutionEnvironment

streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

streamEnv.setParallelism(3)

____valtableEnvSettings=Environmentsettings.new:Tnstance()

______.useBlinkPlanner()____________________________________________________

.inStreamingMode()__________

.build()

valtableEnv=StreamTableEnvironment.create(streamEnv,tableEnvSetti

ngs)_________________________________________________________________________

tableEnv.getConfig^getConfiguration.set(ExecutionCheckpointingOptions

^CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE)

tableEnv.getCcnfig.getConfiguration.set(ExecutionCheckpointingOptions

.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20))

____val^atalogNarre="my_catalog2_________________________________________

valcatalog=newHiveCatalog(

catalogName,//cataLogname

"default",

______"・/src/main/resources",〃Hiveconfigdirectory

”2.3.4"〃Hivev/ersi。。

tableEnv.registerCatalog(catalogName,catalog)

tableEnv.useCatalog(catalogName)

tableEnv.executesql(^CREATE^DATABASEIFNOTEXISTSstream_tmp")_______

tableEnv.executeSql(3)R0PTABLE^IFEXISTSstream_tmp.logkafka")

tableEnv.executeSql(

•IIIH

(CREATETABLE^tream^mpjlog_kafka(

IuseridSTRING,

IorderamountDOUBLE,

I10g_tSTIMESTAMP(3),

|WATERMARKFORlog_tsASlog_ts-INTERVAL'5*SECOND

|)WITH(

________|1connector*='kafka\___________________________________________

|,topic'='test.,

_________|'properties.bootstrap.servers*-'hlink163:90927,______________

|'properties.group.id'='flinkl',

'scan.startup.mode'='earliest-offset1,

format'='json',

json.fail-on-missing5field1='false'

json.ignore-parse-errors'='true'

n

.stripMargin

)

tableEnv.getConfig^,jetSqlDialect(SglDialect.HIVE)

tableEnv.executeSql("CREATEDATABASEIFNOTEXISTShive_tmp")

tableEnv.executeSql("DROPTABLEIFEXISTShive_tmp.log_hive")

tableEnv.executeSql(

|CREATETABLEhive_tmp.log_hive(

|user_idSTRING,

|order_amountDOUBLE

I)PARTITIONEDBY(

|dtSTRING,

|hrSTRING

|)STOREDASPARQUET

ITBLPROPERTIES(

|'sink.partition-commit.trigger'='partition-time',

|'sink.partition-commit.delay*='1min',

|1format'='json\_

|'sink.partition-commit.policy.kind'='metastore,success-file'

|'partition.time-extractor.timestamp-pattern'='$dt$hr:00:00'

JD_

......stripMargin

L

tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)

tableEnv.executeSql(

INSERTINTOhive_tmp.log_hive

SELECT

user_id,

order_amount,

DATE_FCRMAT(log_ts,'yyyy-MM-dd'),DATE_FORMAT(log_ts,'HH')

FROMstream_tmp.log_kafka

.stripMargin

1

java版本代码:

importorg.apache.flink.streaming^api.CheckpointingMode;

importorg.apache.flink.streaming^api.TimeCharacteristic;

importorg.apache.flink.streaming^api.environment.ExecutionCheckpointi

ngOptions;

importorg.apache.flink.streaming.api.environment.StreamExecutionEnvir

onment;

importorg.apache.flink.table.api.EnvironmentSettings;

importor^.apachjflink.table.api.SqlDialect;

importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;

importorg.apache.flink.table.catalog.hive.HiveCatalog;

importjava.time.Duration;

*广描述.

*LassNameKafha丁。-e

*©description:

*^Author:Lyz

*@Date:2022/9/6卜午9:50

*/

publicclassKafkaToHive{

publicstaticvoidmain(String口args){

StreamExecutionEnvironmentsenv=StreamExecutionEnvironment^g_

etExecutionEnvironment();

senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

senv.setParallelism(3);

EnvironmentSettingstableEnvSettings=Environmentsettings.new

Jnstance(J^useBlinkPlanner()^inStreamingMode()

.build();

StreamTableEnvironmenttableEnv=StreamTableEnvironment.creat

e(senv,tableEnvSettings);

tableEnv.getConfig()^getConfiguration().set(ExecutionCheckpoin

tingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE);

tableEnv.getConfig().getConfiguration().set(ExecutionCheckpoin

tingOptions.CHECKPOINTING_INTERVAL^Duration.ofSeconds£20)Jj_

StringcatalogName=atalog”;

HiveCatalogcatalog=newHiveCatalog(

catalogName,

“default",//defaultdatabase

_________________”./src/main/resources”,〃HiveconfigSive-

directory

"2.3.4"//Hiveversion

);

tableEnv.registerCatalog£catalogName.catalog);

tableEnv.useCatalog(catalogName);

tableEnv.executeSql("CREATEDATABASEIFNOTEXISTSstream_tmp"

);

tableEnv.executeSql(^"DROPTABLEIFEXISTSstreamtmp.logkafka

211_______________________________________________________________________

tableEnv.executeSq1("createtablestream_tmp.log_kafka("+

"user_idString,\

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论