抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >


基于 K 均值聚类的网络流量异常检测 (pyspark)

异常检测常用于检测欺诈、网络攻击、服务器及传感设备故障。在这些应用中,我们要能够找出以前从未见过的新型异常,如新欺诈方式、新入侵方法或新服务器故障模式。

数据集

KDD Cup 1999 数据集

下载 KDD Cup 1999 数据集

数据集为 CSV 格式,每个连接占一行,包含 38 个特征。

单行示例:

0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.

最后的字段表示类别标号。大多数标号为 normal., 但也有一些样本代表各种网络攻击。

算法

K 均值聚类算法

聚类算法是指将一堆没有标签的数据自动划分成几类的方法,这个方法要保证同一类的数据有相似的特征。

K均值聚类算法

算法过程

K-Means 算法的特点是类别的个数是人为给定的。是一个迭代求解的聚类算法,属于划分型的聚类方法,即首先创建 K 个划分,然后迭代地将样本从一个划分转移到另一个划分来改善最终聚类的效果。其过程大致如下。

(1)根据给定的 K 值选取 K 个样本点作为初始划分中心。

(2)计算所有样本点到每一个划分中心的距离,并将所有样本点划分到距离最近的划分中心。

(3)计算每个划分中样本点的平均值,并将其作为新的中心。

(4)循环进行步骤(2)和步骤(3)直至最大迭代次数,或划分中心的变化小于某一预定义阈值。

伪代码

function K-Means(输入数据,中心点个数K) 
获取输入数据的维度Dim和个数N
随机生成K个Dim维的点
while(算法未收敛)
对N个点:计算每个点属于哪一类。
对于K个中心点:
1,找出所有属于自己这一类的所有数据点
2,把自己的坐标修改为这些数据点的中心点坐标
end
输出结果
end

K-Means 的一个重要的假设是:数据之间的相似度可以使用欧氏距离度量,如果不能使用欧氏距离度量,要先把数据转换到能用欧氏距离度量,这一点很重要。可以使用欧氏距离度量的意思就是欧氏距离越小,两个数据相似度越高。

假设簇划分为(,,…), 则优化目标是最小化平方误差 SSE:

其中是簇的均值向量,也称为质心,表达式为:

这是一个 NP 难题,因此只能采用启发式迭代方法。

K-Means 采用的启发式方式很简单,用下面一组图就可以形象的描述:

启发式迭代

图 a 表达了初始的数据集,假设 k=2。在图 b 中,随机选择了两个 k 类所对应的类别质心,即图中的红色质心和蓝色质心,然后分别求样本中所有点到这两个质心的距离,并标记每个样本的类别为和该样本距离最小的质心的类别,如图 c 所示,经过计算样本和红色质心和蓝色质心的距离,得到了所有样本点的第一轮迭代后的类别。此时对当前标记为红色和蓝色的点分别求其新的质心,如图 d 所示,新的红色质心和蓝色质心的位置已经发生了变动。图 e 和图 f 重复了在图 c 和图 d 的过程,即将所有点的类别标记为距离最近的质心的类别并求新的质心。最终得到的两个类别如图 f。

K-means 聚类最优 k 值的选取(手肘法)

手肘法的核心指标是 SSE (sum of the squared errors,误差平方和), 公式见上文。

K-means聚类最优k值的选取(手肘法)

核心思想是:随着聚类数 k 的增大,样本划分会更加精细,每个簇的聚合程度会逐渐提高,那么误差平方和 SSE 自然会逐渐变小。并且,当 k 小于真实聚类数时,由于 k 的增大会大幅增加每个簇的聚合程度,故 SSE 的下降幅度会很大,而当 k 到达真实聚类数时,再增加 k 所得到的聚合程度回报会迅速变小,所以 SSE 的下降幅度会骤减,然后随着 k 值的继续增大而趋于平缓,也就是说 SSE 和 k 的关系图是一个手肘的形状,而这个肘部对应的 k 值就是数据的真实聚类数。

K-means聚类最优k值的选取(手肘法)

特征的规范化

去除数据的单位限制,将其转化为无量纲的纯数值,便于不同单位或量级的指标能够进行计算和比较。

1、数据的中心化

所谓数据的中心化是指数据集中的各项数据减去数据集的均值。

2、数据的标准化

所谓数据的标准化是指中心化之后的数据在除以数据集的标准差,即数据集中的各项数据减去数据集的均值再除以数据集的标准差。

特征的规范化可以通过将每个特征转换为标准得分来完成。这就是说用对每个特征值求平均,用每个特征值减去平均值,然后除以特征值的标准差,如下标准分计算公式所示:

类别型变量

类别型特征可以用 one-hot 编码转换为几个二元特征,这几个二元特征可以看成数值型维度。

使用 N 位状态寄存器来对 N 个状态进行编码,每个状态都由他独立的寄存器位,并且在任意时候,其中只有一位有效。

解决了分类器不好处理属性数据的问题;在一定程度上也起到了扩充特征的作用。

聚类结果评价指标

Entropy(熵)

好的聚类应该和人工标签保持一致,大部分情况下,标签相同的数据点应聚在一起,而标签不同的数据点不应该在一起,并且簇内的数据点标签相同。熵值会变得很小。

对于一个聚类 i,首先计算聚类 i 中的成员(member)属于类(class)j 的概率

其中是在聚类 i 中所有成员的个数,是聚类 i 中的成员属于类 j 的个数。

每个聚类的 entropy 可以表示为

其中 L 是类(class)的个数。

整个聚类划分的 entropy 为

其中 K 是聚类(cluster)的数目,m 是整个聚类划分所涉及到的成员个数。

Accuracy (准确率)

比较每一条聚类结果是否和真的结果一致.

其中 N 表示文档总数,表示正确聚类的文档数.

实验过程

准备数据,上传至 HDFS

HDFS 创建文件夹

HDFS创建文件夹

hadoop 关闭安全模式

hadoop关闭安全模式

上传 KDD Cup 1999 数据集

上传KDD Cup 1999 数据集

上传KDD Cup 1999 数据集

上传KDD Cup 1999 数据集

查看上传成功

查看上传成功

通过 kddcup.names 加载列名称

names=[]
with open("/export/work/F/3/data/kddcup.names") as f:
line = f.readline()
line = f.readline()
while line:
names.append(line.split(":")[0])
line = f.readline()

names.append("label")

输出列名称:

['duration', 'protocol_type', 'service', 'flag', 'src_bytes', 'dst_bytes', 'land', 'wrong_fragment', 'urgent', 'hot', 'num_failed_logins', 'logged_in', 'num_compromised', 'root_shell', 'su_attempted', 'num_root', 'num_file_creations', 'num_shells', 'num_access_files', 'num_outbound_cmds', 'is_host_login', 'is_guest_login', 'count', 'srv_count', 'serror_rate', 'srv_serror_rate', 'rerror_rate', 'srv_rerror_rate', 'same_srv_rate', 'diff_srv_rate', 'srv_diff_host_rate', 'dst_host_count', 'dst_host_srv_count', 'dst_host_same_srv_rate', 'dst_host_diff_srv_rate', 'dst_host_same_src_port_rate', 'dst_host_srv_diff_host_rate', 'dst_host_serror_rate', 'dst_host_srv_serror_rate', 'dst_host_rerror_rate', 'dst_host_srv_rerror_rate', 'label']

构建 Dataframe

names=['duration', 'protocol_type', 'service', 'flag', 'src_bytes', 'dst_bytes', 'land', 'wrong_fragment', 'urgent', 'hot', 'num_failed_logins', 'logged_in', 'num_compromised', 'root_shell', 'su_attempted', 'num_root', 'num_file_creations', 'num_shells', 'num_access_files', 'num_outbound_cmds', 'is_host_login', 'is_guest_login', 'count', 'srv_count', 'serror_rate', 'srv_serror_rate', 'rerror_rate', 'srv_rerror_rate', 'same_srv_rate', 'diff_srv_rate', 'srv_diff_host_rate', 'dst_host_count', 'dst_host_srv_count', 'dst_host_same_srv_rate', 'dst_host_diff_srv_rate', 'dst_host_same_src_port_rate', 'dst_host_srv_diff_host_rate', 'dst_host_serror_rate', 'dst_host_srv_serror_rate', 'dst_host_rerror_rate', 'dst_host_srv_rerror_rate', 'label']

from pyspark.sql.types import Row
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import FloatType
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
# 构建Dataframe
conf = SparkConf().setAppName("applicaiton").set("spark.executor.heartbeatInterval","200000").set("spark.network.timeout","300000")
sc = SparkContext.getOrCreate(conf)
spark = SparkSession(sc)
testRDD = sc.textFile("/3/corrected")
fields = list(map( lambda fieldName : StructField(fieldName, StringType(), nullable = True) if fieldName in ["protocol_type", "service", "flag","label"] else StructField(fieldName, FloatType(), nullable = True) , names))
schema = StructType(fields)
rowRDD = testRDD.map(lambda line : line.split(",")).map(lambda attr : Row(float(attr[0]),attr[1],attr[2],attr[3],float(attr[4]),float(attr[5]),float(attr[6]),float(attr[7]),float(attr[8]),float(attr[9]),float(attr[10]),float(attr[11]),float(attr[12]),float(attr[13]),float(attr[14]),float(attr[15]),float(attr[16]),float(attr[17]),float(attr[18]),float(attr[19]),float(attr[20]),float(attr[21]),float(attr[22]),float(attr[23]),float(attr[24]),float(attr[25]),float(attr[26]),float(attr[27]),float(attr[28]),float(attr[29]),float(attr[30]),float(attr[31]),float(attr[32]),float(attr[33]),float(attr[34]),float(attr[35]),float(attr[36]),float(attr[37]),float(attr[38]),float(attr[39]),float(attr[40]),attr[41]))
testDF = spark.createDataFrame(rowRDD, schema)


dataRDD = sc.textFile("/3/kddcup.data")
fields = list(map( lambda fieldName : StructField(fieldName, StringType(), nullable = True) if fieldName in ["protocol_type", "service", "flag","label"] else StructField(fieldName, FloatType(), nullable = True) , names))
schema = StructType(fields)
rowRDD = dataRDD.map(lambda line : line.split(",")).map(lambda attr : Row(float(attr[0]),attr[1],attr[2],attr[3],float(attr[4]),float(attr[5]),float(attr[6]),float(attr[7]),float(attr[8]),float(attr[9]),float(attr[10]),float(attr[11]),float(attr[12]),float(attr[13]),float(attr[14]),float(attr[15]),float(attr[16]),float(attr[17]),float(attr[18]),float(attr[19]),float(attr[20]),float(attr[21]),float(attr[22]),float(attr[23]),float(attr[24]),float(attr[25]),float(attr[26]),float(attr[27]),float(attr[28]),float(attr[29]),float(attr[30]),float(attr[31]),float(attr[32]),float(attr[33]),float(attr[34]),float(attr[35]),float(attr[36]),float(attr[37]),float(attr[38]),float(attr[39]),float(attr[40]),attr[41]))
dataDF = spark.createDataFrame(rowRDD, schema)

数据集统计

统计数据集中各个类别标号以及每类样本有多少,并展示。

数据集的类别标号以及每类样本数

dataDF.groupBy("label").count().show(10000)

+----------------+-------+
| label| count|
+----------------+-------+
| warezmaster.| 20|
| smurf.|2807886|
| pod.| 264|
| imap.| 12|
| nmap.| 2316|
| guess_passwd.| 53|
| ipsweep.| 12481|
| portsweep.| 10413|
| satan.| 15892|
| land.| 21|
| loadmodule.| 9|
| ftp_write.| 8|
|buffer_overflow.| 30|
| rootkit.| 10|
| warezclient.| 1020|
| teardrop.| 979|
| perl.| 3|
| phf.| 4|
| multihop.| 7|
| neptune.|1072017|
| back.| 2203|
| spy.| 2|
| normal.| 972781|
+----------------+-------+

测试集的类别标号以及每类样本数

testDF.groupBy("label").count().show(10000) 

+----------------+------+
| label| count|
+----------------+------+
| snmpguess.| 2406|
| xlock.| 9|
| warezmaster.| 1602|
| processtable.| 759|
| smurf.|164091|
| pod.| 87|
| worm.| 2|
| snmpgetattack.| 7741|
| mscan.| 1053|
| nmap.| 84|
| imap.| 1|
| xterm.| 13|
| sqlattack.| 2|
| guess_passwd.| 4367|
| mailbomb.| 5000|
| xsnoop.| 4|
| ipsweep.| 306|
| portsweep.| 354|
| named.| 17|
| satan.| 1633|
| land.| 9|
| loadmodule.| 2|
| ftp_write.| 3|
| sendmail.| 17|
|buffer_overflow.| 22|
| httptunnel.| 158|
| apache2.| 794|
| saint.| 736|
| rootkit.| 13|
| teardrop.| 12|
| perl.| 2|
| phf.| 2|
| multihop.| 18|
| udpstorm.| 2|
| neptune.| 58001|
| back.| 1098|
| ps.| 16|
| normal.| 60593|
+----------------+------+

尝试聚类

from pyspark.ml import Pipeline,PipelineModel
from pyspark.ml.clustering import KMeans,KMeansModel
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import DataFrame
import random

用 VectorAssembler 创建一个特征向量,基于这些特征向量用一个 K 均值实现来创建一个模型,再用一个管道将它们拼接在一起。从得到的模型中,可以提取并检验簇群中心。

numericOnly = dataDF.drop("protocol_type", "service", "flag").cache()
assembler = VectorAssembler(inputCols=numericOnly.drop("label").columns, outputCol="featureVector")
kmeans = KMeans().setPredictionCol("cluster").setFeaturesCol("featureVector")
pipeline = Pipeline().setStages([assembler, kmeans])
pipelineModel = pipeline.fit(numericOnly)
kmeansModel = pipelineModel.stages[-1]
for i in kmeansModel.clusterCenters():print(i)

输出:

[4.83401949e+01 1.83462154e+03 8.26203195e+02 5.71611720e-06                    
6.48779303e-04 7.96173468e-06 1.24376586e-02 3.20510858e-05
1.43529049e-01 8.08830584e-03 6.81851124e-05 3.67464677e-05
1.29349608e-02 1.18874823e-03 7.43095237e-05 1.02114351e-03
0.00000000e+00 4.08294086e-07 8.35165553e-04 3.34973508e+02
2.95267146e+02 1.77970317e-01 1.78036989e-01 5.76648988e-02
5.77299094e-02 7.89884132e-01 2.11796105e-02 2.82608102e-02
2.32981078e+02 1.89214283e+02 7.53713390e-01 3.07109788e-02
6.05051931e-01 6.46410786e-03 1.78091184e-01 1.77885898e-01
5.79276115e-02 5.76592214e-02]
[1.09990000e+04 0.00000000e+00 1.30993741e+09 0.00000000e+00
0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00
0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00
0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00
0.00000000e+00 0.00000000e+00 0.00000000e+00 1.00000000e+00
1.00000000e+00 0.00000000e+00 0.00000000e+00 1.00000000e+00
1.00000000e+00 1.00000000e+00 0.00000000e+00 0.00000000e+00
2.55000000e+02 1.00000000e+00 0.00000000e+00 6.49999976e-01
1.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00
1.00000000e+00 1.00000000e+00]

对这些数字做一个直观的解释并不容易,但是每一个数字都表示模型生成的一个簇群中心,也称为质心(centroid)。就每个数值输入特征而言,这些值是质心的坐标。

k 的选择

如果每个数据点都紧靠最近的质心,则可认为聚类是较优的。这里的 “近” 采用欧氏距离定义。这是评估聚类质量的一种简单又常用的方法,使用与所有点之间距离的平均值,有时也可以使用平方距离的平均值。实际上,KMeansModel 提供了一个 computeCost 方法来计算平方距离的总和,并且很容易用来计算平方距离的平均值。

numericOnly = dataDF.drop("protocol_type", "service", "flag").cache()
# computeCost 方法来计算平方距离的总和,并且很容易用来计算平方距离的平均值。
def clusteringScore0(data,k):
assembler = VectorAssembler(inputCols=data.drop("label").columns, outputCol="featureVector")
kmeans = KMeans().setSeed(int(random.random()*10)).setK(k).setPredictionCol("cluster").setFeaturesCol("featureVector") #.setMaxIter(40).setTol(1.0e-5)
pipeline = Pipeline().setStages([assembler, kmeans])
pipelineModel = pipeline.fit(data)
kmeansModel = pipelineModel.stages[-1]
Srore=kmeansModel.computeCost(assembler.transform(data)) / data.count()
return Srore


for k in range(20, 100, 20):
print([k, clusteringScore0(numericOnly, k)])

输出结果:

[20, 148277112.23861197]                                                        
[40, 49940659.143821806]
[60, 18265796.561388526]
[80, 15313289.324247833]

输出结果显示得分随着 k 的增加而降低

输出结果显示得分随着 k 的增加而降低

增加迭代时间可以优化聚类结果。算法提供了 setTol () 来设置一个阈值,该阈值控制聚类过程中簇质心进行有效移动的最小值。降低该阈值能使质心继续移动更长的时间。使用 setMaxIter () 增加最大迭代次数也可以防止它过早停止,代价是可能需要更多的计算。

def clusteringScore1(data,k):
assembler = VectorAssembler(inputCols=data.drop("label").columns, outputCol="featureVector")
kmeans = KMeans().setSeed(int(random.random()*10)).setK(k).setPredictionCol("cluster").setFeaturesCol("featureVector").setMaxIter(40).setTol(1.0e-5)
pipeline = Pipeline().setStages([assembler, kmeans])
pipelineModel = pipeline.fit(data)
kmeansModel = pipelineModel.stages[-1]
Srore=kmeansModel.computeCost(assembler.transform(data)) / data.count()
return Srore


for k in range(20, 120, 20):
print([k, clusteringScore1(numericOnly, k)])

输出结果:

[20, 148277112.23861197]                                                        
[40, 11564470.915401561]
[60, 16343181.409780543]
[80, 22323383.079484705]
[100, 7572838.84573523]

在 k 过了 100 这个点之后得分下降还是很明显,所以 k 的拐点值应该大于 100。

糟糕的情况是,前面的结果中 k=80 时的距离居然比 k=60 的距离大。这不应该发生,因为 k 取更大值时,聚类的结果应该至少与 k 取一个较小值时的结果一样好。问题的原因在于,这种给定 k 值的 K 均值算法并不一定能得到最优聚类。K 均值的迭代过程是从一个随机点开始的,因此可能收敛于一个局部最小值,这个局部最小值可能还不错,但并不是全局最优的。

在 k 过了 100 这个点之后得分下降还是很明显,所以 k 的拐点值应该大于 100。

特征的规范化

特征的规范化可以通过将每个特征转换为标准得分来完成。这就是说用对每个特征值求平均,用每个特征值减去平均值,然后除以特征值的标准差。

由于减去平均值相当于把所有数据点沿相同方法移动相同距离,不影响点之间的欧氏距离,所以实际上减去平均值对聚类结果没有影响。

from pyspark.ml.feature import StandardScaler

def clusteringScore2(data,k):
assembler = VectorAssembler(inputCols=data.drop("label").columns, outputCol="featureVector")
scaler = StandardScaler(inputCol="featureVector", outputCol="scaledFeatureVector", withStd=True, withMean=False)
kmeans = KMeans().setSeed(int(random.random()*10)).setK(k).setPredictionCol("cluster").setFeaturesCol("scaledFeatureVector").setMaxIter(40).setTol(1.0e-5)
pipeline = Pipeline().setStages([assembler,scaler,kmeans])
pipelineModel = pipeline.fit(data)
kmeansModel = pipelineModel.stages[-1]
Srore=kmeansModel.computeCost(pipelineModel.transform(data)) / data.count()
return Srore

for k in range(60, 300, 30):
print([k, clusteringScore2(numericOnly, k)])

这有助于将维度放到更平等的基准上,而且在绝对的意义上,看点之间的绝对距离(也就是代价)要小得多。然而,k 值还没有出现一个明显的点,超过该点后,增加 k 值对于改善代价没有明显的作用:

[60, 1.1611941370693641]
[90, 0.7236962692254361]
[120, 0.5581874996147724]
[150, 0.3886887438817504]
[180, 0.3333248112741165]
[210, 0.27497680552057235]
[240, 0.2556693718314817]
[270, 0.22710138015576076]

特征的规范化

类别型变量

归一化使聚类结果有了可贵的进步,但聚类结果还有进一步提升的空间。比如说,几个特征由于不是数值型就被去掉了,于是这些特征里有价值的信息也被丢掉了。如果将这些信息以某种形式加回来,我们应该能得到更好的聚类。

类别型特征可以用 one-hot 编码转换为几个二元特征,这几个二元特征可以看成数值型维度。举个例子,数据集的第二列代表协议类型,取值可能是 tcp、udp 或 icmp。可以把它们看成 3 个特征,分别取名为 is_tcp、is_udp 和 is_icmp。这样,特征值 tcp 就变成 1,0,0,udp 对应 0,1,0,icmp 对应 0,0,1,以此类推。

from pyspark.ml.feature import OneHotEncoder, StringIndexer
# 类别型特征可以用 one-hot 编码转换为几个二元特征,这几个二元特征可以看成数值型维度。
def oneHotPipeline(inputCol):
indexer = StringIndexer(inputCol=inputCol,outputCol=inputCol + "_indexed").setHandleInvalid("keep")
encoder = OneHotEncoder(inputCol=inputCol + "_indexed",outputCol=inputCol + "_vec")
pipeline = Pipeline().setStages([indexer, encoder])
return (pipeline, inputCol + "_vec")

def clusteringScore3(data,k):
protoTypeEncoder, protoTypeVecCol = oneHotPipeline("protocol_type")
serviceEncoder, serviceVecCol = oneHotPipeline("service")
flagEncoder, flagVecCol = oneHotPipeline("flag")
assembleCols = (set(data.columns)-set(["label", "protocol_type", "service", "flag"])).union(set([protoTypeVecCol, serviceVecCol, flagVecCol]))
assembler = VectorAssembler(inputCols=list(assembleCols), outputCol="featureVector")
scaler = StandardScaler(inputCol="featureVector", outputCol="scaledFeatureVector", withStd=True, withMean=False)
kmeans = KMeans().setSeed(int(random.random()*10)).setK(k).setPredictionCol("cluster").setFeaturesCol("scaledFeatureVector").setMaxIter(40).setTol(1.0e-5)
pipeline = Pipeline().setStages([protoTypeEncoder, serviceEncoder, flagEncoder, assembler, scaler, kmeans])
pipelineModel = pipeline.fit(data)
kmeansModel = pipelineModel.stages[-1]
Srore=kmeansModel.computeCost(pipelineModel.transform(data)) / data.count()
return Srore

for k in range(60, 300, 30):
print([k, clusteringScore3(dataDF, k)])

输出:

[60, 38.01382297522162]
[90, 16.419330083446177]
[120, 3.2093992442174235]
[150, 2.1454678299121843]
[180, 1.6142523558430413]
[210, 1.3533093788147306]
[240, 1.0616778921723296]
[270, 0.9068134376554267]

类别型变量

局部放大:

局部放大

这些样本结果表明,从 k=180 这个点开始,评分值的变化趋于平缓。至少现在聚类使用了所有的输入特征。

利用标号的熵信息

标签告诉我们每个数据点的真实性质。好的聚类应该和人工标签保持一致,大部分情况 下,标签相同的数据点应聚在一起,而标签不同的数据点不应该在一起,并且簇内的数据 点标签相同。

良好的聚类结果簇中样本类别大体相同,因而熵值较低。我们可以对各个簇的熵加权平均,将结果作为聚类得分:

import numpy

def entropy(x):
ent = 0.0
x_value_list = [x[i] for i in range(x.shape[0])]
n=sum(x_value_list)
for x_value in x_value_list:
p = float(x_value) / n
ent -= p * numpy.log(p)
return ent


def fitPipeline4(data, k):
protoTypeEncoder, protoTypeVecCol = oneHotPipeline("protocol_type")
serviceEncoder, serviceVecCol = oneHotPipeline("service")
flagEncoder, flagVecCol = oneHotPipeline("flag")
assembleCols = (set(data.columns)-set(["label", "protocol_type", "service", "flag"])).union(set([protoTypeVecCol, serviceVecCol, flagVecCol]))
assembler = VectorAssembler(inputCols=list(assembleCols), outputCol="featureVector")
scaler = StandardScaler(inputCol="featureVector", outputCol="scaledFeatureVector", withStd=True, withMean=False)
kmeans = KMeans().setSeed(int(random.random()*10)).setK(k).setPredictionCol("cluster").setFeaturesCol("scaledFeatureVector").setMaxIter(40).setTol(1.0e-5)
pipeline = Pipeline().setStages([protoTypeEncoder, serviceEncoder, flagEncoder, assembler, scaler, kmeans])
pipelineModel = pipeline.fit(data)
return pipelineModel

# 良好的聚类结果簇中样本类别大体相同,因而熵值较低。对各个簇的熵加权平均,将结果作为聚类得分
def clusteringScore4(data, k):
pipelineModel = fitPipeline4(data, k)
clusterLabel = pipelineModel.transform(data).select("cluster", "label")
pd=clusterLabel.toPandas()
Sum=0
for name, group in pd.groupby("cluster"):
labelsize=group.count()[0]
a=numpy.array(group.groupby('label').count())
b=[]
for i in range(len(a)):
for j in range(len(a[i])):
b.append(a[i][j])
One=labelsize*entropy(numpy.array(b))
Sum=Sum+One
return Sum/data.count()

for k in range(60, 300, 30):
print([k, clusteringScore4(dataDF, k)])

输出结果:

[60, 0.038993775215004474]
[90, 0.02985377476611417]
[120, 0.02266161774992263]
[150, 0.020766076760220943]
[180, 0.017547365257679748]
[210, 0.012974819022593053]
[240, 0.007150061376894767]
[270, 0.00833981903044443]

利用标号的熵信息

跟以前一样,可以根据上面的分析结果大致看出 k 的合适取值。随着 k 的增加,熵不一定会减小,因此我们找到的可能是一个局部最小值。这里结果同样表明,k 取 240 可能比较合理,因为它的得分实际上低于 210 以及 270。

聚类实战

取 k=180

pipelineModel = fitPipeline4(dataDF, 180)
countByClusterLabel = pipelineModel.transform(dataDF).select("cluster", "label").groupBy("cluster", "label").count().orderBy("cluster", "label")
countByClusterLabel.show()

这里我们同样把每个簇的标号打印出来。聚类的结果中大部分属于同一簇,以及其他的少部分簇。

+-------+----------+-------+                                                    
|cluster| label| count|
+-------+----------+-------+
| 0| neptune.| 362876|
| 0|portsweep.| 1|
| 1| ipsweep.| 40|
| 1| nmap.| 6|
| 1| normal.| 3421|
| 1|portsweep.| 2|
| 1| satan.| 11|
| 1| smurf.|2807886|
| 2| neptune.| 1038|
| 2|portsweep.| 13|
| 2| satan.| 3|
| 3| ipsweep.| 13|
| 3| neptune.| 1046|
| 3| normal.| 38|
| 3|portsweep.| 11|
| 3| satan.| 3|
| 4| neptune.| 1034|
| 4| normal.| 4|
| 4|portsweep.| 7|
| 4| satan.| 4|
+-------+----------+-------+
only showing top 20 rows

现在可以建立一个真正的异常检测系统了。异常检测时需要度量新数据点到最近的簇质心 的距离。如果这个距离超过某个阈值,那么就表示这个新数据点是异常的。我们可以把阈 值设为已知数据中离中心最远的第 100 个点到中心的距离。

import os, tempfile
from pyspark.ml.linalg import Vector, Vectors

pipelineModel = fitPipeline4(dataDF, 180)
kmeansModel = pipelineModel.stages[-1]
kmeansModel.save("/model/3/kmeansModel")
pipelineModel.save("/model/3/pipelineModel")
centroids = kmeansModel.clusterCenters()
clustered = pipelineModel.transform(dataDF)
threshold=clustered.select("cluster", "scaledFeatureVector").rdd.map(lambda a:Vectors.squared_distance(centroids[a.cluster], a.scaledFeatureVector)).sortBy(lambda x: x).take(100)[-1]
print(threshold)

输出阈值:

3.232811853048799e-05

最后一步就是在新数据点出现的时候使用阈值进行评估。在 unlabled 数据上进行测试找出异常流量记录,并计算正确率。

clustered = pipelineModel.transform(testDF)
anomalies = clustered.rdd.filter(lambda a:Vectors.squared_distance(centroids[a.cluster], a.scaledFeatureVector) >= threshold).collect()
n=len(anomalies)
v=0
for i in anomalies:
if i["label"]!='normal.':
v=v+1

print("正确率:"+str(float(v)/n))

输出结果:

正确率:0.8051841158484633 

取 k=240

import os, tempfile
from pyspark.ml.linalg import Vector, Vectors
pipelineModel = fitPipeline4(dataDF, 240)
kmeansModel = pipelineModel.stages[-1]
kmeansModel.save("/model/3/test/kmeansModel")
pipelineModel.save("/model/3/test/pipelineModel")
centroids = kmeansModel.clusterCenters()
clustered = pipelineModel.transform(dataDF)
threshold=clustered.select("cluster", "scaledFeatureVector").rdd.map(lambda a:Vectors.squared_distance(centroids[a.cluster], a.scaledFeatureVector)).sortBy(lambda x: x).take(100)[-1]
print(threshold)
7.665805787851659e-06 
clustered = pipelineModel.transform(testDF)
anomalies = clustered.rdd.filter(lambda a:Vectors.squared_distance(centroids[a.cluster], a.scaledFeatureVector) >= threshold).collect()
n=len(anomalies)
v=0
for i in anomalies:
if i["label"]!='normal.':
v=v+1

print("正确率:"+str(float(v)/n))
正确率:0.8050769488123118

可以看出 K=180 是在 unlabled 数据上进行测试找出异常流量记录,计算正确率比 K=240 有较好的结果。

缩短计算的步长:

for k in range(150, 220, 10):
print([k, clusteringScore3(dataDF, k)])

得到评分结果:

[150, 2.292921780026778]
[160, 4.917845778754763]
[170, 2.0016455721528015]
[180, 1.7177635513092788]
[190, 1.5766159846344556]
[200, 1.5550587983858675]
[210, 1.2785418817225693]

评分结果

局部放大:

局部放大

取 k=190

import os, tempfile
from pyspark.ml.linalg import Vector, Vectors
pipelineModel = fitPipeline4(dataDF, 190)
kmeansModel = pipelineModel.stages[-1]
kmeansModel.save("/model/3/190/kmeansModel")
pipelineModel.save("/model/3/190/pipelineModel")
centroids = kmeansModel.clusterCenters()
clustered = pipelineModel.transform(dataDF)
threshold=clustered.select("cluster", "scaledFeatureVector").rdd.map(lambda a:Vectors.squared_distance(centroids[a.cluster], a.scaledFeatureVector)).sortBy(lambda x: x).take(100)[-1]
print(threshold)
3.247829147436459e-05 
clustered = pipelineModel.transform(testDF)
anomalies = clustered.rdd.filter(lambda a:Vectors.squared_distance(centroids[a.cluster], a.scaledFeatureVector) >= threshold).collect()
n=len(anomalies)
v=0
for i in anomalies:
if i["label"]!='normal.':
v=v+1

print("正确率:"+str(float(v)/n))
正确率:0.8051841158484633 

可以看出 K=190 是在 unlabled 数据上进行测试找出异常流量记录,计算正确率比 K=180 有较好的结果。

推荐阅读
基于Audioscrobbler数据集的音乐推荐(pyspark) 基于Audioscrobbler数据集的音乐推荐(pyspark) Spark RDD 编程 Spark RDD 编程 BERT 预训练模型及其应用案例 BERT 预训练模型及其应用案例 特征向量和特征值的几何本质 特征向量和特征值的几何本质 特征和分类器 特征和分类器 Python RailFenceCipher Python RailFenceCipher

留言区

Are You A Robot?