Source code for openquake.vmtk.imselection

"""
Intensity Measure (IM) Selection Framework

Implements the Relative Sufficiency Measure (RSM) and associated efficiency /
proficiency metrics from:

    Ebrahimian, H. & Jalayer, F. (2021). "Selection of seismic intensity
    measures for prescribed limit states using alternative nonlinear dynamic
    analysis methods." Earthquake Engineering & Structural Dynamics, 50(5),
    1235–1258. https://doi.org/10.1002/eqe.3393

The RSM quantifies, in bits of information, how much more structural-demand
information IM₂ provides relative to IM₁.  A positive RSM(IM₂ vs IM₁) means
IM₂ is the more sufficient intensity measure.

Two nonlinear dynamic analysis procedures (NDAPs) are supported:
    - MCA  — Modified Cloud Analysis (``postprocessor.process_mca_results``)
    - IDA  — Incremental Dynamic Analysis (``postprocessor.process_ida_results``)
"""

import warnings

import numpy as np
import pandas as pd
from scipy.stats import norm

# ---------------------------------------------------------------------------
# Module-level numerical floor constants
# ---------------------------------------------------------------------------
_PDF_FLOOR = 1e-300   # floor for any probability density before log
_BETA_FLOOR = 1e-6    # floor for any dispersion value
_IM_FLOOR = 1e-10     # floor for IM values before log-transform


class imselection:
    """
    Intensity Measure selection tools based on the RSM framework.

    All methods are stateless; instantiate once and reuse freely::

        ims = imselection()
        eff = ims.compute_efficiency_mca(cloud_dict_pga)
        rsm = ims.compute_rsm_mca(cloud_dict_pga, cloud_dict_sa)
    """

    # -----------------------------------------------------------------------
    # Private helpers
    # -----------------------------------------------------------------------

    def _lognormal_pdf(self, x, mu_ln, beta):
        """
        Lognormal probability density function.

        Parameters
        ----------
        x : array_like
            Values at which to evaluate the PDF (must be positive).
        mu_ln : array_like
            Mean of ln(x).
        beta : array_like
            Standard deviation of ln(x).  Clipped to ``_BETA_FLOOR``.

        Returns
        -------
        ndarray
            PDF values, floored at ``_PDF_FLOOR``.
        """
        x = np.maximum(np.asarray(x, dtype=float), _IM_FLOOR)
        beta = np.maximum(np.asarray(beta, dtype=float), _BETA_FLOOR)
        z = (np.log(x) - mu_ln) / beta
        pdf = norm.pdf(z) / (x * beta)
        return np.maximum(pdf, _PDF_FLOOR)

    def _pnoc_logistic(self, ln_im, alpha0, alpha1):
        """
        Probability of non-collapse from a logistic regression model.

        ``P(NoC | IM) = 1 − sigmoid(α₀ + α₁ · ln(IM))``

        Parameters
        ----------
        ln_im : array_like
            Natural log of the intensity measure values.
        alpha0 : float
            Logistic regression intercept.
        alpha1 : float
            Logistic regression slope on ln(IM).

        Returns
        -------
        ndarray
            P(NoC | IM) in [0, 1].
        """
        logit = np.clip(alpha0 + alpha1 * np.asarray(ln_im, dtype=float),
                        -500.0, 500.0)
        return 1.0 - 1.0 / (1.0 + np.exp(-logit))

    def _extract_ida_stats_at_demands(self, ida_dict, demand_values):
        """
        Interpolate IDA ensemble statistics and per-record IM at given demands.

        For each record k the method returns:
        * ``eta[k]``           — ensemble median IM at ``demand_values[k]``
        * ``beta[k]``          — ensemble dispersion (0.5·ln(p84/p16)) at that demand
        * ``im_per_record[k]`` — IM from record k's own IDA curve at ``demand_values[k]``

        A record is flagged as *valid* only when the demand falls within the
        tabulated EDP range, p84 > p16 > 0, and the per-record IM is positive.

        Parameters
        ----------
        ida_dict : dict
            Output of ``postprocessor.process_ida_results``.
        demand_values : array_like
            EDP level for each record, shape ``(n_records,)``.

        Returns
        -------
        eta : ndarray, shape (n_records,)
        beta : ndarray, shape (n_records,)
        im_per_record : ndarray, shape (n_records,)
        valid : ndarray of bool, shape (n_records,)
        """
        edp_axis = np.asarray(ida_dict['stats']['fitted_edps'])
        median_im_arr = np.asarray(ida_dict['stats']['median_im'])
        p16_im_arr = np.asarray(ida_dict['stats']['p16_im'])
        p84_im_arr = np.asarray(ida_dict['stats']['p84_im'])
        raw_curves = ida_dict['ida_inputs']['raw_curves']

        demand_values = np.asarray(demand_values, dtype=float)
        n = len(demand_values)

        eta = np.full(n, np.nan)
        beta = np.full(n, np.nan)
        im_per_record = np.full(n, np.nan)
        valid = np.zeros(n, dtype=bool)

        d_min, d_max = edp_axis[0], edp_axis[-1]

        for k, dk in enumerate(demand_values):
            if not (d_min <= dk <= d_max):
                continue
            eta[k] = np.interp(dk, edp_axis, median_im_arr)
            p16k = np.interp(dk, edp_axis, p16_im_arr)
            p84k = np.interp(dk, edp_axis, p84_im_arr)
            if p84k <= p16k or p16k <= 0:
                continue
            beta[k] = max(0.5 * np.log(p84k / p16k), _BETA_FLOOR)

            # Per-record IM at this demand
            curve_edp = np.asarray(raw_curves[k]['edp'])
            curve_im = np.asarray(raw_curves[k]['im'])
            if dk <= curve_edp[-1] and len(curve_edp) > 1:
                im_k = np.interp(dk, curve_edp, curve_im)
                if im_k > 0 and eta[k] > 0:
                    im_per_record[k] = im_k
                    valid[k] = True

        return eta, beta, im_per_record, valid

    def _validate_cloud_dict(self, cloud_dict, label='cloud_dict'):
        """
        Raise ``ValueError`` if *cloud_dict* is missing required regression keys.

        Parameters
        ----------
        cloud_dict : dict
            Output of ``postprocessor.process_mca_results``.
        label : str
            Name used in error messages.
        """
        if 'regression' not in cloud_dict:
            raise ValueError(
                f"{label} is missing the 'regression' key. "
                "Run process_mca_results first."
            )
        reg = cloud_dict['regression']
        required = ('b1', 'b0', 'sigma', 'alpha0', 'alpha1')
        missing = [k for k in required if reg.get(k) is None]
        if missing:
            raise ValueError(
                f"{label}['regression'] is missing keys: {missing}. "
                "Ensure postprocessor.py includes alpha0/alpha1 in regression."
            )

    # -----------------------------------------------------------------------
    # Efficiency
    # -----------------------------------------------------------------------

[docs] def compute_efficiency_mca(self, cloud_dict): """ Classic efficiency βD|IM from a Modified Cloud Analysis result. Efficiency is defined as the standard deviation of the residuals of the log-linear cloud regression, σ (sigma). A lower value indicates a more efficient intensity measure. Parameters ---------- cloud_dict : dict Output of ``postprocessor.process_mca_results``. Returns ------- dict with keys: * ``'beta_D_given_IM'`` — regression residual sigma * ``'method'`` — ``'MCA'`` """ self._validate_cloud_dict(cloud_dict, 'cloud_dict') sigma = cloud_dict['regression']['sigma'] return {'beta_D_given_IM': float(sigma), 'method': 'MCA'}
[docs] def compute_efficiency_ida(self, ida_dict, ds_index=0): """ Classic efficiency βD|IM from an Incremental Dynamic Analysis result. Uses the record-to-record dispersion of the fragility curve at the specified damage state. Parameters ---------- ida_dict : dict Output of ``postprocessor.process_ida_results``. ds_index : int, optional Damage-state index (0 = first / least severe). Default ``0``. Returns ------- dict with keys: * ``'beta_D_given_IM'`` — sigma_record2record at damage state *ds_index* * ``'method'`` — ``'IDA'`` """ sigmas = ida_dict['fragility']['sigma_record2record'] if ds_index >= len(sigmas): raise ValueError( f"ds_index={ds_index} out of range " f"(fragility has {len(sigmas)} damage states)." ) return {'beta_D_given_IM': float(sigmas[ds_index]), 'method': 'IDA'}
# ----------------------------------------------------------------------- # Practicality # ----------------------------------------------------------------------- def compute_practicality_mca(self, cloud_dict): """ Practicality measure from a Modified Cloud Analysis result. Practicality = slope *b* from the log-linear regression ``log(EDP) = b₀ + b · log(IM)``. A higher slope indicates a stronger IM–EDP correlation and therefore a more practical intensity measure. Parameters ---------- cloud_dict : dict Output of ``postprocessor.process_mca_results``. Returns ------- dict with keys: * ``'b_slope'`` — regression slope *b* (higher is better) * ``'method'`` — ``'MCA'`` """ self._validate_cloud_dict(cloud_dict, 'cloud_dict') return {'b_slope': float(cloud_dict['regression']['b1']), 'method': 'MCA'} def compute_practicality_ida(self, ida_dict): """ Practicality measure from an Incremental Dynamic Analysis result. Estimated as the OLS slope of ``log(EDP)`` on ``log(IM)`` along the median IDA curve, mirroring the MCA log-linear regression convention. A higher slope indicates a stronger IM–EDP relationship. Parameters ---------- ida_dict : dict Output of ``postprocessor.process_ida_results``. Returns ------- dict with keys: * ``'b_slope'`` — log-log slope (higher is better); NaN if the median curve has fewer than 2 positive-valued points * ``'method'`` — ``'IDA'`` """ edp_axis = np.asarray(ida_dict['stats']['fitted_edps']) median_im = np.asarray(ida_dict['stats']['median_im']) valid = (edp_axis > _IM_FLOOR) & (median_im > _IM_FLOOR) if valid.sum() < 2: return {'b_slope': float(np.nan), 'method': 'IDA'} slope, _ = np.polyfit(np.log(median_im[valid]), np.log(edp_axis[valid]), 1) return {'b_slope': float(slope), 'method': 'IDA'} # ----------------------------------------------------------------------- # Proficiency # -----------------------------------------------------------------------
[docs] def compute_proficiency_mca(self, cloud_dict, ds_index=0): """ Proficiency measure βIM|DCR=1 from a Modified Cloud Analysis result. Proficiency is estimated as ``0.5 · ln(im84 / im16)`` where im16 and im84 are the IM values at which the fragility curve reaches 16 % and 84 % exceedance probability respectively. An additional analytical estimate ``'beta_formula'`` = σ / b is returned as a cross-check (it equals the proficiency exactly when no records collapse, i.e., under the plain Cloud Analysis assumption). Parameters ---------- cloud_dict : dict Output of ``postprocessor.process_mca_results``. ds_index : int, optional Damage-state index. Default ``0``. Returns ------- dict with keys: * ``'beta_IM_given_DCRLS1'`` — proficiency (NaN if curve does not reach 0.16 or 0.84) * ``'beta_formula'`` — analytical cross-check σ / b * ``'im16'`` — IM at 16 % exceedance (NaN if not reached) * ``'im84'`` — IM at 84 % exceedance (NaN if not reached) * ``'method'`` — ``'MCA'`` """ self._validate_cloud_dict(cloud_dict, 'cloud_dict') intensities = np.asarray(cloud_dict['fragility']['intensities']) poes = np.asarray(cloud_dict['fragility']['poes']) if poes.ndim == 1: poe_curve = poes else: if ds_index >= poes.shape[1]: raise ValueError( f"ds_index={ds_index} out of range " f"(fragility has {poes.shape[1]} damage states)." ) poe_curve = poes[:, ds_index] im16 = float(np.nan) im84 = float(np.nan) beta_prof = float(np.nan) if poe_curve.min() < 0.16 < poe_curve.max(): im16 = float(np.interp(0.16, poe_curve, intensities)) else: warnings.warn( "Fragility curve does not reach 16 % exceedance for " f"ds_index={ds_index}. Returning NaN for im16." ) if poe_curve.min() < 0.84 < poe_curve.max(): im84 = float(np.interp(0.84, poe_curve, intensities)) else: warnings.warn( "Fragility curve does not reach 84 % exceedance for " f"ds_index={ds_index}. Returning NaN for im84." ) if np.isfinite(im16) and np.isfinite(im84) and im84 > im16 > 0: beta_prof = float(0.5 * np.log(im84 / im16)) sigma = cloud_dict['regression']['sigma'] b = cloud_dict['regression']['b1'] beta_formula = float(sigma / b) if b != 0 else float(np.nan) return { 'beta_IM_given_DCRLS1': beta_prof, 'beta_formula': beta_formula, 'im16': im16, 'im84': im84, 'method': 'MCA', }
[docs] def compute_proficiency_ida(self, ida_dict, ds_index=0): """ Proficiency measure βIM|DCR=1 from an Incremental Dynamic Analysis result. Proficiency is estimated as ``0.5 · ln(im84 / im16)`` where im16 and im84 are the 16th and 84th percentile IDA curves evaluated at the damage threshold for the specified damage state. Parameters ---------- ida_dict : dict Output of ``postprocessor.process_ida_results``. ds_index : int, optional Damage-state index. Default ``0``. Returns ------- dict with keys: * ``'beta_IM_given_DCRLS1'`` — proficiency (NaN if p16 ≥ p84) * ``'im16'`` — IM at 16th percentile for this DS * ``'im84'`` — IM at 84th percentile for this DS * ``'method'`` — ``'IDA'`` """ damage_thresholds = ida_dict['ida_inputs']['damage_thresholds'] if ds_index >= len(damage_thresholds): raise ValueError( f"ds_index={ds_index} out of range " f"(IDA has {len(damage_thresholds)} damage states)." ) edp_axis = np.asarray(ida_dict['stats']['fitted_edps']) p16_arr = np.asarray(ida_dict['stats']['p16_im']) p84_arr = np.asarray(ida_dict['stats']['p84_im']) ds_edp = damage_thresholds[ds_index] im16 = float(np.interp(ds_edp, edp_axis, p16_arr)) im84 = float(np.interp(ds_edp, edp_axis, p84_arr)) if im84 <= im16 or im16 <= 0: warnings.warn( f"p84 ≤ p16 at damage threshold for ds_index={ds_index}. " "Returning NaN for proficiency." ) return { 'beta_IM_given_DCRLS1': float(np.nan), 'im16': im16, 'im84': im84, 'method': 'IDA', } beta_prof = float(0.5 * np.log(im84 / im16)) return { 'beta_IM_given_DCRLS1': beta_prof, 'im16': im16, 'im84': im84, 'method': 'IDA', }
# ----------------------------------------------------------------------- # RSM — Modified Cloud Analysis (Eq. 5) # -----------------------------------------------------------------------
[docs] def compute_rsm_mca(self, cloud_dict_im1, cloud_dict_im2): """ Relative Sufficiency Measure between two IMs using Modified Cloud Analysis. Implements Equation 5 of Ebrahimian & Jalayer (2021). A positive RSM value means IM₂ is more sufficient than IM₁ for characterising structural demand. **API contract:** *cloud_dict_im1* and *cloud_dict_im2* must have been computed from **the same N ground-motion records, in the same order**, with the **same** ``censored_limit`` and ``lower_limit`` parameters. Under these conditions the EDP-based collapse flag and lower-EDP filter are identical for both IMs, so ``raw_data['im_nc']`` arrays are positionally aligned. Parameters ---------- cloud_dict_im1 : dict MCA result for IM₁ (reference IM). cloud_dict_im2 : dict MCA result for IM₂ (candidate IM). Returns ------- dict with keys: * ``'rsm'`` — scalar RSM in bits (positive → IM₂ better) * ``'rsm_per_record'`` — ndarray of per-record log₂ ratios, shape (N,) * ``'n_records'`` — number of non-collapse records used * ``'method'`` — ``'MCA'`` * ``'params_im1'`` — regression parameters used for IM₁ * ``'params_im2'`` — regression parameters used for IM₂ """ self._validate_cloud_dict(cloud_dict_im1, 'cloud_dict_im1') self._validate_cloud_dict(cloud_dict_im2, 'cloud_dict_im2') # Extract regression parameters def _params(cd): r = cd['regression'] return { 'b0': float(r['b0']), 'b1': float(r['b1']), 'sigma': max(float(r['sigma']), _BETA_FLOOR), 'alpha0': float(r['alpha0']), 'alpha1': float(r['alpha1']), } p1 = _params(cloud_dict_im1) p2 = _params(cloud_dict_im2) # Non-collapse records (lower_limit already applied in postprocessor) edp_nc = np.asarray(cloud_dict_im1['raw_data']['edp_nc']) im1_nc = np.asarray(cloud_dict_im1['raw_data']['im_nc']) im2_nc = np.asarray(cloud_dict_im2['raw_data']['im_nc']) n = len(edp_nc) if n == 0: raise ValueError( "No non-collapse records found in cloud_dict_im1['raw_data']. " "Cannot compute RSM." ) if len(im2_nc) != n: raise ValueError( f"cloud_dict_im1 has {n} non-collapse records but " f"cloud_dict_im2 has {len(im2_nc)}. " "Both dicts must be derived from the same record set." ) # Clip IM values im1_nc = np.maximum(im1_nc, _IM_FLOOR) im2_nc = np.maximum(im2_nc, _IM_FLOOR) ln_im1 = np.log(im1_nc) ln_im2 = np.log(im2_nc) ln_edp = np.log(np.maximum(edp_nc, _IM_FLOOR)) # Standardised residuals z1 = (ln_edp - p1['b0'] - p1['b1'] * ln_im1) / p1['sigma'] z2 = (ln_edp - p2['b0'] - p2['b1'] * ln_im2) / p2['sigma'] # P(NoC | IM) pnoc1 = self._pnoc_logistic(ln_im1, p1['alpha0'], p1['alpha1']) pnoc2 = self._pnoc_logistic(ln_im2, p2['alpha0'], p2['alpha1']) # Per-record likelihood ratio (the 1/(edp·β) factors appear in both # numerator and denominator so only β1/β2 survives) phi1 = np.maximum(norm.pdf(z1), _PDF_FLOOR) phi2 = np.maximum(norm.pdf(z2), _PDF_FLOOR) ratio = (phi2 * pnoc2 * p1['sigma']) / (phi1 * pnoc1 * p2['sigma']) ratio = np.maximum(ratio, _PDF_FLOOR) log2_ratio = np.log2(ratio) # Screen bad values bad = ~np.isfinite(log2_ratio) n_bad = int(bad.sum()) if n_bad > 0: warnings.warn( f"{n_bad} record(s) produced non-finite log₂ ratio in " "compute_rsm_mca and were excluded from the mean." ) log2_ratio = log2_ratio[~bad] rsm = float(np.mean(log2_ratio)) if len(log2_ratio) > 0 else float(np.nan) return { 'rsm': rsm, 'rsm_per_record': log2_ratio, 'n_records': n - n_bad, 'method': 'MCA', 'params_im1': p1, 'params_im2': p2, }
# ----------------------------------------------------------------------- # RSM — Incremental Dynamic Analysis (Eq. 8) # -----------------------------------------------------------------------
[docs] def compute_rsm_ida(self, ida_dict_im1, ida_dict_im2): """ Relative Sufficiency Measure between two IMs using Incremental Dynamic Analysis. Implements Equation 8 of Ebrahimian & Jalayer (2021). **API contract:** *ida_dict_im1* and *ida_dict_im2* must have been computed from **the same N ground-motion records in the same order**. Parameters ---------- ida_dict_im1 : dict IDA result for IM₁ (reference IM). ida_dict_im2 : dict IDA result for IM₂ (candidate IM). Returns ------- dict with keys: * ``'rsm'`` — scalar RSM in bits (positive → IM₂ better) * ``'rsm_per_record'`` — ndarray of per-record log₂ ratios (valid only) * ``'n_records'`` — total number of records * ``'n_valid'`` — records where IDA interpolation succeeded * ``'method'`` — ``'IDA'`` """ n1 = ida_dict_im1['ida_inputs']['n_records'] n2 = ida_dict_im2['ida_inputs']['n_records'] if n1 != n2: raise ValueError( f"ida_dict_im1 has {n1} records but ida_dict_im2 has {n2}. " "Both dicts must be derived from the same record set." ) n_records = n1 # Use IM1's max demand per record as the common demand level, clipped # to the fitted EDP range. Raw IDA curves can reach collapse EDPs well # beyond the fitted axis (edp_range[-1]); without clipping every record # would fail the range check in _extract_ida_stats_at_demands. raw1 = ida_dict_im1['ida_inputs']['raw_curves'] edp_axis = np.asarray(ida_dict_im1['stats']['fitted_edps']) demand_values = np.clip( np.array([raw1[k]['edp'][-1] for k in range(n_records)]), edp_axis[0], edp_axis[-1], ) # Ensemble stats + per-record IM at the demand level eta1, beta1, im1_per_rec, valid1 = self._extract_ida_stats_at_demands( ida_dict_im1, demand_values ) eta2, beta2, im2_per_rec, valid2 = self._extract_ida_stats_at_demands( ida_dict_im2, demand_values ) # P(NoC | IM) from collapse fragility (last DS = collapse) try: theta_c1 = ida_dict_im1['fragility']['medians'][-1] beta_c1 = ida_dict_im1['fragility']['sigma_record2record'][-1] theta_c2 = ida_dict_im2['fragility']['medians'][-1] beta_c2 = ida_dict_im2['fragility']['sigma_record2record'][-1] use_fragility = ( theta_c1 > 0 and beta_c1 > 0 and theta_c2 > 0 and beta_c2 > 0 ) except (KeyError, IndexError, TypeError): use_fragility = False valid = valid1 & valid2 n_valid = int(valid.sum()) if n_valid == 0: warnings.warn( "No valid records for IDA RSM computation " "(IDA interpolation failed for all records)." ) return { 'rsm': float(np.nan), 'rsm_per_record': np.array([]), 'n_records': n_records, 'n_valid': 0, 'method': 'IDA', } log2_ratio = np.full(n_records, np.nan) for k in range(n_records): if not valid[k]: continue im1k = max(float(im1_per_rec[k]), _IM_FLOOR) im2k = max(float(im2_per_rec[k]), _IM_FLOOR) eta1k = max(float(eta1[k]), _IM_FLOOR) eta2k = max(float(eta2[k]), _IM_FLOOR) beta1k = max(float(beta1[k]), _BETA_FLOOR) beta2k = max(float(beta2[k]), _BETA_FLOOR) if use_fragility: pnoc1k = max( 1.0 - norm.cdf(np.log(im1k / theta_c1) / beta_c1), _PDF_FLOOR ) pnoc2k = max( 1.0 - norm.cdf(np.log(im2k / theta_c2) / beta_c2), _PDF_FLOOR ) else: pnoc1k = 1.0 pnoc2k = 1.0 z1k = (np.log(im1k) - np.log(eta1k)) / beta1k z2k = (np.log(im2k) - np.log(eta2k)) / beta2k phi1k = max(norm.pdf(z1k), _PDF_FLOOR) phi2k = max(norm.pdf(z2k), _PDF_FLOOR) # Eq. 8 ratio: includes 1/(im·β) normalisation terms num = phi2k * pnoc2k * im1k * beta1k den = phi1k * pnoc1k * im2k * beta2k ratio_k = max(num / den, _PDF_FLOOR) log2_ratio[k] = np.log2(ratio_k) valid_log2 = log2_ratio[valid] bad = ~np.isfinite(valid_log2) n_bad = int(bad.sum()) if n_bad > 0: warnings.warn( f"{n_bad} valid record(s) produced non-finite log₂ ratio in " "compute_rsm_ida and were excluded from the mean." ) valid_log2 = valid_log2[~bad] rsm = float(np.mean(valid_log2)) if len(valid_log2) > 0 else float(np.nan) return { 'rsm': rsm, 'rsm_per_record': valid_log2, 'n_records': n_records, 'n_valid': n_valid - n_bad, 'method': 'IDA', }
# ----------------------------------------------------------------------- # RSM — General / user-supplied PDF callables (Eq. 1) # -----------------------------------------------------------------------
[docs] def compute_rsm_general(self, demands, im1_values, im2_values, pdf_func_im1, pdf_func_im2): """ General Relative Sufficiency Measure using user-supplied PDF callables. Implements Equation 1 of Ebrahimian & Jalayer (2021). Useful for custom demand models or non-lognormal formulations. Parameters ---------- demands : array_like EDP value for each record, shape ``(N,)``. im1_values : array_like IM₁ value for each record, shape ``(N,)``. im2_values : array_like IM₂ value for each record, shape ``(N,)``. pdf_func_im1 : callable ``f(d_k, im1_k) → float`` — probability density of demand *d_k* given IM₁ = *im1_k*. pdf_func_im2 : callable ``f(d_k, im2_k) → float`` — same for IM₂. Returns ------- dict with keys: * ``'rsm'`` — scalar RSM in bits * ``'rsm_per_record'`` — ndarray of per-record log₂ ratios * ``'n_records'`` — total records * ``'n_valid'`` — records with finite log₂ ratio * ``'method'`` — ``'general'`` """ demands = np.asarray(demands, dtype=float) im1_values = np.asarray(im1_values, dtype=float) im2_values = np.asarray(im2_values, dtype=float) n = len(demands) log2_ratio = np.full(n, np.nan) for k in range(n): f1 = max(float(pdf_func_im1(demands[k], im1_values[k])), _PDF_FLOOR) f2 = max(float(pdf_func_im2(demands[k], im2_values[k])), _PDF_FLOOR) log2_ratio[k] = np.log2(f2 / f1) bad = ~np.isfinite(log2_ratio) n_bad = int(bad.sum()) if n_bad > 0: warnings.warn( f"{n_bad} record(s) produced non-finite log₂ ratio in " "compute_rsm_general and were excluded from the mean." ) valid_log2 = log2_ratio[~bad] rsm = float(np.mean(valid_log2)) if len(valid_log2) > 0 else float(np.nan) return { 'rsm': rsm, 'rsm_per_record': valid_log2, 'n_records': n, 'n_valid': n - n_bad, 'method': 'general', }
# ----------------------------------------------------------------------- # Orchestrator # -----------------------------------------------------------------------
[docs] def compare_ims(self, im_results_dict, analysis_type='MCA', metric='all', reference_im_key=None, damage_threshold_index=0): """ Compare N ≥ 2 intensity measures by efficiency, proficiency, practicality, and RSM. Parameters ---------- im_results_dict : dict Mapping ``{im_name: cloud_dict}`` (for MCA) or ``{im_name: ida_dict}`` (for IDA). analysis_type : {'MCA', 'IDA'} Which NDAP was used to generate the results. metric : {'all', 'efficiency', 'proficiency', 'practicality', 'rsm'} Which metrics to compute. ``'all'`` computes all four. reference_im_key : str or None IM name to use as IM₁ in RSM pairwise comparisons. If ``None``, the first key in *im_results_dict* is used. damage_threshold_index : int Damage-state index for proficiency computation. Default ``0``. Returns ------- dict with keys: * ``'ranking'`` — ``pd.DataFrame`` with columns im_name, efficiency, proficiency, practicality, rsm_vs_reference, rank_efficiency, rank_proficiency, rank_practicality, rank_rsm * ``'rsm_matrix'`` — ``dict[str, dict[str, float]]`` where ``rsm_matrix[im_i][im_j]`` = RSM(IM_j vs IM_i) * ``'metric'`` — the *metric* argument used * ``'analysis_type'`` — the *analysis_type* argument used * ``'reference_im_key'`` — the IM used as IM₁ for RSM Raises ------ ValueError If fewer than 2 IMs are supplied or *analysis_type* is invalid. """ if analysis_type not in ('MCA', 'IDA'): raise ValueError( f"analysis_type must be 'MCA' or 'IDA', got '{analysis_type}'." ) im_names = list(im_results_dict.keys()) if len(im_names) < 2: raise ValueError( "compare_ims requires at least 2 intensity measures; " f"got {len(im_names)}." ) if reference_im_key is None: reference_im_key = im_names[0] if reference_im_key not in im_results_dict: raise ValueError( f"reference_im_key='{reference_im_key}' not found in " "im_results_dict." ) do_eff = metric in ('all', 'efficiency') do_prof = metric in ('all', 'proficiency') do_prac = metric in ('all', 'practicality') do_rsm = metric in ('all', 'rsm') rows = [] rsm_matrix = {n: {} for n in im_names} for im_name in im_names: res = im_results_dict[im_name] eff_val = float(np.nan) prof_val = float(np.nan) prac_val = float(np.nan) if do_eff: if analysis_type == 'MCA': eff_val = self.compute_efficiency_mca(res)['beta_D_given_IM'] else: eff_val = self.compute_efficiency_ida( res, ds_index=damage_threshold_index )['beta_D_given_IM'] if do_prof: if analysis_type == 'MCA': prof_val = self.compute_proficiency_mca( res, ds_index=damage_threshold_index )['beta_IM_given_DCRLS1'] else: prof_val = self.compute_proficiency_ida( res, ds_index=damage_threshold_index )['beta_IM_given_DCRLS1'] if do_prac: if analysis_type == 'MCA': prac_val = self.compute_practicality_mca(res)['b_slope'] else: prac_val = self.compute_practicality_ida(res)['b_slope'] rows.append({ 'im_name': im_name, 'efficiency': eff_val, 'proficiency': prof_val, 'practicality': prac_val, }) # RSM matrix if do_rsm: ref_res = im_results_dict[reference_im_key] for im_name in im_names: cand_res = im_results_dict[im_name] for ref_name in im_names: ref_res_inner = im_results_dict[ref_name] try: if analysis_type == 'MCA': r = self.compute_rsm_mca(ref_res_inner, cand_res) else: r = self.compute_rsm_ida(ref_res_inner, cand_res) rsm_matrix[ref_name][im_name] = r['rsm'] except Exception as exc: # noqa: BLE001 warnings.warn( f"RSM({im_name} vs {ref_name}) failed: {exc}" ) rsm_matrix[ref_name][im_name] = float(np.nan) # RSM vs reference IM ref_res = im_results_dict[reference_im_key] # noqa: F841 for row in rows: row['rsm_vs_reference'] = rsm_matrix[reference_im_key].get( row['im_name'], float(np.nan) ) else: for row in rows: row['rsm_vs_reference'] = float(np.nan) df = pd.DataFrame(rows) # Ranks (lower beta = better efficiency/proficiency; higher RSM = better) if do_eff: df['rank_efficiency'] = ( df['efficiency'].rank(method='min', ascending=True).astype(int) ) else: df['rank_efficiency'] = np.nan if do_prof: df['rank_proficiency'] = ( df['proficiency'].rank(method='min', ascending=True).astype(int) ) else: df['rank_proficiency'] = np.nan if do_prac: df['rank_practicality'] = ( df['practicality'].rank(method='min', ascending=False).astype(int) ) else: df['rank_practicality'] = np.nan if do_rsm: df['rank_rsm'] = ( df['rsm_vs_reference'] .rank(method='min', ascending=False).astype(int) ) else: df['rank_rsm'] = np.nan return { 'ranking': df, 'rsm_matrix': rsm_matrix, 'metric': metric, 'analysis_type': analysis_type, 'reference_im_key': reference_im_key, }