Python for Data Science and Machine Learning in Practice: From Data Processing to Model Deployment

Python dominates data science and machine learning, backed by a rich ecosystem of powerful libraries. This article walks you through the complete workflow, from data processing to model deployment.

Environment Setup and Core Libraries

Project Dependency Configuration

# requirements.txt
numpy==1.24.3
pandas==2.0.3
matplotlib==3.7.2
seaborn==0.12.2
scikit-learn==1.3.0
tensorflow==2.13.0
keras==2.13.1
xgboost==1.7.6
lightgbm==4.0.0
plotly==5.15.0
jupyter==1.0.0
ipykernel==6.25.0
streamlit==1.25.0
fastapi==0.101.1
uvicorn==0.23.2
joblib==1.3.2
optuna==3.3.0
mlflow==2.5.0

Data Science Toolkit

# src/data_science/utils.py - data science toolkit
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import warnings
from typing import List, Dict, Tuple, Optional, Union
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder, OneHotEncoder
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
import logging

# Configure plotting style, warnings, and logging
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
warnings.filterwarnings('ignore')
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class DataExplorer:
    """数据探索分析工具类"""
    
    def __init__(self, df: pd.DataFrame):
        self.df = df.copy()
        self.numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
        self.categorical_cols = df.select_dtypes(include=['object', 'category']).columns.tolist()
        
    def basic_info(self) -> Dict:
        """获取数据基本信息"""
        info = {
            'shape': self.df.shape,
            'memory_usage': self.df.memory_usage(deep=True).sum() / 1024**2,  # MB
            'missing_values': self.df.isnull().sum().sum(),
            'duplicate_rows': self.df.duplicated().sum(),
            'numeric_columns': len(self.numeric_cols),
            'categorical_columns': len(self.categorical_cols)
        }
        
        logger.info(f"数据集形状: {info['shape']}")
        logger.info(f"内存使用: {info['memory_usage']:.2f} MB")
        logger.info(f"缺失值总数: {info['missing_values']}")
        logger.info(f"重复行数: {info['duplicate_rows']}")
        
        return info
    
    def missing_value_analysis(self) -> pd.DataFrame:
        """缺失值分析"""
        missing_data = pd.DataFrame({
            'Column': self.df.columns,
            'Missing_Count': self.df.isnull().sum(),
            'Missing_Percentage': (self.df.isnull().sum() / len(self.df)) * 100
        })
        
        missing_data = missing_data[missing_data['Missing_Count'] > 0].sort_values(
            'Missing_Percentage', ascending=False
        )
        
        if not missing_data.empty:
            # Visualize the missing values
            fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
            
            # Missing-value heatmap
            sns.heatmap(self.df.isnull(), cbar=True, ax=ax1, cmap='viridis')
            ax1.set_title('Missing Value Heatmap')
            
            # Missing-value bar chart
            missing_data.plot(x='Column', y='Missing_Percentage', kind='bar', ax=ax2)
            ax2.set_title('Missing Value Percentage')
            ax2.set_ylabel('Missing (%)')
            plt.xticks(rotation=45)
            
            plt.tight_layout()
            plt.show()
        
        return missing_data
    
    def correlation_analysis(self, method: str = 'pearson') -> pd.DataFrame:
        """相关性分析"""
        if len(self.numeric_cols) < 2:
            logger.warning("数值列少于2个,无法进行相关性分析")
            return pd.DataFrame()
        
        corr_matrix = self.df[self.numeric_cols].corr(method=method)
        
        # Correlation heatmap
        plt.figure(figsize=(12, 10))
        mask = np.triu(np.ones_like(corr_matrix, dtype=bool))
        sns.heatmap(
            corr_matrix, 
            mask=mask, 
            annot=True, 
            cmap='coolwarm', 
            center=0,
            square=True,
            fmt='.2f'
        )
        plt.title(f'{method.capitalize()} Correlation Matrix')
        plt.tight_layout()
        plt.show()
        
        return corr_matrix
    
    def distribution_analysis(self, columns: Optional[List[str]] = None):
        """分布分析"""
        cols_to_analyze = columns or self.numeric_cols[:6]  # 限制显示数量
        
        if not cols_to_analyze:
            logger.warning("没有数值列可供分析")
            return
        
        n_cols = min(3, len(cols_to_analyze))
        n_rows = (len(cols_to_analyze) + n_cols - 1) // n_cols
        
        fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 5*n_rows))
        axes = np.atleast_1d(axes).flatten()
        
        for i, col in enumerate(cols_to_analyze):
            if i < len(axes):
                # Histogram with an overlaid density curve
                self.df[col].hist(bins=30, alpha=0.7, ax=axes[i], density=True)
                self.df[col].plot(kind='density', ax=axes[i], alpha=0.7)
                axes[i].set_title(f'{col} Distribution')
                axes[i].set_ylabel('Density')
        
        # Hide unused subplots
        for i in range(len(cols_to_analyze), len(axes)):
            axes[i].set_visible(False)
        
        plt.tight_layout()
        plt.show()
    
    def outlier_detection(self, method: str = 'iqr') -> Dict[str, List]:
        """异常值检测"""
        outliers = {}
        
        for col in self.numeric_cols:
            if method == 'iqr':
                Q1 = self.df[col].quantile(0.25)
                Q3 = self.df[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - 1.5 * IQR
                upper_bound = Q3 + 1.5 * IQR
                
                outlier_indices = self.df[
                    (self.df[col] < lower_bound) | (self.df[col] > upper_bound)
                ].index.tolist()
                
            elif method == 'zscore':
                z_scores = np.abs((self.df[col] - self.df[col].mean()) / self.df[col].std())
                outlier_indices = self.df[z_scores > 3].index.tolist()
            
            if outlier_indices:
                outliers[col] = outlier_indices
        
        # Visualize the outliers with box plots
        if outliers:
            n_cols = min(3, len(outliers))
            n_rows = (len(outliers) + n_cols - 1) // n_cols
            
            fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 5*n_rows))
            axes = np.atleast_1d(axes).flatten()
            
            for i, (col, indices) in enumerate(outliers.items()):
                if i < len(axes):
                    self.df.boxplot(column=col, ax=axes[i])
                    axes[i].set_title(f'{col} Box Plot ({len(indices)} outliers)')
            
            plt.tight_layout()
            plt.show()
        
        return outliers
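
# A minimal usage sketch for DataExplorer (illustrative only; the synthetic
# DataFrame below is an assumption, not part of the toolkit):
def _demo_data_explorer() -> None:
    rng = np.random.default_rng(42)
    demo_df = pd.DataFrame({
        'age': rng.normal(35, 10, 500),
        'income': rng.lognormal(10, 0.5, 500),
        'segment': rng.choice(['A', 'B', 'C'], 500),
    })
    # Inject some missing values so missing_value_analysis has work to do
    demo_df.loc[demo_df.sample(frac=0.05, random_state=42).index, 'income'] = np.nan
    explorer = DataExplorer(demo_df)
    explorer.basic_info()
    explorer.missing_value_analysis()
    explorer.correlation_analysis()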

class DataPreprocessor:
    """数据预处理工具类"""
    
    def __init__(self):
        self.scalers = {}
        self.encoders = {}
        
    def handle_missing_values(
        self, 
        df: pd.DataFrame, 
        strategy: Optional[Dict[str, Union[str, int, float]]] = None
    ) -> pd.DataFrame:
        """Fill or drop missing values, with optional per-column strategies."""
        df_processed = df.copy()
        
        if strategy is None:
            strategy = {}
        
        for col in df_processed.columns:
            if df_processed[col].isnull().any():
                col_strategy = strategy.get(col, 'auto')
                
                if col_strategy == 'auto':
                    if df_processed[col].dtype in ['object', 'category']:
                        # Fill categorical columns with the mode
                        mode_value = df_processed[col].mode()
                        if not mode_value.empty:
                            df_processed[col] = df_processed[col].fillna(mode_value[0])
                    else:
                        # Fill numeric columns with the median
                        median_value = df_processed[col].median()
                        df_processed[col] = df_processed[col].fillna(median_value)
                
                elif col_strategy == 'drop':
                    df_processed = df_processed.dropna(subset=[col])
                
                elif col_strategy == 'forward_fill':
                    df_processed[col] = df_processed[col].ffill()
                
                elif col_strategy == 'backward_fill':
                    df_processed[col] = df_processed[col].bfill()
                
                elif isinstance(col_strategy, (int, float, str)):
                    # Any other scalar is used as a constant fill value
                    df_processed[col] = df_processed[col].fillna(col_strategy)
        
        logger.info(f"缺失值处理完成,剩余缺失值: {df_processed.isnull().sum().sum()}")
        return df_processed
    
    def encode_categorical_features(
        self, 
        df: pd.DataFrame, 
        encoding_type: str = 'auto'
    ) -> pd.DataFrame:
        """编码分类特征"""
        df_encoded = df.copy()
        categorical_cols = df_encoded.select_dtypes(include=['object', 'category']).columns
        
        for col in categorical_cols:
            unique_values = df_encoded[col].nunique()
            
            if encoding_type == 'auto':
                # Choose an encoding based on cardinality
                if unique_values <= 2:
                    # Binary categories: label encoding
                    encoder = LabelEncoder()
                    df_encoded[col] = encoder.fit_transform(df_encoded[col].astype(str))
                    self.encoders[col] = encoder
                elif unique_values <= 10:
                    # Low cardinality: one-hot encoding
                    encoded_cols = pd.get_dummies(df_encoded[col], prefix=col)
                    df_encoded = pd.concat([df_encoded.drop(col, axis=1), encoded_cols], axis=1)
                else:
                    # High cardinality: label encoding
                    encoder = LabelEncoder()
                    df_encoded[col] = encoder.fit_transform(df_encoded[col].astype(str))
                    self.encoders[col] = encoder
            
            elif encoding_type == 'label':
                encoder = LabelEncoder()
                df_encoded[col] = encoder.fit_transform(df_encoded[col].astype(str))
                self.encoders[col] = encoder
            
            elif encoding_type == 'onehot':
                encoded_cols = pd.get_dummies(df_encoded[col], prefix=col)
                df_encoded = pd.concat([df_encoded.drop(col, axis=1), encoded_cols], axis=1)
        
        logger.info(f"分类特征编码完成,特征数量: {df_encoded.shape[1]}")
        return df_encoded
    
    def scale_features(
        self, 
        df: pd.DataFrame, 
        method: str = 'standard',
        exclude_cols: Optional[List[str]] = None
    ) -> pd.DataFrame:
        """Scale numeric features with a standard, min-max, or robust scaler."""
        df_scaled = df.copy()
        numeric_cols = df_scaled.select_dtypes(include=[np.number]).columns.tolist()
        
        if exclude_cols:
            numeric_cols = [col for col in numeric_cols if col not in exclude_cols]
        
        if method == 'standard':
            scaler = StandardScaler()
        elif method == 'minmax':
            from sklearn.preprocessing import MinMaxScaler
            scaler = MinMaxScaler()
        elif method == 'robust':
            from sklearn.preprocessing import RobustScaler
            scaler = RobustScaler()
        else:
            logger.warning(f"未知的缩放方法: {method}")
            return df_scaled
        
        df_scaled[numeric_cols] = scaler.fit_transform(df_scaled[numeric_cols])
        self.scalers[method] = scaler
        
        logger.info(f"特征缩放完成,使用方法: {method}")
        return df_scaled
    
    def remove_outliers(
        self, 
        df: pd.DataFrame, 
        method: str = 'iqr',
        threshold: float = 1.5
    ) -> pd.DataFrame:
        """移除异常值"""
        df_clean = df.copy()
        numeric_cols = df_clean.select_dtypes(include=[np.number]).columns
        
        outlier_indices = set()
        
        for col in numeric_cols:
            if method == 'iqr':
                Q1 = df_clean[col].quantile(0.25)
                Q3 = df_clean[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - threshold * IQR
                upper_bound = Q3 + threshold * IQR
                
                col_outliers = df_clean[
                    (df_clean[col] < lower_bound) | (df_clean[col] > upper_bound)
                ].index
                
            elif method == 'zscore':
                z_scores = np.abs((df_clean[col] - df_clean[col].mean()) / df_clean[col].std())
                col_outliers = df_clean[z_scores > threshold].index
            
            outlier_indices.update(col_outliers)
        
        df_clean = df_clean.drop(outlier_indices)
        logger.info(f"移除异常值 {len(outlier_indices)} 个,剩余数据: {df_clean.shape[0]} 行")
        
        return df_clean
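
# A minimal usage sketch for DataPreprocessor (illustrative; `raw_df` and the
# 'price' column are assumptions, not fixed names in the toolkit):
def _demo_preprocessing(raw_df: pd.DataFrame) -> pd.DataFrame:
    preprocessor = DataPreprocessor()
    df = preprocessor.handle_missing_values(raw_df)  # 'auto' strategy per column
    df = preprocessor.remove_outliers(df, method='iqr', threshold=1.5)
    df = preprocessor.encode_categorical_features(df, encoding_type='auto')
    df = preprocessor.scale_features(df, method='standard', exclude_cols=['price'])
    return df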

class MLModelTrainer:
    """机器学习模型训练器"""
    
    def __init__(self):
        self.models = {}
        self.results = {}
        
    def prepare_data(
        self, 
        df: pd.DataFrame, 
        target_col: str,
        test_size: float = 0.2,
        random_state: int = 42
    ) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
        """Split a DataFrame into stratified train and test sets."""
        X = df.drop(target_col, axis=1)
        y = df[target_col]
        
        # Stratified split (assumes a classification target)
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=random_state, stratify=y
        )
        
        logger.info(f"数据分割完成 - 训练集: {X_train.shape}, 测试集: {X_test.shape}")
        return X_train, X_test, y_train, y_test
    
    def train_classification_models(
        self, 
        X_train: np.ndarray, 
        X_test: np.ndarray,
        y_train: np.ndarray, 
        y_test: np.ndarray
    ) -> Dict:
        """训练分类模型"""
        from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
        from sklearn.linear_model import LogisticRegression
        from sklearn.svm import SVC
        from sklearn.neighbors import KNeighborsClassifier
        from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
        
        # Candidate models (max_iter raised to avoid common convergence warnings)
        models = {
            'Logistic Regression': LogisticRegression(random_state=42, max_iter=1000),
            'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
            'Gradient Boosting': GradientBoostingClassifier(random_state=42),
            'SVM': SVC(probability=True, random_state=42),
            'KNN': KNeighborsClassifier(n_neighbors=5)
        }
        
        results = {}
        
        for name, model in models.items():
            logger.info(f"训练模型: {name}")
            
            # Fit the model
            model.fit(X_train, y_train)
            
            # Predict on the test set
            y_pred = model.predict(X_test)
            y_pred_proba = model.predict_proba(X_test)[:, 1] if hasattr(model, 'predict_proba') else None
            
            # Evaluation metrics
            metrics = {
                'accuracy': accuracy_score(y_test, y_pred),
                'precision': precision_score(y_test, y_pred, average='weighted'),
                'recall': recall_score(y_test, y_pred, average='weighted'),
                'f1': f1_score(y_test, y_pred, average='weighted')
            }
            
            if y_pred_proba is not None and len(np.unique(y_test)) == 2:
                metrics['auc'] = roc_auc_score(y_test, y_pred_proba)
            
            results[name] = {
                'model': model,
                'predictions': y_pred,
                'probabilities': y_pred_proba,
                'metrics': metrics
            }
            
            logger.info(f"{name} - Accuracy: {metrics['accuracy']:.4f}, F1: {metrics['f1']:.4f}")
        
        self.models.update({k: v['model'] for k, v in results.items()})
        self.results = results
        
        return results
    
    def plot_model_comparison(self):
        """绘制模型比较图"""
        if not self.results:
            logger.warning("没有训练结果可供比较")
            return
        
        # Collect the metric values
        model_names = list(self.results.keys())
        metrics = ['accuracy', 'precision', 'recall', 'f1']
        
        metric_values = {metric: [] for metric in metrics}
        
        for model_name in model_names:
            for metric in metrics:
                metric_values[metric].append(self.results[model_name]['metrics'].get(metric, 0))
        
        # Plot the comparison charts
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        axes = axes.flatten()
        
        for i, metric in enumerate(metrics):
            axes[i].bar(model_names, metric_values[metric])
            axes[i].set_title(f'{metric.capitalize()} Comparison')
            axes[i].set_ylabel(metric.capitalize())
            axes[i].tick_params(axis='x', rotation=45)
            
            # Add value labels above each bar
            for j, v in enumerate(metric_values[metric]):
                axes[i].text(j, v + 0.01, f'{v:.3f}', ha='center', va='bottom')
        
        plt.tight_layout()
        plt.show()
    
    def plot_confusion_matrices(self, y_test: np.ndarray):
        """绘制混淆矩阵"""
        if not self.results:
            logger.warning("没有训练结果可供分析")
            return
        
        n_models = len(self.results)
        n_cols = min(3, n_models)
        n_rows = (n_models + n_cols - 1) // n_cols
        
        fig, axes = plt.subplots(n_rows, n_cols, figsize=(5*n_cols, 4*n_rows))
        axes = np.atleast_1d(axes).flatten()
        
        for i, (name, result) in enumerate(self.results.items()):
            if i < len(axes):
                cm = confusion_matrix(y_test, result['predictions'])
                sns.heatmap(cm, annot=True, fmt='d', ax=axes[i], cmap='Blues')
                axes[i].set_title(f'{name} Confusion Matrix')
                axes[i].set_xlabel('Predicted')
                axes[i].set_ylabel('Actual')
        
        # Hide unused subplots
        for i in range(n_models, len(axes)):
            axes[i].set_visible(False)
        
        plt.tight_layout()
        plt.show()
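
# A minimal end-to-end sketch for MLModelTrainer on a synthetic binary
# classification dataset (illustrative only):
def _demo_training() -> None:
    from sklearn.datasets import make_classification
    
    X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
    demo_df = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(20)])
    demo_df['target'] = y
    
    trainer = MLModelTrainer()
    X_train, X_test, y_train, y_test = trainer.prepare_data(demo_df, target_col='target')
    trainer.train_classification_models(X_train, X_test, y_train, y_test)
    trainer.plot_model_comparison()
    trainer.plot_confusion_matrices(y_test)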

# Feature engineering utilities
class FeatureEngineer:
    """特征工程工具类"""
    
    @staticmethod
    def create_polynomial_features(df: pd.DataFrame, degree: int = 2) -> pd.DataFrame:
        """创建多项式特征"""
        from sklearn.preprocessing import PolynomialFeatures
        
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        poly = PolynomialFeatures(degree=degree, include_bias=False)
        
        poly_features = poly.fit_transform(df[numeric_cols])
        feature_names = poly.get_feature_names_out(numeric_cols)
        
        poly_df = pd.DataFrame(poly_features, columns=feature_names, index=df.index)
        
        # Re-attach the original non-numeric columns
        non_numeric_cols = df.select_dtypes(exclude=[np.number]).columns
        if not non_numeric_cols.empty:
            result_df = pd.concat([poly_df, df[non_numeric_cols]], axis=1)
        else:
            result_df = poly_df
        
        logger.info(f"多项式特征创建完成,特征数量: {result_df.shape[1]}")
        return result_df
    
    @staticmethod
    def select_features(
        X: pd.DataFrame, 
        y: pd.Series, 
        method: str = 'mutual_info',
        k: int = 10
    ) -> pd.DataFrame:
        """特征选择"""
        from sklearn.feature_selection import SelectKBest, mutual_info_classif, f_classif, chi2
        
        if method == 'mutual_info':
            selector = SelectKBest(score_func=mutual_info_classif, k=k)
        elif method == 'f_classif':
            selector = SelectKBest(score_func=f_classif, k=k)
        elif method == 'chi2':
            # Note: chi2 requires non-negative feature values
            selector = SelectKBest(score_func=chi2, k=k)
        else:
            logger.warning(f"未知的特征选择方法: {method}")
            return X
        
        X_selected = selector.fit_transform(X, y)
        selected_features = X.columns[selector.get_support()].tolist()
        
        result_df = pd.DataFrame(X_selected, columns=selected_features, index=X.index)
        
        logger.info(f"特征选择完成,选择了 {len(selected_features)} 个特征")
        logger.info(f"选择的特征: {selected_features}")
        
        return result_df
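
To tie the pieces together, here is a minimal sketch of how the four classes above might be chained into a single workflow. The CSV path, the 'target' column name, and the k=10 feature budget are illustrative assumptions; adapt them to your own dataset.

# A hypothetical end-to-end pipeline assembled from the toolkit above
# ('data.csv' and the 'target' column are illustrative assumptions)
def run_pipeline(csv_path: str = 'data.csv', target_col: str = 'target') -> Dict:
    raw_df = pd.read_csv(csv_path)
    
    # 1. Explore
    DataExplorer(raw_df).basic_info()
    
    # 2. Preprocess (keep the target column out of scaling)
    preprocessor = DataPreprocessor()
    df = preprocessor.handle_missing_values(raw_df)
    df = preprocessor.encode_categorical_features(df)
    df = preprocessor.scale_features(df, exclude_cols=[target_col])
    
    # 3. Select features (f_classif tolerates negative values, unlike chi2)
    X = df.drop(target_col, axis=1)
    y = df[target_col]
    X_selected = FeatureEngineer.select_features(X, y, method='f_classif', k=10)
    
    # 4. Train and compare models
    df_model = pd.concat([X_selected, y], axis=1)
    trainer = MLModelTrainer()
    X_train, X_test, y_train, y_test = trainer.prepare_data(df_model, target_col)
    results = trainer.train_classification_models(X_train, X_test, y_train, y_test)
    trainer.plot_model_comparison()
    return results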

Summary

Key takeaways for Python data science and machine learning:

🎯 Data Processing Workflow

  1. Data exploration: basic statistics, missing values, distribution analysis
  2. Data preprocessing: cleaning, encoding, scaling, outlier handling
  3. Feature engineering: feature creation, selection, and transformation
  4. Model training: multi-model comparison, evaluation, and optimization

✅ Best Practices

  • Systematic exploratory data analysis
  • Automated data preprocessing pipelines
  • Multi-model comparison and evaluation
  • Visual presentation of results

🚀 Advanced Techniques

  • Feature engineering and selection
  • Model ensembling and optimization
  • Cross-validation and hyperparameter tuning
  • Model interpretation and deployment

💡 Practical Tools

  • Exploratory data analysis class
  • Preprocessing toolkit
  • Model trainer
  • Visualization components

Master Python data science and start your journey into intelligent analysis!


Data science is a core driver of modern technology, and Python provides a complete ecosystem that supports the entire workflow from data processing to model deployment.