Chapter 1 From DWM to DWS

In the previous chapters we used stream splitting to break the data out into separate Kafka topics (the DWD and DWM layers). The next question is how to process those details into metrics and output them as wide tables; that is the job of the DWS layer. Roughly, the visitor metrics all derive from the page log: PV can be computed from page_log directly; UV needs page_log filtered down to first visits; jump-outs need page_log with session-start pages identified; page counts and visit duration come straight from page_log. For the product theme, order metrics are obtained by aggregating the order wide table again.

Real projects will have more requirements than these; here the real-time computation mainly serves a visualization dashboard. The DWS layer combines the real-time data by subject (theme) into wide tables, which is easier to manage and also reduces the number of dimension lookups.

Chapter 2 DWS Layer: Visitor Theme Wide Table

Measures include PV, UV, jump-out count, session count (session_count) and cumulative visit duration. Dimensions are the fields that matter most in analysis: channel, region, app version, and new/returning visitor. The detail streams are aggregated along these dimensions.

1) Create VisitorStatsApp and read the detail streams from Kafka

    package com.atguigu.gmall.realtime.app.dws;

    import org.apache.flink.runtime.state.StateBackend;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    /**
     * Author: ...
     * Desc: visitor theme statistics. Several detail streams that share the same
     * dimensions are counted together. Keying by mid would not compress the data
     * much, because one mid produces very few events per unit of time (with a much
     * larger volume, or much longer windows, it could work), so we aggregate along
     * the four commonly used dimensions: channel, new/returning visitor, app
     * version, and region. Measures include session count, daily active (first
     * visit of the day), page count, new-visitor count, jump-out count and page
     * dwell time. The detail streams are unrelated before the dimension
     * aggregation, so they are converted to one structure and unioned.
     */
    public class VisitorStatsApp {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);
            StateBackend fsStateBackend = new FsStateBackend("hdfs://…"); // checkpoint path truncated in the source
            env.setStateBackend(fsStateBackend);

            String groupId = "visitor_stats_app"; // group id truncated in the source; follows the project naming convention

            // TODO 1. read the pv, uv and user-jump detail streams from Kafka
            String pageViewSourceTopic = "dwd_page_log";
            String uniqueVisitSourceTopic = "dwm_unique_visit";        // name truncated in the source; dwm naming convention
            String userJumpDetailSourceTopic = "dwm_user_jump_detail"; // name truncated in the source; dwm naming convention

            FlinkKafkaConsumer<String> pageViewSource = MyKafkaUtil.getKafkaSource(pageViewSourceTopic, groupId);
            FlinkKafkaConsumer<String> uniqueVisitSource = MyKafkaUtil.getKafkaSource(uniqueVisitSourceTopic, groupId);
            FlinkKafkaConsumer<String> userJumpSource = MyKafkaUtil.getKafkaSource(userJumpDetailSourceTopic, groupId);

            DataStreamSource<String> pageViewDStream = env.addSource(pageViewSource);
            DataStreamSource<String> uniqueVisitDStream = env.addSource(uniqueVisitSource);
            DataStreamSource<String> userJumpDStream = env.addSource(userJumpSource);

            env.execute();
        }
    }
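MyKafkaUtil.getKafkaSource is a helper built in an earlier chapter of this project and is only referenced here. As a reminder of its contract, a minimal sketch is shown below; the broker list is an assumption:

    package com.atguigu.gmall.realtime.utils;

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class MyKafkaUtil {
        private static final String KAFKA_SERVER = "hadoop102:9092,hadoop103:9092,hadoop104:9092"; // assumed broker list

        // wraps a String-deserializing Kafka consumer for the given topic and consumer group
        public static FlinkKafkaConsumer<String> getKafkaSource(String topic, String groupId) {
            Properties props = new Properties();
            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
        }
    }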
2) Merge the data streams

(1) Encapsulate the wide-table entity class

    package com.atguigu.gmall.realtime.bean;

    import lombok.AllArgsConstructor;
    import lombok.Data;

    /**
     * Author: ...
     * Desc: visitor statistics wide-table entity
     */
    @Data
    @AllArgsConstructor
    public class VisitorStats {
        private String stt;        // window start time
        private String edt;        // window end time
        private String vc;         // app version
        private String ch;         // channel
        private String ar;         // area / region
        private String is_new;     // new or returning visitor
        private Long uv_ct = 0L;   // measure: unique visitors
        private Long pv_ct = 0L;   // measure: page views
        private Long sv_ct = 0L;   // measure: session count
        private Long uj_ct = 0L;   // measure: jump-out count
        private Long dur_sum = 0L; // measure: cumulative visit duration
        private Long ts;           // event timestamp
    }

(2) Convert the structure of each stream

    // TODO 2. convert each stream into the common VisitorStats structure
    // 2.1 page-view stream
    SingleOutputStreamOperator<VisitorStats> pageViewStatsDstream = pageViewDStream.map(json -> {
        // System.out.println("pv:" + json);
        JSONObject jsonObj = JSON.parseObject(json);
        return new VisitorStats("", "",
            jsonObj.getJSONObject("common").getString("vc"),
            jsonObj.getJSONObject("common").getString("ch"),
            jsonObj.getJSONObject("common").getString("ar"),
            jsonObj.getJSONObject("common").getString("is_new"),
            0L, 1L, 0L, 0L,
            jsonObj.getJSONObject("page").getLong("during_time"),
            jsonObj.getLong("ts"));
    });

    // 2.2 unique-visit stream
    SingleOutputStreamOperator<VisitorStats> uniqueVisitStatsDstream = uniqueVisitDStream.map(json -> {
        JSONObject jsonObj = JSON.parseObject(json);
        return new VisitorStats("", "",
            jsonObj.getJSONObject("common").getString("vc"),
            jsonObj.getJSONObject("common").getString("ch"),
            jsonObj.getJSONObject("common").getString("ar"),
            jsonObj.getJSONObject("common").getString("is_new"),
            1L, 0L, 0L, 0L, 0L,
            jsonObj.getLong("ts"));
    });

    // 2.3 session-count stream: a page without last_page_id starts a new session
    SingleOutputStreamOperator<VisitorStats> sessionVisitDstream = pageViewDStream.process(
        new ProcessFunction<String, VisitorStats>() {
            @Override
            public void processElement(String json, Context ctx, Collector<VisitorStats> out) throws Exception {
                JSONObject jsonObj = JSON.parseObject(json);
                String lastPageId = jsonObj.getJSONObject("page").getString("last_page_id");
                if (lastPageId == null || lastPageId.length() == 0) {
                    VisitorStats visitorStats = new VisitorStats("", "",
                        jsonObj.getJSONObject("common").getString("vc"),
                        jsonObj.getJSONObject("common").getString("ch"),
                        jsonObj.getJSONObject("common").getString("ar"),
                        jsonObj.getJSONObject("common").getString("is_new"),
                        0L, 0L, 1L, 0L, 0L,
                        jsonObj.getLong("ts"));
                    out.collect(visitorStats);
                }
            }
        });

    // 2.4 user-jump stream
    SingleOutputStreamOperator<VisitorStats> userJumpStatDstream = userJumpDStream.map(json -> {
        JSONObject jsonObj = JSON.parseObject(json);
        return new VisitorStats("", "",
            jsonObj.getJSONObject("common").getString("vc"),
            jsonObj.getJSONObject("common").getString("ch"),
            jsonObj.getJSONObject("common").getString("ar"),
            jsonObj.getJSONObject("common").getString("is_new"),
            0L, 0L, 0L, 1L, 0L,
            jsonObj.getLong("ts"));
    });

(3) Union the four streams

    // TODO 3. union the four converted streams
    DataStream<VisitorStats> unionDetailDstream = uniqueVisitStatsDstream.union(
        pageViewStatsDstream, sessionVisitDstream, userJumpStatDstream);

3) Aggregate by dimension

(1) Set the event-time timestamps and watermarks; windowed aggregation requires them.

    // TODO 4. assign timestamps and watermarks
    SingleOutputStreamOperator<VisitorStats> visitorStatsWithWatermarkDstream =
        unionDetailDstream.assignTimestampsAndWatermarks(
            // out-of-orderness bound truncated in the source; 1 second assumed
            WatermarkStrategy.<VisitorStats>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withTimestampAssigner((visitorStats, ts) -> visitorStats.getTs()));

(2) Group by key, using a Tuple4 of the four dimension fields.

    // TODO 5. keyBy on version, channel, region and new/returning flag
    KeyedStream<VisitorStats, Tuple4<String, String, String, String>> visitorStatsTuple4KeyedStream =
        visitorStatsWithWatermarkDstream.keyBy(
            new KeySelector<VisitorStats, Tuple4<String, String, String, String>>() {
                @Override
                public Tuple4<String, String, String, String> getKey(VisitorStats visitorStats) throws Exception {
                    return new Tuple4<>(visitorStats.getVc(), visitorStats.getCh(),
                        visitorStats.getAr(), visitorStats.getIs_new());
                }
            });

(3) Open a window.

    // TODO 6. window size truncated in the source; the project uses 10-second tumbling windows elsewhere
    WindowedStream<VisitorStats, Tuple4<String, String, String, String>, TimeWindow> windowStream =
        visitorStatsTuple4KeyedStream.window(TumblingEventTimeWindows.of(Time.seconds(10)));

(4) Aggregate within the window and fill in the window time fields.

    SingleOutputStreamOperator<VisitorStats> visitorStatsDstream = windowStream.reduce(
        new ReduceFunction<VisitorStats>() {
            @Override
            public VisitorStats reduce(VisitorStats stats1, VisitorStats stats2) throws Exception {
                // sum the measures pairwise
                stats1.setPv_ct(stats1.getPv_ct() + stats2.getPv_ct());
                stats1.setUv_ct(stats1.getUv_ct() + stats2.getUv_ct());
                stats1.setUj_ct(stats1.getUj_ct() + stats2.getUj_ct());
                stats1.setSv_ct(stats1.getSv_ct() + stats2.getSv_ct());
                stats1.setDur_sum(stats1.getDur_sum() + stats2.getDur_sum());
                return stats1;
            }
        },
        new ProcessWindowFunction<VisitorStats, VisitorStats, Tuple4<String, String, String, String>, TimeWindow>() {
            @Override
            public void process(Tuple4<String, String, String, String> tuple4, Context context,
                                Iterable<VisitorStats> visitorStatsIn,
                                Collector<VisitorStats> visitorStatsOut) throws Exception {
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                for (VisitorStats visitorStats : visitorStatsIn) {
                    // fill the window start/end into stt/edt
                    visitorStats.setStt(simpleDateFormat.format(new Date(context.window().getStart())));
                    visitorStats.setEdt(simpleDateFormat.format(new Date(context.window().getEnd())));
                    visitorStatsOut.collect(visitorStats);
                }
            }
        });

4) Write to the OLAP database ClickHouse

For detailed installation and use of ClickHouse, see the companion document 《05_尚硅谷大数据之FLINK版实时项目…》.

(1) Prepare the ClickHouse table

    create table visitor_stats_2021 (
        stt DateTime,
        edt DateTime,
        vc String,
        ch String,
        ar String,
        is_new String,
        uv_ct UInt64,
        pv_ct UInt64,
        sv_ct UInt64,
        uj_ct UInt64,
        dur_sum UInt64,
        ts UInt64
    ) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      -- sorting key truncated in the source; it must cover the window and dimension columns for deduplication
      order by (stt, edt, is_new, vc, ch, ar);

ReplacingMergeTree is chosen mainly because it guarantees idempotence of the table: when parts merge, rows with the same sorting key are deduplicated, keeping the row with the larger ts. partition by toYYYYMMDD(stt) turns the date into a number (e.g. 20210101); query conditions should include the stt field wherever possible so that partitions can be pruned.
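As a quick illustration of the idempotence claim, here is a minimal sketch you can run in clickhouse-client; the sample values are made up:

    insert into visitor_stats_2021 values
        ('2021-01-01 10:00:00','2021-01-01 10:00:10','v2.1.134','xiaomi','230000','1',1,5,1,0,7000,1609466405000);
    -- replaying the same window (e.g. after a job restart) inserts a duplicate row
    insert into visitor_stats_2021 values
        ('2021-01-01 10:00:00','2021-01-01 10:00:10','v2.1.134','xiaomi','230000','1',1,5,1,0,7000,1609466405000);
    -- both rows are visible until a background merge; force the merge (or query with FINAL)
    optimize table visitor_stats_2021 final;
    select * from visitor_stats_2021; -- one row per sorting key remains

This is why replayed windows do not inflate the dashboard numbers, as long as every replay writes identical keys.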
(2) Add the ClickHouse dependency packages

flink-connector-jdbc is Flink's generic JDBC sink package. As long as the matching JDBC driver is on the classpath, Flink can use it to write to any database that speaks JDBC: earlier we paired it with the Phoenix driver, here with the ClickHouse driver (ru.yandex.clickhouse:clickhouse-jdbc). Add both to pom.xml.

(3) The four parameters of JdbcSink.<T>sink()

Parameter 1: the SQL to execute, in the form insert into xxx values(?,?,…).
Parameter 2: the statement builder that binds the data object to the placeholders; it can be a lambda (jdbcPreparedStatement, t) -> …, where t is the data object whose fields are to be bound.
Parameter 3: JdbcExecutionOptions, execution options such as the batch size.
Parameter 4: JdbcConnectionOptions, connection options such as the JDBC URL and driver.

(4) Implement getJdbcSink in ClickHouseUtil

    package com.atguigu.gmall.realtime.utils;

    import com.atguigu.gmall.realtime.bean.TransientSink;
    import com.atguigu.gmall.realtime.common.GmallConfig;
    import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
    import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
    import org.apache.flink.connector.jdbc.JdbcSink;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;

    import java.lang.reflect.Field;

    /**
     * Author: ...
     * Desc: builds a generic ClickHouse JdbcSink for any bean type
     */
    public class ClickHouseUtil {
        public static <T> SinkFunction<T> getJdbcSink(String sql) {
            SinkFunction<T> sink = JdbcSink.<T>sink(
                sql,
                (jdbcPreparedStatement, t) -> {
                    Field[] fields = t.getClass().getDeclaredFields();
                    int skipOffset = 0; // number of fields skipped so far
                    for (int i = 0; i < fields.length; i++) {
                        Field field = fields[i];
                        // fields annotated with @TransientSink are not written to ClickHouse
                        TransientSink transientSink = field.getAnnotation(TransientSink.class);
                        if (transientSink != null) {
                            System.out.println("skipping field: " + field.getName());
                            skipOffset++;
                            continue;
                        }
                        try {
                            field.setAccessible(true);
                            Object o = field.get(t);
                            // placeholders are 1-based; shift left by the number of skipped fields
                            jdbcPreparedStatement.setObject(i + 1 - skipOffset, o);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                },
                new JdbcExecutionOptions.Builder().withBatchSize(2).build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl(GmallConfig.CLICKHOUSE_URL)
                    .withDriverName(GmallConfig.CLICKHOUSE_DRIVER)
                    .build());
            return sink;
        }
    }

(5) Create the TransientSink annotation to mark fields that should not be persisted

Because ClickHouseUtil binds bean fields to placeholders by reflection, a field that exists on the bean but has no column in the target table must be skipped; this annotation marks such fields.

    package com.atguigu.gmall.realtime.bean;

    import java.lang.annotation.Retention;
    import java.lang.annotation.Target;

    import static java.lang.annotation.ElementType.FIELD;
    import static java.lang.annotation.RetentionPolicy.RUNTIME;

    /**
     * Author: ...
     * Desc: marks a field that should not be written to ClickHouse
     */
    @Target(FIELD)
    @Retention(RUNTIME)
    public @interface TransientSink {
    }
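To make the skip logic concrete, here is a hypothetical bean (not part of the project) showing how @TransientSink interacts with the placeholder count:

    // hypothetical example: only three fields are persisted, so the SQL has three placeholders
    @Data
    @AllArgsConstructor
    public class DemoStats {
        private String stt;
        private String edt;
        @TransientSink
        private Set<String> seenIds; // helper state only, no matching ClickHouse column
        private Long ct;
    }

    // seenIds is skipped, so ct is bound to placeholder 3 instead of 4
    SinkFunction<DemoStats> sink = ClickHouseUtil.getJdbcSink("insert into demo_stats values(?,?,?)");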
(6) Configure the ClickHouse connection address in GmallConfig

    package com.atguigu.gmall.realtime.common;

    /**
     * Author: ...
     */
    public class GmallConfig {
        public static final String HBASE_SCHEMA = "GMALL2021_REALTIME";
        // constant values truncated in the source; adjust the host to your environment
        public static final String CLICKHOUSE_URL = "jdbc:clickhouse://hadoop102:8123/default";
        public static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
    }

(7) Add the ClickHouse sink to the main program

    visitorStatsDstream.addSink(
        ClickHouseUtil.getJdbcSink("insert into visitor_stats_2021 values(?,?,?,?,?,?,?,?,?,?,?,?)"));

(8) End-to-end test

Start the required components (ZooKeeper, Kafka, the log collector and the earlier DWD/DWM applications), run the log-simulator jar, and check that data arrives in the ClickHouse table visitor_stats_2021.

Chapter 3 DWS Layer: Product Theme Wide Table

The product theme combines click, display, favor, cart, order, payment, refund and comment events into one wide table: read each detail stream from Kafka, parse the JSON, convert to a unified structure, set event time and watermarks, aggregate in windows, supplement the dimensions, and write to ClickHouse.

1) Encapsulate the product statistics entity class

Note on @Builder: the annotation lets us assemble objects field by field through the builder pattern. Fields with an initial value (for example 0L) additionally need @Builder.Default; without it, the property values of objects created through the builder become null.

    package com.atguigu.gmall.realtime.bean;

    import lombok.Builder;
    import lombok.Data;

    import java.math.BigDecimal;
    import java.util.HashSet;
    import java.util.Set;

    /**
     * Author: ...
     * Desc: product statistics entity
     */
    @Data
    @Builder
    public class ProductStats {
        String stt;                   // window start time
        String edt;                   // window end time
        Long sku_id;                  // sku id
        String sku_name;              // sku name
        BigDecimal sku_price;         // sku price
        Long spu_id;                  // spu id
        String spu_name;              // spu name
        Long tm_id;                   // trademark id
        String tm_name;               // trademark name
        Long category3_id;            // category id
        String category3_name;        // category name
        @Builder.Default
        Long display_ct = 0L;         // display count
        @Builder.Default
        Long click_ct = 0L;           // click count
        @Builder.Default
        Long favor_ct = 0L;           // favor count
        @Builder.Default
        Long cart_ct = 0L;            // add-to-cart count
        @Builder.Default
        Long order_sku_num = 0L;      // ordered sku quantity
        @Builder.Default
        BigDecimal order_amount = BigDecimal.ZERO;   // order amount
        @Builder.Default
        Long order_ct = 0L;           // order count
        @Builder.Default
        BigDecimal payment_amount = BigDecimal.ZERO; // payment amount
        @Builder.Default
        Long paid_order_ct = 0L;      // paid order count
        @Builder.Default
        Long refund_order_ct = 0L;    // refund order count
        @Builder.Default
        BigDecimal refund_amount = BigDecimal.ZERO;  // refund amount
        @Builder.Default
        Long comment_ct = 0L;         // comment count
        @Builder.Default
        Long good_comment_ct = 0L;    // positive comment count
        @Builder.Default
        @TransientSink
        Set orderIdSet = new HashSet();       // order ids, for de-duplicated order count
        @Builder.Default
        @TransientSink
        Set paidOrderIdSet = new HashSet();   // paid order ids, for de-duplicated count
        @Builder.Default
        @TransientSink
        Set refundOrderIdSet = new HashSet(); // refund order ids, for de-duplicated count
        Long ts;                      // event timestamp
    }

2) Create ProductStatsApp and read the streams from Kafka

    package com.atguigu.gmall.realtime.app.dws;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    /**
     * Author: ...
     * Desc: builds the product theme wide table from the click, cart, order,
     * payment, refund and comment streams
     */
    public class ProductStatsApp {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);
            StateBackend fsStateBackend = new FsStateBackend("hdfs://…"); // checkpoint path truncated in the source
            env.setStateBackend(fsStateBackend);

            String groupId = "product_stats_app"; // group id truncated in the source; project naming convention

            String pageViewSourceTopic = "dwd_page_log";
            String orderWideSourceTopic = "dwm_order_wide";
            String paymentWideSourceTopic = "dwm_payment_wide";
            String cartInfoSourceTopic = "dwd_cart_info";
            String favorInfoSourceTopic = "dwd_favor_info";
            String refundInfoSourceTopic = "dwd_order_refund_info";
            String commentInfoSourceTopic = "dwd_comment_info"; // name truncated in the source; dwd naming convention

            FlinkKafkaConsumer<String> pageViewSource = MyKafkaUtil.getKafkaSource(pageViewSourceTopic, groupId);
            FlinkKafkaConsumer<String> orderWideSource = MyKafkaUtil.getKafkaSource(orderWideSourceTopic, groupId);
            FlinkKafkaConsumer<String> paymentWideSource = MyKafkaUtil.getKafkaSource(paymentWideSourceTopic, groupId);
            FlinkKafkaConsumer<String> favorInfoSourceSouce = MyKafkaUtil.getKafkaSource(favorInfoSourceTopic, groupId);
            FlinkKafkaConsumer<String> cartInfoSource = MyKafkaUtil.getKafkaSource(cartInfoSourceTopic, groupId);
            FlinkKafkaConsumer<String> refundInfoSource = MyKafkaUtil.getKafkaSource(refundInfoSourceTopic, groupId);
            FlinkKafkaConsumer<String> commentInfoSource = MyKafkaUtil.getKafkaSource(commentInfoSourceTopic, groupId);

            DataStreamSource<String> pageViewDStream = env.addSource(pageViewSource);
            DataStreamSource<String> favorInfoDStream = env.addSource(favorInfoSourceSouce);
            DataStreamSource<String> orderWideDStream = env.addSource(orderWideSource);
            DataStreamSource<String> paymentWideDStream = env.addSource(paymentWideSource);
            DataStreamSource<String> cartInfoDStream = env.addSource(cartInfoSource);
            DataStreamSource<String> refundInfoDStream = env.addSource(refundInfoSource);
            DataStreamSource<String> commentInfoDStream = env.addSource(commentInfoSource);

            env.execute();
        }
    }

3) Convert the JSON string streams into the unified ProductStats structure

    // 3.1 click and display stream, from the page log
    SingleOutputStreamOperator<ProductStats> pageAndDispStatsDstream = pageViewDStream.process(
        new ProcessFunction<String, ProductStats>() {
            @Override
            public void processElement(String json, Context ctx, Collector<ProductStats> out) throws Exception {
                JSONObject jsonObj = JSON.parseObject(json);
                JSONObject pageJsonObj = jsonObj.getJSONObject("page");
                String pageId = pageJsonObj.getString("page_id");
                if (pageId == null) {
                    System.out.println(jsonObj);
                }
                Long ts = jsonObj.getLong("ts");
                // a visit to the good_detail page counts as one click on that sku
                if ("good_detail".equals(pageId)) {
                    Long skuId = pageJsonObj.getLong("item");
                    ProductStats productStats = ProductStats.builder()
                        .sku_id(skuId).click_ct(1L).ts(ts).build();
                    out.collect(productStats);
                }
                // every sku in the displays array counts as one display
                JSONArray displays = jsonObj.getJSONArray("displays");
                if (displays != null && displays.size() > 0) {
                    for (int i = 0; i < displays.size(); i++) {
                        JSONObject display = displays.getJSONObject(i);
                        if ("sku_id".equals(display.getString("item_type"))) {
                            Long skuId = display.getLong("item");
                            ProductStats productStats = ProductStats.builder()
                                .sku_id(skuId).display_ct(1L).ts(ts).build();
                            out.collect(productStats);
                        }
                    }
                }
            }
        });

    // 3.2 order wide stream
    SingleOutputStreamOperator<ProductStats> orderWideStatsDstream = orderWideDStream.map(json -> {
        OrderWide orderWide = JSON.parseObject(json, OrderWide.class);
        System.out.println("orderWide:===" + orderWide);
        String create_time = orderWide.getCreate_time();
        Long ts = DateTimeUtil.toTs(create_time);
        return ProductStats.builder()
            .sku_id(orderWide.getSku_id())
            .order_sku_num(orderWide.getSku_num())
            .order_amount(orderWide.getSplit_total_amount())
            .orderIdSet(new HashSet(Collections.singleton(orderWide.getOrder_id())))
            .ts(ts).build();
    });

    // 3.3 favor stream
    SingleOutputStreamOperator<ProductStats> favorStatsDstream = favorInfoDStream.map(json -> {
        JSONObject favorInfo = JSON.parseObject(json);
        Long ts = DateTimeUtil.toTs(favorInfo.getString("create_time"));
        return ProductStats.builder()
            .sku_id(favorInfo.getLong("sku_id"))
            .favor_ct(1L)
            .ts(ts).build();
    });

    // 3.4 cart stream
    SingleOutputStreamOperator<ProductStats> cartStatsDstream = cartInfoDStream.map(json -> {
        JSONObject cartInfo = JSON.parseObject(json);
        Long ts = DateTimeUtil.toTs(cartInfo.getString("create_time"));
        return ProductStats.builder()
            .sku_id(cartInfo.getLong("sku_id"))
            .cart_ct(1L)
            .ts(ts).build();
    });

    // 3.5 payment wide stream
    SingleOutputStreamOperator<ProductStats> paymentStatsDstream = paymentWideDStream.map(json -> {
        PaymentWide paymentWide = JSON.parseObject(json, PaymentWide.class);
        Long ts = DateTimeUtil.toTs(paymentWide.getCallback_time());
        return ProductStats.builder()
            .sku_id(paymentWide.getSku_id())
            .payment_amount(paymentWide.getSplit_total_amount())
            .paidOrderIdSet(new HashSet(Collections.singleton(paymentWide.getOrder_id())))
            .ts(ts).build();
    });

    // 3.6 refund stream
    SingleOutputStreamOperator<ProductStats> refundStatsDstream = refundInfoDStream.map(json -> {
        JSONObject refundJsonObj = JSON.parseObject(json);
        Long ts = DateTimeUtil.toTs(refundJsonObj.getString("create_time"));
        ProductStats productStats = ProductStats.builder()
            .sku_id(refundJsonObj.getLong("sku_id"))
            .refund_amount(refundJsonObj.getBigDecimal("refund_amount"))
            .refundOrderIdSet(new HashSet(Collections.singleton(refundJsonObj.getLong("order_id"))))
            .ts(ts).build();
        return productStats;
    });

    // 3.7 comment stream
    SingleOutputStreamOperator<ProductStats> commonInfoStatsDstream = commentInfoDStream.map(json -> {
        JSONObject commonJsonObj = JSON.parseObject(json);
        Long ts = DateTimeUtil.toTs(commonJsonObj.getString("create_time"));
        // an appraise of 1201 counts as a positive comment
        Long goodCt = GmallConstant.APPRAISE_GOOD.equals(commonJsonObj.getString("appraise")) ? 1L : 0L;
        ProductStats productStats = ProductStats.builder()
            .sku_id(commonJsonObj.getLong("sku_id"))
            .comment_ct(1L)
            .good_comment_ct(goodCt)
            .ts(ts).build();
        return productStats;
    });
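The conversions above call DateTimeUtil.toTs to turn "yyyy-MM-dd HH:mm:ss" strings such as create_time into epoch milliseconds. That utility is built elsewhere in the project; a minimal thread-safe sketch consistent with how it is used here:

    package com.atguigu.gmall.realtime.utils;

    import java.time.LocalDateTime;
    import java.time.ZoneId;
    import java.time.format.DateTimeFormatter;

    public class DateTimeUtil {
        // DateTimeFormatter is immutable and thread-safe, unlike SimpleDateFormat
        private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        // "2021-01-01 12:00:00" -> epoch millis in the system default time zone
        public static Long toTs(String dateStr) {
            LocalDateTime localDateTime = LocalDateTime.parse(dateStr, FORMATTER);
            return localDateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
        }
    }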
4) Encapsulate the constants class GmallConstant

    package com.atguigu.gmall.realtime.common;

    /**
     * Author: ...
     */
    public class GmallConstant {
        // 10: order status
        public static final String ORDER_STATUS_UNPAID = "1001";      // unpaid
        public static final String ORDER_STATUS_PAID = "1002";        // paid
        public static final String ORDER_STATUS_CANCEL = "1003";      // cancelled
        public static final String ORDER_STATUS_FINISH = "1004";      // finished
        public static final String ORDER_STATUS_REFUND = "1005";      // refunding
        public static final String ORDER_STATUS_REFUND_DONE = "1006"; // refund done

        // 11: payment type
        public static final String PAYMENT_TYPE_ALIPAY = "1101"; // Alipay
        public static final String PAYMENT_TYPE_WECHAT = "1102"; // WeChat Pay
        public static final String PAYMENT_TYPE_UNION = "1103";  // UnionPay

        // 12: appraise
        public static final String APPRAISE_GOOD = "1201"; // positive
        public static final String APPRAISE_SOSO = "1202"; // neutral
        public static final String APPRAISE_BAD = "1203";  // negative
        public static final String APPRAISE_AUTO = "1204"; // automatic

        // 13: refund reason
        public static final String REFUND_REASON_BAD_GOODS = "1301";  // defective goods
        public static final String REFUND_REASON_WRONG_DESC = "1302"; // description mismatch
        public static final String REFUND_REASON_SALE_OUT = "1303";   // sold out
        public static final String REFUND_REASON_SIZE_ISSUE = "1304"; // wrong size
        public static final String REFUND_REASON_MISTAKE = "1305";    // ordered by mistake
        public static final String REFUND_REASON_NO_REASON = "1306";  // no reason given

        // 14: coupon status
        public static final String COUPON_STATUS_UNUSED = "1401"; // unused
        public static final String COUPON_STATUS_USING = "1402";  // in use
        public static final String COUPON_STATUS_USED = "1403";   // used

        // 24: source type (some names truncated in the source; restored from the code sequence)
        public static final String SOURCE_TYPE_QUERY = "2401";          // search
        public static final String SOURCE_TYPE_PROMOTION = "2402";      // promotion
        public static final String SOURCE_TYPE_AUTO_RECOMMEND = "2403"; // auto recommendation
        public static final String SOURCE_TYPE_ACTIVITY = "2404";       // activity

        // 33: coupon range type
        public static final String COUPON_RANGE_TYPE_CATEGORY3 = "3301"; // by category
        public static final String COUPON_RANGE_TYPE_TRADEMARK = "3302"; // by trademark
        public static final String COUPON_RANGE_TYPE_SPU = "3303";       // by spu

        // 32: coupon type
        public static final String COUPON_TYPE_MJ = "3201"; // spend-X-save-Y
        public static final String COUPON_TYPE_DZ = "3202"; // quantity discount
        public static final String COUPON_TYPE_DJ = "3203"; // voucher

        // 31: activity rule type
        public static final String ACTIVITY_RULE_TYPE_MJ = "3101";
        public static final String ACTIVITY_RULE_TYPE_DZ = "3102";
        public static final String ACTIVITY_RULE_TYPE_ZK = "3103";

        // keyword sources
        public static final String KEYWORD_SEARCH = "SEARCH";
        public static final String KEYWORD_CLICK = "CLICK";
        public static final String KEYWORD_CART = "CART";
        public static final String KEYWORD_ORDER = "ORDER";
    }

5) Union the unified streams into one

    DataStream<ProductStats> productStatDetailDStream = pageAndDispStatsDstream.union(
        orderWideStatsDstream, cartStatsDstream, paymentStatsDstream,
        refundStatsDstream, favorStatsDstream, commonInfoStatsDstream);

3.2.6 Set event time and watermarks

    // TODO 4. assign event-time timestamps and watermarks
    SingleOutputStreamOperator<ProductStats> productStatsWithTsStream =
        productStatDetailDStream.assignTimestampsAndWatermarks(
            // strategy details truncated in the source; a 1-second bound assumed, matching Chapter 2
            WatermarkStrategy.<ProductStats>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withTimestampAssigner((productStats, recordTimestamp) -> productStats.getTs()));

6) Group, window and aggregate

    SingleOutputStreamOperator<ProductStats> productStatsDstream = productStatsWithTsStream
        // 5.1 key by sku_id
        .keyBy(new KeySelector<ProductStats, Long>() {
            @Override
            public Long getKey(ProductStats productStats) throws Exception {
                return productStats.getSku_id();
            }
        })
        // 5.2 window size truncated in the source; 10-second tumbling windows assumed, matching the other chapters
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .reduce(new ReduceFunction<ProductStats>() {
            @Override
            public ProductStats reduce(ProductStats stats1, ProductStats stats2) throws Exception {
                stats1.setDisplay_ct(stats1.getDisplay_ct() + stats2.getDisplay_ct());
                stats1.setClick_ct(stats1.getClick_ct() + stats2.getClick_ct());
                stats1.setCart_ct(stats1.getCart_ct() + stats2.getCart_ct());
                stats1.setFavor_ct(stats1.getFavor_ct() + stats2.getFavor_ct());
                stats1.setOrder_amount(stats1.getOrder_amount().add(stats2.getOrder_amount()));
                stats1.getOrderIdSet().addAll(stats2.getOrderIdSet());
                // order counts are the sizes of the de-duplicated id sets
                stats1.setOrder_ct(stats1.getOrderIdSet().size() + 0L);
                stats1.setOrder_sku_num(stats1.getOrder_sku_num() + stats2.getOrder_sku_num());
                stats1.setPayment_amount(stats1.getPayment_amount().add(stats2.getPayment_amount()));
                stats1.getPaidOrderIdSet().addAll(stats2.getPaidOrderIdSet());
                stats1.setPaid_order_ct(stats1.getPaidOrderIdSet().size() + 0L);
                stats1.getRefundOrderIdSet().addAll(stats2.getRefundOrderIdSet());
                stats1.setRefund_order_ct(stats1.getRefundOrderIdSet().size() + 0L);
                stats1.setRefund_amount(stats1.getRefund_amount().add(stats2.getRefund_amount()));
                stats1.setComment_ct(stats1.getComment_ct() + stats2.getComment_ct());
                stats1.setGood_comment_ct(stats1.getGood_comment_ct() + stats2.getGood_comment_ct());
                return stats1;
            }
        },
        new WindowFunction<ProductStats, ProductStats, Long, TimeWindow>() {
            @Override
            public void apply(Long aLong, TimeWindow window,
                              Iterable<ProductStats> productStatsIterable,
                              Collector<ProductStats> out) throws Exception {
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                for (ProductStats productStats : productStatsIterable) {
                    productStats.setStt(simpleDateFormat.format(new Date(window.getStart())));
                    productStats.setEdt(simpleDateFormat.format(new Date(window.getEnd())));
                    productStats.setTs(new Date().getTime());
                    out.collect(productStats);
                }
            }
        });
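The next step joins dimension data from HBase/Phoenix asynchronously. It relies on the DimAsyncFunction helper built in the DWM chapter and not repeated in this document: a RichAsyncFunction whose subclasses supply the dimension key and the merge logic. A simplified sketch of the contract the code below depends on (the queryDim lookup stands in for the project's Phoenix query helper and is hypothetical here):

    import com.alibaba.fastjson.JSONObject;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    import java.util.Collections;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // simplified sketch; the project's real implementation adds caching and error handling
    public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> {
        private final String tableName; // dimension table to query, e.g. DIM_SKU_INFO
        private transient ExecutorService executorService;

        public DimAsyncFunction(String tableName) { this.tableName = tableName; }

        @Override
        public void open(Configuration parameters) {
            executorService = Executors.newFixedThreadPool(8);
        }

        // subclasses return the dimension key of the current element
        public abstract String getKey(T obj);

        // subclasses merge the dimension row (as JSON) into the element
        public abstract void join(T obj, JSONObject dimInfo) throws Exception;

        @Override
        public void asyncInvoke(T obj, ResultFuture<T> resultFuture) {
            executorService.submit(() -> {
                try {
                    JSONObject dimInfo = queryDim(tableName, getKey(obj));
                    if (dimInfo != null) {
                        join(obj, dimInfo);
                    }
                    resultFuture.complete(Collections.singleton(obj));
                } catch (Exception e) {
                    resultFuture.completeExceptionally(e);
                }
            });
        }

        // stand-in for the project's cached Phoenix point query ("select * from <table> where id = <key>")
        private JSONObject queryDim(String tableName, String key) {
            return new JSONObject();
        }
    }

It is applied with AsyncDataStream.unorderedWait, which is why each join below carries a timeout argument.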
7) Supplement the product dimension information

Each result record is enriched by id lookups against the dimension tables (the join bodies were lost in this extract and are restored from the entity fields).

    // 6.1 supplement the SKU dimension
    SingleOutputStreamOperator<ProductStats> productStatsWithSkuDstream =
        AsyncDataStream.unorderedWait(productStatsDstream,
            new DimAsyncFunction<ProductStats>("DIM_SKU_INFO") {
                @Override
                public void join(ProductStats productStats, JSONObject jsonObject) throws Exception {
                    productStats.setSku_name(jsonObject.getString("SKU_NAME"));
                    productStats.setSku_price(jsonObject.getBigDecimal("PRICE"));
                    productStats.setCategory3_id(jsonObject.getLong("CATEGORY3_ID"));
                    productStats.setSpu_id(jsonObject.getLong("SPU_ID"));
                    productStats.setTm_id(jsonObject.getLong("TM_ID"));
                }
                @Override
                public String getKey(ProductStats productStats) {
                    return String.valueOf(productStats.getSku_id());
                }
            },
            60, TimeUnit.SECONDS);

    // 6.2 supplement the SPU dimension
    SingleOutputStreamOperator<ProductStats> productStatsWithSpuDstream =
        AsyncDataStream.unorderedWait(productStatsWithSkuDstream,
            new DimAsyncFunction<ProductStats>("DIM_SPU_INFO") {
                @Override
                public void join(ProductStats productStats, JSONObject jsonObject) throws Exception {
                    productStats.setSpu_name(jsonObject.getString("SPU_NAME"));
                }
                @Override
                public String getKey(ProductStats productStats) {
                    return String.valueOf(productStats.getSpu_id());
                }
            },
            60, TimeUnit.SECONDS);

    // 6.3 supplement the category dimension
    SingleOutputStreamOperator<ProductStats> productStatsWithCategory3Dstream =
        AsyncDataStream.unorderedWait(productStatsWithSpuDstream,
            new DimAsyncFunction<ProductStats>("DIM_BASE_CATEGORY3") {
                @Override
                public void join(ProductStats productStats, JSONObject jsonObject) throws Exception {
                    productStats.setCategory3_name(jsonObject.getString("NAME"));
                }
                @Override
                public String getKey(ProductStats productStats) {
                    return String.valueOf(productStats.getCategory3_id());
                }
            },
            60, TimeUnit.SECONDS);

    // 6.4 supplement the trademark dimension
    SingleOutputStreamOperator<ProductStats> productStatsWithTmDstream =
        AsyncDataStream.unorderedWait(productStatsWithCategory3Dstream,
            new DimAsyncFunction<ProductStats>("DIM_BASE_TRADEMARK") {
                @Override
                public void join(ProductStats productStats, JSONObject jsonObject) throws Exception {
                    productStats.setTm_name(jsonObject.getString("TM_NAME"));
                }
                @Override
                public String getKey(ProductStats productStats) {
                    return String.valueOf(productStats.getTm_id());
                }
            },
            60, TimeUnit.SECONDS);

8) Write to ClickHouse

1) Create the product wide table in ClickHouse:

    create table product_stats_2021 (
        stt DateTime,
        edt DateTime,
        sku_id UInt64,
        sku_name String,
        sku_price Decimal64(2),
        spu_id UInt64,
        spu_name String,
        tm_id UInt64,
        tm_name String,
        category3_id UInt64,
        category3_name String,
        display_ct UInt64,
        click_ct UInt64,
        favor_ct UInt64,
        cart_ct UInt64,
        order_sku_num UInt64,
        order_amount Decimal64(2),
        order_ct UInt64,
        payment_amount Decimal64(2),
        paid_order_ct UInt64,
        refund_order_ct UInt64,
        refund_amount Decimal64(2),
        comment_ct UInt64,
        good_comment_ct UInt64,
        ts UInt64
    ) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt, sku_id);

2) Add the ClickHouse sink to the main program:

    productStatsWithTmDstream.addSink(ClickHouseUtil.getJdbcSink(
        "insert into product_stats_2021 values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"));

3) End-to-end test: start the required components and the earlier applications, run the jars under the rt_applog and rt_dblog directories, and check the ClickHouse table product_stats_2021.

Chapter 4 DWS Layer: Region Theme Wide Table (FlinkSQL)

The region theme mainly reflects sales by province, and its logic is simple enough that we implement it with FlinkSQL. If the target database were one that Flink SQL supports out of the box, the target table could also be defined as a dynamic table and written with insert into; but the Flink JDBC SQL connector currently has no ClickHouse dialect (it supports MySQL, PostgreSQL and Derby). Building a custom connector for an unsupported database is possible but cumbersome, so we keep using the DataStream JdbcSink from the previous chapters for the output side.

4.2.1 Add the FlinkSQL-related dependencies (the Table API bridge and the Blink planner) in pom.xml.

4.2.2 Create ProvinceStatsSqlApp and define the Table environment

    package com.atguigu.gmall.realtime.app.dws;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    /**
     * Author: ...
     * Desc: region theme wide table computed with FlinkSQL
     */
    public class ProvinceStatsSqlApp {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);
            StateBackend fsStateBackend = new FsStateBackend("hdfs://…"); // checkpoint path truncated in the source
            env.setStateBackend(fsStateBackend);
            EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

            env.execute();
        }
    }

4.2.3 Define the data source as a dynamic table

The rowtime computed column together with WATERMARK FOR rowtime AS rowtime makes event-time windowing possible on the table. (The amount and time columns were lost in this extract; they are restored because the aggregation below references them.)

    String groupId = "province_stats"; // group id truncated in the source; project naming convention
    String orderWideTopic = "dwm_order_wide";

    tableEnv.executeSql("CREATE TABLE ORDER_WIDE (province_id BIGINT, " +
        "province_name STRING, province_area_code STRING" +
        ", province_iso_code STRING, province_3166_2_code STRING, order_id STRING, " +
        "split_total_amount DOUBLE, create_time STRING, " +
        "rowtime AS TO_TIMESTAMP(create_time), " +
        "WATERMARK FOR rowtime AS rowtime) " +
        "WITH (" + MyKafkaUtil.getKafkaDDL(orderWideTopic, groupId) + ")");
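To sanity-check the dynamic table definition before adding the aggregation, a few rows can be printed; this is a blocking call intended only for local runs from the IDE:

    // quick local check of the dynamic table (remove before packaging the job)
    tableEnv.sqlQuery("select province_id, order_id, rowtime from ORDER_WIDE")
            .execute()
            .print();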
"'properties.bootstrap.servers'='"+kafkaServer+"',"+"'properties.group.id'='"+groupId+"',"+"'format'='json',""'scan.startup.mode'='latest-offset'";returnddl;}聚合计算TableprovinceStateTable=tableEnv.sqlQuery("select""DATE_FORMAT(TUMBLE_START(rowtime,INTERVAL'10'SECOND),'yyyy-MM-ddHH:mm:ss')stt,"+"DATE_FORMAT(TUMBLE_END(rowtime,INTERVAL'10'SECOND),'yyyy-MM-ddHH:mm:ss')edt,"+"province_id,province_name,province_area_codearea_code,"+"province_iso_codeiso_code,province_3166_2_codeiso_3166_2,"+"COUNT(DISTINCTorder_id)order_count,sum(split_total_amount)order_amount,"+"UNIX_TIMESTAMP()*1000ts"+"fromORDER_WIDEgroupbyTUMBLE(rowtime,INTERVAL'10'SECOND),"定义地区统计宽表实体类packagepackageimportlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;importjava.math.BigDecimal;importjava.util.Date;Author:publicclassProvinceStatsprivateStringstt;privateStringedt;privateprivateStringstt;privateStringedt;privateLongprovince_id;privateStringprovince_name;privateStringarea_code;privateStringiso_code;privateStringiso_3166_2;privateBigDecimalorder_amount;privateLongorder_count;privateLongts;publicProvinceStats(OrderWideorderWide){province_id=orderWide.getProvince_id();order_amount=orderWide.getSplit_total_amount();order_count=ts=new}}转为数据流DataStream<ProvinceStats>provinceStatsDataStream=tableEnv.toAppendStream(provinceStateTable,ProvinceStats.class);4.2.8在ClickHouse中创建地区宽表createtableprovince_stats_2021(sttDateTime,edtDateTime,province_idUInt64,province_nameString,area_codeString,iso_codeString,iso_3166_2String,order_amountDecimal64(2),order_countUInt64,ts)engine cingMergeTree(partitionpartitionbyorderby(stt,edt,province_id写入<ProvinceStats>getJdbcSink("insertintoprovince_stats_2021整体测试启动运行 下的jarClickHouseproducts_stats_2021再运行rt_dblog的jar的时候,才会触发第一次运行的watermark第5章DWS层-表获取关于分词搜索功能实IK分词器的使用1)在pom.xml中加入依赖 2)封装分词工具类并进试packageimportorg.wltea. importorg.wltea. importjava.io.IOException;importjava.io.StringReader;importjava.util.ArrayList;importjava.util.List;Author:publicclassKeywordUtilpublicstaticList<String> yze(Stringtext){StringReadersr=newStringReader(text);IKSegmenterik=newIKSegmenter(sr,true);Lexemelex=null;List<String>keywordList=newArrayList();while(true){tryif((lex=ik.next())!=null)StringlexemeText=}else}}catch(IOExceptione){}}return}publicstaticvoidmain(String[]args)Stringtext= XSMax(A2104)256GB深空灰色移 }}自定义函数有了分词器,那么另外一个要考虑的问题就是如何把分词器的使用揉进FlinkSQL中。因为SQL的语法和相关的函数都是Flink内定的,想要使用外部工具,就必须结合自定义自定义函数分类ScalarFunction(相当于Spark的TableFunction(相当于Spark的AggregationFunctions(相当于Spark们应该选择TableFunction。封装KeywordUDTF函数参考@FunctionHintrow.setField(0,keyword00packagepackageimportcom.atguigu.gmall.realtime.utils.KeywordUtil;importorg.apache.flink.table.annotation.DataTypeHint;importorg.apache.flink.table.annotation.FunctionHint;importorg.apache.flink.table.functions.TableFunction;importorg.apache.flink.types.Row;importAuthor:@FunctionHint(output=@DataTypeHint("ROW<sSTRING>"))publicclassKeywordUDTFextendsTableFunction<Row>publicvoideval(Stringvalue)List<String>keywordList=KeywordUtil. 
5.3 Encapsulate the KeywordUDTF function

The @FunctionHint annotation declares the row type the function produces; row.setField(0, keyword) fills its single output column.

    package com.atguigu.gmall.realtime.app.func;

    import com.atguigu.gmall.realtime.utils.KeywordUtil;
    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.FunctionHint;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;

    import java.util.List;

    /**
     * Author: ...
     * Desc: UDTF that splits a search phrase into keyword rows
     */
    @FunctionHint(output = @DataTypeHint("ROW<s STRING>"))
    public class KeywordUDTF extends TableFunction<Row> {
        public void eval(String value) {
            List<String> keywordList = KeywordUtil.analyze(value);
            for (String keyword : keywordList) {
                Row row = new Row(1);
                row.setField(0, keyword);
                collect(row);
            }
        }
    }

5.4 Create KeywordStatsApp and define the stream environment

    package com.atguigu.gmall.realtime.app.dws;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    /**
     * Author: ...
     */
    public class KeywordStatsApp {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);
            StateBackend fsStateBackend = new FsStateBackend("hdfs://…"); // checkpoint path truncated in the source
            env.setStateBackend(fsStateBackend);
            EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

            env.execute();
        }
    }

5.5 Register the function and define the dynamic table

The common and page fields are declared as MAP types so that individual JSON attributes can be read with map access syntax.

    // TODO 1. register the UDTF
    tableEnv.createTemporarySystemFunction("ik_analyze", KeywordUDTF.class);

    String groupId = "keyword_stats_app";
    String pageViewSourceTopic = "dwd_page_log";

    tableEnv.executeSql("CREATE TABLE page_view" +
        " (common MAP<STRING,STRING>," +
        " page MAP<STRING,STRING>, ts BIGINT," +
        " rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000, 'yyyy-MM-dd HH:mm:ss'))," +
        " WATERMARK FOR rowtime AS rowtime - INTERVAL '2' SECOND)" +
        " WITH (" + MyKafkaUtil.getKafkaDDL(pageViewSourceTopic, groupId) + ")");

5.6 Filter out the search behaviour from the page log

    Table fullwordView = tableEnv.sqlQuery("select page['item'] fullword, " +
        "rowtime from page_view " +
        "where page['page_id'] = 'good_list' " +
        "and page['item'] IS NOT NULL");

5.7 Split the search phrase with the UDTF

    Table keywordView = tableEnv.sqlQuery("select keyword, rowtime from " + fullwordView + ", " +
        "LATERAL TABLE(ik_analyze(fullword)) as T(keyword)");

5.8 Group, window and aggregate

    Table keywordStatsSearch = tableEnv.sqlQuery("select keyword, count(*) ct, '" +
        GmallConstant.KEYWORD_SEARCH + "' source, " +
        "DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') stt, " +
        "DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') edt, " +
        "UNIX_TIMESTAMP() * 1000 ts from " + keywordView +
        " GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND), keyword");

5.9 Convert to a stream and write to ClickHouse

1) Create the statistics table in ClickHouse:

    create table keyword_stats_2021 (
        stt DateTime,
        edt DateTime,
        keyword String,
        source String,
        ct UInt64,
        ts UInt64
    ) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt, keyword, source);

2) Encapsulate the KeywordStats entity (the source breaks off at this point; the fields mirror the query columns and the table above):

    package com.atguigu.gmall.realtime.bean;

    import lombok.AllArgsConstructor;
    import lombok.Data;

    /**
     * Author: ...
     */
    @Data
    @AllArgsConstructor
    public class KeywordStats {
        private String keyword;
        private Long ct;
        private String source;
        private String stt;
        private String edt;
        private Long ts;
    }
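The remaining wiring follows the same pattern as Chapter 4: convert the result table to an append stream and reuse ClickHouseUtil. A sketch under that assumption:

    // convert the aggregated table to a stream and sink it to ClickHouse,
    // mirroring the Chapter 4 pattern
    DataStream<KeywordStats> keywordStatsDataStream =
        tableEnv.toAppendStream(keywordStatsSearch, KeywordStats.class);
    keywordStatsDataStream.addSink(ClickHouseUtil.<KeywordStats>getJdbcSink(
        "insert into keyword_stats_2021(keyword,ct,source,stt,edt,ts) values(?,?,?,?,?,?)"));

Listing the column names explicitly matters here because the bean's field order (keyword, ct, source, ...) differs from the table's column order (stt, edt, keyword, ...), and ClickHouseUtil binds placeholders by field position.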