点蓝字关注 设为星标 ☆ 优先赏阅
数据化审计-SmartAudit:问题导向、应用至上、解决痛点
技术贴时间,代码很业余,专业人士请忽略!
问题提出
这是一个多次被问到,也是审计工作中多次遇到的问题:
知道资金的源头,想知道资金最终流向了哪里?资金流动过程中是否有归集?
比如,在如下示例数据中,需要追踪客户 N0、 N1 转出资金的最终去向,有哪些客户是资金归集客户?
这个问题的难点在于,资金流动过程中,经过几手过账是不确定的,资金也可能存在拆分归集后的划转,多个客户之间的交易时间也是交叉不同步的。
因此,这个问题就转为三个子问题:
资金流转的路径(path) 资金流转过程中的归集点(node) 资金流转有时间维度,流转过程中后手的日期要晚于前手
业务思路
资金交易涉及众多对手,随着时间的推移,就构成了资金交易网络。比如,上述示例数据用网络可视化展现后如下:
从图中可见,上述三个子问题就转化为如何可视化展现:
从N0、N1 到 E、F 的路径,经过哪些点 资金归集流转的D、C等关键节点 剔除无关交易。B->C 有两笔资金划转,但其中一笔交易日期为 20210122 的交易晚于上一手 N0->B 的日期20210201。C2->E 也类似。
技术方案
既然是交易网络,那么它就是复杂网络的子集,可以用复杂网络的相关算法进行分析。
关于复杂网络的相关知识,参见《数字化审计实务指南》(人民邮电出版社)P97页,或本公众号的文章:《利用社交网络分析(SNA)挖出“围标”线索》、《前沿科技在数字化审计中的应用案例》。
从复杂网络技术的角度,上述问题就转为:
找到以 N0、N1 为 source 到网络中其他点连通的路径,且经过的点最多。这就是复杂网络中的最短路径算法(shortest_paths),网络中的一个点到另一个点的路径不止一条,每条路径的长度可能不同,把路径长度最短的那条叫做最短路径。 找到网络中中介中心性( Betweeness Centrality )最高的点D、C。中介中心性(Betweeness Centrality)可以简单理解为,其他任意两个点要建立连接关系,都需要这个点为中介。在信息传播过程中,网络中这样点就是大V,在资金网络中,这样的客户就是资金中介。
Python环境下的 networkx 库是专门用于复杂网络分析的库,提供了大量的算法和函数。
在 networkx 库中求解最短路径(shortest_paths)的函数有 all_shortest_paths 、 all_simple_edge_paths 。求解中介中心性(Betweeness Centrality)的函数有 betweenness_centrality 、 load_centrality 、 edge_betweenness_centrality 。
详细介绍,参见 networkx 库官方文档的算法库说明:https://networkx.org/documentation/stable/reference/algorithms/index.html
数据分析环境
本文分析所使用的环境具体如下:
软件或环境 | 说明 |
---|---|
Win10 64位 | 系统环境 |
Python 3.8.5 | 数据分析语言平台 |
networkx 2.5 | 复杂网络分析库 |
pandas 1.1.3 | 数据读取处理库 |
pyvis 0.1.9 | 关系网络可视化库 |
networkx 自带的draw函数也可以可视化网络,但生成的是静态图,没有交互性。本例使用PyVis 库进行交互式展现。
PyVis 是一个可交互的图可视化库,可以直接读取和交互式展现networkx生成的网络。官方文档:https://pyvis.readthedocs.io/en/latest/index.html
在生成交互式网络时,如果熟悉HTML+CSS,可以直接修改PyVis 安装目录下的模板文件 python3\Lib\site-packages\pyvis\templates\template.html ,美化可视化效果。
实现代码
代码可以按住屏幕,左右滑动查看
1.将交易数据转为有向交易网络
# -*- coding:utf-8 -*-
import networkx as nx
import pandas as pd
from pyvis.network import Network
# 读取流水数据
df_trade = pd.read_excel('交易流水示例.xlsx',converters={'转账日期':str})
# 交易是有方向的 按照方向提取数据生成图 转入转出分别生成有向图digraph
G1 = nx.from_pandas_edgelist(df_trade.loc[df_trade['转出转入标志']=='转出'],
source='客户名称',target='对方客户名称',
edge_attr=['转账日期','转账金额'],
create_using=nx.MultiDiGraph())
G2 = nx.from_pandas_edgelist(df_trade.loc[df_trade['转出转入标志']=='转入'],
source='对方客户名称',target='客户名称',
edge_attr=['转账日期','转账金额'],
create_using=nx.MultiDiGraph())
# 合并两个子图
G = nx.compose(G1,G2)复制
2.计算中介中心性
# 计算点的 中介中心性(Betweeness Centrality)
betweenness = nx.load_centrality(G)
# 取中介中心性最大的5个点
print(sorted(betweenness.items(),key = lambda x:x[1],reverse=True)[0:5])
# 转为dict存储 后续用于可视化设置点大小
keynodesdict = dict(sorted(betweenness.items(),key = lambda x:x[1],reverse=True)[0:5])复制
从输出结果看,D、N2、C等中介度值最大,是资金归集点,这和人工对图的观察是一致的。
[('D', 0.1388888888888889),
('N2', 0.1),
('C', 0.08888888888888889),
('B', 0.05555555555555556),
('N5', 0.044444444444444446)]
3.找出以 N0/N1 为源头的最短路径
# 定义用于存储最短路径中边的list
# 后续用于标注路径中这些边的颜色
specEdges = []
# 计算点和其他点的路径长度
pathlen = dict(nx.shortest_path_length(G))
# 找出以'N0','N1'为源头的路径中最长的路径
for node in ['N0','N1']:
for k,v in pathlen[node].items():
# 找出路径中最长的路径
if (v==max(pathlen[node].values())):
# 找到最远的目标点,然后将源头和目标点之间的最短路径都提取出来
# 没有考虑日期因素
for p in nx.all_simple_edge_paths(G,source=node,target=k):
for i in range(0,len(p)-1):
e1 = p[i]
e2 = p[i+1]
e1_date = (G.get_edge_data(e1[0],e1[1])[e1[2]]['转账日期'])
e2_date = (G.get_edge_data(e2[0],e2[1])[e2[2]]['转账日期'])
# 考虑日期因素
# 如果后一手的转账日期小于前一手 则路径就到当前为止
if e1_date>e2_date:
p = p[0:i+1]
break
# 显示有效路径 并存入最短路径list
print(p)
specEdges = specEdges + p
# 不同的路径中 边有重复 去重
specEdges = list(set(specEdges))复制
从上面的交易网络观察可见,以'N0','N1'为源头的世界尽头是'E'、'F'。shortest_path_length 计算出的结果如下图,与人工观察是一致的。
4.用PyVis可视化
# 初始化pyvis网络图
net = Network(height='800px',width='800px',directed=True,heading='资金网络')
# 由于两点之间有多条边 必须设置参数
opts = '''
var options = {
"physics": {
"minVelocity": 0.5,
"solver": "forceAtlas2Based"
}
}
'''
net.set_options(opts)
# 边的标签设置为转账日期
for edge in G.edges(data=True):
edge[2]['label'] = edge[2]['转账日期']
# 边的光标悬停提示设置为转账金额
for edge in G.edges(data=True):
edge[2]['title'] = str(edge[2]['转账金额'])+'元'
# 根据点中介度大小设置点的尺寸
for k,v in keynodesdict.items():
G.nodes[k]['size']=10*(v/min(keynodesdict.values()))
# 根据点中介度大小设置最大三个点为红色
for k,v in list(keynodesdict.items())[0:3]:
G.nodes[k]['color']='red'
# 将找出来的边标注为红色
for edge in specEdges:
G.edges[edge]['color']='red'
net.from_nx(G)
#输出并生成demo2.html
net.show('资金可视化追踪.html')复制
生成的是一个HTML文件,浏览器自动打开后,可见资金可视化追踪图。如下图:
“因为公众号平台更改了推送规则,如果不想错过新文章,记得读完点一下“赞”、“在看”,这样每次新文章推送才会第一时间出现在您的订阅列表里。
”