数据技能(SKILLS) :Day 6:销售预测与库存优化
课程目标
掌握时间序列预测、库存优化模型、自动报表生成。
核心技能
- 时间序列预测(ARIMA、Prophet)
- 库存优化与安全库存计算
- 自动化报表生成
课件内容
1. 销售预测模型
python# 安装依赖:pip install pandas numpy matplotlib statsmodels scikit-learn prophet
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')
class SalesForecaster:
def __init__(self, sales_data):
"""
初始化销售预测器
参数:
sales_data: 销售数据,需包含日期和销售额
"""
self.sales_data = sales_data.copy()
self.forecast_results = {}
# 确保日期格式
if 'date' in self.sales_data.columns:
self.sales_data['date'] = pd.to_datetime(self.sales_data['date'])
elif 'order_date' in self.sales_data.columns:
self.sales_data['date'] = pd.to_datetime(self.sales_data['order_date'])
self.sales_data = self.sales_data.rename(columns={'order_date': 'date'})
def prepare_time_series(self, freq='D'):
"""准备时间序列数据"""
# 按日期聚合销售额
if 'order_amount' in self.sales_data.columns:
ts_data = self.sales_data.groupby('date')['order_amount'].sum().reset_index()
elif 'sales' in self.sales_data.columns:
ts_data = self.sales_data.groupby('date')['sales'].sum().reset_index()
else:
# 假设有quantity和unit_price
self.sales_data['sales'] = self.sales_data['quantity'] * self.sales_data['unit_price']
ts_data = self.sales_data.groupby('date')['sales'].sum().reset_index()
# 设置日期索引
ts_data = ts_data.set_index('date')
# 重采样到指定频率
ts_data = ts_data.resample(freq).sum()
# 填充缺失值
ts_data = ts_data.fillna(method='ffill').fillna(0)
return ts_data
def moving_average_forecast(self, ts_data, window=7, forecast_days=30):
"""移动平均预测"""
# 计算移动平均
ma = ts_data.rolling(window=window).mean()
# 预测:使用最后window天的平均值
last_values = ts_data.iloc[-window:].values.flatten()
forecast = np.full(forecast_days, np.mean(last_values))
# 创建预测日期
last_date = ts_data.index[-1]
forecast_dates = pd.date_range(
start=last_date + pd.Timedelta(days=1),
periods=forecast_days,
freq=ts_data.index.freq
)
forecast_df = pd.DataFrame({
'date': forecast_dates,
'forecast': forecast,
'model': 'Moving Average'
})
self.forecast_results['moving_average'] = {
'forecast': forecast_df,
'ma_values': ma,
'window': window
}
return forecast_df
def exponential_smoothing_forecast(self, ts_data, alpha=0.3, forecast_days=30):
"""指数平滑预测"""
from statsmodels.tsa.holtwinters import SimpleExpSmoothing
# 拟合模型
model = SimpleExpSmoothing(ts_data)
fitted_model = model.fit(smoothing_level=alpha, optimized=False)
# 预测
forecast = fitted_model.forecast(forecast_days)
# 创建预测数据框
last_date = ts_data.index[-1]
forecast_dates = pd.date_range(
start=last_date + pd.Timedelta(days=1),
periods=forecast_days,
freq=ts_data.index.freq
)
forecast_df = pd.DataFrame({
'date': forecast_dates,
'forecast': forecast.values,
'model': 'Exponential Smoothing'
})
self.forecast_results['exponential_smoothing'] = {
'forecast': forecast_df,
'fitted_values': fitted_model.fittedvalues,
'alpha': alpha
}
return forecast_df
def arima_forecast(self, ts_data, order=(1,1,1), forecast_days=30):
"""ARIMA预测"""
from statsmodels.tsa.arima.model import ARIMA
try:
# 拟合ARIMA模型
model = ARIMA(ts_data, order=order)
fitted_model = model.fit()
# 预测
forecast = fitted_model.forecast(steps=forecast_days)
# 创建预测数据框
last_date = ts_data.index[-1]
forecast_dates = pd.date_range(
start=last_date + pd.Timedelta(days=1),
periods=forecast_days,
freq=ts_data.index.freq
)
forecast_df = pd.DataFrame({
'date': forecast_dates,
'forecast': forecast.values,
'model': f'ARIMA{order}'
})
self.forecast_results['arima'] = {
'forecast': forecast_df,
'fitted_values': fitted_model.fittedvalues,
'order': order,
'aic': fitted_model.aic,
'bic': fitted_model.bic
}
return forecast_df
except Exception as e:
print(f"ARIMA模型拟合失败: {e}")
return None
def prophet_forecast(self, ts_data, forecast_days=30):
"""Prophet预测(需要安装fbprophet)"""
try:
from prophet import Prophet
# 准备Prophet格式数据
prophet_data = ts_data.reset_index()
prophet_data.columns = ['ds', 'y']
# 创建并拟合模型
model = Prophet(
yearly_seasonality=True,
weekly_seasonality=True,
daily_seasonality=False
)
model.fit(prophet_data)
# 创建未来日期
future = model.make_future_dataframe(periods=forecast_days, freq='D')
# 预测
forecast = model.predict(future)
# 提取预测结果
forecast_df = forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail(forecast_days)
forecast_df.columns = ['date', 'forecast', 'lower_bound', 'upper_bound']
forecast_df['model'] = 'Prophet'
self.forecast_results['prophet'] = {
'forecast': forecast_df,
'model_object': model,
'components': forecast[['ds', 'trend', 'yearly', 'weekly']]
}
return forecast_df
except ImportError:
print("Prophet未安装,跳过此方法")
print("安装命令: pip install prophet")
return None
except Exception as e:
print(f"Prophet模型拟合失败: {e}")
return None
def evaluate_forecast(self, actual, forecast, model_name):
"""评估预测效果"""
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
# 确保长度一致
min_len = min(len(actual), len(forecast))
actual = actual[:min_len]
forecast = forecast[:min_len]
metrics = {
'MAE': mean_absolute_error(actual, forecast),
'RMSE': np.sqrt(mean_squared_error(actual, forecast)),
'MAPE': np.mean(np.abs((actual - forecast) / actual)) * 100,
'R2': r2_score(actual, forecast)
}
print(f"\n{model_name} 预测效果评估:")
print(f" MAE(平均绝对误差): {metrics['MAE']:.2f}")
print(f" RMSE(均方根误差): {metrics['RMSE']:.2f}")
print(f" MAPE(平均绝对百分比误差): {metrics['MAPE']:.2f}%")
print(f" R²(决定系数): {metrics['R2']:.4f}")
return metrics
def visualize_forecasts(self, ts_data, forecast_days=30):
"""可视化预测结果"""
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 原始时间序列
ax1 = axes[0, 0]
ts_data.plot(ax=ax1, color='blue', linewidth=2, label='实际销售额')
ax1.set_title('原始销售时间序列', fontsize=12, fontweight='bold')
ax1.set_xlabel('日期')
ax1.set_ylabel('销售额')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 移动平均预测
if 'moving_average' in self.forecast_results:
ax2 = axes[0, 1]
ts_data.plot(ax=ax2, color='blue', alpha=0.5, label='实际值')
self.forecast_results['moving_average']['ma_values'].plot(
ax=ax2, color='red', linewidth=2, label='移动平均'
)
forecast_df = self.forecast_results['moving_average']['forecast']
ax2.plot(forecast_df['date'], forecast_df['forecast'],
color='green', linewidth=2, linestyle='--', label='预测')
ax2.set_title('移动平均预测', fontsize=12, fontweight='bold')
ax2.legend()
ax2.grid(True, alpha=0.3)
# 指数平滑预测
if 'exponential_smoothing' in self.forecast_results:
ax3 = axes[1, 0]
ts_data.plot(ax=ax3, color='blue', alpha=0.5, label='实际值')
self.forecast_results['exponential_smoothing']['fitted_values'].plot(
ax=ax3, color='orange', linewidth=2, label='拟合值'
)
forecast_df = self.forecast_results['exponential_smoothing']['forecast']
ax3.plot(forecast_df['date'], forecast_df['forecast'],
color='green', linewidth=2, linestyle='--', label='预测')
ax3.set_title('指数平滑预测', fontsize=12, fontweight='bold')
ax3.legend()
ax3.grid(True, alpha=0.3)
# 多模型对比
ax4 = axes[1, 1]
ts_data.tail(60).plot(ax=ax4, color='black', linewidth=2, label='实际值')
colors = ['red', 'green', 'blue', 'purple']
for i, (model_name, result) in enumerate(self.forecast_results.items()):
if 'forecast' in result:
forecast_df = result['forecast']
ax4.plot(forecast_df['date'], forecast_df['forecast'],
color=colors[i % len(colors)], linewidth=2,
linestyle='--', label=f'{model_name}预测')
ax4.set_title('多模型预测对比', fontsize=12, fontweight='bold')
ax4.legend()
ax4.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
class InventoryOptimizer:
def __init__(self, sales_data, lead_time=7, service_level=0.95):
"""
库存优化器
参数:
sales_data: 销售数据
lead_time: 补货提前期(天)
service_level: 服务水平(库存满足率)
"""
self.sales_data = sales_data.copy()
self.lead_time = lead_time
self.service_level = service_level
self.z_score = self._calculate_z_score(service_level)
def _calculate_z_score(self, service_level):
"""计算服务水平对应的Z值"""
from scipy import stats
return stats.norm.ppf(service_level)
def calculate_demand_stats(self, product_id=None, freq='D'):
"""计算需求统计"""
if product_id:
product_data = self.sales_data[self.sales_data['product_id'] == product_id]
else:
product_data = self.sales_data
# 按日期聚合
if 'date' in product_data.columns:
date_col = 'date'
elif 'order_date' in product_data.columns:
date_col = 'order_date'
product_data = product_data.rename(columns={'order_date': 'date'})
# 计算每日需求
daily_demand = product_data.groupby('date')['quantity'].sum().resample(freq).sum()
stats = {
'mean_demand': daily_demand.mean(),
'std_demand': daily_demand.std(),
'cv_demand': daily_demand.std() / daily_demand.mean() if daily_demand.mean() > 0 else 0,
'min_demand': daily_demand.min(),
'max_demand': daily_demand.max(),
'median_demand': daily_demand.median()
}
return stats, daily_demand
def calculate_eoq(self, annual_demand, ordering_cost, holding_cost_per_unit):
"""计算经济订货批量(EOQ)"""
eoq = np.sqrt((2 * annual_demand * ordering_cost) / holding_cost_per_unit)
return eoq
def calculate_safety_stock(self, demand_mean, demand_std, lead_time):
"""计算安全库存"""
safety_stock = self.z_score * demand_std * np.sqrt(lead_time)
return safety_stock
def calculate_reorder_point(self, demand_mean, demand_std, lead_time):
"""计算再订货点"""
safety_stock = self.calculate_safety_stock(demand_mean, demand_std, lead_time)
reorder_point = (demand_mean * lead_time) + safety_stock
return reorder_point
def optimize_inventory(self, product_id, ordering_cost=50, holding_cost_rate=0.2, unit_cost=100):
"""优化单个产品的库存"""
# 计算需求统计
stats, daily_demand = self.calculate_demand_stats(product_id)
# 年化需求
annual_demand = stats['mean_demand'] * 365
# 计算EOQ
holding_cost_per_unit = unit_cost * holding_cost_rate
eoq = self.calculate_eoq(annual_demand, ordering_cost, holding_cost_per_unit)
# 计算安全库存和再订货点
safety_stock = self.calculate_safety_stock(
stats['mean_demand'],
stats['std_demand'],
self.lead_time
)
reorder_point = self.calculate_reorder_point(
stats['mean_demand'],
stats['std_demand'],
self.lead_time
)
# 库存策略
strategy = {
'product_id': product_id,
'annual_demand': annual_demand,
'mean_daily_demand': stats['mean_demand'],
'std_daily_demand': stats['std_demand'],
'demand_cv': stats['cv_demand'],
'eoq': eoq,
'safety_stock': safety_stock,
'reorder_point': reorder_point,
'max_inventory': eoq + safety_stock,
'ordering_cost': ordering_cost,
'holding_cost_per_unit': holding_cost_per_unit,
'lead_time': self.lead_time,
'service_level': self.service_level
}
return strategy
def visualize_inventory_analysis(self, product_id):
"""可视化库存分析"""
stats, daily_demand = self.calculate_demand_stats(product_id)
strategy = self.optimize_inventory(product_id)
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 1. 需求分布
ax1 = axes[0, 0]
daily_demand.plot(kind='hist', bins=20, ax=ax1, alpha=0.7, color='blue')
ax1.axvline(stats['mean_demand'], color='red', linestyle='--', linewidth=2, label='平均需求')
ax1.axvline(stats['mean_demand'] + stats['std_demand'], color='orange', linestyle=':', linewidth=2, label='±1标准差')
ax1.axvline(stats['mean_demand'] - stats['std_demand'], color='orange', linestyle=':', linewidth=2)
ax1.set_title(f'{product_id} 需求分布', fontsize=12, fontweight='bold')
ax1.set_xlabel('日需求量')
ax1.set_ylabel('频次')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 2. 时间序列需求
ax2 = axes[0, 1]
daily_demand.plot(ax=ax2, color='green', linewidth=1)
ax2.axhline(y=stats['mean_demand'], color='red', linestyle='--', linewidth=2, label='平均需求')
ax2.fill_between(daily_demand.index,
stats['mean_demand'] - stats['std_demand'],
stats['mean_demand'] + stats['std_demand'],
alpha=0.2, color='orange', label='标准差范围')
ax2.set_title(f'{product_id} 需求时间序列', fontsize=12, fontweight='bold')
ax2.set_xlabel('日期')
ax2.set_ylabel('需求量')
ax2.legend()
ax2.grid(True, alpha=0.3)
# 3. 库存策略示意图
ax3 = axes[1, 0]
# 模拟库存变化
days = 100
inventory_level = []
current_inventory = strategy['max_inventory']
demand_series = np.random.normal(stats['mean_demand'], stats['std_demand'], days)
for demand in demand_series:
current_inventory = max(0, current_inventory - demand)
if current_inventory <= strategy['reorder_point']:
current_inventory += strategy['eoq']
inventory_level.append(current_inventory)
ax3.plot(range(days), inventory_level, color='blue', linewidth=2, label='库存水平')
ax3.axhline(y=strategy['reorder_point'], color='red', linestyle='--',
linewidth=2, label='再订货点')
ax3.axhline(y=strategy['safety_stock'], color='orange', linestyle=':',
linewidth=2, label='安全库存')
ax3.set_title('库存策略模拟', fontsize=12, fontweight='bold')
ax3.set_xlabel('天数')
ax3.set_ylabel('库存量')
ax3.legend()
ax3.grid(True, alpha=0.3)
# 4. 成本分析
ax4 = axes[1, 1]
order_quantities = np.linspace(strategy['eoq'] * 0.5, strategy['eoq'] * 2, 50)
ordering_costs = (annual_demand / order_quantities) * strategy['ordering_cost']
holding_costs = (order_quantities / 2) * strategy['holding_cost_per_unit']
total_costs = ordering_costs + holding_costs
ax4.plot(order_quantities, ordering_costs, label='订货成本', color='blue')
ax4.plot(order_quantities, holding_costs, label='持有成本', color='green')
ax4.plot(order_quantities, total_costs, label='总成本', color='red', linewidth=2)
ax4.axvline(x=strategy['eoq'], color='black', linestyle='--',
linewidth=2, label='EOQ')
ax4.set_title('成本分析', fontsize=12, fontweight='bold')
ax4.set_xlabel('订货批量')
ax4.set_ylabel('成本')
ax4.legend()
ax4.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return strategy
# 案例数据脚本
def create_forecast_sample_data():
"""生成预测分析示例数据"""
np.random.seed(42)
# 生成2年的日销售数据
dates = pd.date_range('2022-01-01', '2023-12-31', freq='D')
n_days = len(dates)
# 基础趋势 + 季节性 + 随机噪声
trend = np.linspace(1000, 5000, n_days)
yearly_seasonality = 1000 * np.sin(2 * np.pi * np.arange(n_days) / 365)
weekly_seasonality = 500 * np.sin(2 * np.pi * np.arange(n_days) / 7)
noise = np.random.normal(0, 200, n_days)
sales = trend + yearly_seasonality + weekly_seasonality + noise
sales = np.maximum(sales, 0) # 确保非负
# 创建DataFrame
sales_data = pd.DataFrame({
'date': dates,
'sales': sales.round(2),
'quantity': (sales / 100).astype(int) + np.random.randint(1, 10, n_days)
})
# 添加产品信息
products = ['P001', 'P002', 'P003', 'P004', 'P005']
product_sales = []
for date, total_sales, total_qty in zip(dates, sales, sales_data['quantity']):
for product in products:
# 分配销售到不同产品
product_share = np.random.dirichlet(np.ones(len(products)))
product_sales.append({
'date': date,
'product_id': product,
'sales': total_sales * product_share[products.index(product)],
'quantity': max(1, int(total_qty * product_share[products.index(product)]))
})
product_sales_df = pd.DataFrame(product_sales)
# 保存数据
sales_data.to_csv('daily_sales_forecast.csv', index=False)
product_sales_df.to_csv('product_sales_detail.csv', index=False)
print("✅ 预测示例数据已生成:")
print(f" 日销售数据: {sales_data.shape}")
print(f" 产品明细数据: {product_sales_df.shape}")
return sales_data, product_sales_df
# 自动报表生成
class AutoReportGenerator:
def __init__(self, forecast_results, inventory_strategies):
self.forecast_results = forecast_results
self.inventory_strategies = inventory_strategies
def generate_html_report(self, filename='sales_forecast_report.html'):
"""生成HTML格式报告"""
html_content = """
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>电商销售预测与库存优化报告</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; }
.header { text-align: center; padding: 20px; background: #f0f0f0; }
.section { margin: 30px 0; padding: 20px; border: 1px solid #ddd; }
.metric { display: inline-block; margin: 10px 20px; padding: 10px; background: #e8f4f8; }
table { width: 100%; border-collapse: collapse; margin: 20px 0; }
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
th { background-color: #f2f2f2; }
.forecast { background: #f9f9f9; padding: 15px; }
.inventory { background: #f0f8ff; padding: 15px; }
</style>
</head>
<body>
<div class="header">
<h1>📊 电商销售预测与库存优化报告</h1>
<p>生成时间: """ + datetime.now().strftime('%Y-%m-%d %H:%M:%S') + """</p>
</div>
"""
# 添加预测结果
html_content += """
<div class="section">
<h2>📈 销售预测结果</h2>
<div class="forecast">
"""
for model_name, result in self.forecast_results.items():
if 'forecast' in result:
forecast_df = result['forecast']
html_content += f"""
<h3>{model_name} 预测</h3>
<p>未来30天预测销售额: ¥{forecast_df['forecast'].sum():,.2f}</p>
<p>日均预测: ¥{forecast_df['forecast'].mean():,.2f}</p>
"""
html_content += """
</div>
</div>
"""
# 添加库存策略
html_content += """
<div class="section">
<h2>📦 库存优化策略</h2>
<div class="inventory">
<table>
<tr>
<th>产品ID</th>
<th>年需求量</th>
<th>EOQ</th>
<th>安全库存</th>
<th>再订货点</th>
<th>最大库存</th>
</tr>
"""
for strategy in self.inventory_strategies:
html_content += f"""
<tr>
<td>{strategy['product_id']}</td>
<td>{strategy['annual_demand']:,.0f}</td>
<td>{strategy['eoq']:,.1f}</td>
<td>{strategy['safety_stock']:,.1f}</td>
<td>{strategy['reorder_point']:,.1f}</td>
<td>{strategy['max_inventory']:,.1f}</td>
</tr>
"""
html_content += """
</table>
</div>
</div>
<div class="section">
<h2>💡 建议与行动计划</h2>
<ol>
<li><strong>采购建议:</strong>根据EOQ和安全库存制定采购计划</li>
<li><strong>库存监控:</strong>设置再订货点预警机制</li>
<li><strong>销售预测:</strong>结合季节性调整销售目标</li>
<li><strong>成本优化:</strong>定期评估订货成本和持有成本</li>
</ol>
</div>
</body>
</html>
"""
with open(filename, 'w', encoding='utf-8') as f:
f.write(html_content)
print(f"✅ HTML报告已生成: {filename}")
return filename
def generate_excel_report(self, filename='sales_forecast_report.xlsx'):
"""生成Excel格式报告"""
with pd.ExcelWriter(filename, engine='openpyxl') as writer:
# 预测结果
for model_name, result in self.forecast_results.items():
if 'forecast' in result:
result['forecast'].to_excel(writer, sheet_name=f'{model_name}_预测', index=False)
# 库存策略
strategies_df = pd.DataFrame(self.inventory_strategies)
strategies_df.to_excel(writer, sheet_name='库存策略', index=False)
# 汇总表
summary_data = {
'指标': ['总预测销售额', '平均EOQ', '平均安全库存', '平均再订货点'],
'数值': [
sum(r['forecast']['forecast'].sum() for r in self.forecast_results.values() if 'forecast' in r),
strategies_df['eoq'].mean(),
strategies_df['safety_stock'].mean(),
strategies_df['reorder_point'].mean()
],
'单位': ['元', '件', '件', '件']
}
summary_df = pd.DataFrame(summary_data)
summary_df.to_excel(writer, sheet_name='汇总', index=False)
print(f"✅ Excel报告已生成: {filename}")
return filename
# 运行示例
if __name__ == "__main__":
print("="*60)
print("Day 6: 销售预测与库存优化")
print("="*60)
# 1. 生成示例数据
print("\n步骤1: 生成预测分析示例数据...")
sales_data, product_sales = create_forecast_sample_data()
# 2. 销售预测
print("\n步骤2: 销售预测分析...")
forecaster = SalesForecaster(sales_data)
# 准备时间序列
ts_data = forecaster.prepare_time_series(freq='D')
print(f"时间序列数据形状: {ts_data.shape}")
# 多种预测方法
print("\n使用多种方法进行预测:")
# 移动平均
ma_forecast = forecaster.moving_average_forecast(ts_data, window=30, forecast_days=30)
print(f" ✓ 移动平均预测完成")
# 指数平滑
es_forecast = forecaster.exponential_smoothing_forecast(ts_data, alpha=0.3, forecast_days=30)
print(f" ✓ 指数平滑预测完成")
# ARIMA
arima_forecast = forecaster.arima_forecast(ts_data, order=(1,1,1), forecast_days=30)
if arima_forecast is not None:
print(f" ✓ ARIMA预测完成")
# Prophet(可选)
prophet_forecast = forecaster.prophet_forecast(ts_data, forecast_days=30)
if prophet_forecast is not None:
print(f" ✓ Prophet预测完成")
# 3. 可视化预测结果
print("\n步骤3: 生成预测可视化...")
forecaster.visualize_forecasts(ts_data, forecast_days=30)
# 4. 库存优化
print("\n步骤4: 库存优化分析...")
optimizer = InventoryOptimizer(product_sales, lead_time=7, service_level=0.95)
# 分析前5个产品
products_to_analyze = ['P001', 'P002', 'P003', 'P004', 'P005']
inventory_strategies = []
for product_id in products_to_analyze:
print(f"\n分析产品: {product_id}")
strategy = optimizer.optimize_inventory(
product_id,
ordering_cost=50,
holding_cost_rate=0.2,
unit_cost=100
)
inventory_strategies.append(strategy)
print(f" 年需求量: {strategy['annual_demand']:,.0f}")
print(f" EOQ: {strategy['eoq']:,.1f}")
print(f" 安全库存: {strategy['safety_stock']:,.1f}")
print(f" 再订货点: {strategy['reorder_point']:,.1f}")
# 5. 可视化库存分析
print("\n步骤5: 生成库存分析可视化...")
optimizer.visualize_inventory_analysis('P001')
# 6. 生成自动报表
print("\n步骤6: 生成自动报表...")
report_generator = AutoReportGenerator(
forecaster.forecast_results,
inventory_strategies
)
# 生成HTML报告
html_report = report_generator.generate_html_report()
# 生成Excel报告
excel_report = report_generator.generate_excel_report()
print("\n" + "="*60)
print("✅ 分析完成!")
print("="*60)
print("生成的文件:")
print(f" 1. HTML报告: {html_report}")
print(f" 2. Excel报告: {excel_report}")
print(f" 3. 原始数据: daily_sales_forecast.csv")
print(f" 4. 产品明细: product_sales_detail.csv")
print("\n建议下一步:")
print(" 1. 根据预测结果调整销售目标")
print(" 2. 按照库存策略优化采购计划")
print(" 3. 设置库存预警监控")
安装要求
bash# 基础依赖
pip install pandas==1.5.3 numpy==1.24.3 matplotlib==3.7.1 seaborn==0.12.2
# 时间序列分析
pip install statsmodels==0.14.0 scikit-learn==1.3.0
# Prophet预测(可选)
pip install prophet==1.1.4
# Excel报告生成
pip install openpyxl==3.1.2 xlsxwriter==3.1.2
# 科学计算
pip install scipy==1.11.4
课程总结
4-6天技能掌握清单
- Day 4:数据采集与清洗
- ✅ 多源数据自动采集(Excel、CSV、MySQL)
- ✅ 自动化数据清洗(缺失值、异常值、重复值)
- ✅ 数据类型标准化与转换
- Day 5:电商核心指标与RFM分析
- ✅ 电商核心KPI计算(GMV、转化率、客单价等)
- ✅ RFM用户价值分层模型
- ✅ 销售漏斗分析与可视化
- Day 6:销售预测与库存优化
- ✅ 时间序列预测(移动平均、指数平滑、ARIMA)
- ✅ 库存优化模型(EOQ、安全库存、再订货点)
- ✅ 自动化报表生成(HTML、Excel)
运行说明
- 环境准备: bash
# 创建虚拟环境 python -m venv ecommerce_env source ecommerce_env/bin/activate # Linux/Mac # 或 ecommerce_env\Scripts\activate # Windows # 安装依赖 pip install -r requirements.txt - 运行顺序:
- 按Day 4 → Day 5 → Day 6顺序运行
- 每个脚本都是独立的,包含示例数据生成
- 所有输出文件会自动保存到当前目录
- 自定义数据:
- 替换示例数据文件为自己的电商数据
- 调整参数以适应实际业务场景
- 扩展模型以处理更复杂的需求
业务应用价值
- 数据驱动决策:基于数据分析制定销售策略
- 库存优化:降低库存成本,提高周转率
- 用户精细化运营:RFM分群实现精准营销
- 自动化报表:减少人工工作量,提高效率
这套课程体系从数据采集到预测优化,形成了完整的电商数据分析闭环,学员通过4-6天的学习可以掌握电商全链路数据分析的核心技能。