I. Introduction
Alink is a general-purpose algorithm platform built on Apache Flink.
1.1 Overview of the clustering data
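1. The data is described by five typed fields. The four numeric feature fields are:
sepal_length double, sepal_width double, petal_length double, petal_width double
2. The controlling feature field is petal_width.
3. The grouping (class label) field is category, a string.
4. The main features used for clustering are:
sepal_length double, sepal_width double, petal_length double, petal_width double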
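The field list above uses the same comma-separated "name type" form that the Java example later passes to setSchemaStr. As a rough illustration of how such a string breaks down into columns, here is a minimal plain-Java sketch. The class SchemaSketch and its parse method are hypothetical helpers written for this post; they are not part of Alink, and Alink's own schema parsing may behave differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Alink: splits a "name type, name type" schema string.
final class SchemaSketch {

    // Returns column name -> type name, preserving declaration order.
    static Map<String, String> parse(String schemaStr) {
        Map<String, String> columns = new LinkedHashMap<>();
        for (String field : schemaStr.split(",")) {
            String[] parts = field.trim().split("\\s+");
            if (parts.length != 2) {
                throw new IllegalArgumentException("Expected 'name type' but got: " + field);
            }
            columns.put(parts[0], parts[1]);
        }
        return columns;
    }

    public static void main(String[] args) {
        String schema = "sepal_length double, sepal_width double, "
                + "petal_length double, petal_width double, category string";
        // Print one "name : type" line per column, in declaration order.
        parse(schema).forEach((name, type) -> System.out.println(name + " : " + type));
    }
}
```

Keeping the columns in a LinkedHashMap preserves their order, which matters because the CSV file is read positionally.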
II. Java API
2.1 AlinkDemo.java (Alink KMeans)
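package com.wang.flink.alink;

import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;
import com.alibaba.alink.pipeline.Pipeline;
import com.alibaba.alink.pipeline.clustering.KMeans;
import com.alibaba.alink.pipeline.dataproc.vector.VectorAssembler;

public class AlinkDemo {
    public static void main(String[] args) throws Exception {
        String URL = "https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/iris.csv";
        String SCHEMA_STR = "sepal_length double, sepal_width double, petal_length double, petal_width double, category string";
        // Read the iris CSV file using the explicit schema.
        BatchOperator<?> data = new CsvSourceBatchOp()
            .setFilePath(URL)
            .setSchemaStr(SCHEMA_STR);
        // Assemble the four numeric columns into a single vector column.
        VectorAssembler va = new VectorAssembler()
            .setSelectedCols(new String[]{"sepal_length", "sepal_width", "petal_length", "petal_width"})
            .setOutputCol("features");
        // Cluster into k = 3 groups; keep category so it can be compared with the prediction.
        KMeans kMeans = new KMeans().setVectorCol("features").setK(3)
            .setPredictionCol("prediction_result")
            .setPredictionDetailCol("prediction_detail")
            .setReservedCols("category")
            .setMaxIter(100);
        // Fit the pipeline, then print the predictions for the same data.
        Pipeline pipeline = new Pipeline().add(va).add(kMeans);
        pipeline.fit(data).transform(data).print();
    }
}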
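The pipeline above first merges the four numeric columns into one vector column ("features") and then groups those vectors into k = 3 clusters. To make the clustering step more concrete, here is a minimal plain-Java sketch of the classic assign-and-update loop (Lloyd's algorithm). It is only an illustration under stated assumptions: the class KMeansSketch, its method names, the fixed initial centroids, and the sample points are all made up for this post, and Alink's KMeans may use a different initialization, distance measure, and stopping rule.

```java
import java.util.Arrays;

// Illustrative only: a tiny Lloyd's-algorithm k-means, not Alink's implementation.
final class KMeansSketch {

    static double squaredDistance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return sum;
    }

    // Index of the centroid closest to the point.
    static int nearest(double[] point, double[][] centroids) {
        int best = 0;
        for (int c = 1; c < centroids.length; c++) {
            if (squaredDistance(point, centroids[c]) < squaredDistance(point, centroids[best])) {
                best = c;
            }
        }
        return best;
    }

    // Runs at most maxIter assign/update rounds and returns each point's cluster index.
    static int[] cluster(double[][] points, double[][] initialCentroids, int maxIter) {
        int k = initialCentroids.length;
        int dim = points[0].length;
        double[][] centroids = new double[k][];
        for (int c = 0; c < k; c++) {
            centroids[c] = initialCentroids[c].clone();
        }
        int[] assignment = new int[points.length];
        Arrays.fill(assignment, -1);
        for (int iter = 0; iter < maxIter; iter++) {
            // Assignment step: attach every point to its nearest centroid.
            boolean changed = false;
            for (int i = 0; i < points.length; i++) {
                int c = nearest(points[i], centroids);
                if (c != assignment[i]) {
                    assignment[i] = c;
                    changed = true;
                }
            }
            if (!changed) {
                break; // no point moved, so the clustering is stable
            }
            // Update step: move every centroid to the mean of its points.
            double[][] sums = new double[k][dim];
            int[] counts = new int[k];
            for (int i = 0; i < points.length; i++) {
                counts[assignment[i]]++;
                for (int d = 0; d < dim; d++) {
                    sums[assignment[i]][d] += points[i][d];
                }
            }
            for (int c = 0; c < k; c++) {
                if (counts[c] == 0) {
                    continue; // an empty cluster keeps its previous centroid
                }
                for (int d = 0; d < dim; d++) {
                    centroids[c][d] = sums[c][d] / counts[c];
                }
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Made-up 4-dimensional points for illustration; these are not rows from iris.csv.
        double[][] points = {
            {1.0, 1.0, 1.0, 1.0}, {1.2, 0.9, 1.1, 1.0},
            {5.0, 5.0, 5.0, 5.0}, {5.1, 4.9, 5.2, 5.0},
            {9.0, 9.0, 9.0, 9.0}, {8.9, 9.1, 9.0, 9.2}
        };
        double[][] init = {points[0], points[2], points[4]};
        System.out.println(Arrays.toString(cluster(points, init, 100)));
    }
}
```

The maxIter argument plays the same role as setMaxIter(100) in the Alink example: it caps the number of rounds when the assignments do not settle earlier.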
2.2 pom.xml dependencies
<dependency>
    <groupId>com.alibaba.alink</groupId>
    <artifactId>alink_core_flink-1.9_2.11</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
2.3 Packaging plugins
<build>
    <plugins>
        <!-- Java compiler plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <!-- Scala compiler plugin -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.1.6</version>
            <configuration>
                <scalaCompatVersion>2.11</scalaCompatVersion>
                <scalaVersion>2.11.12</scalaVersion>
                <encoding>UTF-8</encoding>
            </configuration>
            <executions>
                <execution>
                    <id>compile-scala</id>
                    <phase>compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>test-compile-scala</id>
                    <phase>test-compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- Jar packaging plugin (bundles all dependencies into the jar) -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.6</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <!-- Entry-point class of the jar (optional) -->
                        <mainClass>com.wang.flink.alink.AlinkDemo</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
III. Clustering results from a local run
IV. Deploying to a Flink cluster
4.1 Build the jar: mvn package
4.2 Upload the jar to the Linux server
sftp> put D:/APP/IDEA/workplace/FlinkTurbineFaultDiagnosis/target/Flink-TurbineFaultDiagnosis-1.0-SNAPSHOT-jar-with-dependencies.jar
4.3 Run the job with Flink
bin/flink run -p 1 -c com.wang.flink.alink.AlinkDemo /root/Flink-TurbineFaultDiagnosis-1.0-SNAPSHOT-jar-with-dependencies.jar
4.4 Flink cluster overview
Open the cluster node in a browser: http://202.206.212.189:8081/
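Besides opening the page in a browser, the same address can be queried programmatically. The following is a minimal sketch that assumes the address above serves Flink's REST API and that the /overview endpoint is reachable from where the code runs. The class ClusterOverviewSketch and its methods are hypothetical names chosen for this post, and the sketch makes no claim about what the response will contain on this particular cluster.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: fetches the cluster overview JSON over HTTP.
final class ClusterOverviewSketch {

    // Builds the overview URL from the base address, tolerating a trailing slash.
    static String overviewUrl(String baseUrl) {
        String trimmed = baseUrl.endsWith("/")
                ? baseUrl.substring(0, baseUrl.length() - 1)
                : baseUrl;
        return trimmed + "/overview";
    }

    // Performs a GET request and returns the response body as a string.
    static String fetch(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(5000);
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
            return body.toString();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            // No address given: explain the usage instead of guessing a host.
            System.out.println("usage: ClusterOverviewSketch <base-url>");
            return;
        }
        System.out.println(fetch(overviewUrl(args[0])));
    }
}
```

Passing http://202.206.212.189:8081/ as the argument would request the overview from the node shown above. HttpURLConnection is used so that the sketch also compiles with the Java 1.8 target configured in the pom.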