import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
class DataAnalyzer:
"""
A comprehensive data analysis class for exploratory data analysis,
statistical testing, dimensionality reduction, and clustering.
"""
def __init__(self, data_path=None, data=None):
    """
    Create an analyzer from a CSV file or from an existing DataFrame.

    A truthy ``data_path`` takes precedence over ``data``. A DataFrame
    supplied through ``data`` is copied before it is stored.

    Parameters:
    -----------
    data_path : str, optional
        Path to the CSV file
    data : pandas.DataFrame, optional
        DataFrame to analyze

    Raises:
    -------
    ValueError
        If ``data_path`` is falsy and ``data`` is not a DataFrame
    """
    load_from_file = bool(data_path)
    if not load_from_file and not isinstance(data, pd.DataFrame):
        raise ValueError("Either data_path or data must be provided")
    self.data = pd.read_csv(data_path) if load_from_file else data.copy()

    # Slots for results produced later; every one starts out as None.
    for attr in ("numeric_cols", "categorical_cols", "processed_data",
                 "pca_result", "cluster_labels"):
        setattr(self, attr, None)

    # Plot styling is applied through the global matplotlib/seaborn state.
    plt.style.use('seaborn-v0_8-whitegrid')
    sns.set_palette("viridis")
def explore_data(self, verbose=True):
    """
    Summarize the dataset and record which columns are numeric or categorical.

    Sets ``self.numeric_cols`` (int64/float64 columns) and
    ``self.categorical_cols`` (object/category columns) as a side effect.

    Parameters:
    -----------
    verbose : bool, default=True
        Whether to print exploration results

    Returns:
    --------
    dict
        Keys: "shape", "dtypes", "missing", "numeric_cols",
        "categorical_cols", "stats"
    """
    frame = self.data
    missing_counts = frame.isnull().sum()

    if verbose:
        print(f"Dataset Shape: {frame.shape}")
        print("\nFirst 5 rows:")
        print(frame.head())
        print("\nData Types:")
        print(frame.dtypes)
        print("\nMissing Values:")
        # Only columns that actually have gaps are listed.
        print(missing_counts[missing_counts > 0])
        print("\nBasic Statistics:")
        print(frame.describe())

    # Classify columns by dtype
    numeric_frame = frame.select_dtypes(include=['int64', 'float64'])
    categorical_frame = frame.select_dtypes(include=['object', 'category'])
    self.numeric_cols = numeric_frame.columns.tolist()
    self.categorical_cols = categorical_frame.columns.tolist()

    if verbose:
        print(f"\nNumeric Columns: {len(self.numeric_cols)}")
        print(f"Categorical Columns: {len(self.categorical_cols)}")
        if self.categorical_cols:
            print("\nCategory Distributions:")
            # Cap the output at the first five categorical columns
            for name in self.categorical_cols[:5]:
                print(f"\n{name}:")
                print(frame[name].value_counts(normalize=True).head())

    summary = {}
    summary["shape"] = frame.shape
    summary["dtypes"] = frame.dtypes
    summary["missing"] = missing_counts
    summary["numeric_cols"] = self.numeric_cols
    summary["categorical_cols"] = self.categorical_cols
    summary["stats"] = frame.describe()
    return summary
def clean_data(self, strategy='median', drop_thresh=0.5):
    """
    Clean the dataset by handling missing values and optionally dropping columns.

    Works on a copy of ``self.data``; the result is stored in
    ``self.processed_data`` and returned. ``self.data`` is left untouched.

    Parameters:
    -----------
    strategy : str, default='median'
        Strategy for imputing missing values in numeric (int64/float64)
        columns: 'mean', 'median' or 'mode'. Categorical (object/category)
        columns are always imputed with their mode. 'drop' removes every
        row that contains a missing value instead of imputing.
    drop_thresh : float, default=0.5
        Drop columns whose missing-value ratio is higher than this
        threshold. Only applied when 0 < drop_thresh < 1.

    Returns:
    --------
    pandas.DataFrame
        Cleaned DataFrame

    Raises:
    -------
    ValueError
        If ``strategy`` is not one of the supported values
    """
    valid_strategies = ('mean', 'median', 'mode', 'drop')
    if strategy not in valid_strategies:
        # Fail early: an unknown strategy would otherwise leave the fill
        # value undefined (or reuse the one from a previous column).
        raise ValueError(
            f"Unknown strategy {strategy!r}; expected one of {valid_strategies}"
        )

    data_cleaned = self.data.copy()

    # Drop columns with too many missing values
    if 0 < drop_thresh < 1:
        missing_ratio = data_cleaned.isnull().mean()
        cols_to_drop = missing_ratio[missing_ratio > drop_thresh].index
        data_cleaned = data_cleaned.drop(columns=cols_to_drop)
        print(f"Dropped {len(cols_to_drop)} columns with >{drop_thresh*100}% missing values")

    if strategy == 'drop':
        # Drop rows with any missing values
        data_cleaned = data_cleaned.dropna()
        print(f"Dropped {self.data.shape[0] - data_cleaned.shape[0]} rows with missing values")
    else:
        # Columns are reassigned rather than filled in place: calling
        # fillna(inplace=True) on data_cleaned[col] is chained assignment,
        # which does not update the frame under pandas Copy-on-Write.
        numeric_cols = data_cleaned.select_dtypes(include=['int64', 'float64']).columns
        for col in numeric_cols:
            column = data_cleaned[col]
            if not column.isnull().any():
                continue
            if strategy == 'mean':
                fill_value = column.mean()
            elif strategy == 'median':
                fill_value = column.median()
            else:  # 'mode'
                modes = column.mode()
                if modes.empty:
                    # All values missing: nothing to impute from.
                    continue
                fill_value = modes.iloc[0]
            data_cleaned[col] = column.fillna(fill_value)

        # Impute categorical columns with mode
        categorical_cols = data_cleaned.select_dtypes(include=['object', 'category']).columns
        for col in categorical_cols:
            column = data_cleaned[col]
            if not column.isnull().any():
                continue
            modes = column.mode()
            if modes.empty:
                # All values missing: nothing to impute from.
                continue
            data_cleaned[col] = column.fillna(modes.iloc[0])

    self.processed_data = data_cleaned
    return data_cleaned
def visualize_distributions(self, columns=None, figsize=(15, 10)):
    """
    Visualize the distributions of specified numeric columns.

    Each column gets a histogram with a KDE overlay and a text box showing
    its mean, median and standard deviation. Plots are laid out on a grid
    of at most three columns.

    Parameters:
    -----------
    columns : list, optional
        List of column names to visualize. If None, uses the first 10
        entries of ``self.numeric_cols``; when that attribute has not been
        set yet, the int64/float64 columns of ``self.data`` are used.
    figsize : tuple, default=(15, 10)
        Figure size

    Returns:
    --------
    matplotlib.figure.Figure or None
        The figure object containing the plots, or None when there is
        nothing to plot
    """
    if columns is None:
        numeric_cols = self.numeric_cols
        if numeric_cols is None:
            # explore_data() has not populated the column list yet.
            numeric_cols = self.data.select_dtypes(
                include=['int64', 'float64']).columns.tolist()
        columns = numeric_cols[:10]
    columns = list(columns)

    if not columns:
        # An empty grid cannot be created (zero columns per row).
        print("No columns available for distribution plots")
        return None

    n_cols = min(3, len(columns))
    n_rows = int(np.ceil(len(columns) / n_cols))
    # squeeze=False always yields a 2-D array, so a 1x1 grid needs no
    # special case.
    fig, axes = plt.subplots(n_rows, n_cols, figsize=figsize, squeeze=False)
    axes = axes.ravel()

    for ax, col in zip(axes, columns):
        series = self.data[col]
        # Histogram with KDE
        sns.histplot(data=self.data, x=col, kde=True, ax=ax)
        ax.set_title(f'Distribution of {col}')
        # Add basic statistics to the plot
        stats_text = (
            f"Mean: {series.mean():.2f}\n"
            f"Median: {series.median():.2f}\n"
            f"Std: {series.std():.2f}"
        )
        ax.text(0.95, 0.95, stats_text,
                transform=ax.transAxes,
                verticalalignment='top',
                horizontalalignment='right',
                bbox=dict(boxstyle='round', facecolor='white', alpha=0.7))

    # Hide unused subplots
    for ax in axes[len(columns):]:
        ax.set_visible(False)

    fig.tight_layout()
    return fig
def correlation_analysis(self, method='pearson', figsize=(12, 10), annot=True):
    """
    Compute the correlation matrix of the numeric columns and draw it as a heatmap.

    The upper triangle (including the diagonal) is masked in the heatmap.

    Parameters:
    -----------
    method : str, default='pearson'
        Correlation method ('pearson', 'kendall', 'spearman')
    figsize : tuple, default=(12, 10)
        Figure size
    annot : bool, default=True
        Whether to annotate the heatmap with correlation values

    Returns:
    --------
    pandas.DataFrame or None
        Correlation matrix, or None when ``self.numeric_cols`` is empty or unset
    """
    if not self.numeric_cols:
        print("No numeric columns available for correlation analysis")
        return None

    numeric_frame = self.data[self.numeric_cols]
    corr_matrix = numeric_frame.corr(method=method)

    # Draw the heatmap on a fresh figure
    plt.figure(figsize=figsize)
    upper_triangle = np.triu(np.ones(corr_matrix.shape, dtype=bool))
    palette = sns.diverging_palette(230, 20, as_cmap=True)
    heatmap_options = dict(
        mask=upper_triangle,
        cmap=palette,
        annot=annot,
        square=True,
        linewidths=.5,
        cbar_kws={"shrink": .5},
    )
    sns.heatmap(corr_matrix, **heatmap_options)
    plt.title(f'{method.capitalize()} Correlation Matrix', fontsize=16)
    plt.tight_layout()

    return corr_matrix
def detect_outliers(self, columns=None, method='zscore', threshold=3.0):
    """
    Detect outliers in the specified numeric columns.

    Missing values are never flagged as outliers. A summary line per
    column is printed.

    Parameters:
    -----------
    columns : list, optional
        List of column names to check for outliers. If None, uses
        ``self.numeric_cols``; when that attribute has not been set yet,
        the int64/float64 columns of ``self.data`` are used.
    method : str, default='zscore'
        Method to detect outliers ('zscore', 'iqr')
    threshold : float, default=3.0
        Threshold for outlier detection (z-score threshold or IQR factor)

    Returns:
    --------
    dict
        Dictionary with column names as keys and boolean masks of outliers
        (indexed like ``self.data``) as values

    Raises:
    -------
    ValueError
        If ``method`` is not 'zscore' or 'iqr'
    """
    if method not in ('zscore', 'iqr'):
        # Previously an unknown method silently produced an empty result.
        raise ValueError(
            f"Unknown outlier detection method {method!r}; expected 'zscore' or 'iqr'"
        )

    if columns is None:
        columns = self.numeric_cols
        if columns is None:
            # explore_data() has not populated the column list yet.
            columns = self.data.select_dtypes(
                include=['int64', 'float64']).columns.tolist()

    outliers = {}
    for col in columns:
        series = self.data[col]
        if method == 'zscore':
            # Population standard deviation (ddof=0), as scipy.stats.zscore
            # uses. pandas skips NaN in mean/std, and NaN z-scores compare
            # False, so missing or constant data yields no outliers.
            z_scores = (series - series.mean()) / series.std(ddof=0)
            outliers[col] = z_scores.abs() > threshold
        else:
            # IQR method
            q1 = series.quantile(0.25)
            q3 = series.quantile(0.75)
            iqr = q3 - q1
            lower_bound = q1 - threshold * iqr
            upper_bound = q3 + threshold * iqr
            outliers[col] = (series < lower_bound) | (series > upper_bound)

    # Print summary
    n_rows = len(self.data)
    print(f"Outliers detected using {method} method (threshold={threshold}):")
    for col, mask in outliers.items():
        n_outliers = int(mask.sum())
        pct_outliers = n_outliers / n_rows * 100 if n_rows else 0.0
        print(f" {col}: {n_outliers} outliers ({pct_outliers:.2f}%)")

    return outliers
def feature_importance(self, target_col, top_n=10):
    """
    Calculate feature importance using correlation with target variable.

    Importance is the absolute Pearson correlation between each numeric
    column and the target. The ``top_n`` strongest features are drawn as a
    horizontal bar chart.

    Parameters:
    -----------
    target_col : str
        Target column name; must be one of the numeric columns
    top_n : int, default=10
        Number of top features to display

    Returns:
    --------
    pandas.Series or None
        Absolute correlation of every numeric feature with the target, in
        column order (not sorted). None when the target column is missing
        or not numeric. Empty when the target is the only numeric column.
    """
    if target_col not in self.data.columns:
        print(f"Target column '{target_col}' not found in the dataset")
        return None

    numeric_cols = self.numeric_cols
    if numeric_cols is None:
        # explore_data() has not populated the column list yet.
        numeric_cols = self.data.select_dtypes(
            include=['int64', 'float64']).columns.tolist()

    if target_col not in numeric_cols:
        # A non-numeric target has no correlation column to read from.
        print(f"Target column '{target_col}' is not numeric; cannot compute correlations")
        return None

    feature_cols = [col for col in numeric_cols if col != target_col]
    if not feature_cols:
        print("No numeric feature columns available besides the target")
        return pd.Series(dtype=float)

    # Correlate each feature with the target directly instead of building
    # the full feature-by-feature matrix and keeping one column of it.
    correlations = self.data[feature_cols].corrwith(self.data[target_col]).abs()

    # Undefined correlations (e.g. constant columns) are left out of the ranking
    top_features = correlations.dropna().sort_values(ascending=False).head(top_n)

    # Visualize
    if not top_features.empty:
        plt.figure(figsize=(10, 6))
        top_features.sort_values().plot(kind='barh')
        plt.title(f'Top {top_n} Features Correlated with {target_col}')
        plt.xlabel('Absolute Correlation')
        plt.tight_layout()

    return correlations
def run_pca(self, n_components=2, standardize=True):
    """
    Perform Principal Component Analysis on numeric features.

    Missing values are filled with the column median before fitting. The
    projected data is stored in ``self.pca_result``. Two figures are
    created: the explained variance per component and, when at least two
    components were fitted, a scatter plot of the first two.

    Parameters:
    -----------
    n_components : int, default=2
        Number of principal components to compute. Capped at the number
        of numeric columns and at the number of rows.
    standardize : bool, default=True
        Whether to standardize the data before PCA

    Returns:
    --------
    dict
        'pca_model' (fitted PCA), 'pca_result' (projected data),
        'explained_variance', 'cumulative_variance' and
        'feature_importance' (component loadings: one row per fitted
        component, one column per numeric feature)

    Raises:
    -------
    ValueError
        If there are no numeric columns to analyze
    """
    numeric_cols = self.numeric_cols
    if numeric_cols is None:
        # explore_data() has not populated the column list yet.
        numeric_cols = self.data.select_dtypes(
            include=['int64', 'float64']).columns.tolist()
    if not numeric_cols:
        raise ValueError("PCA requires at least one numeric column")

    # Filter numeric data and handle missing values
    numeric_data = self.data[numeric_cols]
    numeric_data = numeric_data.fillna(numeric_data.median())

    # Standardize if requested
    if standardize:
        data_scaled = StandardScaler().fit_transform(numeric_data)
    else:
        data_scaled = numeric_data.values

    # PCA cannot extract more components than min(n_samples, n_features)
    pca = PCA(n_components=min(n_components, *data_scaled.shape))
    pca_result = pca.fit_transform(data_scaled)

    explained = pca.explained_variance_ratio_
    cumulative = np.cumsum(explained)
    # Label rows by the number of components actually fitted, which can be
    # smaller than the number requested.
    n_fitted = pca.components_.shape[0]

    # Store results
    result = {
        'pca_model': pca,
        'pca_result': pca_result,
        'explained_variance': explained,
        'cumulative_variance': cumulative,
        'feature_importance': pd.DataFrame(
            pca.components_,
            columns=numeric_data.columns,
            index=[f'PC{i+1}' for i in range(n_fitted)]
        )
    }
    self.pca_result = pca_result

    # Visualize results
    # 1. Explained variance
    component_ids = range(1, n_fitted + 1)
    plt.figure(figsize=(10, 5))
    # Labels are attached to the artists themselves so the legend does not
    # depend on the order in which matplotlib enumerates handles.
    plt.bar(component_ids, explained, alpha=0.7, label='Explained Variance')
    plt.plot(component_ids, cumulative, 'r-o', alpha=0.7,
             label='Cumulative Explained Variance')
    plt.grid(True, alpha=0.3)
    plt.xlabel('Principal Component')
    plt.ylabel('Explained Variance Ratio')
    plt.title('Explained Variance by Principal Component')
    plt.xticks(component_ids)
    plt.legend()
    plt.tight_layout()

    # 2. First two components if available
    if n_fitted >= 2:
        plt.figure(figsize=(10, 8))
        plt.scatter(pca_result[:, 0], pca_result[:, 1], alpha=0.7)
        plt.grid(True, alpha=0.3)
        plt.xlabel(f'PC1 ({explained[0]:.2%} variance explained)')
        plt.ylabel(f'PC2 ({explained[1]:.2%} variance explained)')
        plt.title('PCA: First Two Principal Components')
        plt.tight_layout()

    return result
def cluster_data(self, n_clusters=None, max_clusters=10):
"""
Perform K-means clustering on the data, using PCA result if available.
Parameters:
-----------
n_clusters : int, optional
Number of clusters. If None, determines optimal number of clusters
max_clusters : int, default=10
Maximum number of clusters to try when determining optimal number
Returns:
--------
numpy.ndarray
Array of cluster labels
"""
# Use PCA result if available, otherwise use processed numeric data
if self.pca_result is not None:
data_for_clustering = self.pca_result
print("Using PCA results for clustering")
else:
# Filter numeric data and handle missing values
numeric_data = self.data[self.numeric_cols].copy()
numeric_data = numeric_data.fillna(numeric_data.median())
# Standardize data
scaler = StandardScaler()
data_for_clustering = scaler.fit_transform(numeric_data)
print("Using standardized numeric data for clustering")
# Determine optimal number of clusters if not specified
if n_clusters is None:
print("Finding optimal number of clusters...")
silhouette_scores = []
for k in range(2, min(max_clusters + 1, len(self.data))):
kmeans = KMeans