日常数据分析工作中,经常遇到不同项目都需要重复编写类似的清洗代码:加载CSV、删除缺失值、转换日期格式、过滤异常值。这种重复劳动不仅低效,而且后期维护困难。本文介绍如何通过面向对象编程(OOP)构建通用的 DataCleaner 类,并设计 Pipeline(管道)模式将数据处理步骤串联起来,配合日志记录和异常处理,打造一套可复用的数据分析模块。
- import pandas as pd
- import numpy as np
- import logging
- from typing import List, Dict, Optional, Union
# Configure root logging once for the whole walkthrough: INFO and above,
# with a time-of-day stamp and the level name in front of each message.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)
复制代码
一、面向对象设计:将数据与操作封装到类中
使用类可以把数据(DataFrame)和清洗、转换、统计等操作打包在一起,避免全局变量混乱。下面的 SimpleAnalyzer 展示了核心结构:
class SimpleAnalyzer:
    """Minimal analyzer that keeps a DataFrame together with its summary report."""

    def __init__(self, name="默认分析器"):
        # Nothing is loaded yet; `report` is filled in by summary().
        self.name = name
        self.data = None
        self.report = None
        print(f"分析器 '{self.name}' 已创建")

    def load_data(self, filepath):
        """Read the CSV at *filepath* into ``self.data`` and return the frame."""
        frame = pd.read_csv(filepath)
        self.data = frame
        print(f"已加载 {len(frame)} 行数据")
        return frame

    def summary(self):
        """Return row, column, missing-value and duplicate-row counts.

        The resulting dict is also stored in ``self.report``.  When no data
        has been loaded yet, a hint string is returned instead of a dict.
        """
        frame = self.data
        if frame is None:
            return "请先加载数据"
        row_count, column_count = frame.shape
        self.report = {
            "行数": row_count,
            "列数": column_count,
            "缺失值": int(frame.isnull().sum().sum()),
            "重复行": int(frame.duplicated().sum()),
        }
        return self.report

    def __str__(self):
        # Show the row count once data is loaded, a placeholder otherwise.
        if self.data is None:
            size = "未加载"
        else:
            size = len(self.data)
        return f"分析器: {self.name}, 数据: {size}行"
复制代码
二、构建通用数据清洗器 DataCleaner
DataCleaner 类封装了加载、去重、填充缺失、类型转换、过滤、重命名、保存等常见操作,这些清洗方法均返回 self 以支持链式调用(get_summary 返回汇总 DataFrame,print_history 仅打印操作历史,二者不参与链式调用)。内部维护 history 列表记录每一步操作,并通过 logging 输出日志。
class DataCleaner:
    """Reusable, chainable cleaner around a single pandas DataFrame.

    Every cleaning method returns ``self`` so calls can be chained, appends an
    entry to ``history`` and writes an INFO line through a logger named after
    the cleaner.  ``fill_missing`` and ``convert_types`` work on a copy of the
    current frame, so a DataFrame handed to the constructor is never modified
    in place and column assignment never happens on a filtered slice.
    """

    def __init__(self, data: Optional[pd.DataFrame] = None, name: str = "Cleaner"):
        self.name = name
        self.data = data
        # One dict per performed step: action, details, row count afterwards.
        self.history: List[Dict] = []
        self._log = logging.getLogger(self.name)

    def _require_data(self) -> pd.DataFrame:
        """Return the current frame or raise ValueError if nothing is loaded."""
        if self.data is None:
            raise ValueError("尚未加载数据,请先调用 load_csv 或 load_dict")
        return self.data

    @staticmethod
    def _fill_with_mode(series: pd.Series) -> pd.Series:
        """Fill gaps with the most frequent value; leave all-missing series as is."""
        modes = series.mode()
        if modes.empty:
            return series
        return series.fillna(modes.iloc[0])

    def log_action(self, action: str, details: str = ""):
        """Append a history entry (with the current row count) and log it."""
        rows = len(self.data) if self.data is not None else 0
        self.history.append({"action": action, "details": details, "rows": rows})
        self._log.info(f"[{action}] {details}")

    def load_csv(self, filepath: str, **kwargs) -> "DataCleaner":
        """Load *filepath* with ``pd.read_csv``; extra kwargs are passed through.

        Raises:
            FileNotFoundError: the file does not exist (logged, then re-raised).
            Exception: any other read error (logged, then re-raised).
        """
        try:
            self.data = pd.read_csv(filepath, **kwargs)
        except FileNotFoundError:
            self._log.error(f"文件不存在: {filepath}")
            raise
        except Exception as e:
            self._log.error(f"加载失败: {e}")
            raise
        self.log_action("加载", f"从 {filepath} 加载了 {len(self.data)} 行数据")
        return self

    def load_dict(self, data_dict: Dict) -> "DataCleaner":
        """Build the working frame from a column -> values mapping."""
        self.data = pd.DataFrame(data_dict)
        self.log_action("创建", f"从字典创建了 {len(self.data)} 行数据")
        return self

    def remove_duplicates(self, subset: Optional[List[str]] = None) -> "DataCleaner":
        """Drop duplicate rows, optionally comparing only the *subset* columns."""
        frame = self._require_data()
        before = len(frame)
        self.data = frame.drop_duplicates(subset=subset)
        removed = before - len(self.data)
        self.log_action("去重", f"删除了 {removed} 行重复数据")
        return self

    def fill_missing(self, strategy: str = "mean", columns: Optional[List[str]] = None) -> "DataCleaner":
        """Fill missing values column by column.

        Args:
            strategy: one of "mean", "median", "mode", "zero" or "forward".
                An unknown strategy leaves the data untouched and is reported
                with a warning.
            columns: columns to fill; defaults to every numeric column.
                Names that are not present in the frame are skipped.

        The logged count is the number of values that were actually filled
        (missing before minus missing after), not the number that were missing.
        """
        frame = self._require_data().copy()
        targets = columns or frame.select_dtypes(include=[np.number]).columns.tolist()
        fillers = {
            "mean": lambda s: s.fillna(s.mean()),
            "median": lambda s: s.fillna(s.median()),
            "mode": self._fill_with_mode,
            "zero": lambda s: s.fillna(0),
            "forward": lambda s: s.ffill(),
        }
        fill = fillers.get(strategy)
        filled_count = 0
        if fill is None:
            self._log.warning(f"未知的填充策略: {strategy}")
        else:
            for col in targets:
                if col not in frame.columns:
                    continue
                missing_before = int(frame[col].isnull().sum())
                frame[col] = fill(frame[col])
                filled_count += missing_before - int(frame[col].isnull().sum())
            self.data = frame
        self.log_action("填充缺失值", f"策略={strategy}, 填充了 {filled_count} 个缺失值")
        return self

    def convert_types(self, type_map: Dict[str, str]) -> "DataCleaner":
        """Convert columns according to a ``{column: target_type}`` mapping.

        Supported targets: "datetime", "int" (nullable Int64), "float", "str"
        and "category".  Missing columns and unsupported targets are skipped
        with a warning.  A failed conversion is logged as an error and the
        remaining columns are still processed (best effort, nothing is raised).
        Only successful conversions are recorded in ``history``.
        """
        converters = {
            "datetime": pd.to_datetime,
            "int": lambda s: pd.to_numeric(s).astype("Int64"),
            "float": lambda s: pd.to_numeric(s).astype(float),
            "str": lambda s: s.astype(str),
            "category": lambda s: s.astype("category"),
        }
        # Work on a copy so the assignments below never touch a caller-owned
        # frame or a filtered slice of one.
        frame = self._require_data().copy()
        self.data = frame
        for col, target_type in type_map.items():
            if col not in frame.columns:
                self._log.warning(f"列不存在: {col}")
                continue
            convert = converters.get(target_type)
            if convert is None:
                self._log.warning(f"不支持的目标类型 {target_type}: {col}")
                continue
            try:
                frame[col] = convert(frame[col])
            except Exception as e:
                # Deliberately broad: one bad column must not abort the rest.
                self._log.error(f"类型转换失败 {col}: {e}")
                continue
            self.log_action("类型转换", f"{col} -> {target_type}")
        return self

    def filter_rows(self, condition) -> "DataCleaner":
        """Keep only the rows selected by *condition*.

        *condition* is handed to ``DataFrame.__getitem__``, so it may be a
        boolean mask or a callable that receives the frame and returns one.
        """
        frame = self._require_data()
        before = len(frame)
        self.data = frame[condition]
        removed = before - len(self.data)
        self.log_action("过滤", f"删除了 {removed} 行,剩余 {len(self.data)} 行")
        return self

    def rename_columns(self, name_map: Dict[str, str]) -> "DataCleaner":
        """Rename columns according to an ``{old: new}`` mapping."""
        self.data = self._require_data().rename(columns=name_map)
        self.log_action("重命名", f"重命名了 {len(name_map)} 列")
        return self

    def get_summary(self) -> pd.DataFrame:
        """Return a per-column overview; an empty DataFrame if nothing is loaded.

        Each column gets its dtype, non-null count, missing count, missing
        ratio and number of unique values.  Numeric columns additionally get
        the ``describe()`` statistics joined on.
        """
        if self.data is None:
            return pd.DataFrame()
        missing = self.data.isnull().sum()
        summary = pd.DataFrame({
            "类型": self.data.dtypes,
            "非空数": self.data.count(),
            "缺失数": missing,
            "缺失率": missing / len(self.data),
            "唯一值数": self.data.nunique(),
        })
        num_cols = self.data.select_dtypes(include=[np.number]).columns
        if len(num_cols) > 0:
            stats = self.data[num_cols].describe().T
            stats = stats.rename(columns={
                "mean": "均值", "std": "标准差", "min": "最小值",
                "25%": "25分位", "50%": "中位数", "75%": "75分位", "max": "最大值",
            })
            summary = summary.join(stats)
        return summary

    def save_csv(self, filepath: str) -> "DataCleaner":
        """Write the current frame to *filepath* without the index column."""
        self._require_data().to_csv(filepath, index=False)
        self.log_action("保存", f"已保存至 {filepath}")
        return self

    def print_history(self):
        """Print every recorded step, numbered from 1, with its row count."""
        print("\n=== 操作历史 ===")
        for i, entry in enumerate(self.history, 1):
            print(f" {i}. [{entry['action']}] {entry['details']} (当前: {entry['rows']}行)")
复制代码
三、链式调用演示
# Demo input: five sales rows with one missing product and one missing amount.
sample_data = {
    "日期": ["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04", "2024-01-03"],
    "产品": ["A", "B", None, "A", "A"],
    "销售额": [100, 200, 150, None, 100],
    "数量": [10, 20, 15, 8, 10],
    "单价": [10.0, 10.0, 10.0, 12.5, 10.0],
}

cleaner = DataCleaner(name="销售数据清洗")

# Each call returns the cleaner itself, so the steps could equally be chained;
# they are written one per statement here to make the order explicit.
cleaner.load_dict(sample_data)
cleaner.remove_duplicates()
cleaner.fill_missing(strategy="mean", columns=["销售额"])
cleaner.fill_missing(strategy="mode", columns=["产品"])
cleaner.convert_types({"日期": "datetime"})
cleaner.filter_rows(lambda df: df["销售额"] > 0)

print("\n=== 清洗后的数据 ===")
print(cleaner.data)
print("\n=== 数据摘要 ===")
print(cleaner.get_summary())
cleaner.print_history()
复制代码
四、Pipeline 管道:将步骤标准化
Pipeline 像一个工厂流水线,每个步骤只负责一件事。通过 add_step 注册处理函数,run 方法依次执行并捕获异常。
class AnalysisPipeline:
    """Ordered list of DataFrame -> DataFrame steps that are run in sequence.

    Steps are registered with ``add_step`` and executed by ``run``.  After a
    run, ``results`` maps each executed step name to the row and column count
    of its output; steps sharing a name share one entry (the last one wins).
    """

    def __init__(self, name="分析管道"):
        self.name = name
        self.steps = []
        self.data = None
        self.results = {}
        self._log = logging.getLogger(name)

    def add_step(self, name: str, func, **kwargs):
        """Register *func* as the next step and return ``self`` for chaining.

        Args:
            name: label used in logs and in the report.
            func: callable invoked as ``func(df, **kwargs)``; it must return
                the resulting DataFrame.
            **kwargs: extra keyword arguments forwarded to *func* on each run.

        Raises:
            TypeError: *func* is not callable.
        """
        if not callable(func):
            # Fail at registration time instead of in the middle of a run.
            raise TypeError(f"步骤 '{name}' 的处理函数不可调用")
        self.steps.append({"name": name, "func": func, "kwargs": kwargs})
        self._log.info(f"添加步骤: {name}")
        return self

    def run(self, data: pd.DataFrame) -> pd.DataFrame:
        """Run every registered step on a copy of *data* and return the result.

        ``results`` is cleared first, so after a failed run the report only
        lists the steps that completed in that run.

        Raises:
            TypeError: a step returned ``None`` (usually a missing ``return``).
            Exception: whatever a step raises; it is logged and re-raised.
        """
        self.data = data.copy()
        self.results = {}
        self._log.info(f"开始运行管道 '{self.name}', 输入数据: {len(self.data)} 行")
        for step in self.steps:
            try:
                self._log.info(f"执行步骤: {step['name']}")
                output = step["func"](self.data, **step["kwargs"])
                if output is None:
                    raise TypeError("步骤函数没有返回 DataFrame (返回了 None)")
                self.data = output
                self.results[step["name"]] = {"rows": len(self.data), "columns": len(self.data.columns)}
                self._log.info(f" -> 完成: {len(self.data)} 行, {len(self.data.columns)} 列")
            except Exception as e:
                self._log.error(f"步骤 '{step['name']}' 执行失败: {e}")
                raise
        self._log.info("管道运行完成!")
        return self.data

    def get_report(self) -> str:
        """Return a text report with one line per step completed in the last run."""
        report = [f"\n=== 管道运行报告: {self.name} ==="]
        for name, info in self.results.items():
            report.append(f" {name}: {info['rows']} 行, {info['columns']} 列")
        return "\n".join(report)
复制代码
定义各个步骤的处理函数(可以独立于类外部):
def step_remove_duplicates(df, **kwargs):
    """Pipeline step: return *df* without fully duplicated rows.

    Extra keyword arguments are accepted and ignored.
    """
    deduped = df.drop_duplicates()
    logging.info(f" 去重: {len(df)} -> {len(deduped)} 行")
    return deduped
def _mode_or_unchanged(series):
    """Fill gaps with the most frequent value; leave all-missing series as is."""
    modes = series.mode()
    if modes.empty:
        return series
    return series.fillna(modes.iloc[0])


def step_fill_missing(df, strategy="mean", columns=None, **kwargs):
    """Pipeline step: return a copy of *df* with missing values filled.

    Args:
        df: input frame; it is not modified.
        strategy: "mean" or "median", plus "mode", "zero" and "forward" to
            match ``DataCleaner.fill_missing``.  An unknown strategy logs a
            warning and returns the data unchanged.
        columns: columns to fill; defaults to every numeric column.  Names
            missing from the frame are skipped.
        **kwargs: accepted and ignored.
    """
    # Copy first: the pipeline may hand over a filtered slice, and assigning
    # columns on it would modify (or warn about) somebody else's data.
    result = df.copy()
    fillers = {
        "mean": lambda s: s.fillna(s.mean()),
        "median": lambda s: s.fillna(s.median()),
        "mode": _mode_or_unchanged,
        "zero": lambda s: s.fillna(0),
        "forward": lambda s: s.ffill(),
    }
    fill = fillers.get(strategy)
    if fill is None:
        logging.warning(f" 未知的填充策略: {strategy}, 数据保持不变")
        return result
    cols = columns or result.select_dtypes(include=[np.number]).columns.tolist()
    for col in cols:
        if col in result.columns:
            result[col] = fill(result[col])
    logging.info(f" 填充缺失值: 策略={strategy}")
    return result
def step_convert_dates(df, columns=None, **kwargs):
    """Pipeline step: return a copy of *df* with *columns* parsed as datetimes.

    Args:
        df: input frame; it is not modified.
        columns: names of the columns to pass through ``pd.to_datetime``.
            Names missing from the frame are skipped; ``None`` converts nothing.
        **kwargs: accepted and ignored.

    The log line reports how many columns were actually converted.
    """
    # Copy first so the caller's frame keeps its original dtypes.
    result = df.copy()
    present = [col for col in (columns or []) if col in result.columns]
    for col in present:
        result[col] = pd.to_datetime(result[col])
    logging.info(f" 日期转换: {len(present)} 列")
    return result
def step_add_features(df, **kwargs):
    """Pipeline step: add a z-score column ``<col>_标准化`` per numeric column.

    Returns a copy of *df*; the input is not modified.  The standard deviation
    is the pandas default (sample, ddof=1).  When it is zero or undefined
    (constant column, single row) the feature is 0.0 for every present value
    instead of the NaN/inf that a division by it would produce; missing
    values stay missing.  Extra keyword arguments are accepted and ignored.
    """
    result = df.copy()
    # Fixed before the loop, so freshly added feature columns are not re-scaled.
    num_cols = result.select_dtypes(include=[np.number]).columns
    for col in num_cols:
        centered = result[col] - result[col].mean()
        spread = result[col].std()
        if pd.isna(spread) or spread == 0:
            # No variation to scale by: every value sits exactly at the mean.
            result[f"{col}_标准化"] = centered * 0.0
        else:
            result[f"{col}_标准化"] = centered / spread
    logging.info(f" 添加特征: {len(num_cols)} 个标准化列")
    return result
def step_filter_valid(df, **kwargs):
    """Pipeline step: drop rows whose first numeric column is missing.

    Only the first numeric column (in column order) is checked; extra keyword
    arguments are accepted and ignored.
    """
    first_numeric = df.select_dtypes(include=[np.number]).columns[:1]
    kept = df.dropna(subset=first_numeric)
    logging.info(f" 过滤: {len(df)} -> {len(kept)} 行")
    return kept
复制代码
使用 Pipeline 串联步骤:
# Demo input: the last row repeats the second one, and column A has a gap.
sample_data = pd.DataFrame({
    "日期": ["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-02"],
    "A": [100, 200, None, 200],
    "B": [50, 100, 75, 100],
    "C": ["x", "y", "z", "y"],
})

pipeline = AnalysisPipeline("销售数据处理管道")

# add_step returns the pipeline, so these could be chained; registration
# order is execution order.
pipeline.add_step("去重", step_remove_duplicates)
pipeline.add_step("填充缺失值", step_fill_missing, strategy="mean")
pipeline.add_step("日期转换", step_convert_dates, columns=["日期"])
pipeline.add_step("特征工程", step_add_features)
pipeline.add_step("数据过滤", step_filter_valid)

result = pipeline.run(sample_data)
print(pipeline.get_report())
print("\n最终数据:")
print(result)
复制代码
五、日志系统配置:推荐使用 logging 替代 print
logging 支持级别过滤、格式化时间、文件输出等,适合生产环境。以下函数可快速配置带文件输出的日志器:
import os


def setup_logger(name: str, log_file: Optional[str] = None, level: int = logging.INFO) -> logging.Logger:
    """Return the logger called *name*, configured with console (and file) output.

    Args:
        name: logger name passed to ``logging.getLogger``.
        log_file: optional path of a UTF-8 log file; its directory is created
            when missing.
        level: level applied to the logger on every call.

    A logger that already has handlers is returned as is (apart from the
    level), so calling this repeatedly never stacks duplicate handlers.  This
    also means *log_file* is ignored on later calls for the same name.
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)
    if logger.handlers:
        return logger

    # This logger gets its own console handler below.  Without this, every
    # record would also travel up to the root logger and be printed a second
    # time by any handler installed there (e.g. via logging.basicConfig).
    logger.propagate = False

    formatter = logging.Formatter(
        "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    if log_file:
        log_dir = os.path.dirname(log_file)
        if log_dir:
            # A bare file name has no directory part; nothing to create then.
            os.makedirs(log_dir, exist_ok=True)
        file_handler = logging.FileHandler(log_file, encoding="utf-8")
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
        logger.info(f"日志文件: {log_file}")
    return logger
复制代码
六、异常处理最佳实践
自定义异常类可以携带 detail 信息,让错误定位更精准:
class DataProcessingError(Exception):
    """Processing failure that carries an optional extra ``detail`` string."""

    def __init__(self, message: str, detail: str = ""):
        # Only the message goes into Exception.args; detail is kept separately.
        super().__init__(message)
        self.message = message
        self.detail = detail

    def __str__(self):
        # Append the detail only when one was supplied.
        return f"{self.message} - {self.detail}" if self.detail else self.message
复制代码
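在 DataCleaner.load_csv 和 AnalysisPipeline.run 中,已经使用了 try-except 包装关键操作,配合 logging.error 记录错误后重新抛出,让上层调用者可以统一处理;convert_types 则是例外,它只记录错误并继续处理其余列,不会中断整个流程。需要携带业务上下文时,可以在 except 分支中改为 raise DataProcessingError("说明", detail=str(e)) from e。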
七、配置文件管理(思路)
对于项目级别的脚本,建议将清洗策略、文件路径、数据库连接等参数抽离到独立的配置文件中(如 JSON、YAML 或 INI),分别使用标准库 json、第三方库 pyyaml 或标准库 configparser 读取,避免硬编码。
八、总结
通过 DataCleaner 类和 Pipeline 管道的组合,我们可以将重复的数据清洗逻辑封装成模块,调用时只需配置步骤参数,代码可读性和可维护性大幅提升。加上日志和异常处理,脚本达到生产可用级别。后续还可以扩展为支持多格式数据源、并行处理、缓存等高级功能。