暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Python-决策树

原创 kayla 2023-03-21
564
data = pd.read_csv('dataset/loans.csv').sample(40000)
display(data.sample(10))
display(data.shape)
# grade: 贷款级别
# sub_grade: 贷款细分级别
# short_emp: 一年以内短期雇佣
# emp_length_num: 受雇年限
# home_ownership:居住状态(自有,按揭,租住)
# dti:贷款占收入比例
# purpose:贷款用途
# term:贷款周期
# last_delinq_none:贷款申请人是否有不良记录 
# last_major_derog_none:贷款申请人是否有还款逾期90天以上记录
# revol_util:透支额度占信用比例
# total_rec_late_fee:逾期罚款总额
# safe_loans:贷款是否安全

from sklearn.preprocessing import LabelEncoder
from collections import defaultdict

# 将以上非数值数据映射为数值型。
d = defaultdict(LabelEncoder)
data = data.apply(lambda x : d[x.name].fit_transform(x))
X_train = data.iloc[:800, :-1]
y_train = data.iloc[:800, -1]
test_X = data.iloc[800:, :-1]
test_y = data.iloc[800:, -1]
display(X_train)

# 特征二值化
# 由于特征值太过复杂,不利于处理。我们根据均值来二值化
for i in X_train.columns:
mean = np.mean(X_train[i])
for j in range(len(X_train[i])):
X_train[i].values[j] = 1 if X_train[i].values[j]>mean else 0
for i in test_X.columns:
mean = np.mean(test_X[i])
for j in range(len(test_X[i])):
test_X[i].values[j] = 1 if test_X[i].values[j]>mean else 0
display(X_train)

from collections import Counter
from tqdm import tqdm

class DecisionTreeID3:
def __init__(self):
pass
def calc_entropy(self):
'''计算信息熵、条件熵和信息增益'''
# 把标签放在数据最后一列,方便create_tree处理
self.X = pd.concat([self.X, self.y], axis=1)
# ************ 计算信息熵 **************
yes = np.asarray(self.X[self.X[self.X.columns[-1]]==1])
no = np.asarray(self.X[self.X[self.X.columns[-1]]==0])
P_yes = len(yes)/len(self.X)
P_no = len(no)/len(self.X)
self.HX = - P_yes*np.log2(P_yes)- P_no*np.log2(P_no)
# display("信息熵 = " +str(self.HX))
# ************ 计算条件熵 **************
# H存放条件熵
self.Gda = []
# 遍历每一特征列。除了标签列
for i in self.X.columns[:-1]:
# 存放条件熵
Hi = 0
# 获取当前特征每种情况出现的次数,以便计算每个情况各自的概率
condProbCollections = Counter(self.X[i]).items()
# 每种特征可能有N中情况,累加
for k in condProbCollections:
# 获取当前条件X发生的总样本(包含所有列)
samples_of_current = self.X[self.X[i]==k[0]]
# 获取当前条件X发生的总样本(仅包含当前特征列)
samples_of_current_features = samples_of_current[i]
# 获取当前条件X发生下,被判定为安全和不安全的总次数
total = len(samples_of_current_features)
# 安全总次数
k_safe = len(samples_of_current[samples_of_current[samples_of_current.columns[-1]]==1])
# 不安全总次数
k_unsafe = total - k_safe
# 计算安全和不安全的概率
P_k_safe = k_safe/total
P_k_unsafe = k_unsafe/total
# 累加条件熵
log_P_k_safe = 0 if P_k_safe==0 else np.log2(P_k_safe) # 防止出现0值报错
log_P_k_unsafe = 0 if P_k_unsafe==0 else np.log2(P_k_unsafe) # 防止出现0值报错
Hi += - (total/len(self.X))*(P_k_safe * log_P_k_safe + P_k_unsafe * log_P_k_unsafe)
# 保存信息增益
self.Gda.append({"value":self.HX - Hi, "feature":i})
# print("信息增益为")
# print(self.Gda)
def create_tree(self, node=False):
'''构建决策树结构。决策树信息存储在JSON字典当中

Parameters
-----
node: Series类型,用于传递当前节点包含的信息

Return
-----
tree: 构建好的树json结构
'''
# 递归出口
if len(self.Gda)==0:
return node.iloc[:1,-1].values[0]
# 获取第一个特征列
feature = self.Gda[0]['feature']
# 删除该列,以便递归时不会重复到达这里
del self.Gda[0]
# 获取当前特征每种情况出现的次数,以便计算每个情况各自的概率
condProbCollections = Counter(self.X[feature]).items()
# print(feature)
# print(condProbCollections)
# 定义树字典。必须使用特征feature作为键名
tree = {feature:{}}
for [value,counts] in condProbCollections:
# print(condProbCollections)
# print(value)
tree[feature][value] = self.create_tree(self.X[self.X[feature]==value])
# 保存树结构
self.tree = tree
return tree
def fit(self, X, y):
'''训练

Parameters
-----
X: 训练数据,形如 [样本数量,特征数量]
y: 类数组类型,形状为:[样本数量]
'''
self.X = X
self.y = y
self.calc_entropy()
self.create_tree()
def predict_item(self, x, node=False):
'''构建决策树结构。决策树信息存储在JSON字典当中

Parameters
-----
node: Series类型,用于传递当前节点包含的信息

Return
-----
tree: 构建好的树json结构
'''
if node==0 or node==1:
return node
label = -1
# 获取当前节点名
key = next(iter(node))
# 如果当前节点的值等于0,递归0下面的分支,否则1
if x[key].values[0] == 0:
label = self.predict_item(x, node=node[key][0])
else:
label = self.predict_item(x, node=node[key][1])
return label
def predict(self, X):
'''对样本进行预测
Parameters:
X: 类数组类型,可以是List也可以是Ndarray,形状为: [样本数量,特征数量]
Returns:
数组类型,预测结果
'''
result = []
for i in range(len(X)):
result.append(self.predict_item(X.iloc[i:i+1,], node=self.tree))
return result

dt = DecisionTreeID3()
dt.fit(X_train, y_train)
result = dt.predict(test_X)
display(np.sum(result==test_y)/len(result))



a=[{"a":1},{"b":2},{"c":3}]
c = iter(a)
display(next(c))
display(next(c))
display(next(c))
复制


data = pd.read_csv('dataset/loans.csv').sample(40000)
display(data.sample(10))
display(data.shape)
# grade: 贷款级别
# sub_grade: 贷款细分级别
# short_emp: 一年以内短期雇佣
# emp_length_num: 受雇年限
# home_ownership:居住状态(自有,按揭,租住)
# dti:贷款占收入比例
# purpose:贷款用途
# term:贷款周期
# last_delinq_none:贷款申请人是否有不良记录 
# last_major_derog_none:贷款申请人是否有还款逾期90天以上记录
# revol_util:透支额度占信用比例
# total_rec_late_fee:逾期罚款总额
# safe_loans:贷款是否安全

from sklearn.preprocessing import LabelEncoder
from collections import defaultdict

# 将以上非数值数据映射为数值型。
d = defaultdict(LabelEncoder)
data = data.apply(lambda x : d[x.name].fit_transform(x))
X_train = data.iloc[:800, :-1]
y_train = data.iloc[:800, -1]
test_X = data.iloc[800:, :-1]
test_y = data.iloc[800:, -1]
display(X_train)

# 特征二值化
# 由于特征值太过复杂,不利于处理。我们根据均值来二值化
for i in X_train.columns:
mean = np.mean(X_train[i])
for j in range(len(X_train[i])):
X_train[i].values[j] = 1 if X_train[i].values[j]>mean else 0
for i in test_X.columns:
mean = np.mean(test_X[i])
for j in range(len(test_X[i])):
test_X[i].values[j] = 1 if test_X[i].values[j]>mean else 0
display(X_train)

from collections import Counter
from tqdm import tqdm

class DecisionTreeID3:
def __init__(self):
pass
def calc_entropy(self):
'''计算信息熵、条件熵和信息增益'''
# 把标签放在数据最后一列,方便create_tree处理
self.X = pd.concat([self.X, self.y], axis=1)
# ************ 计算信息熵 **************
yes = np.asarray(self.X[self.X[self.X.columns[-1]]==1])
no = np.asarray(self.X[self.X[self.X.columns[-1]]==0])
P_yes = len(yes)/len(self.X)
P_no = len(no)/len(self.X)
self.HX = - P_yes*np.log2(P_yes)- P_no*np.log2(P_no)
# display("信息熵 = " +str(self.HX))
# ************ 计算条件熵 **************
# H存放条件熵
self.Gda = []
# 遍历每一特征列。除了标签列
for i in self.X.columns[:-1]:
# 存放条件熵
Hi = 0
# 获取当前特征每种情况出现的次数,以便计算每个情况各自的概率
condProbCollections = Counter(self.X[i]).items()
# 每种特征可能有N中情况,累加
for k in condProbCollections:
# 获取当前条件X发生的总样本(包含所有列)
samples_of_current = self.X[self.X[i]==k[0]]
# 获取当前条件X发生的总样本(仅包含当前特征列)
samples_of_current_features = samples_of_current[i]
# 获取当前条件X发生下,被判定为安全和不安全的总次数
total = len(samples_of_current_features)
# 安全总次数
k_safe = len(samples_of_current[samples_of_current[samples_of_current.columns[-1]]==1])
# 不安全总次数
k_unsafe = total - k_safe
# 计算安全和不安全的概率
P_k_safe = k_safe/total
P_k_unsafe = k_unsafe/total
# 累加条件熵
log_P_k_safe = 0 if P_k_safe==0 else np.log2(P_k_safe) # 防止出现0值报错
log_P_k_unsafe = 0 if P_k_unsafe==0 else np.log2(P_k_unsafe) # 防止出现0值报错
Hi += - (total/len(self.X))*(P_k_safe * log_P_k_safe + P_k_unsafe * log_P_k_unsafe)
# 保存信息增益
self.Gda.append({"value":self.HX - Hi, "feature":i})
# print("信息增益为")
# print(self.Gda)
def create_tree(self, node=False):
'''构建决策树结构。决策树信息存储在JSON字典当中

Parameters
-----
node: Series类型,用于传递当前节点包含的信息

Return
-----
tree: 构建好的树json结构
'''
# 递归出口
if len(self.Gda)==0:
return node.iloc[:1,-1].values[0]
# 获取第一个特征列
feature = self.Gda[0]['feature']
# 删除该列,以便递归时不会重复到达这里
del self.Gda[0]
# 获取当前特征每种情况出现的次数,以便计算每个情况各自的概率
condProbCollections = Counter(self.X[feature]).items()
# print(feature)
# print(condProbCollections)
# 定义树字典。必须使用特征feature作为键名
tree = {feature:{}}
for [value,counts] in condProbCollections:
# print(condProbCollections)
# print(value)
tree[feature][value] = self.create_tree(self.X[self.X[feature]==value])
# 保存树结构
self.tree = tree
return tree
def fit(self, X, y):
'''训练

Parameters
-----
X: 训练数据,形如 [样本数量,特征数量]
y: 类数组类型,形状为:[样本数量]
'''
self.X = X
self.y = y
self.calc_entropy()
self.create_tree()
def predict_item(self, x, node=False):
'''构建决策树结构。决策树信息存储在JSON字典当中

Parameters
-----
node: Series类型,用于传递当前节点包含的信息

Return
-----
tree: 构建好的树json结构
'''
if node==0 or node==1:
return node
label = -1
# 获取当前节点名
key = next(iter(node))
# 如果当前节点的值等于0,递归0下面的分支,否则1
if x[key].values[0] == 0:
label = self.predict_item(x, node=node[key][0])
else:
label = self.predict_item(x, node=node[key][1])
return label
def predict(self, X):
'''对样本进行预测
Parameters:
X: 类数组类型,可以是List也可以是Ndarray,形状为: [样本数量,特征数量]
Returns:
数组类型,预测结果
'''
result = []
for i in range(len(X)):
result.append(self.predict_item(X.iloc[i:i+1,], node=self.tree))
return result

dt = DecisionTreeID3()
dt.fit(X_train, y_train)
result = dt.predict(test_X)
display(np.sum(result==test_y)/len(result))


a=[{"a":1},{"b":2},{"c":3}]
c = iter(a)
display(next(c))
display(next(c))
display(next(c))

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论