Python Data Science and Machine Learning in Practice: From Data Processing to Model Deployment

Author: 全能波 (GitHub: @weicracker)
Python dominates data science and machine learning, backed by a rich ecosystem of powerful libraries. This article walks through the full workflow, from data processing to model deployment.
Environment Setup and Core Libraries
Project dependency configuration
```
# requirements.txt
numpy==1.24.3
pandas==2.0.3
matplotlib==3.7.2
seaborn==0.12.2
scikit-learn==1.3.0
tensorflow==2.13.0
keras==2.13.1
xgboost==1.7.6
lightgbm==4.0.0
plotly==5.15.0
jupyter==1.0.0
ipykernel==6.25.0
streamlit==1.25.0
fastapi==0.101.1
uvicorn==0.23.2
joblib==1.3.2
optuna==3.3.0
mlflow==2.5.0
```
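With the file in place, `pip install -r requirements.txt` inside a fresh virtual environment installs the pinned versions. A quick sanity check that the core stack imports:

```python
# Verify the core stack imports; the printed versions should match requirements.txt.
import numpy, pandas, sklearn, matplotlib
print(numpy.__version__, pandas.__version__, sklearn.__version__, matplotlib.__version__)
```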
A data science toolkit
```python
# src/data_science/utils.py - data science utility toolkit
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import warnings
from typing import List, Dict, Tuple, Optional, Union
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder, OneHotEncoder
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
import logging

# Plot style, warning, and logging setup
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
warnings.filterwarnings('ignore')
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
```
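The usage snippets in the rest of this section run against a small synthetic dataset; `demo_df` and its columns are hypothetical stand-ins for your own data:

```python
# Hypothetical demo data: 500 customers with one deliberately gappy column.
rng = np.random.default_rng(42)
demo_df = pd.DataFrame({
    'age': rng.integers(18, 70, size=500).astype(float),
    'income': rng.normal(50_000, 15_000, size=500),
    'city': rng.choice(['Beijing', 'Shanghai', 'Shenzhen'], size=500),
    'churn': rng.integers(0, 2, size=500),
})
demo_df.loc[rng.choice(500, size=25, replace=False), 'income'] = np.nan  # inject missing values
```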
```python
class DataExplorer:
    """Exploratory data analysis helper."""

    def __init__(self, df: pd.DataFrame):
        self.df = df.copy()
        self.numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
        self.categorical_cols = df.select_dtypes(include=['object', 'category']).columns.tolist()

    def basic_info(self) -> Dict:
        """Summarize shape, memory footprint, missing values, and duplicates."""
        info = {
            'shape': self.df.shape,
            'memory_usage': self.df.memory_usage(deep=True).sum() / 1024**2,  # MB
            'missing_values': self.df.isnull().sum().sum(),
            'duplicate_rows': self.df.duplicated().sum(),
            'numeric_columns': len(self.numeric_cols),
            'categorical_columns': len(self.categorical_cols)
        }
        logger.info(f"Dataset shape: {info['shape']}")
        logger.info(f"Memory usage: {info['memory_usage']:.2f} MB")
        logger.info(f"Total missing values: {info['missing_values']}")
        logger.info(f"Duplicate rows: {info['duplicate_rows']}")
        return info

    def missing_value_analysis(self) -> pd.DataFrame:
        """Tabulate and visualize missing values per column."""
        missing_data = pd.DataFrame({
            'Column': self.df.columns,
            'Missing_Count': self.df.isnull().sum(),
            'Missing_Percentage': (self.df.isnull().sum() / len(self.df)) * 100
        })
        missing_data = missing_data[missing_data['Missing_Count'] > 0].sort_values(
            'Missing_Percentage', ascending=False
        )
        if not missing_data.empty:
            fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
            # Heatmap of missing cells
            sns.heatmap(self.df.isnull(), cbar=True, ax=ax1, cmap='viridis')
            ax1.set_title('Missing Value Heatmap')
            # Bar chart of missing percentages
            missing_data.plot(x='Column', y='Missing_Percentage', kind='bar', ax=ax2)
            ax2.set_title('Missing Value Percentage')
            ax2.set_ylabel('Missing (%)')
            plt.xticks(rotation=45)
            plt.tight_layout()
            plt.show()
        return missing_data
```
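A minimal sketch of driving the explorer, using the synthetic `demo_df` defined above:

```python
explorer = DataExplorer(demo_df)
info = explorer.basic_info()                  # logs shape, memory, gaps, duplicates
missing = explorer.missing_value_analysis()   # heatmap + bar chart for gappy columns
print(missing)
```

Correlation and distribution views continue the class: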
```python
    def correlation_analysis(self, method: str = 'pearson') -> pd.DataFrame:
        """Correlation matrix with a masked heatmap."""
        if len(self.numeric_cols) < 2:
            logger.warning("Fewer than 2 numeric columns; skipping correlation analysis")
            return pd.DataFrame()
        corr_matrix = self.df[self.numeric_cols].corr(method=method)
        plt.figure(figsize=(12, 10))
        # Mask the upper triangle to avoid duplicated cells
        mask = np.triu(np.ones_like(corr_matrix, dtype=bool))
        sns.heatmap(
            corr_matrix,
            mask=mask,
            annot=True,
            cmap='coolwarm',
            center=0,
            square=True,
            fmt='.2f'
        )
        plt.title(f'{method.capitalize()} Correlation Matrix')
        plt.tight_layout()
        plt.show()
        return corr_matrix

    def distribution_analysis(self, columns: Optional[List[str]] = None):
        """Histogram plus density curve for each numeric column."""
        cols_to_analyze = columns or self.numeric_cols[:6]  # cap the number of plots
        if not cols_to_analyze:
            logger.warning("No numeric columns to analyze")
            return
        n_cols = min(3, len(cols_to_analyze))
        n_rows = (len(cols_to_analyze) + n_cols - 1) // n_cols
        fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 5 * n_rows))
        if n_rows == 1:
            axes = [axes] if n_cols == 1 else axes
        else:
            axes = axes.flatten()
        for i, col in enumerate(cols_to_analyze):
            if i < len(axes):
                # Histogram with an overlaid density curve
                self.df[col].hist(bins=30, alpha=0.7, ax=axes[i], density=True)
                self.df[col].plot(kind='density', ax=axes[i], alpha=0.7)
                axes[i].set_title(f'{col} Distribution')
                axes[i].set_ylabel('Density')
        # Hide unused subplots
        for i in range(len(cols_to_analyze), len(axes)):
            axes[i].set_visible(False)
        plt.tight_layout()
        plt.show()
```
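Each view is a single call; Spearman is a reasonable choice when relationships may be monotonic but non-linear:

```python
corr = explorer.correlation_analysis(method='spearman')
explorer.distribution_analysis(columns=['age', 'income'])
```

Outlier detection rounds out the explorer: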
```python
    def outlier_detection(self, method: str = 'iqr') -> Dict[str, List]:
        """Flag outliers per numeric column via IQR fences or z-scores."""
        outliers = {}
        for col in self.numeric_cols:
            if method == 'iqr':
                Q1 = self.df[col].quantile(0.25)
                Q3 = self.df[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - 1.5 * IQR
                upper_bound = Q3 + 1.5 * IQR
                outlier_indices = self.df[
                    (self.df[col] < lower_bound) | (self.df[col] > upper_bound)
                ].index.tolist()
            elif method == 'zscore':
                z_scores = np.abs((self.df[col] - self.df[col].mean()) / self.df[col].std())
                outlier_indices = self.df[z_scores > 3].index.tolist()
            else:
                logger.warning(f"Unknown outlier detection method: {method}")
                return outliers
            if outlier_indices:
                outliers[col] = outlier_indices
        # Visualize with box plots
        if outliers:
            n_cols = min(3, len(outliers))
            n_rows = (len(outliers) + n_cols - 1) // n_cols
            fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 5 * n_rows))
            if n_rows == 1:
                axes = [axes] if n_cols == 1 else axes
            else:
                axes = axes.flatten()
            for i, (col, indices) in enumerate(outliers.items()):
                if i < len(axes):
                    self.df.boxplot(column=col, ax=axes[i])
                    axes[i].set_title(f'{col} Box Plot ({len(indices)} outliers)')
            plt.tight_layout()
            plt.show()
        return outliers
```
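Both rules return a `{column: [row indices]}` mapping, so they are easy to compare:

```python
iqr_outliers = explorer.outlier_detection(method='iqr')
print({col: len(idx) for col, idx in iqr_outliers.items()})
```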
```python
class DataPreprocessor:
    """Data preprocessing helper."""

    def __init__(self):
        self.scalers = {}
        self.encoders = {}

    def handle_missing_values(
        self,
        df: pd.DataFrame,
        strategy: Optional[Dict[str, str]] = None
    ) -> pd.DataFrame:
        """Fill or drop missing values, with an optional per-column strategy."""
        df_processed = df.copy()
        if strategy is None:
            strategy = {}
        for col in df_processed.columns:
            if df_processed[col].isnull().any():
                col_strategy = strategy.get(col, 'auto')
                if col_strategy == 'auto':
                    if df_processed[col].dtype in ['object', 'category']:
                        # Categorical: fill with the mode
                        mode_value = df_processed[col].mode()
                        if not mode_value.empty:
                            df_processed[col] = df_processed[col].fillna(mode_value[0])
                    else:
                        # Numeric: fill with the median
                        df_processed[col] = df_processed[col].fillna(df_processed[col].median())
                elif col_strategy == 'drop':
                    df_processed.dropna(subset=[col], inplace=True)
                elif col_strategy == 'forward_fill':
                    df_processed[col] = df_processed[col].ffill()
                elif col_strategy == 'backward_fill':
                    df_processed[col] = df_processed[col].bfill()
                elif isinstance(col_strategy, (int, float, str)):
                    # Any other scalar is used as a constant fill value
                    df_processed[col] = df_processed[col].fillna(col_strategy)
        logger.info(f"Missing value handling done; remaining missing values: {df_processed.isnull().sum().sum()}")
        return df_processed
```
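A minimal usage sketch with the synthetic `demo_df`:

```python
prep = DataPreprocessor()
# 'auto' fills numeric gaps with the median and categorical gaps with the mode;
# a per-column dict such as {'income': 'forward_fill'} overrides it.
filled = prep.handle_missing_values(demo_df)
```

Categorical encoding comes next: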
```python
    def encode_categorical_features(
        self,
        df: pd.DataFrame,
        encoding_type: str = 'auto'
    ) -> pd.DataFrame:
        """Encode categorical features with label or one-hot encoding."""
        df_encoded = df.copy()
        categorical_cols = df_encoded.select_dtypes(include=['object', 'category']).columns
        for col in categorical_cols:
            unique_values = df_encoded[col].nunique()
            if encoding_type == 'auto':
                # Pick an encoding based on cardinality
                if unique_values <= 2:
                    # Binary: label encoding
                    encoder = LabelEncoder()
                    df_encoded[col] = encoder.fit_transform(df_encoded[col].astype(str))
                    self.encoders[col] = encoder
                elif unique_values <= 10:
                    # Few categories: one-hot encoding
                    encoded_cols = pd.get_dummies(df_encoded[col], prefix=col)
                    df_encoded = pd.concat([df_encoded.drop(col, axis=1), encoded_cols], axis=1)
                else:
                    # Many categories: label encoding
                    encoder = LabelEncoder()
                    df_encoded[col] = encoder.fit_transform(df_encoded[col].astype(str))
                    self.encoders[col] = encoder
            elif encoding_type == 'label':
                encoder = LabelEncoder()
                df_encoded[col] = encoder.fit_transform(df_encoded[col].astype(str))
                self.encoders[col] = encoder
            elif encoding_type == 'onehot':
                encoded_cols = pd.get_dummies(df_encoded[col], prefix=col)
                df_encoded = pd.concat([df_encoded.drop(col, axis=1), encoded_cols], axis=1)
        logger.info(f"Categorical encoding done; feature count: {df_encoded.shape[1]}")
        return df_encoded
```
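Continuing the sketch, 'auto' one-hot encodes `demo_df`'s three-city column because its cardinality is small:

```python
encoded = prep.encode_categorical_features(filled, encoding_type='auto')
print([c for c in encoded.columns if c.startswith('city_')])
# e.g. ['city_Beijing', 'city_Shanghai', 'city_Shenzhen']
```

Scaling and outlier removal complete the preprocessing pass: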
```python
    def scale_features(
        self,
        df: pd.DataFrame,
        method: str = 'standard',
        exclude_cols: Optional[List[str]] = None
    ) -> pd.DataFrame:
        """Scale numeric features (standard, min-max, or robust)."""
        df_scaled = df.copy()
        numeric_cols = df_scaled.select_dtypes(include=[np.number]).columns.tolist()
        if exclude_cols:
            numeric_cols = [col for col in numeric_cols if col not in exclude_cols]
        if method == 'standard':
            scaler = StandardScaler()
        elif method == 'minmax':
            from sklearn.preprocessing import MinMaxScaler
            scaler = MinMaxScaler()
        elif method == 'robust':
            from sklearn.preprocessing import RobustScaler
            scaler = RobustScaler()
        else:
            logger.warning(f"Unknown scaling method: {method}")
            return df_scaled
        df_scaled[numeric_cols] = scaler.fit_transform(df_scaled[numeric_cols])
        self.scalers[method] = scaler
        logger.info(f"Feature scaling done with method: {method}")
        return df_scaled

    def remove_outliers(
        self,
        df: pd.DataFrame,
        method: str = 'iqr',
        threshold: float = 1.5
    ) -> pd.DataFrame:
        """Drop rows flagged as outliers by the IQR or z-score rule."""
        df_clean = df.copy()
        numeric_cols = df_clean.select_dtypes(include=[np.number]).columns
        outlier_indices = set()
        for col in numeric_cols:
            if method == 'iqr':
                Q1 = df_clean[col].quantile(0.25)
                Q3 = df_clean[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - threshold * IQR
                upper_bound = Q3 + threshold * IQR
                col_outliers = df_clean[
                    (df_clean[col] < lower_bound) | (df_clean[col] > upper_bound)
                ].index
            elif method == 'zscore':
                z_scores = np.abs((df_clean[col] - df_clean[col].mean()) / df_clean[col].std())
                col_outliers = df_clean[z_scores > threshold].index
            else:
                logger.warning(f"Unknown outlier removal method: {method}")
                return df_clean
            outlier_indices.update(col_outliers)
        df_clean = df_clean.drop(outlier_indices)
        logger.info(f"Removed {len(outlier_indices)} outlier rows; {df_clean.shape[0]} rows remain")
        return df_clean
```
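Scaling should skip the target column so its labels stay intact:

```python
scaled = prep.scale_features(encoded, method='standard', exclude_cols=['churn'])
clean = prep.remove_outliers(scaled, method='iqr', threshold=1.5)
```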
```python
class MLModelTrainer:
    """Machine learning model trainer."""

    def __init__(self):
        self.models = {}
        self.results = {}

    def prepare_data(
        self,
        df: pd.DataFrame,
        target_col: str,
        test_size: float = 0.2,
        random_state: int = 42
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Split features/target into stratified train and test sets."""
        X = df.drop(target_col, axis=1)
        y = df[target_col]
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=random_state, stratify=y
        )
        logger.info(f"Data split done - train: {X_train.shape}, test: {X_test.shape}")
        return X_train, X_test, y_train, y_test

    def train_classification_models(
        self,
        X_train: np.ndarray,
        X_test: np.ndarray,
        y_train: np.ndarray,
        y_test: np.ndarray
    ) -> Dict:
        """Train and evaluate a set of baseline classifiers."""
        from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
        from sklearn.linear_model import LogisticRegression
        from sklearn.svm import SVC
        from sklearn.neighbors import KNeighborsClassifier
        from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

        # Candidate models (higher max_iter keeps logistic regression from
        # stopping before convergence on scaled data)
        models = {
            'Logistic Regression': LogisticRegression(random_state=42, max_iter=1000),
            'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
            'Gradient Boosting': GradientBoostingClassifier(random_state=42),
            'SVM': SVC(probability=True, random_state=42),
            'KNN': KNeighborsClassifier(n_neighbors=5)
        }
        results = {}
        for name, model in models.items():
            logger.info(f"Training model: {name}")
            model.fit(X_train, y_train)
            y_pred = model.predict(X_test)
            y_pred_proba = model.predict_proba(X_test)[:, 1] if hasattr(model, 'predict_proba') else None
            # Evaluation metrics
            metrics = {
                'accuracy': accuracy_score(y_test, y_pred),
                'precision': precision_score(y_test, y_pred, average='weighted'),
                'recall': recall_score(y_test, y_pred, average='weighted'),
                'f1': f1_score(y_test, y_pred, average='weighted')
            }
            # AUC only applies to binary targets with probability estimates
            if y_pred_proba is not None and len(np.unique(y_test)) == 2:
                metrics['auc'] = roc_auc_score(y_test, y_pred_proba)
            results[name] = {
                'model': model,
                'predictions': y_pred,
                'probabilities': y_pred_proba,
                'metrics': metrics
            }
            logger.info(f"{name} - Accuracy: {metrics['accuracy']:.4f}, F1: {metrics['f1']:.4f}")
        self.models.update({k: v['model'] for k, v in results.items()})
        self.results = results
        return results
```
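An end-to-end run on the preprocessed demo data, with 'churn' as the target (since the synthetic labels are random, expect metrics near chance). Persisting the winner with joblib is a common hand-off toward deployment:

```python
trainer = MLModelTrainer()
X_train, X_test, y_train, y_test = trainer.prepare_data(clean, target_col='churn')
results = trainer.train_classification_models(X_train, X_test, y_train, y_test)

best = max(results, key=lambda name: results[name]['metrics']['f1'])
import joblib
joblib.dump(results[best]['model'], 'best_model.joblib')  # reload later with joblib.load
```

Two plotting helpers make the comparison visual: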
```python
    def plot_model_comparison(self):
        """Bar charts comparing accuracy, precision, recall, and F1."""
        if not self.results:
            logger.warning("No training results to compare")
            return
        model_names = list(self.results.keys())
        metrics = ['accuracy', 'precision', 'recall', 'f1']
        metric_values = {metric: [] for metric in metrics}
        for model_name in model_names:
            for metric in metrics:
                metric_values[metric].append(self.results[model_name]['metrics'].get(metric, 0))
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        axes = axes.flatten()
        for i, metric in enumerate(metrics):
            axes[i].bar(model_names, metric_values[metric])
            axes[i].set_title(f'{metric.capitalize()} Comparison')
            axes[i].set_ylabel(metric.capitalize())
            axes[i].tick_params(axis='x', rotation=45)
            # Value labels on the bars
            for j, v in enumerate(metric_values[metric]):
                axes[i].text(j, v + 0.01, f'{v:.3f}', ha='center', va='bottom')
        plt.tight_layout()
        plt.show()

    def plot_confusion_matrices(self, y_test: np.ndarray):
        """One confusion-matrix heatmap per trained model."""
        if not self.results:
            logger.warning("No training results to analyze")
            return
        n_models = len(self.results)
        n_cols = min(3, n_models)
        n_rows = (n_models + n_cols - 1) // n_cols
        fig, axes = plt.subplots(n_rows, n_cols, figsize=(5 * n_cols, 4 * n_rows))
        if n_rows == 1:
            axes = [axes] if n_cols == 1 else axes
        else:
            axes = axes.flatten()
        for i, (name, result) in enumerate(self.results.items()):
            if i < len(axes):
                cm = confusion_matrix(y_test, result['predictions'])
                sns.heatmap(cm, annot=True, fmt='d', ax=axes[i], cmap='Blues')
                axes[i].set_title(f'{name} Confusion Matrix')
                axes[i].set_xlabel('Predicted')
                axes[i].set_ylabel('Actual')
        # Hide unused subplots
        for i in range(n_models, len(axes)):
            axes[i].set_visible(False)
        plt.tight_layout()
        plt.show()
```
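Both helpers read from `self.results`, so they are called after training:

```python
trainer.plot_model_comparison()
trainer.plot_confusion_matrices(y_test)
```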
```python
# Feature engineering tools
class FeatureEngineer:
    """Feature engineering helper."""

    @staticmethod
    def create_polynomial_features(df: pd.DataFrame, degree: int = 2) -> pd.DataFrame:
        """Expand numeric columns into polynomial and interaction terms."""
        from sklearn.preprocessing import PolynomialFeatures

        numeric_cols = df.select_dtypes(include=[np.number]).columns
        poly = PolynomialFeatures(degree=degree, include_bias=False)
        poly_features = poly.fit_transform(df[numeric_cols])
        feature_names = poly.get_feature_names_out(numeric_cols)
        poly_df = pd.DataFrame(poly_features, columns=feature_names, index=df.index)
        # Carry over the original non-numeric features
        non_numeric_cols = df.select_dtypes(exclude=[np.number]).columns
        if not non_numeric_cols.empty:
            result_df = pd.concat([poly_df, df[non_numeric_cols]], axis=1)
        else:
            result_df = poly_df
        logger.info(f"Polynomial features created; feature count: {result_df.shape[1]}")
        return result_df
```
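A degree-2 expansion of two numeric columns (using the imputed `filled` frame, since `PolynomialFeatures` rejects NaN):

```python
poly_df = FeatureEngineer.create_polynomial_features(filled[['age', 'income']], degree=2)
print(poly_df.columns.tolist())
# ['age', 'income', 'age^2', 'age income', 'income^2']
```

Univariate feature selection completes the toolkit: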
```python
    @staticmethod
    def select_features(
        X: pd.DataFrame,
        y: pd.Series,
        method: str = 'mutual_info',
        k: int = 10
    ) -> pd.DataFrame:
        """Keep the top-k features by a univariate score."""
        from sklearn.feature_selection import SelectKBest, mutual_info_classif, f_classif, chi2

        if method == 'mutual_info':
            selector = SelectKBest(score_func=mutual_info_classif, k=k)
        elif method == 'f_classif':
            selector = SelectKBest(score_func=f_classif, k=k)
        elif method == 'chi2':
            # Note: chi2 requires non-negative feature values
            selector = SelectKBest(score_func=chi2, k=k)
        else:
            logger.warning(f"Unknown feature selection method: {method}")
            return X
        X_selected = selector.fit_transform(X, y)
        selected_features = X.columns[selector.get_support()].tolist()
        result_df = pd.DataFrame(X_selected, columns=selected_features, index=X.index)
        logger.info(f"Feature selection done; kept {len(selected_features)} features")
        logger.info(f"Selected features: {selected_features}")
        return result_df
```
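Keeping the top three features by mutual information with the target, on the preprocessed demo data:

```python
X, y = clean.drop(columns=['churn']), clean['churn']
top3 = FeatureEngineer.select_features(X, y, method='mutual_info', k=3)
print(top3.columns.tolist())
```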
Summary
Core takeaways for Python data science and machine learning:
🎯 Data Processing Workflow
- Data exploration: basic profiling, missing values, distribution analysis
- Data preprocessing: cleaning, encoding, scaling, outlier handling
- Feature engineering: feature creation, selection, and transformation
- Model training: multi-model comparison, evaluation, and optimization
✅ Best Practices
- Systematic exploratory data analysis
- Automated, repeatable preprocessing pipelines
- Side-by-side model comparison and evaluation
- Visual presentation of results
🚀 Advanced Techniques
- Feature engineering and selection
- Model ensembling and optimization
- Cross-validation and hyperparameter tuning
- Model interpretation and deployment
💡 Practical Tools
- A data exploration class
- A preprocessing toolkit
- A model trainer
- Visualization components
Master Python data science and start your journey into intelligent analytics!
Data science is a core driver of modern technology, and Python offers a complete ecosystem for the full workflow, from data processing to model deployment.