数据技能(SKILLS):Day 4:数据采集与清洗自动化
课程目标
掌握从多源(Excel、数据库、API)自动化采集电商数据,并进行标准化清洗。
核心技能
- 多源数据自动采集
- 缺失值、异常值、重复值自动化处理
- 数据标准化与类型转换
课件内容
1. 数据采集模块
# 安装依赖:pip install pandas numpy sqlalchemy pymysql openpyxl xlrd
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
import os
class DataCollector:
    """Collect e-commerce data from multiple sources: Excel, CSV, and MySQL.

    Each loader returns a ``pandas.DataFrame`` on success and ``None`` on
    failure (the error is printed, never raised to the caller).
    """

    def __init__(self):
        # Registry for named data sources; not used by the loaders themselves.
        self.data_sources = {}

    def from_excel(self, file_path, sheet_name=0):
        """Load one sheet of an Excel workbook into a DataFrame."""
        try:
            frame = pd.read_excel(file_path, sheet_name=sheet_name)
            print(f"✅ Excel数据加载成功: {file_path}, 形状: {frame.shape}")
            return frame
        except Exception as exc:
            print(f"❌ Excel加载失败: {exc}")
            return None

    def from_csv(self, file_path, encoding='utf-8'):
        """Load a CSV file into a DataFrame."""
        try:
            frame = pd.read_csv(file_path, encoding=encoding)
            print(f"✅ CSV数据加载成功: {file_path}, 形状: {frame.shape}")
            return frame
        except Exception as exc:
            print(f"❌ CSV加载失败: {exc}")
            return None

    def from_mysql(self, host, user, password, database, query):
        """Run *query* against a MySQL database and return the result set."""
        try:
            engine = create_engine(f'mysql+pymysql://{user}:{password}@{host}/{database}')
            frame = pd.read_sql(query, engine)
            print(f"✅ MySQL数据加载成功, 形状: {frame.shape}")
            return frame
        except Exception as exc:
            print(f"❌ MySQL连接失败: {exc}")
            return None

    def merge_multiple_sources(self, data_dict):
        """Outer-merge every DataFrame in *data_dict* into a single frame."""
        combined = pd.DataFrame()
        for frame in data_dict.values():
            # First frame seeds the result; subsequent frames are merged in.
            combined = frame if combined.empty else pd.merge(combined, frame, how='outer')
        return combined
# 案例数据脚本
def create_sample_data():
    """Generate sample e-commerce datasets and persist them as UTF-8 CSVs.

    Creates three frames — users, orders, products — writes each to the
    working directory, and returns them as a tuple. The RNG is seeded so
    every run produces the same data.
    """
    np.random.seed(42)  # fixed seed: deterministic sample data

    # Users: 20 accounts with level, lifetime spend and city.
    users = pd.DataFrame({
        'user_id': range(1001, 1021),
        'user_name': [f'user_{i}' for i in range(1001, 1021)],
        'registration_date': pd.date_range('2024-01-01', periods=20),
        'user_level': np.random.choice(['青铜', '白银', '黄金', '铂金', '钻石'], 20),
        'total_spent': np.random.uniform(100, 10000, 20).round(2),
        'city': np.random.choice(['北京', '上海', '广州', '深圳', '杭州'], 20)
    })

    # Orders: 20 orders referencing the user and product ids above.
    orders = pd.DataFrame({
        'order_id': range(5001, 5021),
        'user_id': np.random.choice(range(1001, 1021), 20),
        'order_date': pd.date_range('2024-03-01', periods=20),
        'product_id': np.random.choice(['P001', 'P002', 'P003', 'P004', 'P005'], 20),
        'quantity': np.random.randint(1, 10, 20),
        'unit_price': np.random.uniform(50, 500, 20).round(2),
        'payment_method': np.random.choice(['支付宝', '微信', '银行卡', '信用卡'], 20),
        'order_status': np.random.choice(['已完成', '已发货', '待付款', '已取消'], 20)
    })

    # Products: a small fixed catalogue with cost and market prices.
    products = pd.DataFrame({
        'product_id': ['P001', 'P002', 'P003', 'P004', 'P005'],
        'product_name': ['智能手机', '笔记本电脑', '无线耳机', '智能手表', '平板电脑'],
        'category': ['电子产品', '电子产品', '配件', '电子产品', '电子产品'],
        'brand': ['品牌A', '品牌B', '品牌C', '品牌D', '品牌E'],
        'cost_price': [1999, 5999, 299, 1299, 2999],
        'market_price': [2499, 6999, 399, 1599, 3499]
    })

    # Persist all three datasets next to the script.
    users.to_csv('sample_user_data.csv', index=False, encoding='utf-8')
    orders.to_csv('sample_order_data.csv', index=False, encoding='utf-8')
    products.to_csv('sample_product_data.csv', index=False, encoding='utf-8')

    print("✅ 示例数据已生成:")
    print(f" 用户数据: {users.shape}")
    print(f" 订单数据: {orders.shape}")
    print(f" 商品数据: {products.shape}")
    return users, orders, products
# 运行示例
if __name__ == "__main__":
    # Step 1: write the sample datasets to disk.
    print("="*50)
    print("步骤1: 生成示例电商数据")
    print("="*50)
    user_df, order_df, product_df = create_sample_data()

    # Step 2: read them back through the collector to demo multi-source intake.
    print("\n" + "="*50)
    print("步骤2: 多源数据采集演示")
    print("="*50)
    collector = DataCollector()
    user_data = collector.from_csv('sample_user_data.csv')
    order_data = collector.from_csv('sample_order_data.csv')
    product_data = collector.from_csv('sample_product_data.csv')

    # Preview the first rows of each collected dataset.
    for label, frame in (('用户', user_data), ('订单', order_data), ('商品', product_data)):
        print(f"\n{label}数据预览:")
        print(frame.head())
2. 数据清洗模块
class DataCleaner:
    """Automated cleaning toolkit: missing values, outliers, duplicates, dtypes.

    Detection methods record their findings in ``self.cleaning_report`` so
    that ``generate_cleaning_report`` can print a consolidated summary.
    All ``handle_*``/``remove_*`` methods return a cleaned copy; the input
    DataFrame is never mutated.
    """

    def __init__(self):
        # Accumulated findings, keyed by check name.
        self.cleaning_report = {}

    def detect_missing_values(self, df):
        """Return a DataFrame listing columns with missing values (count, %)."""
        missing_info = df.isnull().sum()
        missing_percent = (missing_info / len(df) * 100).round(2)
        missing_df = pd.DataFrame({
            '缺失数量': missing_info,
            '缺失比例%': missing_percent
        })
        self.cleaning_report['missing_values'] = missing_df[missing_df['缺失数量'] > 0]
        return self.cleaning_report['missing_values']

    def handle_missing_values(self, df, strategy='mean', columns=None):
        """Fill or drop missing values in *columns* (default: all columns).

        strategy: 'mean' | 'median' | 'mode' | 'drop' | 'ffill' | 'bfill';
        any other value fills with the literal string '未知'. 'mean' and
        'median' only apply to numeric columns (non-numeric columns fall
        through to the '未知' fallback, as in the original design).
        """
        df_clean = df.copy()
        if columns is None:
            columns = df.columns
        for col in columns:
            if df[col].isnull().sum() > 0:
                if strategy == 'mean' and pd.api.types.is_numeric_dtype(df[col]):
                    df_clean[col] = df[col].fillna(df[col].mean())
                elif strategy == 'median' and pd.api.types.is_numeric_dtype(df[col]):
                    df_clean[col] = df[col].fillna(df[col].median())
                elif strategy == 'mode':
                    # BUG FIX: original line was syntactically broken
                    # ("...mode()[0](@ref" paste artifact). Also guard against
                    # an empty mode() result (all-NaN column) to avoid IndexError.
                    mode_values = df[col].mode()
                    if not mode_values.empty:
                        df_clean[col] = df[col].fillna(mode_values[0])
                elif strategy == 'drop':
                    df_clean = df_clean.dropna(subset=[col])
                elif strategy == 'ffill':
                    # .ffill()/.bfill() replace the deprecated fillna(method=...).
                    df_clean[col] = df[col].ffill()
                elif strategy == 'bfill':
                    df_clean[col] = df[col].bfill()
                else:
                    df_clean[col] = df[col].fillna('未知')
        print(f"✅ 缺失值处理完成 - 策略: {strategy}")
        return df_clean

    def detect_outliers(self, df, column, method='iqr'):
        """Detect outliers in a numeric column (only the IQR rule is implemented).

        Returns the outlier rows (possibly empty), or None for non-numeric
        columns and unknown methods. Appends a summary dict to the report.
        """
        if not pd.api.types.is_numeric_dtype(df[column]):
            print(f"⚠️ {column} 不是数值型,跳过异常值检测")
            return None
        if method == 'iqr':
            q1 = df[column].quantile(0.25)
            q3 = df[column].quantile(0.75)
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr
            outliers = df[(df[column] < lower_bound) | (df[column] > upper_bound)]
            outlier_info = {
                '列名': column,
                '异常值数量': len(outliers),
                # BUG FIX: a plain float has no .round() method — the original
                # `(...).round(2)` raised AttributeError; use builtin round().
                '异常值比例%': round(len(outliers) / len(df) * 100, 2),
                '下限': lower_bound,
                '上限': upper_bound,
                '最小值': df[column].min(),
                '最大值': df[column].max()
            }
            self.cleaning_report.setdefault('outliers', []).append(outlier_info)
            return outliers
        return None

    def handle_outliers(self, df, column, method='cap'):
        """Cap ('cap') or drop ('remove') IQR outliers in a numeric column."""
        if not pd.api.types.is_numeric_dtype(df[column]):
            return df
        df_clean = df.copy()
        # Both strategies share the same IQR fences; compute them once.
        q1 = df[column].quantile(0.25)
        q3 = df[column].quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        if method == 'cap':
            df_clean[column] = df_clean[column].clip(lower_bound, upper_bound)
            print(f"✅ 异常值处理完成 - {column}: 封顶法")
        elif method == 'remove':
            df_clean = df_clean[(df_clean[column] >= lower_bound) &
                                (df_clean[column] <= upper_bound)]
            print(f"✅ 异常值处理完成 - {column}: 删除法")
        return df_clean

    def detect_duplicates(self, df, subset=None):
        """Return duplicated rows (by *subset* columns, or all columns)."""
        duplicates = df[df.duplicated(subset=subset)]
        self.cleaning_report['duplicates'] = {
            '重复行数': len(duplicates),
            # BUG FIX: builtin round() instead of float.round() (AttributeError).
            '重复比例%': round(len(duplicates) / len(df) * 100, 2)
        }
        return duplicates

    def remove_duplicates(self, df, subset=None, keep='first'):
        """Drop duplicated rows and report how many were removed."""
        df_clean = df.drop_duplicates(subset=subset, keep=keep)
        removed = len(df) - len(df_clean)
        print(f"✅ 删除 {removed} 条重复记录")
        return df_clean

    def standardize_data_types(self, df, column_type_map):
        """Coerce columns to the requested logical types.

        column_type_map maps column name -> 'datetime' | 'numeric' |
        'category' | 'string'. Unknown columns/types are skipped; individual
        conversion failures are reported but do not abort the rest.
        """
        df_clean = df.copy()
        for col, dtype in column_type_map.items():
            if col in df_clean.columns:
                try:
                    if dtype == 'datetime':
                        df_clean[col] = pd.to_datetime(df_clean[col])
                    elif dtype == 'numeric':
                        df_clean[col] = pd.to_numeric(df_clean[col], errors='coerce')
                    elif dtype == 'category':
                        df_clean[col] = df_clean[col].astype('category')
                    elif dtype == 'string':
                        df_clean[col] = df_clean[col].astype(str)
                    print(f" {col} -> {dtype}")
                except Exception as e:
                    print(f"⚠️ {col} 类型转换失败: {e}")
        return df_clean

    def generate_cleaning_report(self):
        """Print a consolidated summary of everything detected so far."""
        print("\n" + "="*50)
        print("数据清洗报告")
        print("="*50)
        if 'missing_values' in self.cleaning_report:
            print("\n1. 缺失值检测:")
            if not self.cleaning_report['missing_values'].empty:
                print(self.cleaning_report['missing_values'])
            else:
                print(" 无缺失值")
        if 'outliers' in self.cleaning_report:
            print("\n2. 异常值检测:")
            for outlier_info in self.cleaning_report['outliers']:
                print(f" {outlier_info['列名']}: {outlier_info['异常值数量']} 个异常值 "
                      f"({outlier_info['异常值比例%']}%)")
        if 'duplicates' in self.cleaning_report:
            print("\n3. 重复值检测:")
            dup_info = self.cleaning_report['duplicates']
            print(f" 重复行数: {dup_info['重复行数']} "
                  f"({dup_info['重复比例%']}%)")
# 运行数据清洗示例
def run_data_cleaning_demo():
    """Walk the full cleaning pipeline over a deliberately dirty dataset."""
    print("\n" + "="*50)
    print("步骤3: 数据清洗演示")
    print("="*50)

    # Construct test data seeded with duplicates, NaNs and one outlier (200).
    np.random.seed(42)
    dirty = pd.DataFrame({
        'user_id': list(range(1001, 1011)) * 2,  # duplicated ids on purpose
        'age': [25, 30, None, 35, 40, 200, 28, None, 32, 38,
                25, 30, 22, 35, 40, 45, 28, 29, 32, 38],  # NaNs + outlier
        'income': [50000, 60000, 70000, None, 90000,
                   100000, 55000, 65000, 75000, 85000] * 2,
        'city': ['北京', '上海', '广州', '深圳', '杭州'] * 4,
        'purchase_date': pd.date_range('2024-01-01', periods=20)
    })
    print("原始数据:")
    print(dirty.head())
    print(f"形状: {dirty.shape}")

    cleaner = DataCleaner()

    # 1) Report missing values.
    print("\n1. 缺失值检测:")
    missing_df = cleaner.detect_missing_values(dirty)
    if not missing_df.empty:
        print(missing_df)

    # 2) Impute missing numerics with the column mean.
    cleaned = cleaner.handle_missing_values(dirty, strategy='mean')

    # 3) Flag IQR outliers in 'age'.
    print("\n2. 异常值检测:")
    outliers = cleaner.detect_outliers(cleaned, 'age')
    if outliers is not None and len(outliers) > 0:
        print(f"年龄异常值: {len(outliers)} 个")
        print(outliers[['user_id', 'age']])

    # 4) Cap outliers at the IQR fences.
    cleaned = cleaner.handle_outliers(cleaned, 'age', method='cap')

    # 5) Report, then 6) drop, duplicate user_ids.
    print("\n3. 重复值检测:")
    duplicates = cleaner.detect_duplicates(cleaned, subset=['user_id'])
    print(f"重复记录数: {len(duplicates)}")
    cleaned = cleaner.remove_duplicates(cleaned, subset=['user_id'])

    # 7) Normalize column dtypes.
    cleaned = cleaner.standardize_data_types(cleaned, {
        'user_id': 'numeric',
        'age': 'numeric',
        'income': 'numeric',
        'city': 'category',
        'purchase_date': 'datetime'
    })

    # 8) Print the consolidated cleaning report.
    cleaner.generate_cleaning_report()
    print("\n清洗后数据:")
    print(cleaned.head())
    print(f"形状: {cleaned.shape}")
    return cleaned
if __name__ == "__main__":
    # Entry point: execute the complete cleaning walkthrough end-to-end.
    run_data_cleaning_demo()
安装要求
# 基础依赖
pip install pandas==1.5.3 numpy==1.24.3 sqlalchemy==1.4.46 pymysql==1.0.2
pip install openpyxl==3.1.2 xlrd==2.0.1
# 可选:数据可视化
pip install matplotlib==3.7.1 seaborn==0.12.2