查看: 168|回复: 2

Python数据分析:DataCleaner类与Pipeline管道实现可复用数据清洗

[复制链接]
发表于 7 小时前 | 显示全部楼层 |阅读模式
日常数据分析工作中,经常遇到不同项目都需要重复编写类似的清洗代码:加载CSV、删除缺失值、转换日期格式、过滤异常值。这种重复劳动不仅低效,而且后期维护困难。本文介绍如何通过面向对象编程(OOP)构建通用的 DataCleaner 类,并设计 Pipeline(管道)模式将数据处理步骤串联起来,配合日志记录和异常处理,打造一套可复用的数据分析模块。
  1. import pandas as pd
  2. import numpy as np
  3. import logging
  4. from typing import List, Dict, Optional, Union
  5. # 配置基础日志
  6. logging.basicConfig(
  7.     level=logging.INFO,
  8.     format="%(asctime)s [%(levelname)s] %(message)s",
  9.     datefmt="%H:%M:%S"
  10. )
复制代码

一、面向对象设计:将数据与操作封装到类中

使用类可以把数据(DataFrame)和清洗、转换、统计等操作打包在一起,避免全局变量混乱。下面的 SimpleAnalyzer 展示了核心结构:
  1. class SimpleAnalyzer:
  2.     """简单的数据分析器类"""
  3.     def __init__(self, name="默认分析器"):
  4.         self.name = name
  5.         self.data = None
  6.         self.report = None
  7.         print(f"分析器 '{self.name}' 已创建")
  8.     def load_data(self, filepath):
  9.         self.data = pd.read_csv(filepath)
  10.         print(f"已加载 {len(self.data)} 行数据")
  11.         return self.data
  12.     def summary(self):
  13.         if self.data is None:
  14.             return "请先加载数据"
  15.         result = {
  16.             "行数": len(self.data),
  17.             "列数": len(self.data.columns),
  18.             "缺失值": int(self.data.isnull().sum().sum()),
  19.             "重复行": int(self.data.duplicated().sum())
  20.         }
  21.         self.report = result
  22.         return result
  23.     def __str__(self):
  24.         return f"分析器: {self.name}, 数据: {len(self.data) if self.data is not None else '未加载'}行"
复制代码

二、构建通用数据清洗器 DataCleaner

DataCleaner 类封装了加载、去重、填充缺失、类型转换、过滤、重命名等常见操作,所有方法返回 self 以支持链式调用。内部维护 history 列表记录每一步操作,并通过 logging 输出日志。
  1. class DataCleaner:
  2.     """通用数据清洗器"""
  3.     def __init__(self, data: Optional[pd.DataFrame] = None, name: str = "Cleaner"):
  4.         self.name = name
  5.         self.data = data
  6.         self.history = []
  7.         self._log = logging.getLogger(self.name)
  8.     def log_action(self, action: str, details: str = ""):
  9.         entry = {"action": action, "details": details, "rows": len(self.data) if self.data is not None else 0}
  10.         self.history.append(entry)
  11.         self._log.info(f"[{action}] {details}")
  12.     def load_csv(self, filepath: str, **kwargs) -> "DataCleaner":
  13.         try:
  14.             self.data = pd.read_csv(filepath, **kwargs)
  15.             self.log_action("加载", f"从 {filepath} 加载了 {len(self.data)} 行数据")
  16.         except FileNotFoundError:
  17.             self._log.error(f"文件不存在: {filepath}")
  18.             raise
  19.         except Exception as e:
  20.             self._log.error(f"加载失败: {e}")
  21.             raise
  22.         return self
  23.     def load_dict(self, data_dict: Dict) -> "DataCleaner":
  24.         self.data = pd.DataFrame(data_dict)
  25.         self.log_action("创建", f"从字典创建了 {len(self.data)} 行数据")
  26.         return self
  27.     def remove_duplicates(self, subset: Optional[List[str]] = None) -> "DataCleaner":
  28.         before = len(self.data)
  29.         self.data = self.data.drop_duplicates(subset=subset)
  30.         removed = before - len(self.data)
  31.         self.log_action("去重", f"删除了 {removed} 行重复数据")
  32.         return self
  33.     def fill_missing(self, strategy: str = "mean", columns: Optional[List[str]] = None) -> "DataCleaner":
  34.         cols = columns or self.data.select_dtypes(include=[np.number]).columns.tolist()
  35.         filled_count = 0
  36.         for col in cols:
  37.             if col not in self.data.columns:
  38.                 continue
  39.             missing_before = self.data[col].isnull().sum()
  40.             if strategy == "mean":
  41.                 self.data[col] = self.data[col].fillna(self.data[col].mean())
  42.             elif strategy == "median":
  43.                 self.data[col] = self.data[col].fillna(self.data[col].median())
  44.             elif strategy == "mode":
  45.                 self.data[col] = self.data[col].fillna(self.data[col].mode()[0])
  46.             elif strategy == "zero":
  47.                 self.data[col] = self.data[col].fillna(0)
  48.             elif strategy == "forward":
  49.                 self.data[col] = self.data[col].ffill()
  50.             filled_count += missing_before
  51.         self.log_action("填充缺失值", f"策略={strategy}, 填充了 {filled_count} 个缺失值")
  52.         return self
  53.     def convert_types(self, type_map: Dict[str, str]) -> "DataCleaner":
  54.         for col, target_type in type_map.items():
  55.             if col not in self.data.columns:
  56.                 self._log.warning(f"列不存在: {col}")
  57.                 continue
  58.             try:
  59.                 if target_type == "datetime":
  60.                     self.data[col] = pd.to_datetime(self.data[col])
  61.                 elif target_type == "int":
  62.                     self.data[col] = pd.to_numeric(self.data[col]).astype("Int64")
  63.                 elif target_type == "float":
  64.                     self.data[col] = pd.to_numeric(self.data[col]).astype(float)
  65.                 elif target_type == "str":
  66.                     self.data[col] = self.data[col].astype(str)
  67.                 elif target_type == "category":
  68.                     self.data[col] = self.data[col].astype("category")
  69.                 self.log_action("类型转换", f"{col} -> {target_type}")
  70.             except Exception as e:
  71.                 self._log.error(f"类型转换失败 {col}: {e}")
  72.         return self
  73.     def filter_rows(self, condition) -> "DataCleaner":
  74.         before = len(self.data)
  75.         self.data = self.data[condition]
  76.         removed = before - len(self.data)
  77.         self.log_action("过滤", f"删除了 {removed} 行,剩余 {len(self.data)} 行")
  78.         return self
  79.     def rename_columns(self, name_map: Dict[str, str]) -> "DataCleaner":
  80.         self.data = self.data.rename(columns=name_map)
  81.         self.log_action("重命名", f"重命名了 {len(name_map)} 列")
  82.         return self
  83.     def get_summary(self) -> pd.DataFrame:
  84.         if self.data is None:
  85.             return pd.DataFrame()
  86.         summary = pd.DataFrame({
  87.             "类型": self.data.dtypes,
  88.             "非空数": self.data.count(),
  89.             "缺失数": self.data.isnull().sum(),
  90.             "缺失率": self.data.isnull().sum() / len(self.data),
  91.             "唯一值数": self.data.nunique()
  92.         })
  93.         num_cols = self.data.select_dtypes(include=[np.number]).columns
  94.         if len(num_cols) > 0:
  95.             stats = self.data[num_cols].describe().T
  96.             stats = stats.rename(columns={
  97.                 "mean": "均值", "std": "标准差", "min": "最小值",
  98.                 "25%": "25分位", "50%": "中位数", "75%": "75分位", "max": "最大值"
  99.             })
  100.             summary = summary.join(stats)
  101.         return summary
  102.     def save_csv(self, filepath: str) -> "DataCleaner":
  103.         self.data.to_csv(filepath, index=False)
  104.         self.log_action("保存", f"已保存至 {filepath}")
  105.         return self
  106.     def print_history(self):
  107.         print("\n=== 操作历史 ===")
  108.         for i, entry in enumerate(self.history, 1):
  109.             print(f"  {i}. [{entry['action']}] {entry['details']} (当前: {entry['rows']}行)")
复制代码

三、链式调用演示
  1. sample_data = {
  2.     "日期": ["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04", "2024-01-03"],
  3.     "产品": ["A", "B", None, "A", "A"],
  4.     "销售额": [100, 200, 150, None, 100],
  5.     "数量": [10, 20, 15, 8, 10],
  6.     "单价": [10.0, 10.0, 10.0, 12.5, 10.0],
  7. }
  8. cleaner = DataCleaner(name="销售数据清洗")
  9. (cleaner
  10.     .load_dict(sample_data)
  11.     .remove_duplicates()
  12.     .fill_missing(strategy="mean", columns=["销售额"])
  13.     .fill_missing(strategy="mode", columns=["产品"])
  14.     .convert_types({"日期": "datetime"})
  15.     .filter_rows(lambda df: df["销售额"] > 0)
  16. )
  17. print("\n=== 清洗后的数据 ===")
  18. print(cleaner.data)
  19. print("\n=== 数据摘要 ===")
  20. print(cleaner.get_summary())
  21. cleaner.print_history()
复制代码

四、Pipeline 管道:将步骤标准化

Pipeline 像一个工厂流水线,每个步骤只负责一件事。通过 add_step 注册处理函数,run 方法依次执行并捕获异常。
  1. class AnalysisPipeline:
  2.     """数据分析管道"""
  3.     def __init__(self, name="分析管道"):
  4.         self.name = name
  5.         self.steps = []
  6.         self.data = None
  7.         self.results = {}
  8.         self._log = logging.getLogger(name)
  9.     def add_step(self, name: str, func, **kwargs):
  10.         self.steps.append({"name": name, "func": func, "kwargs": kwargs})
  11.         self._log.info(f"添加步骤: {name}")
  12.         return self
  13.     def run(self, data: pd.DataFrame) -> pd.DataFrame:
  14.         self.data = data.copy()
  15.         self._log.info(f"开始运行管道 '{self.name}', 输入数据: {len(self.data)} 行")
  16.         for step in self.steps:
  17.             try:
  18.                 self._log.info(f"执行步骤: {step['name']}")
  19.                 self.data = step["func"](self.data, **step["kwargs"])
  20.                 self.results[step["name"]] = {"rows": len(self.data), "columns": len(self.data.columns)}
  21.                 self._log.info(f" -> 完成: {len(self.data)} 行, {len(self.data.columns)} 列")
  22.             except Exception as e:
  23.                 self._log.error(f"步骤 '{step['name']}' 执行失败: {e}")
  24.                 raise
  25.         self._log.info(f"管道运行完成!")
  26.         return self.data
  27.     def get_report(self) -> str:
  28.         report = [f"\n=== 管道运行报告: {self.name} ==="]
  29.         for name, info in self.results.items():
  30.             report.append(f"  {name}: {info['rows']} 行, {info['columns']} 列")
  31.         return "\n".join(report)
复制代码

定义各个步骤的处理函数(可以独立于类外部):
  1. def step_remove_duplicates(df, **kwargs):
  2.     before = len(df)
  3.     df = df.drop_duplicates()
  4.     logging.info(f"  去重: {before} -> {len(df)} 行")
  5.     return df
  6. def step_fill_missing(df, strategy="mean", columns=None, **kwargs):
  7.     cols = columns or df.select_dtypes(include=[np.number]).columns.tolist()
  8.     for col in cols:
  9.         if col in df.columns:
  10.             if strategy == "mean":
  11.                 df[col] = df[col].fillna(df[col].mean())
  12.             elif strategy == "median":
  13.                 df[col] = df[col].fillna(df[col].median())
  14.     logging.info(f"  填充缺失值: 策略={strategy}")
  15.     return df
  16. def step_convert_dates(df, columns=None, **kwargs):
  17.     cols = columns or []
  18.     for col in cols:
  19.         if col in df.columns:
  20.             df[col] = pd.to_datetime(df[col])
  21.     logging.info(f"  日期转换: {len(cols)} 列")
  22.     return df
  23. def step_add_features(df, **kwargs):
  24.     num_cols = df.select_dtypes(include=[np.number]).columns
  25.     for col in num_cols:
  26.         df[f"{col}_标准化"] = (df[col] - df[col].mean()) / df[col].std()
  27.     logging.info(f"  添加特征: {len(num_cols)} 个标准化列")
  28.     return df
  29. def step_filter_valid(df, **kwargs):
  30.     before = len(df)
  31.     df = df.dropna(subset=df.select_dtypes(include=[np.number]).columns[:1])
  32.     logging.info(f"  过滤: {before} -> {len(df)} 行")
  33.     return df
复制代码

使用 Pipeline 串联步骤:
  1. sample_data = pd.DataFrame({
  2.     "日期": ["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-02"],
  3.     "A": [100, 200, None, 200],
  4.     "B": [50, 100, 75, 100],
  5.     "C": ["x", "y", "z", "y"]
  6. })
  7. pipeline = AnalysisPipeline("销售数据处理管道")
  8. (pipeline
  9.     .add_step("去重", step_remove_duplicates)
  10.     .add_step("填充缺失值", step_fill_missing, strategy="mean")
  11.     .add_step("日期转换", step_convert_dates, columns=["日期"])
  12.     .add_step("特征工程", step_add_features)
  13.     .add_step("数据过滤", step_filter_valid)
  14. )
  15. result = pipeline.run(sample_data)
  16. print(pipeline.get_report())
  17. print("\n最终数据:")
  18. print(result)
复制代码

五、日志系统配置:推荐使用 logging 替代 print

logging 支持级别过滤、格式化时间、文件输出等,适合生产环境。以下函数可快速配置带文件输出的日志器:
  1. import os
  2. def setup_logger(name: str, log_file: str = None, level: int = logging.INFO) -> logging.Logger:
  3.     logger = logging.getLogger(name)
  4.     logger.setLevel(level)
  5.     if logger.handlers:
  6.         return logger
  7.     formatter = logging.Formatter(
  8.         "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
  9.         datefmt="%Y-%m-%d %H:%M:%S"
  10.     )
  11.     console_handler = logging.StreamHandler()
  12.     console_handler.setFormatter(formatter)
  13.     logger.addHandler(console_handler)
  14.     if log_file:
  15.         os.makedirs(os.path.dirname(log_file) if os.path.dirname(log_file) else ".", exist_ok=True)
  16.         file_handler = logging.FileHandler(log_file, encoding="utf-8")
  17.         file_handler.setFormatter(formatter)
  18.         logger.addHandler(file_handler)
  19.         logger.info(f"日志文件: {log_file}")
  20.     return logger
复制代码

六、异常处理最佳实践

自定义异常类可以携带 detail 信息,让错误定位更精准:
  1. class DataProcessingError(Exception):
  2.     def __init__(self, message: str, detail: str = ""):
  3.         super().__init__(message)
  4.         self.detail = detail
  5.         self.message = message
  6.     def __str__(self):
  7.         if self.detail:
  8.             return f"{self.message} - {self.detail}"
  9.         return self.message
复制代码

在 DataCleaner 和 Pipeline 中,已经使用了 try-except 包装关键操作,配合 logging.error 记录错误后重新抛出,让上层调用者可以统一处理。

七、配置文件管理(思路)

对于项目级别的脚本,建议将清洗策略、文件路径、数据库连接等参数抽离到独立的配置文件中(如 JSON 或 YAML),使用 configparser 或 pyyaml 读取,避免硬编码。

八、总结

通过 DataCleaner 类和 Pipeline 管道的组合,我们可以将重复的数据清洗逻辑封装成模块,调用时只需配置步骤参数,代码可读性和可维护性大幅提升。加上日志和异常处理,脚本达到生产可用级别。后续还可以扩展为支持多格式数据源、并行处理、缓存等高级功能。
回复

使用道具 举报

发表于 5 小时前 | 显示全部楼层

Re: Python数据分析:DataCleaner类与Pipeline管道实现可复用数据清洗

非常实用的分享!用类封装数据清洗操作,再加上链式调用和日志记录,确实能大幅提升复用性。特别是 `log_action` 和异常处理的设计,让调试和追溯变得更方便,这对于多次运行或生产环境很有帮助。 有一个小疑问:正文标题提到了 “Pipeline 管道”,但内容只展示了 `DataCleaner` 类的结构。你是打算在后面介绍 `Pipeline` 的实现方式,比如把多种清洗步骤作为一个可配置的序列来执行?还是说这里的链式调用本身已经充当了管道的作用?希望能看到更完整的管道串联示例,比如如何动态添加步骤、跳过或重试失败步骤等。 另外,`history` 列表只记录了操作名和行数,如果后续需要回撤某一步或者导出完整的数据清洗报告,有没有考虑扩展它?比如保存每个步骤前后的数据快照(通过深拷贝或者只记录变化量)?期待继续更新这部分内容!
回复 支持 反对

使用道具 举报

发表于 2 小时前 | 显示全部楼层

Re: Python数据分析:DataCleaner类与Pipeline管道实现可复用数据清洗

感谢分享!这个 DataCleaner 类设计得很实用,链式调用和日志记录让代码可读性和维护性都提升了不少。特别是 `load_csv` 里的异常处理,加了 try-except 和日志输出,比直接读文件要稳健很多。我平时也经常要做类似的数据清洗,重复写那些 `dropna`、`to_datetime` 确实很烦,用类封装起来确实是个好思路。 想问一下楼主,Pipeline 管道部分是怎么把多个清洗步骤串起来的?是像 sklearn 那种 `fit_transform` 模式,还是直接在 `DataCleaner` 里再加一个 `pipeline` 方法让用户自定义步骤列表?另外,如果中间某一步出错导致数据状态不对,有没有什么回滚或者检查点机制? 期待后面的完整代码,这种可复用模块在团队协作里太有用了。
回复 支持 反对

使用道具 举报

发表于 半小时前 | 显示全部楼层

Re: Python数据分析:DataCleaner类与Pipeline管道实现可复用数据清洗

非常棒的分享!把数据清洗封装成类并用pipeline串联,确实能告别重复劳动。尤其喜欢 `history` 列表和日志记录——之前排查清洗步骤时全靠手动加print,现在看日志就能追溯每一步做了什么,省心很多。链式调用写起来也超流畅。如果能再补充几个常用的清洗函数(比如正则匹配替换、基于分位数的异常值过滤),或者支持导出清洗报告(markdown/html),感觉会更通用。感谢抛砖引玉,马上实践一下这个模式!
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 注册

本版积分规则

指导单位

江苏省公安厅

江苏省通信管理局

浙江省台州刑侦支队

DEFCON GROUP 86025

Hacking Group 021A

旗下站点

态势感知中心

应急响应中心

红盟安全

联系我们

官方QQ群:112851260

官方邮箱:security#ihonker.org(#改成@)

官方核心成员

关注微信公众号

Archiver|手机版|小黑屋| ( 沪ICP备2021026908号 )

GMT+8, 2026-6-17 17:00 , Processed in 0.042382 second(s), 17 queries , Gzip On, Redis On.

Powered by ihonker.com

Copyright © 2015-现在.

  • 返回顶部