The following code performs URL detection based on K-means and SVM, classifying requests into normal access, XSS attacks, directory traversal attacks, and SQL injection attacks. I adapted it from a binary-classification URL detector. It already has some comments, but I would like to understand what each line of the important parts does, in particular every line of the Train, kmeans, and transform functions inside class Baseframe.
```python
import os
import time
import urllib
import pickle
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn import svm
from sklearn.cluster import KMeans
from sklearn.linear_model import LogisticRegression
from scipy.sparse import csr_matrix, lil_matrix, coo_matrix

# number of clusters (target dimensionality) for k-means
k = 80
# n-gram length
n = 2
# whether to use k-means
use_k = True

# custom print helper with a timestamp, convenient for debugging
def printT(word):
    a = time.strftime('%Y-%m-%d %H:%M:%S: ', time.localtime(time.time()))
    print(a + str(word))

da = []
da0 = []
da1 = []
da2 = []
da3 = []
with open("data/URL.txt", 'r', encoding='utf-8') as f:
    lines = f.readlines()
    for line in lines:
        line = line.strip()
        line = line.split('\t')
        da.append(line)
for i in range(len(da)):
    if da[i][1] == '0':
        da0.append(da[i][0][:])
    if da[i][1] == '1':
        da1.append(da[i][0][:])
    if da[i][1] == '2':
        da2.append(da[i][0][:])
    if da[i][1] == '3':
        da3.append(da[i][0][:])
print(type(da0))

# base class for the trained models
class Baseframe(object):

    def __init__(self):
        pass

    # training
    def Train(self):
        # load the data
        printT("Loading safe Data:")
        safe_list = da0
        printT("Loading xss Data:")
        xss_list = da1
        printT("Loading dta Data:")
        dta_list = da2
        printT("Loading sql Data:")
        sql_list = da3
        # combine the data
        data = [safe_list, xss_list, dta_list, sql_list]
        # print(data[0])
        printT("Done, safe Numbers:" + str(len(data[0])) + " xss Numbers:" + str(len(data[1])) + " dta Numbers:" + str(len(data[2])) + " sql Numbers:" + str(len(data[3])))
        # label the samples
        safe_y = [0 for i in range(len(data[0]))]
        xss_y = [1 for i in range(len(data[1]))]
        dta_y = [2 for i in range(len(data[2]))]
        sql_y = [3 for i in range(len(data[3]))]
        y = safe_y + xss_y + dta_y + sql_y
        # vectorization preprocessing
        # define the vectorizer instance
        self.vectorizer = TfidfVectorizer(tokenizer=self.get_ngrams)
        # turn the irregular list of text strings into a regular ([i, j], weight) matrix
        # X[number of URLs, number of distinct n-gram terms, in theory fewer than 256^n];
        # i is the URL index, j is the term (n-gram) index
        X = self.vectorizer.fit_transform(data[0] + data[1] + data[2] + data[3])
        print(X)
        print(X.shape)
        printT("Data Dimensions: " + str(X.shape))
        # dimensionality reduction via k-means
        if use_k:
            X = self.transform(self.kmeans(X))
            printT("Kmeans Succeed")
        printT("Divide Training Data")
        # split X and y with train_test_split (test_size is the test fraction, random_state the seed)
        # X_train rows correspond one-to-one to y_train labels -->> used to train the model
        # X_test rows correspond one-to-one to y_test labels -->> used to measure accuracy
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        printT('Divide Succeed')
        printT('Begin Training:')
        printT(self.classifier)
        self.classifier.fit(X_train, y_train)
        # evaluate the model on the held-out test set
        printT(self.getname() + ' Model Accuracy:{}'.format(self.classifier.score(X_test, y_test)))
        # save the trained model
        with open('model/' + self.getname() + '.pickle', 'wb') as output:
            pickle.dump(self, output)

    # preprocessing: cut a query string into character n-grams
    def get_ngrams(self, query):
        tempQuery = str(query)
        ngrams = []
        for i in range(0, len(tempQuery) - n):
            ngrams.append(tempQuery[i:i + n])
        return ngrams

    def kmeans(self, weight):
        printT('Matrix before kmeans: ' + str(weight.shape))
        weight = weight.tolil().transpose()
        # the same data with the same k always clusters the same way,
        # so cache the result to avoid recomputation
        try:
            with open('model/k' + str(k) + '.label', 'r') as input:
                printT('loading kmeans success')
                a = input.read().split(' ')
                self.label = [int(i) for i in a[:-1]]
        except FileNotFoundError:
            printT('Start Kmeans: ')
            # note: precompute_distances was deprecated and removed in scikit-learn 1.0;
            # on recent versions use KMeans(n_clusters=k) instead
            clf = KMeans(n_clusters=k, precompute_distances=False)
            s = clf.fit(weight)
            printT(s)
            # save the clustering result
            self.label = clf.labels_
            with open('model/k' + str(k) + '.label', 'w') as output:
                for i in self.label:
                    output.write(str(i) + ' ')
        printT('kmeans succeed, total: ' + str(k) + ' classes')
        return weight

    # aggregate the features according to the clustering result:
    # takes the transposed matrix and returns the re-transposed, reduced matrix
    def transform(self, weight):
        a = set()
        # a COO matrix can hold duplicate entries at the same position
        row = []
        col = []
        data = []
        # i is the row index in the old matrix, label[i] the row index in the new matrix
        for i in range(len(self.label)):
            if self.label[i] in a:
                continue
            a.add(self.label[i])
            for j in range(i, len(self.label)):
                if self.label[j] == self.label[i]:
                    temp = weight[j].rows[0]
                    col += temp
                    temp = [self.label[i] for t in range(len(temp))]
                    row += temp
                    data += weight[j].data[0]
        newWeight = coo_matrix((data, (row, col)), shape=(k, weight.shape[1]))
        return newWeight.transpose()

    # predict the class of a list of new queries
    def predict(self, new_queries):
        # try:
        #     with open('model/' + self.getname() + '.pickle', 'rb') as input:
        #         self = pickle.load(input)
        #         printT('loading ' + self.getname() + ' model success')
        # except FileNotFoundError:
        printT('start to train the ' + self.getname() + ' model')
        self.Train()
        printT('start predict:')
        # URL-decode the queries
        new_queries = [urllib.parse.unquote(url) for url in new_queries]
        X_predict = self.vectorizer.transform(new_queries)
        if use_k:
            printT('Transform Data')
            X_predict = self.transform(X_predict.tolil().transpose())
            printT('Transform Succeed, Start Predicting:')
        res = self.classifier.predict(X_predict)
        printT('Predict Succeed, Total:' + str(len(res)))
        result = {}
        result[0] = []
        result[1] = []
        result[2] = []
        result[3] = []
        # zip the two lists into (query, predicted class) pairs
        for q, r in zip(new_queries, res):
            result[r].append(q)
        printT('safe query: ' + str(len(result[0])))
        printT('xss query: ' + str(len(result[1])))
        printT('dta query: ' + str(len(result[2])))
        printT('sql query: ' + str(len(result[3])))
        return result


class SVM(Baseframe):

    def getname(self):
        if use_k:
            return 'SVM__n' + str(n) + '_k' + str(k)
        return 'SVM_n' + str(n)

    def __init__(self):
        # create the SVM classifier used as the model
        self.classifier = svm.SVC()
```
Answer, written with partial guidance from ChatGPT:
This code implements URL detection based on K-means and SVM: it classifies URLs into normal access, XSS attacks, directory traversal attacks, and SQL injection attacks.
The constants at the top define the k-means cluster count k, the n-gram length n, and the flag use_k that decides whether k-means is used. printT is a small helper that prefixes output with a timestamp for easier debugging. The block that follows reads data/URL.txt and distributes the URLs into the lists da0, da1, da2, and da3 according to their labels.
The class Baseframe is the base class that implements training and prediction. Its Train() method first gathers the four lists into data and assigns a numeric label to every sample. It then vectorizes the text with a TfidfVectorizer instance and, if use_k is set, reduces the feature dimension to k with k-means. Finally it splits the data into a training set and a test set with train_test_split, fits the SVM, computes and prints the model accuracy on the test set, and pickles the trained object.
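To make the vectorization step concrete, here is a minimal, self-contained sketch of what TfidfVectorizer produces when it is given the character-n-gram tokenizer. The two sample URLs are made up for illustration; note that the original get_ngrams loop stops at len(query) - n, so the very last n-gram of each string is dropped.

```python
from sklearn.feature_extraction.text import TfidfVectorizer

n = 2

def get_ngrams(query):
    # same loop as the code above: range(len(query) - n) drops the final n-gram
    return [str(query)[i:i + n] for i in range(len(str(query)) - n)]

urls = ["/index.php?id=1", "/search?q=<script>alert(1)</script>"]
vectorizer = TfidfVectorizer(tokenizer=get_ngrams, lowercase=False)
X = vectorizer.fit_transform(urls)

print(X.shape)                             # (2, number of distinct bigrams in the corpus)
print(sorted(vectorizer.vocabulary_)[:8])  # a few of the bigram "terms"
```

Each URL becomes one sparse row of tf-idf weights, and each column corresponds to one character bigram seen anywhere in the corpus; that column count is what k-means later compresses down to k.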
The code with comments added:

```python
# number of clusters (target dimensionality) for k-means
k = 80
# n-gram length
n = 2
# whether to use k-means
use_k = True

# custom print helper with a timestamp, convenient for debugging
def printT(word):
    a = time.strftime('%Y-%m-%d %H:%M:%S: ', time.localtime(time.time()))
    print(a + str(word))

da = []
da0 = []
da1 = []
da2 = []
da3 = []
# open the file and read it line by line
with open("data/URL.txt", 'r', encoding='utf-8') as f:
    lines = f.readlines()
    # split each line into [url, label] and append it to the full list
    for line in lines:
        line = line.strip()
        line = line.split('\t')
        da.append(line)
# distribute the samples into per-class lists
for i in range(len(da)):
    if da[i][1] == '0':
        da0.append(da[i][0][:])
    if da[i][1] == '1':
        da1.append(da[i][0][:])
    if da[i][1] == '2':
        da2.append(da[i][0][:])
    if da[i][1] == '3':
        da3.append(da[i][0][:])
# print the type of da0
print(type(da0))

# base class for the trained models
class Baseframe(object):

    def __init__(self):
        pass

    # training
    def Train(self):
        # load the data
        printT("Loading safe Data:")
        # list of URLs labelled as safe
        safe_list = da0
        printT("Loading xss Data:")
        # list of URLs labelled as cross-site scripting attacks
        xss_list = da1
        printT("Loading dta Data:")
        # list of URLs labelled as directory traversal attacks
        dta_list = da2
        printT("Loading sql Data:")
        # list of URLs labelled as SQL injection attacks
        sql_list = da3
        # combine the data
        data = [safe_list, xss_list, dta_list, sql_list]
        # print(data[0])
        # print the number of URLs in each class
        printT("Done, safe Numbers:" + str(len(data[0])) + " xss Numbers:" + str(len(data[1])) + " dta Numbers:" + str(len(data[2])) + " sql Numbers:" + str(len(data[3])))
        # label the samples
        safe_y = [0 for i in range(len(data[0]))]
        xss_y = [1 for i in range(len(data[1]))]
        dta_y = [2 for i in range(len(data[2]))]
        sql_y = [3 for i in range(len(data[3]))]
        # concatenate the labels
        y = safe_y + xss_y + dta_y + sql_y
        # vectorization preprocessing
        # define the vectorizer instance, tokenizing with character n-grams
        self.vectorizer = TfidfVectorizer(tokenizer=self.get_ngrams)
        # turn the irregular list of text strings into a regular ([i, j], weight) matrix
        # X[number of URLs, number of distinct n-gram terms, in theory fewer than 256^n];
        # i is the URL index, j is the term (n-gram) index
        X = self.vectorizer.fit_transform(data[0] + data[1] + data[2] + data[3])  # merge the four lists and TF-IDF-vectorize them
        print(X)  # print the matrix X
        print(X.shape)  # print the shape of X
        printT("Data Dimensions: " + str(X.shape))  # print the shape of X with a timestamp
        if use_k:  # if k-means dimensionality reduction is enabled
            X = self.transform(self.kmeans(X))  # cluster with kmeans() and aggregate with transform()
            printT("Kmeans Succeed")  # report that k-means finished
        printT("Divide Training Data")  # report that the data is being split
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)  # split the data with train_test_split
        printT('Divide Succeed')  # report that the split finished
        printT('Begin Training:')  # report that training begins
        printT(self.classifier)  # print the classifier
        self.classifier.fit(X_train, y_train)  # fit the model on the training data
        printT(self.getname() + ' Model Accuracy:{}'.format(self.classifier.score(X_test, y_test)))  # print the model accuracy
        with open('model/' + self.getname() + '.pickle', 'wb') as output:  # save the model as a pickle file
            pickle.dump(self, output)

    def get_ngrams(self, query):  # preprocessing: cut a query string into character n-grams
        tempQuery = str(query)
        ngrams = []
        for i in range(0, len(tempQuery) - n):
            ngrams.append(tempQuery[i:i + n])
        return ngrams

    def kmeans(self, weight):  # cluster the input matrix with k-means for dimensionality reduction
        printT('Matrix before kmeans: ' + str(weight.shape))  # print the matrix shape before k-means
        weight = weight.tolil().transpose()  # convert the matrix to LIL format and transpose it
        try:
            with open('model/k' + str(k) + '.label', 'r') as input:  # try to open a previously saved clustering result
                printT('loading kmeans success')  # report that the cached result was loaded
                a = input.read().split(' ')
                self.label = [int(i) for i in a[:-1]]  # convert the cached result back into integer labels
        except FileNotFoundError:
            printT('Start Kmeans: ')  # report that k-means clustering starts
            clf = KMeans(n_clusters=k, precompute_distances=False)
            # create the k-means estimator with k clusters and without precomputed distances
            # (precompute_distances was removed in scikit-learn 1.0; omit it on newer versions)
            s = clf.fit(weight)
            # fit the estimator clf on weight, obtaining the fitted model s
            printT(s)
            # print the fitted model
            # save the clustering result
            self.label = clf.labels_
            # store the cluster labels on the instance
            with open('model/k' + str(k) + '.label', 'w') as output:
                for i in self.label:
                    output.write(str(i) + ' ')
            # write the cluster labels to the file 'model/k' + str(k) + '.label'
        printT('kmeans succeed, total: ' + str(k) + ' classes')
        # report success: the features were grouped into k clusters
        return weight
        # return the (transposed) weight matrix

    # aggregate the features according to the clustering result:
    # takes the transposed matrix and returns the re-transposed, reduced matrix
    def transform(self, weight):
        a = set()
        # the empty set a records the cluster labels already processed
        row = []
        col = []
        data = []
        # row, col and data collect the entries of the new matrix
        for i in range(len(self.label)):
            if self.label[i] in a:
                continue
            a.add(self.label[i])
            # if this cluster label has been handled before, skip it;
            # otherwise remember it in a
            for j in range(i, len(self.label)):
                if self.label[j] == self.label[i]:
                    # feature j belongs to the same cluster as feature i
                    temp = weight[j].rows[0]
                    # column indices of the non-zero entries in row j of weight
                    col += temp
                    temp = [self.label[i] for t in range(len(temp))]
                    # a list of the same length whose entries are the cluster label of feature i
                    row += temp
                    # append it to row
                    data += weight[j].data[0]
                    # append the non-zero values of row j of weight to data
        newWeight = coo_matrix((data, (row, col)), shape=(k, weight.shape[1]))
        # build the sparse COO matrix newWeight from row, col and data;
        # it has k rows and the same number of columns as weight
        return newWeight.transpose()
        # return the transpose of newWeight

    # predict the class of a list of new queries
    def predict(self, new_queries):
        # try:
        #     with open('model/' + self.getname() + '.pickle', 'rb') as input:
        #         self = pickle.load(input)
        #         printT('loading ' + self.getname() + ' model success')
        # except FileNotFoundError:
        # announce that the model is being trained
        printT('start to train the ' + self.getname() + ' model')
        # call the instance's own Train method
        self.Train()
        # announce that prediction starts
        printT('start predict:')
        # URL-decode the queries, turning %xx escapes back into the original characters
        new_queries = [urllib.parse.unquote(url) for url in new_queries]
        # turn the decoded queries into feature vectors
        X_predict = self.vectorizer.transform(new_queries)
        if use_k:
            # transpose X_predict so that transform() can aggregate the features
            X_predict = self.transform(X_predict.tolil().transpose())
        # announce that the feature transformation finished
        printT('Transform Succeed, Start Predicting:')
        # classify the feature vectors
        res = self.classifier.predict(X_predict)
        # announce success and print the number of predictions
        printT('Predict Succeed, Total:' + str(len(res)))
        # dictionary that collects the prediction results
        result = {}
        # one empty list for each of the classes 0, 1, 2 and 3
        result[0] = []
        result[1] = []
        result[2] = []
        result[3] = []
        # pair every query with its predicted class and append it to the matching list
        for q, r in zip(new_queries, res):
            result[r].append(q)
        # print the number of predictions per class
        printT('safe query: ' + str(len(result[0])))
        printT('xss query: ' + str(len(result[1])))
        printT('dta query: ' + str(len(result[2])))
        printT('sql query: ' + str(len(result[3])))
        # return the result dictionary
        return result


class SVM(Baseframe):

    def getname(self):
        # if use_k is True return 'SVM__n{n}_k{k}', otherwise 'SVM_n{n}'
        if use_k:
            return 'SVM__n' + str(n) + '_k' + str(k)
        return 'SVM_n' + str(n)

    def __init__(self):
        # create an SVM classifier object as the model
        self.classifier = svm.SVC()
```
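To see what kmeans() plus transform() actually do to the matrix, here is a small toy example. The numbers and the cluster labels are invented for illustration (a real run gets the labels from KMeans.fit); the point is that every original tf-idf column is assigned to one of k clusters, and all columns sharing a cluster are summed into a single new column, so the feature dimension drops from the vocabulary size to k.

```python
import numpy as np
from scipy.sparse import lil_matrix, coo_matrix

X = lil_matrix(np.array([[1., 1., 2., 0., 0., 3.],
                         [0., 4., 0., 5., 0., 0.],
                         [6., 0., 0., 0., 7., 0.]]))   # 3 URLs x 6 features
labels = [0, 0, 1, 1, 2, 2]                            # cluster label per feature
k = 3

weight = X.transpose().tolil()        # shape (features, samples), as in kmeans()
row, col, data = [], [], []
for j in range(weight.shape[0]):      # j = original feature index
    cols = weight.rows[j]             # sample indices with a non-zero value
    row += [labels[j]] * len(cols)    # new row index = cluster of this feature
    col += cols
    data += weight.data[j]
newX = coo_matrix((data, (row, col)), shape=(k, weight.shape[1])).transpose()

print(newX.toarray())
# [[2. 2. 3.]
#  [4. 5. 0.]
#  [6. 0. 7.]]   -> 3 URLs x k features; entries landing on the same position are summed
```

Features 0 and 1 share cluster 0, so their values for the first URL (1 + 1) are merged into the single entry 2; that is exactly the aggregation transform() performs on the full tf-idf matrix.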
The following answer was written jointly by the GPT-3.5 model and the blogger 波罗歌:
This code is a URL detector based on K-means and SVM that classifies URLs into four classes: normal access, XSS attacks, directory traversal attacks, and SQL injection attacks. It is explained block by block below.
```python
import os
import time
import urllib
import pickle
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn import svm
from sklearn.cluster import KMeans
from sklearn.linear_model import LogisticRegression
from scipy.sparse import csr_matrix, lil_matrix, coo_matrix

# number of clusters (target dimensionality) for k-means
k = 80
# n-gram length
n = 2
# whether to use k-means
use_k = True
```
This part defines the cluster count k, the n-gram length n, and the flag that controls whether K-Means clustering is used to compress the feature dimension; later, during model training, the code uses K-Means to reduce the dimensionality of the data.
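As a quick illustration of the n-gram coefficient n, here is the same loop used by get_ngrams applied to a made-up URL: the URL is split into overlapping character n-grams, which act as the "words" that TfidfVectorizer later weights.

```python
def get_ngrams(query, n=2):
    # same loop as the original get_ngrams: range(len(query) - n) drops the
    # very last n-gram ('=1' below); range(len(query) - n + 1) would keep it
    return [query[i:i + n] for i in range(len(query) - n)]

print(get_ngrams("/a.php?id=1", n=2))
# ['/a', 'a.', '.p', 'ph', 'hp', 'p?', '?i', 'id', 'd=']
print(get_ngrams("/a.php?id=1", n=3))
# ['/a.', 'a.p', '.ph', 'php', 'hp?', 'p?i', '?id', 'id=']
```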
```python
# custom print helper with a timestamp, convenient for debugging
def printT(word):
    a = time.strftime('%Y-%m-%d %H:%M:%S: ', time.localtime(time.time()))
    print(a + str(word))

da = []
da0 = []
da1 = []
da2 = []
da3 = []
with open("data/URL.txt", 'r', encoding='utf-8') as f:
    lines = f.readlines()
    for line in lines:
        line = line.strip()
        line = line.split('\t')
        da.append(line)
# distribute da into separate lists according to the URL type
for i in range(len(da)):
    if da[i][1] == '0':
        da0.append(da[i][0][:])
    if da[i][1] == '1':
        da1.append(da[i][0][:])
    if da[i][1] == '2':
        da2.append(da[i][0][:])
    if da[i][1] == '3':
        da3.append(da[i][0][:])
```
This part reads a txt file containing URLs and their class labels and distributes them into separate lists, one per URL type, for use in the later training step.
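For reference, this is the layout the loader seems to expect (inferred from the split('\t') and the label check; the real data/URL.txt is not shown in the question, and the example lines below are invented):

```python
# one URL per line, a tab, then the class id 0-3
sample_lines = [
    "/index.php?id=1\t0",                       # 0 = normal access
    "/search?q=<script>alert(1)</script>\t1",   # 1 = XSS
    "/static/../../../etc/passwd\t2",           # 2 = directory traversal
    "/item.php?id=1' OR '1'='1\t3",             # 3 = SQL injection
]

da0, da1, da2, da3 = [], [], [], []
buckets = {'0': da0, '1': da1, '2': da2, '3': da3}
for line in sample_lines:
    url, label = line.strip().split('\t')
    buckets[label].append(url)

print(len(da0), len(da1), len(da2), len(da3))   # 1 1 1 1
```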
```python
# base class for the trained models
class Baseframe(object):

    def __init__(self):
        pass

    # training
    def Train(self):
        # load the data
        printT("Loading safe Data:")
        safe_list = da0
        printT("Loading xss Data:")
        xss_list = da1
        printT("Loading dta Data:")
        dta_list = da2
        printT("Loading sql Data:")
        sql_list = da3
        # combine the data
        data = [safe_list, xss_list, dta_list, sql_list]
        # print(data[0])
        printT("Done, safe Numbers:" + str(len(data[0])) + " xss Numbers:" + str(len(data[1])) + " dta Numbers:" + str(len(data[2])) + " sql Numbers:" + str(len(data[3])))
        # label the samples
        safe_y = [0 for i in range(len(data[0]))]
        xss_y = [1 for i in range(len(data[1]))]
        dta_y = [2 for i in range(len(data[2]))]
        sql_y = [3 for i in range(len(data[3]))]
        y = safe_y + xss_y + dta_y + sql_y
        # vectorization preprocessing
        # define the vectorizer instance
        self.vectorizer = TfidfVectorizer(tokenizer=self.get_ngrams)
        # turn the irregular list of text strings into a regular ([i, j], weight) matrix
        # X[number of URLs, number of distinct n-gram terms, in theory fewer than 256^n];
        # i is the URL index, j is the term (n-gram) index
        X = self.vectorizer.fit_transform(data[0] + data[1] + data[2] + data[3])
        # dimensionality reduction via k-means
        if use_k:
            X = self.transform(self.kmeans(X))
        # split X and y with train_test_split (test_size is the test fraction, random_state the seed)
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        printT('Begin Training:')
        printT(self.classifier)
        # train the model
        self.classifier.fit(X_train, y_train)
        # evaluate the model on the held-out test set
        printT(self.getname() + ' Model Accuracy:{}'.format(self.classifier.score(X_test, y_test)))
        # save the trained model
        with open('model/' + self.getname() + '.pickle', 'wb') as output:
            pickle.dump(self, output)

    # preprocessing: cut a query string into character n-grams
    def get_ngrams(self, query):
        tempQuery = str(query)
        ngrams = []
        for i in range(0, len(tempQuery) - n):
            ngrams.append(tempQuery[i:i + n])
        return ngrams

    def kmeans(self, weight):
        # k-means clustering for dimensionality reduction
        weight = weight.tolil().transpose()
        try:
            with open('model/k' + str(k) + '.label', 'r') as input:
                a = input.read().split(' ')
                self.label = [int(i) for i in a[:-1]]
        except FileNotFoundError:
            clf = KMeans(n_clusters=k, precompute_distances=False)
            s = clf.fit(weight)
            self.label = clf.labels_
            # save the clustering result
            with open('model/k' + str(k) + '.label', 'w') as output:
                for i in self.label:
                    output.write(str(i) + ' ')
        return weight

    def transform(self, weight):
        # re-aggregate the weights according to the k-means result
        a = set()
        row = []
        col = []
        data = []
        for i in range(len(self.label)):
            if self.label[i] in a:
                continue
            a.add(self.label[i])
            for j in range(i, len(self.label)):
                if self.label[j] == self.label[i]:
                    temp = weight[j].rows[0]
                    col += temp
                    temp = [self.label[i] for t in range(len(temp))]
                    row += temp
                    data += weight[j].data[0]
        newWeight = coo_matrix((data, (row, col)), shape=(k, weight.shape[1]))
        return newWeight.transpose()

    def predict(self, new_queries):
        # train the model, then classify the new requests
        self.Train()
        # URL-decode the queries
        new_queries = [urllib.parse.unquote(url) for url in new_queries]
        X_predict = self.vectorizer.transform(new_queries)
        if use_k:
            X_predict = self.transform(X_predict.tolil().transpose())
        # run the prediction
        res = self.classifier.predict(X_predict)
        result = {}
        result[0] = []
        result[1] = []
        result[2] = []
        result[3] = []
        # write the predictions into the result dictionary
        for q, r in zip(new_queries, res):
            result[r].append(q)
        return result

    # this function must be overridden in subclasses and return the model name
    def getname(self):
        pass
```
This part defines Baseframe, the base class used to train the SVM model; once training has finished, it can classify new requests.
The main steps of training are:
(1) read the data
(2) combine it and attach the labels
(3) vectorize the data (TF-IDF over character n-grams)
(4) reduce the dimensionality via kmeans
(5) split into training and test sets, fit the SVM and report its accuracy
A minimal usage sketch follows.
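This sketch is my own addition, not part of the original code: it shows how the classes above are meant to be driven, assuming data/URL.txt exists in the tab-separated format shown earlier and that a model/ directory is present for the pickle and .label files; the test URLs are made up.

```python
if __name__ == '__main__':
    clf = SVM()
    queries = [
        "/index.php?id=1",
        "/search?q=%3Cscript%3Ealert(1)%3C%2Fscript%3E",
    ]
    # predict() trains the model first (the pickle loading is commented out), then classifies
    result = clf.predict(queries)
    for label, name in enumerate(['safe', 'xss', 'directory traversal', 'sql injection']):
        print(name, result[label])
```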
If my answer solved your problem, please mark it as accepted!