课程目标

掌握从多源(Excel、数据库、API)自动化采集电商数据,并进行标准化清洗。

核心技能

  • 多源数据自动采集
  • 缺失值、异常值、重复值自动化处理
  • 数据标准化与类型转换

课件内容

1. 数据采集模块

# 安装依赖:pip install pandas numpy sqlalchemy pymysql openpyxl xlrd
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
import os

class DataCollector:
    """Load e-commerce data from multiple sources (Excel, CSV, MySQL).

    Every ``from_*`` loader returns a DataFrame on success and ``None`` on
    failure (after printing a diagnostic), so callers can chain loads
    without wrapping each call in try/except.
    """

    def __init__(self):
        # Reserved for caching named sources; the loaders do not populate it yet.
        self.data_sources = {}

    def from_excel(self, file_path, sheet_name=0):
        """Read one sheet of an Excel workbook; return None on failure."""
        try:
            df = pd.read_excel(file_path, sheet_name=sheet_name)
            print(f"✅ Excel数据加载成功: {file_path}, 形状: {df.shape}")
            return df
        except Exception as e:
            print(f"❌ Excel加载失败: {e}")
            return None

    def from_csv(self, file_path, encoding='utf-8'):
        """Read a CSV file; return None on failure."""
        try:
            df = pd.read_csv(file_path, encoding=encoding)
            print(f"✅ CSV数据加载成功: {file_path}, 形状: {df.shape}")
            return df
        except Exception as e:
            print(f"❌ CSV加载失败: {e}")
            return None

    def from_mysql(self, host, user, password, database, query):
        """Run ``query`` against a MySQL database; return None on failure.

        The SQLAlchemy engine is always disposed so pooled connections are
        released even when the query fails (the original leaked the engine).
        """
        engine = None
        try:
            engine = create_engine(
                f'mysql+pymysql://{user}:{password}@{host}/{database}')
            df = pd.read_sql(query, engine)
            print(f"✅ MySQL数据加载成功, 形状: {df.shape}")
            return df
        except Exception as e:
            print(f"❌ MySQL连接失败: {e}")
            return None
        finally:
            if engine is not None:
                engine.dispose()  # release the connection pool

    def merge_multiple_sources(self, data_dict):
        """Outer-merge the given DataFrames on their shared columns.

        ``None`` entries (failed loads) are skipped instead of crashing
        ``pd.merge``. Returns an empty DataFrame when nothing usable was
        supplied. The first frame is copied so the caller's object is not
        aliased by the result.
        """
        merged_data = pd.DataFrame()
        for name, df in data_dict.items():
            if df is None:
                # Loader failed earlier and returned None; skip this source.
                continue
            if merged_data.empty:
                merged_data = df.copy()
            else:
                merged_data = pd.merge(merged_data, df, how='outer')
        return merged_data

# 案例数据脚本
def create_sample_data():
    """Generate three sample e-commerce datasets and write them to CSV.

    Seeds NumPy's global RNG so repeated runs produce identical data,
    writes ``sample_user_data.csv`` / ``sample_order_data.csv`` /
    ``sample_product_data.csv`` into the working directory, and returns
    the three frames as ``(users, orders, products)``.
    """
    np.random.seed(42)  # deterministic output across runs

    # Users: 20 accounts with level, lifetime spend and city.
    # NOTE: the np.random calls below stay in this exact order so the
    # seeded values match previous runs.
    user_ids = range(1001, 1021)
    users = pd.DataFrame({
        'user_id': user_ids,
        'user_name': ['user_%d' % uid for uid in user_ids],
        'registration_date': pd.date_range('2024-01-01', periods=20),
        'user_level': np.random.choice(['青铜', '白银', '黄金', '铂金', '钻石'], 20),
        'total_spent': np.random.uniform(100, 10000, 20).round(2),
        'city': np.random.choice(['北京', '上海', '广州', '深圳', '杭州'], 20)
    })

    # Orders: 20 rows referencing the users and five product SKUs.
    orders = pd.DataFrame({
        'order_id': range(5001, 5021),
        'user_id': np.random.choice(range(1001, 1021), 20),
        'order_date': pd.date_range('2024-03-01', periods=20),
        'product_id': np.random.choice(['P001', 'P002', 'P003', 'P004', 'P005'], 20),
        'quantity': np.random.randint(1, 10, 20),
        'unit_price': np.random.uniform(50, 500, 20).round(2),
        'payment_method': np.random.choice(['支付宝', '微信', '银行卡', '信用卡'], 20),
        'order_status': np.random.choice(['已完成', '已发货', '待付款', '已取消'], 20)
    })

    # Products: fixed catalog of five SKUs (no randomness).
    products = pd.DataFrame({
        'product_id': ['P001', 'P002', 'P003', 'P004', 'P005'],
        'product_name': ['智能手机', '笔记本电脑', '无线耳机', '智能手表', '平板电脑'],
        'category': ['电子产品', '电子产品', '配件', '电子产品', '电子产品'],
        'brand': ['品牌A', '品牌B', '品牌C', '品牌D', '品牌E'],
        'cost_price': [1999, 5999, 299, 1299, 2999],
        'market_price': [2499, 6999, 399, 1599, 3499]
    })

    # Persist every frame next to the script.
    outputs = {
        'sample_user_data.csv': users,
        'sample_order_data.csv': orders,
        'sample_product_data.csv': products,
    }
    for path, frame in outputs.items():
        frame.to_csv(path, index=False, encoding='utf-8')

    print("✅ 示例数据已生成:")
    print(f"  用户数据: {users.shape}")
    print(f"  订单数据: {orders.shape}")
    print(f"  商品数据: {products.shape}")

    return users, orders, products

# 运行示例
# Walkthrough: generate the sample files, then load them back through the
# DataCollector to demonstrate multi-source acquisition.
if __name__ == "__main__":
    banner = "=" * 50

    # 1. Generate the sample e-commerce data files.
    print(banner)
    print("步骤1: 生成示例电商数据")
    print(banner)
    user_df, order_df, product_df = create_sample_data()

    # 2. Load the files back from CSV to demonstrate collection.
    print("\n" + banner)
    print("步骤2: 多源数据采集演示")
    print(banner)

    collector = DataCollector()
    user_data = collector.from_csv('sample_user_data.csv')
    order_data = collector.from_csv('sample_order_data.csv')
    product_data = collector.from_csv('sample_product_data.csv')

    # Preview the first rows of each dataset.
    for label, frame in (('用户数据', user_data),
                         ('订单数据', order_data),
                         ('商品数据', product_data)):
        print(f"\n{label}预览:")
        print(frame.head())

2. 数据清洗模块

class DataCleaner:
    """Automated data-cleaning toolkit: missing values, outliers, duplicates, dtypes.

    Detection methods record their findings in ``self.cleaning_report`` so a
    consolidated summary can be printed later via ``generate_cleaning_report``.
    All ``handle_*`` methods return a cleaned copy and never mutate the input.
    """

    def __init__(self):
        # Findings keyed by check name: 'missing_values', 'outliers', 'duplicates'.
        self.cleaning_report = {}

    @staticmethod
    def _iqr_bounds(series):
        """Return Tukey fences ``(lower, upper)`` = Q1/Q3 -/+ 1.5*IQR."""
        q1 = series.quantile(0.25)
        q3 = series.quantile(0.75)
        iqr = q3 - q1
        return q1 - 1.5 * iqr, q3 + 1.5 * iqr

    def detect_missing_values(self, df):
        """Record and return per-column missing counts and percentages.

        Only columns with at least one missing value appear in the result.
        """
        missing_info = df.isnull().sum()
        missing_percent = (missing_info / len(df) * 100).round(2)

        missing_df = pd.DataFrame({
            '缺失数量': missing_info,
            '缺失比例%': missing_percent
        })

        self.cleaning_report['missing_values'] = missing_df[missing_df['缺失数量'] > 0]
        return self.cleaning_report['missing_values']

    def handle_missing_values(self, df, strategy='mean', columns=None):
        """Fill or drop missing values; returns a cleaned copy.

        strategy: 'mean' | 'median' (numeric columns only) | 'mode' |
        'drop' | 'ffill' | 'bfill'; any other value fills with the literal
        '未知'. When ``columns`` is None every column is considered.
        """
        df_clean = df.copy()

        if columns is None:
            columns = df.columns

        for col in columns:
            if df[col].isnull().sum() == 0:
                continue
            if strategy == 'mean' and pd.api.types.is_numeric_dtype(df[col]):
                df_clean[col] = df[col].fillna(df[col].mean())
            elif strategy == 'median' and pd.api.types.is_numeric_dtype(df[col]):
                df_clean[col] = df[col].fillna(df[col].median())
            elif strategy == 'mode':
                mode = df[col].mode()
                # mode() is empty when the column is entirely NaN; nothing to fill.
                # (The original line was a syntax error: `mode()[0](@ref`.)
                if not mode.empty:
                    df_clean[col] = df[col].fillna(mode[0])
            elif strategy == 'drop':
                df_clean = df_clean.dropna(subset=[col])
            elif strategy == 'ffill':
                # .ffill()/.bfill() replace the deprecated fillna(method=...).
                df_clean[col] = df[col].ffill()
            elif strategy == 'bfill':
                df_clean[col] = df[col].bfill()
            else:
                df_clean[col] = df[col].fillna('未知')

        print(f"✅ 缺失值处理完成 - 策略: {strategy}")
        return df_clean

    def detect_outliers(self, df, column, method='iqr'):
        """Detect outliers in a numeric column via the IQR (Tukey fence) rule.

        Returns the offending rows and appends a summary dict to
        ``cleaning_report['outliers']``. Non-numeric columns and unknown
        methods return None.
        """
        if not pd.api.types.is_numeric_dtype(df[column]):
            print(f"⚠️  {column} 不是数值型,跳过异常值检测")
            return None

        if method == 'iqr':
            lower_bound, upper_bound = self._iqr_bounds(df[column])
            outliers = df[(df[column] < lower_bound) | (df[column] > upper_bound)]

            # Builtin round(): len()/len() is a plain Python float, which has
            # no .round() method (the original raised AttributeError here).
            ratio = round(len(outliers) / len(df) * 100, 2) if len(df) else 0.0

            outlier_info = {
                '列名': column,
                '异常值数量': len(outliers),
                '异常值比例%': ratio,
                '下限': lower_bound,
                '上限': upper_bound,
                '最小值': df[column].min(),
                '最大值': df[column].max()
            }

            self.cleaning_report.setdefault('outliers', []).append(outlier_info)
            return outliers

        return None

    def handle_outliers(self, df, column, method='cap'):
        """Handle outliers by 'cap' (clip to IQR fences) or 'remove' (drop rows).

        Non-numeric columns are returned unchanged; unknown methods return
        an untouched copy.
        """
        if not pd.api.types.is_numeric_dtype(df[column]):
            return df

        df_clean = df.copy()
        lower_bound, upper_bound = self._iqr_bounds(df[column])

        if method == 'cap':
            df_clean[column] = df_clean[column].clip(lower_bound, upper_bound)
            print(f"✅ 异常值处理完成 - {column}: 封顶法")
        elif method == 'remove':
            df_clean = df_clean[(df_clean[column] >= lower_bound) &
                                (df_clean[column] <= upper_bound)]
            print(f"✅ 异常值处理完成 - {column}: 删除法")

        return df_clean

    def detect_duplicates(self, df, subset=None):
        """Record and return duplicated rows (first occurrence not counted).

        ``duplicated(subset=None)`` already means "all columns", so a single
        call covers both cases.
        """
        duplicates = df[df.duplicated(subset=subset)]

        # Builtin round() — same float-vs-Series fix as detect_outliers.
        ratio = round(len(duplicates) / len(df) * 100, 2) if len(df) else 0.0
        self.cleaning_report['duplicates'] = {
            '重复行数': len(duplicates),
            '重复比例%': ratio
        }

        return duplicates

    def remove_duplicates(self, df, subset=None, keep='first'):
        """Drop duplicated rows and report how many were removed."""
        df_clean = df.drop_duplicates(subset=subset, keep=keep)
        removed = len(df) - len(df_clean)
        print(f"✅ 删除 {removed} 条重复记录")
        return df_clean

    def standardize_data_types(self, df, column_type_map):
        """Coerce columns to canonical dtypes; returns a converted copy.

        ``column_type_map`` maps column name -> 'datetime' | 'numeric' |
        'category' | 'string'. Unknown columns are ignored; failed
        conversions are reported and skipped ('numeric' coerces bad values
        to NaN instead of raising).
        """
        df_clean = df.copy()

        for col, dtype in column_type_map.items():
            if col not in df_clean.columns:
                continue
            try:
                if dtype == 'datetime':
                    df_clean[col] = pd.to_datetime(df_clean[col])
                elif dtype == 'numeric':
                    df_clean[col] = pd.to_numeric(df_clean[col], errors='coerce')
                elif dtype == 'category':
                    df_clean[col] = df_clean[col].astype('category')
                elif dtype == 'string':
                    df_clean[col] = df_clean[col].astype(str)
                print(f"  {col} -> {dtype}")
            except Exception as e:
                print(f"⚠️  {col} 类型转换失败: {e}")

        return df_clean

    def generate_cleaning_report(self):
        """Print a consolidated summary of every recorded cleaning step."""
        print("\n" + "="*50)
        print("数据清洗报告")
        print("="*50)

        if 'missing_values' in self.cleaning_report:
            print("\n1. 缺失值检测:")
            if not self.cleaning_report['missing_values'].empty:
                print(self.cleaning_report['missing_values'])
            else:
                print("  无缺失值")

        if 'outliers' in self.cleaning_report:
            print("\n2. 异常值检测:")
            for outlier_info in self.cleaning_report['outliers']:
                print(f"  {outlier_info['列名']}: {outlier_info['异常值数量']} 个异常值 "
                      f"({outlier_info['异常值比例%']}%)")

        if 'duplicates' in self.cleaning_report:
            print("\n3. 重复值检测:")
            dup_info = self.cleaning_report['duplicates']
            print(f"  重复行数: {dup_info['重复行数']} "
                  f"({dup_info['重复比例%']}%)")

# 运行数据清洗示例
def run_data_cleaning_demo():
    """End-to-end cleaning walkthrough on a deliberately dirty dataset.

    Builds a 20-row frame containing duplicated user_ids, missing
    age/income values and one implausible age (200), runs every
    DataCleaner step in order, prints the consolidated report, and
    returns the cleaned DataFrame.
    """
    banner = "=" * 50
    print("\n" + banner)
    print("步骤3: 数据清洗演示")
    print(banner)

    # Deliberately dirty test data (duplicates, NaNs, one extreme age).
    np.random.seed(42)  # kept for parity with the original script
    dirty = pd.DataFrame({
        'user_id': list(range(1001, 1011)) * 2,
        'age': [25, 30, None, 35, 40, 200, 28, None, 32, 38,
                25, 30, 22, 35, 40, 45, 28, 29, 32, 38],
        'income': [50000, 60000, 70000, None, 90000,
                   100000, 55000, 65000, 75000, 85000] * 2,
        'city': ['北京', '上海', '广州', '深圳', '杭州'] * 4,
        'purchase_date': pd.date_range('2024-01-01', periods=20)
    })

    print("原始数据:")
    print(dirty.head())
    print(f"形状: {dirty.shape}")

    dc = DataCleaner()

    # 1) Detect missing values.
    print("\n1. 缺失值检测:")
    missing = dc.detect_missing_values(dirty)
    if not missing.empty:
        print(missing)

    # 2) Fill numeric gaps with the column mean.
    cleaned = dc.handle_missing_values(dirty, strategy='mean')

    # 3) Flag implausible ages via the IQR rule.
    print("\n2. 异常值检测:")
    age_outliers = dc.detect_outliers(cleaned, 'age')
    if age_outliers is not None and len(age_outliers) > 0:
        print(f"年龄异常值: {len(age_outliers)} 个")
        print(age_outliers[['user_id', 'age']])

    # 4) Cap outliers at the IQR fences.
    cleaned = dc.handle_outliers(cleaned, 'age', method='cap')

    # 5) Report duplicated user_ids, then 6) drop them.
    print("\n3. 重复值检测:")
    dup_rows = dc.detect_duplicates(cleaned, subset=['user_id'])
    print(f"重复记录数: {len(dup_rows)}")
    cleaned = dc.remove_duplicates(cleaned, subset=['user_id'])

    # 7) Coerce columns to their canonical dtypes.
    cleaned = dc.standardize_data_types(cleaned, {
        'user_id': 'numeric',
        'age': 'numeric',
        'income': 'numeric',
        'city': 'category',
        'purchase_date': 'datetime'
    })

    # 8) Print the consolidated cleaning report.
    dc.generate_cleaning_report()

    print("\n清洗后数据:")
    print(cleaned.head())
    print(f"形状: {cleaned.shape}")

    return cleaned

# Script entry point: run the full cleaning demonstration.
if __name__ == "__main__":
    run_data_cleaning_demo()

安装要求

# 基础依赖
pip install pandas==1.5.3 numpy==1.24.3 sqlalchemy==1.4.46 pymysql==1.0.2
pip install openpyxl==3.1.2 xlrd==2.0.1

# 可选:数据可视化
pip install matplotlib==3.7.1 seaborn==0.12.2