开发思路:我们来创建一个Python脚本,用于演示如何为电商平台的不同监控指标类型(例如:CPU使用率、内存使用率、网络流量、订单量、错误率等)预测和设置动态告警阈值,以提升监控的覆盖率和有效性。

该脚本将包含以下功能:

  1. 模拟生成多种类型的电商平台监控指标时间序列数据。
  2. 对每种指标类型进行数据探索和特征分析。
  3. 使用统计方法(如基于历史分位数)和机器学习方法(如Isolation Forest用于异常检测)来动态计算告警阈值。
  4. 可视化展示原始数据和计算出的动态阈值。
  5. 生成一份包含分析过程、阈值设定逻辑和建议的综合报告。

import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings("ignore")

# --- 配置 ---
NUM_DAYS = 30  # 模拟30天的数据
REPORT_PREFIX = '电商多类型指标动态告警阈值预测报告'
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

# 定义指标类型及其基础参数
METRIC_CONFIGS = {
    'cpu_utilization': {
        'name': 'CPU使用率 (%)',
        'base_level': 30,
        'trend_slope': 0.05, # 每天平均变化0.05%
        'noise_std': 5,
        'seasonal_weekly': True,
        'event_multiplier_range': (2, 5) # 大促时变为2-5倍
    },
    'memory_utilization': {
        'name': '内存使用率 (%)',
        'base_level': 50,
        'trend_slope': 0.02,
        'noise_std': 4,
        'seasonal_weekly': True,
        'event_multiplier_range': (1.5, 3)
    },
    'network_in_traffic': {
        'name': '入站网络流量 (MB/s)',
        'base_level': 100,
        'trend_slope': 0.5,
        'noise_std': 20,
        'seasonal_weekly': True,
        'event_multiplier_range': (3, 8)
    },
    'order_count': {
        'name': '订单数量 (单/分钟)',
        'base_level': 50,
        'trend_slope': 0.3,
        'noise_std': 15,
        'seasonal_weekly': True,
        'event_multiplier_range': (5, 15)
    },
    'error_rate': {
        'name': '错误率 (万分之一)',
        'base_level': 5,
        'trend_slope': 0.01,
        'noise_std': 2,
        'seasonal_weekly': False,
        'event_multiplier_range': (3, 10)
    }
}

# --- 数据生成 ---

def generate_sample_metrics_data(n_days, configs):
    """生成模拟的多种电商监控指标数据"""
    print("--- 正在生成模拟电商监控指标数据 ---")
    start_date = datetime.now() - timedelta(days=n_days-1)
    dates = pd.date_range(start=start_date, periods=n_days, freq='D')
    
    data_dict = {'date': dates}
    
    for metric_key, config in configs.items():
        # 1. 基础水平和长期趋势
        base_level = config['base_level']
        trend_slope = config['trend_slope']
        trend = np.arange(n_days) * trend_slope
        
        # 2. 季节性 (周度)
        weekly_effect = np.ones(n_days)
        if config['seasonal_weekly']:
            day_of_week = pd.Series(dates).dt.dayofweek
            # 假设周末业务量大,资源使用高
            weekly_effect = np.where(day_of_week.isin([5, 6]), 1.2, 1.0)
        
        # 3. 特定事件效应 (模拟大促)
        event_effect = np.ones(n_days)
        # 模拟一个大促日 (例如第15天)
        promo_day_idx = n_days // 2
        if 0 <= promo_day_idx < n_days:
            multiplier = np.random.uniform(*config['event_multiplier_range'])
            event_effect[promo_day_idx] *= multiplier
            if promo_day_idx > 0:
                event_effect[promo_day_idx-1] *= np.random.uniform(1.2, multiplier * 0.8)
            if promo_day_idx < n_days - 1:
                event_effect[promo_day_idx+1] *= np.random.uniform(1.1, multiplier * 0.6)
        
        # 4. 随机噪声
        noise = np.random.normal(0, config['noise_std'], n_days)
        
        # 5. 组合所有成分
        values = base_level + trend
        if config['seasonal_weekly']:
            values = values * weekly_effect
        values = values * event_effect + noise
        
        # 确保值在合理范围内 (例如CPU使用率不超过100%)
        if 'utilization' in metric_key:
            values = np.clip(values, 0, 100)
        elif metric_key == 'error_rate':
            values = np.maximum(values, 0) # 错误率不能为负
            
        data_dict[metric_key] = np.round(values, 2)
    
    df = pd.DataFrame(data_dict)
    
    csv_filename = f'{REPORT_PREFIX}_模拟监控指标数据.csv'
    df.to_csv(csv_filename, index=False, encoding='utf-8-sig')
    print(f"模拟监控指标数据已生成并保存至: {csv_filename}")
    return df

# --- 阈值预测 ---

def calculate_dynamic_thresholds(df, configs):
    """为每种指标计算动态告警阈值"""
    print("\n--- 正在计算动态告警阈值 ---")
    thresholds = {}
    
    for metric_key, config in configs.items():
        values = df[metric_key].values
        metric_name = config['name']
        
        print(f"  -> 为 '{metric_name}' 计算阈值...")
        
        # 方法1: 基于历史分位数 (例如95%分位数作为上限阈值)
        upper_threshold_percentile = 95
        lower_threshold_percentile = 5 # 例如,对于某些指标,过低也可能异常
        upper_threshold = np.percentile(values, upper_threshold_percentile)
        lower_threshold = np.percentile(values, lower_threshold_percentile)
        
        # 方法2: 使用Isolation Forest进行无监督异常检测
        # 该方法可以识别与历史模式显著不同的点
        clf = IsolationForest(contamination=0.1, random_state=RANDOM_SEED) # 假设有10%的异常
        # Isolation Forest需要2D数组
        values_reshaped = values.reshape(-1, 1)
        anomaly_labels = clf.fit_predict(values_reshaped)
        
        # 获取被标记为异常的点,用于确定阈值边界
        # 注意:Isolation Forest可能标记高低两种异常,需要区分
        normal_values = values[anomaly_labels == 1]
        if len(normal_values) > 0:
            # 基于正常数据重新计算分位数,更稳健
            robust_upper = np.percentile(normal_values, upper_threshold_percentile)
            robust_lower = np.percentile(normal_values, lower_threshold_percentile)
        else:
            # 如果所有点都被标记为异常(不太可能),回退到原始方法
            robust_upper, robust_lower = upper_threshold, lower_threshold
            
        # 确定最终阈值 (可以结合多种方法)
        # 这里我们采用一种简单的策略:使用稳健分位数,并留有一定buffer
        buffer_percent = 2.0
        final_upper = robust_upper * (1 + buffer_percent/100.0) if robust_upper > 0 else robust_upper + buffer_percent
        final_lower = robust_lower * (1 - buffer_percent/100.0) if robust_lower > 0 and 'utilization' not in metric_key else max(0, robust_lower - buffer_percent)
        
        # 对于利用率类指标,上限通常设为100
        if 'utilization' in metric_key:
            final_upper = min(final_upper, 100.0)
            
        thresholds[metric_key] = {
            'name': metric_name,
            'method': f'Isolation Forest + 稳健{upper_threshold_percentile}分位数',
            'upper': round(final_upper, 2),
            'lower': round(final_lower, 2),
            'raw_upper_percentile': round(upper_threshold, 2),
            'raw_lower_percentile': round(lower_threshold, 2),
            'robust_upper': round(robust_upper, 2),
            'robust_lower': round(robust_lower, 2)
        }
        
    return thresholds

# --- 可视化 ---

def visualize_thresholds(df, thresholds, configs):
    """可视化数据和阈值"""
    print("\n--- 正在生成可视化图表 ---")
    plot_paths = []
    
    n_metrics = len(configs)
    n_cols = 2
    n_rows = (n_metrics + n_cols - 1) // n_cols
    
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 5 * n_rows))
    axes = axes.flatten() if n_metrics > 1 else [axes]
    
    for i, (metric_key, config) in enumerate(configs.items()):
        ax = axes[i]
        metric_name = config['name']
        values = df[metric_key]
        threshold_info = thresholds[metric_key]
        
        ax.plot(df['date'], values, marker='o', linestyle='-', markersize=4, label='历史数据')
        ax.axhline(y=threshold_info['upper'], color='r', linestyle='--', linewidth=1.5, label=f"动态上限阈值: {threshold_info['upper']}")
        ax.axhline(y=threshold_info['lower'], color='orange', linestyle='--', linewidth=1.5, label=f"动态下限阈值: {threshold_info['lower']}")
        ax.axhline(y=threshold_info['raw_upper_percentile'], color='pink', linestyle=':', linewidth=1, label=f"原始95%分位数: {threshold_info['raw_upper_percentile']}")
        
        ax.set_title(f"{metric_name} 动态阈值")
        ax.set_xlabel('日期')
        ax.set_ylabel(metric_name)
        ax.legend()
        ax.grid(True, alpha=0.5)
    
    # 隐藏多余的子图
    for j in range(i+1, len(axes)):
        fig.delaxes(axes[j])
        
    plt.tight_layout()
    plot_filename = f'{REPORT_PREFIX}_动态阈值可视化.png'
    plt.savefig(plot_filename, bbox_inches='tight')
    plt.close()
    plot_paths.append(plot_filename)
    print(f"动态阈值可视化图表已保存至: {plot_filename}")
    
    return plot_paths

# --- 报告生成 ---

def generate_threshold_report(thresholds, plot_paths):
    """生成最终的阈值预测分析报告"""
    print("\n--- 正在生成动态告警阈值预测报告 ---")
    from datetime import datetime
    report_filename = f"{REPORT_PREFIX}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
    
    with open(report_filename, 'w', encoding='utf-8') as f:
        f.write("=" * 70 + "\n")
        f.write("        电商平台多类型指标动态告警阈值预测与分析报告\n")
        f.write(f"              生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write("=" * 70 + "\n\n")

        f.write("--- 1. 项目概述 ---\n")
        f.write("本项目旨在为电商平台的多种关键监控指标(如CPU、内存、网络、订单、错误率等)\n")
        f.write("预测和设定动态告警阈值。相比于静态阈值,动态阈值能更好地适应业务变化,\n")
        f.write("减少误报和漏报,从而提升监控系统的覆盖范围和有效性。\n\n")

        f.write("--- 2. 数据概览 ---\n")
        f.write(f"数据来源: 模拟生成的 {NUM_DAYS} 天电商平台监控指标数据。\n")
        f.write("包含的指标类型:\n")
        for key, config in METRIC_CONFIGS.items():
            f.write(f"  - {config['name']}\n")
        f.write("数据特征: 包含长期趋势、周度季节性以及模拟的大促事件效应。\n")
        f.write("原始数据已保存为 CSV 文件。\n\n")

        f.write("--- 3. 阈值计算方法 ---\n")
        f.write("采用以下方法相结合来计算动态阈值:\n")
        f.write("1. 历史分位数法: \n")
        f.write("   - 计算指标历史数据的高百分位数(如95%)和低百分位数(如5%)。\n")
        f.write("   - 作为初步的阈值参考。\n")
        f.write("2. Isolation Forest (隔离森林) 异常检测:\n")
        f.write("   - 一种无监督机器学习算法,能识别与大多数数据显著不同的点。\n")
        f.write("   - 用于过滤掉历史数据中的异常波动,使阈值计算更稳健。\n")
        f.write("3. 综合策略:\n")
        f.write("   - 基于Isolation Forest过滤后的“正常”数据,重新计算分位数。\n")
        f.write("   - 在稳健分位数基础上增加一个小的安全缓冲区(Buffer)。\n")
        f.write("   - 对特定指标(如利用率)施加硬性限制(如<=100%)。\n\n")
        
        f.write("--- 4. 各指标动态阈值结果 ---\n")
        for metric_key, threshold_info in thresholds.items():
            f.write(f"\n  -> 指标: {threshold_info['name']}\n")
            f.write(f"     - 计算方法: {threshold_info['method']}\n")
            f.write(f"     - 最终动态上限阈值: {threshold_info['upper']}\n")
            f.write(f"     - 最终动态下限阈值: {threshold_info['lower']}\n")
            f.write(f"     - 原始95%分位数(上限参考): {threshold_info['raw_upper_percentile']}\n")
            f.write(f"     - 原始5%分位数(下限参考): {threshold_info['raw_lower_percentile']}\n")
            f.write(f"     - Isolation Forest过滤后95%分位数: {threshold_info['robust_upper']}\n")
            f.write(f"     - Isolation Forest过滤后5%分位数: {threshold_info['robust_lower']}\n")
        f.write("\n")

        f.write("--- 5. 可视化分析 ---\n")
        f.write("所有指标的历史数据及其对应的动态阈值已在图表中展示。\n")
        for path in plot_paths:
            f.write(f"  - 图表: {path}\n")
        f.write("\n")

        f.write("--- 6. 结论与建议 ---\n")
        f.write("1. 阈值动态化: \n")
        f.write("   - 不同类型的指标应采用不同的阈值策略。\n")
        f.write("   - 动态阈值能自动适应业务的自然波动(如周末高峰),减少无效告警。\n")
        f.write("2. 模型迭代: \n")
        f.write("   - 建议定期(如每日/每周)使用最新数据重新计算阈值,以适应系统和业务的长期变化。\n")
        f.write("3. 多模型融合: \n")
        f.write("   - 可以结合更多统计模型(如ARIMA预测区间)或机器学习模型来进一步优化阈值。\n")
        f.write("4. 业务联动: \n")
        f.write("   - 在已知的大促活动前,可以临时调整阈值计算逻辑,或将活动期数据作为特殊场景处理。\n")
        f.write("5. 告警分级: \n")
        f.write("   - 可以设置多级阈值(如警告、严重),对应不同的响应流程。\n")
        f.write("6. 可视化监控: \n")
        f.write("   - 将动态阈值实时展示在监控面板上,便于运维人员理解告警背景。\n\n")

        f.write("=" * 70 + "\n")
        f.write("                             报告结束\n")
        f.write("=" * 70 + "\n")

    print(f"动态告警阈值预测报告已生成: {report_filename}")

# --- 主函数 ---

def main():
    """主函数"""
    # 1. 生成数据
    df_metrics = generate_sample_metrics_data(NUM_DAYS, METRIC_CONFIGS)
    
    # 2. 计算动态阈值
    thresholds = calculate_dynamic_thresholds(df_metrics, METRIC_CONFIGS)
    
    # 3. 可视化
    plot_paths = visualize_thresholds(df_metrics, thresholds, METRIC_CONFIGS)
    
    # 4. 生成报告
    generate_threshold_report(thresholds, plot_paths)
    
    print("\n电商平台多类型指标动态告警阈值预测分析流程完成。")

if __name__ == "__main__":
    main()