# mesalab/bluelooptools/blue_loop_cmd_plotter.py - REVISED with logging
import os
import sys
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from matplotlib.path import Path # Import Path
from isochrones.mist.bc import MISTBolometricCorrectionGrid
from tqdm.auto import tqdm
import logging # Import the logging module
from mpl_toolkits.axes_grid1 import make_axes_locatable
from mesalab.plotting.plot_config import DEFAULT_PLOT_CONFIG
# --- Logging Setup for this module ---
# This ensures that if the module is run directly, it has a basic logging setup.
# When run via cli.py, the root logger configured in cli.py will take precedence.
logging.basicConfig(
level=logging.WARNING, # Default for this module if run standalone; cli.py will override
format='%(asctime)s - %(levelname)s - %(name)s: %(message)s',
handlers=[
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__) # Logger for this specific module
# Initialize the bolometric correction grid ONCE when this module is loaded.
logger.info("Initializing MIST Bolometric Correction Grid... (This happens once)")
bc_grid = MISTBolometricCorrectionGrid(['J', 'H', 'K', 'G', 'BP', 'RP', 'g', 'r', 'i'])
logger.info("MIST Bolometric Correction Grid initialized.")
# Helper function to calculate BC for a single row.
def _calculate_bc_for_single_point(args):
"""
Helper function to calculate bolometric corrections for a single stellar point.
This function is intended for internal use, typically within a parallel processing context.
It handles NaN inputs and interpolation errors gracefully.
Args:
args (tuple): A tuple containing (original_idx, teff_i, logg_i, feh_i, av_i).
- original_idx: The original index of the row in the DataFrame.
- teff_i (float): Effective temperature in Kelvin.
- logg_i (float): Logarithm of surface gravity (log10(g)).
- feh_i (float): Metallicity [Fe/H].
- av_i (float): Extinction in magnitudes (assumed 0.0 for MESA).
Returns:
tuple: A tuple containing (original_idx, BC_G_val, BC_BP_val, BC_RP_val).
Returns NaN for BC values if interpolation fails or inputs are invalid.
"""
original_idx, teff_i, logg_i, feh_i, av_i = args
# Check if any input argument is NaN before attempting interpolation
if not (np.isfinite(teff_i) and np.isfinite(logg_i) and np.isfinite(feh_i) and np.isfinite(av_i)):
logger.debug(f"Skipping BC calculation for idx {original_idx} due to non-finite input: Teff={teff_i}, logg={logg_i}, feh={feh_i}, Av={av_i}")
return (original_idx, np.nan, np.nan, np.nan)
try:
BC_G_val = bc_grid.interp([teff_i, logg_i, feh_i, av_i], ['G'])[0]
BC_BP_val = bc_grid.interp([teff_i, logg_i, feh_i, av_i], ['BP'])[0]
BC_RP_val = bc_grid.interp([teff_i, logg_i, feh_i, av_i], ['RP'])[0]
return (original_idx, BC_G_val, BC_BP_val, BC_RP_val)
except Exception as e:
# Log the error for this specific point, but continue processing others
logger.debug(f"Error interpolating BC for idx {original_idx} (Teff={teff_i}, logg={logg_i}, feh={feh_i}, Av={av_i}): {e}")
return (original_idx, np.nan, np.nan, np.nan)
[docs]
def z_to_feh(Z):
"""
Converts metallicity Z to [Fe/H] (logarithmic iron-to-hydrogen ratio).
Uses a solar metallicity (Z_sun = 0.0152). Returns NaN for non-positive Z values.
Args:
Z (float): The metallicity value (mass fraction of elements heavier than H and He).
Returns:
float: The [Fe/H] value, or np.nan if Z is not positive.
Example:
>>> from mesalab.bluelooptools import z_to_feh
>>> z_to_feh(0.0152)
0.0
>>> z_to_feh(0.0076)
-0.3010299956639812
>>> z_to_feh(0)
nan
"""
Z_sun = 0.0152
if Z <= 0: # Z cannot be zero or negative for log10 calculation and physical reasons
logger.debug(f"Invalid Z value (<=0) for Fe/H conversion: {Z}. Returning NaN.")
return np.nan
return np.log10(Z / Z_sun)
[docs]
def generate_blue_loop_plots_with_bc(combined_df_all_data, output_dir, output_type_label="all_blue_loop_data", plot_cfg=None):
"""
Analyzes MESA history data for stellar blue loop characteristics and Instability Strip crossings,
and generates HRD, CMD, and LogL-LogG plots with bolometric corrections.
This function expects a DataFrame that combines MESA history data from one or more
evolutionary tracks. It requires specific columns to perform its analysis and plotting.
Args:
combined_df_all_data (pandas.DataFrame): DataFrame containing combined MESA history data.
Required columns are:
- 'log_Teff' (Logarithm of effective temperature)
- 'log_L' (Logarithm of luminosity in solar units)
- 'log_g' (Logarithm of surface gravity)
- 'initial_Z' (or 'Z' as a fallback, Initial metallicity)
- Other MESA history columns are useful but not strictly required.
output_dir (str): The directory where the generated plots will be saved.
output_type_label (str): A label used in the filenames to categorize the output plots.
plot_cfg (dict, optional): A dictionary of plotting configurations. If None,
default configurations from 'plot_config.py' will be used.
Example:
>>> import pandas as pd
>>> import os
>>> from mesalab.bluelooptools import generate_blue_loop_plots_with_bc
>>> # Create a simple, yet realistic, dummy DataFrame.
>>> # The rows simulate a short evolutionary phase.
>>> dummy_data = {
... 'log_Teff': [3.78, 3.82, 3.75],
... 'log_L': [2.8, 3.0, 3.2],
... 'log_g': [3.5, 3.2, 3.0],
... 'initial_Z': [0.008, 0.008, 0.008]
... }
>>> dummy_df = pd.DataFrame(dummy_data)
>>> # Call the function with the data and an output directory.
>>> generate_blue_loop_plots_with_bc(dummy_df, "temp_plot_output")
Calculating BCs serially: 100%|███████████████████████████████████████| 3/3 [00:04<00:00, 1.65s/it]
"""
if plot_cfg is None:
from mesalab.plotting.plot_config import DEFAULT_PLOT_CONFIG
plot_cfg = DEFAULT_PLOT_CONFIG
if not os.path.exists(output_dir):
os.makedirs(output_dir)
logger.info(f"Created output directory: '{output_dir}'")
logger.info(f"Generating combined blue loop HRD, CMD, and LogL-LogG plots (with BCs) to '{output_dir}'...")
try:
from importlib.metadata import version
isochrones_version = version('isochrones')
logger.debug(f"Installed isochrones version: {isochrones_version}")
except Exception:
logger.warning("isochrones package not found. Bolometric corrections may be affected.")
if combined_df_all_data.empty:
logger.warning("No data provided for plotting. Skipping plot generation.")
return
if 'Z' in combined_df_all_data.columns and 'initial_Z' not in combined_df_all_data.columns:
combined_df_all_data.rename(columns={'Z': 'initial_Z'}, inplace=True)
logger.debug("Renamed 'Z' column to 'initial_Z' for consistency.")
elif 'initial_Z' not in combined_df_all_data.columns:
logger.error(f"Neither 'Z' nor 'initial_Z' column found in data. Cannot proceed with BC calculations or plotting.")
return
# --- DEBUG: Initial data status ---
logger.debug(f"Initial combined_df_all_data size: {len(combined_df_all_data)} rows.")
logger.debug(f"Initial non-NaN counts in critical columns:")
logger.debug(f" log_Teff: {combined_df_all_data['log_Teff'].count()} / {len(combined_df_all_data)}")
logger.debug(f" log_g: {combined_df_all_data['log_g'].count()} / {len(combined_df_all_data)}")
logger.debug(f" initial_Z: {combined_df_all_data['initial_Z'].count()} / {len(combined_df_all_data)}")
problematic_z_count = combined_df_all_data[combined_df_all_data['initial_Z'] <= 0]['initial_Z'].count()
if problematic_z_count > 0:
logger.warning(f"{problematic_z_count} points have initial_Z <= 0 and will result in NaN for [Fe/H].")
logger.debug(f"Sample initial_Z values: {combined_df_all_data['initial_Z'].head().to_string(float_format='%.4e')}")
logger.debug(f"Unique initial_Z values: {combined_df_all_data['initial_Z'].unique()}")
# --- END DEBUG: Initial data status ---
combined_df_all_data['Mbol'] = np.nan
combined_df_all_data['M_G'] = np.nan
combined_df_all_data['M_BP'] = np.nan
combined_df_all_data['M_RP'] = np.nan
combined_df_all_data['G_BP_minus_G_RP'] = np.nan
# Filter for BC calculation: only rows with finite Teff, logg, and positive Z
bc_input_df_filtered = combined_df_all_data.dropna(subset=['log_Teff', 'log_g', 'initial_Z']).copy()
bc_input_df_filtered = bc_input_df_filtered[bc_input_df_filtered['initial_Z'] > 0].copy() # Ensure positive Z
logger.debug(f"bc_input_df_filtered size: {len(bc_input_df_filtered)} rows after dropna and positive Z filter.")
if bc_input_df_filtered.empty:
logger.warning("No valid stellar data (log_Teff, log_g, initial_Z > 0) for BC calculation. Skipping BC and magnitude calculations.")
else:
tasks = []
logger.debug("Preparing tasks for BC calculation...")
for original_idx, row in bc_input_df_filtered.iterrows():
teff_i = 10**row['log_Teff']
logg_i = row['log_g']
feh_i = z_to_feh(row['initial_Z']) # This will be NaN if Z <= 0, handled by _calculate_bc_for_single_point
av_i = 0.0 # Assuming no extinction
tasks.append((original_idx, teff_i, logg_i, feh_i, av_i))
logger.debug(f"Number of tasks prepared for BC calculation: {len(tasks)}")
logger.info("Starting serial bolometric correction calculation...")
raw_results = []
for task in tqdm(tasks, desc="Calculating BCs serially"):
raw_results.append(_calculate_bc_for_single_point(task))
logger.info("Serial calculation completed.")
logger.debug(f"Processing BC calculation results (length: {len(raw_results)}).")
results_df = pd.DataFrame(raw_results, columns=['original_idx', 'BC_G_val', 'BC_BP_val', 'BC_RP_val'])
results_df.set_index('original_idx', inplace=True)
# Assign calculated BC values back to the original DataFrame
combined_df_all_data['BC_G'] = results_df['BC_G_val']
combined_df_all_data['BC_BP'] = results_df['BC_BP_val']
combined_df_all_data['BC_RP'] = results_df['BC_RP_val']
valid_bc_calculated_count = combined_df_all_data['BC_G'].count()
logger.debug(f"Total BCs calculated (non-NaN BC_G values): {valid_bc_calculated_count} points.")
logger.debug(f"Shape of combined_df_all_data AFTER BC application: {combined_df_all_data.shape}")
logger.debug(f"Number of non-NaN BC_G values after application: {combined_df_all_data['BC_G'].count()}")
# Calculate absolute magnitudes and color
combined_df_all_data['Mbol'] = -2.5 * combined_df_all_data['log_L'] + 4.74 # Mbol is independent of BCs, depends on log_L
combined_df_all_data['M_G'] = combined_df_all_data['Mbol'] - combined_df_all_data['BC_G']
combined_df_all_data['M_BP'] = combined_df_all_data['Mbol'] - combined_df_all_data['BC_BP']
combined_df_all_data['M_RP'] = combined_df_all_data['Mbol'] - combined_df_all_data['BC_RP']
combined_df_all_data['G_BP_minus_G_RP'] = combined_df_all_data['M_BP'] - combined_df_all_data['M_RP']
logger.debug(f"Shape of combined_df_all_data AFTER magnitude calculations: {combined_df_all_data.shape}")
logger.debug(f"Number of non-NaN Mbol values after calculation: {combined_df_all_data['Mbol'].count()}")
logger.debug(f"Number of non-NaN M_G values after calculation: {combined_df_all_data['M_G'].count()}")
logger.debug(f"Number of non-NaN G_BP_minus_G_RP values after calculation: {combined_df_all_data['G_BP_minus_G_RP'].count()}")
logger.info("\nGenerating HRD plot...")
fig_hrd, ax_hrd = plt.subplots(
figsize=plot_cfg["figure"]["figsize"],
facecolor=plot_cfg["figure"]["facecolor"]
)
hrd_plot_df = combined_df_all_data.dropna(subset=['log_Teff', 'log_L', 'initial_Z'])
if not hrd_plot_df.empty:
hrd_min_z = hrd_plot_df['initial_Z'].min()
hrd_max_z = hrd_plot_df['initial_Z'].max()
norm_hrd = plt.Normalize(vmin=hrd_min_z, vmax=hrd_max_z)
sc = ax_hrd.scatter(
hrd_plot_df['log_Teff'],
hrd_plot_df['log_L'],
c=hrd_plot_df['initial_Z'],
cmap=plot_cfg["scatter"]["cmap"],
norm=norm_hrd,
s=plot_cfg["scatter"]["dot_size"],
alpha=plot_cfg["scatter"]["alpha"],
)
instability_strip_vertices = np.array([
[3.83, 2.4],
[3.76, 4.5],
[3.65, 4.5],
[3.77, 2.4]
])
instability_path = Path(instability_strip_vertices)
instability_patch = patches.PathPatch(
instability_path, facecolor='gray', edgecolor='black',
lw=2, alpha=0.3, label='Instability Strip'
)
ax_hrd.add_patch(instability_patch)
ax_hrd.set_xlabel(r'$\log_{10} (T_{\mathrm{eff}}/K)$', fontsize=plot_cfg["axes"]["label_size"])
ax_hrd.set_ylabel(r'$\log_{10} (L/L_{\odot})$', fontsize=plot_cfg["axes"]["label_size"])
ax_hrd.set_title(f'Combined HR Diagram (All Z)', fontsize=plot_cfg["axes"]["title_size"])
ax_hrd.invert_xaxis()
ax_hrd.grid(
plot_cfg["axes"]["grid"],
linestyle=plot_cfg["axes"]["grid_style"],
alpha=plot_cfg["axes"]["grid_alpha"]
)
ax_hrd.legend(loc='upper right')
divider = make_axes_locatable(ax_hrd)
cax = divider.append_axes(
"right",
size=plot_cfg["colorbar"]["size"],
pad=plot_cfg["colorbar"]["padding"]
)
cbar = fig_hrd.colorbar(sc, cax=cax)
cbar.set_label(
"Initial Z",
fontsize=plot_cfg["colorbar"]["label_size"]
)
hrd_path = os.path.join(output_dir, f'HRD_{output_type_label}.png')
# fig_hrd.subplots_adjust(left=0.06, right=0.98, top=0.96, bottom=0.08)
fig_hrd.tight_layout()
fig_hrd.savefig(
hrd_path,
dpi=plot_cfg["figure"]["dpi"],
bbox_inches="tight",
pad_inches=0.05
)
plt.close(fig_hrd)
logger.info(f"Saved HRD BLABLAB plot: {hrd_path}")
else:
plt.close(fig_hrd)
logger.warning("No valid HRD plot for combined data after dropping NaNs.")
# --- CMD Plot ---
logger.info("\nGenerating CMD plot...")
fig_cmd, ax_cmd = plt.subplots(
figsize=plot_cfg["figure"]["figsize"],
facecolor=plot_cfg["figure"]["facecolor"]
)
cmd_plot_df = combined_df_all_data.dropna(subset=['G_BP_minus_G_RP', 'M_G', 'initial_Z'])
logger.debug(f"CMD plot DataFrame size after dropna for plotting: {len(cmd_plot_df)} rows")
if not cmd_plot_df.empty:
# Define min_z and max_z specific to the CMD plot data
cmd_min_z = cmd_plot_df['initial_Z'].min()
cmd_max_z = cmd_plot_df['initial_Z'].max()
logger.debug(f"CMD Plot Z range (from cmd_plot_df): min_Z={cmd_min_z:.4e}, max_Z={cmd_max_z:.4e}") # Added debug print
norm_cmd = plt.Normalize(vmin=cmd_min_z, vmax=cmd_max_z)
sc = ax_cmd.scatter(
cmd_plot_df['G_BP_minus_G_RP'],
cmd_plot_df['M_G'],
c=cmd_plot_df['initial_Z'],
cmap=plot_cfg["scatter"]["cmap"],
norm=norm_cmd,
s=plot_cfg["scatter"]["dot_size"],
alpha=plot_cfg["scatter"]["alpha"],
)
ax_cmd.set_xlabel(r'$M_{BP} - M_{RP}$', fontsize=plot_cfg["axes"]["label_size"])
ax_cmd.set_ylabel(r'$M_G$', fontsize=plot_cfg["axes"]["label_size"])
ax_cmd.set_title(f'Combined CMD (Gaia) (All Z)', fontsize=plot_cfg["axes"]["title_size"])
ax_cmd.invert_yaxis()
divider = make_axes_locatable(ax_cmd)
cax = divider.append_axes(
"right",
size=plot_cfg["colorbar"]["size"],
pad=plot_cfg["colorbar"]["padding"]
)
cbar = fig_cmd.colorbar(sc, cax=cax)
cbar.set_label(
"Initial Z",
fontsize=plot_cfg["colorbar"]["label_size"]
)
cmd_path = os.path.join(output_dir, f'CMD_Gaia_{output_type_label}.png')
fig_cmd.tight_layout()
fig_cmd.savefig(
cmd_path,
dpi=plot_cfg["figure"]["dpi"],
bbox_inches="tight",
pad_inches=0.05
)
plt.close(fig_cmd)
logger.info(f"Saved CMD plot: {cmd_path}")
else:
plt.close(fig_cmd)
logger.warning(f"No valid CMD plots for combined data after dropping NaNs in G_BP_minus_G_RP or M_G.")
# --- LogL vs LogG Plot ---
logger.info("\nGenerating LogL vs LogG plot...")
fig_logg, ax_logg = plt.subplots(
figsize=plot_cfg["figure"]["figsize"],
facecolor=plot_cfg["figure"]["facecolor"]
)
logg_plot_df = combined_df_all_data.dropna(subset=['log_g', 'log_L', 'initial_Z'])
logger.debug(f"LogL vs LogG plot DataFrame size after dropna for plotting: {len(logg_plot_df)} rows")
if not logg_plot_df.empty:
# Define min_z and max_z specific to the LogL vs LogG plot data
logg_min_z = logg_plot_df['initial_Z'].min()
logg_max_z = logg_plot_df['initial_Z'].max()
logger.debug(f"LogL vs LogG Plot Z range (from logg_plot_df): min_Z={logg_min_z:.4e}, max_Z={logg_max_z:.4e}") # Added debug print
norm_logg = plt.Normalize(vmin=logg_min_z, vmax=logg_max_z)
sc = ax_logg.scatter(
logg_plot_df['log_g'],
logg_plot_df['log_L'],
c=logg_plot_df['initial_Z'],
cmap=plot_cfg["scatter"]["cmap"],
norm=norm_logg,
s=plot_cfg["scatter"]["dot_size"],
alpha=plot_cfg["scatter"]["alpha"],
)
ax_logg.set_xlabel(r'$\log_{10} g$', fontsize=plot_cfg["axes"]["label_size"])
ax_logg.set_ylabel(r'$\log_{10} (L/L_{\odot})$', fontsize=plot_cfg["axes"]["label_size"])
ax_logg.set_title(f'Combined LogL vs LogG (All Z)', fontsize=plot_cfg["axes"]["title_size"])
ax_logg.invert_xaxis()
divider = make_axes_locatable(ax_logg)
cax = divider.append_axes(
"right",
size=plot_cfg["colorbar"]["size"],
pad=plot_cfg["colorbar"]["padding"]
)
cbar = fig_logg.colorbar(sc, cax=cax)
cbar.set_label(
"Initial Z",
fontsize=plot_cfg["colorbar"]["label_size"]
)
fig_logg.tight_layout()
logg_path = os.path.join(output_dir, f'LogL_LogG_{output_type_label}.png')
fig_logg.savefig(
logg_path,
dpi=plot_cfg["figure"]["dpi"],
bbox_inches="tight",
pad_inches=0.05
)
plt.close(fig_logg)
logger.info(f"Saved LogL vs LogG plot: {logg_path}")
else:
plt.close(fig_logg)
logger.warning(f"No valid LogL vs LogG plots for combined data after dropping NaNs.")
logger.info("\nAll combined plots generated.")
[docs]
def load_and_group_data(input_dir):
"""
This function expects CSV files to contain stellar evolution data.
It checks for 'initial_Z' or 'Z' and 'initial_mass' columns.
Args:
input_dir (str): The path to the directory containing the CSV files.
Returns:
pd.DataFrame: A concatenated DataFrame of all valid CSV data, or an empty
DataFrame if no files are found or critical columns are missing.
Example:
>>> import pandas as pd
>>> import os
>>> import tempfile
>>> from mesalab.bluelooptools import blue_loop_cmd_plotter as bcmd
>>> # Create a temporary directory and some dummy CSV files for the example
>>> with tempfile.TemporaryDirectory() as temp_dir:
... # File 1: Correct data
... df1 = pd.DataFrame({'initial_mass': [1.0], 'initial_Z': [0.014], 'log_Teff': [3.7]})
... df1.to_csv(os.path.join(temp_dir, 'run_1.csv'), index=False)
... # File 2: Missing 'initial_Z', but has 'Z'
... df2 = pd.DataFrame({'initial_mass': [1.5], 'Z': [0.008], 'log_Teff': [3.8]})
... df2.to_csv(os.path.join(temp_dir, 'run_2.csv'), index=False)
... # Load and process the data
... combined_df = bcmd.load_and_group_data(temp_dir)
... # The combined DataFrame should have 2 rows and 3 columns
... print(f"Combined DataFrame shape: {combined_df.shape}")
... print(f"Columns: {combined_df.columns.tolist()}")
... print(f"First row's mass: {combined_df['initial_mass'].iloc[0]}")
...
Combined DataFrame shape: (2, 4)
Columns: ['initial_mass', 'initial_Z', 'log_Teff']
First row's mass: 1.0
"""
all_dfs = []
logger.info(f"Loading CSV files from '{input_dir}'...")
for filename in os.listdir(input_dir):
if filename.lower().endswith('.csv'):
filepath = os.path.join(input_dir, filename)
try:
df = pd.read_csv(filepath)
all_dfs.append(df)
logger.info(f"Loaded {filename} with {len(df)} rows.")
except Exception as e:
logger.error(f"Error loading {filename}: {e}")
if not all_dfs:
logger.error(f"No CSV files loaded from '{input_dir}'. Returning empty DataFrame.")
return pd.DataFrame()
full_df = pd.concat(all_dfs, ignore_index=True)
logger.info(f"All CSVs concatenated. Total rows: {len(full_df)}.")
if 'initial_Z' not in full_df.columns and 'Z' not in full_df.columns:
logger.error(f"Neither 'initial_Z' nor 'Z' column found in data. Please check your CSVs.")
return pd.DataFrame()
if 'initial_mass' not in full_df.columns:
logger.error(f"'initial_mass' column not found in data. Please check your CSVs.")
return pd.DataFrame()
if 'Z' in full_df.columns and 'initial_Z' not in full_df.columns:
full_df.rename(columns={'Z': 'initial_Z'}, inplace=True)
logger.debug("Renamed 'Z' column to 'initial_Z' for consistency.")
return full_df
if __name__ == "__main__":
# When this module is run directly, set a default output path for testing
input_data_directory = "./blue_loop_data"
output_plots_directory = "./blue_loop_plots"
# Ensure a basic logging config exists if run directly for debugging
# (this will be overridden by cli.py's config when integrated)
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(name)s: %(message)s', handlers=[logging.StreamHandler(sys.stdout)])
logger = logging.getLogger(__name__) # Re-get logger after potential basicConfig for __main__
# Example usage
combined_data_for_plotting = load_and_group_data(input_data_directory)
if not combined_data_for_plotting.empty:
generate_blue_loop_plots_with_bc(combined_data_for_plotting, output_plots_directory)
else:
logger.error("No valid data to plot. Exiting.")