机器学习
1、含义:机器学习是人工智能的一个基本条件,是基于在大数据之上的,从数据中提取出模型,并利用模型对未知的数据进行预测。
2、机器学习流程
3、机器学习分类:监督学习与无监督学习
聚类(K-means)
定义:
1) 如果一个样本在特征空间中的k个最相似(特征空间中最邻近)的样本中的大多数属于某一个类别,则该样本也属于这个类别
2)欧氏距离计算
3)根据你的‘邻居’来判断你的属性
优点:简单,易于理解,易于实现,无需训练
缺点:
1)懒惰算法,对测试样本分类时的计算量大,内存开销大
2)必须指定K值,K值选择不当则分类精度不能保证(可以使用模型调优,超参数搜索)
应用场景:小数据场景,几千~几万样本,具体场景具体业务去测试
import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
import org.apache.spark.ml.linalg
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object kmeans {
def main(args: Array[String]): Unit = {
// 无监督学习 --> 聚类
//创建spark连接
val spark: SparkSession = SparkSession
.builder()
.appName("Demo06KMeans")
.master("local[*]")
.config("spark.sql.shuffle.partitions", 2)
.getOrCreate()
//读取需要用于数据体征工程的数据
val sourceDF: DataFrame = spark
.read
.format("csv")
.option("sep", ",")
.schema("x Double,y Double")
.load("F:\\IdeaProjects\\Spark\\src\\main\\data\\mllib\\data\\kmeans.txt")
//导包
import spark.implicits._
import org.apache.spark.sql.functions._
//处理数据,将数据转化成数据特征工程模型所需要的格式数据--向量
val dataDF: DataFrame = sourceDF
.as[(Double, Double)]
.map {
case (x: Double, y: Double) =>
val denseVec: linalg.Vector = Vectors.dense(Array(x, y))
Tuple1(denseVec)
}.toDF("features")
//kmeans聚类要聚为几类
val kmeans: KMeans = new KMeans()
.setK(2) // 设置需要分为2类
//无监督学习不需要进行切分数据
//选用合适的数据模型进行训练
val kmeansModel: KMeansModel = kmeans.fit(dataDF)
val resDF: DataFrame = kmeansModel
.transform(dataDF)
//打印好数据结构
resDF.printSchema()
resDF
// 将Vector转成Array再转成String再以CSV格式保存
.as[(linalg.Vector, Int)]
.map {
case (features: linalg.Vector, prediction: Int) =>
(features.toArray(0),features.toArray(1) ,prediction)
}
.write
.format("csv")
//以覆盖的形式进行保存
.mode(SaveMode.Overwrite)
.save("F:\\IdeaProjects\\Spark\\src\\main\\data\\mllib\\kmeans")
}
}
4、 使用mllib,加载依赖,搭建环境,构建向量
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>2.4.5</version>
</dependency>
5.mllib流程
监督学习
1、加载数据,进行数据特征工程,将数据转化成机器学习模型所需要的类型-->向量
2、将数据切分成训练集,测试集,一般比例为8:2
3、选择合适的模型,将训练集带入模型进行训练
数据有无label
-->有,监督学习 -->label是离散的还是连续的-->离散-->选择 分类的算法-->逻辑回归(适合做二分类)
-->无,无监督学习 -->连续-->选择 回归的算法-->4、使用测试集去评估模型
5、保存数据
无监督学习
1、加载数据,进行数据特征工程,将数据转化成机器学习模型所需要的类型-->向量
2、无需切分数据将数据切分
3、选择合适的模型,将训练集带入模型进行训练
数据有无label
-->有,监督学习 -->label是离散的还是连续的-->离散-->选择 分类的算法-->逻辑回归(适合做二分类)
-->无,无监督学习 -->连续-->选择 回归的算法-->4、使用测试集去评估模型
5、保存数据
package day04_Mliab
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object Person02 {
def main(args: Array[String]): Unit = {
//1、加载数据,进行数据特征工程,将数据转化成机器学习模型所需要的类型-->向量
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("person")
.getOrCreate()
val person1DF: DataFrame = spark
.read
.format("libsvm")
.load("F:\\IdeaProjects\\Spark\\src\\main\\data\\mllib\\data\\人体指标.txt")
//2、将数据切分成训练集,测试集,一般比例为8:2
val splitDF: Array[Dataset[Row]] = person1DF.randomSplit(Array(0.8, 0.2))
val trainDF: Dataset[Row] = splitDF(0) //训练集 用于训练模型
val testDF: Dataset[Row] = splitDF(1) //测试集 用于评估模型
//3、选择合适的模型,将训练集带入模型进行训练
/**
* 数据有无label -->有,监督学习 -->label是离散的还是连续的-->离散-->选择 分类的算法-->逻辑回归(适合做二分类)
* -->无,无监督学习 -->连续-->选择 回归的算法-->
*
*/
//逻辑回归
val logisticRegression: LogisticRegression = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)
val logisticRegressionModel: LogisticRegressionModel = logisticRegression.fit(trainDF)
//4、适用测试集去评估模型
val tranDF: DataFrame = logisticRegressionModel.transform(testDF)
// tranDF.show(truncate = false)
// tranDF.where("label != prediction").show()
import spark.implicits._
import org.apache.spark.sql.functions._
tranDF
.withColumn("flag",when($"label"===$"prediction",1) otherwise(0))
.groupBy()
.agg(sum($"flag")/count("*") as "准确率")
.show()
//5、保存数据 logisticRegressionModel.write.save("F:\\IdeaProjects\\Spark\\src\\main\\data\\mllib\\person")
}
}
6、Person训练评估&预测数据
package day04_Mliab
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object exercise {
/**
* 基于人体指标.txt数据
* 参考https://cloud.tencent.com/developer/article/1510724
*
* 将label中的1 当作男生
* 将label中的0 当作女生
* 计算 精确率(Precision)和召回率(Recall)
*/
def main(args: Array[String]): Unit = {
//1、加载数据,进行数据特征工程,将数据转化成机器学习模型所需要的类型-->向量
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("exercise")
.getOrCreate()
val person1DF: DataFrame = spark
.read
.format("libsvm")
.load("F:\\IdeaProjects\\Spark\\src\\main\\data\\mllib\\data\\人体指标.txt")
//2、将数据切分成训练集,测试集,一般比例为8:2
val splitDF: Array[Dataset[Row]] = person1DF.randomSplit(Array(0.8, 0.2))
val trainDF: Dataset[Row] = splitDF(0) //训练集 用于训练模型
val testDF: Dataset[Row] = splitDF(1) //测试集 用于评估模型
//3、选择合适的模型,将训练集带入模型进行训练
/**
* 数据有无label -->有,监督学习 -->label是离散的还是连续的-->离散-->选择 分类的算法-->逻辑回归(适合做二分类)
* -->无,无监督学习 -->连续-->选择 回归的算法-->
*
*/
//逻辑回归
val logisticRegression: LogisticRegression = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)
val logisticRegressionModel: LogisticRegressionModel = logisticRegression.fit(trainDF)
//4、适用测试集去评估模型
val tranDF: DataFrame = logisticRegressionModel.transform(testDF)
import spark.implicits._
import org.apache.spark.sql.functions._
/**
0 1:5.3 2:3.5 3:2.5 4:106.4 5:67.5 6:69.1 7:83
1 1:5.9 2:3.9 3:3.0 4:135.0 5:82.8 6:79.5 7:64
1 1:6.5 2:4.2 3:3.3 4:140.4 5:85.0 6:79.8 7:69
1 1:5.4 2:4.0 3:3.0 4:135.6 5:88.6 6:70.1 7:72
0 1:4.5 2:3.6 3:2.4 4:101.1 5:77.1 6:65.1 7:87
0 1:4.7 2:3.8 3:2.8 4:98.7 5:69.3 6:65.5 7:77
0 1:4.6 2:3.4 3:2.2 4:104.7 5:69.4 6:52.3 7:90
0 1:4.5 2:3.7 3:3.0 4:113.9 5:73.5 6:71.2 7:79
将label中的1 当作男生
将label中的0 当作女生
TP:实际为男生,1预测为男生;1
FP:实际为女生,0预测为男生;1
FN:实际为男生,1预测为女生;0
TN:实际为女生,0预测为女生;0
精确率(Precision) = TP / (TP + FP) = 40/60 = 66.67%。它表示:预测为正的样本中有多少是真正的正样本,它是针对我们预测结果而言的。Precision又称为查准率。
召回率(Recall) = TP / (TP + FN) = 40/70 = 57.14% 。它表示:样本中的正例有多少被预测正确了, 它是针对我们原来的样本而言的。Recall又称为查全率。
*/
tranDF
.withColumn("flag",when($"label"===$"prediction",1) otherwise(0))
.groupBy()
.agg(sum($"flag")/count("*") as "准确率")
// .show(10000)
/**
* +------------------+
| 准确率|
+------------------+
|0.9415322580645161|
+------------------+
*/
tranDF
.where($"prediction"==="1.0")
.withColumn("flag",when($"label"===$"prediction",1)otherwise(0))
.groupBy()
.agg(sum("flag")/count("*") as "精确率")
// .show(10000)
/**
* +------------------+
| 精确率|
+------------------+
|0.9099437148217636|
+------------------+
*/
tranDF
.where($"label"==="1.0")
.withColumn("flag",when($"label"===$"prediction",1)otherwise(0))
.groupBy()
.agg(sum("flag")/count("*") as "召回率")
// .show(10000)
/**
* +-----------------+
| 召回率|
+-----------------+
|0.968054211035818|
+-----------------+
*/
}
}
package day04_Mliab
import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{DataFrame, SparkSession}
//加载在训练好的模型进行预测
object PersonPredict03 {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("personpredict")
.master("local[*]")
.getOrCreate()
//1、加载训练好的模型
val logisticRegressionModel: LogisticRegressionModel = LogisticRegressionModel.load("F:\\IdeaProjects\\Spark\\src\\main\\data\\mllib\\person")
/**
* 2、进行预测
* 对一条数据进行预测
* 1:5.3 2:3.5 3:2.5 4:106.4 5:67.5 6:69.1 7:83
*/
val predict1: Double = logisticRegressionModel.predict(Vectors.dense(Array(5.3, 3.5, 2.5, 106.4, 67.5, 69.1, 83)))
// println(s"预测的结果为:${predict1}")
// 对一批数据进行预测 transform
// 前提条件:需要将每一条数据变成模型能够识别的格式,即向量
val newPersonDF: DataFrame = spark
.read
.format("csv")
.option("sep","|")
.schema("person String")
.load("F:\\IdeaProjects\\Spark\\src\\main\\data\\mllib\\data\\person1.txt")
newPersonDF.show()
import spark.implicits._
val df: DataFrame = newPersonDF.map(row => {
val line: String = row.getAs[String]("person")
// 1:5.3 2:3.5 3:2.5 4:106.4 5:67.5 6:69.1 7:83
// 以元组形式返回
Tuple2(Vectors.dense(line.split(" ").map(_.split(":")(1).toDouble)),1.0)
}).toDF("features","useless")
logisticRegressionModel.transform(df).show()
}
}
7、手动数字识别测试估算&预测
package day04_Mliab
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.functions.{count, sum, when}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
object ImageVectors04 {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("image")
.master("local[*]")
.config("spark.sql.shuffle.partitions", "2")
.getOrCreate()
//加载数据
val imageDF: DataFrame = spark
.read
.format("image")
.load("D:\\DeskTop\\MLLib\\train\\train")
//加载结果数据
val resultDF: DataFrame = spark
.read
.format("csv")
.option("sep"," ")
.schema("fileName String,label Int")
.load("F:\\IdeaProjects\\Spark\\src\\main\\data\\mllib\\data\\image_res.txt")
import spark.implicits._
import org.apache.spark.sql.functions._
// imageDF.printSchema()
// imageDF.show(truncate = false)
val preimageDF: DataFrame = imageDF
.select($"image.origin", $"image.data")
.as[(String, Array[Byte])]
.map {
case (orgin: String, data: Array[Byte]) => {
//将二机制转化成十进制
val intarray: Array[Int] = data.map(_.toInt)
//将路径最后的文件名提取出来
val imagename: String = orgin.split("/").last
//将数据符合条件的重新赋值,正数置0,负数置1
val imagedate: Array[Double] = intarray.map(int => {
if (int < 0) {
1.0
} else {
0.0
}
})
//返回名字和转成符合数字模型的向量
(imagename, Vectors.dense(imagedate).toSparse)
}
//转成DF并加上列名
}.toDF("fileName", "features")
//2、将数据切分成训练集,测试集,一般比例为8:2
val array: Array[Dataset[Row]] = preimageDF
.join(resultDF, "fileName")
.select($"fileName", $"label", $"features")
.randomSplit(Array(0.8, 0.2))
val trainDF: Dataset[Row] = array(0) //训练集 用于训练模型
val testDF: Dataset[Row] = array(1) //测试集 用于评估模型
//3、选择合适的模型,将训练集带入模型进行训练
/**
* 数据有无label -->有,监督学习 -->label是离散的还是连续的-->离散-->选择 分类的算法-->逻辑回归(适合做二分类)
* -->无,无监督学习 -->连续-->选择 回归的算法-->
*
*/
//逻辑回归
val logisticRegression: LogisticRegression = new LogisticRegression()
.setMaxIter(10)//设置最大的迭代次数
.setFitIntercept(true)//设置是否有截距
val logisticRegressionModel: LogisticRegressionModel = logisticRegression.fit(trainDF)
//4、使用测试集去评估模型
val tranDF: DataFrame = logisticRegressionModel.transform(testDF)
// tranDF.show()
tranDF
.withColumn("flag",when($"label"===$"prediction",1) otherwise(0))
.groupBy()
.agg(sum($"flag")/count("*") as "准确率")
// .show(truncate = false)
logisticRegressionModel.write.overwrite().save("F:\\IdeaProjects\\Spark\\src\\main\\data\\mllib\\imagemodel")
}
}
package day04_Mliab
import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{DataFrame, SparkSession}
object ImageVectorsPredict05 {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("image")
.master("local[*]")
.config("spark.sql.shuffle.partitions", "2")
.getOrCreate()
//加载数据
val imageDF: DataFrame = spark
.read
.format("image")
.load("F:\\IdeaProjects\\Spark\\src\\main\\data\\mllib\\data\\image")
import spark.implicits._
import org.apache.spark.sql.functions._
val preimageDF: DataFrame = imageDF
.select($"image.origin", $"image.data")
.as[(String, Array[Byte])]
.map {
case (orgin: String, data: Array[Byte]) => {
//将二机制转化成十进制
val intarray: Array[Int] = data.map(_.toInt)
//将路径最后的文件名提取出来
val imagename: String = orgin.split("/").last
//将数据符合条件的重新赋值,正数置0,负数置1
val imagedate: Array[Double] = intarray.map(int => {
if (int < 0) {
1.0
} else {
0.0
}
})
//返回名字和转成符合数字模型的向量
(imagename, Vectors.dense(imagedate).toSparse)
}
//转成DF并加上列名
}.toDF("fileName", "features")
val logisticRegressionModel: LogisticRegressionModel = LogisticRegressionModel.load("F:\\IdeaProjects\\Spark\\src\\main\\data\\mllib\\imagemodel")
val transDF: DataFrame = logisticRegressionModel.transform(preimageDF)
transDF.show()
}
}
8、贝叶斯模型
定义: 条件概率、联合概率计算方式与特征独立的关系去预测
优点:
1)朴素贝叶斯模型发源于古典数学理论,有稳定的分类效率。
2)对缺失数据不太敏感,算法也比较简单,常用于文本分类。
3)分类准确度高,速度快
缺点:由于使用了样本属性独立性的假设,所以如果特征属性有关联时其效果不好
1、什么是分类
分类是一种重要的数据分析形式,它提取刻画重要数据类的模型。这种模型称为分类器,预测分类的(离散的,无序的)类标号。例如医生对病人进行诊断是一个典型的分类过程,医生不是一眼就看出病人得了哪种病,而是要根据病人的症状和化验单结果诊断病人得了哪种病,采用哪种治疗方案。再比如,零售业中的销售经理需要分析客户数据,以便帮助他猜测具有某些特征的客户会购买某种商品。2、如何进行分类
数据分类是一个两阶段过程,包括学习阶段(构建分类模型)和分类阶段(使用模型预测给定数据的类标号)3、贝叶斯分类的基本概念
贝叶斯分类法是统计学分类方法,它可以预测类隶属关系的概率,如一个给定元组属于一个特定类的概率。贝叶斯分类基于贝叶斯定理。朴素贝叶斯分类法假定一个属性值在给定类上的概率独立于其他属性的值,这一假定称为类条件独立性
package day04_Mliab
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, IDF, IDFModel, Tokenizer}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import scala.collection.mutable
object Byesdemo07 {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("Demo06KMeans")
.master("local[*]")
.config("spark.sql.shuffle.partitions", 2)
.getOrCreate()
//读取需要用于数据体征工程的数据
val sourceDF: DataFrame = spark
.read
.format("csv")
.option("sep", "\t")
.schema("label Int,text String")
.load("F:\\IdeaProjects\\Spark\\src\\main\\data\\mllib\\data\\bayesTrain.txt")
//对每一句话进行分词
import spark.implicits._
import org.apache.spark.sql.functions._
val sentenceData: DataFrame = sourceDF
.as[(Int, String)]
.map {
case (label: Int, text: String) =>
(label, text, Ikanalyer06.fit(text))
}.filter(t => {
t._3.nonEmpty
})
.map(t => (t._1, t._2, t._3.mkString(" ")))
.toDF("label", "text", "sentence")
//构建英文分词器 :TF-IDF
val tokenizer:Tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
//将数据传入英文分词器进行分词
val wordsData:DataFrame = tokenizer.transform(sentenceData)
val hashingTF:HashingTF = new HashingTF()//数据较大是,可以自己修改
.setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(Math.pow(2,18).toInt)
val featurizedData:DataFrame = hashingTF.transform(wordsData)
// alternatively, CountVectorizer can also be used to get term frequency vectors
//计算IDF
val idf :IDF= new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel:IDFModel = idf.fit(featurizedData)
idfModel.write.overwrite().save("F:\\IdeaProjects\\Spark\\src\\main\\data\\mllib\\idf")
val rescaledData: DataFrame = idfModel.transform(featurizedData)
rescaledData.show(truncate = false)
//手动计算“爱你”这个词语的IDF
//计算所有文档的数量
// val count: Long = wordsData.count()
// //计算包括爱你这个词语的文档数量
// val cnt: Long = wordsData.filter(row => {
// val arr: mutable.WrappedArray[String] = row.getAs[mutable.WrappedArray[String]]("words")
// arr.contains("爱你")
// }).count()
// //计算IDF
// val ten: Double = Math.log10((count + 1) / (cnt + 1.0))
// val e: Double = Math.log((count + 1) / (cnt + 1.0))//IDF默认log以E为底
//2、切分数据集
val splitDF: Array[Dataset[Row]] = rescaledData.randomSplit(Array(0.8, 0.2))
val trainDF: Dataset[Row] = splitDF(0) //训练集 用于训练模型
val testDF: Dataset[Row] = splitDF(1) //测试集 用于评估模型
//3、选择合适算法模型-->贝叶斯分类
// Train a NaiveBayes model.
val model = new NaiveBayes()
.fit(trainDF)
//保存贝叶斯模型
model.write.overwrite().save("F:\\IdeaProjects\\Spark\\src\\main\\data\\mllib\\bayes")
// Select example rows to display.
val predictions = model.transform(testDF)
// predictions.show()
// Select (prediction, true label) and compute test error
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
//准确率
.setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test set accuracy = $accuracy")
}
}
9、Ik分词器
package day04_Mliab
import org.wltea.analyzer.core.{IKSegmenter, Lexeme}
import java.io.StringReader
import scala.collection.mutable.ListBuffer
object Ikanalyer06 {
//进行中文分词
def fit(text:String):List[String]={
val iK = new IKSegmenter(new StringReader(text), true)
var lex: Lexeme = iK.next()
val listbuffer: ListBuffer[String] = ListBuffer[String]()
while (lex!=null){
listbuffer.append(lex.getLexemeText)
lex = iK.next()
}
listbuffer.toList
}
def main(args: Array[String]): Unit = {
val str:String="我希望你能够听到我的音乐是免费的"
Ikanalyer06.fit(str)
}
}
10、TF-IDF
TF-IDF(term frequency–inverse document frequency)是一种用于资讯检索与资讯探勘的常用加权技术。
TF-IDF是一种统计方法,用以评估一字词对于一个文件集或一个语料库中的其中一份文件的重要程度。
字词的重要性随着它在文件中出现的次数成正比增加,但同时会随着它在语料库中出现的频率成反比下降。
TF-IDF加权的各种形式常被搜寻引擎应用,作为文件与用户查询之间相关程度的度量或评级。除了TF-IDF以外,因特网上的搜寻引擎还会使用基于连结分析的评级方法,以确定文件在搜寻结果中出现的顺序。
词频 (term frequency, TF) 指的是某一个给定的词语在一份给定的文件中出现的次数。这个数字通常会被归一化(分子一般小于分母 区别于IDF),以防止它偏向长的文件。(同一个词语在长文件里可能会比短文件有更高的词频,而不管该词语重要与否。)
逆向文件频率 (inverse document frequency, IDF) 是一个词语普遍重要性的度量。某一特定词语的IDF,可以由总文件数目除以包含该词语之文件的数目,再将得到的商取对数得到。
某一特定文件内的高词语频率,以及该词语在整个文件集合中的低文件频率,可以产生出高权重的TF-IDF。因此,TF-IDF倾向于过滤掉常见的词语,保留重要的词语。
TF_IDF主要思路
如果某个词或短语在一篇文章中出现的频率TF高,并且在其他文章中很少出现,则认为此词或者短语具有很好的类别区分能力,适合用来分类。TFIDF实际上是:TF * IDF,TF词频(Term Frequency),IDF反文档频率(Inverse Document Frequency)。TF表示词条在文档d中出现的频率(另一说:TF词频(Term Frequency)指的是某一个给定的词语在该文件中出现的次数)。IDF的主要思想是:如果包含词条t的文档越少,也就是n越小,IDF越大,则说明词条t具有很好的类别区分能力。如果某一类文档C中包含词条t的文档数为m,而其它类包含t的文档总数为k,显然所有包含t的文档数n=m+k,当m大的时候,n也大,按照IDF公式得到的IDF的值会小,就说明该词条t类别区分能力不强。(另一说:IDF反文档频率(Inverse Document Frequency)是指果包含词条的文档越少,IDF越大,则说明词条具有很好的类别区分能力。)但是实际上,如果一个词条在一个类的文档中频繁出现,则说明该词条能够很好代表这个类的文本的特征,这样的词条应该给它们赋予较高的权重,并选来作为该类文本的特征词以区别与其它类文档。这就是IDF的不足之处。
TF计算公式
在一份给定的文件里,词频(term frequency,TF)指的是某一个给定的词语在该文件中出现的次数。这个数字是对词数(term count)的归一化,以防止它偏向长的文件。(同一个词语在长文件里可能会比短文件有更高的词数,而不管该词语重要与否。)对于在某一特定文件里的词语 来说,它的重要性可表示为:
以上式子中 是该词在文件中的出现次数,而分母则是在文件中所有字词的出现次数之和。
IDF计算公式
TF-IDF计算过程
11、微博数据处理
1、先训练数据,使用中文分词器进行分词,将分词的结果放入英文分词器,转化成对应的格式,接着使用TF将其数据转成向量(数据特征模型所需要的类型),接着将数据传入IDF进行输出,最后将输出的类型传入贝叶斯模型中,进行输出,最后得到最后的数据
进行预测数据使用之前保存的IdF模型和bayes模型进行训练转化数据,最终的结果进行输出;
//代码:(以下代码用于预测数据)//
package day04_Mliab
import org.apache.spark.ml.classification.NaiveBayesModel
import org.apache.spark.ml.feature.{HashingTF, IDFModel, Tokenizer}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
object bayes_exercise {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("Demo06KMeans")
.master("local[*]")
.config("spark.sql.shuffle.partitions", 2)
.getOrCreate()
//读取需要用于数据体征工程的数据
val sourceDF: DataFrame = spark
.read
.format("csv")
.option("sep", ",")
.schema("id String,data String,sum Int,sentence String,ip BigInt,wg BigInt")
.load("F:\\IdeaProjects\\Spark\\src\\main\\data\\mllib\\data\\comment.csv")
// sourceDF.show()
// sourceDF.printSchema()
/**
+----------------+--------------------+---+-------------------------------------+----------+----------------+
| id| data|sum| sentense| ip| wg|
+----------------+--------------------+---+-------------------------------------+----------+----------------+
|4695502736851918|Sat Oct 23 16:42:...| 25| 但美国没封锁呀|6417385970|4695501360594999|
|4695507152144658|Sat Oct 23 17:00:...| 6| 参考DOI:10.1101/202...|1231317854|4695501360594999|
|4695503973387448|Sat Oct 23 16:47:...| 28|扯了个蛋,疫情期间出生的孩子测智商...|6445295083|4695501360594999|
|4695503130329772|Sat Oct 23 16:44:...| 3| 多接触自然|1837260653|4695501360594999|
|4695503747418475|Sat Oct 23 16:46:...| 0| null|1842001241|4695501360594999|
|4695545044537182|Sat Oct 23 19:30:...| 7| 可笑,诋毁中国方案已经无所不用其极。|1662256544|4695501360594999|
|4695508335460355|Sat Oct 23 17:04:...| 3| 拿钱,渲染,不靠谱。|1216375635|4695501360594999|
|4695506514870656|Sat Oct 23 16:57:...| 3| //@御宅之猫MK:扯了个蛋,疫情...|1066025021|4695501360594999|
|4695501587614931|Sat Oct 23 16:37:...| 0| 转发微博|1648990441|4695501360594999|
*/
//对每一句话进行分词
import spark.implicits._
import org.apache.spark.sql.functions._
//筛选text数据为空的
val sourceDf: Dataset[Row] = sourceDF
.select($"id", $"sentence")
.filter(row => {
row.getAs[String]("sentence") != null
})
//挑选出需要的数据
val PreDF: DataFrame = sourceDf
.map(row => {
val id: String = row.getAs[String]("id")
val sentence: String = row.getAs[String]("sentence")
(id, sentence, Ikanalyer06.fit(sentence))
})
.filter(_._3.nonEmpty)
.map(t3 => (t3._1, t3._2, t3._3.mkString(" ")))
.toDF("id", "text", "sentence")
// PreDF.show()
//将数据text在中文分词器转换后由Array格式转化为WrapperArray格式
//构建英文分词器 :TF-IDF
val tokenizer:Tokenizer = new Tokenizer()
.setInputCol("sentence")
.setOutputCol("word")
//将数据传入英文分词器进行分词
val wordsData:DataFrame = tokenizer.transform(PreDF)
// 计算TF 词频
val hashingTF: HashingTF = new HashingTF()
.setInputCol("word")
.setOutputCol("rawFeatures")
.setNumFeatures(262144)
val featurizedData: DataFrame = hashingTF.transform(wordsData)
//将传出的数据用TF-IDF进行transform处理
// 加载之前的IDF模型 对新的数据集进行transform
val idfModel: IDFModel = IDFModel.load("F:\\IdeaProjects\\Spark\\src\\main\\data\\mllib\\idf")
.setInputCol("rawFeatures")
.setOutputCol("features")
//处理由英文分词器处理的数据
val rescaledData: DataFrame = idfModel.transform(featurizedData)
// 加载贝叶斯模型
val bayesModel: NaiveBayesModel = NaiveBayesModel.load("F:\\IdeaProjects\\Spark\\src\\main\\data\\mllib\\bayes")
// 使用模型进行预测
val resDF: DataFrame = bayesModel.transform(rescaledData)
resDF.printSchema()
//输出数据
resDF.select($"id",$"text",$"sentence",$"word",$"prediction")
.show()
}
}
标签:机器,String,val,--,DataFrame,学习,import,spark
From: https://www.cnblogs.com/tanggf/p/16918934.html