第1章 大屏实时计算深度剖析 学习目标 目标1:了解实时计算的应用场景 目标2:实时流计算的快速入门( Flink 的入门使用) 目标3:Flink接入技术体系的剖析( hdfs,jdbc,kafka, socket) 目标4:Flink 数据处理引擎的实战(采用双十一大屏, 热销数据统计, 区域分类统计, cep复杂事件的 处理) 目标5:实时流数据处理的设计方案(技术栈使用, Spring Boot /Spring Cloud集成) 目标6:Flink的核心技术点(多流JOIN, 流式的聚合去重,CEP 事件处理,UV布隆过滤器实现) 1. 实时计算应用场景 1.1 智能推荐 什么是智能推荐? 定义: 根据用户行为习惯所提供的数据, 系统提供策略模型,自动推荐符合用户行为的信息。 例举: 比如根据用户对商品的点击数据(时间周期,点击频次), 推荐类似的商品; 根据用户的评价与满意度, 推荐合适的品牌; 根据用户的使用习惯与点击行为,推荐类似的资讯。 应用案例: 1. 小红书推荐系统 +微信study322 专业一手超清完整 全网课程 超低价格+微信study322 专业一手超清完整 全网课程 超低价格 2. 实时流处理3. Flink处理(新一代大数据处理引擎) 1.2 实时数仓 什么是实时数仓 数据仓库(Data Warehouse) ,可简写为DW或DWH,是一个庞大的数据存储集合,通过对各种业务数 据进行筛选与整合,生成企业的分析性报告和各类报表,为企业的决策提供支持。实时仓库是基于 Storm/Spark(Streaming)/Flink等实时处理框架,构建的具备实时性特征的数据仓库。 应用案例 分析物流数据, 提升物流处理效率。 +微信study322 专业一手超清完整 全网课程 超低价格阿里巴巴菜鸟网络实时数仓设计: 数仓分层处理架构(流式ETL): ODS -> DWD -> DWS -> ADS ODS(Operation Data Store):操作数据层, 一般为原始采集数据。 DWD(Data Warehouse Detail) :明细数据层, 对数据经过清洗,也称为DWI。 DWS(Data Warehouse Service):汇总数据层,基于DWD层数据, 整合汇总成分析某一个主题域的服 务数据,一般是宽表, 由多个属性关联在一起的表, 比如用户行为日志信息:点赞、评论、收藏等。 ADS(Application Data Store): 应用数据层, 将结果同步至RDS数据库中, 一般做报表呈现使用。 +微信study322 专业一手超清完整 全网课程 超低价格1.3 大数据分析应用 1. IoT数据分析 1) 什么是IoT 物联网是新一代信息技术,也是未来发展的趋势,英文全称为: Internet of things(IOT),顾名 思义, 物联网就是万物相联。物联网通过智能感知、识别技术与普适计算等通信感知技术,广泛 应用于网络的融合中,也因此被称为继计算机、互联网之后世界信息产业发展的第三次浪潮。 2) 应用案例 物联网设备运营分析: 华为Iot数据分析平台架构: +微信study322 专业一手超清完整 全网课程 超低价格2. 智慧城市 城市中汽车越来越多, 川流不息,高德地图等APP通过技术手段采集了越来越多的摄像头、车流 的数据。 但道路却越来越拥堵,越来越多的城市开始通过大数据技术, 对城市实行智能化管理。 2018年, 杭州采用AI智慧城市,平均通行速度提高15%,监控摄像头日报警次数高达500次,识 别准确率超过92%,AI智慧城市通报占全体95%以上,在中国城市交通堵塞排行榜, 杭州从中国 第5名降至57名。 3. 金融风控 风险是金融机构业务固有特性,与金融机构相伴而生。金融机构盈利的来源就是承担风险的风险溢 价。 金融机构中常见的六种风险:市场风险、信用风险、流动性风险、操作风险、声誉风险及法律风 险。其中最主要的是市场风险和信用风险。 线上信贷流程,通过后台大数据系统进行反欺诈和信用评估: +微信study322 专业一手超清完整 全网课程 超低价格4. 电商行业 用户在电商的购物网站数据通过实时大数据分析之后, 通过大屏汇总展示, 比如天猫的双11购物 活动,通过大屏, 将全国上亿买家的订单数据可视化,实时性的动态展示,包含总览数据,流式 TopN数据,多维区域统计数据等,极大的增强了对海量数据的可读性。 TopN排行: +微信study322 专业一手超清完整 全网课程 超低价格+微信study322 专业一手超清完整 全网课程 超低价格 区域统计:+微信study322 专业一手超清完整 全网课程 超低价格2. Flink快速入门 2.1 Flink概述 Flink是什么 Flink是一个面向数据流处理和批处理的分布式开源计算框架。 无界流VS有界流 任何类型的数据都可以形成流数据,比如用户交互记录, 传感器数据,事件日志等等。 Apache Flink 擅长处理无界和有界数据集。 精确的时间控制和有状态的计算,使得 Flink能够运行 任何处理无界流的应用。 流数据分为无界流和有界流。 1) 无界流:有定义流的开始,但没有定义流的结束, 会不停地产生数据,无界流采用的是流处 理方式。 2) 有界流:有定义流的开始, 也有定义流的结束, 需要在获取所有数据后再进行计算,有界流 采用的是批处理方式。 +微信study322 专业一手超清完整 全网课程 超低价格编程模型 DataSet 一般用来处理有界流数据。 DataStream一般用来处理无界流数据。 2.2 Flink基础案例 1. 环境搭建配置 POM配置 FLINK集成 2. 批处理案例 功能: 通过批处理方式,统计日志文件中的异常数量。 代码: public class BatchProcessorApplication { public static void main(String[] args) throws Exception { // 1. 定义运行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 2. 读取数据源(日志文件) DataSource<String> logData = env.readTextFile("./data/order_info.log"); // 3. 清洗转换数据 logData.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>> () { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception { // 1) 根据正则, 提取每行日志的级别 Pattern pattern = Pattern.compile("\\[main\\](.*?)\\["); Matcher matcher = pattern.matcher(value); if(matcher.find()) { // 2) 如果匹配符合规则, 放置元组内 collector.collect(new Tuple2<String,Integer> (matcher.group(1).trim(), 1)); } +微信study322 专业一手超清完整 全网课程 超低价格3. 流处理案例 功能: 根据 IP统计访问次数 代码: 2.3 Flink部署配置 1. 安装配置JDK8环境 } }).groupBy(0).sum(1).print(); // 4. 根据日志级别, 汇总统计, 打印结果 } } publ ic class StreamProcessorApplication { public static void main(String[] args) throws Exception{ // 1. 创建运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 读取Socket数据源 DataStreamSource<String> socketStr = env.socketTextStream("127.0.0.1", 9911, "\n"); // 3. 转换处理流数据 socketStr.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception { // 根据分隔符解析数据 String[] arrValue = value.split("\t"); collector.collect(new Tuple2<String,Integer>(arrValue[0], 1)); } }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(2); env.execute("accessLog"); } } [root@localhost ~]# java -version java version "1.8.0_181" Java(TM) SE Runtime Environment (build 1.8.0_181-b13) Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode) +微信study322 专业一手超清完整 全网课程 超低价格2. 下载Flink安装包 官方地址 安装包 3. 安装配置 1) 解压 2)运行 主节点访问端口: vi conf/masters: 4. 访问控制台 http://10.10.20.132:8081/#/overview Available Task Slots: 有效任务槽数量 对应配置文件: vi conf/flflink-conf.yaml TaskManger与JobManager关系 tar -xvf flink-1.11.2-bin-scala_2.11.tgz bin/ start-cluster.sh localhost:8081 taskmanager.numberOfTaskSlots: 1 +微信study322 专业一手超清完整 全网课程 超低价格Client 用来提交任务给 JobManager,JobManager 分发任务给 TaskManager 去执行, TaskManager 会采用心跳的方式, 汇报任务的执行状态。 JobManager 负责整个 Flink 集群任务的调度以及资源的管理。 TaskManager 负责具体的任务执行和对应任务在每个节点上的资源申请和管理。 2.4 Flink任务提交 第一种方式: 界面提交 1. 修改代码配置 socket数据源连接,采用主机名称配置 2. 工程代码打包 POM文件增加打包插件 DataStreamSource<String> socketStr = env.socketTextStream("flink1", 9911, "\n"); <build> <plugins> <!-- 编译插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>1.8</source> <target>1.8</target> <!--<encoding>${project.build.sourceEncoding} </encoding>--> </configuration> </plugin> <!-- 打jar包插件(会包含所有依赖) --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.3</version> <executions> <execution> +微信study322 专业一手超清完整 全网课程 超低价格注意,这里不能采用spring-boot-maven-plugin打包插件, 否则flflink不能正常识别。 3. 提交任务 上传Jar包 接下来,在flflink1节点上, 开启Socket交互端口9911 <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> < ! - - zi p -d learn_spark.jar META INF/* . R S A M E T A - I N F / * . D S A M E T A - I N F / * . S F - -> < exclude>META-INF/*.SF</exclude> < e x c l u d e > M E T A - I N F / * . D S A < / e x c l u d e > < e x c l u d e > M E T A - I N F / * . R S A < / e x c l u d e > </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTra nsformer"> <!-- 可以设置jar包的入口类(可选) --> <mainClass>com.itcast.flink.usage.stream.StreamProcessorApplication</mainCl ass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> +微信study322 专业一手超清完整 全网课程 超低价格然后提交并执行任务 savepoint path: 容错机制中快照保存的路径。 4. 运行验证 nc发送一些数据, 在TaskManager当中可以查看输出结果。 第二种方式: 命令行提交 在flflink控制台清除原有的Job任务。 1. 上传Jar包 将Jar包上传至flflink服务器: 2. 提交任务 采用命令行方式提交任务: 3. 验证结果 发送一些数据并在控制台验证输出结果。 [root@flink1 flink-1.11.2]# nc -lk 9911 [root@flink1 examples]# ll total 81880 drwxr-xr-x. 2 root root 194 Sep 9 23:48 batch -rw-r--r--. 1 root root 83843774 Sep 26 05:57 flink-usage-1.0-SNAPSHOT.jar drwxr-xr-x. 2 root root 50 Sep 9 23:48 gelly drwxr-xr-x. 3 root root 19 Sep 9 23:48 python drwxr-xr-x. 2 root root 241 Sep 9 23:48 streaming drwxr-xr-x. 2 root root 209 Sep 9 23:48 table [root@flink1 flink-1.11.2]# bin/flink run -c com.itcast.flink.usage.stream.StreamProcessorApplication examples/flink usage-1.0-SNAPSHOT.jar Job has been submitted with JobID 4c127f68f6683e5a9342410d7b6540db +微信study322 专业一手超清完整 全网课程 超低价格3. Flink接入体系 3.1 Flink Connectors Flink 连接器包含数据源 输入 与汇聚 输出两部分。 Flink 自身内置了一些基础的连接器,数据源输入包含 文件、目录、Sock et以及 支持从 collections 和 iter ators 中读取数据;汇聚输出支持把数据写入文件、 标准输出(stdout )、标准错误输出(stderr)和 socket。 官方地址 Flink还可以支持扩展的连接器,能够与第三方系统进行交互。目前支持以下系统 : Apache Kafka (source/sink) Apache Cassandra (sink) Amazon Kinesis Streams (source/sink) Elasticsearch (sink) Hadoop FileSystem (sink) RabbitMQ (source/sink) Apache NiFi (source/sink) Twitter Streaming API (source) Google PubSub (source/sink) JDBC (sink) 常用的是Kafka、ES、HDFS以及JDBC。 3.2 JDBC(读/写) Flink Connectors JDBC 如何使用? 功能: 将集合数据写入数据库中 代码: public class JdbcConnectorApplication { public static void main(String[] args) throws Exception{ // 配置日志文件 System.setProperty("log4j.configurationFile", "log4j2.xml"); // 1. 创建运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 创建集合数据 List arrs = new ArrayList<String>(); arrs.add("10.10.20.101\t1601297294548\tPOST\taddOrder"); arrs.add("10.10.20.102\t1601297296549\tGET\tgetOrder"); // 3. 读取集合数据, 写入数据库 env.fromCollection(arrs).addSink(JdbcSink.sink( // 配置SQL语句 +微信study322 专业一手超清完整 全网课程 超低价格数据表: 自定义写入数据源 功能:读取Socket数据, 采用流方式写入数据库中。 代码: "insert into t_access_log (ip, time, type, api) values (?,?,?,?)", (ps, value) -> { System.out.println("receive ==> " + value); // 解析数据 String[] arrValue = String.valueOf(value).split("\t"); for(int i=0; i<arrValue.length; i++) { // 新增数据 ps.setString(i+1, arrValue[i]); } } , / / JD BC 连 接 配 置 n e w Jd b c Co nn ec tionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://192.168.19.150:3306/flink? useS S L = f a l s e " ) .withDriverName("com.mysql.jdbc.Driver") .withUsername("root") .withPassword("654321") .build())); // 4. 执行任务 env.execute("job"); } } DROP TABLE IF EXISTS `t_access_log`; CREATE TABLE `t_access_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID', `ip` varchar(32) NOT NULL COMMENT 'IP地址', `time` varchar(255) NULL DEFAULT NULL COMMENT '访问时间', `type` varchar(32) NOT NULL COMMENT '请求类型', `api` varchar(32) NOT NULL COMMENT 'API地址', PRIMARY KEY (`id`) ) ENGINE = InnoDB AUTO_INCREMENT=1; public class CustomSinkApplication { public static void main(String[] args) throws Exception{ // 配置日志文件 System.setProperty("log4j.configurationFile", "log4j2.xml"); // 1. 创建运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 读取Socket数据源 DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n"); +微信study322 专业一手超清完整 全网课程 超低价格AccessLog: 测试数据: // 3. 转换处理流数据 SingleOutputStreamOperator<AccessLog> outputStream = socketStr.map(new MapFunction<String, AccessLog>() { @Override public AccessLog map(String value) throws Exception { System.out.println(value); // 根据分隔符解析数据 String[] arrValue = value.split("\t"); // 将数据组装为对象 A c ce s s Lo g l o g = new AccessLog(); lo g .s e t N u m ( 1 ) ; f o r (i nt i= 0 ; i <a rrValue.length; i++) { if ( i == 0) { l og .s et I p (a r rV alue[i]); } e l se if ( i= = 1) { log.setTime(arrValue[i]); }else if( i== 2) { log.setType(arrValue[i]); }else if( i== 3) { log.setApi(arrValue[i]); } } return log; } }); // 4. 配置自定义写入数据源 outputStream.addSink(new MySQLSinkFunction()); // 5. 执行任务 env.execute("job"); } } @Data public class AccessLog { private String ip; private String time; private String type; private String api; private Integer num; } 10.10.20.11 1603166893313 GET getOrder 10.10.20.12 1603166893314 POST addOrder +微信study322 专业一手超清完整 全网课程 超低价格自定义读取数据源 功能: 读取数据库中的数据, 并将结果打印出来。 代码: 3.3 HDFS(读/写) 通过Sink写入HDFS数据 功能: 将Socket接收到的数据, 写入至HDFS文件中。 代码: public class CustomSourceApplication { publi c static void main(String[] args) throws Exception { / / 1 . 创 建 运 行 环 境 S t re a m Ex ec ut i on Environment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 配置自定义MySQL读取数据源 DataStreamSource<AccessLog> dataStream = env.addSource(new MySQLSourceFunction()); // 3. 设置并行度 dataStream.print().setParallelism(1); // 4. 执行任务 env.execute("custom jdbc source."); } } public class HdfsSinkApplication { public static void main(String[] args) throws Exception{ // 配置日志文件 System.setProperty("log4j.configurationFile", "log4j2.xml"); // 1. 创建运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 读取Socket数据源 DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n"); BucketingSink<String> sink = new BucketingSink<String> ("d:/tmp/hdfs"); sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HHmm")); sink.setWriter(new StringWriter()) .setBatchSize(5*1024) // 设置每个文件的大小 +微信study322 专业一手超清完整 全网课程 超低价格POM依赖: 数据源模拟实现: .setBatchRolloverInterval(5*1000) // 设置滚动写入新文件的时间 .setInactiveBucketCheckInterval(30*1000) // 30秒检查一次不写入的文 件 .setInactiveBucketThreshold(60*1000); // 60秒不写入,就滚动写入新的 文件 socketStr.addSink(sink).setParallelism(1); // 5. 执行任务 env.execute("job"); } } <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-filesystem_2.11</artifactId> <version>1.11.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.8.1</version> </dependency> public class SocketSourceApplication { /** * 服务端的端口 */ private int port; /** * 初始化构造方法 * @param port */ public SocketSourceApplication(int port) { this.port = port; } /** * IP 访问列表 */ private static String[] accessIps = new String[]{"10.10.20.101", "10.10.20.102", "10.10.20.103"}; /** * 请求访问类型 */ private static String[] accessTypes = new String[] {"GET", "POST", "PUT"}; +微信study322 专业一手超清完整 全网课程 超低价格/** * 请求接口信息 */ private static String[] accessApis = new String[] {"addOrder", "getAccount", "getOrder"}; /** * Netty通讯服务启动方法 * @thr ows Exception */ pub l i c v o i d ru n S e r v e r() throws Exception { / / 1 . 创 建 N e t t y 服 务 / / 2 . 定 义 事 件 B o s s 监 听 组 Ev e nt Lo o pG ro u p b o s s G r o up = ne w NioEventLoopGroup(); // 3. 定 义 用 来 处 理 已 经 被 接 收 的 连 接 EventLoopGroup workerGourp = new NioEventLoopGroup(); try { // 4. 定义NIO的服务启动类 ServerBootstrap sbs = new ServerBootstrap(); // 5. 配置NIO服务启动的相关参数 sbs.group(bossGroup, workerGourp) .channel(NioServerSocketChannel.class) // tcp最大缓存链接个数,它是tcp的参数, tcp_max_syn_backlog(半连 接上限数量, CENTOS6.5默认是128) .option(ChannelOption.SO_BACKLOG, 128) //保持连接的正常状态 .childOption(ChannelOption.SO_KEEPALIVE, true) // 根据日志级别打印输出 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //管道注册handler ChannelPipeline pipeline = socketChannel.pipeline(); //编码通道处理 pipeline.addLast("decode", new StringDecoder()); //转码通道处理 pipeline.addLast("encode", new StringEncoder()); // 处理接收到的请求 pipeline.addLast(new NettyServerHandler()); } }); System.err.println("-------server 启动------"); // 6. 监听控制台的输入, 并将输入信息, 广播发送给客户端 new Thread(new Runnable() { @Override public void run() { try { while(true) { String accessLog = getAccessLog(); System.out.println("broadcast (" + NettyServerHandler.channelList.size() + ") ==> " + accessLog); if(NettyServerHandler.channelList.size() > 0 ){ +微信study322 专业一手超清完整 全网课程 超低价格NettyServerHandler: for(Channel channel : NettyServerHandler.channelList) { channel.writeAndFlush(accessLog); } } Thread.sleep(1000); } }catch(Exception e) { e.printStackTrace(); } } }).start(); / / 7 . 启 动 n e t t y 服 务 C h an n e lF ut u r e c f = sbs.bind(port).sync(); cf.channel().closeFuture().sync(); }catch (Exception e) { e.printStackTrace(); } } /** * 获取访问日志 * @return */ private String getAccessLog() { StringBuilder strBuilder = new StringBuilder(); strBuilder.append(accessIps[new Random().nextInt(accessIps.length )]).append("\t") .append(System.currentTimeMillis()).append("\t") .append(accessTypes[new Random().nextInt(accessTypes.length)]).append("\t") .append(accessApis[new Random().nextInt(accessApis.length)]).append("\t\n"); return strBuilder.toString(); } /** * netty服务端的启动 * @param args * @throws Exception */ public static void main(String[] args) throws Exception{ new SocketSourceApplication(9911).runServer(); } } public class NettyServerHandler extends ChannelInboundHandlerAdapter { // 客户端通道记录集合 public static List<Channel> channelList = new ArrayList<>(); +微信study322 专业一手超清完整 全网课程 超低价格POM依赖: @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("Server---连接已建立: " + ctx); super.channelActive(ctx); // 将成功建立的连接通道, 加入到集合当中 channelList.add(ctx.channel()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Server---收到的消息: " + msg); } @O verride public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("server--读取数据出现异常"); cause.printStackTrace(); ctx.close(); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { super.channelUnregistered(ctx); // 移除无效的连接通道 channelList.remove(ctx.channel()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); // 移除无效的连接通道 channelList.remove(ctx.channel()); } } <dependencies> <!-- Netty 核心组件依赖 --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.16.Final</version> </dependency> <!-- spring boot 依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>${spring.boot.version}</version> </dependency> <!-- Spring data jpa 组件依赖--> <dependency> +微信study322 专业一手超清完整 全网课程 超低价格读取HDFS文件数据 Hadoop环境安装 1. 配置免密码登录 生成秘钥: <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> <version>${spring.boot.version}</version> </dependency> <!-- mysql-connector-java --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> < v e r si o n >${mysql.jdbc.version}</version> </d e p e nd e n cy > <!-- Redis 缓存依赖 --> < d e p e nd e nc y > < g ro u p I d>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.1.1.RELEASE</version> </dependency> </dependencies> public class HdfsSourceApplication { public static void main(String[] args) throws Exception{ // 配置日志文件 System.setProperty("log4j.configurationFile", "log4j2.xml"); // 1. 创建运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 读取HDFS数据源 DataStreamSource<String> socketStr = env.readTextFile("hdfs://10.10.20.132:9090/hadoop-env.sh"); // 3. 打印文件内容 socketStr.print().setParallelism(1); // 4. 执行任务 env.execute("job"); } } [root@flink1 hadoop-2.6.0-cdh5.15.2]# ssh-keygen -t rsa -P '' Generating public/private rsa key pair. +微信study322 专业一手超清完整 全网课程 超低价格将秘钥写入认证文件: 修改认证文件权限: 2. 配置环境变量 将Hadoop 安装包解压, 将Hadoop加入环境变量/etc/profifile: 执行生效: 3. 修改Hadoop配置文件 1) 修改hadoop-env.sh文件 修改JAVA_HOME: 2)修改core-site.xml文件 这里的主机名称是flflink1。 3)修改hdfs-site.xml文件 [root@flink1 .ssh]# cat id_rsa.pub >> ~/.ssh/authorized_keys [root@flink1 .ssh]# chmod 600 ~/.ssh/authorized_keys e x p o r t H A DO O P_ HO M E= / u s r/ lo c a l / h a do op-2.6.0-cdh5.15.2 e x p o r t P AT H =$ HA DO O P_ H O M E/ b i n : $ PA T H source /etc/profile vi /usr/local/hadoop-2.6.0-cdh5.15.2/etc/hadoop export JAVA_HOME=/usr/local/jdk1.8.0_181 <configuration> <property> <name>fs.defaultFS</name> <value>hdfs://flink1:9090</value> </property> </configuration> +微信study322 专业一手超清完整 全网课程 超低价格4 )修改mapred-site.xml文件 5)修改slaves文件 这里配置的是单节点, 指向本机主机名称。 6)修改yarn-site.xml 4. 启动Hadoop服务 <configuration> <property> <name>dfs.replication</name> <value>1</value> </property> <property> <name>hadoop.tmp.dir</name> < v alue>/usr/local/hadoop-2.6.0-cdh5.15.2/tmp</value> < / p r o p e r t y> </configuration> <configuration> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> </configuration> flink1 <configuration> <!-- Site specific YARN configuration properties --> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> </configuration> [root@flink1 sbin]# ./start-all.sh This script is Deprecated. Instead use start-dfs.sh and start-yarn.sh 20/09/27 19:22:37 WARN util.NativeCodeLoader: Unable to load native hadoop library for your platform... using builtin-java classes where applicable Starting namenodes on [flink1] flink1: setterm: $TERM is not defined. flink1: starting namenode, logging to /usr/local/hadoop-2.6.0- cdh5.15.2/logs/hadoop-root-namenode-flink1.out flink1: setterm: $TERM is not defined. +微信study322 专业一手超清完整 全网课程 超低价格上传一个文件, 用于测试: 5. 访问验证 flink1: starting datanode, logging to /usr/local/hadoop-2.6.0- cdh5.15.2/logs/hadoop-root-datanode-flink1.out Starting secondary namenodes [0.0.0.0] 0.0.0.0: setterm: $TERM is not defined. 0.0.0.0: starting secondarynamenode, logging to /usr/local/hadoop- 2.6.0-cdh5.15.2/logs/hadoop-root-secondarynamenode-flink1.out 20/09/27 19:22:53 WARN util.NativeCodeLoader: Unable to load native hadoop library for your platform... using builtin-java classes where applicable start i n g y a r n d a e m o n s start i n g r es o u r c e m a na ger, logging to /usr/local/hadoop-2.6.0- c d h 5 .1 5 .2 / lo g s / y a r n- r o ot - re so u r ce m a n ag er-flink1.out f li n k 1 : s et t e r m : $ T ER M is no t d e f i ne d . f li n k 1 : s ta r ti n g n o d e m a na g e r , lo g g in g t o / u s r/ local/hadoop-2.6.0- cd h 5 . 15 .2 / lo g s/ y a r n - r oo t - no d e ma n a ge r - f l i n k 1 .o ut hdfs dfs -put /usr/local/hadoop-2.6.0-cdh5.15.2/etc/hadoop/hadoop env.sh / +微信study322 专业一手超清完整 全网课程 超低价格3.4 ES(写) ES服务安装 1. 到官网下载地址下载6.8.1版本的gz压缩包, 不要下载最新版本, Spring Boot等项目可能未 及时更新支持。 2. 解压安装包 3. ElasticSear ch不能以Root身份运行, 需要单独创建一个用户 执行以上命令,创建一个名为elsearch用户, 并赋予目录权限。 4. 修改配置文件 vi confifig/elasticsearch.yml, 只需修改以下设置: 5. 指定JDK版本 最新版的ElasticSearch需要JDK11版本, 下载JDK11压缩包, 并进行解压。 修改环境配置文件 vi bin/elasticsearch-env 参照以下位置, 追加一行, 设置JAVA_HOME, 指定JDK11路径。 tar - xvf elasticsearch-6.8.1-linux-x86_64.tar.gz 1. g ro u p ad d e l s ea rc h 2 . us e ra d d e l se a rc h -g elsearch -p elasticsearch 3. chown -R elsearch:elsearch /usr/local/elasticsearch-6.8.1 #集群名称 cluster.name: my-application #节点名称 node.name: node-1 #数据存储路径 path.data: /usr/local/elasticsearch-6.8.1/data #日志存储路径 path.logs: /usr/local/elasticsearch-6.8.1/logs # 绑定IP地址 network.host: 10.10.20.28 # 指定服务访问端口 http.port: 9200 # 指定API端户端调用端口 transport.tcp.port: 9300 +微信study322 专业一手超清完整 全网课程 超低价格关闭ConcMarkSweepGC JDK9版本以后不建议使用ConcMarkSweepGC, 如果不想出现提示, 可以将其关闭 vi confifig/jvm.options 将UseConcMarkSweepGC注释: 6. 启动ElasticSearch 切换用户 su elsearch 以后台常驻方式启动 bin/elasticsearch -d 7. 问题处理 出现max virtual memory areas vm.max_map_count [65530] is too low, increase to at least 错误信息 修改系统配置: vi /etc/sysctl.conf 添加 vm.max_map_count=655360 执行生效 sysctl -p vi /etc/security/limits.conf 在文件末尾添加 * soft nofifile 65536 * hard nofifile 131072 * soft nproc 2048 JAVA_HOME=/usr/local/jdk_11 # now set the path to java if [ ! -z "$JAVA_HOME" ]; then JAVA="$JAVA_HOME/bin/java" else if [ "$(uname -s)" = "Darwin" ]; then # OSX has a different structure JA VA="$ES_HOME/jdk/Contents/Home/bin/java" e ls e JAVA="$ES_HOME/jdk/bin/java" fi f i ## GC configuration #-XX:+UseConcMarkSweepGC ... ## G1GC Configuration # NOTE: G1GC is only supported on JDK version 10 or later. # To use G1GC uncomment the lines below. #-XX:-UseConcMarkSweepGC ... +微信study322 专业一手超清完整 全网课程 超低价格* hard nproc 4096 elsearch soft nproc 125535 elsearch hard nproc 125535 重新切换用户即可: su - elsearch FLINK ES写入功能实现 功能: 将Sock et流数据, 写入至ES服务。 代码: public class ElasticSinkApplication { public static void main(String[] args) throws Exception{ // 配置日志文件 System.setProperty("log4j.configurationFile", "log4j2.xml"); // 1. 创建运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 读取Socket数据源 DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n"); //3. 配置ES服务信息 List<HttpHost> httpHosts = new ArrayList<>(); httpHosts.add(new HttpHost("10.10.20.132", 9200, "http")); //4. 数据解析处理 ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>( httpHosts, new ElasticsearchSinkFunction<String>() { public IndexRequest createIndexRequest(String element) { Map<String, String> json = new HashMap<>(); // 解析数据 String[] arrValue = String.valueOf(element).split("\t"); for(int i=0; i<arrValue.length; i++) { if(i == 0) { json.put("ip", arrValue[i]); }else if( i== 1) { json.put("time", arrValue[i]); }else if( i== 2) { json.put("type", arrValue[i]); }else if( i== 3) { json.put("api", arrValue[i]); } } return Requests.indexRequest() .index("flink-es") .type("access-log") .source(json); } @Override +微信study322 专业一手超清完整 全网课程 超低价格查看index信息: http://10.10.20.132:9200/_cat/indices?v 查看具体数据: http://10.10.20.132:9200/flflink-es/_search?pretty 3.5 KAFKA(读/写) Kafka安装 1. 下载Kafka_2.12-1.1.1安装包 2. 将安装包解压 3. 修改kafka配置 只修改绑定IP, 因为是单节点, 其他按默认配置来。 如有多个IP地址, 绑定为对外访问的IP。 4. 启动zookeeper服务 kafka安装包内置了zookeeper,可以直接启动。 public void process(String element, RuntimeContext ctx, RequestIndexer indexer) { indexer.add(createIndexRequest(element)); } } ); // 5. ES的写入配置 esSinkBuilder.setBulkFlushMaxActions(1); e s S i n k B ui ld e r . se t Re st C l ie nt F ac tory( re st C l i e nt Bu i ld e r -> { restClientBuilder.setMaxRetryTimeoutMillis(5000); } ); // 6. 添加ES的写入器 socketStr.addSink(esSinkBuilder.build()); socketStr.print().setParallelism(1); // 7. 执行任务 env.execute("job"); } } tar -xvf kafka_2.12-1.1.1.tgz listeners=PLAINTEXT://10.10.20.132:9092 advertised.listeners=PLAINTEXT://10.10.20.132:9092 +微信study322 专业一手超清完整 全网课程 超低价格5. 启动kafka服务 Flink Kafka 读取功能 功能: 通过flflink读取kafka消息队列数据, 并打印显示。 代码: 通过kafka生产者命令测试验证: 扩展点:kafka消息的消费处理策略: bin/zookeeper-server-start.sh -daemon config/zookeeper.properties bin/kafka-server-start.sh -daemon config/server.properties publ ic class KafkaSourceApplication { public static void main(String[] args) throws Exception { // 1. 创建运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 设置kafka服务连接信息 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "10.10.20.132:9092"); properties.setProperty("group.id", "fink_group"); // 3. 创建Kafka消费端 FlinkKafkaConsumer kafkaProducer = new FlinkKafkaConsumer( "flink-source", // 目标 topic new SimpleStringSchema(), // 序列化 配置 properties); // kafkaProducer.setStartFromEarliest(); // 尽可能从最早的记录开始 // kafkaProducer.setStartFromLatest(); // 从最新的记录开始 // kafkaProducer.setStartFromTimestamp(...); // 从指定的时间开始(毫秒) // kafkaProducer.setStartFromGroupOffsets(); // 默认的方法 // 4. 读取Kafka数据源 DataStreamSource<String> socketStr = env.addSource(kafkaProducer); socketStr.print().setParallelism(1); // 5. 执行任务 env.execute("job"); } } bin/kafka-console-producer.sh --broker-list 10.10.20.132:9092 --topic flink source +微信study322 专业一手超清完整 全网课程 超低价格Flink Kafka 写入功能 功能: 将Sock et的流数据,通过flflink 写入kafka 消息队列。 代码: 通过kafka消费者命令测试验证: 控制消息的发送处理模式: // kafkaProducer.setStartFromEarliest(); // 尽可能从最早的记录开始 // kafkaProducer.setStartFromLatest(); // 从最新的记录开始 // kafkaProducer.setStartFromTimestamp(...); // 从指定的时间开始(毫秒) // kafkaProducer.setStartFromGroupOffsets(); // 默认的方法 public class KafkaSinkApplication { public static void main(String[] args) throws Exception { // 1. 创建运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 读取Socket数据源 DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n"); // 3. Kakfa的生产者配置 FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer( "10.10.20.132:9092", // broker 列表 "flink-topic", // 目标 topic new SimpleStringSchema()); // 序列化 方式 // 4. 添加kafka的写入器 socketStr.addSink(kafkaProducer); socketStr.print().setParallelism(1); // 5. 执行任务 env.execute("job"); } } bin/kafka-console-consumer.sh --bootstrap-server 10.10.20.132:9092 --topic flink-topic +微信study322 专业一手超清完整 全网课程 超低价格提供了三种消息处理模式: S e m a n t i c . N O NE : Flink 不会有任何语义的保证,产生的记录可能会丢失或重复。 S e m a n t i c .A T _ LEA S T_ON CE (默认设置):类似 FlinkKafkaProducer010 版本中的 setFlushOnCheckpoint(true) ,这可以保证不会丢失任何记录(虽然记录可能会重复)。 Semantic.EXACTLY_ONCE :使用 Kafka 事务提供精准一次的语义。无论何时,在使用事务 写入 Kafka 时,都要记得为所有消费 Kafka 消息的应用程序设置所需的 isolation.level ( read_committed 或 read_uncommitted - 后者是默认值)。 Kafka 的消息可以携带时间戳,指示事件发生的时间或消息写入 Kafka broker 的时间。 3.6 自定义序列化(Protobuf) 在实际应用场景中, 会存在各种复杂传输对象,同时要求较高的传输处理性能, 这就需要采用自定义 的序列化方式做相应实现, 这里以Protobuf为例做讲解。 功能: kafka对同一Topic的生产与消费,采用Protobuf做序列化与反序列化传输, 验证能否正常解析 数据。 1. 通过protobuf脚本生成JAVA文件 通过批处理脚本,生成JAVA文件: // 控制消息的操作模式 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "10.10.20.132:9092"); FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer( "flink-topic", // 目标 topic new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), properties, Fl in k Ka f k a P roducer.Semantic.EXACTLY_ONCE ) ; / / 序 列 化 s c h em a kafkaProducer.setWriteTimestampToKafka(true); syntax = "proto3"; option java_package = "com.itcast.flink.connectors.kafka.proto"; option java_outer_classname = "AccessLogProto"; // 消息结构定义 message AccessLog { string ip = 1; string time = 2; string type = 3; string api = 4; string num = 5; } +微信study322 专业一手超清完整 全网课程 超低价格注意, 路径要配置正确。 2. 自定义序列化实现 添加POM 依赖: AccessLog对象: CustomSerialSchema: @echo off for %%i in (proto/*.proto) do ( d:/TestCode/protoc.exe --proto_path=./proto --java_out=../java ./proto/%%i echo generate %%i to java file successfully! ) <dep endencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.11.2</version> </dependency> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.8.0</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> <version>5.1.8.RELEASE</version> </dependency> </dependencies> @Data public class AccessLog implements Serializable { private String ip; private String time; private String type; private String api; private Integer num; } /** * 自定义序列化实现(Protobuf) */ +微信study322 专业一手超清完整 全网课程 超低价格public class CustomSerialSchema implements DeserializationSchema<AccessLog>, SerializationSchema<AccessLog> { private static final long serialVersionUID = 1L; private transient Charset charset; public CustomSerialSchema() { this(StandardCharsets.UTF_8); } p u b l ic C u st o m S e r i a l Sc he m a (C ha r se t ch a r se t ) { t h i s .c h a r se t = c he c k No tN ul l ( c ha r se t ) ; } public Charset getCharset() { return charset; } /** * 反序列化实现 * @param message * @return */ @Override public AccessLog deserialize(byte[] message) { AccessLog accessLog = null; try { AccessLogProto.AccessLog accessLogProto = AccessLogProto.AccessLog.parseFrom(message); accessLog = new AccessLog(); BeanUtils.copyProperties(accessLogProto, accessLog); return accessLog; } catch (Exception e) { e.printStackTrace(); } return accessLog; } @Override public boolean isEndOfStream(AccessLog nextElement) { return false; } /** * 序列化处理 * @param element * @return */ @Override public byte[] serialize(AccessLog element) { AccessLogProto.AccessLog.Builder builder = AccessLogProto.AccessLog.newBuilder(); BeanUtils.copyProperties(element, builder); return builder.build().toByteArray(); } +微信study322 专业一手超清完整 全网课程 超低价格3. 通过flflink对kafka消息生产者的实现 /** * 定义消息类型 * @return */ @Override public TypeInformation<AccessLog> getProducedType() { return TypeInformation.of(AccessLog.class); } } public class KafkaSinkApplication { public static void main(String[] args) throws Exception { // 1. 创建运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 读取Socket数据源 DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n"); // 3. 转换处理流数据 SingleOutputStreamOperator<AccessLog> outputStream = socketStr.map(new MapFunction<String, AccessLog>() { @Override public AccessLog map(String value) throws Exception { System.out.println(value); // 根据分隔符解析数据 String[] arrValue = value.split("\t"); // 将数据组装为对象 AccessLog log = new AccessLog(); log.setNum(1); for(int i=0; i<arrValue.length; i++) { if(i == 0) { log.setIp(arrValue[i]); }else if( i== 1) { log.setTime(arrValue[i]); }else if( i== 2) { log.setType(arrValue[i]); }else if( i== 3) { log.setApi(arrValue[i]); } } return log; } }); // 3. Kakfa的生产者配置 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "10.10.20.132:9092"); FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer( "10.10.20.132:9092", // broker 列表 +微信study322 专业一手超清完整 全网课程 超低价格开启Kafka消费者命令行终端,验证生产者的可用性: 4. 通过flflink对kafka消息订阅者的实现 "flink-serial", // 目标 topic new CustomSerialSchema() // 序列化 方式 ); // 4. 添加kafka的写入器 outputStream.addSink(kafkaProducer); socketStr.print().setParallelism(1); / / 5 . 执 行 任 务 e n v .e x ec ut e ("job"); } } [root@flink1 kafka_2.12-1.1.1]# bin/kafka-console-consumer.sh --bootstrap server 10.10.20.132:9092 --topic flink-serial 1601649380422GET" getAccount 1601649381422POSTaddOrder 1601649382422POST" public class KafkaSourceApplication { public static void main(String[] args) throws Exception { // 1. 创建运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 设置kafka服务连接信息 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "10.10.20.132:9092"); properties.setProperty("group.id", "fink_group"); // 3. 创建Kafka消费端 FlinkKafkaConsumer kafkaProducer = new FlinkKafkaConsumer( "flink-serial", // 目标 topic new CustomSerialSchema(), // 自定义序列化 properties); // 4. 读取Kafka数据源 DataStreamSource<AccessLog> socketStr = env.addSource(kafkaProducer); socketStr.print().setParallelism(1); // 5. 执行任务 env.execute("job"); } +微信study322 专业一手超清完整 全网课程 超低价格通过flflink的kafka生产者消息的发送, 对消费者的功能做测试验证。 4. Flink大屏数据实战 4.1 双十一大屏数据 总览数据 总销售量/总销售金额 TopN: 热销商品/商品类目/商品PV/商品UV 区域/分类数据 不同区域销售排名 不同分类销售排名 4.2 Canal同步服务安装 1. 下载安装包 安装包 后台管理包 2. 解压 解压安装包: 解压管理包: } mkdir -p /usr/local/canal tar -xzvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal/ +微信study322 专业一手超清完整 全网课程 超低价格3. 初始化管理数据库 导入初始化数据脚本: 4. 修改MySQL服务同步配置 编辑配置文件: 增加同步配置: 重启服务: 检查同步功能是否开启: 创建同步用户: 赋予同步所需权限: mkdir -p /usr/local/canal-admin tar -xvf canal.admin-1.1.4.tar.gz -C /usr/local/canal-admin mysql -uroot -p654321 < /usr/local/canal-admin/conf/canal_manager.sql vi / etc/my.cnf [mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # MySQL ID服务标识 service mysqld restart mysql> show variables like 'log_bin'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | log_bin | ON | +---------------+-------+ 1 row in set (0.01 sec) mysql> FLUSH PRIVILEGES; mysql> CREATE USER canal IDENTIFIED BY 'canal'; mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; Query OK, 0 rows affected (0.00 sec) mysql> FLUSH PRIVILEGES; Query OK, 0 rows affected (0.00 sec) +微信study322 专业一手超清完整 全网课程 超低价格5. 修改后台管理配置文件 配置内容: 先启动后台管理服务, 再启动Canal服务, 后台管理服务启动命令: 访问:http://10.10.20.133:8089/ 登录: admin/123456 6. Canal服务配置 配置内容: 启动Canal服务: vi /usr/local/canal-admin/conf/application.yml server: port: 8089 spring: jackson : d a te - fo rm a t : yy y y-MM-dd HH:mm:ss t im e -z on e : G MT + 8 spri ng.datasource: address: 192.168.19.150:3306 database: canal_manager username: root password: 654321 driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}? useUnicode=true&characterEncoding=UTF-8&useSSL=false hikari: maximum-pool-size: 30 minimum-idle: 1 /usr/local/canal-admin/bin/startup.sh vi /usr/local/canal/conf/canal_local.properties # register ip canal.register.ip = 10.10.20.133 # canal admin config canal.admin.manager = 10.10.20.133:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 # admin auto register canal.admin.register.auto = true canal.admin.register.cluster = +微信study322 专业一手超清完整 全网课程 超低价格7. 后台管理配置 修改Server管理配置: 修改Instance 配置(如果没有, 则新建,载入模板即可): regex同步配置规则: 常见例子: 1. 所有表:.* or .\.. 2. canal schema下所有表: canal\..* 3. canal下的以canal打头的表:canal\.canal.* 4. canal schema下的一张表:canal.test1 5. 多个规则组合使用:canal\..*,mysql.test1,mysql.test2 (逗号分隔) 4.3 热销商品统计 功能实现流程: 1. 订单数据源的实现 2. flflink代码功能实现 3. Flink 与 Spring Boot的集成 4. 测试验证,比对SQL: /usr/local/canal/bin/startup.sh local # 指向ZK服务地址 canal.zkServers = 10.10.20.132:2181 # Canal同 步 方 式 canal.ser ve rM ode = kafka # mq服 务 地 址 canal . mq .s ervers = 10.10.20.132:9092 # mysql 同步服务ID标识, 不要配置冲突 canal.instance.mysql.slaveId=112 # mysql 同步主节点连接配置 canal.instance.master.address=192.168.19.150:3306 # 数据库用户名 canal.instance.dbUsername=canal # 数据库用户密码 canal.instance.dbPassword=canal # 数据同步消息队列 canal.mq.topic=order_binlog # 修改需要同步的数据库 canal.instance.filter.regex=flink.t_order select goodsId, sum(execPrice * execVolume) as totalAmount from t_order where execTime < 时间窗口的结束时间戳 group by goodsId order by totalAmount desc +微信study322 专业一手超清完整 全网课程 超低价格5. 数据呈现 kibana服务安装 Kibana是一个针对 Elasticsearch的开源分析及可视化平台,用来搜索、查看交互存储在Elasticsearch索 引中的数据。 1. 到官网下载, Kibana 安装包 , 与之对应 6.8.1版本, 选择 Linux 64 位版本下载,并进行解压。 2. Kibana启动不能使用 r oot用户, 使用上面创建的 elsear ch 用户, 进行赋权: 3. 修改配置文件 vi confifig/kibana.yml , 修改以下配置: 4. 启动kibana 看到以下日志, 代表启动正常 如果出现启动失败的情况, 要检查集群各节点的日志, 确保服务正常运行状态。 4.4 区域分类统计 1. 增加订单地址信息数据源 2. 创建对应的表与实体 实体: OrderAddress BO: JoinOrderAddress(订单数据与地址数据的合并对象) BO: HotDimensionOrder(ES存储的映射对象), 注意这里的ID唯一性, 如果是按省份统计, ID存储省份信息,如果是按地级市统计, ID则存储为市区信息。 3. 改造订单数据源, 增加缓存写入, 地址信息数据源增加缓存的读取。 chow n -R elsearch:elsearch kibana-6.8.1-linux-x86_64 1 # 服务端口 server.port: 5601 # 服务地址 server.host: "0.0.0.0" # elasticsearch服务地址, 填写集群所有节点地址, 之间用逗号分割 elasticsearch.hosts: ["http://10.10.20.28:9200", "http://10.10.20.29:9200", "http://10.10.20.30:9200"] ./kibana -q log [01:40:00.143] [info][listening] Server running at http://0.0.0.0:5601 +微信study322 专业一手超清完整 全网课程 超低价格4. 修改Canal的后台配置, 增加地址数据源的监听队列。 5. 区域双流统计的核心代码实现: 1) 增加双流的kafka配置, 每个流监听不同的数据队列。 2)每个流要加上时间水印, 设定时间窗, 设定值比后面聚合的时间窗稍小一些。 3)根据订单ID做join匹配。 4) 根据区域做汇总统计(省份、城市)。 5) 将数据写入至 ES。 6. 测试验证 验证SQL: 4.5 订单状态监控统计(CEP) 1. 增加订单支付流水数据源 2. 创建对应的表与实体 实体: OrderPayment BO: JoinOrderAddress 3. 修改Canal的后台配置, 增加地址数据源的监听队列。 4. 核心代码实现: 1) 实现订单支付流水数据源的监听处理。 2)定义CEP处理规则,解析出支付成功的订单。 5. 测试验证 检查订单状态是未支付 -》 已支付的数据 检查超时的数据: 初始状态为0, 指定时间之内没有已支付的数据。 6. 拓展实现, 热门商品统计排行,只统计支付成功的数据。 sele ct province, goodsId, sum(execPrice * execVolume) totalAmount from t_order odr left join t_order_address adr on odr.id = adr.orderId where odr.execTime < 时间窗结束时间 group by province, goodsId order by province, totalAmount desc select * from t_order_payment pay where exists ( select 1 from t_order_payment tmp where tmp.orderId = pay.orderId and tmp.status = 0 ) and pay.status = 1 +微信study322 专业一手超清完整 全网课程 超低价格4.6 商品UV统计 功能: 统计商品在一段时间内的UV(Unique Visitor) 核心代码: public cla ss ScreenUniqueVisitorProcessor { /** * 执 行flink任务处理 * @throws Exception */ public void executeFlinkTask() throws Exception { // 1. 创建运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); env.setParallelism(1); // 2. 读取Socket数据源 // DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n"); DataStreamSource<String> socketStr = env.readTextFile("./data/goods_access.log"); // 3. 数据解析转换处理 socketStr.flatMap(new FlatMapFunction<String, GoodsAccessLog>() { @Override public void flatMap(String value, Collector<GoodsAccessLog> out) throws Exception { // 获取Json中的data数据 // 根据分隔符解析数据 String[] arrValue = value.split("\t"); System.out.println("receive msg => " + value); // 将数据组装为对象 GoodsAccessLog log = new GoodsAccessLog(); for(int i=0; i<arrValue.length; i++) { if(i == 0) { log.setIp(arrValue[i]); }else if( i== 1) { log.setAccessTime(Long.valueOf(arrValue[i])); }else if( i== 2) { log.setEventType(arrValue[i]); }else if( i== 3) { log.setGoodsId(arrValue[i]); } } out.collect(log); } }) .filter(new FilterFunction<GoodsAccessLog>() { +微信study322 专业一手超清完整 全网课程 超低价格4.7 布隆过滤器 功能: 统计商品在一段时间内的UV(采用布隆过滤器) 核心代码: @Override public boolean filter(GoodsAccessLog value) throws Exception { return value.getEventType().equals("view"); } }) .keyBy(GoodsAccessLog::getGoodsId) .timeWindow(Time.seconds(10)) .process( new ProcessWindowFunction<GoodsAccessLog, Map<String, String> , String, TimeWindow>(){ @ O v e r r i d e p u b li c v o id process(String key, Context context, Iterable<G o odsAccessLog> elements, Collector<Map<String, String>> out) throws Exception { S et < S t r i n g > i pS e t = n ew H a s h S et < > () ; M a p < S t r i n g , S t r i n g > g o o d s UV = n ew Li nkedHashMap<>(); elements.forEach( log -> { ipSet.add(log.getIp()); }); goodsUV.put(key , context.window().getEnd() + ":" + ipSet.size()); out.collect(goodsUV); } }) .print("uv result").setParallelism(1); // 5. 执行任务 env.execute("job"); } } /** * 执行flink任务处理 * @throws Exception */ public void executeFlinkTask() throws Exception { // 1. 创建运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 2. 读取Socket数据源 // DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n"); DataStreamSource<String> socketStr = env.readTextFile("./data/goods_access.log"); // 3. 数据解析转换处理 socketStr.flatMap(new FlatMapFunction<String, GoodsAccessLog>() { +微信study322 专业一手超清完整 全网课程 超低价格@Override public void flatMap(String value, Collector<GoodsAccessLog> out) throws Exception { // 获取Json中的data数据 // 根据分隔符解析数据 String[] arrValue = value.split("\t"); // 将数据组装为对象 GoodsAccessLog log = new GoodsAccessLog(); f o r (i nt i= 0 ; i <a rrValue.length; i++) { if ( i == 0) { l og .s et I p (a r rV alue[i]); } e l se if ( i= = 1) { l og .s et A c ce s sT ime(Long.valueOf(arrValue[i])); } e l se if ( i == 2) { log.setEventType(arrValue[i]); }else if( i== 3) { log.setGoodsId(arrValue[i]); } } out.collect(log); } }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<GoodsAccessLog>(Time.seconds(0)) { @Override public long extractTimestamp(GoodsAccessLog element) { return element.getAccessTime(); } }) .filter(new FilterFunction<GoodsAccessLog>() { @Override public boolean filter(GoodsAccessLog value) throws Exception { return value.getEventType().equals("view"); } }) .keyBy(GoodsAccessLog::getGoodsId) .timeWindow(Time.minutes(30)) .trigger(new CustomWindowTrigger()) .process(new CustomUVBloom()) .keyBy(0) .timeWindow(Time.seconds(3)) .max(1) .print("uv result => ").setParallelism(1); // 5. 执行任务 env.execute("job"); } +微信study322 专业一手超清完整 全网课程 超低价格
标签:canal,study322,cannal,超清,WM,flink,env,new,public From: https://www.cnblogs.com/shan13936/p/17402517.html