【Python实践案例】电商平台数据分析和挖掘 – 欺诈行为检测
该脚本将包含以下功能:使用随机森林(Random Forest)进行有监督的欺诈检测
- 模拟生成包含正常交易和欺诈交易的电商交易数据。
- 进行数据预处理,包括处理缺失值、特征编码和创建新特征。
- 使用孤立森林(Isolation Forest)和局部异常因子(Local Outlier Factor)两种无监督学习算法进行异常检测。
- 使用随机森林(Random Forest)进行有监督的欺诈检测。
- 评估模型性能,并比较不同方法的效果。
- 识别欺诈交易的关键特征。
- 生成一份包含分析过程、结果和建议的综合报告。
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import IsolationForest, RandomForestClassifier
from sklearn.neighbors import LocalOutlierFactor
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix, classification_report
from sklearn.preprocessing import StandardScaler, LabelEncoder
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings("ignore")
# --- 配置 ---
NUM_TRANSACTIONS = 10000
FRAUD_RATIO = 0.05 # 5% 的欺诈交易
REPORT_PREFIX = '电商欺诈检测报告'
RANDOM_SEED = 42
# --- 数据生成 ---
def generate_sample_fraud_data(n_transactions, fraud_ratio):
"""生成模拟的电商交易数据,包含正常和欺诈交易"""
print("--- 正在生成模拟交易数据 ---")
np.random.seed(RANDOM_SEED)
data = []
transaction_ids = [f'trans_{i}' for i in range(1, n_transactions + 1)]
n_fraud = int(n_transactions * fraud_ratio)
n_normal = n_transactions - n_fraud
print(f"生成 {n_normal} 笔正常交易 和 {n_fraud} 笔欺诈交易...")
for i in range(n_transactions):
is_fraud = 1 if i < n_fraud else 0
# --- 基础交易信息 ---
user_id = f'user_{np.random.randint(1, n_transactions // 10)}' # 假设用户数是交易数的1/10
product_id = f'prod_{np.random.randint(1, 1000)}'
amount = np.random.lognormal(7, 1.5) # 交易金额 (对数正态分布)
timestamp = datetime.now() - timedelta(days=np.random.randint(0, 365), seconds=np.random.randint(0, 86400))
# --- 用户行为特征 ---
# 用户历史交易次数和总金额
user_tx_count = np.random.poisson(20) if not is_fraud else np.random.poisson(5)
user_total_spent = np.random.lognormal(9, 1.0) if not is_fraud else np.random.lognormal(7, 1.5)
# 用户平均交易金额
user_avg_amount = user_total_spent / max(user_tx_count, 1)
# --- 设备和地理位置特征 ---
device_id = f'device_{np.random.randint(1, 5000)}'
ip_address = f"192.168.{np.random.randint(0, 255)}.{np.random.randint(0, 255)}"
# 地理位置 (简化为国家和城市ID)
country = np.random.choice(['CN', 'US', 'UK', 'JP'], p=[0.6, 0.2, 0.1, 0.1])
city_id = np.random.randint(1, 100)
# --- 欺诈模式模拟 ---
if is_fraud:
# 欺诈交易可能具有的特征
# 1. 金额异常高
if np.random.random() < 0.3:
amount *= np.random.uniform(2, 10)
# 2. 短时间内多次交易 (Velocity)
if np.random.random() < 0.4:
# 这里简化处理,在数据生成时无法完全模拟时间序列,但在特征工程中可以体现
pass
# 3. 新设备/新IP
if np.random.random() < 0.5:
device_id = f'device_fraud_{np.random.randint(10000, 20000)}' # 使用一个看起来像新设备的ID
ip_address = f"10.{np.random.randint(0, 255)}.{np.random.randint(0, 255)}.{np.random.randint(0, 255)}"
# 4. 跨地区交易 (用户常用地区 vs 本次交易地区)
# 简化:假设欺诈交易更可能来自不同国家
if np.random.random() < 0.4:
country = np.random.choice(['RU', 'BR', 'IN']) # 模拟高风险国家
# 5. 用户行为异常 (例如,新用户大额交易)
if np.random.random() < 0.3:
user_tx_count = np.random.poisson(1) # 新用户
user_total_spent = amount # 总金额就是本次交易金额
data.append({
'transaction_id': transaction_ids[i],
'user_id': user_id,
'product_id': product_id,
'amount': round(amount, 2),
'timestamp': timestamp,
'user_tx_count': user_tx_count,
'user_total_spent': round(user_total_spent, 2),
'user_avg_amount': round(user_avg_amount, 2),
'device_id': device_id,
'ip_address': ip_address,
'country': country,
'city_id': city_id,
'is_fraud': is_fraud
})
df = pd.DataFrame(data)
# --- 衍生特征工程 (在数据生成后进行) ---
print("正在进行特征工程...")
df['timestamp'] = pd.to_datetime(df['timestamp'])
# 1. 时间特征
df['hour_of_day'] = df['timestamp'].dt.hour
df['day_of_week'] = df['timestamp'].dt.dayofweek
# 2. 用户行为偏差特征
df['amount_to_user_avg_ratio'] = df['amount'] / (df['user_avg_amount'] + 1e-8) # 避免除以零
# 3. 速度特征 (每个用户每小时交易次数) - 简化模拟
user_hourly_counts = df.groupby(['user_id', pd.Grouper(key='timestamp', freq='H')]).size().reset_index(name='tx_per_hour')
df = df.merge(user_hourly_counts, on=['user_id', 'timestamp'], how='left')
df['tx_per_hour'].fillna(0, inplace=True)
# 4. 设备/IP新鲜度 (假设有一个记录首次出现时间的表,这里简化为是否是新设备)
# 简化处理:如果device_id包含'fraud'则认为是新设备
df['is_new_device'] = df['device_id'].str.contains('fraud').astype(int)
csv_filename = f'{REPORT_PREFIX}_模拟数据.csv'
df.to_csv(csv_filename, index=False, encoding='utf-8-sig')
print(f"模拟数据已生成并保存至: {csv_filename}")
return df
# --- 数据预处理 ---
def preprocess_data(df):
"""数据预处理"""
print("\n--- 正在进行数据预处理 ---")
df_processed = df.copy()
# 1. 编码分类变量
le_country = LabelEncoder()
df_processed['country_encoded'] = le_country.fit_transform(df_processed['country'])
# 2. 选择用于建模的特征列 (排除ID和标签)
feature_columns = [
'amount', 'user_tx_count', 'user_total_spent', 'user_avg_amount',
'country_encoded', 'city_id', 'hour_of_day', 'day_of_week',
'amount_to_user_avg_ratio', 'tx_per_hour', 'is_new_device'
]
X = df_processed[feature_columns]
y = df_processed['is_fraud']
print(f"预处理完成。特征矩阵形状: {X.shape}, 标签向量形状: {y.shape}")
# 打印类别分布
print(f"欺诈交易数量: {y.sum()}, 占比: {y.mean():.2%}")
return X, y, le_country, feature_columns
# --- 无监督异常检测 ---
def perform_unsupervised_detection(X, y):
"""执行无监督异常检测"""
print("\n--- 正在执行无监督异常检测 ---")
results = {}
# --- 1. Isolation Forest ---
print("训练 Isolation Forest 模型...")
# 注意:IsolationForest预测1为正常,-1为异常,需要转换
iso_forest = IsolationForest(contamination=0.05, random_state=RANDOM_SEED, n_estimators=100)
iso_forest.fit(X)
y_pred_iso = iso_forest.predict(X)
y_pred_iso = np.where(y_pred_iso == -1, 1, 0) # 转换为 1 (欺诈), 0 (正常)
results['Isolation Forest'] = {
'predictions': y_pred_iso,
'accuracy': accuracy_score(y, y_pred_iso),
'precision': precision_score(y, y_pred_iso, zero_division=0),
'recall': recall_score(y, y_pred_iso),
'f1': f1_score(y, y_pred_iso),
'auc': roc_auc_score(y, y_pred_iso) if len(np.unique(y_pred_iso)) > 1 else 0.0
}
# --- 2. Local Outlier Factor ---
print("训练 Local Outlier Factor 模型...")
# LOF不需要显式训练,直接fit_predict
lof = LocalOutlierFactor(n_neighbors=20, contamination=0.05)
y_pred_lof = lof.fit_predict(X)
y_pred_lof_scores = -lof.negative_outlier_factor_ # 获取异常评分
y_pred_lof = np.where(y_pred_lof == -1, 1, 0) # 转换为 1 (欺诈), 0 (正常)
results['Local Outlier Factor'] = {
'predictions': y_pred_lof,
'scores': y_pred_lof_scores, # 保留评分用于可能的阈值调整
'accuracy': accuracy_score(y, y_pred_lof),
'precision': precision_score(y, y_pred_lof, zero_division=0),
'recall': recall_score(y, y_pred_lof),
'f1': f1_score(y, y_pred_lof),
'auc': roc_auc_score(y, y_pred_lof) if len(np.unique(y_pred_lof)) > 1 else 0.0
}
# --- 报告 ---
print("\n--- 无监督模型性能对比 ---")
unsup_comparison_df = pd.DataFrame({
'Model': list(results.keys()),
'Accuracy': [results[k]['accuracy'] for k in results.keys()],
'Precision': [results[k]['precision'] for k in results.keys()],
'Recall': [results[k]['recall'] for k in results.keys()],
'F1-Score': [results[k]['f1'] for k in results.keys()],
'AUC-ROC': [results[k]['auc'] for k in results.keys()]
})
print(unsup_comparison_df.round(4).to_string(index=False))
return results
# --- 有监督欺诈检测 ---
def perform_supervised_detection(X, y):
"""执行有监督欺诈检测"""
print("\n--- 正在执行有监督欺诈检测 ---")
# 划分训练集和测试集 (模拟在线学习,用历史数据训练,新数据测试)
# 为了演示,我们简单地划分数据集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=RANDOM_SEED, stratify=y)
# 特征缩放
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# --- Random Forest Classifier ---
print("训练 Random Forest 模型...")
model_rf = RandomForestClassifier(n_estimators=100, random_state=RANDOM_SEED, class_weight='balanced', n_jobs=-1) # 处理类别不平衡
model_rf.fit(X_train_scaled, y_train)
y_pred_rf = model_rf.predict(X_test_scaled)
y_pred_proba_rf = model_rf.predict_proba(X_test_scaled)[:, 1]
results = {
'Random Forest': {
'model': model_rf,
'scaler': scaler,
'predictions': y_pred_rf,
'probabilities': y_pred_proba_rf,
'accuracy': accuracy_score(y_test, y_pred_rf),
'precision': precision_score(y_test, y_pred_rf),
'recall': recall_score(y_test, y_pred_rf),
'f1': f1_score(y_test, y_pred_rf),
'auc': roc_auc_score(y_test, y_pred_proba_rf),
'y_test': y_test # 保存测试标签用于报告
}
}
# --- 报告 ---
rf_metrics = results['Random Forest']
print(f"\n--- Random Forest 模型性能 ---")
print(f"Accuracy: {rf_metrics['accuracy']:.4f}")
print(f"Precision: {rf_metrics['precision']:.4f}")
print(f"Recall: {rf_metrics['recall']:.4f}")
print(f"F1-Score: {rf_metrics['f1']:.4f}")
print(f"AUC-ROC: {rf_metrics['auc']:.4f}")
# 绘制混淆矩阵
cm = confusion_matrix(rf_metrics['y_test'], rf_metrics['predictions'])
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Reds',
xticklabels=['Normal', 'Fraud'],
yticklabels=['Normal', 'Fraud'])
plt.title('混淆矩阵 - Random Forest')
plt.xlabel('预测标签')
plt.ylabel('真实标签')
cm_path = f'{REPORT_PREFIX}_混淆矩阵_RandomForest.png'
plt.savefig(cm_path)
plt.close()
print(f"混淆矩阵图表已保存至: {cm_path}")
# 打印详细分类报告
print("\n--- Random Forest 详细分类报告 ---")
print(classification_report(rf_metrics['y_test'], rf_metrics['predictions'], target_names=['Normal', 'Fraud']))
return results, cm_path
# --- 特征重要性分析 ---
def analyze_feature_importance(model, feature_names, model_name):
"""分析并可视化特征重要性"""
print(f"\n--- 分析 {model_name} 特征重要性 ---")
if hasattr(model, 'feature_importances_'):
importances = model.feature_importances_
indices = np.argsort(importances)[::-1]
title = f'{model_name} - 特征重要性'
else:
print("模型不支持特征重要性分析。")
return None, None
# 创建特征重要性DataFrame
feature_importance_df = pd.DataFrame({
'feature': [feature_names[i] for i in indices],
'importance': [importances[i] for i in indices]
})
print("特征重要性排序:")
print(feature_importance_df.head(10).to_string(index=False))
# 绘制特征重要性
plt.figure(figsize=(10, 6))
sns.barplot(data=feature_importance_df.head(10), x='importance', y='feature', palette='viridis')
plt.title(title)
plt.xlabel('重要性')
plt.tight_layout()
feat_imp_path = f'{REPORT_PREFIX}_特征重要性_{model_name.replace(" ", "_")}.png'
plt.savefig(feat_imp_path)
plt.close()
print(f"特征重要性图表已保存至: {feat_imp_path}")
return feature_importance_df, feat_imp_path
# --- 报告生成 ---
def generate_fraud_detection_report(unsup_results, sup_results, cm_path, feat_imp_df, feat_imp_path):
"""生成最终的欺诈检测分析报告"""
print("\n--- 正在生成欺诈检测分析报告 ---")
from datetime import datetime
report_filename = f"{REPORT_PREFIX}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
sup_metrics = sup_results['Random Forest']
with open(report_filename, 'w', encoding='utf-8') as f:
f.write("=" * 50 + "\n")
f.write(" 电商平台用户风险管理和欺诈行为检测报告\n")
f.write(f" 生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write("=" * 50 + "\n\n")
f.write("--- 1. 项目概述 ---\n")
f.write("本项目旨在通过分析电商交易数据,运用机器学习技术识别潜在的欺诈行为,以保护平台和用户的资金安全。\n")
f.write("采用了无监督学习(异常检测)和有监督学习(分类)两种方法进行对比分析。\n\n")
f.write("--- 2. 数据概览 ---\n")
f.write("数据来源: 模拟生成的电商平台交易数据。\n")
f.write("数据规模: 10,000 笔交易,其中 5% (500笔) 为欺诈交易。\n")
f.write("关键字段: 交易ID, 用户ID, 商品ID, 交易金额, 时间戳, 用户历史行为, 设备信息, IP地址, 地理位置等。\n")
f.write("原始数据已保存为 CSV 文件。\n\n")
f.write("--- 3. 无监督异常检测 ---\n")
f.write("目标: 在没有标签的情况下,识别出与大多数数据显著不同的异常点。\n")
f.write("方法: Isolation Forest, Local Outlier Factor (LOF)。\n")
f.write("评估: 由于没有真实标签,评估主要基于模型发现的异常与已知欺诈样本的重合度。\n\n")
f.write("各模型性能对比:\n")
unsup_comparison_df = pd.DataFrame({
'Model': list(unsup_results.keys()),
'Accuracy': [unsup_results[k]['accuracy'] for k in unsup_results.keys()],
'Precision': [unsup_results[k]['precision'] for k in unsup_results.keys()],
'Recall': [unsup_results[k]['recall'] for k in unsup_results.keys()],
'F1-Score': [unsup_results[k]['f1'] for k in unsup_results.keys()],
'AUC-ROC': [unsup_results[k]['auc'] for k in unsup_results.keys()]
})
f.write(unsup_comparison_df.round(4).to_string(index=False))
f.write("\n\n注意:无监督方法的精确率和召回率可能不如监督方法,但能发现未知模式。\n\n")
f.write("--- 4. 有监督欺诈检测 ---\n")
f.write("目标: 利用已知的欺诈/正常标签,训练分类模型来预测新交易是否为欺诈。\n")
f.write("方法: Random Forest (使用了类别权重处理样本不平衡)。\n")
f.write("评估指标包括: 准确率 (Accuracy), 精确率 (Precision), 召回率 (Recall), F1分数 (F1-Score), AUC-ROC。\n\n")
f.write(f"Random Forest 模型在测试集上的表现:\n")
f.write(f" - 准确率 (Accuracy): {sup_metrics['accuracy']:.4f}\n")
f.write(f" - 精确率 (Precision): {sup_metrics['precision']:.4f}\n")
f.write(f" - 召回率 (Recall): {sup_metrics['recall']:.4f}\n")
f.write(f" - F1分数 (F1-Score): {sup_metrics['f1']:.4f}\n")
f.write(f" - AUC-ROC: {sup_metrics['auc']:.4f}\n")
f.write("混淆矩阵详细分析了模型的预测结果,图表已生成。\n")
f.write(f"混淆矩阵图表: {cm_path}\n\n")
f.write("--- 5. 关键驱动因素分析 ---\n")
f.write("通过分析最佳模型 (Random Forest) 的特征重要性,识别出区分欺诈和正常交易的最关键因素。\n")
f.write("特征重要性排序 (Top 10):\n")
f.write(feat_imp_df.head(10).to_string(index=False))
f.write("\n从上表可以看出,哪些交易特征最能指示欺诈行为。\n")
f.write(f"特征重要性图表: {feat_imp_path}\n\n")
f.write("--- 6. 风险管理与业务应用建议 ---\n")
f.write("1. 实时监控: 将训练好的模型部署到交易系统中,对每笔实时交易进行评分。\n")
f.write("2. 风险阈值: 根据业务可接受的误报率,设定欺诈概率阈值。超过阈值的交易进入人工审核或被暂时阻止。\n")
f.write("3. 多层防护:\n")
f.write(" - 结合无监督和有监督模型的结果,提高检测的鲁棒性。\n")
f.write(" - 对于高风险特征组合(如'新设备'+'大额交易'+'非常用国家'),即使模型评分不高也应加强关注。\n")
f.write("4. 规则引擎: 对于一些明显的欺诈模式(如IP地址黑名单),可以使用规则引擎快速拦截。\n")
f.write("5. 模型迭代: 定期使用最新的带标签数据重新训练模型,以适应不断变化的欺诈手段。\n")
f.write("6. 用户教育: 提醒用户保护账户安全,如不泄露密码、不点击可疑链接等。\n\n")
f.write("=" * 50 + "\n")
f.write(" 报告结束\n")
f.write("=" * 50 + "\n")
print(f"欺诈检测分析报告已生成: {report_filename}")
# --- 主函数 ---
def main():
"""主函数"""
# 1. 生成数据
df_fraud = generate_sample_fraud_data(NUM_TRANSACTIONS, FRAUD_RATIO)
# 2. 数据预处理
X, y, le_country, feature_cols = preprocess_data(df_fraud)
# 3. 无监督异常检测
unsup_results = perform_unsupervised_detection(X, y)
# 4. 有监督欺诈检测
sup_results, cm_path = perform_supervised_detection(X, y)
# 5. 特征重要性分析 (针对有监督模型)
best_model = sup_results['Random Forest']['model']
feat_imp_df, feat_imp_path = analyze_feature_importance(best_model, feature_cols, 'Random Forest')
# 6. 生成报告
generate_fraud_detection_report(unsup_results, sup_results, cm_path, feat_imp_df, feat_imp_path)
print("\n用户风险管理和欺诈行为检测分析流程完成。")
if __name__ == "__main__":
main()