首页 > 编程语言 >Flink之状态编程 值状态(ValueState)列表状态(ListState)映射状态(MapState)归约状态(ReducingState)聚合状态(AggregatingState)广播状态

Flink之状态编程 值状态(ValueState)列表状态(ListState)映射状态(MapState)归约状态(ReducingState)聚合状态(AggregatingState)广播状态

时间:2024-02-01 09:02:11浏览次数:30  
标签:状态 ListState flink 归约 api import apache org public

Flink之状态编程 值状态(ValueState)列表状态(ListState)映射状态(MapState)归约状态(ReducingState)聚合状态(AggregatingState)广播状态(BroadcastState)

Flink之状态编程


一、按键分区状态(Keyed State)

1.1、值状态(ValueState)

1.1.1、定义

状态中只保存一个“值”(value)。ValueState本身是一个接口,源码中定义如下:

public interface ValueState<T> extends State {
T value() throws IOException;
void update(T value) throws IOException;
}
  • 1
  • 2
  • 3
  • 4

1.1.2、使用案例

利用ValueState和定时器每10秒输出一次用户的pv量

package com.hpsk.flink.state;
import com.hpsk.flink.beans.Event;
import com.hpsk.flink.source.EventWithWatermarkSource;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
public class ValueStateDS {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> stream = env.addSource(new EventWithWatermarkSource());
SingleOutputStreamOperator<String> result = stream.keyBy(t -> t.user)
.process(new KeyedProcessFunction<String, Event, String>() {
  // 定义两个状态,保存当前 pv 值,以及定时器时间戳
  private ValueState<Long> valueState;
  private ValueState<Long> timerTsState;
  @Override
  public void open(Configuration parameters) throws Exception {
      valueState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("value-state", Long.class));
      timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timerTs", Long.class));
  }
  @Override
  public void processElement(Event value, Context ctx, Collector<String> collector) throws Exception {
      Long count = valueState.value();
      if (count == null) {
          valueState.update(1L);
      } else {
          valueState.update(count + 1);
      }
      // 注册定时器
      if (timerTsState.value() == null) {
          ctx.timerService().registerEventTimeTimer(value.timestamp + 10 * 1000L);
          timerTsState.update(value.timestamp + 10 * 1000L);
      }
  }
  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
      out.collect("时间:"+new Timestamp(timestamp) + " ->用户:"+  ctx.getCurrentKey() + "的pv值为:" + valueState.value());
      timerTsState.clear();
  }
});
result.print(">>>>");
env.execute();
}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

1.2、列表状态(ListState)

1.2.1、定义

将需要保存的数据,以列表(List)的形式组织起来。在 ListState接口中同样有一个类型参数 T,表示列表中数据的类型。ListState 也提供了一系列的方法来操作状态,使用方式
与一般的 List 非常相似。

1.2.2、使用案例

利用ListState进行实现sql中的join操作

package com.hpsk.flink.state;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
public class ListStateDS {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Tuple3<String, String, Long>> stream1 = env.fromElements(
Tuple3.of("a", "stream-1", 1000L),
Tuple3.of("b", "stream-1", 2000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
@Override
public long extractTimestamp(Tuple3<String, String, Long> stringStringLongTuple3, long l) {
return stringStringLongTuple3.f2;
}
}));
SingleOutputStreamOperator<Tuple3<String, String, Long>> stream2 = env.fromElements(
Tuple3.of("a", "stream-2", 3000L),
Tuple3.of("b", "stream-2", 4000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
@Override
public long extractTimestamp(Tuple3<String, String, Long> stringStringLongTuple3, long l) {
return stringStringLongTuple3.f2;
}
}));
stream1.keyBy(r -> r.f0)
.connect(stream2.keyBy(r -> r.f0))
.process(new CoProcessFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, String>() {
  private ListState<Tuple3<String, String, Long>> listState1;
  private ListState<Tuple3<String, String, Long>> listState2;
  @Override
  public void open(Configuration parameters) throws Exception {
      listState1 = getRuntimeContext().getListState(new ListStateDescriptor<Tuple3<String, String, Long>>("listState1", Types.TUPLE(Types.STRING,Types.STRING,Types.LONG)));
      listState2 = getRuntimeContext().getListState(new ListStateDescriptor<Tuple3<String, String, Long>>("listState2", Types.TUPLE(Types.STRING,Types.STRING,Types.LONG)));
  }
  @Override
  public void processElement1(Tuple3<String, String, Long> left, Context context, Collector<String> out) throws Exception {
      listState1.add(left);
      for (Tuple3<String, String, Long> right : listState2.get()) {
          out.collect(left + " => " + right);
      }
  }
  @Override
  public void processElement2(Tuple3<String, String, Long> right, Context context, Collector<String> out) throws Exception {
      listState2.add(right);
      for (Tuple3<String, String, Long> left : listState1.get()) {
          out.collect(left + " => " + right);
      }
  }
})
.print();
env.execute();
}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72

1.3、映射状态(MapState)

1.3.1、定义

映射状态类似于 Java 中的 HashMap,可以记录一组KV的值,根据对应的Key对Value进行取值或更新。

1.3.2、使用案例

使用 KeyedProcessFunction 模拟滚动窗口

package com.hpsk.flink.state;
import com.hpsk.flink.beans.Event;
import com.hpsk.flink.source.EventWithWatermarkSource;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
// 使用 KeyedProcessFunction 模拟滚动窗口
public class MapStateDS {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> inputDS = env.addSource(new EventWithWatermarkSource());
SingleOutputStreamOperator<String> result = inputDS
.keyBy(t -> t.url)
.process(new FakeWindowResult(10000L));
result.print(">>>");
env.execute();
}
private static class FakeWindowResult extends KeyedProcessFunction<String, Event, String>{
// 定义属性,窗口长度
private Long windowSize;
// 声明状态,用map保存pv值(窗口 start,count)
private MapState<Long, Long> windowPvMapState;
public FakeWindowResult(Long windowSize) {
this.windowSize = windowSize;
}
@Override
public void open(Configuration parameters) throws Exception {
windowPvMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Long, Long>("mapState", Types.LONG, Types.LONG));
}
@Override
public void processElement(Event event, Context ctx, Collector<String> collector) throws Exception {
// 每来一条数据,就根据时间戳判断属于哪个窗口
long windowStart = event.timestamp / windowSize * windowSize;
long windowEnd = windowStart + windowSize;
// 注册 end -1 的定时器,窗口触发计算
ctx.timerService().registerEventTimeTimer(windowEnd - 1);
// 更新状态中的 pv 值
if (windowPvMapState.contains(windowStart)) {
Long pv = windowPvMapState.get(windowStart);
windowPvMapState.put(windowStart, pv + 1);
} else {
windowPvMapState.put(windowStart, 1L);
}
}
// 定时器触发,直接输出统计的 pv 结果
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
long windowEnd = timestamp + 1;
long windowStart = windowEnd - windowSize;
long pv = windowPvMapState.get(windowStart);
out.collect( "url:[" + ctx.getCurrentKey()
      + "]的访问量为" + pv
      + ",对应窗口为:[" + new Timestamp(windowStart) + " ~ " + new Timestamp(windowEnd) + "]");
// 模拟窗口的销毁,清除 map 中的 key
windowPvMapState.remove(windowStart);
}
}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76

1.4、归约状态(ReducingState)

1.4.1、定义

将当前数据状态和之前的数据状态进行计算,内置reduce进行规约统计

1.4.2、使用案例

对用户点击事件流每 5 个数据统计一次平均时间戳

package com.hpsk.flink.state;
import com.hpsk.flink.beans.Event;
import com.hpsk.flink.source.EventWithWatermarkSource;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
public class ReducingStateDS {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> inputDS = env.addSource(new EventWithWatermarkSource());
inputDS.print("input ");
SingleOutputStreamOperator<String> result = inputDS
.keyBy(t -> t.user)
.flatMap(new RichFlatMapFunction<Event, String>() {
  // 定义ReducingState计算并存储结果
  private ReducingState<Tuple2<Event, Long>> reducingState;
  // 定义一个值状态,用来保存当前用户访问频次
  private ValueState<Long> countState;
  @Override
  public void open(Configuration parameters) throws Exception {
      // 初始化
      countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("countState", Long.class));
      reducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<Tuple2<Event, Long>>(
              "reducing-state",
              new ReduceFunction<Tuple2<Event, Long>>() {
                  @Override
                  public Tuple2<Event, Long> reduce(Tuple2<Event, Long> t1, Tuple2<Event, Long> t2) throws Exception {
                      // 对访问时间进行累加
                      return Tuple2.of(t1.f0, t2.f1 + t1.f1);
                  }
              },
              Types.TUPLE(Types.LONG, Types.LONG)
      ));
  }
  @Override
  public void flatMap(Event event, Collector<String> collector) throws Exception {
      Long count = countState.value();
      if (count == null) {
          count = 1L;
      } else {
          count ++;
      }
      // 更新访问次数
      countState.update(count);
      // 累加计算访问时间
      reducingState.add(Tuple2.of(event, event.timestamp));
      if (count == 5) {
          // 访问5次,输出平均访问时间
          collector.collect(event.user + " " + new Timestamp(reducingState.get().f1 / count));
          // 清空状态
          countState.clear();
          reducingState.clear();
      }
  }
});
result.print("output ");
env.execute();
}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80

1.5、聚合状态(AggregatingState)

1.5.1、定义

将状态表示为一个用于聚合操作的列表

1.5.2、使用案例

对用户点击事件流每 5 个数据统计一次平均时间戳

package com.hpsk.flink.state;
import com.hpsk.flink.beans.Event;
import com.hpsk.flink.source.EventWithWatermarkSource;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
public class AggregatingStateDS {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> inputDS = env.addSource(new EventWithWatermarkSource());
inputDS.print("input ");
SingleOutputStreamOperator<String> result = inputDS
.keyBy(t -> t.user)
.flatMap(new RichFlatMapFunction<Event, String>() {
// 定义聚合状态,用来计算平均时间戳
private AggregatingState<Event, Long> aggregatingState;
// 定义一个值状态,用来保存当前用户访问频次
private ValueState<Long> countState;
@Override
public void open(Configuration parameters) throws Exception {
countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("countState", Long.class));
aggregatingState = getRuntimeContext().getAggregatingState(
      new AggregatingStateDescriptor<Event, Tuple2<Long, Long>, Long>(
              "avg-ts",
              new AggregateFunction<Event, Tuple2<Long, Long>, Long>() {
                  @Override
                  public Tuple2<Long, Long> createAccumulator() {
                      return Tuple2.of(0L, 0L);
                  }
                  @Override
                  public Tuple2<Long, Long> add(Event event, Tuple2<Long, Long> accumulator) {
                      return Tuple2.of(accumulator.f0 + event.timestamp, accumulator.f1 + 1);
                  }
                  @Override
                  public Long getResult(Tuple2<Long, Long> accumulator) {
                      return accumulator.f0 / accumulator.f1;
                  }
                  @Override
                  public Tuple2<Long, Long> merge(Tuple2<Long, Long> longLongTuple2, Tuple2<Long, Long> acc1) {
                      return null;
                  }
              },
              Types.TUPLE(Types.LONG, Types.LONG)));
}
@Override
public void flatMap(Event event, Collector<String> out) throws Exception {
Long count = countState.value();
if (count == null) {
  count = 1L;
} else {
  count ++;
}
countState.update(count);
aggregatingState.add(event);
if (count == 5) {
  out.collect((event.user + " 平均时间戳: " + new Timestamp(aggregatingState.get())));
  countState.clear();
}
}
});
result.print("output ");
env.execute();
}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89

二、广播状态(BroadcastState)

2.1、定义

有时我们希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样,这种特殊的算子状态,就叫作广播状态(BroadcastState)。

2.2、案例

模拟动态配置实时数仓中维度表创建

package com.hpsk.flink.function;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
// 模拟动态配置实时数仓中维度表创建
public class BroadcastProcessFunctionDS {
public static void main(String[] args) throws Exception {
// 1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2.配置流:维度表的配置表
SingleOutputStreamOperator<String> tableConfigStream = env.fromElements(
"table1,createTable",
"table2,createTable",
"table3,createTable");
// 3.主流:业务库实时数据流
SingleOutputStreamOperator<Tuple2<String, String>> MySqlTableStream = env.socketTextStream("hadoop102", 8888)
.map(line -> Tuple2.of(line.split(",")[0], line.split(",")[1]))
.returns(Types.TUPLE(Types.STRING, Types.STRING));
// 将配置流处理成广播流
MapStateDescriptor<String, String> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, String.class);
BroadcastStream<String> broadcast = tableConfigStream.broadcast(mapStateDescriptor);
// 连接主流与广播流成连接流, 处理连接流,根据配置信息处理主流数据
BroadcastConnectedStream<Tuple2<String, String>, String> connectedStream = MySqlTableStream.connect(broadcast);
SingleOutputStreamOperator<String> result = connectedStream.process(new MyBroadcastProcessFunction(mapStateDescriptor));
// 5.输出结果
result.print("output ");
// 6.执行
env.execute();
}
public static class MyBroadcastProcessFunction extends BroadcastProcessFunction<Tuple2<String, String>, String, String>{
private MapStateDescriptor<String, String> mapStateDescriptor;
public MyBroadcastProcessFunction(MapStateDescriptor<String, String> mapStateDescriptor) {
this.mapStateDescriptor = mapStateDescriptor;
}
@Override
public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
BroadcastState<String, String> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
String[] split = value.split(",");
broadcastState.put(split[0].trim(), split[1].trim());
}
@Override
public void processElement(Tuple2<String, String> value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
String table = value.f0;
String create = broadcastState.get(table);
if (create != null) {
out.collect(value.f0 + "为配置表,需要在phoenix中建表 -> 建表语句:" + create + ", 数据为:" + value.f1);
} else {
out.collect(value.f0 + "业务表, 跳过建表");
}
}
}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
原文链接:https://blog.csdn.net/qq_41924766/article/details/130834208

标签:状态,ListState,flink,归约,api,import,apache,org,public
From: https://www.cnblogs.com/sunny3158/p/18000481

相关文章

  • flink状态编程
    flink状态编程简单记录一下最近工作中常用的flink状态flink中可以创建不同类型的状态,如键控状态(KeyedState)和操作符状态(OperatorState)等。状态管理是在流处理的整个过程中保持状态的一种能力,它让我们能够在复杂的事件处理和流转换中保留重要的状态信息,例如:聚合结果、过滤条件......
  • 初始安装 Prometheus 监控k8s组件 target 状态显示失败的处理办法
    当我们使用Kubeadm完成Kubernetes的搭建时,默认控制面的组件的metrics端点会监听在本机127.0.0.1接口上,这会导致Prometheus/kube-Prometheus-stack开局配置无法从自动发现的端点拉取到指标。方法有如下两种:在使用kubeadm初始安装集群时,更新相关配置在Kubernete......
  • Windows Powershell 执行结束 返回状态码
    前言全局说明WindowsPowershell执行结束返回状态码一、1.源码用于将文件复制到文件夹的Powershell脚本$dest="C:est"New-Item$dest-typedirectory-force$source="c:samplefile.txt"Copy-Item$source$destexit$LASTEXITCODE$LASTEXITCODE保存Powershel......
  • 状态模式
    定义:允许一个对象在其内部状态改变时,改变它的行为类型:行为型适用场景:一个对象存在多个状态(不同状态下行为不同),且状态可以相互转换优点:将不同的状态隔离把各种状态的转换逻辑,分布到State的子类中,减少相互间依赖增加新的状态非常简单缺点:状态多的业务场景导致类数目增加,......
  • 深入浅出Java多线程(四):线程状态
    引言大家好,我是你们的老伙计秀才!今天带来的是[深入浅出Java多线程]系列的第四篇内容:线程状态。大家觉得有用请点赞,喜欢请关注!秀才在此谢过大家了!!!在现代软件开发中,多线程编程已经成为提升应用程序性能和响应能力的关键技术。Java作为一门支持多线程编程的主流语言,其内置的丰富并......
  • AS-Windows 客户端不显示文件状态图标
    关键字状态图标、注册表适用产品AS5.0.xASEnterprise6.0.xASExpress6.0.xASCloud6.0.x问题描述打开AnyShareWindows客户端不显示文件状态图标。 问题影响AnyShareWindows客户端文件状态图标不显示,无法判断文件状态,影响用户使用体验。问题原因杀毒软件等原因导致文件状......
  • 状态码是什么,为什么要对状态码进行监测
    很多企业和网络从业者在进行开展互联网业务经常会遇到出现状态码,如400、200等等。像http状态码在我们浏览网页、发送请求和接受信息的过程中都起着非常重要的作用。而状态码监测是网络编程中非常重要的一个环节,通过它可以判断客户端的请求是否成功,服务器是否正常处理请求,以及可能出......
  • openstack重置状态
    1.cindervolumesnapshot快照openstackvolumesnapshotset[--name<name>][--description<description>][--no-property][--property<key=value>[...]][--state<state>]<snapshot>-----------------......
  • iOS App审核状态和审核时间管理指南
    引言对于一款开发完成并准备上架的iOS应用程序来说,通过苹果公司的审核是非常重要的一步。苹果公司会对应用程序进行严格的检查,以确保应用程序的质量和安全性。本文将介绍iOS应用程序审核的流程和时间,希望能够帮助开发者更好地了解和处理审核过程中的问题。添加图片注释,不超......
  • 使用Java处理HTTP状态码:一场代码与数字的奇妙之旅
    在互联网的世界里,HTTP状态码就如同交通信号灯,告诉我们请求是否成功,或者出了什么问题。当我们在Java中与Web服务器打交道时,了解这些状态码是必不可少的。今天,就让我们一起踏上这段代码与数字的奇妙之旅,看看如何使用Java来处理这些HTTP状态码。首先,我们要明白HTTP状态码的作用。简单......