import pandas as pd
import numpy as np
from typing import List, Dict, Any
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import roc_curve, auc
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
class MECFSDataset:
def __init__(self, tsv_file: str, participants_excluded: List[str]):
self.df = pd.read_csv(tsv_file, sep='\t', na_values=['na'])
self.participants = self.df['ParticipantID'].unique()
self.participants_excluded = participants_excluded
self.metrics = {
'rest': ['DBP', 'HR', 'OUES', 'PETCO2', 'PETO2', 'PP', 'RER', 'RR', 'SBP', 'VCO2', 'VO2', 'VO2_HR', 'Ve_BTPS', 'Ve_VCO2', 'Ve_VO2', 'Vt_BTPS_L'],
'AT': ['DBP', 'HR', 'PETCO2', 'PETO2', 'PP', 'RER', 'RPE', 'RPM', 'RPP', 'RR', 'SBP', 'VCO2', 'VO2', 'VO2_t', 'VO2_HR', 'Ve_BTPS', 'Ve_VCO2', 'Ve_VO2', 'Vt_BTPS_L', 'wkld', 'time_sec'],
'max': ['DBP', 'HR', 'PETCO2', 'PETO2', 'PP', 'RER', 'RPE', 'RPM', 'RPP', 'RR', 'SBP', 'VCO2', 'VO2', 'VO2_t', 'VO2_HR', 'Ve_BTPS', 'Ve_VCO2', 'Ve_VO2', 'Vt_BTPS_L', 'wkld', 'time_sec']
}
self.time_points = ['rest', 'AT', 'max']
self.demographic_metrics = ['ParticipantID', 'test_site', 'matched_pair', 'sex', 'phenotype', 'race', 'age', 'height_in', 'weight_lb', 'bmi', 'bas_score', 'q_education', 'q_reclined', 'q_sleeprefreshing', 'q_hoursinbed']
self.features = self._create_features()
def _create_features(self) -> Dict[str, Dict[str, Any]]:
features = {}
for participant in self.participants:
participant_data = self.df[self.df['ParticipantID'] == participant]
features[participant] = {
'single_day': self._get_single_day_features(participant_data),
'difference': self._get_difference_features(participant_data),
'demographics': self._get_demographic_features(participant_data),
'phenotype': participant_data['phenotype'].iloc[0]
}
return features
def _get_single_day_features(self, participant_data: pd.DataFrame) -> Dict[str, Dict[str, float]]:
single_day = {}
for day in ['D1', 'D2']:
day_data = participant_data[participant_data['Study_Visit'] == day]
single_day[day] = {}
for time_point in self.time_points:
time_point_data = day_data[day_data['Time_Point'] == time_point]
if not time_point_data.empty:
for metric in self.metrics.get(time_point, []):
single_day[day][f"{time_point}_{metric}"] = time_point_data[metric].iloc[0]
return single_day
def _get_difference_features(self, participant_data: pd.DataFrame) -> Dict[str, float]:
difference = {}
d1_data = participant_data[participant_data['Study_Visit'] == 'D1']
d2_data = participant_data[participant_data['Study_Visit'] == 'D2']
participant_id = participant_data['ParticipantID'].iloc[0]
for time_point in self.time_points:
d1_time_point = d1_data[d1_data['Time_Point'] == time_point]
d2_time_point = d2_data[d2_data['Time_Point'] == time_point]
if not d1_time_point.empty and not d2_time_point.empty:
for metric in self.metrics.get(time_point, []):
d1_value = pd.to_numeric(d1_time_point[metric].iloc[0], errors='coerce')
d2_value = pd.to_numeric(d2_time_point[metric].iloc[0], errors='coerce')
if pd.notna(d1_value) and pd.notna(d2_value):
diff_percentage = np.divide((d2_value - d1_value), d1_value, out=np.full_like(d2_value - d1_value, np.inf, dtype=np.float64), where=d1_value!=0) * 100
diff_absolute = d2_value - d1_value
# Only add max diff metrics if participant is not in excluded list
if time_point != 'max' or participant_id not in self.participants_excluded:
difference[f"{time_point}_{metric}_diff_percentage"] = diff_percentage
difference[f"{time_point}_{metric}_diff_absolute"] = diff_absolute
return difference
def _get_demographic_features(self, participant_data: pd.DataFrame) -> Dict[str, Any]:
demographics = {}
for metric in self.demographic_metrics:
demographics[metric] = participant_data[metric].iloc[0]
return demographics
def get_feature_vector(self, participant: str) -> pd.DataFrame:
participant_features = self.features[participant]
feature_vector = []
columns = []
for day in ['D1', 'D2']:
for time_point in self.time_points:
for metric in self.metrics.get(time_point, []):
feature_vector.append(participant_features['single_day'][day].get(f"{time_point}_{metric}", np.nan))
columns.append(f"{day}_{time_point}_{metric}")
for metric in self.demographic_metrics:
feature_vector.append(participant_features['demographics'].get(metric, np.nan))
columns.append(metric)
for time_point in self.time_points:
for metric in self.metrics.get(time_point, []):
feature_vector.append(participant_features['difference'].get(f"{time_point}_{metric}_diff_percentage", np.nan))
columns.append(f"{time_point}_{metric}_diff_percentage")
feature_vector.append(participant_features['difference'].get(f"{time_point}_{metric}_diff_absolute", np.nan))
columns.append(f"{time_point}_{metric}_diff_absolute")
return pd.DataFrame([feature_vector], columns=columns)
def get_feature_names(self) -> List[str]:
feature_names = []
for day in ['D1', 'D2']:
for time_point in self.time_points:
for metric in self.metrics.get(time_point, []):
feature_names.append(f"{day}_{time_point}_{metric}")
for metric in self.demographic_metrics:
feature_names.append(metric)
for time_point in self.time_points:
for metric in self.metrics.get(time_point, []):
feature_names.append(f"{time_point}_{metric}_diff_percentage")
feature_names.append(f"{time_point}_{metric}_diff_absolute")
return feature_names
def get_labels(self) -> np.array:
return np.array([1 if self.features[p]['phenotype'] == 'MECFS' else 0 for p in self.participants])
participants_excluded = [
'PI-012',
'PI-043',
'PI-147',
'PI-008',
'PI-018',
'PI-029',
'PI-057',
'PI-082',
'PI-091',
'PI-128'
]
dataset = MECFSDataset('cpet_clinical_data.tsv', participants_excluded)
feature_names = dataset.get_feature_names()
X = np.array([dataset.get_feature_vector(p) for p in dataset.participants])
y = dataset.get_labels()
# Create a DataFrame with the feature vectors and labels
feature_vectors = [dataset.get_feature_vector(p) for p in dataset.participants]
labels = dataset.get_labels()
df = pd.concat(feature_vectors, ignore_index=True)
#df['label'] = labels
# Output the DataFrame to a CSV file
df.to_csv('mecfs_dataset.csv', index=False)
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.lines as lines
from sklearn.metrics import roc_curve
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import precision_recall_curve
def plot_feature(df, feature, y):
# Remove rows with NaN values for this feature
mask = ~df[feature].isna()
mask.value_counts()
X = df[feature][mask]
y_masked = y[mask]
correlation = np.corrcoef(X, y_masked)[0, 1]
X_for_roc = -X if correlation < 0 else X
precision, recall, thresholds = precision_recall_curve(y_masked, X_for_roc)
# Reverse the thresholds back to original if the feature was inverted
if correlation < 0:
thresholds = -thresholds
precision_without_last = precision[:-1]
# Get indices where precision_without_last is greater than or equal to 0.80
indices = np.where(precision_without_last >= 0.80)[0]
# Check if there are any indices and return the max recall index if found, otherwise return 0
if len(indices) > 0:
precision_80_point = indices[np.argmax(recall[indices])]
recall_80 = recall[precision_80_point]
# Calculate True Negatives (TN), False Negatives (FN), and the percentage
threshold_selected = (-thresholds[precision_80_point]) if correlation < 0 else (thresholds[precision_80_point])
y_pred = X_for_roc >= threshold_selected
TN = np.sum((y_pred == 0) & (y_masked == 0))
FP = np.sum((y_pred == 1) & (y_masked == 0))
HC_past_threshold_pct = FP/(TN+FP)
else:
precision_80_point = None
recall_80 = None
HC_past_threshold_pct = None
fig, ax1 = plt.subplots(figsize=(12, 6))
# Define bins based on the combined range of both classes
min_value = X.min()
max_value = X.max()
bins = np.linspace(min_value, max_value, 30)
# Plot the histogram (stratified by phenotype) on the primary y-axis
class_0 = X[y_masked == 0]
class_1 = X[y_masked == 1]
# Plot histograms with counts
ax1.hist(class_0, bins=bins, alpha=0.3, label='HC', color='blue', density=False)
ax1.hist(class_1, bins=bins, alpha=0.3, label='MECFS', color='red', density=False)
# Set the y-axis limit for the histogram
max_hist_class_0 = np.histogram(class_0, bins=bins)[0].max()
max_hist_class_1 = np.histogram(class_1, bins=bins)[0].max()
max_hist = max(max_hist_class_0, max_hist_class_1)
ax1.set_ylim(0, 1.1 * max_hist)
ax1.set_xlabel(f'{feature}')
ax1.set_ylabel('Count')
ax1.legend(loc='upper right')
# Create a secondary y-axis for the ROC curve
ax2 = ax1.twinx()
ax2.plot(thresholds, precision[:-1], label='Precision', color='green', linewidth=2)
ax2.plot(thresholds, recall[:-1], label='Recall', color='purple', linewidth=2)
# Set the y-axis limit for specificity and sensitivity
ax2.set_ylim(0, 1.05)
ax2.set_ylabel('Precision/Recall')
if(precision_80_point):
ax2.axvline(x=thresholds[precision_80_point], color='red', linestyle=':',
label=f'{precision[precision_80_point]*100:.1f}% precision: Recall = {recall[precision_80_point]*100:.1f}%')
ax2.text(thresholds[precision_80_point], -0.05, f'{thresholds[precision_80_point]:.2f}',
ha='center', va='top', color='black', fontsize=10, rotation=45)
# Combine legends from both axes
lines_1, labels_1 = ax1.get_legend_handles_labels()
lines_2, labels_2 = ax2.get_legend_handles_labels()
# Include class_0 percentage in the legend
if HC_past_threshold_pct is not None:
dummy_handle = lines.Line2D([0], [0], linestyle="none", marker='o', markersize=6, label=f'HC passing threshold: {HC_past_threshold_pct*100:.1f}%')
ax1.legend(lines_1 + lines_2 + [dummy_handle], labels_1 + labels_2 + [dummy_handle.get_label()], loc='upper right')
else:
ax1.legend(lines_1 + lines_2, labels_1 + labels_2, loc='upper right')
# Save the figure
plt.title(f'Specificity and Sensitivity vs Threshold: {feature}')
plt.tight_layout()
plt.savefig(f'images/{feature}_prec_rec_hist.png')
plt.close()
# Print some statistics
print(f"\nFeature: {feature}")
print(f"AUC: {np.trapz(tpr, fpr):.3f}")
if(precision_80_point):
print(f"Threshold at 80% precision: {thresholds[precision_80_point]:.3f}")
print(f"Recall at 80% precision: {recall_80*100:.1f}")
print(f'HC passing threshold: {HC_past_threshold_pct*100:.1f}%')
else:
print("No precision >= 80%")
return feature, recall_80
# Load the data
df = pd.read_csv('mecfs_dataset.csv')
labels = df['phenotype'] # Assuming 'phenotype' column exists in the CSV
# Encode labels
le = LabelEncoder()
y = le.fit_transform(labels)
# Get all difference features
diff_features = [col for col in df.columns if '_diff_' in col]
# Collect sensitivities at 90% specificity
feature_sensitivities = []
for feature in tqdm(diff_features, desc='Plotting features', unit='features'):
feature_sensitivities.append(plot_feature(df, feature, y))
feature_sensitivities = [x for x in feature_sensitivities if x[1] is not None]
# Sort by sensitivity at 90% specificity in descending order
sorted_features = sorted(feature_sensitivities, key=lambda x: x[1], reverse=True)
# Print the sorted features
print("\nFeatures sorted by recall at >= 80% specificity:")
for feature, recall_80 in sorted_features:
print(f"{feature}: {recall_80*100:.1f}% recall at >= 80% precision")