全栈工程师开发手册 (作者:栾鹏)
python数据挖掘系列教程
K最近邻简介
K最近邻属于一种估值或分类算法,他的解释很容易。
我们假设一个人的优秀成为设定为1、2、3、4、5、6、7、8、9、10数值表示,其中10表示最优秀,1表示最不优秀。
我们都知道近朱者赤,近墨者黑,我们想看一个人是什么样的,看他的朋友是什么样的就可以了。
1、如何来考察一个人?
我们通过特征属性来描述一个对象的特征,例如是否抽烟、是否运动、工资、年龄。
2、怎样才算是跟朋友亲近?
通过计算距离函数来表示两个对象的相似度。例如具有相同的习惯、兴趣
3、不同亲近程度的朋友对考察人的影响?
通过为不同距离设置权重,来表示不同近邻对待测对象的亲近度。例如5个亲近朋友中,最亲密的两个朋友对考察人物的影响非常大,其他三个朋友的影响几乎可以忽略。
4、选多少个亲近朋友来考察比较合适呢?
k的取值问题
K最近邻(KNN)概念:
它的工作原理是:存在一个样本数据集合,所有特征属性已知,并且样本集中每个对象都已知所属分类。对不知道分类的待测对象,将待测对象的每个特征属性与样本集中数据对应的特征属性进行比较,然后算法提取样本最相似对象(最近邻)的分类标签。一般来说,我们只选择样本数据集中前k个最相似的对象数据,这就是k-近邻算法中k的出处,通常k是不大于20的整数。最后根据这k个数据的特征和属性,判断待测数据的分类。
K最近的三个基本要素
1、k值的选取。(在应用中,k值一般选择一个比较小的值,一般选用交叉验证来取最优的k值)
2、距离度量。(距离:误差绝对值p次方求和再求p次根。欧式距离:p=2的距离。曼哈顿距离:p=1的距离。p为无穷大时,距离为各个维度上距离的最大值)
3、分类决策规则。(也就是如何根据k个最近邻决定待测对象的分类。k最近邻的分类决策规则一般选用多数表决)
KNN基本执行步骤:
(1)计算待测对象和训练集中每个样本点的欧式距离
(2)对上面的所有距离值排序
(3)选出k个最小距离的样本作为“选民”
(4)根据“选民”预测待测样本的分类或值
KNN特点
(1)原理简单
(2)保存模型需要保存所有样本集
(3)训练过程很快,预测速度很慢
优点
简单好用,容易理解,精度高,理论成熟,既可以用来做分类也可以用来做回归;
可用于非线性分类;
可用于数值型数据和离散型数据(既可以用来估值,又可以用来分类)
训练时间复杂度为O(n);无数据输入假定;
对异常值不敏感。
准确度高,对数据没有假设,对outlier不敏感;
缺点:
计算复杂性高;空间复杂性高;需要大量的内存
样本不平衡问题(即有些类别的样本数量很多,而其它样本的数量很少);
一般数值很大的时候不用这个,计算量太大。但是单个样本又不能太少,否则容易发生误分。
最大的缺点是无法给出数据的内在含义。
在上面的描述中思考以下问题:
样本属性如何选择?如何计算两个对象间距离?当样本各属性的类型和尺度不同时如何处理?各属性不同重要程度如何处理?模型的好坏如何评估?
估值案例
今天我们使用k最近邻算法来构建白酒的价格模型。这是一个估值模型,不是分类案例。需要预测的回归值为白酒的价格
构造数据集
我们先考虑一个简单的价格模型。我们知道白酒的价格跟等级、年代有很大的关系。我们需要知道一批白酒的数据作为样本数据集,其中包含每瓶白酒的等级、年代、和价格(输出值)。
这批数据集你可以使用市场问卷调查,或者网络爬虫获取。为了简单我们自己模拟产生。
构建一个葡萄酒样本数据集。
from random import random,randint
import math
# 根据等级和年代对价格进行模拟
def wineprice(rating,age):
peak_age=rating-50
# 根据等级计算价格
price=rating/2
if age>peak_age:
# 经过“峰值年”,后续5年里其品质将会变差
price=price*(5-(age-peak_age)/2)
else:
# 价格在接近“峰值年”时会增加到原值的5倍
price=price*(5*((age+1)/peak_age))
if price<0: price=0
return price
# 生成一批数据代表样本数据集
def wineset1():
rows=[]
for i in range(300):
# 随机生成年代和等级
rating=random()*50+50
age=random()*50
# 得到一个参考价格
price=wineprice(rating,age)
# 添加一些噪音
price*=(random()*0.2+0.9)
# 加入数据集
rows.append({'input':(rating,age),'result':price})
return rows
样本间的距离
要使用k最近邻,首先要知道哪些是最近邻,所以我们还要有一个功能,就是要能计算两个对象之间的相似度。我们这里使用欧几里得距离作为数据间的距离,代表相似度。
# 使用欧几里得距离,定义距离
def euclidean(v1,v2):
d=0.0
for i in range(len(v1)):
d+=(v1[i]-v2[i])**2
return math.sqrt(d)
获取与新数据距离最近的k个样本数据
有了样本数据集,和计算两个样本间的距离。我们就可以对待测数据,计算待测数据与样本数据集中每个样本之间的距离。并对所有距离进行排序,获取距离最近的前k个样本。
# 计算待测商品和样本数据集中任一商品间的距离。data样本数据集,vec1待测商品
def getdistances(data,vec1):
distancelist=[]
# 遍历样本数据集中的每一项
for i in range(len(data)):
vec2=data[i]['input']
# 添加距离到距离列表
distancelist.append((euclidean(vec1,vec2),i))
# 距离排序
distancelist.sort()
return distancelist #返回距离列表
根据距离最近的k个样本数据预测新对象的输出值
上面的步骤,我们已经获取了距离最近的k个样本对象。那么如何根据k个样本对象的属性和价格计算待测对象的价格呢?
1、简单求均值
# 对距离值最小的前k个结果求平均
def knnestimate(data,vec1,k=5):
# 得到经过排序的距离值
dlist=getdistances(data,vec1)
avg=0.0
# 对前k项结果求平均
for i in range(k):
idx=dlist[i][1]
avg+=data[idx]['result']
avg=avg/k
return avg
2、求加权平均
如果使用直接求均值,有可能存在前k个最近邻中,可能会存在距离很远的数据,但是他仍然属于最近的前K个数据。
比如设定了k=5,距离最近的3个样本对象距离待测对象很近,但是第4、5个样本对象已经非常远离了待测对象。
当存在这种情况时,对前k个样本数据直接求均值会有偏差,所以使用加权平均,为较远的节点赋予较小的权值,对较近的节点赋予较大的权值。
那么具体该怎么根据数据间距离分配权值呢?这里使用三种递减函数作为权值分配方法。
三种权值分配的思想就是距离远的权值小,距离近的权值大。
2.1、使用反函数为近邻分配权重。
def inverseweight(dist,num=1.0,const=0.1):
return num/(dist+const)
2.2、使用减法函数为近邻分配权重。当最近距离都大于const时不可用。
def subtractweight(dist,const=1.0):
if dist>const:
return 0
else:
return const-dist
2.3、使用高斯函数为距离分配权重。
def gaussian(dist,sigma=5.0):
return math.e**(-dist**2/(2*sigma**2))
有了权值分配方法,下面就可以计算加权平均了。
# 对距离值最小的前k个结果求加权平均
def weightedknn(data,vec1,k=5,weightf=gaussian):
# 得到距离值
dlist=getdistances(data,vec1)
avg=0.0
totalweight=0.0
# 得到加权平均
for i in range(k):
dist=dlist[i][0]
idx=dlist[i][1]
weight=weightf(dist)
avg+=weight*data[idx]['result']
totalweight+=weight
if totalweight==0: return 0
avg=avg/totalweight
return avg
第一次测试
到此基本的knn算法就讲解完了,下面我们进行一下测试。
if __name__=='__main__':
data = wineset1() #获取样本数据集
result=knnestimate(data,(95.0,3.0)) #根据最近邻直接求平均进行预测
print(result)
result=weightedknn(data,(95.0,3.0),weightf=inverseweight) #使用反函数做权值分配方法,进行加权平均
print(result)
result = weightedknn(data, (95.0, 3.0), weightf=subtractweight) # 使用减法函数做权值分配方法,进行加权平均
print(result)
result = weightedknn(data, (95.0, 3.0), weightf=gaussian) # 使用高斯函数做权值分配方法,进行加权平均
print(result)
以上内容,我们完成了对k最近邻的模型构造,并完成了对一个待测数据的预测。那么这个测试结果究竟好不好呢?又如何评估我们的建立的模型的好坏呢?下面我们来了解一下交叉验证。
交叉验证
交叉验证是一种将样本集划分为训练集和待测集,使用样本集进行模型建立,然后使用模型对待测集进行预测,并查看预测结果与真实结果之间的误差,进而对模型进行评估的方法。每划分一次样本集就相当于建立了一个模型。待测集中每一个对象都可以进行一次测试。
思考:交叉验证中如何将样本集划分训练集和待测集、测试多少次、有了预测结果如何计算模型好坏、能不能把预测结果返回到新的模型训练中?这些问题读者可以在今后学习中思考
交叉验证用来验证你的算法或算法参数的好坏,比如上面的加权分配算法我们有三种方式,究竟哪个更好呢?我们可以使用交叉验证进行查看。
随机选择样本数据集中95%作为训练集,5%作为待测集,对待测集中数据进行预测并与已知结果进行比较,计算准确率,查看算法效果。
读者需要注意,交叉验证中并不是一定使用准确率来判断模型好坏,只不过准确率是用的最多的评估标准。
下面使用代码实现交叉验证。
首先实现将样本数据集划分为训练集和待测集两个子集的功能。
# 划分数据。test待测集占的比例。其他数据为训练集
def dividedata(data,test=0.05):
trainset=[]
testset=[]
for row in data:
if random()<test:
testset.append(row)
else:
trainset.append(row)
return trainset,testset
然后再来计算预测结果与真实结果之间的准确率。以此来表示系统模型的好坏。
# 对使用算法进行预测的结果的误差进行统计,以此判断算法好坏。algf为算法函数,trainset为训练数据集,testset为待测数据集
def testalgorithm(algf,trainset,testset):
error=0.0
for row in testset:
guess=algf(trainset,row['input']) #这一步要和样本数据的格式保持一致,列表内个元素为一个字典,每个字典包含input和result两个属性。而且函数参数是列表和元组
error+=(row['result']-guess)**2
#print row['result'],guess
#print error/len(testset)
return error/len(testset)
有了数据拆分和算法性能误差统计函数。我们就可以在原始数据集上进行多次这样的拆分统计实验,计算平均误差。
# 将数据拆分和误差统计合并在一起。对数据集进行多次划分,并验证算法好坏
def crossvalidate(algf,data,trials=100,test=0.1):
error=0.0
for i in range(trials):
trainset,testset=dividedata(data,test)
error+=testalgorithm(algf,trainset,testset)
return error/trials
上面我们完成了交叉验证的全部功能,下面我们来尝试一下应用这个功能。
交叉验证测试
if __name__=='__main__': #只有在执行当前模块时才会运行此函数
data = wineset1() #创建第一批数据集
print(data)
error = crossvalidate(knnestimate,data) #对直接求均值算法进行评估
print('平均误差:'+str(error))
def knn3(d,v): return knnestimate(d,v,k=3) #定义一个函数指针。参数为d列表,v元组
error = crossvalidate(knn3, data) #对直接求均值算法进行评估
print('平均误差:' + str(error))
def knninverse(d,v): return weightedknn(d,v,weightf=inverseweight) #定义一个函数指针。参数为d列表,v元组
error = crossvalidate(knninverse, data) #对使用反函数做权值分配方法进行评估
print('平均误差:' + str(error))
上面我们已经完成了knn模型的建模和使用。以及使用交叉验证完成对模型中算法的选择和模型的评估。
下面我们来看一些复杂的情况。
不同类型、尺度的属性,无用属性
实际生活中,白酒的价格不仅与年代和等级有关系,还与酒瓶容量等其他属性有很大的关系。
所以我们这里来讨论一下当样本数据集中可能存在复杂属性的情况。
1、属性类型不同,属性尺度不同的情况
在样本数据集中的对象各个属性可能并不是取值范围相同、类型相同的数值,比如上面的酒的属性可能包含档次(0-100),酒的年限(0-50),酒的容量(三种容量375.0ml、750.0ml、1500.0ml),
2、存在无效属性的情况
在我们获取的样本数据中还有可能包含无用的属性,比如酒箱的号码(1-2000之间的整数)。在计算样本距离时,取值范围大的属性的变化会严重影响取值范围小的属性的变化,以至于结果会忽略取值范围小的属性。而且无用属性的变化也会增加数据之间的距离。比如两瓶非常相似的酒,两瓶酒之间的距离本应为接近0,但是由于两瓶酒的酒箱号码分别为1和100,则在计算两瓶酒之间的距离时就会接近100。而这属于一个很大的距离。
针对上面的情况,要求我们能对样本数据的属性进行缩放到合适的范围,并要能删除无效属性。
我们先来模拟构造一批数据集,这批数据集本应是网络爬虫获取或市场统计获取。为了简单我们使用代码模拟获取这么一批数据。它包含了酒的档次、年限、容量、酒箱的号码以及酒的价格。
构造新的数据集
# 构建新数据集,模拟不同类型、尺度的属性
def wineset2():
rows=[]
for i in range(300):
rating=random()*50+50 #酒的档次
age=random()*50 #酒的年限
aisle=float(randint(1,2000)) #酒箱的号码(无关属性)
bottlesize=[375.0,750.0,1500.0][randint(0,2)] #酒的容量
price=wineprice(rating,age) #酒的价格
price*=(bottlesize/750)
price*=(random()*0.2+0.9)
rows.append({'input':(rating,age,aisle,bottlesize),'result':price})
return rows
实现按比例对属性的取值进行缩放的功能
# 按比例对属性进行缩放,scale为各属性的值的缩放比例。
def rescale(data,scale):
scaleddata=[]
for row in data:
scaled=[scale[i]*row['input'][i] for i in range(len(scale))]
scaleddata.append({'input':scaled,'result':row['result']})
return scaleddata
那就还剩一个问题,究竟各个属性缩放多少才合适呢。这是一个优化问题,我们可以通过优化技术寻找最优化解。
优化,就是寻找函数极值点的问题。寻找在一定自变量范围内使函数值最小或最大的自变量的取值。函数一般设定为成本函数,自变量为函数的参数,也就是算法的参数。参数和函数值可以是离散或连续值。
比如属性比例缩放,每个属性的缩放比例就是自变量,成本函数就是预测结果的错误率,寻找方法就是简单的暴力尝试每一个自变量的值,寻找使成本函数最小的自变量。
而需要优化的成本函数,就是通过缩放以后进行预测的结果与真实结果之间的误差值。误差值越小越好。误差值的计算同前面交叉验证时使用的相同crossvalidate函数
下面构建成本函数
# 生成成本函数。
def createcostfunction(algf,data):
def costf(scale):
sdata=rescale(data,scale)
return crossvalidate(algf,sdata,trials=10)
return costf
weightdomain=[(0,10)]*4 #将缩放比例这个题解的取值范围设置为0-10,可以自己设定,用于优化算法
我们通过优化技术查找到了所有属性合适的缩放比例,我们也就可以对样本数据集的属性进行缩放后建立k最近邻模型。当有待测数据到来时,我们也对待测对象的属性进行缩放后,与模型中的对象(比例缩放后的样本对象)进行距离计算,获取k最近邻。
测试代码
if __name__=='__main__': #只有在执行当前模块时才会运行此函数
# ======================缩放比例优化======================
data = wineset2() # 创建第2批数据集
print(data)
import optimization
costf=createcostfunction(knnestimate,data) #创建成本函数
result = optimization.annealingoptimize(weightdomain,costf,step=2) #使用退火算法寻找最优解
print(result)
上面我们又解决了一种带有不同类型、尺度的属性,无用属性等复杂属性的情况,这是数据来源的问题,下面我们将面临最后一个问题,是输出结果的问题。
预测结果的不对称分布
我们上面建立的酒的价格模型。这是正规市场中的酒。但是如何这酒是逃税酒,那么逃税酒的价格仍然和酒的等级、年份、容量有相似的关系,不过比正规酒要便宜40%。而我们买一瓶酒时并不知道这是正规酒就还是逃税酒。因此当我们预测价格时我们更情愿说如果这酒是正规酒,则价格大约是m1元,如果是逃税酒,这价格是m2元。
当然还有另外一种结果输出的形式,就是n1%的概率是m1元,n2%的概率是m2元,这种概率输出方式。
在数据挖掘中,对于样本数据集包含多种分布情况时,待测对象的预测结果我们也希望不唯一的表示。我们可以使用概率结果进行表示,输出每种结果的值和出现的概率。
我们模拟一批数据,样本数据中价格存在正规酒和逃税酒这两种形式的分布。
构造数据集
def wineset3():
rows=wineset1()
for row in rows:
if random()<0.5:
# 一半的可能是逃税酒
row['result']*=0.6
return rows
如果以结果概率的形式存在,首先我们要能够计算概率值
# 计算概率。data样本数据集,vec1预测数据,low,high结果范围,weightf为根据距离进行权值分配的函数
def probguess(data,vec1,low,high,k=5,weightf=gaussian):
dlist=getdistances(data,vec1) #获取距离列表
nweight=0.0
tweight=0.0
for i in range(k):
dist=dlist[i][0] #距离
idx=dlist[i][1] #酒箱的号码
weight=weightf(dist) #权值
v=data[idx]['result'] #真实结果
# 当前数据点位于指定范围内么?
if v>=low and v<=high:
nweight+=weight #指定范围的权值之和
tweight+=weight #总的权值之和
if tweight==0: return 0
# 概率等于位于指定范围内的权重值除以所有权重值
return nweight/tweight
对于多种输出、以概率和值的形式表示的结果,我们可以使用累积概率分布图或概率密度图的形式表现。
绘制累积概率分布图
from pylab import *
# 绘制累积概率分布图。data样本数据集,vec1预测数据,high取值最高点,k近邻范围,weightf权值分配
def cumulativegraph(data,vec1,high,k=5,weightf=gaussian):
t1=arange(0.0,high,0.1)
cprob=array([probguess(data,vec1,0,v,k,weightf) for v in t1]) #预测产生的不同结果的概率
plot(t1,cprob)
show()
绘制概率密度图
# 绘制概率密度图
def probabilitygraph(data,vec1,high,k=5,weightf=gaussian,ss=5.0):
# 建立一个代表价格的值域范围
t1=arange(0.0,high,0.1)
# 得到整个值域范围内的所有概率
probs=[probguess(data,vec1,v,v+0.1,k,weightf) for v in t1]
# 通过加上近邻概率的高斯计算结果,对概率值做平滑处理
smoothed=[]
for i in range(len(probs)):
sv=0.0
for j in range(0,len(probs)):
dist=abs(i-j)*0.1
weight=gaussian(dist,sigma=ss)
sv+=weight*probs[j]
smoothed.append(sv)
smoothed=array(smoothed)
plot(t1,smoothed)
show()
测试代码
if __name__=='__main__': #只有在执行当前模块时才会运行此函数
data = wineset3() # 创建第3批数据集
print(data)
cumulativegraph(data,(1,1),120) #绘制累积概率密度
probabilitygraph(data,(1,1),6) #绘制概率密度图
到此k最近邻的所有内容讲解完毕。
k近邻算法中k的选取以及特征归一化的重要性
如果我们选取较小的k值,那么就会意味着我们的整体模型会变得复杂,容易发生过拟合。因为k太小会导致过拟合,很容易将一些噪声(如上图离五边形很近的黑色圆点)学习到模型中,而忽略了数据真实的分布!
如果k值太大就会造成模型过于简单,完全忽略训练数据实例中的大量有用信息,是不可取的。
那么我们一般怎么选取呢?李航博士书上讲到,我们一般选取一个较小的数值,通常采取 交叉验证法来选取最优的k值。(也就是说,选取k值很重要的关键是实验调参,类似于神经网络选取多少层这种,通过调整超参数来得到一个较好的结果)
kd树
下面我们来介绍一下当数据量很大情况下,对于快速寻找k最近邻的算法——kd树。
kd树(K-dimension tree)是一种对k维空间中的实例点进行存储以便对其进行快速检索的树形数据结构。kd树是是一种二叉树,表示对k维空间的一个划分,构造kd树相当于(相当于只是为了方便的理解,在内存中都是以二叉树的形式存在)不断地用垂直于坐标轴的超平面将K维空间切分,构成一系列的K维超矩形区域。kd树的每个结点对应于一个k维超矩形区域。利用kd树可以省去对大部分数据点的搜索,从而减少搜索的计算量。
对一个三维空间,kd树按照一定的划分规则把这个三维空间划分了多个空间,如下图所示
类比“二分查找”:给出一组数据:[9 1 4 7 2 5 0 3 8],要查找8。如果挨个查找(线性扫描),那么将会把数据集都遍历一遍。而如果排一下序那数据集就变成了:[0 1 2 3 4 5 6 7 8 9],按前一种方式我们进行了很多没有必要的查找,现在如果我们以5为分界点,那么数据集就被划分为了左右两个“簇” [0 1 2 3 4]和[6 7 8 9]。因此,根本就没有必要进入第一个簇,可以直接进入第二个簇进行查找。把二分查找中的数据点换成k维数据点,这样的划分就变成了用超平面对k维空间的划分。空间划分就是对数据点进行分类,“挨得近”的数据点就在一个空间里面。
构造kd树的方法如下:构造根结点,使根结点对应于K维空间中包含所有实例点的超矩形区域;通过下面的递归的方法,不断地对k维空间进行切分,生成子结点。在超矩形区域上选择一个坐标轴和在此坐标轴上的一个切分点,确定一个超平面,这个超平面通过选定的切分点并垂直于选定的坐标轴,将当前超矩形区域切分为左右两个子区域(子结点);这时,实例被分到两个子区域,这个过程直到子区域内没有实例时终止(终止时的结点为叶结点)。在此过程中,将实例保存在相应的结点上。通常,循环的择坐标轴对空间切分,选择训练实例点在坐标轴上的中位数为切分点,这样得到的kd树是平衡的(平衡二叉树:它是一棵空树,或其左子树和右子树的深度之差的绝对值不超过1,且它的左子树和右子树都是平衡二叉树)。
KD树中每个节点是一个向量,和二叉树按照数的大小划分不同的是,KD树每层需要选定向量中的某一维,然后根据这一维按左小右大的方式划分数据。在构建KD树时,关键需要解决2个问题:(1)选择向量的哪一维进行划分;(2)如何划分数据。第一个问题简单的解决方法可以是选择随机选择某一维或按顺序选择,但是更好的方法应该是在数据比较分散的那一维进行划分(分散的程度可以根据方差来衡量)。好的划分方法可以使构建的树比较平衡,可以每次选择中位数来进行划分,这样问题2也得到了解决。
构造平衡kd树算法:
输入:k维空间数据集,其中;
输出:kd树
(1)开始:构造根结点,根结点对应于包含的k维空间的超矩形区域。选择为坐标轴,以中所有实例的坐标的中位数为切分点,将根结点对应的超矩形区域切分为两个子区域。切分由通过切分点并与坐标轴垂直的超平面实现。由根结点生成深度为1的左、右子结点:左子结点对应坐标小于切分点的子区域,右子结点对应于坐标大于切分点的子区域。将落在切分超平面上的实例点保存在根结点。
(2)重复。对深度为的结点,选择为切分的坐标轴,,以该结点的区域中所有实例的坐标的中位数为切分点,将该结点对应的超矩形区域切分为两个子区域。切分由通过切分点并与坐标轴垂直的超平面实现。由该结点生成深度为的左、右子结点:左子结点对应坐标小于切分点的子区域,右子结点对应坐标大于切分点的子区域。将落在切分超平面上的实例点保存在该结点。
下面用一个简单的2维平面上的例子来进行说明。
例. 给定一个二维空间数据集:,构造一个平衡kd树。
解:根结点对应包含数据集的矩形,选择轴,6个数据点的坐标中位数是6,这里选最接近的(7,2)点,以平面=7将空间分为左、右两个子矩形(子结点);接着左矩形以=4分为两个子矩形(左矩形中{(2,3),(5,4),(4,7)}点的坐标中位数正好为4),右矩形以=6分为两个子矩形,如此递归,最后得到如下图所示的特征空间划分和kd树。
下面的代码用递归的方式构建了kd树,通过前序遍历可以进行验证。这里只是简单地采用坐标轮换方式选取分割轴,为了更高效的分割空间,也可以计算所有数据点在每个维度上的数值的方差,然后选择方差最大的维度作为当前节点的划分维度。方差越大,说明这个维度上的数据越不集中(稀疏、分散),也就说明了它们就越不可能属于同一个空间,因此需要在这个维度上进行划分。
# kd-tree每个结点中主要包含的数据结构如下
class KdNode(object):
def __init__(self, dom_elt, split, left, right):
self.dom_elt = dom_elt # k维向量节点(k维空间中的一个样本点)
self.split = split # 整数(进行分割维度的序号)
self.left = left # 该结点分割超平面左子空间构成的kd-tree
self.right = right # 该结点分割超平面右子空间构成的kd-tree
# 构建kd树
class KdTree(object):
def __init__(self, data):
k = len(data[0]) # 数据维度
def CreateNode(split, data_set): # 按第split维划分数据集exset创建KdNode
if not data_set: # 数据集为空
return None
# key参数的值为一个函数,此函数只有一个参数且返回一个值用来进行比较
# operator模块提供的itemgetter函数用于获取对象的哪些维的数据,参数为需要获取的数据在对象中的序号
# data_set.sort(key=itemgetter(split)) # 按要进行分割的那一维数据排序
data_set.sort(key=lambda x: x[split])
split_pos = len(data_set) // 2 # //为Python中的整数除法
median = data_set[split_pos] # 中位数分割点
split_next = (split + 1) % k # cycle coordinates
# 递归的创建kd树
return KdNode(median, split,
CreateNode(split_next, data_set[:split_pos]), # 创建左子树
CreateNode(split_next, data_set[split_pos + 1:])) # 创建右子树
self.root = CreateNode(0, data) # 从第0维分量开始构建kd树,返回根节点
# KDTree的前序遍历
def preorder(root):
print(root.dom_elt)
if root.left: # 节点不为空
preorder(root.left)
if root.right:
preorder(root.right)
if __name__ == "__main__":
data = [[2, 3], [5, 4], [9, 6], [4, 7], [8, 1], [7, 2]]
kd = KdTree(data)
preorder(kd.root)
搜索kd树
利用kd树可以省去对大部分数据点的搜索,从而减少搜索的计算量。下面以搜索最近邻点为例加以叙述:给定一个目标点,搜索其最近邻,首先找到包含目标点的叶节点;然后从该叶结点出发,依次回退到父结点;不断查找与目标点最近邻的结点,当确定不可能存在更近的结点时终止。这样搜索就被限制在空间的局部区域上,效率大为提高。
用kd树的最近邻搜索:
输入:已构造的kd树;目标点x;
输出:x的最近邻。
(1) 在kd树中找出包含目标点x的叶结点:从根结点出发,递归的向下访问kd树。若目标点当前维的坐标值小于切分点的坐标值,则移动到左子结点,否则移动到右子结点。直到子结点为叶结点为止;
(2) 以此叶结点为“当前最近点”;
(3) 递归的向上回退,在每个结点进行以下操作:
- (a) 如果当前结点保存的实例点比“当前最近点”距目标点更近,则以该实例点为“当前最近点”,之前的最近实例点为“之前最近点”进行b步骤。否则继续向上回退。
- (b)经过步骤a,当前节点已经是“当前最近点”了。既然我们是向上回退(向根节点回退),那么 “之前最近点”一定是在当前结点一个子结点对应的区域。检查当前节点的另一个子结点对应的区域是否有更近的点。具体的,检查另一个子结点对应的区域是否与以目标点为球心、以目标点与“当前最近点”间的距离为半径的超球体相交。如果相交,可能在另一个子结点对应的区域内存在距离目标更近的点,移动到另一个子结点。接着,递归的进行最近邻搜索。如果不相交,向上回退。
(4) 当回退到根结点时,搜索结束。最后的“当前最近点”即为x的最近邻点。
以先前构建好的kd树为例,查找目标点(3,4.5)的最近邻点。同样先进行二叉查找,先从(7,2)查找到(5,4)节点,在进行查找时是由y = 4为分割超平面的,由于查找点为y值为4.5,因此进入右子空间查找到(4,7),形成搜索路径:(7,2)→(5,4)→(4,7),取(4,7)为当前最近邻点。以目标查找点为圆心,目标查找点到当前最近点的距离2.69为半径确定一个红色的圆。然后回溯到(5,4),计算其与查找点之间的距离为2.06,则该结点比当前最近点距目标点更近,以(5,4)为当前最近点。用同样的方法再次确定一个绿色的圆,可见该圆和y = 4超平面相交,所以需要进入(5,4)结点的另一个子空间进行查找。(2,3)结点与目标点距离为1.8,比当前最近点要更近,所以最近邻点更新为(2,3),最近距离更新为1.8,同样可以确定一个蓝色的圆。接着根据规则回退到根结点(7,2),蓝色圆与x=7的超平面不相交,因此不用进入(7,2)的右子空间进行查找。至此,搜索路径回溯完,返回最近邻点(2,3),最近距离1.8。
如果实例点是随机分布的,kd树搜索的平均计算复杂度是O(logN),这里N是训练实例数。kd树更适用于训练实例数远大于空间维数时的k近邻搜索。当空间维数接近训练实例数时,它的效率会迅速下降,几乎接近线性扫描。
下面的代码对构建好的kd树进行搜索,寻找与目标点最近的样本点:
# kd-tree每个结点中主要包含的数据结构如下
class KdNode(object):
def __init__(self, dom_elt, split, left, right):
self.dom_elt = dom_elt # k维向量节点(k维空间中的一个样本点)
self.split = split # 整数(进行分割维度的序号)
self.left = left # 该结点分割超平面左子空间构成的kd-tree
self.right = right # 该结点分割超平面右子空间构成的kd-tree
# 构建kd树
class KdTree(object):
def __init__(self, data):
k = len(data[0]) # 数据维度
def CreateNode(split, data_set): # 按第split维划分数据集exset创建KdNode
if not data_set: # 数据集为空
return None
# key参数的值为一个函数,此函数只有一个参数且返回一个值用来进行比较
# operator模块提供的itemgetter函数用于获取对象的哪些维的数据,参数为需要获取的数据在对象中的序号
# data_set.sort(key=itemgetter(split)) # 按要进行分割的那一维数据排序
data_set.sort(key=lambda x: x[split])
split_pos = len(data_set) // 2 # //为Python中的整数除法
median = data_set[split_pos] # 中位数分割点
split_next = (split + 1) % k # cycle coordinates
# 递归的创建kd树
return KdNode(median, split,
CreateNode(split_next, data_set[:split_pos]), # 创建左子树
CreateNode(split_next, data_set[split_pos + 1:])) # 创建右子树
self.root = CreateNode(0, data) # 从第0维分量开始构建kd树,返回根节点
# KDTree的前序遍历
def preorder(root):
print(root.dom_elt)
if root.left: # 节点不为空
preorder(root.left)
if root.right:
preorder(root.right)
from math import sqrt
from collections import namedtuple
# 定义一个namedtuple,分别存放最近坐标点、最近距离和访问过的节点数
result = namedtuple("Result_tuple", "nearest_point nearest_dist nodes_visited")
def find_nearest(tree, point):
k = len(point) # 数据维度
def travel(kd_node, target, max_dist):
if kd_node is None:
return result([0] * k, float("inf"), 0) # python中用float("inf")和float("-inf")表示正负无穷
nodes_visited = 1
s = kd_node.split # 进行分割的维度
pivot = kd_node.dom_elt # 进行分割的“轴”
if target[s] <= pivot[s]: # 如果目标点第s维小于分割轴的对应值(目标离左子树更近)
nearer_node = kd_node.left # 下一个访问节点为左子树根节点
further_node = kd_node.right # 同时记录下右子树
else: # 目标离右子树更近
nearer_node = kd_node.right # 下一个访问节点为右子树根节点
further_node = kd_node.left
temp1 = travel(nearer_node, target, max_dist) # 进行遍历找到包含目标点的区域
nearest = temp1.nearest_point # 以此叶结点作为“当前最近点”
dist = temp1.nearest_dist # 更新最近距离
nodes_visited += temp1.nodes_visited
if dist < max_dist:
max_dist = dist # 最近点将在以目标点为球心,max_dist为半径的超球体内
temp_dist = abs(pivot[s] - target[s]) # 第s维上目标点与分割超平面的距离
if max_dist < temp_dist: # 判断超球体是否与超平面相交
return result(nearest, dist, nodes_visited) # 不相交则可以直接返回,不用继续判断
# ----------------------------------------------------------------------
# 计算目标点与分割点的欧氏距离
temp_dist = sqrt(sum((p1 - p2) ** 2 for p1, p2 in zip(pivot, target)))
if temp_dist < dist: # 如果“更近”
nearest = pivot # 更新最近点
dist = temp_dist # 更新最近距离
max_dist = dist # 更新超球体半径
# 检查另一个子结点对应的区域是否有更近的点
temp2 = travel(further_node, target, max_dist)
nodes_visited += temp2.nodes_visited
if temp2.nearest_dist < dist: # 如果另一个子结点内存在更近距离
nearest = temp2.nearest_point # 更新最近点
dist = temp2.nearest_dist # 更新最近距离
return result(nearest, dist, nodes_visited)
return travel(tree.root, point, float("inf")) # 从根节点开始递归
from time import clock
from random import random
# 产生一个k维随机向量,每维分量值在0~1之间
def random_point(k):
return [random() for _ in range(k)]
# 产生n个k维随机向量
def random_points(k, n):
return [random_point(k) for _ in range(n)]
if __name__ == "__main__":
data = [[2, 3], [5, 4], [9, 6], [4, 7], [8, 1], [7, 2]]
kd = KdTree(data)
preorder(kd.root)
ret = find_nearest(kd, [3, 4.5])
print(ret)
N = 400000
t0 = clock()
kd2 = KdTree(random_points(3, N)) # 构建包含四十万个3维空间样本点的kd树
ret2 = find_nearest(kd2, [0.1, 0.5, 0.8]) # 四十万个样本点中寻找离目标最近的点
t1 = clock()
print("time: ", t1 - t0, "s")
print(ret2)