【运营数据分析-进阶篇】 数据报告与自动化
8.1 自动化数据抽取与清洗
【理论讲解】
数据分析的一个痛点是重复性的数据抽取和清洗工作。通过自动化,我们可以大大提高效率,减少人工错误,确保数据的新鲜度和准确性。
核心技术:
- 定时任务: 利用操作系统或Python库,在特定时间自动运行脚本。
- API接口: 从电商平台、广告平台等开放接口自动获取数据。
- Web Scraping (网络爬虫): 从网页上抓取数据(需注意合法性和道德性)。
【自动生成数据集与代码实例】
我们将模拟一个每天更新的销售数据文件,并演示如何定时读取和清洗。
python
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import os
import time # 用于模拟定时任务的延迟
# --- 数据集生成 (模拟每日更新的销售数据) ---
np.random.seed(42)
def generate_daily_sales_file(date, filename_prefix='daily_sales_', num_records=50):
data = []
product_ids = [f'P{i:03d}' for i in range(10)]
user_ids = [f'U{i:04d}' for i in range(20)]
for _ in range(num_records):
order_id = f'ORD{date.strftime("%Y%m%d")}{_ + 1:03d}'
user_id = np.random.choice(user_ids)
product_id = np.random.choice(product_ids)
price = round(np.random.uniform(20, 500), 2)
quantity = np.random.randint(1, 5)
# 模拟一些缺失值
if np.random.rand() < 0.05: # 5%的概率缺失价格
price = np.nan
if np.random.rand() < 0.02: # 2%的概率缺失数量
quantity = np.nan
data.append([order_id, user_id, product_id, price, quantity, date.strftime('%Y-%m-%d %H:%M:%S')])
df_daily = pd.DataFrame(data, columns=['order_id', 'user_id', 'product_id', 'price', 'quantity', 'order_time'])
# 写入CSV文件
filename = f"{filename_prefix}{date.strftime('%Y%m%d')}.csv"
df_daily.to_csv(filename, index=False)
print(f"生成并保存文件: {filename}")
return filename
# 模拟生成过去3天的每日销售数据
start_date_gen = datetime.now() - timedelta(days=3)
generated_files = []
for i in range(4):
generated_files.append(generate_daily_sales_file(start_date_gen + timedelta(days=i)))
# --- 自动化数据抽取与清洗脚本 ---
print("\n--- 自动化数据抽取与清洗脚本 ---")
def automated_data_pipeline(data_folder='./', last_processed_date=None):
"""
自动从指定文件夹读取新的销售数据文件,进行清洗和合并。
"""
all_data_frames = []
# 获取所有销售文件
sales_files = [f for f in os.listdir(data_folder) if f.startswith('daily_sales_') and f.endswith('.csv')]
sales_files.sort() # 按文件名排序,确保按日期顺序处理
for file_name in sales_files:
file_date_str = file_name.replace('daily_sales_', '').replace('.csv', '')
file_date = datetime.strptime(file_date_str, '%Y%m%d').date()
# 只处理比上次处理日期新的文件
if last_processed_date and file_date <= last_processed_date:
print(f"跳过已处理文件: {file_name}")
continue
print(f"正在处理文件: {file_name}")
file_path = os.path.join(data_folder, file_name)
df_daily = pd.read_csv(file_path)
# --- 数据清洗 ---
# 1. 数据类型转换
df_daily['order_time'] = pd.to_datetime(df_daily['order_time'])
# 2. 缺失值处理 (这里简单填充,实际根据业务需求)
df_daily['price'].fillna(df_daily['price'].mean(), inplace=True)
df_daily['quantity'].fillna(df_daily['quantity'].median(), inplace=True)
# 3. 异常值处理 (这里简单过滤掉价格或数量为负的,实际可能用Z-score等)
df_daily = df_daily[df_daily['price'] >= 0]
df_daily = df_daily[df_daily['quantity'] >= 0]
# 4. 衍生新特征
df_daily['total_amount'] = df_daily['price'] * df_daily['quantity']
all_data_frames.append(df_daily)
if all_data_frames:
# 合并所有处理后的数据
df_combined = pd.concat(all_data_frames, ignore_index=True)
print(f"\n成功合并 {len(all_data_frames)} 个文件,总数据量: {len(df_combined)} 条")
# 更新上次处理日期
new_last_processed_date = max([datetime.strptime(f.replace('daily_sales_', '').replace('.csv', ''), '%Y%m%d').date() for f in sales_files if datetime.strptime(f.replace('daily_sales_', '').replace('.csv', ''), '%Y%m%d').date() > (last_processed_date if last_processed_date else datetime.min.date())])
return df_combined, new_last_processed_date
else:
print("\n没有新的文件需要处理。")
return pd.DataFrame(), last_processed_date
# 第一次运行:处理所有历史数据
current_processed_date = None
combined_df, current_processed_date = automated_data_pipeline(last_processed_date=current_processed_date)
print("\n第一次运行后的合并数据预览:\n", combined_df.head())
print(f"当前已处理到日期: {current_processed_date}")
# 模拟第二天有新数据生成,然后再次运行
print("\n--- 模拟第二天有新数据,再次运行 ---")
new_day_date = start_date_gen + timedelta(days=4)
generate_daily_sales_file(new_day_date)
# 再次运行自动化脚本
time.sleep(1) # 模拟延迟
combined_df_updated, current_processed_date = automated_data_pipeline(last_processed_date=current_processed_date)
print("\n第二次运行后的合并数据预览:\n", combined_df_updated.head())
print(f"当前已处理到日期: {current_processed_date}")
# 【运营策略建议】
print("\n--- 自动化数据抽取与清洗的运营价值 ---")
print("1. **数据实时性:** 确保运营分析基于最新数据,快速响应市场变化。")
print("2. **减少错误:** 自动化流程减少人工干预,降低数据处理错误率。")
print("3. **提高效率:** 释放数据分析师的时间,让他们专注于更高价值的分析和洞察。")
【互动问答】
- 为什么需要定时任务来自动化数据抽取?
- API接口和Web Scraping在数据获取方面各有什么优缺点?
- 在自动化清洗脚本中,如何处理不同文件可能存在的列名不一致问题?
- 如何确保自动化脚本在出现错误时能够及时通知我们?(提示:日志记录、邮件通知)
- 除了读取CSV,你还能想到哪些常用的数据源需要自动化抽取?