首页 > 编程语言 >48、Flink DataStream API 编程指南(1)- DataStream 入门示例

48、Flink DataStream API 编程指南(1)- DataStream 入门示例

时间:2023-12-18 12:01:25浏览次数:37  
标签:DataStream flink 示例 Flink environment program parallelism




文章目录

  • Flink 系列文章
  • 一、Flink DataStream API 编程指南
  • 1、DataStream 是什么?
  • 2、Flink 程序剖析
  • 3、第一个完整示例
  • 4、入门示例
  • 1)、maven依赖
  • 2)、代码
  • 3)、验证



本文介绍了Flink DataStream API的编程指南第一部分,即介绍flink的source、transformation和sink的编程过程以及入门示例。其中source和sink各自的内容分别给出了具体的示例以及关于transformation的关联文章介绍。
本专题内容较长,故分为三个部分,即:
48、Flink DataStream API 编程指南(1)- DataStream 入门示例

48、Flink DataStream API 编程指南(2)- DataStream的source、transformation、sink、调试

48、Flink DataStream API 编程指南(3)- 完整版

本文由于是在IDE中做的例子,基本上不依赖外部环境,除了具体的示例,比如读nc等则需要相应的环境。
本文分为4个部分,即介绍datastream、flink的编程模型、入门示例几部分。
本文的示例是在Flink 1.17和Flink 1.13.5版本中运行。

一、Flink DataStream API 编程指南

Flink 中的 DataStream 程序是对数据流(例如过滤、更新状态、定义窗口、聚合)进行转换的常规程序。数据流的起始是从各种源(例如消息队列、套接字流、文件)创建的。结果通过 sink 返回,例如可以将数据写入文件或标准输出(例如命令行终端)。Flink 程序可以在各种上下文中运行,可以独立运行,也可以嵌入到其它程序中。任务执行可以运行在本地 JVM 中,也可以运行在多台机器的集群上。

为了创建你自己的 Flink DataStream 程序,建议从 Flink 程序剖析开始,然后逐渐添加自己的 stream transformation。其余部分作为附加的算子和高级特性的参考。

1、DataStream 是什么?

DataStream API 得名于特殊的 DataStream 类,该类用于表示 Flink 程序中的数据集合。你可以认为 它们是可以包含重复项的不可变数据集合。这些数据可以是有界(有限)的,也可以是无界(无限)的,但用于处理它们的API是相同的。

DataStream 在用法上类似于常规的 Java 集合,但在某些关键方面却大不相同。它们是不可变的,这意味着一旦它们被创建,你就不能添加或删除元素。你也不能简单地察看内部元素,而只能使用 DataStream API 操作来处理它们,DataStream API 操作也叫作转换(transformation)。

你可以通过在 Flink 程序中添加 source 创建一个初始的 DataStream。然后,你可以基于 DataStream 派生新的流,并使用 map、filter 等 API 方法把 DataStream 和派生的流连接在一起。

2、Flink 程序剖析

Flink 程序看起来像一个转换 DataStream 的常规程序。每个程序由相同的基本部分组成:

  • 获取一个执行环境(execution environment);
  • 加载/创建初始数据;
  • 指定数据相关的转换;
  • 指定计算结果的存储位置;
  • 触发程序执行。

现在我们将对这些步骤逐一进行概述,更多细节请参考相关章节。请注意,Java DataStream API 的所有核心类都可以在 org.apache.flink.streaming.api 中找到。

StreamExecutionEnvironment 是所有 Flink 程序的基础。

可以使用 StreamExecutionEnvironment 的如下静态方法获取 StreamExecutionEnvironment:

/**
     * Creates an execution environment that represents the context in which the program is
     * currently executed. If the program is invoked standalone, this method returns a local
     * execution environment, as returned by {@link #createLocalEnvironment()}.
     *
     * @return The execution environment of the context in which the program is executed.
     */
    public static StreamExecutionEnvironment getExecutionEnvironment() {
        return getExecutionEnvironment(new Configuration());
    }

    /**
     * Creates an execution environment that represents the context in which the program is
     * currently executed. If the program is invoked standalone, this method returns a local
     * execution environment, as returned by {@link #createLocalEnvironment(Configuration)}.
     *
     * <p>When executed from the command line the given configuration is stacked on top of the
     * global configuration which comes from the {@code flink-conf.yaml}, potentially overriding
     * duplicated options.
     *
     * @param configuration The configuration to instantiate the environment with.
     * @return The execution environment of the context in which the program is executed.
     */
    public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
        return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
                .map(factory -> factory.createExecutionEnvironment(configuration))
                .orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
    }

    /**
     * Creates a {@link LocalStreamEnvironment}. The local execution environment will run the
     * program in a multi-threaded fashion in the same JVM as the environment was created in. The
     * default parallelism of the local environment is the number of hardware contexts (CPU cores /
     * threads), unless it was specified differently by {@link #setParallelism(int)}.
     *
     * @return A local execution environment.
     */
    public static LocalStreamEnvironment createLocalEnvironment() {
        return createLocalEnvironment(defaultLocalParallelism);
    }

    /**
     * Creates a {@link LocalStreamEnvironment}. The local execution environment will run the
     * program in a multi-threaded fashion in the same JVM as the environment was created in. It
     * will use the parallelism specified in the parameter.
     *
     * @param parallelism The parallelism for the local environment.
     * @return A local execution environment with the specified parallelism.
     */
    public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
        return createLocalEnvironment(parallelism, new Configuration());
    }

    /**
     * Creates a {@link LocalStreamEnvironment}. The local execution environment will run the
     * program in a multi-threaded fashion in the same JVM as the environment was created in. It
     * will use the parallelism specified in the parameter.
     *
     * @param parallelism The parallelism for the local environment.
     * @param configuration Pass a custom configuration into the cluster
     * @return A local execution environment with the specified parallelism.
     */
    public static LocalStreamEnvironment createLocalEnvironment(
            int parallelism, Configuration configuration) {
        Configuration copyOfConfiguration = new Configuration();
        copyOfConfiguration.addAll(configuration);
        copyOfConfiguration.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
        return createLocalEnvironment(copyOfConfiguration);
    }

    /**
     * Creates a {@link LocalStreamEnvironment}. The local execution environment will run the
     * program in a multi-threaded fashion in the same JVM as the environment was created in.
     *
     * @param configuration Pass a custom configuration into the cluster
     * @return A local execution environment with the specified parallelism.
     */
    public static LocalStreamEnvironment createLocalEnvironment(Configuration configuration) {
        if (configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).isPresent()) {
            return new LocalStreamEnvironment(configuration);
        } else {
            Configuration copyOfConfiguration = new Configuration();
            copyOfConfiguration.addAll(configuration);
            copyOfConfiguration.set(CoreOptions.DEFAULT_PARALLELISM, defaultLocalParallelism);
            return new LocalStreamEnvironment(copyOfConfiguration);
        }
    }

    /**
     * Creates a {@link LocalStreamEnvironment} for local program execution that also starts the web
     * monitoring UI.
     *
     * <p>The local execution environment will run the program in a multi-threaded fashion in the
     * same JVM as the environment was created in. It will use the parallelism specified in the
     * parameter.
     *
     * <p>If the configuration key 'rest.port' was set in the configuration, that particular port
     * will be used for the web UI. Otherwise, the default port (8081) will be used.
     */
    @PublicEvolving
    public static StreamExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) {
        checkNotNull(conf, "conf");

        if (!conf.contains(RestOptions.PORT)) {
            // explicitly set this option so that it's not set to 0 later
            conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());
        }

        return createLocalEnvironment(conf);
    }

    /**
     * Creates a {@link RemoteStreamEnvironment}. The remote environment sends (parts of) the
     * program to a cluster for execution. Note that all file paths used in the program must be
     * accessible from the cluster. The execution will use no parallelism, unless the parallelism is
     * set explicitly via {@link #setParallelism}.
     *
     * @param host The host name or address of the master (JobManager), where the program should be
     *     executed.
     * @param port The port of the master (JobManager), where the program should be executed.
     * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
     *     program uses user-defined functions, user-defined input formats, or any libraries, those
     *     must be provided in the JAR files.
     * @return A remote environment that executes the program on a cluster.
     */
    public static StreamExecutionEnvironment createRemoteEnvironment(
            String host, int port, String... jarFiles) {
        return new RemoteStreamEnvironment(host, port, jarFiles);
    }

    /**
     * Creates a {@link RemoteStreamEnvironment}. The remote environment sends (parts of) the
     * program to a cluster for execution. Note that all file paths used in the program must be
     * accessible from the cluster. The execution will use the specified parallelism.
     *
     * @param host The host name or address of the master (JobManager), where the program should be
     *     executed.
     * @param port The port of the master (JobManager), where the program should be executed.
     * @param parallelism The parallelism to use during the execution.
     * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
     *     program uses user-defined functions, user-defined input formats, or any libraries, those
     *     must be provided in the JAR files.
     * @return A remote environment that executes the program on a cluster.
     */
    public static StreamExecutionEnvironment createRemoteEnvironment(
            String host, int port, int parallelism, String... jarFiles) {
        RemoteStreamEnvironment env = new RemoteStreamEnvironment(host, port, jarFiles);
        env.setParallelism(parallelism);
        return env;
    }

    /**
     * Creates a {@link RemoteStreamEnvironment}. The remote environment sends (parts of) the
     * program to a cluster for execution. Note that all file paths used in the program must be
     * accessible from the cluster. The execution will use the specified parallelism.
     *
     * @param host The host name or address of the master (JobManager), where the program should be
     *     executed.
     * @param port The port of the master (JobManager), where the program should be executed.
     * @param clientConfig The configuration used by the client that connects to the remote cluster.
     * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
     *     program uses user-defined functions, user-defined input formats, or any libraries, those
     *     must be provided in the JAR files.
     * @return A remote environment that executes the program on a cluster.
     */
    public static StreamExecutionEnvironment createRemoteEnvironment(
            String host, int port, Configuration clientConfig, String... jarFiles) {
        return new RemoteStreamEnvironment(host, port, clientConfig, jarFiles);
    }

    /**
     * Gets the default parallelism that will be used for the local execution environment created by
     * {@link #createLocalEnvironment()}.
     *
     * @return The default local parallelism
     */
    @PublicEvolving
    public static int getDefaultLocalParallelism() {
        return defaultLocalParallelism;
    }

    /**
     * Sets the default parallelism that will be used for the local execution environment created by
     * {@link #createLocalEnvironment()}.
     *
     * @param parallelism The parallelism to use as the default local parallelism.
     */
    @PublicEvolving
    public static void setDefaultLocalParallelism(int parallelism) {
        defaultLocalParallelism = parallelism;
    }

通常,只需要使用 getExecutionEnvironment() 即可,因为该方法会根据上下文做正确的处理:如果你在 IDE 中执行你的程序或将其作为一般的 Java 程序执行,那么它将创建一个本地环境,该环境将在你的本地机器上执行你的程序。如果你基于程序创建了一个 JAR 文件,并通过命令行运行它,Flink 集群管理器将执行程序的 main 方法,同时 getExecutionEnvironment() 方法会返回一个执行环境以在集群上执行你的程序。

为了指定 data sources,执行环境提供了一些方法,支持使用各种方法从文件中读取数据:你可以直接逐行读取数据,像读 CSV 文件一样,或使用任何第三方提供的 source。

如果你只是将一个文本文件作为一个行的序列来读取,那么可以使用:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> users = env.readTextFile("file:///D:/workspace/bigdata-component/hadoop/test/in/flink/");

这将生成一个 DataStream,然后你可以在上面应用转换(transformation)来创建新的派生 DataStream。

你可以调用 DataStream 上具有转换功能的方法来应用转换。例如,一个 map 的转换如下所示:

DataStream<Tuple3<Integer, String, Integer>> parsed = users.map(new MapFunction<String, Tuple3<Integer, String, Integer>>() {
			@Override
			public Tuple3<Integer, String, Integer> map(String value) {
				// 文件数据格式形如:1|107860|7191
				String[] line = value.split(",");

				return Tuple3.of(Integer.valueOf(line[0]), line[1], Integer.valueOf(line[2]));
			}
		});

这将通过把原始集合中的每一行转换为一个Tuple3<Integer, String, Integer>来创建一个新的 DataStream。

一旦你有了包含最终结果的 DataStream,你就可以通过创建 sink 把它写到外部系统。下面是一些用于创建 sink 的示例方法:

parsed.print();

parsed.writeAsText("file:///D:/workspace/bigdata-component/hadoop/test/out/flink");

一旦指定了完整的程序,需要调用 StreamExecutionEnvironment 的 execute() 方法来触发程序执行。根据 ExecutionEnvironment 的类型,执行会在你的本地机器上触发,或将你的程序提交到某个集群上执行。

execute() 方法将等待作业完成,然后返回一个 JobExecutionResult,其中包含执行时间和累加器结果。

如果不想等待作业完成,可以通过调用 StreamExecutionEnvironment 的 executeAsync() 方法来触发作业异步执行。它会返回一个 JobClient,你可以通过它与刚刚提交的作业进行通信。如下是使用 executeAsync() 实现 execute() 语义的示例。

final JobClient jobClient = env.executeAsync();

final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();

关于程序执行的最后一部分对于理解何时以及如何执行 Flink 算子是至关重要的。所有 Flink 程序都是延迟执行的:当程序的 main 方法被执行时,数据加载和转换不会直接发生。相反,每个算子都被创建并添加到 dataflow 形成的有向图。当执行被执行环境的 execute() 方法显示地触发时,这些算子才会真正执行。程序是在本地执行还是在集群上执行取决于执行环境的类型。

延迟计算允许你构建复杂的程序,Flink 会将其作为一个整体的计划单元来执行。

3、第一个完整示例

  • maven依赖
<properties>
		<encoding>UTF-8</encoding>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
		<java.version>1.8</java.version>
		<scala.version>2.12</scala.version>
		<flink.version>1.17.0</flink.version>
	</properties>

	<dependencies>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-csv</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
	</dependencies>
  • 代码
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *
 */
public class TestFileSystemDemo {

	public static void main(String[] args) throws Exception {
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<String> orders = env.readTextFile("file:///D:/workspace/bigdata-component/hadoop/test/in/flink/");

		DataStream<Tuple3<Integer, String, Integer>> parsed = orders.map(new MapFunction<String, Tuple3<Integer, String, Integer>>() {
			@Override
			public Tuple3<Integer, String, Integer> map(String value) {
				// 文件数据格式形如:1|107860|7191
				String[] line = value.split(",");

				return Tuple3.of(Integer.valueOf(line[0]), line[1], Integer.valueOf(line[2]));
			}
		});

		parsed.print();
		
		parsed.writeAsText("file:///D:/workspace/bigdata-component/hadoop/test/out/flink");
		env.execute();

	}

}
  • 运行结果
    控制台输出结果
8> (1,alan,15)
16> (4,alan_chan,30)
13> (3,alanchanchn,25)
3> (5,alan_chan_chn,45)
10> (2,alanchan,20)

文件输出结果见下图

48、Flink DataStream API 编程指南(1)- DataStream 入门示例_flink sql


48、Flink DataStream API 编程指南(1)- DataStream 入门示例_大数据_02

4、入门示例

如下是一个完整的、可运行的程序示例,它是基于流窗口的单词统计应用程序,计算 5 秒窗口内来自 Web 套接字的单词数。

1)、maven依赖

见本文上述示例中的maven依赖。

2)、代码

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 *
 */
public class TestWindowWordCount {
	
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<Tuple2<String, Integer>> dataStream = 
				env.socketTextStream("192.168.10.42", 9999)
					  .flatMap(new Splitter()).keyBy(value -> value.f0)
					  .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
					  .sum(1);

		dataStream.print();

		env.execute("Window WordCount");
	}

	public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
		@Override
		public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
			for (String word : sentence.split(",")) {
				out.collect(new Tuple2<String, Integer>(word, 1));
			}
		}
	}

}

3)、验证

前提是nc已经安装好了。

  • 启动nc并输入数据
# 在192.168.10.42上使用nc -lk 9999 向指定端口发送数据
# nc是netcat的简称,原本是用来设置路由器,我们可以利用它向某个端口发送数据 
# 如果没有该命令可以下安装 yum install -y nc
[alanchan@server2 bin]$ nc -lk 9999
alan,alach,alanchan,hello
alan_chan,hi,flink
alan,flink,good
alan,alach,alanchan,hello
hello,123
  • 启动应用程序,并观察控制台输出

应用程序启动后,再在nc中输入数据

13> (alan,1)
5> (alanchan,1)
8> (alach,1)
5> (hello,1)
16> (alan_chan,1)
13> (flink,1)
6> (hi,1)
13> (alan,1)
11> (good,1)
13> (flink,1)
8> (alach,1)
5> (alanchan,1)
13> (alan,1)
5> (hello,1)
5> (hello,1)
4> (123,1)

如果想查看大于 1 的计数,在 5 秒内重复输入相同的单词即可(如果无法快速输入,则可以将窗口大小从 5 秒增加 )。

以上,本文介绍了Flink DataStream API的编程指南第一部分,即介绍flink的source、transformation和sink的编程过程以及入门示例。其中source和sink各自的内容分别给出了具体的示例以及关于transformation的关联文章介绍。
本专题内容较长,故分为三个部分,即:
48、Flink DataStream API 编程指南(1)- DataStream 入门示例

48、Flink DataStream API 编程指南(2)- DataStream的source、transformation、sink、调试

48、Flink DataStream API 编程指南(3)- 完整版

标签:DataStream,flink,示例,Flink,environment,program,parallelism
From: https://blog.51cto.com/alanchan2win/8871091

相关文章

  • 47、Flink 的指标报告介绍(graphite、influxdb、prometheus、statsd和datalog)及示例(jmx
    文章目录Flink系列文章一、MetricReporters1、概述及示例2、入门示例0)、特别说明1)、配置2)、验证3)、自定义的指标收集器3、基于标志符格式vs.基于tags格式4、Pushvs.Pull5、发送器1)、JMX2)、Graphite2)、InfluxDB4)、Prometheus5)、PrometheusPushGateway6)、StatsD7)、Datadog8)......
  • 48、Flink DataStream API 编程指南(3)- 完整版
    文章目录Flink系列文章一、FlinkDataStreamAPI编程指南1、DataStream是什么?2、Flink程序剖析3、第一个完整示例4、入门示例1)、maven依赖2)、代码3)、验证5、DataSources1)、基于文件2)、基于套接字3)、基于集合4)、自定义6、DataStreamTransformations7、DataSinks8、Iteratio......
  • 阿里云AnalyticDB基于Flink CDC+Hudi实现多表全增量入湖实践
    湖仓一体(LakeHouse)是大数据领域的重要发展方向,提供了流批一体和湖仓结合的新场景。阿里云AnalyticDB for MySQL基于 Apache Hudi 构建了新一代的湖仓平台,提供日志、CDC等多种数据源一键入湖,在离线计算引擎融合分析等能力。本文将主要介绍AnalyticDB for MySQL基于Apache ......
  • Flutter使用SharedPreferences示例
    SharedPreferencesAndroid原生开发经常会用SharedPreferences来保存一些设置,Flutter用什么来保存这些设置呢?在Flutter中,你可以使用shared_preferences插件来实现类似Android原生开发中的SharedPreferences功能,用于在应用程序中保存和检索持久化的键值对。具体使用首先,在你的Fl......
  • AntDesignBlazor示例——分页查询
    本示例是AntDesignBlazor的入门示例,在学习的同时分享出来,以供新手参考。示例代码仓库:https://gitee.com/known/BlazorDemo1.学习目标分页查询框架天气数据分页功能表格自定义分页2.创建分页查询框架Table组件分页默认为前端分页,即所有数据一次性加载到前端进行分页;在......
  • Golang io.Pipe()函数及示例
    https://geek-docs.com/go-tutorials/go-examples/g_io-pipe-function-in-golang-with-examples.html 在Go语言中,io包提供了基本的I/O原语接口,其主要工作是封装这些原语的正在进行的实现。Go语言中的Pipe()函数用于创建并发的内存管道,在将期望io.Reader的代码与期望io.Writer......
  • net8获取泛微token以及访问api示例
    工作中涉及到调用泛微的场景,官方的示例又臭又长,抽空用NET8简化了写法,为了简化http访问,用了Flurl.Http这个库。在座各位大佬,我们直接就看代码了 usingSystem.Security.Cryptography;usingSystem.Text.Json;usingFlurl.Http;//请按照官方内容做好初始化工作https://e-c......
  • 【C++】异常处理 ④ ( 异常接口声明 | 异常接口语法 | 抛出一种类型的异常 | 抛出多种
    文章目录一、异常接口声明1、异常接口引入2、异常接口语法3、抛出一种类型的异常4、抛出多种类型的异常5、抛出任何类型异常-不声明异常接口/声明throw(...)6、不能抛出任何类型异常-声明throw()7、抛出异常类型错误博客总结://1.不会抛出异常voidfun()throw();......
  • Python多线程计算的方法及示例代码
    Python是一种非常流行的编程语言,支持多种并发编程的方式,其中包括多线程。多线程允许程序同时执行多个任务,从而提高了程序的运行效率。下面将介绍Python中多线程计算的几种常见方法,并给出具体的示例代码。方法一:使用标准库threadingPython标准库中提供了threading模块,通过创建Thread......
  • C# lock示例
    这两天项目加了个需求,需要给PLC发心跳信号我又不想在原来的循环中加,所以想着再弄个timer来定时发信号。只是这样会有一个问题,就是冲突。两个线程之间,有可能同时与plc发生通讯,引起数据混乱。privatevoidbutton1_Click(objectsender,EventArgse){......