首页 > 其他分享 >Flink:DataStreamAPI

Flink:DataStreamAPI

时间:2022-12-21 20:56:01浏览次数:46  
标签:stream user Flink public DataStreamAPI new event Event

执行环境

获取的执行环境是 StreamExecutionEnvironment 类的对象。在代码中创建执行环境的方法,就是调用这个类的静态方法。

getExecutionEnvironment

根据上下文直接得到正确的结果:

  • 如果程序是独立运行的,返回一个本地执行环境。
  • 如果创建了 jar 包,从命令行调用它并提交到集群执行,返回集群的执行环境。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

createLocalEnvironment

返回一个本地执行环境,可以在调用时传入一个参数,指定默认的并行度,如果不传入,并行度默认为本地的 CPU 核数。

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

createRemoteEnvironment

返回集群执行环境,需要在调用时指定 JobManager 的主机名和端口,并指定要在集群中运行的 Jar 包。

StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("host", 777, "path/to/jarFile.jar");

源算子(Source)

一般将数据的输入来源称作数据源(data source),而读取数据的算子就是源算子(source operator)。

准备工作

构建一个实际应用场景,网站的访问操作,可以抽象成一个三元组(用户名,访问的 url,时间戳)。

创建一个 Event 类,将用户行为包装成一个对象。需要满足以下条件:

  • 类是公有的
  • 属性是公有的
  • 属性的类型是可以序列化的

Flink 会把这样的类作为一个特殊的 POJO 数据类型来对待,方便数据的解析和序列化。

public class Event {
    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{" +
                "user='" + user + '\'' +
                ", url='" + url + '\'' +
                ", timestamp=" + new Timestamp(timestamp) +
                '}';
    }
}

添加数据源方法

从文本中获取数据

DataStreamSource<String> stream = env.readTextFile("datas/clicks.txt");
stream.print("txt");

从集合中读取数据

ArrayList<Integer> nums = new ArrayList<>();
nums.add(1);
nums.add(2);
DataStreamSource<Integer> numStream = env.fromCollection(nums);
numStream.print("nums");

ArrayList<Event> events = new ArrayList<>();
events.add(new Event("Mary", "./home", 1000L));
events.add(new Event("Bob", "./cart", 2000L));
DataStreamSource<Event> eventStream = env.fromCollection(events);
eventStream.print("event");

从元素中读取数据

DataStreamSource<Event> eventStream2 = env.fromElements(
        new Event("Mary", "./home", 1000L),
        new Event("Bob", "./cart", 2000L)
);
eventStream2.print("event2");

从 socket 文本流中读取

DataStreamSource<String> lineDataStream = env.socketTextStream("127.0.0.1", 7777);
lineDataStream.print("socket");

从 kafka 中读取数据

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1");
DataStreamSource<String> kafkaStream = env.addSource(new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties));
kafkaStream.print("kafka");

自定义 Source

DataStreamSource<Event> customStream = env.addSource(new ClickSource());
customStream.print();
public class ClickSource implements SourceFunction<Event> {
    // 声明一个标志位
    private Boolean running = true;

    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        Random random = new Random();
        String[] users = {"Marry", "Alice", "Bob"};
        String[] urls = {"./home", "./cart", "./fav"};
        while (true) {
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            Long timestamp = Calendar.getInstance().getTimeInMillis();
            sourceContext.collect(new Event(user, url, timestamp));
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

自定义并行 Source

DataStreamSource<Integer> parallelCustomStream = env.addSource(new ParallelCustomSource()).setParallelism(2);
parallelCustomStream.print();
public class ParallelCustomSource implements ParallelSourceFunction<Integer> {
    private Boolean running = true;
    private Random random = new Random();

    @Override
    public void run(SourceContext<Integer> sourceContext) throws Exception {
        while (true) {
            sourceContext.collect(random.nextInt());
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

数据类型

Flink 使用“类型信息”来统一表示数据类型。TypeInformation 类是 Flink 中所有类型描述符的基类,它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器。

  • 基本类型:Java 基本类型及其包装类,再加上 Void、String、Date、BigDecimal 和 BigInteger
  • 数组类型:基本类型数组(Primitive Array)和对象数组(Object Array)
  • 复合数据类型
    • Java 元组类型(Tuple):Tuple0~Tuple25,不支持空字段
    • Scala 样例类及 Scala 元组:不支持空字段
    • 行类型(Row):可以认为是具有任意个字段的元组,并支持空字段
    • POJO:Flink 自定义的类似 Java Bean 模式的类
      • 类是公共的(public)和独立的(standalone,没有非静态的内部类)
      • 类有一个公共的无参构造方法
      • 类的所有字段是 public 且非 final 的
  • 辅助类型:Option、Either、List、Map 等
  • 泛化类型(Generic):如果没有按照 POJO 类型的要求定义,就会被当成泛化类型

类型提示

Flink 的类型提取系统可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。

但是,由于 Java 中泛型擦出的存在,在某些特殊情况下(Lambda 表达式),自动提取的信息不够精细。

因此,Flink 专门提供了 TypeHint 类,可以捕捉泛型的类型信息,并且一直记录下来,为运行时提供足够的信息。也可以通过 .return() 方法,明确指定转化后 DataStream 里的元素类型。

转换算子(Transformation)

数据源读入数据之后,就可以使用各种转换算子,将一个或者多个 DataStream 转换为新的 DataStream。

一个 Flink 程序的核心就是所有的转换操作,它们决定了处理的业务逻辑。

基本转换算子

映射 Map

map 主要用于将数据流中的数据进行转换,形成新的数据流。

只需要基于 DataStream 调用 map() 方法即可。方法需要传入的参数是接口 MapFunction 的实现,返回类型还是 DataStream,不过泛型(流中的元素类型)可能改变。

// 从元素中提取数据
DataStreamSource<Event> stream = env.fromElements(
        new Event("Mary", "./home", 1000L),
        new Event("Bob", "./cart", 2000L)
);

// 进行转换,提取user字段
// 1.使用自定义,实现MapFunction接口
SingleOutputStreamOperator<String> result1 = stream.map(new MyMapper());

// 2.使用匿名类实现MapFunction接口
SingleOutputStreamOperator<String> result2 = stream.map(new MapFunction<Event, String>() {
    @Override
    public String map(Event event) throws Exception {
        return event.user;
    }
});

// 3.传入lambda函数
SingleOutputStreamOperator<String> result3 = stream.map(data -> data.user);
// 自定义MapFunction
public static class MyMapper implements MapFunction<Event, String>{
    @Override
    public String map(Event event) throws Exception {
        return event.user;
    }
}

过滤 Filter

filter 操作通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,如果是 True 则元素正常输出,反之过滤掉。

进行 filter 操作后,流中的数据类型不发生改变。filter 转换需要传入的参数需要实现 FilterFunction 接口,而FilterFunction 接口需要实现 filter() 方法,相当于一个返回布尔类型的条件表达式。

// 从元素中提取数据
DataStreamSource<Event> stream = env.fromElements(
        new Event("Mary", "./home", 1000L),
        new Event("Bob", "./cart", 2000L)
);

// 进行转换,只留下Bob
// 1.传入一个实现FilterFunction接口的类
SingleOutputStreamOperator<Event> result1 = stream.filter(new MyFilter());

// 2.使用匿名类实现FilterFunction接口
SingleOutputStreamOperator<Event> result2 = stream.filter(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event event) throws Exception {
        return event.user.equals("Bob");
    }
});

// 3.传入lambda函数
SingleOutputStreamOperator<Event> result3 = stream.filter(event -> event.user.equals("Bob"));
// 自定义FilterFunction
public static class MyFilter implements FilterFunction<Event>{
    @Override
    public boolean filter(Event event) throws Exception {
        return event.user.equals("Bob");
    }
}

扁平映射 FlatMap

flatMap 操作主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。

可以认为是“扁平化”和“映射”两步操作,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素进行转换处理。

与 map 一样,flatMap 可以使用 lambda 表达式或者实现 FlatMapFunction 接口来进行传参。

// 从元素中提取数据
DataStreamSource<Event> stream = env.fromElements(
        new Event("Mary", "./home", 1000L),
        new Event("Bob", "./cart", 2000L)
);

// 1.传入一个实现FlatMapFunction接口的类
stream.flatMap(new MyFlatMap()).print();

// 2.使用匿名类实现FlatMapFunction接口
stream.flatMap(new FlatMapFunction<Event, String>() {
    @Override
    public void flatMap(Event event, Collector<String> collector) throws Exception {
        collector.collect(event.user);
        collector.collect(event.url);
        collector.collect(event.timestamp.toString());
    }
}).print();

// 3.传入lambda函数
stream.flatMap((Event event, Collector<String>collector) -> {
    collector.collect(event.user);
    collector.collect(event.url);
    collector.collect(event.timestamp.toString());
}).returns(new TypeHint<String>() {}).print();
// 自定义FlatMapFunction
public static class MyFlatMap implements FlatMapFunction<Event, String>{
    @Override
    public void flatMap(Event event, Collector<String> collector) throws Exception {
        collector.collect(event.user);
        collector.collect(event.url);
        collector.collect(event.timestamp.toString());
    }
}

聚合算子

聚合(Aggregation):计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于把所有数据聚在一起进行汇总合并。对应于 MapReduce 中的 reduce 操作。

按键分区 keyBy

对于 Flink 而言,DataStream 是没有直接进行聚合的 API 的。而我们对海量数据进行聚合要先进行分区并行处理,这样才能提高效率。所以,做聚合之前,需要先用 keyBy 进行分区。

keyBy 通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里的分区是并行处理的子任务,对应着任务槽(task slot)。

在内部,keyBy 操作是通过计算 key 的哈希值对分区数进行取模来实现的。所以 key 如果是 POJO 的话,必须要重写 hashCode 方法。

// 从元素中提取数据
DataStreamSource<Event> stream = env.fromElements(
        new Event("Mary", "./home", 1000L),
        new Event("Bob", "./cart", 2000L),
        new Event("Bob", "./home", 3000L),
        new Event("Mary", "./cart", 2000L)
);

// 按键分组之后进行聚合,然后提取用户最近一次操作
// max是只更新目标字段,其他字段保持该key首次出现的值
stream.keyBy(new KeySelector<Event, String>() {
    @Override
    public String getKey(Event event) throws Exception {
        return event.user;
    }
}).max("timestamp").print("max:");

// 传入lambda函数
// maxBy是更新所有字段
stream.keyBy(event -> event.user).maxBy("timestamp").print("maxBy:");

reduce

// 从元素中提取数据
DataStreamSource<Event> stream = env.fromElements(
        new Event("Mary", "./home", 1000L),
        new Event("Bob", "./cart", 2000L),
        new Event("Bob", "./home", 3000L),
        new Event("Mary", "./cart", 2000L),
        new Event("Mary", "./cart", 6000L)
);

// 需求:找出访问量最大的用户
// 1.统计每个用户的访问次数
SingleOutputStreamOperator<Tuple2<String, Long>> userClickNum = stream.map(event -> Tuple2.of(event.user, 1L))
        .returns(new TypeHint<Tuple2<String, Long>>() {})
        .keyBy(tuple -> tuple.f0)
        .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

// 2.选取当前最活跃的用户
SingleOutputStreamOperator<Tuple2<String, Long>> result = userClickNum.keyBy(data -> "key")
        .reduce((t1, t2) -> t1.f1 > t2.f1 ? t1 : t2);

用户自定义函数 UDF

SourceFunction 接口、MapFunction 接口、ReduceFuntion 接口等都继承了 Function 接口,这个接口是空的,主要是为了方便扩展为单一抽象方法(Single Abstract Method,SAM)接口,即“函数接口”。

Java8 实现的 lambda 表达式就可以实现 SAM 接口,这样做的好处是,不仅可以通过自定义函数类或者匿名函数类来实现接口,也可以直接传入 lambda 表达式。这就是用户自定义函数(user-defined function,UDF)。

函数类

对于大多是操作而言,都需要传入一个用户自定义函数(UDF),实现相关操作的接口,来完成处理逻辑的定义。

如前面基本转换算子例子中方法一(自定义函数类)和方法二(匿名函数类)。

Lambda

Lambda 表达式允许以简介的方式实现函数,以及将函数作为参数进行传递,而不必申明额外的(匿名)类。

Flink 所有的算子都可以使用 Lambda 表达式,但是,当 Lambda 表达式使用 Java 的泛型时,需要显示的申明类型信息。

富函数类

“富函数类”也是 DataStream API 提供的一个函数类的接口,所有的 Flink 函数类都有其 Rich 版本。

富函数类一般以抽象类的形式出现,如 RichMapFunction。与常规函数类不同的是,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,可以实现更复杂的功能。

RichFunction 有生命周期的概念。典型的生命周期方法有:

  • open() 方法,是 Rich Function 的初始化方法,会开启一个算子的生命周期。当一个算子被调用之前,open() 方法会首先被调用。所以像文件 IO 的创建、数据库的连接、配置文件的读取等一次性的工作,都适合在 open() 中实现。
  • 方法中完成。。
  • close() 方法,是生命周期中的最后一个调用的方法,类似于解构方法。一般用来做一些清理工作。

需要注意的是,对于一个并行子任务来说,生命周期方法只会调用一次;而对应的,例如 RichMapFunction 中的map(),在每条数据到来后都会触发一次调用。

// 从元素中提取数据
DataStreamSource<Event> stream = env.fromElements(
        new Event("Mary", "./home", 1000L),
        new Event("Bob", "./cart", 2000L)
);

stream.map(new MyRichMap()).print();
public static class MyRichMap extends RichMapFunction<Event, Integer>{
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        System.out.println("open生命周期被调用" + getRuntimeContext().getIndexOfThisSubtask() + "号任务启动");
    }

    @Override
    public Integer map(Event event) throws Exception {
        return event.url.length();
    }

    @Override
    public void close() throws Exception {
        super.close();
        System.out.println("close生命周期被调用" + getRuntimeContext().getIndexOfThisSubtask() + "号任务结束");
    }
}

物理分区

“分区”操作就是要将数据进行重新分布,传递到不同的流分区,进行下一步处理。

keyBy 操作就是一种按照键的哈希值进行重新分区的操作,只不过这种分区操作只能保证把数据按 key “分开”,至于分得均不均匀、每个 key 的数据具体会分到哪一分区去,这些是完全无从控制的,因此是逻辑分区(软分区)。

物理分区(physical partitioning)就是真正的控制分区策略,精准地调配数据,告诉每个 数据到底去哪里。

// 从元素中提取数据
DataStreamSource<Event> stream = env.fromElements(
        new Event("Mary", "./home", 1000L),
        new Event("Bob", "./cart", 2000L),
        new Event("Bob", "./home", 3000L),
        new Event("Mary", "./cart", 2000L)
);

// 1.随机分区
stream.shuffle().print().setParallelism(4);

// 2.轮询分区
stream.rebalance().print().setParallelism(4);

// 3.重缩放分区
env.addSource(new RichParallelSourceFunction<Integer>() {
    @Override
    public void run(SourceContext<Integer> sourceContext) throws Exception {
        for (int i = 0; i < 8; i++) {
            // 将奇偶数分别发送到0号和1号并行分区
            if (i % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
                sourceContext.collect(i);
            }
        }
    }

    @Override
    public void cancel() {

    }
}).setParallelism(2).rescale().print().setParallelism(4);

// 广播
stream.broadcast().print().setParallelism(4);

// 全局分区
stream.global().print();

// 自定义重分区
env.fromElements(1, 2, 3, 4, 5, 6, 7, 8).partitionCustom(new Partitioner<Integer>() {
    @Override
    public int partition(Integer integer, int i) {
        return integer % 2;
    }
}, new KeySelector<Integer, Integer>() {
    @Override
    public Integer getKey(Integer integer) throws Exception {
        return integer;
    }
}).print();

输出算子(Sink)

连接到外部系统

Flink 作为一个快速的分布式实时流处理系统,对稳定性和容错性要求极高。

“状态一致性”是指一旦出现故障,我们应该有能力恢复之前的状态,保障处理结果的正确性。

Flink 内部提供了一致性检查点(checkpoint)来保障我们可以回滚到正确的状态。但如果我们在处理过程中任意读写外部系统,发生故障后就很难回退到从前了。为了避免这样的问题,DataStream API 专门提供了向外部写入数据的 addSink 方法,addSink 方法对应着一个“Sink”算子,主要是用来实现与外部系统连接并将数据写入。

Flink 所有对外的输出操作,一般都是利用 Sink 算子完成的。

输出到文件

Flink 提供了一个流式文件系统的连接器:StreamingFileSink,它继承自抽象类 RichSinkFunction,而且集成了 Flink 的检查点(checkpoint)机制,用来保证精确一次(exactly once)的一致性语义。

StreamingFileSink 为批处理和流处理提供了一个统一的 Sink,它可以将分区文件写入 Flink 支持的文件系统。它可以保证精确一次的状态一致性。主要操作是将数据写入桶(buckets),每个桶中的数据都可以分割成一个个大小有限的分区文件。

我们可以通过各种配置来控制“分桶”的操作;默认的分桶方式是基于时间的,我们每小时写入一个新的桶。

DataStreamSource<Event> stream = env.fromElements(
        new Event("Mary", "./home", 1000L),
        new Event("Bob", "./cart", 2000L),
        new Event("Bob", "./home", 3000L),
        new Event("Mary", "./cart", 2000L)
);

StreamingFileSink<String> streamingFileSink = StreamingFileSink.<String>forRowFormat(new Path("./output"),
                new SimpleStringEncoder<>("UTF-8")).withRollingPolicy(
                DefaultRollingPolicy.builder()
                        // 最大大小设置为1G
                        .withMaxPartSize(1024 * 1024 * 1024)
                        // 间隔15分钟就滚动一次(开启新的文件)
                        .withRolloverInterval(TimeUnit.MINUTES.toMinutes(15))
                        // 间隔5分钟没有任何数据,就认为任务结束
                        .withInactivityInterval(TimeUnit.MINUTES.toMinutes(5))
                .build())
        .build();

stream.map(event -> event.toString()).addSink(streamingFileSink);

输出到 Kafka

Flink 与 Kafka 的连接器提供了端到端的精确一次(exactly once)语义保证,这在实际项目中是最高级别的一致性保证。

输出到 Redis

Flink 没有直接提供官方的 Redis 连接器,不过 Bahir 项目提供了 Flink-Redis 的连接工具。

  • 导入 Redis 连接器依赖
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
  • 启动 Redis 集群
  • 编写代码
DataStreamSource<Event> stream = env.addSource(new ClickSource());

// 创建一个jedis连接配置
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();

// 写入Redis
stream.addSink(new RedisSink<>(config, new MyRedisMapper()));
// 自定义类实现RedisMapper接口
public static class MyRedisMapper implements RedisMapper<Event> {
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "table_name");
    }

    @Override
    public String getKeyFromData(Event event) {
        return event.user;
    }

    @Override
    public String getValueFromData(Event event) {
        return event.url;
    }
}

输出到 Elasticsearch

Flink 为 Elasticsearch 专门提供了官方的 Sink 连接器。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
    <version>1.12.1</version>
</dependency>
// 定义hosts列表
ArrayList<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200));

// 定义ElasticsearchSinkFunction
ElasticsearchSinkFunction<Event> EsSinkFunction = new ElasticsearchSinkFunction<Event>() {
    @Override
    public void process(Event event, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
        HashMap<String, String> map = new HashMap<>();
        map.put(event.user, event.url);
        // 构建一个IndexRequest
        IndexRequest request = Requests.indexRequest().index("table_name").source(map);
        requestIndexer.add(request);
    }
};

// 写入ES
stream.addSink(new ElasticsearchSink.Builder<>(httpHosts, EsSinkFunction).build());

输出到 MySQL

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.12.1</version>
</dependency>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>
stream.addSink(JdbcSink.sink(
        "insert into table_name (user, url) value (?, ?)",
        ((preparedStatement, event) -> {
            preparedStatement.setString(1, event.user);
            preparedStatement.setString(2, event.url);
        }),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://localhost:3306/test?serverTimezone=GMT%useSSL=false")
                .withDriverName("com.mysql.cj.jdbc.Driver")
                .withUsername("root")
                .withPassword("12345678")
                .build()
));

自定义 Sink 输出

Flink 提供了通用的 SinkFunction 接口和对应的 RichSinkDunction 抽象类,只要实现它,通过简单地调用 DataStream 的 addSink() 方法就可以自定义写入任何外部存储器中。

在实现 SinkFunction 的时候,需要重写的一个关键方法 invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。

标签:stream,user,Flink,public,DataStreamAPI,new,event,Event
From: https://www.cnblogs.com/fireonfire/p/16997228.html

相关文章

  • Flink使用TableAPi方式读取和写入Hive
    以下是一个简单的参考实例,用来验证通过FlinkSQL来跑批方式清洗Hive数据可行的。(1)验证了Hive中org.openx.data.jsonserde.JsonSerDe格式的表是可以直接读取数据出来的(2)通......
  • Flink的概念、特点及运行原理
    Flink是一个针对流数据和批数据的分布式处理引擎,主要用Java代码实现。目前,Flink主要还是依靠开源社区的贡献来发展的。对于Flink,其处理的数据主要是流数据,批数据只是流......
  • XTransfer技术专家亮相Flink Forward Asia 2022
    FlinkForward是由Apache官方授权的ApacheFlink社区官方技术大会,本次大会介绍了Flink社区的最新动态和发展计划,包括FlinkTableStore流批一体的新数仓架构、FlinkCDC全增......
  • Windows安装Flink20220915
    1.官方下载地址 https://flink.apache.org/zh/downloads.html#apache-flink-1144  最好用国内镜像下载比较快 下载后对压缩包解压,路径自定义2.安装包中是不含启动bat......
  • Flink 运行错误 java.lang.OutOfMemoryError: Direct buffer memory
    如遇到如下错误,表示需要调大配置项taskmanager.memory.framework.off-heap.size的值,taskmanager.memory.framework.off-heap.size的默认值为128MB,错误显示不够用需要调......
  • Linux 安装 Flink
    文档:https://ifeve.com/flink-quick-start/下载地址:https://flink.apache.org/downloads.html下载:https://dlcdn.apache.org/flink/flink-1.15.1/flink-1.15.1-bin-scal......
  • 基于云原生的集群自愈系统 Flink Cluster Inspector
    作者:舟柒、楼台1.业务背景与挑战1.1实时计算集群现状关于热点机器处理一直是阿里云Flink集群运维的一大痛点,不管在日常还是大促都已经是比较严重的问题,同时这也是分布......
  • IDEA上运行Flink任务
    欢迎访问我的GitHub这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demosIDEA是常用的IDE,我们编写的flink任务代码如果能直接在IDEA运行,......
  • Flink1.9.2源码编译和使用
    欢迎访问我的GitHub这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos为什么要编译flink源码用于学习,在IDEA上开发的flinkjob,能直接......
  • 回顾 | Apache Flink Meetup 杭州站圆满结束(附PPT下载)
    5月16日,2020年首场ApacheFlinkMeetup·杭州站在线直播圆满结束。本次Meetup邀请了来自袋鼠云、网易云音乐、有赞及阿里巴巴的四位技术专家分享关于实时数仓、1.10......