首页 > 其他分享 >【flink番外篇】5、flink的window(介绍、分类、函数及Tumbling、Sliding、session窗口应用)介绍及示例(2) - 基于时间的滚动和滑动窗口

【flink番外篇】5、flink的window(介绍、分类、函数及Tumbling、Sliding、session窗口应用)介绍及示例(2) - 基于时间的滚动和滑动窗口

时间:2023-12-28 13:37:30浏览次数:30  
标签:窗口 示例 flink streaming api import apache org



文章目录

  • Flink 系列文章
  • 一、maven依赖
  • 二、示例:基于时间的滚动和滑动窗口
  • 1、滚动窗口实现统计地铁进站口人数
  • 1)、一般实现(Tuple2数据结构)及验证
  • 2)、面向对象实现(pojo数据结构)及验证
  • 3)、面向对象lambda实现(pojo的数据结构lambda)及验证
  • 4)、一般lambda实现(Tuple2数据结构)及验证
  • 2、滑动窗口实现统计地铁进站口人数
  • 1)、一般实现(Tuple2数据结构)及验证
  • 2)、面向对象实现(pojo数据结构)及验证



本文介绍了Flink window的基于时间的滚动窗口与滑动窗口的几种使用示例,其中包含详细的验证步骤与验证结果。

一、maven依赖

<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- 日志 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

二、示例:基于时间的滚动和滑动窗口

1、滚动窗口实现统计地铁进站口人数

实现:每10s统计一次地铁进站每个入口人数,最近10s每个进站口的人数

1)、一般实现(Tuple2数据结构)及验证

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.api.java.functions.KeySelector;

/**
 * @author alanchan 
 * 基于滚动窗口的入门示例 
 * 每10s统计一次地铁进站每个入口人数,最近10s每个进站口的人数 
 * size=slide
 *
 */
public class TumblingTimeWindowsDemo1 {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		// nc
		// 数据结构: 入口编号,人数
		// 12,50
		// 11,28
		DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);
		
		// transformation
		DataStream<Tuple2<String, Integer>> subwayExit = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {

			@Override
			public Tuple2<String, Integer> map(String line) throws Exception {
				String[] arr = line.split(",");

				return Tuple2.of(arr[0], Integer.parseInt(arr[1]));
			}
		});
		
		//按照地铁口分组
//		KeyedStream<Tuple2<String, Integer>, String> keyedDS = subwayExit.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
//			@Override
//			public String getKey(Tuple2<String, Integer> value) throws Exception {
//				return value.f0;
//			}
//		});
		//另外一种分组方式
		KeyedStream<Tuple2<String, Integer>, Tuple> keyedDS = subwayExit.keyBy(0);
				
		DataStream<Tuple2<String, Integer>> result1 = keyedDS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
				//另外一种聚合方式实现
//				.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
//
//					@Override
//					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
//
//						return Tuple2.of(value1.f0, value1.f1 + value2.f1);
//					}
//
//				});
				.sum(1);

		// sink
		result1.print();

		// execute
		env.execute();
	}

}

验证步骤

  • 1、启动nc
nc -lk 9999
  • 2、启动应用程序
  • 3、nc控制台输入
[alanchan@server2 src]$ nc -lk 9999
no1,1
no2,1
no1,2
no1,3
no2,6
  • 4、查看应用程序控制台输出

2)、面向对象实现(pojo数据结构)及验证

  • bean
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SubWay {
	// 地铁站进站口
	private String No;
	// 某一时段人数
	private Integer userCount;
}
  • 实现
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan 
 * 基于滚动窗口的入门示例 
 * 每10s统计一次地铁进站每个入口人数,最近10s每个进站口的人数 
 * size=slide
 *
 */
public class TumblingTimeWindowsDemo2 {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		// nc
		// 数据结构: 入口编号,人数
		// 12,50
		// 11,28
		DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

		// transformation
		DataStream<Subway> subwayExit = lines.map(new MapFunction<String, Subway>() {

			@Override
			public Subway map(String line) throws Exception {
				String[] arr = line.split(",");

				return new Subway(arr[0], Integer.parseInt(arr[1]));
			}
		});

		// 按照地铁口分组
		KeyedStream<Subway, String> keyedDS = subwayExit.keyBy(new KeySelector<Subway, String>() {
			@Override
			public String getKey(Subway value) throws Exception {
				return value.getNo();
			}
		});
		
		//userCount是Subway的属性名称
		DataStream<Subway> result = keyedDS.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).sum("userCount");
		// sink
		result.print();

		// execute
		env.execute();
	}

}
  • 验证步骤
    1、启动nc
nc -lk 9999

2、启动应用程序

3、nc控制台输入

[alanchan@server2 src]$ nc -lk 9999
no,1
no2,1
no2,4
no1,2

4、查看应用程序控制台输出

【flink番外篇】5、flink的window(介绍、分类、函数及Tumbling、Sliding、session窗口应用)介绍及示例(2) - 基于时间的滚动和滑动窗口_flink

3)、面向对象lambda实现(pojo的数据结构lambda)及验证

Subway的bean参考上文示例中的内容。

import java.util.Arrays;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.api.common.typeinfo.Types;

/**
 * @author alanchan
 *
 */
public class TumblingTimeWindowsDemo3 {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		// nc
		// 数据结构: 入口编号,人数
		// 12,50
		// 11,28
		DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

		// transformation
//		DataStream<Subway> subwayExit = lines.map(new MapFunction<String, Subway>() {
//
//			@Override
//			public Subway map(String line) throws Exception {
//				String[] arr = line.split(",");
//				return new Subway(arr[0], Integer.parseInt(arr[1]));
//			}
//		});
		DataStream<Subway> subwayExit = lines.map(new Splitter());

		// 按照地铁口分组
		KeyedStream<Subway, String> keyedDS = subwayExit.keyBy(Subway::getNo);
		// subwayExit.keyBy(subway->subway.getNo())

		DataStream<Subway> result = keyedDS.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).sum("userCount");
		
		// sink
		result.print();

		// execute
		env.execute();
	}

	public static class Splitter implements MapFunction<String, Subway> {
		@Override
		public Subway map(String value) {
			String[] arr = value.split(",");
			return new Subway(arr[0], Integer.parseInt(arr[1]));
		}
	}
}

验证步骤

  • 1、启动nc
nc -lk 9999

2、启动应用程序

3、nc控制台输入

[alanchan@server2 src]$ nc -lk 9999
n1,2
n2,3
n1,4
n1,5

4、查看应用程序控制台输出

【flink番外篇】5、flink的window(介绍、分类、函数及Tumbling、Sliding、session窗口应用)介绍及示例(2) - 基于时间的滚动和滑动窗口_flink hive_02

4)、一般lambda实现(Tuple2数据结构)及验证

  • 实现
import java.util.Arrays;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.api.common.typeinfo.Types;

/**
 * @author alanchan
 *
 */
public class TumblingTimeWindowsDemo4 {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("192.168.10.42", 9999)
                .map(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .sum(1);
		// sink
		dataStream.print();

		// execute
		env.execute();
	}

	public static class Splitter implements MapFunction<String, Tuple2<String, Integer>> {
		@Override
		public Tuple2<String, Integer> map(String value) {
			String[] arr = value.split(",");
			return new Tuple2(arr[0], Integer.parseInt(arr[1]));
		}
	}

}

验证步骤

  • 1、启动nc
nc -lk 9999

2、启动应用程序

3、nc控制台输入

[alanchan@server2 src]$ nc -lk 9999
n3,1
n3,5
n4,6
n4,8
n3,3

4、查看应用程序控制台输出

【flink番外篇】5、flink的window(介绍、分类、函数及Tumbling、Sliding、session窗口应用)介绍及示例(2) - 基于时间的滚动和滑动窗口_flink hive_03

2、滑动窗口实现统计地铁进站口人数

每分钟统计一次地铁进站每个入口人数,最近2分钟每个进站口的人数

lambda实现方式不再赘述

1)、一般实现(Tuple2数据结构)及验证

  • 实现
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan 
 * 基于滑动窗口的入门示例 
 * 每分钟统计一次地铁进站每个入口人数,最近2分钟每个进站口的人数 
 * size>slide
 * 
 */
public class SlidingProcessingTimeWindowsDemo1 {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		// nc
		// 数据结构: 入口编号,人数
		// 12,50
		// 11,28
		DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

		// transformation
		DataStream<Tuple2<String, Integer>> subwayExit = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {

			@Override
			public Tuple2<String, Integer> map(String line) throws Exception {
				String[] arr = line.split(",");
				return Tuple2.of(arr[0], Integer.parseInt(arr[1]));
			}
		});

		// 按照地铁口分组
		KeyedStream<Tuple2<String, Integer>, String> keyedDS = subwayExit.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
			@Override
			public String getKey(Tuple2<String, Integer> value) throws Exception {
				return value.f0;
			}
		});

		SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedDS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum(1);

		// sink
		result.print();

		// execute
		env.execute();
	}

}
  • 验证步骤
    1、启动nc
nc -lk 9999

2、启动应用程序

3、nc控制台输入

[alanchan@server2 src]$ nc -lk 9999
1,2
1,3
1,4
2,3
2,4,
1,2
1,3

4、查看应用程序控制台输出

通过验证发现输出数据与预期一致

7> (1,5)
4> (2,3)
7> (1,9)
4> (2,7)
7> (1,6)
4> (2,4)
7> (1,5)
7> (1,3)

2)、面向对象实现(pojo数据结构)及验证

  • 实现
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan
 *
 */
public class SlidingProcessingTimeWindowsDemo2 {

	/**
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		// nc
		// 数据结构: 入口编号,人数
		// 12,50
		// 11,28
		DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

		// transformation
		DataStream<Subway> subwayExit = lines.map(new MapFunction<String, Subway>() {

			@Override
			public Subway map(String line) throws Exception {
				String[] arr = line.split(",");
				return new Subway(arr[0], Integer.parseInt(arr[1]));
			}
		});

		// 按照地铁口分组
		KeyedStream<Subway, String> keyedDS = subwayExit.keyBy(new KeySelector<Subway, String>() {
			@Override
			public String getKey(Subway value) throws Exception {
				return value.getNo();
			}
		});

		SingleOutputStreamOperator<Subway> result = keyedDS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum("userCount");

		// sink
		result.print();

		// execute
		env.execute();

	}

}
  • 验证步骤
    1、启动nc
nc -lk 9999

2、启动应用程序

3、nc控制台输入

[alanchan@server2 src]$ nc -lk 9999
2,2
3,3
2,4
2,5
3,5
4,5
3,5

4、查看应用程序控制台输出

通过查看输出结果与预期一致。

5> Subway(No=3, userCount=3)
4> Subway(No=2, userCount=2)
4> Subway(No=2, userCount=6)
5> Subway(No=3, userCount=3)
1> Subway(No=4, userCount=5)
5> Subway(No=3, userCount=5)
4> Subway(No=2, userCount=9)
5> Subway(No=3, userCount=10)
4> Subway(No=2, userCount=5)
1> Subway(No=4, userCount=5)
5> Subway(No=3, userCount=5)

以上,本文介绍了Flink window的基于时间的滚动窗口与滑动窗口的几种使用示例,其中包含详细的验证步骤与验证结果。

标签:窗口,示例,flink,streaming,api,import,apache,org
From: https://blog.51cto.com/alanchan2win/9013144

相关文章

  • 【flink番外篇】5、flink的window(介绍、分类、函数及Tumbling、Sliding、session窗口
    文章目录Flink系列文章一、Flink的window介绍1、window介绍2、windowAPI1)、WindowAssigner2)、Trigger3)、Evictor3、window的生命周期二、window的分类1、TumblingWindows2、SlidingWindows3、SessionWindows4、GlobalWindows5、按照时间time和数量count分类6、按照滑动间隔s......
  • 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、
    文章目录Flink系列文章一、maven依赖二、广播变量BroadcastVariables示例1、介绍2、广播变量示例3、验证三、BroadcastState与BroadcastVariable区别本文简单的介绍了flink中关于广播变量的简单使用示例。一、maven依赖为避免篇幅过长,所有基础依赖均在第一篇文章中列出,具......
  • 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、
    文章目录Flink系列文章一、maven依赖二、Flinksink介绍三、sink到文件、console示例1、console输出2、sink到文件1)、sinktxt文件到hdfs上2)、sinkcsv文件到本地3)、sinktext文件到hdfs上(writeUsingOutputFormat)四、sink到socket示例(writeToSocket)五、Jdbc/mysql示例1、maven依......
  • 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、
    文章目录Flink系列文章一、maven依赖二、分布式缓存(DistributedCache)示例1、介绍2、maven依赖3、实现4、验证1)、验证步骤2)、验证本文介绍了flink关于分布式缓存的使用示例,比较简单。本文除了maven依赖外,没有其他依赖。本示例需要hadoop环境可用。一、maven依赖为避免篇幅过长,所......
  • 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、
    文章目录Flink系列文章一、maven依赖二、sink到ClickHouse示例1、介绍2、maven依赖3、创建clickhouse表4、验证clickhouseweb页面是否正常5、实现1)、userbean2)、sink实现6、验证1)、nc输入2)、启动应用程序3)、观察应用程序控制台输出4)、查看clickhouse表中的数据本文介绍了nc作......
  • 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、
    文章目录Flink系列文章一、maven依赖二、环境或版本说明三、flinksink到kafka示例1、介绍2、1.13.6版本示例1)、maven依赖2)、实现3)、验证步骤3、1.17.0版本示例1)、maven依赖2)、实现3)、验证步骤本文介绍了flink将数据sink到kafka的示例,并提供了flink的1.13.6和1.17两个版本sink到......
  • Flink实验
     题目:实验八姓名 日期12.8实验环境:(1)Ubuntu18.04(或Ubuntu16.04)。(2)IntelliJIDEA。(3)Flink1.9.1。 实验内容与完成情况:(1)使用IntelliJIDEA工具开发WordCount程序在Linux系统中安装IntelliJIDEA,然后使用IntelliJIDEA工具开发WordCount程序,并打包......
  • 【SpringBoot快速入门】(4)SpringBoot项目案例代码示例
    目录1创建工程3配置文件4静态资源之前我们已经学习的Spring、SpringMVC、Mabatis、Maven,详细讲解了Spring、SpringMVC、Mabatis整合SSM的方案和案例,上一节我们学习了SpringBoot的开发步骤、工程构建方法以及工程的快速启动,从这一节开始,我们开始学习SpringBoot配置文件。接下来......
  • 【SpringBoot快速入门】(3)SpringBoot整合junit和MyBatis 详细代码示例与讲解
    目录1.SpringBoot整合junit1.1环境准备1.2编写测试类2.SpringBoot整合mybatis2.1回顾Spring整合Mybatis2.2SpringBoot整合mybatis2.2.1创建模块2.2.2定义实体类2.2.3定义dao接口2.2.4定义测试类2.2.5编写配置2.2.6测试2.2.7使用Druid数据源之前我们已经学习的Spring、......
  • 项目应用多级缓存示例
    前不久做的一个项目,需要在前端实时展示硬件设备的数据。设备很多,并且每个设备的数据也很多,总之就是数据很多。同时,设备的刷新频率很快,需要每2秒读取一遍数据。问题来了,我们如何读取数据,并且在前端展示?我的想法是利用多级缓存:1)首先是有个数据采集程序,不停地采集设备的数据。采集到......