首页 > 其他分享 >【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询

【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询

时间:2024-01-07 16:03:15浏览次数:31  
标签:Table 示例 flink 查询 API org apache import table

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列 本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列 本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列 本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


(文章目录)


本文通过Table API 进行基本表的查询操作,同时给出了Tumble和Over窗口的查询示例。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本文需要有kafka的运行环境。

本文更详细的内容可参考文章:

17、Flink 之Table API: Table API 支持的操作(1) 17、Flink 之Table API: Table API 支持的操作(2)

本专题分为以下几篇文章: 【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图 【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询 【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作 【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作 【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作 【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等) 【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本) 【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作 【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作 【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作 【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作 【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作 【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本) 【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版 【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版

一、maven依赖

本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。

二、示例:通过API查询表和使用窗口函数的查询

本示例实现了Tumble和Over窗口查询。 如果使用sql的窗口函数参考: 27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例(5)

1、示例:基本的查询表

本示例仅仅是基本的查询表用法,包含2种方式,即Table API 与 SQL的方式。

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.and;
import static org.apache.flink.table.api.Expressions.lit;
import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogView;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.types.Row;

import com.google.common.collect.Lists;
/**
 * @author alanchan
 *
 */
public class TestTableAPIDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// testQueryTableBySQL();		
		testQueryTableByAPI();
	}
	
static void testQueryTableByAPI() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		// SQL 创建输入表
		String sourceSql = "CREATE TABLE Alan_KafkaTable (\r\n" +
				"  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',\r\n" +
				"  `partition` BIGINT METADATA VIRTUAL,\r\n" +
				"  `offset` BIGINT METADATA VIRTUAL,\r\n" +
				"  `user_id` BIGINT,\r\n" +
				"  `item_id` BIGINT,\r\n" +
				"  `behavior` STRING\r\n" +
				") WITH (\r\n" +
				"  'connector' = 'kafka',\r\n" +
				"  'topic' = 'user_behavior',\r\n" +
				"  'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',\r\n" +
				"  'properties.group.id' = 'testGroup',\r\n" +
				"  'scan.startup.mode' = 'earliest-offset',\r\n" +
				"  'format' = 'csv'\r\n" +
				");";
		tenv.executeSql(sourceSql);

		// 2、将流转成table
		String sql = "select * from Alan_KafkaTable ";
		Table streamTable = tenv.sqlQuery(sql);

		// 3、API查询
		Table resultQuery = streamTable
				.groupBy($("user_id"), $("behavior"))
				.select($("user_id"), $("behavior"), $("behavior").count().as("count(*)"));

		// 4、将流转成table
		DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(resultQuery, Row.class);

		// 5、sink
		resultDS.print();

		// 6、执行
		env.execute();
		// kafka中输入测试数据
		// 1,1001,login
		// 1,2001,p_read

		// 程序运行控制台输入如下
		// 14> (true,+I[1, p_read, 1])
		// 3> (true,+I[1, login, 1])
	}
	
	static void testQueryTableBySQL() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		// SQL 创建输入表
		String sourceSql = "CREATE TABLE Alan_KafkaTable (\r\n" +
				"  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',\r\n" +
				"  `partition` BIGINT METADATA VIRTUAL,\r\n" +
				"  `offset` BIGINT METADATA VIRTUAL,\r\n" +
				"  `user_id` BIGINT,\r\n" +
				"  `item_id` BIGINT,\r\n" +
				"  `behavior` STRING\r\n" +
				") WITH (\r\n" +
				"  'connector' = 'kafka',\r\n" +
				"  'topic' = 'user_behavior',\r\n" +
				"  'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',\r\n" +
				"  'properties.group.id' = 'testGroup',\r\n" +
				"  'scan.startup.mode' = 'earliest-offset',\r\n" +
				"  'format' = 'csv'\r\n" +
				");";
		tenv.executeSql(sourceSql);

		// 查询
		String sql = "select * from Alan_KafkaTable ";
		Table resultQuery = tenv.sqlQuery(sql);

		DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(resultQuery, Row.class);

		// 6、sink
		resultDS.print();

		// 7、执行
		env.execute();
		// kafka中输入测试数据
		// 1,1001,login
		// 1,2001,p_read

		// 程序运行控制台输入如下
		// 11> (true,+I[16:32:19.923, 0, 0, 1, 1001, login])
		// 11> (true,+I[16:32:32.258, 0, 1, 1, 2001, p_read])

	}
}

2、示例:Tumble窗口查询表

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.and;
import static org.apache.flink.table.api.Expressions.lit;
import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogView;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.types.Row;

import com.google.common.collect.Lists;
/**
 * @author alanchan
 *
 */
public class TestTableAPIDemo {
	final static List<User> userList = Arrays.asList(
			new User(1L, "alan", 18, 1698742358391L),
			new User(2L, "alan", 19, 1698742359396L),
			new User(3L, "alan", 20, 1698742360407L),
			new User(4L, "alanchan", 28, 1698742361409L),
			new User(5L, "alanchan", 29, 1698742362424L));
			
	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		testQueryTableWithWindwosByAPI();
	}

	static void testQueryTableWithWindwosByAPI() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		
		DataStream<User> users = env.fromCollection(userList)
				.assignTimestampsAndWatermarks(
						WatermarkStrategy
						.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
						.withTimestampAssigner((user, recordTimestamp) -> user.getRowtime())
						)
				;
		
		Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("age"),$("rt").rowtime());
		
		// tumble
		Table result = usersTable
				.filter(
					and(
//							$("name").equals("alanchan"),
//							$("age").between(1, 20),
							$("name").isNotNull(),
							$("age").isGreaterOrEqual(19)
							)
				)
				.window(Tumble.over(lit(1).hours()).on($("rt")).as("hourlyWindow"))// 定义滚动窗口并给窗口起一个别名
				.groupBy($("name"),$("hourlyWindow"))// 窗口必须出现的分组字段中
				.select($("name"),$("name").count().as("count(*)"), $("hourlyWindow").start(), $("hourlyWindow").end())
				;
		result.printSchema();
		
		DataStream<Tuple2<Boolean, Row>> resultDS =  tenv.toRetractStream(result, Row.class);
		resultDS.print();
				
		env.execute();
	
		}

	@Data
	@NoArgsConstructor
	@AllArgsConstructor
	public static class User {
		private long id;
		private String name;
		private int age;
		private Long rowtime;
	}
}

3、示例:Over窗口查询表

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.and;
import static org.apache.flink.table.api.Expressions.lit;
import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogView;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.types.Row;

import com.google.common.collect.Lists;
/**
 * @author alanchan
 *
 */
public class TestTableAPIDemo {
	final static List<User> userList = Arrays.asList(
			new User(1L, "alan", 18, 1698742358391L),
			new User(2L, "alan", 19, 1698742359396L),
			new User(3L, "alan", 20, 1698742360407L),
			new User(4L, "alanchan", 28, 1698742361409L),
			new User(5L, "alanchan", 29, 1698742362424L));
			
	@Data
	@NoArgsConstructor
	@AllArgsConstructor
	public static class User {
		private long id;
		private String name;
		private int age;
		private Long rowtime;
	}		
	
	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		testQueryTableWithWindwosByAPI();
	}

	static void testQueryTableWithWindwosByAPI() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		
		DataStream<User> users = env.fromCollection(userList)
				.assignTimestampsAndWatermarks(
						WatermarkStrategy
						.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
						.withTimestampAssigner((user, recordTimestamp) -> user.getRowtime())
						)
				;
		
		Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("age"),$("rt").rowtime());
			
		// over 
		usersTable
			.window(Over.partitionBy($("name")).orderBy($("rt")).preceding(unresolvedCall(BuiltInFunctionDefinitions.UNBOUNDED_RANGE)).as("hourlyWindow"))
			.select($("id"), $("rt"), $("id").count().over($("hourlyWindow")).as("count_t"))
            .execute()
            .print()
			;
		
		env.execute();
	
		}

}

以上,本文通过Table API 进行基本表的查询操作,同时给出了Tumble和Over窗口的查询示例。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文更详细的内容可参考文章:

17、Flink 之Table API: Table API 支持的操作(1) 17、Flink 之Table API: Table API 支持的操作(2)

本专题分为以下几篇文章: 【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图 【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询 【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作 【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作 【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作 【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等) 【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本) 【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作 【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作 【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作 【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作 【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作 【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本) 【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版 【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版

标签:Table,示例,flink,查询,API,org,apache,import,table
From: https://blog.51cto.com/alanchan2win/9134120

相关文章

  • 简洁、轻量级的 Go API 框架
    本次分享的框架是「gin-api-mono」介绍gin-api-mono前先了解go-gin-apigo-gin-api这是一个基于Gin的API框架,它提供了WEB界面一键安装的方式,让你可以快速启动一个开箱即用的Go项目。无论你是否有项目经验,这个框架都适合作为练手项目使用(新手入门必备)。该框架采用了......
  • js和python的接口api怎么开发
    在JavaScript(JS)和Python之间开发接口(API)时,可以使用多种方法,具体取决于你的需求和偏好。以下是一些常见的方法:RESTfulAPI:RESTful(RepresentationalStateTransfer)是一种设计风格,通过HTTP协议进行通信。你可以使用Node.js(JavaScript)和Flask/Django(Python)等框架来实现RESTfulAPI。在......
  • mysql如何进行简单的分析查询
    在MySQL中进行简单的分析查询通常涉及使用一些聚合函数和条件筛选来获取有关数据集的汇总信息。以下是一些常见的分析查询示例:计算平均值:SELECTAVG(column_name)ASaverage_valueFROMtable_name;计算总和:SELECTSUM(column_name)AStotal_sumFROMtable_name;计算最大值和最......
  • 【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
    Flink系列文章一、Flink专栏Flink专栏系统介绍某一知识点,并辅以具体的示例进行说明。1、Flink部署系列本部分介绍Flink的部署、配置相关基础内容。2、Flink基础系列本部分介绍Flink的基础部分,比如术语、架构、编程模型、编程指南、基本的datastreamapi用法、四大基......
  • • python 脚本 输入字符串 输出字符串+当前时间 生成api http请求
    案例问题背景python脚本输入字符串输出字符串+当前时间生成apihttp请求脚本1这是单线程的单次处理单个http请求同时多个请求按照顺序处理而不是并行处理多请求!=多线程但是相关使用多线程来并行处理多请求使用flask或django等web服务器框架可以与wsgi服务器配合使用比如guni......
  • C#中Queue队列的基本使用示例
       在C#中,Queue是一个内置的FIFO(First-In-First-Out)集合,这意味着元素在队列中的顺序与它们被添加的顺序相同,当且仅当从队列中移除元素时,元素出队的顺序才是正确的。Queue在.NETFramework中是一个泛型集合类型,这意味着你可以存储任何类型的元素。它提供了许多方法来操作队列,......
  • 2024年小红书最新x-s-common签名算法分析以及点赞api接口测试nodejs(2024-01-05)
      2024年小红书又更新了x-s-common算法,现在的版本是:3.6.8。这个签名算法现在是越来越重要了,许多接口都要用到。比如:评论,点赞等接口,没有这个算法采集不到数据。  一、chrome逆向x-s-common算法  1、x-s-common  打开chrome,按f12,打开开发者模式,随便找一接口,全局......
  • 敏捷研发管理流程及示例-Leangoo领歌|永久免费的敏捷开发工具
    ​ Leangoo领歌是一款永久免费的专业的敏捷开发管理工具,提供端到端敏捷研发管理解决方案,涵盖敏捷需求管理、任务协同、进展跟踪、统计度量等。Leangoo领歌上手快、实施成本低,可帮助企业快速落地敏捷,提质增效、缩短周期、加速创新。Leangoo领歌区别于传统项目管理软件,项目的需求......
  • Golang如何进行数据库查询
    Golang是一门高效、快速、强大的编程语言,可用于构建各种应用程序,尤其是在Web开发中表现突出。当与数据库结合使用时,Golang提供了一些强大的工具,帮助开发人员操作数据库。在本篇文章中,我们将重点介绍Golang如何进行数据库查询。一、Golang数据库查询Golang中的数据库查询主要有两......
  • 如何使用loki查询日志中大于某一数字的值的日志
    简介loki是一款轻量级的日志收集中间件,比elk体系占用的内存更小,采用go语言开发,可以利用grafana来查询loki中存储的日志,loki存储日志只对提前预设的标签做索引,所以日志存储空间占用比elk小很多。方法loki只对提前预设的标签做索引,但如果我们想给标签之外的文本根据其值代表的数......