8.1 自动化数据抽取与清洗

【理论讲解】

数据分析的一个痛点是重复性的数据抽取和清洗工作。通过自动化,我们可以大大提高效率,减少人工错误,确保数据的新鲜度和准确性。

核心技术:

  • 定时任务: 利用操作系统或Python库,在特定时间自动运行脚本。
  • API接口: 从电商平台、广告平台等开放接口自动获取数据。
  • Web Scraping (网络爬虫): 从网页上抓取数据(需注意合法性和道德性)。

【自动生成数据集与代码实例】

我们将模拟一个每天更新的销售数据文件,并演示如何定时读取和清洗。

python

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import os
import time # 用于模拟定时任务的延迟

# --- 数据集生成 (模拟每日更新的销售数据) ---
np.random.seed(42)

def generate_daily_sales_file(date, filename_prefix='daily_sales_', num_records=50):
    data = []
    product_ids = [f'P{i:03d}' for i in range(10)]
    user_ids = [f'U{i:04d}' for i in range(20)]
    
    for _ in range(num_records):
        order_id = f'ORD{date.strftime("%Y%m%d")}{_ + 1:03d}'
        user_id = np.random.choice(user_ids)
        product_id = np.random.choice(product_ids)
        price = round(np.random.uniform(20, 500), 2)
        quantity = np.random.randint(1, 5)
        # 模拟一些缺失值
        if np.random.rand() < 0.05: # 5%的概率缺失价格
            price = np.nan
        if np.random.rand() < 0.02: # 2%的概率缺失数量
            quantity = np.nan
        
        data.append([order_id, user_id, product_id, price, quantity, date.strftime('%Y-%m-%d %H:%M:%S')])
    
    df_daily = pd.DataFrame(data, columns=['order_id', 'user_id', 'product_id', 'price', 'quantity', 'order_time'])
    
    # 写入CSV文件
    filename = f"{filename_prefix}{date.strftime('%Y%m%d')}.csv"
    df_daily.to_csv(filename, index=False)
    print(f"生成并保存文件: {filename}")
    return filename

# 模拟生成过去3天的每日销售数据
start_date_gen = datetime.now() - timedelta(days=3)
generated_files = []
for i in range(4):
    generated_files.append(generate_daily_sales_file(start_date_gen + timedelta(days=i)))

# --- 自动化数据抽取与清洗脚本 ---
print("\n--- 自动化数据抽取与清洗脚本 ---")

def automated_data_pipeline(data_folder='./', last_processed_date=None):
    """
    自动从指定文件夹读取新的销售数据文件,进行清洗和合并。
    """
    all_data_frames = []
    
    # 获取所有销售文件
    sales_files = [f for f in os.listdir(data_folder) if f.startswith('daily_sales_') and f.endswith('.csv')]
    sales_files.sort() # 按文件名排序,确保按日期顺序处理

    for file_name in sales_files:
        file_date_str = file_name.replace('daily_sales_', '').replace('.csv', '')
        file_date = datetime.strptime(file_date_str, '%Y%m%d').date()

        # 只处理比上次处理日期新的文件
        if last_processed_date and file_date <= last_processed_date:
            print(f"跳过已处理文件: {file_name}")
            continue
            
        print(f"正在处理文件: {file_name}")
        file_path = os.path.join(data_folder, file_name)
        df_daily = pd.read_csv(file_path)
        
        # --- 数据清洗 ---
        # 1. 数据类型转换
        df_daily['order_time'] = pd.to_datetime(df_daily['order_time'])
        
        # 2. 缺失值处理 (这里简单填充,实际根据业务需求)
        df_daily['price'].fillna(df_daily['price'].mean(), inplace=True)
        df_daily['quantity'].fillna(df_daily['quantity'].median(), inplace=True)
        
        # 3. 异常值处理 (这里简单过滤掉价格或数量为负的,实际可能用Z-score等)
        df_daily = df_daily[df_daily['price'] >= 0]
        df_daily = df_daily[df_daily['quantity'] >= 0]
        
        # 4. 衍生新特征
        df_daily['total_amount'] = df_daily['price'] * df_daily['quantity']
        
        all_data_frames.append(df_daily)
        
    if all_data_frames:
        # 合并所有处理后的数据
        df_combined = pd.concat(all_data_frames, ignore_index=True)
        print(f"\n成功合并 {len(all_data_frames)} 个文件,总数据量: {len(df_combined)} 条")
        # 更新上次处理日期
        new_last_processed_date = max([datetime.strptime(f.replace('daily_sales_', '').replace('.csv', ''), '%Y%m%d').date() for f in sales_files if datetime.strptime(f.replace('daily_sales_', '').replace('.csv', ''), '%Y%m%d').date() > (last_processed_date if last_processed_date else datetime.min.date())])
        return df_combined, new_last_processed_date
    else:
        print("\n没有新的文件需要处理。")
        return pd.DataFrame(), last_processed_date

# 第一次运行:处理所有历史数据
current_processed_date = None
combined_df, current_processed_date = automated_data_pipeline(last_processed_date=current_processed_date)
print("\n第一次运行后的合并数据预览:\n", combined_df.head())
print(f"当前已处理到日期: {current_processed_date}")

# 模拟第二天有新数据生成,然后再次运行
print("\n--- 模拟第二天有新数据,再次运行 ---")
new_day_date = start_date_gen + timedelta(days=4)
generate_daily_sales_file(new_day_date)

# 再次运行自动化脚本
time.sleep(1) # 模拟延迟
combined_df_updated, current_processed_date = automated_data_pipeline(last_processed_date=current_processed_date)
print("\n第二次运行后的合并数据预览:\n", combined_df_updated.head())
print(f"当前已处理到日期: {current_processed_date}")

# 【运营策略建议】
print("\n--- 自动化数据抽取与清洗的运营价值 ---")
print("1. **数据实时性:** 确保运营分析基于最新数据,快速响应市场变化。")
print("2. **减少错误:** 自动化流程减少人工干预,降低数据处理错误率。")
print("3. **提高效率:** 释放数据分析师的时间,让他们专注于更高价值的分析和洞察。")

【互动问答】

  • 为什么需要定时任务来自动化数据抽取?
  • API接口和Web Scraping在数据获取方面各有什么优缺点?
  • 在自动化清洗脚本中,如何处理不同文件可能存在的列名不一致问题?
  • 如何确保自动化脚本在出现错误时能够及时通知我们?(提示:日志记录、邮件通知)
  • 除了读取CSV,你还能想到哪些常用的数据源需要自动化抽取?