课程目标

掌握时间序列预测、库存优化模型、自动报表生成。

核心技能

  • 时间序列预测(ARIMA、Prophet)
  • 库存优化与安全库存计算
  • 自动化报表生成

课件内容

1. 销售预测模型

python# 安装依赖:pip install pandas numpy matplotlib statsmodels scikit-learn prophet
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

class SalesForecaster:
    def __init__(self, sales_data):
        """
        初始化销售预测器
        
        参数:
        sales_data: 销售数据,需包含日期和销售额
        """
        self.sales_data = sales_data.copy()
        self.forecast_results = {}
        
        # 确保日期格式
        if 'date' in self.sales_data.columns:
            self.sales_data['date'] = pd.to_datetime(self.sales_data['date'])
        elif 'order_date' in self.sales_data.columns:
            self.sales_data['date'] = pd.to_datetime(self.sales_data['order_date'])
            self.sales_data = self.sales_data.rename(columns={'order_date': 'date'})
    
    def prepare_time_series(self, freq='D'):
        """准备时间序列数据"""
        # 按日期聚合销售额
        if 'order_amount' in self.sales_data.columns:
            ts_data = self.sales_data.groupby('date')['order_amount'].sum().reset_index()
        elif 'sales' in self.sales_data.columns:
            ts_data = self.sales_data.groupby('date')['sales'].sum().reset_index()
        else:
            # 假设有quantity和unit_price
            self.sales_data['sales'] = self.sales_data['quantity'] * self.sales_data['unit_price']
            ts_data = self.sales_data.groupby('date')['sales'].sum().reset_index()
        
        # 设置日期索引
        ts_data = ts_data.set_index('date')
        
        # 重采样到指定频率
        ts_data = ts_data.resample(freq).sum()
        
        # 填充缺失值
        ts_data = ts_data.fillna(method='ffill').fillna(0)
        
        return ts_data
    
    def moving_average_forecast(self, ts_data, window=7, forecast_days=30):
        """移动平均预测"""
        # 计算移动平均
        ma = ts_data.rolling(window=window).mean()
        
        # 预测:使用最后window天的平均值
        last_values = ts_data.iloc[-window:].values.flatten()
        forecast = np.full(forecast_days, np.mean(last_values))
        
        # 创建预测日期
        last_date = ts_data.index[-1]
        forecast_dates = pd.date_range(
            start=last_date + pd.Timedelta(days=1),
            periods=forecast_days,
            freq=ts_data.index.freq
        )
        
        forecast_df = pd.DataFrame({
            'date': forecast_dates,
            'forecast': forecast,
            'model': 'Moving Average'
        })
        
        self.forecast_results['moving_average'] = {
            'forecast': forecast_df,
            'ma_values': ma,
            'window': window
        }
        
        return forecast_df
    
    def exponential_smoothing_forecast(self, ts_data, alpha=0.3, forecast_days=30):
        """指数平滑预测"""
        from statsmodels.tsa.holtwinters import SimpleExpSmoothing
        
        # 拟合模型
        model = SimpleExpSmoothing(ts_data)
        fitted_model = model.fit(smoothing_level=alpha, optimized=False)
        
        # 预测
        forecast = fitted_model.forecast(forecast_days)
        
        # 创建预测数据框
        last_date = ts_data.index[-1]
        forecast_dates = pd.date_range(
            start=last_date + pd.Timedelta(days=1),
            periods=forecast_days,
            freq=ts_data.index.freq
        )
        
        forecast_df = pd.DataFrame({
            'date': forecast_dates,
            'forecast': forecast.values,
            'model': 'Exponential Smoothing'
        })
        
        self.forecast_results['exponential_smoothing'] = {
            'forecast': forecast_df,
            'fitted_values': fitted_model.fittedvalues,
            'alpha': alpha
        }
        
        return forecast_df
    
    def arima_forecast(self, ts_data, order=(1,1,1), forecast_days=30):
        """ARIMA预测"""
        from statsmodels.tsa.arima.model import ARIMA
        
        try:
            # 拟合ARIMA模型
            model = ARIMA(ts_data, order=order)
            fitted_model = model.fit()
            
            # 预测
            forecast = fitted_model.forecast(steps=forecast_days)
            
            # 创建预测数据框
            last_date = ts_data.index[-1]
            forecast_dates = pd.date_range(
                start=last_date + pd.Timedelta(days=1),
                periods=forecast_days,
                freq=ts_data.index.freq
            )
            
            forecast_df = pd.DataFrame({
                'date': forecast_dates,
                'forecast': forecast.values,
                'model': f'ARIMA{order}'
            })
            
            self.forecast_results['arima'] = {
                'forecast': forecast_df,
                'fitted_values': fitted_model.fittedvalues,
                'order': order,
                'aic': fitted_model.aic,
                'bic': fitted_model.bic
            }
            
            return forecast_df
            
        except Exception as e:
            print(f"ARIMA模型拟合失败: {e}")
            return None
    
    def prophet_forecast(self, ts_data, forecast_days=30):
        """Prophet预测(需要安装fbprophet)"""
        try:
            from prophet import Prophet
            
            # 准备Prophet格式数据
            prophet_data = ts_data.reset_index()
            prophet_data.columns = ['ds', 'y']
            
            # 创建并拟合模型
            model = Prophet(
                yearly_seasonality=True,
                weekly_seasonality=True,
                daily_seasonality=False
            )
            model.fit(prophet_data)
            
            # 创建未来日期
            future = model.make_future_dataframe(periods=forecast_days, freq='D')
            
            # 预测
            forecast = model.predict(future)
            
            # 提取预测结果
            forecast_df = forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail(forecast_days)
            forecast_df.columns = ['date', 'forecast', 'lower_bound', 'upper_bound']
            forecast_df['model'] = 'Prophet'
            
            self.forecast_results['prophet'] = {
                'forecast': forecast_df,
                'model_object': model,
                'components': forecast[['ds', 'trend', 'yearly', 'weekly']]
            }
            
            return forecast_df
            
        except ImportError:
            print("Prophet未安装,跳过此方法")
            print("安装命令: pip install prophet")
            return None
        except Exception as e:
            print(f"Prophet模型拟合失败: {e}")
            return None
    
    def evaluate_forecast(self, actual, forecast, model_name):
        """评估预测效果"""
        from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
        
        # 确保长度一致
        min_len = min(len(actual), len(forecast))
        actual = actual[:min_len]
        forecast = forecast[:min_len]
        
        metrics = {
            'MAE': mean_absolute_error(actual, forecast),
            'RMSE': np.sqrt(mean_squared_error(actual, forecast)),
            'MAPE': np.mean(np.abs((actual - forecast) / actual)) * 100,
            'R2': r2_score(actual, forecast)
        }
        
        print(f"\n{model_name} 预测效果评估:")
        print(f"  MAE(平均绝对误差): {metrics['MAE']:.2f}")
        print(f"  RMSE(均方根误差): {metrics['RMSE']:.2f}")
        print(f"  MAPE(平均绝对百分比误差): {metrics['MAPE']:.2f}%")
        print(f"  R²(决定系数): {metrics['R2']:.4f}")
        
        return metrics
    
    def visualize_forecasts(self, ts_data, forecast_days=30):
        """可视化预测结果"""
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        
        # 原始时间序列
        ax1 = axes[0, 0]
        ts_data.plot(ax=ax1, color='blue', linewidth=2, label='实际销售额')
        ax1.set_title('原始销售时间序列', fontsize=12, fontweight='bold')
        ax1.set_xlabel('日期')
        ax1.set_ylabel('销售额')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        
        # 移动平均预测
        if 'moving_average' in self.forecast_results:
            ax2 = axes[0, 1]
            ts_data.plot(ax=ax2, color='blue', alpha=0.5, label='实际值')
            self.forecast_results['moving_average']['ma_values'].plot(
                ax=ax2, color='red', linewidth=2, label='移动平均'
            )
            forecast_df = self.forecast_results['moving_average']['forecast']
            ax2.plot(forecast_df['date'], forecast_df['forecast'], 
                    color='green', linewidth=2, linestyle='--', label='预测')
            ax2.set_title('移动平均预测', fontsize=12, fontweight='bold')
            ax2.legend()
            ax2.grid(True, alpha=0.3)
        
        # 指数平滑预测
        if 'exponential_smoothing' in self.forecast_results:
            ax3 = axes[1, 0]
            ts_data.plot(ax=ax3, color='blue', alpha=0.5, label='实际值')
            self.forecast_results['exponential_smoothing']['fitted_values'].plot(
                ax=ax3, color='orange', linewidth=2, label='拟合值'
            )
            forecast_df = self.forecast_results['exponential_smoothing']['forecast']
            ax3.plot(forecast_df['date'], forecast_df['forecast'], 
                    color='green', linewidth=2, linestyle='--', label='预测')
            ax3.set_title('指数平滑预测', fontsize=12, fontweight='bold')
            ax3.legend()
            ax3.grid(True, alpha=0.3)
        
        # 多模型对比
        ax4 = axes[1, 1]
        ts_data.tail(60).plot(ax=ax4, color='black', linewidth=2, label='实际值')
        
        colors = ['red', 'green', 'blue', 'purple']
        for i, (model_name, result) in enumerate(self.forecast_results.items()):
            if 'forecast' in result:
                forecast_df = result['forecast']
                ax4.plot(forecast_df['date'], forecast_df['forecast'], 
                        color=colors[i % len(colors)], linewidth=2, 
                        linestyle='--', label=f'{model_name}预测')
        
        ax4.set_title('多模型预测对比', fontsize=12, fontweight='bold')
        ax4.legend()
        ax4.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()

class InventoryOptimizer:
    def __init__(self, sales_data, lead_time=7, service_level=0.95):
        """
        库存优化器
        
        参数:
        sales_data: 销售数据
        lead_time: 补货提前期(天)
        service_level: 服务水平(库存满足率)
        """
        self.sales_data = sales_data.copy()
        self.lead_time = lead_time
        self.service_level = service_level
        self.z_score = self._calculate_z_score(service_level)
    
    def _calculate_z_score(self, service_level):
        """计算服务水平对应的Z值"""
        from scipy import stats
        return stats.norm.ppf(service_level)
    
    def calculate_demand_stats(self, product_id=None, freq='D'):
        """计算需求统计"""
        if product_id:
            product_data = self.sales_data[self.sales_data['product_id'] == product_id]
        else:
            product_data = self.sales_data
        
        # 按日期聚合
        if 'date' in product_data.columns:
            date_col = 'date'
        elif 'order_date' in product_data.columns:
            date_col = 'order_date'
            product_data = product_data.rename(columns={'order_date': 'date'})
        
        # 计算每日需求
        daily_demand = product_data.groupby('date')['quantity'].sum().resample(freq).sum()
        
        stats = {
            'mean_demand': daily_demand.mean(),
            'std_demand': daily_demand.std(),
            'cv_demand': daily_demand.std() / daily_demand.mean() if daily_demand.mean() > 0 else 0,
            'min_demand': daily_demand.min(),
            'max_demand': daily_demand.max(),
            'median_demand': daily_demand.median()
        }
        
        return stats, daily_demand
    
    def calculate_eoq(self, annual_demand, ordering_cost, holding_cost_per_unit):
        """计算经济订货批量(EOQ)"""
        eoq = np.sqrt((2 * annual_demand * ordering_cost) / holding_cost_per_unit)
        return eoq
    
    def calculate_safety_stock(self, demand_mean, demand_std, lead_time):
        """计算安全库存"""
        safety_stock = self.z_score * demand_std * np.sqrt(lead_time)
        return safety_stock
    
    def calculate_reorder_point(self, demand_mean, demand_std, lead_time):
        """计算再订货点"""
        safety_stock = self.calculate_safety_stock(demand_mean, demand_std, lead_time)
        reorder_point = (demand_mean * lead_time) + safety_stock
        return reorder_point
    
    def optimize_inventory(self, product_id, ordering_cost=50, holding_cost_rate=0.2, unit_cost=100):
        """优化单个产品的库存"""
        # 计算需求统计
        stats, daily_demand = self.calculate_demand_stats(product_id)
        
        # 年化需求
        annual_demand = stats['mean_demand'] * 365
        
        # 计算EOQ
        holding_cost_per_unit = unit_cost * holding_cost_rate
        eoq = self.calculate_eoq(annual_demand, ordering_cost, holding_cost_per_unit)
        
        # 计算安全库存和再订货点
        safety_stock = self.calculate_safety_stock(
            stats['mean_demand'], 
            stats['std_demand'], 
            self.lead_time
        )
        
        reorder_point = self.calculate_reorder_point(
            stats['mean_demand'], 
            stats['std_demand'], 
            self.lead_time
        )
        
        # 库存策略
        strategy = {
            'product_id': product_id,
            'annual_demand': annual_demand,
            'mean_daily_demand': stats['mean_demand'],
            'std_daily_demand': stats['std_demand'],
            'demand_cv': stats['cv_demand'],
            'eoq': eoq,
            'safety_stock': safety_stock,
            'reorder_point': reorder_point,
            'max_inventory': eoq + safety_stock,
            'ordering_cost': ordering_cost,
            'holding_cost_per_unit': holding_cost_per_unit,
            'lead_time': self.lead_time,
            'service_level': self.service_level
        }
        
        return strategy
    
    def visualize_inventory_analysis(self, product_id):
        """可视化库存分析"""
        stats, daily_demand = self.calculate_demand_stats(product_id)
        strategy = self.optimize_inventory(product_id)
        
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        
        # 1. 需求分布
        ax1 = axes[0, 0]
        daily_demand.plot(kind='hist', bins=20, ax=ax1, alpha=0.7, color='blue')
        ax1.axvline(stats['mean_demand'], color='red', linestyle='--', linewidth=2, label='平均需求')
        ax1.axvline(stats['mean_demand'] + stats['std_demand'], color='orange', linestyle=':', linewidth=2, label='±1标准差')
        ax1.axvline(stats['mean_demand'] - stats['std_demand'], color='orange', linestyle=':', linewidth=2)
        ax1.set_title(f'{product_id} 需求分布', fontsize=12, fontweight='bold')
        ax1.set_xlabel('日需求量')
        ax1.set_ylabel('频次')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        
        # 2. 时间序列需求
        ax2 = axes[0, 1]
        daily_demand.plot(ax=ax2, color='green', linewidth=1)
        ax2.axhline(y=stats['mean_demand'], color='red', linestyle='--', linewidth=2, label='平均需求')
        ax2.fill_between(daily_demand.index, 
                        stats['mean_demand'] - stats['std_demand'],
                        stats['mean_demand'] + stats['std_demand'],
                        alpha=0.2, color='orange', label='标准差范围')
        ax2.set_title(f'{product_id} 需求时间序列', fontsize=12, fontweight='bold')
        ax2.set_xlabel('日期')
        ax2.set_ylabel('需求量')
        ax2.legend()
        ax2.grid(True, alpha=0.3)
        
        # 3. 库存策略示意图
        ax3 = axes[1, 0]
        # 模拟库存变化
        days = 100
        inventory_level = []
        current_inventory = strategy['max_inventory']
        demand_series = np.random.normal(stats['mean_demand'], stats['std_demand'], days)
        
        for demand in demand_series:
            current_inventory = max(0, current_inventory - demand)
            if current_inventory <= strategy['reorder_point']:
                current_inventory += strategy['eoq']
            inventory_level.append(current_inventory)
        
        ax3.plot(range(days), inventory_level, color='blue', linewidth=2, label='库存水平')
        ax3.axhline(y=strategy['reorder_point'], color='red', linestyle='--', 
                   linewidth=2, label='再订货点')
        ax3.axhline(y=strategy['safety_stock'], color='orange', linestyle=':', 
                   linewidth=2, label='安全库存')
        ax3.set_title('库存策略模拟', fontsize=12, fontweight='bold')
        ax3.set_xlabel('天数')
        ax3.set_ylabel('库存量')
        ax3.legend()
        ax3.grid(True, alpha=0.3)
        
        # 4. 成本分析
        ax4 = axes[1, 1]
        order_quantities = np.linspace(strategy['eoq'] * 0.5, strategy['eoq'] * 2, 50)
        ordering_costs = (annual_demand / order_quantities) * strategy['ordering_cost']
        holding_costs = (order_quantities / 2) * strategy['holding_cost_per_unit']
        total_costs = ordering_costs + holding_costs
        
        ax4.plot(order_quantities, ordering_costs, label='订货成本', color='blue')
        ax4.plot(order_quantities, holding_costs, label='持有成本', color='green')
        ax4.plot(order_quantities, total_costs, label='总成本', color='red', linewidth=2)
        ax4.axvline(x=strategy['eoq'], color='black', linestyle='--', 
                   linewidth=2, label='EOQ')
        ax4.set_title('成本分析', fontsize=12, fontweight='bold')
        ax4.set_xlabel('订货批量')
        ax4.set_ylabel('成本')
        ax4.legend()
        ax4.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
        return strategy

# 案例数据脚本
def create_forecast_sample_data():
    """生成预测分析示例数据"""
    np.random.seed(42)
    
    # 生成2年的日销售数据
    dates = pd.date_range('2022-01-01', '2023-12-31', freq='D')
    n_days = len(dates)
    
    # 基础趋势 + 季节性 + 随机噪声
    trend = np.linspace(1000, 5000, n_days)
    yearly_seasonality = 1000 * np.sin(2 * np.pi * np.arange(n_days) / 365)
    weekly_seasonality = 500 * np.sin(2 * np.pi * np.arange(n_days) / 7)
    noise = np.random.normal(0, 200, n_days)
    
    sales = trend + yearly_seasonality + weekly_seasonality + noise
    sales = np.maximum(sales, 0)  # 确保非负
    
    # 创建DataFrame
    sales_data = pd.DataFrame({
        'date': dates,
        'sales': sales.round(2),
        'quantity': (sales / 100).astype(int) + np.random.randint(1, 10, n_days)
    })
    
    # 添加产品信息
    products = ['P001', 'P002', 'P003', 'P004', 'P005']
    product_sales = []
    
    for date, total_sales, total_qty in zip(dates, sales, sales_data['quantity']):
        for product in products:
            # 分配销售到不同产品
            product_share = np.random.dirichlet(np.ones(len(products)))
            product_sales.append({
                'date': date,
                'product_id': product,
                'sales': total_sales * product_share[products.index(product)],
                'quantity': max(1, int(total_qty * product_share[products.index(product)]))
            })
    
    product_sales_df = pd.DataFrame(product_sales)
    
    # 保存数据
    sales_data.to_csv('daily_sales_forecast.csv', index=False)
    product_sales_df.to_csv('product_sales_detail.csv', index=False)
    
    print("✅ 预测示例数据已生成:")
    print(f"  日销售数据: {sales_data.shape}")
    print(f"  产品明细数据: {product_sales_df.shape}")
    
    return sales_data, product_sales_df

# 自动报表生成
class AutoReportGenerator:
    def __init__(self, forecast_results, inventory_strategies):
        self.forecast_results = forecast_results
        self.inventory_strategies = inventory_strategies
    
    def generate_html_report(self, filename='sales_forecast_report.html'):
        """生成HTML格式报告"""
        html_content = """
        <!DOCTYPE html>
        <html>
        <head>
            <meta charset="UTF-8">
            <title>电商销售预测与库存优化报告</title>
            <style>
                body { font-family: Arial, sans-serif; margin: 40px; }
                .header { text-align: center; padding: 20px; background: #f0f0f0; }
                .section { margin: 30px 0; padding: 20px; border: 1px solid #ddd; }
                .metric { display: inline-block; margin: 10px 20px; padding: 10px; background: #e8f4f8; }
                table { width: 100%; border-collapse: collapse; margin: 20px 0; }
                th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
                th { background-color: #f2f2f2; }
                .forecast { background: #f9f9f9; padding: 15px; }
                .inventory { background: #f0f8ff; padding: 15px; }
            </style>
        </head>
        <body>
            <div class="header">
                <h1>📊 电商销售预测与库存优化报告</h1>
                <p>生成时间: """ + datetime.now().strftime('%Y-%m-%d %H:%M:%S') + """</p>
            </div>
        """
        
        # 添加预测结果
        html_content += """
            <div class="section">
                <h2>📈 销售预测结果</h2>
                <div class="forecast">
        """
        
        for model_name, result in self.forecast_results.items():
            if 'forecast' in result:
                forecast_df = result['forecast']
                html_content += f"""
                    <h3>{model_name} 预测</h3>
                    <p>未来30天预测销售额: ¥{forecast_df['forecast'].sum():,.2f}</p>
                    <p>日均预测: ¥{forecast_df['forecast'].mean():,.2f}</p>
                """
        
        html_content += """
                </div>
            </div>
        """
        
        # 添加库存策略
        html_content += """
            <div class="section">
                <h2>📦 库存优化策略</h2>
                <div class="inventory">
                    <table>
                        <tr>
                            <th>产品ID</th>
                            <th>年需求量</th>
                            <th>EOQ</th>
                            <th>安全库存</th>
                            <th>再订货点</th>
                            <th>最大库存</th>
                        </tr>
        """
        
        for strategy in self.inventory_strategies:
            html_content += f"""
                        <tr>
                            <td>{strategy['product_id']}</td>
                            <td>{strategy['annual_demand']:,.0f}</td>
                            <td>{strategy['eoq']:,.1f}</td>
                            <td>{strategy['safety_stock']:,.1f}</td>
                            <td>{strategy['reorder_point']:,.1f}</td>
                            <td>{strategy['max_inventory']:,.1f}</td>
                        </tr>
            """
        
        html_content += """
                    </table>
                </div>
            </div>
            
            <div class="section">
                <h2>💡 建议与行动计划</h2>
                <ol>
                    <li><strong>采购建议:</strong>根据EOQ和安全库存制定采购计划</li>
                    <li><strong>库存监控:</strong>设置再订货点预警机制</li>
                    <li><strong>销售预测:</strong>结合季节性调整销售目标</li>
                    <li><strong>成本优化:</strong>定期评估订货成本和持有成本</li>
                </ol>
            </div>
        </body>
        </html>
        """
        
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(html_content)
        
        print(f"✅ HTML报告已生成: {filename}")
        return filename
    
    def generate_excel_report(self, filename='sales_forecast_report.xlsx'):
        """生成Excel格式报告"""
        with pd.ExcelWriter(filename, engine='openpyxl') as writer:
            # 预测结果
            for model_name, result in self.forecast_results.items():
                if 'forecast' in result:
                    result['forecast'].to_excel(writer, sheet_name=f'{model_name}_预测', index=False)
            
            # 库存策略
            strategies_df = pd.DataFrame(self.inventory_strategies)
            strategies_df.to_excel(writer, sheet_name='库存策略', index=False)
            
            # 汇总表
            summary_data = {
                '指标': ['总预测销售额', '平均EOQ', '平均安全库存', '平均再订货点'],
                '数值': [
                    sum(r['forecast']['forecast'].sum() for r in self.forecast_results.values() if 'forecast' in r),
                    strategies_df['eoq'].mean(),
                    strategies_df['safety_stock'].mean(),
                    strategies_df['reorder_point'].mean()
                ],
                '单位': ['元', '件', '件', '件']
            }
            summary_df = pd.DataFrame(summary_data)
            summary_df.to_excel(writer, sheet_name='汇总', index=False)
        
        print(f"✅ Excel报告已生成: {filename}")
        return filename

# 运行示例
if __name__ == "__main__":
    print("="*60)
    print("Day 6: 销售预测与库存优化")
    print("="*60)
    
    # 1. 生成示例数据
    print("\n步骤1: 生成预测分析示例数据...")
    sales_data, product_sales = create_forecast_sample_data()
    
    # 2. 销售预测
    print("\n步骤2: 销售预测分析...")
    forecaster = SalesForecaster(sales_data)
    
    # 准备时间序列
    ts_data = forecaster.prepare_time_series(freq='D')
    print(f"时间序列数据形状: {ts_data.shape}")
    
    # 多种预测方法
    print("\n使用多种方法进行预测:")
    
    # 移动平均
    ma_forecast = forecaster.moving_average_forecast(ts_data, window=30, forecast_days=30)
    print(f"  ✓ 移动平均预测完成")
    
    # 指数平滑
    es_forecast = forecaster.exponential_smoothing_forecast(ts_data, alpha=0.3, forecast_days=30)
    print(f"  ✓ 指数平滑预测完成")
    
    # ARIMA
    arima_forecast = forecaster.arima_forecast(ts_data, order=(1,1,1), forecast_days=30)
    if arima_forecast is not None:
        print(f"  ✓ ARIMA预测完成")
    
    # Prophet(可选)
    prophet_forecast = forecaster.prophet_forecast(ts_data, forecast_days=30)
    if prophet_forecast is not None:
        print(f"  ✓ Prophet预测完成")
    
    # 3. 可视化预测结果
    print("\n步骤3: 生成预测可视化...")
    forecaster.visualize_forecasts(ts_data, forecast_days=30)
    
    # 4. 库存优化
    print("\n步骤4: 库存优化分析...")
    optimizer = InventoryOptimizer(product_sales, lead_time=7, service_level=0.95)
    
    # 分析前5个产品
    products_to_analyze = ['P001', 'P002', 'P003', 'P004', 'P005']
    inventory_strategies = []
    
    for product_id in products_to_analyze:
        print(f"\n分析产品: {product_id}")
        strategy = optimizer.optimize_inventory(
            product_id, 
            ordering_cost=50, 
            holding_cost_rate=0.2, 
            unit_cost=100
        )
        inventory_strategies.append(strategy)
        
        print(f"  年需求量: {strategy['annual_demand']:,.0f}")
        print(f"  EOQ: {strategy['eoq']:,.1f}")
        print(f"  安全库存: {strategy['safety_stock']:,.1f}")
        print(f"  再订货点: {strategy['reorder_point']:,.1f}")
    
    # 5. 可视化库存分析
    print("\n步骤5: 生成库存分析可视化...")
    optimizer.visualize_inventory_analysis('P001')
    
    # 6. 生成自动报表
    print("\n步骤6: 生成自动报表...")
    report_generator = AutoReportGenerator(
        forecaster.forecast_results,
        inventory_strategies
    )
    
    # 生成HTML报告
    html_report = report_generator.generate_html_report()
    
    # 生成Excel报告
    excel_report = report_generator.generate_excel_report()
    
    print("\n" + "="*60)
    print("✅ 分析完成!")
    print("="*60)
    print("生成的文件:")
    print(f"  1. HTML报告: {html_report}")
    print(f"  2. Excel报告: {excel_report}")
    print(f"  3. 原始数据: daily_sales_forecast.csv")
    print(f"  4. 产品明细: product_sales_detail.csv")
    print("\n建议下一步:")
    print("  1. 根据预测结果调整销售目标")
    print("  2. 按照库存策略优化采购计划")
    print("  3. 设置库存预警监控")

安装要求

bash# 基础依赖
pip install pandas==1.5.3 numpy==1.24.3 matplotlib==3.7.1 seaborn==0.12.2

# 时间序列分析
pip install statsmodels==0.14.0 scikit-learn==1.3.0

# Prophet预测(可选)
pip install prophet==1.1.4

# Excel报告生成
pip install openpyxl==3.1.2 xlsxwriter==3.1.2

# 科学计算
pip install scipy==1.11.4

课程总结

4-6天技能掌握清单

  1. Day 4:数据采集与清洗
    • ✅ 多源数据自动采集(Excel、CSV、MySQL)
    • ✅ 自动化数据清洗(缺失值、异常值、重复值)
    • ✅ 数据类型标准化与转换
  2. Day 5:电商核心指标与RFM分析
    • ✅ 电商核心KPI计算(GMV、转化率、客单价等)
    • ✅ RFM用户价值分层模型
    • ✅ 销售漏斗分析与可视化
  3. Day 6:销售预测与库存优化
    • ✅ 时间序列预测(移动平均、指数平滑、ARIMA)
    • ✅ 库存优化模型(EOQ、安全库存、再订货点)
    • ✅ 自动化报表生成(HTML、Excel)

运行说明

  1. 环境准备: bash# 创建虚拟环境 python -m venv ecommerce_env source ecommerce_env/bin/activate # Linux/Mac # 或 ecommerce_env\Scripts\activate # Windows # 安装依赖 pip install -r requirements.txt
  2. 运行顺序
    • 按Day 4 → Day 5 → Day 6顺序运行
    • 每个脚本都是独立的,包含示例数据生成
    • 所有输出文件会自动保存到当前目录
  3. 自定义数据
    • 替换示例数据文件为自己的电商数据
    • 调整参数以适应实际业务场景
    • 扩展模型以处理更复杂的需求

业务应用价值

  1. 数据驱动决策:基于数据分析制定销售策略
  2. 库存优化:降低库存成本,提高周转率
  3. 用户精细化运营:RFM分群实现精准营销
  4. 自动化报表:减少人工工作量,提高效率

这套课程体系从数据采集到预测优化,形成了完整的电商数据分析闭环,学员通过4-6天的学习可以掌握电商全链路数据分析的核心技能。