首页 > 其他分享 >Flink处理函数

Flink处理函数

时间:2023-01-09 15:12:00浏览次数:45  
标签:Flink flink ctx new org apache import 处理函数

Process

Flink 提供了 8 个不同的处理函数:

(1) ProcessFunction

最基本的处理函数,基于DataStream 直接调用.process()时作为参数传入。

(2) KeyedProcessFunction

对流按键分区后的处理函数,基于 KeyedStream 调用.process()时作为参数传入。要想使用定时器,比如基于KeyedStream。

(3) ProcessWindowFunction

开窗之后的处理函数,也是全窗口函数的代表。基于WindowedStream 调用.process()时作为参数传入。

(4) ProcessAllWindowFunction

同样是开窗之后的处理函数,基于AllWindowedStream 调用.process()时作为参数传入。

(5) CoProcessFunction

合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用.process()时作为参数传入。关于流的连接合并操作,我们会在后续章节详细介绍。

(6) ProcessJoinFunction

间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用.process()时作为参数传入。

(7) BroadcastProcessFunction

广播连接流处理函数,基于 BroadcastConnectedStream 调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广播流(BroadcastStream)做连接(conncet)之后的产物。关于广播流的相关操作,我们会在后续章节详细介绍。

(8) KeyedBroadcastProcessFunction

按键分区的广播连接流处理函数,同样是基于 BroadcastConnectedStream 调用.process()时作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流,是一个 KeyedStream 与广播流(BroadcastStream)做连接之后的产物。

处理函数是一个最底层的API,直面的就是数据流中最基本的元素:数据事件(envent)、状态(state)以及时间(time)。这就相当于对流有了完全的控制权,能做任何事。

基本处理函数(ProcessFunction)

ProcessFunction继承了AbstractRichFunction

ProcessFunction接口自身有2个比较重要的方法就是

  • processElement(I value, Context ctx, Collector out),流中的每一个元素都会调用此方法。在这个方法中有个 Context 即上下文,Context可以访问元素的时间戳、key值、output、TimerService时间服务。另外,TimerService()是一个抽象方法,里面可获得当前时间(处理时间)、获取水位线(事件时间)、注册定时器(process/event)、删除定时器的方法。
  • onTimer(long timestamp, OnTimerContext ctx, Collector out) ,onTime是一个条件触发执行方法,当我们注册了定时器,并且到达了指定时间时后,就会触发onTime方法。但该方法只能针对KeyedStream使用。而本身的ProcessFunction是基于DataStream流的。

//processElement(输入,上下文,输出)
/*ctx

  • timestamp

  • timeServer->可获得当前时间(处理时间)、获取水位线(事件事件)、注册定时器(process/event)、删除定时器

  • output

  • onTime只要在KeyBy操作后才能用(onTime是基于KeyedStream的方法)

  • onTime->当我们注册了定时器,并且到达了指定时间时后,就会触发onTime方法

*/

package com.peng.process;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @author 海绵先生
 * @Description TODO
 * @date 2022/11/6-16:28
 */
public class ProcessFunctionDemo01 {
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Event {
        private String user;
        private String url;
        private Long timestamp;
    }

    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source
        DataStreamSource<Event> dataStreamSource = env.fromElements(new Event("Sponge", "./user", 100L),
                new Event("Tom", "./user", 150L),
                new Event("Jack", "./lookspouce", 300L),
                new Event("Sponge", "./like", 400L));

        SingleOutputStreamOperator<Event> eventSingleOutputStreamOperator = dataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                }));

        SingleOutputStreamOperator<String> process = eventSingleOutputStreamOperator.process(new ProcessFunction<Event, String>() {
            @Override
            public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                //processElement(输入,上下文,输出)
                /*ctx上下文中的方法
                 * timestamp
                 * timeServer->可获得当前时间(处理时间)、获取水位线(事件事件)、注册定时器(process/event)、删除定时器
                 * output
                 */
                if (value.user.equals("Jack")) {
                    out.collect(value.user + " clicks " + value.url);
                } else if (value.user.equals("Tom")) {
                    out.collect(value.user);
                    out.collect(value.user);
                }
                // 输出当前事件时间
                System.out.println("timestamp:" + ctx.timestamp());
                // 输出当前水位线
                System.out.println("watermark:" + ctx.timerService().currentWatermark());

                System.out.println("IndexOfThisSubtask:" + getRuntimeContext().getIndexOfThisSubtask());

            }
        });

        process.print();

        env.execute();
    }
}

虽然Context的timeServer()里有注册定时器的功能,但是ProcessFunction实现不了,这时我们就需要自定义实现KeyedProcessFunction了。

KeyProcessFunction

KeyProcessFunction在实际开发中是用的比较多的。

KeyProcessFunction比ProcessFunction多了一个onTime功能。

基于处理时间的定时器:registerProcessingTimeTimer

package com.peng.process;


import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;

/**
 * @author 海绵先生
 * @Description TODO 基于处理时间的定时器
 * @date 2022/11/6-19:30
 */
public class KeyedProcessFunctionDemo01 {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // source
        DataStreamSource<Event> eventDataStreamSource = env.addSource(new ClickSource());

        eventDataStreamSource.keyBy(Event::getUser)
                .process(new KeyedProcessFunction<String, Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        long currTs = ctx.timerService().currentProcessingTime();
                        // ctx.getCurrentKey()获取当前的key值 new Timestamp()将时间转换成YYYY-MM-DD HH:MM:SS的格式
                        out.collect(ctx.getCurrentKey() + "数据到达时间:" + new Timestamp(currTs));//zhangsan数据到达时间:2022-11-06 20:04:20.269

                        // 注册一个10秒后的定时器  registerProcessingTimeTimer:基于处理时间
                        ctx.timerService().registerProcessingTimeTimer(currTs + 10*1000L);

                    }
                    // onTimer(当前时间,上下文,输出)
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        // ctx.getCurrentKey()获取当前的key值
                        out.collect(ctx.getCurrentKey() + "定时器触发时间:" + new Timestamp(timestamp));//zhangsan定时器触发时间:2022-11-06 20:04:30.269
                    }
                }).print();

        env.execute();
    }
}

基于事件时间的定时器:registerEventTimeTimer

package com.peng.process;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.Duration;

/**
 * @author 海绵先生
 * @Description TODO 基于事件时间定时器
 * @date 2022/11/6-20:21
 */
public class KeyedProcessFunctionDemo02 {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // source
        SingleOutputStreamOperator<Event> eventDataStreamSource = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                }));

        eventDataStreamSource.keyBy(Event::getUser)
                .process(new KeyedProcessFunction<String, Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        long currTs = ctx.timerService().currentProcessingTime();
                        // ctx.getCurrentKey()获取当前的key值
                        out.collect(ctx.getCurrentKey() + " 数据到达时间:" + new Timestamp(currTs) + " watermark:" + ctx.timerService().currentWatermark());//lisi 数据到达时间:2022-11-06 20:29:57.643 watermark:1667737795406

                        // 注册一个10秒后的定时器  registerEventTimeTimer:基于处理时间
                        ctx.timerService().registerEventTimeTimer(currTs + 10*1000L);

                    }
                    // onTimer(当前时间,上下文,输出)
                    // 基于事件时间的定时器,只有当水位线大于定时器时间时才会触发
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        // ctx.getCurrentKey()获取当前的key值
                        out.collect(ctx.getCurrentKey() + " 定时器触发时间:" + new Timestamp(timestamp));// lisi 定时器触发时间:2022-11-06 20:30:07.643
                    }
                }).print();

        env.execute();
    }
}

窗体的水位线是没隔一段时间才生成的,因此会出现时间对不上的现状,这很正常。

当一个程序结束时,Flink会把水位线的值调整成最大值,这样所有的定时器就会全部实现。

ProcessWindowFunction

窗口处理函数 ProcessWindowFunction 的使用与其他窗口函数类似, 也是基于WindowedStream 直接调用方法就可以,只不过这时调用的是.process()。

stream.keyBy( t -> t.f0 )
	.window( TumblingEventTimeWindows.of(Time.seconds(10)) )
	.process(new MyProcessWindowFunction())
 
/***************************************************/
    public static MyProcessWindowFunction extends ProcessWindowFunction<IN, OUT, KEY, W extends Window> 

  • IN:input,数据流中窗口任务的输入数据类型。

  • OUT:output,窗口任务进行计算之后的输出数据类型。

  • KEY:数据中键 key 的类型。

  • W:窗口的类型,是Window 的子类型。一般情况下我们定义时间窗口,W 就是TimeWindow。

package com.peng.window;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.HashSet;

/**
 * @author 海绵先生
 * @Description TODO 自定义实现ProcessWindowFunction方法
 * @date 2022/11/11-15:56
 */
public class ProcessWindowFunctionDemo01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // source
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.getTimestamp();
                            }
                        })
                );
        stream.print("input");

        // 使用ProcessWindowFunction计算UV
        stream.keyBy(data -> true)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new MyProcessWindowFunction())
                .print();

        env.execute();
    }

    //ProcessWindowFunction是一个抽象类,继承于AbstractRichFunction
    public static class MyProcessWindowFunction extends ProcessWindowFunction<Event, String, Boolean, TimeWindow>{//第一个参数:输入数据类型,第二个参数:输出数据类型,第三个参数:KEY值数据类型,第四个参数:窗体时间类型
        
        // 窗口结束时调用process方法
        @Override
        public void process(Boolean aBoolean, Context context, Iterable<Event> elements, Collector<String> out) throws Exception {
            // 用一个HashSet保存user->作用:去重
            HashSet<String> userSet = new HashSet<>();

            // elements中遍历数据,放到set中去重
            for (Event event : elements) {
                userSet.add(event.getUser());
            }

            int uv = userSet.size();
            //结合窗口信息
            long start = context.window().getStart();
            long end = context.window().getEnd();
            out.collect("窗口" + new Timestamp(start) + "~" + new Timestamp(end)
            + "UV值为:" + uv);

        }
    }
}

output

侧输出流

处理函数还有另外一个特有功能,就是将自定义的数据放入“侧输出流”(side output)输出。

绝大多数转换算子,输出的都是单一流,流里的数据类型只能有一种。如果使用 filter 算子对数据源进行筛选分割的话,势必会造成数据流的多次复制,造成不必要的性能浪费,而侧输出流可以认为是“主流”上分叉出的“支流”,所以可以由一条流产生出多条流,而且这些流中的数据类型还可以不一样。利用这个功能可以很容易地实现“分流”操作。

具体应用时,只要在处理函数的.processElement()或者.onTimer()方法中,调用上下文的.output()方法就可以了。

   SingleOutputStreamOperator<Event> processedStream = eventSingleOutputStreamOperator.process(new ProcessFunction<Event, Event>() {
           @Override
           public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
               //out收集完的还是放在一起的,ctx可以将数据放到不同的OutputTag
               if (value.getUser().equals("zhangsan")) {
                   // 这里可以看到,侧输出流的数据类型不一定是ProcessFunction里已定义好的数据类型
                   ctx.output(zhangsanTag, Tuple3.of(value.getUrl(), value.getUrl(), value.getTimestamp()));
               } else if (value.getUser().equals("wangwu")) {
                   ctx.output(wangwuTag, Tuple3.of(value.getUrl(), value.getUrl(), value.getTimestamp()));
               } else {
                   out.collect(value);
               }

           }
       });

这里 output()方法需要传入两个参数,第一个是一个“输出标签”OutputTag,用来标识侧输出流,一般会在外部统一声明;第二个就是要输出的数据。

我们可以在外部先将OutputTag 声明出来:

   	OutputTag<Tuple3<String, String, Long>> zhangsanTag = new OutputTag<Tuple3<String, String, Long>>("zhangsan"){};
       OutputTag<Tuple3<String, String, Long>> wangwuTag = new OutputTag<Tuple3<String, String, Long>>("wangwu"){};

不加{},Flink可能推断不出OutputTag的数据类型

如果想要获取这个侧输出流,可以基于处理之后的 DataStream 直接调用.getSideOutput() 方法,传入对应的OutputTag,这个方式与窗口API 中获取侧输出流是完全一样的。

processedStream.getSideOutput(zhangsanTag).print("zhangsan");
processedStream.getSideOutput(wangwuTag).print("wangwu");

完整测试代码:

SideOutPutStreamTest01.java

package com.peng.process;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

/**
 * @author 海绵先生
 * @Description TODO 测试侧输出流
 * @date 2022/11/11-20:43
 */
public class SideOutPutStreamTest01 {
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        // source
        DataStreamSource<Event> dataStreamSource = env.addSource(new ClickSource());

        SingleOutputStreamOperator<Event> eventSingleOutputStreamOperator = dataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                }));

        // 定义一个输出标签
        OutputTag<Tuple3<String, String, Long>> zhangsanTag = new OutputTag<Tuple3<String, String, Long>>("zhangsan"){};
        OutputTag<Tuple3<String, String, Long>> wangwuTag = new OutputTag<Tuple3<String, String, Long>>("wangwu"){};


        SingleOutputStreamOperator<Event> processedStream = eventSingleOutputStreamOperator.process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
                if (value.getUser().equals("zhangsan")) {
                    ctx.output(zhangsanTag, Tuple3.of(value.getUrl(), value.getUrl(), value.getTimestamp()));
                } else if (value.getUser().equals("wangwu")) {
                    ctx.output(wangwuTag, Tuple3.of(value.getUrl(), value.getUrl(), value.getTimestamp()));
                } else {
                    out.collect(value);
                }

            }
        });

        processedStream.getSideOutput(zhangsanTag).print("zhangsan");
        processedStream.getSideOutput(wangwuTag).print("wangwu");
        processedStream.print("else");

        env.execute();
    }
}

ClinkSource.java

package com.peng.process;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Timestamp;
import java.util.Random;

/**
 * @author 海绵先生
 * @Description TODO 自定义数据源
 * @date 2022/11/6-19:40
 */
public class ClickSource extends RichParallelSourceFunction<Event> {
    private String[] names = {"zhangsan","lisi","wangwu","zhaoliu"};
    private String[] urls = {"./user","./lookspouce","./like"};
    private boolean flag = true;
    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        Random random = new Random();
        while (flag){
            String user = names[random.nextInt(4)];
            String url = urls[random.nextInt(3)];
            long timeMillis = System.currentTimeMillis();
            ctx.collect(new Event(user,url,timeMillis));
            Thread.sleep(2000);// 每两秒产生一次数据
        }
    }

    @Override
    public void cancel() {
        flag = false;

    }
}
@Data
@AllArgsConstructor
@NoArgsConstructor
class Event{
    private String user;
    private String url;
    private Long timestamp;

    @Override
    public String toString() {
        return "Event{" +
                "user='" + user + '\'' +
                ", url='" + url + '\'' +
                ", timestamp=" + new Timestamp(timestamp) +
                '}';
    }
}

运行输出结果:

else> Event{user='zhaoliu', url='./lookspouce', timestamp=2022-11-11 21:21:52.9}
else> Event{user='zhaoliu', url='./lookspouce', timestamp=2022-11-11 21:21:55.065}
zhangsan> (./lookspouce,./lookspouce,1668172917075)
wangwu> (./lookspouce,./lookspouce,1668172919080)
wangwu> (./lookspouce,./lookspouce,1668172921087)
wangwu> (./user,./user,1668172923089)
wangwu> (./user,./user,1668172925097)
zhangsan> (./like,./like,1668172927100)
wangwu> (./lookspouce,./lookspouce,1668172929111)
wangwu> (./like,./like,1668172931122)
else> Event{user='lisi', url='./user', timestamp=2022-11-11 21:22:13.135}
else> Event{user='lisi', url='./user', timestamp=2022-11-11 21:22:15.145}

标签:Flink,flink,ctx,new,org,apache,import,处理函数
From: https://www.cnblogs.com/Mr-Sponge/p/17037101.html

相关文章

  • 基于Flink CDC的现代数据栈 (Modern Data Stack)实现
    ......
  • 基于SpringBoot 使用 Flink 收发Kafka消息
    前言这周学习下Flink相关的知识,学习到一个读写Kafka消息的示例,自己动手实践了一下,别人示例使用的是普通的JavaMain方法,没有用到springboot.我们在实际工作中会使用spr......
  • MySQL4 - 数据处理函数(单行)
    数据处理函数又被称为单行处理函数特点:一个输入对应一个输出,相应的多行处理函数:多个输入(处理多条记录)一个输出常见函数:lower转换为小写upper转换为大写SELECTL......
  • 为什么巨头的 Flink 作业运行都在 YARN 上?(附源码)
    曾有人调侃:HBase没有资源什么事情也做不了,Spark占用了资源却没有事情可做? 那YARN了解一下?01YARN!伴随着Hadoop生态的发展,不断涌现了多种多样的技术组件Hive、HBase、Spa......
  • Flink:CEP
    基本概念CEPCEP就是复杂事件处理(ComplexEventProcessing)的缩写。而FlinkCEP就是Flink实现的一个用于复杂事件处理的库。具体的处理过程是,把事件流中的一个个简......
  • Flink:CEP
    基本概念CEPCEP就是复杂事件处理(ComplexEventProcessing)的缩写。而FlinkCEP就是Flink实现的一个用于复杂事件处理的库。具体的处理过程是,把事件流中的一个个简......
  • Flink Spark jdbc读写数据库导致oom和提升性能解决办法
     fetchsize=Integer.MIN_VALUE 作用如果不设置上述值,默认读取jdbc数据时,会默认获取所有的行到resultset中,数据量大会导致oom和占用大量内存reWriteBatchedInserts=t......
  • Flink:TableAPI 和 SQL
    快速上手引入依赖要在代码中使用TableAPI,必须引入相关的依赖。这里的依赖是一个Java的“桥接器”,主要就是负责TableAPI和下层DataStreamAPI的连接支持,按照不同的......
  • Flink mini-batch "引发" 的乱序问题
    问题描述近期业务反馈,开启了mini-batch之后,出现了数据不准的情况,关掉了mini-batch之后,就正常了,因此业务方怀疑,是不是Flink的mini-batch存在bug?问题排查......
  • flink orc hive 2.1.1 源码bug处理
    先说一下我们公司的线上集群配置: CDH6.3.1,hive2.1.1 ,由于公司是做车联网业务方向的,所以数据量很大,同事小A,在往集群写数据,发现写入的数据不能在hive表里查询,他写往......