import math
import numpy as np
import pandas as pd
import statsmodels.api as sm
from scipy import stats, optimize
from scipy.optimize import curve_fit
from scipy.stats import norm, lognorm
from scipy.interpolate import interp1d
from statsmodels.miscmodels.ordinal_model import OrderedModel
class postprocessor:
"""
Class for post-processing results of nonlinear time-history analysis,
including fragility and vulnerability analysis.
This class provides methods to compute fragility functions, perform
cloud and multiple stripe analyses, and calculate vulnerability
functions and average annual losses. It supports various fragility
fitting methods, including lognormal, probit, logit, and ordinal
models. The class also includes functionality to handle uncertainty
and variability in the analysis.
Methods
-------
calculate_lognormal_fragility(
theta, sigma_record2record, sigma_build2build=0.30,
sigma_ds=0.30,
intensities=np.round(np.geomspace(0.05, 10.0, 50), 3))
Computes the probability of exceeding a damage state using a
lognormal cumulative distribution function (CDF).
calculate_rotated_fragility(
percentile, theta, sigma_record2record,
sigma_build2build=0.30, sigma_ds=0.30,
intensities=np.round(np.geomspace(0.05, 10.0, 50), 3))
Calculates a rotated fragility function based on a lognormal
CDF, adjusting the median intensity to align with a specified
target percentile.
calculate_glm_fragility(
imls, edps, damage_thresholds,
intensities=np.round(np.geomspace(0.05, 10.0, 50), 3),
fragility_method='logit')
Computes non-parametric fragility functions using Generalized
Linear Models (GLM) with either a Logit or Probit link function.
calculate_ordinal_fragility(
imls, edps, damage_thresholds,
intensities=np.round(np.geomspace(0.05, 10.0, 50), 3))
Fits an ordinal (cumulative) probit model to estimate fragility
curves for different damage states.
process_mca_results(
imls, edps, damage_thresholds, lower_limit, censored_limit,
sigma_build2build=0.3, sigma_ds=0.3,
intensities=np.geomspace(0.05, 10, 50), n_bootstrap=200,
random_seed=None, fragility_rotation=False,
rotation_percentile=0.10, fragility_method='lognormal')
Postprocess Modified Cloud Analysis (MCA) results: fits the
probabilistic seismic demand model and derives fragility functions
from pre-computed NLTHA outputs.
process_msa_results(
imls, edps, damage_thresholds, sigma_build2build=0.3,
intensities=np.round(np.geomspace(0.05, 10.0, 50), 3),
fragility_rotation=False, rotation_percentile=0.10)
Postprocess Multiple Stripe Analysis (MSA) results: maximum
likelihood estimation for fragility curve fitting from
pre-computed NLTHA stripe outputs.
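process_ida_results(
ansys_dict, im_matrix, damage_thresholds, edp_key,
sigma_build2build=0.3, sigma_ds=0.3,
intensities=np.round(np.geomspace(0.05, 10.0, 50), 3),
edp_range=np.linspace(0.00, 0.05, 101),
fragility_rotation=False, rotation_percentile=0.10)
Postprocess Incremental Dynamic Analysis (IDA) results: censored
maximum likelihood estimation for fragility curve fitting from
pre-computed NLTHA outputs.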
calculate_vulnerability_function(
poes, consequence_model, cov_consequence=None,
uncertainty=True, method=None,
intensities=np.round(np.geomspace(0.05, 10.0, 50), 3))
Compute a vulnerability function by convolving fragility
functions with a consequence (damage-to-loss) model.
calculate_average_annual_damage_probability(
fragility_array, hazard_array, return_period=1,
max_return_period=5000)
Calculate the Average Annual Damage State Probability (AADP)
based on fragility and hazard curves.
calculate_average_annual_loss(
vulnerability_array, hazard_array, return_period=1,
max_return_period=5000)
Calculate the Average Annual Loss (AAL) based on vulnerability
and hazard curves.
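Examples
--------
A minimal usage sketch with purely illustrative fragility
parameters, assuming the class is already imported:

>>> import numpy as np
>>> pp = postprocessor()
>>> poes = pp.calculate_lognormal_fragility(
...     theta=0.45, sigma_record2record=0.35)
>>> poes.shape
(50,)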
"""
def __init__(self):
pass
def calculate_lognormal_fragility(self,
theta,
sigma_record2record,
sigma_build2build=0.30,
sigma_ds=0.30,
intensities=np.round(
np.geomspace(0.05, 10.0, 50),
3)):
"""
Computes the probability of exceeding a damage state using a
lognormal cumulative distribution function (CDF).
Parameters
----------
theta : float
The median seismic intensity corresponding to an EDP-based
damage threshold.
sigma_record2record : float
The logarithmic standard deviation representing
record-to-record variability.
sigma_build2build : float, optional
The logarithmic standard deviation representing
building-to-building (or model) variability.
Default value is 0.30.
sigma_ds : float, optional
The logarithmic standard deviation representing uncertainty
in damage-state thresholds. Default value is 0.30.
intensities : array-like, optional
The set of intensity measure (IM) levels for which
exceedance probabilities will be computed. Default is a
geometric sequence from 0.05 to 10.0 with 50 points.
Returns
-------
poes : numpy.ndarray
An array of exceedance probabilities corresponding to each
intensity measure in `intensities`.
References
----------
1) Baker JW. Efficient Analytical Fragility Function Fitting
Using Dynamic Structural Analysis. Earthquake Spectra.
2015;31(1):579-599. doi:10.1193/021113EQS025M
2) Singhal A, Kiremidjian AS. Method for probabilistic
evaluation of seismic structural damage. Journal of Structural
Engineering 1996; 122: 1459-1467.
DOI:10.1061/(ASCE)0733-9445(1996)122:12(1459)
3) Lallemant, D., Kiremidjian, A., and Burton, H. (2015),
Statistical procedures for developing earthquake damage
fragility curves. Earthquake Engng Struct. Dyn., 44, 1373-1389.
doi: 10.1002/eqe.2522.
4) Bird JF, Bommer JJ, Bray JD, Sancio R, Spence RJS. Comparing
loss estimation with observed damage in a zone of ground failure:
a study of the 1999 Kocaeli Earthquake in Turkey. Bulletin of
Earthquake Engineering 2004; 2: 329-360.
DOI: 10.1007/s10518-004-3804-0
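Examples
--------
A minimal sketch with hypothetical parameter values. The total
dispersion is the SRSS of the three sigma components, i.e.
beta_total = sqrt(0.40**2 + 0.30**2 + 0.30**2) ~= 0.583, so the
probability of exceedance at IM = theta is exactly 0.5:

>>> import numpy as np
>>> pp = postprocessor()
>>> poes = pp.calculate_lognormal_fragility(
...     theta=0.5, sigma_record2record=0.40,
...     sigma_build2build=0.30, sigma_ds=0.30,
...     intensities=np.array([0.1, 0.5, 1.0]))
>>> float(np.round(poes[1], 2))
0.5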
"""
# Calculate the total uncertainty
beta_total = np.sqrt(sigma_record2record**2 +
sigma_build2build**2 + sigma_ds**2)
# Compute exceedance probabilities for each intensity level
return lognorm.cdf(intensities, s=beta_total, loc=0, scale=theta)
def calculate_rotated_fragility(self,
percentile,
theta,
sigma_record2record,
sigma_build2build=0.30,
sigma_ds=0.30,
intensities=np.round(
np.geomspace(0.05, 10.0, 50),
3)):
"""
Calculates a rotated fragility function based on a lognormal
cumulative distribution function (CDF), adjusting the median
intensity to align with a specified target percentile.
This function modifies the median intensity based on the desired
target percentile and total uncertainty (considering both
record-to-record variability and modeling variability). The
resulting rotated fragility curve represents the damage
exceedance probabilities for a range of intensity measure
levels, as defined by the lognormal distribution.
Parameters
----------
percentile : float
The target percentile for fragility function rotation. This
value corresponds to the desired percentile (e.g., 0.2
corresponds to the 20th percentile of the fragility curve).
The curve is adjusted such that this percentile aligns with
the calculated fragility function.
theta : float
The median seismic intensity corresponding to the EDP-based
damage threshold.
sigma_record2record : float
The uncertainty associated with record-to-record variability
in the seismic records used to derive the fragility.
sigma_build2build : float, optional, default=0.30
The uncertainty associated with modeling variability between
different buildings or building types.
sigma_ds : float, optional
The logarithmic standard deviation representing uncertainty
in damage-state thresholds. Default value is 0.30.
intensities : array-like, optional,
default=np.round(np.geomspace(0.05, 10.0, 50), 3)
A list or array of intensity measure levels at which to
evaluate the fragility function, typically representing
seismic intensity levels (e.g., spectral acceleration). The
default is a geometric space ranging from 0.05 to 10.0.
Returns
-------
theta_prime : float
The new median intensity after the rotation based on the
specified percentile.
beta_total : float
The total standard deviation of the lognormal distribution,
calculated from both record-to-record and building-to-
building (modelling) uncertainties.
poes : array-like
The probabilities of exceedance (fragility values)
corresponding to the input intensity measure levels. This is
the lognormal CDF evaluated at the given intensities with
the rotated median and combined uncertainty.
References
----------
1) Porter, K. (2017), "When Addressing Epistemic Uncertainty in
a Lognormal Fragility Function, How Should One Adjust the
Median?", Proceedings of the 16th World Conference on Earthquake
Engineering (16WCEE), Santiago, Chile.
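Examples
--------
A minimal sketch with hypothetical values. Since the rotated median
is theta_prime = theta * exp(-norm.ppf(percentile) *
(beta_total - sigma_record2record)), rotating around a low
percentile shifts the median upwards once additional modelling and
damage-state uncertainty are added:

>>> pp = postprocessor()
>>> theta_prime, beta_total, poes = pp.calculate_rotated_fragility(
...     percentile=0.10, theta=0.5, sigma_record2record=0.40,
...     sigma_build2build=0.30, sigma_ds=0.30)
>>> bool(theta_prime > 0.5)
True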
"""
# Calculate combined log standard deviation (total uncertainty)
beta_total = np.sqrt(sigma_record2record**2 +
sigma_build2build**2 + sigma_ds**2)
# Adjust the median intensity based on the target percentile
theta_prime = theta * \
np.exp(-stats.norm.ppf(percentile) *
(beta_total - sigma_record2record))
# Return rotated lognormal CDF for the given intensities
return (theta_prime, beta_total,
stats.lognorm(
s=beta_total, scale=theta_prime).cdf(intensities))
def calculate_glm_fragility(self,
imls,
edps,
damage_thresholds,
intensities=np.round(
np.geomspace(0.05, 10.0, 50), 3),
fragility_method='logit'):
"""
Computes non-parametric fragility functions using Generalized
Linear Models (GLM) with either a Logit or Probit link function.
Parameters
----------
imls : array-like
Intensity Measure Levels (IMLs) corresponding to each
observation.
edps : array-like
Engineering Demand Parameters (EDPs) representing structural
response values.
damage_thresholds : array-like
List of thresholds defining different damage states.
intensities : array-like, optional
Intensity measure values at which probabilities of
exceedance (PoEs) are evaluated. Defaults to
np.round(np.geomspace(0.05, 10.0, 50), 3).
fragility_method : str, optional
Specifies the GLM model to be used for fragility function
fitting. Options:
- ``'logit'`` (default): Uses a logistic regression model.
- ``'probit'``: Uses a probit regression model.
Returns
-------
poes : ndarray
A 2D array where each column represents the probability of
exceeding a specific damage state at each intensity level.
References
----------
1) Charvet, I., Ioannou, I., Rossetto, T., Suppasri, A., and
Imamura, F.: Empirical fragility assessment of buildings
affected by the 2011 Great East Japan tsunami using improved
statistical models, Nat. Hazards, 73, 951-973, 2014.
2) Lahcene, E., Ioannou, I., Suppasri, A., Pakoksung, K.,
Paulik, R., Syamsidik, S., Bouchette, F., and Imamura, F.:
Characteristics of building fragility curves for seismic and
non-seismic tsunamis: case studies of the 2018 Sunda Strait,
2018 Sulawesi-Palu, and 2004 Indian Ocean tsunamis, Nat.
Hazards Earth Syst. Sci., 21, 2313-2344,
https://doi.org/10.5194/nhess-21-2313-2021, 2021.
3) Lallemant, D., Kiremidjian, A., and Burton, H. (2015),
Statistical procedures for developing earthquake damage
fragility curves. Earthquake Engng Struct. Dyn., 44, 1373-1389.
doi: 10.1002/eqe.2522.
4) Jalayer, F., Ebrahamian, H., Trevlopoulos, K., and Bradley,
B. (2023). Empirical tsunami fragility modelling for
hierarchical damage levels. Natural Hazards and Earth System
Sciences, 23(2), 909-931.
https://doi.org/10.5194/nhess-23-909-2023
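Examples
--------
A minimal sketch on a small, purely illustrative cloud of eight
(IM, EDP) pairs. The values are hypothetical and chosen so that
neither damage state is perfectly separated in log(IM), which would
otherwise destabilize the GLM fit:

>>> import numpy as np
>>> pp = postprocessor()
>>> imls = np.array([0.1, 0.2, 0.3, 0.4, 0.6, 0.8, 1.0, 1.2])
>>> edps = np.array([0.002, 0.001, 0.006, 0.004,
...                  0.012, 0.018, 0.020, 0.010])
>>> poes = pp.calculate_glm_fragility(
...     imls, edps, damage_thresholds=[0.005, 0.015])
>>> poes.shape  # one column of exceedance probabilities per DS
(50, 2)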
"""
# Create probabilities of exceedance array
poes = np.zeros((len(intensities), len(damage_thresholds)))
for ds, current_threshold in enumerate(damage_thresholds):
# Count exceedances
exceedances = [1 if edp > current_threshold
else 0 for edp in edps]
# Assemble dict: log of IMs and binary damage state labels
data = {'IM': np.log(imls),
'Damage': exceedances}
# Create DataFrame
df = pd.DataFrame(data)
# Add a constant for the intercept term
X = sm.add_constant(df['IM'])
y = df['Damage']
if fragility_method.lower() == 'probit':
# Fit the Probit GLM model
probit_model = sm.GLM(y, X, family=sm.families.Binomial(
link=sm.families.links.Probit()))
probit_results = probit_model.fit()
# Generate a range of IM values for plotting
log_IM_range = np.log(intensities)
X_range = sm.add_constant(log_IM_range)
# Predict probabilities using the Probit GLM model
poes[:, ds] = probit_results.predict(X_range)
elif fragility_method.lower() == 'logit':
# Fit the Logit GLM model
logit_model = sm.GLM(y, X, family=sm.families.Binomial(
link=sm.families.links.Logit()))
logit_results = logit_model.fit()
# Generate a range of IM values for plotting
log_IM_range = np.log(intensities)
X_range = sm.add_constant(log_IM_range)
# Predict probabilities using the Logit GLM model
poes[:, ds] = logit_results.predict(X_range)
return poes
def calculate_ordinal_fragility(self,
imls,
edps,
damage_thresholds,
intensities=np.round(
np.geomspace(0.05, 10.0, 50),
3)):
"""
Fits an ordinal (cumulative) probit model to estimate fragility
curves for different damage states.
This function estimates the probability of exceeding various
damage states using an ordinal regression model based on observed
Engineering Demand Parameters (EDPs) and corresponding Intensity
Measure Levels (IMLs).
Parameters
----------
imls : array-like
Intensity measure levels corresponding to the observed EDPs.
edps : array-like
Engineering Demand Parameters (EDPs) representing structural
responses.
damage_thresholds : array-like
Damage state thresholds for classifying exceedance levels.
intensities : array-like, optional
Intensity measure levels for which fragility curves are
evaluated (default: np.round(np.geomspace(0.05, 10.0, 50), 3)).
Returns
-------
poes : numpy.ndarray
A 2D array of exceedance probabilities (CDF values) for each
intensity level. Shape: (len(intensities),
len(damage_thresholds) + 1), where the last column represents
the probability of exceeding the highest damage state.
References
----------
1) Lallemant, D., Kiremidjian, A., and Burton, H. (2015),
Statistical procedures for developing earthquake damage fragility
curves. Earthquake Engng Struct. Dyn., 44, 1373-1389.
doi: 10.1002/eqe.2522.
2) Nguyen, M. and Lallemant, D. (2022), Order Matters: The
Benefits of Ordinal Fragility Curves for Damage and Loss
Estimation. Risk Analysis, 42: 1136-1148.
https://doi.org/10.1111/risa.13815
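Examples
--------
A minimal sketch on a small, purely illustrative cloud in which all
three damage categories (no damage, DS1, DS2) are observed:

>>> import numpy as np
>>> pp = postprocessor()
>>> imls = np.array([0.1, 0.15, 0.2, 0.3, 0.4, 0.5,
...                  0.6, 0.8, 1.0, 1.2, 1.5, 2.0])
>>> edps = np.array([0.001, 0.002, 0.003, 0.004, 0.006, 0.005,
...                  0.009, 0.012, 0.016, 0.014, 0.020, 0.030])
>>> poes = pp.calculate_ordinal_fragility(
...     imls, edps, damage_thresholds=[0.005, 0.015])
>>> poes.shape  # last column: exceedance of the highest damage state
(50, 3)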
"""
# Create probabilities of exceedance array
# +1 to include the highest damage state
poes = np.zeros((len(intensities), len(damage_thresholds) + 1))
# Initialize damage state assignments
damage_states = np.zeros(len(edps), dtype=int)
# Loop over each EDP and determine the highest exceeded damage state
for i, edp in enumerate(edps):
# Indices where EDP exceeds thresholds
exceeded = np.where(edp > np.asarray(damage_thresholds))[0]
# Assign highest exceeded state (0-based)
damage_states[i] = exceeded[-1] + 1 if exceeded.size > 0 else 0
# Assemble DataFrame containing log(IM) and damage state assignment
df = pd.DataFrame({'IM': np.log(imls), 'Damage State': damage_states})
# Fit the Cumulative Probit Model
X_ordinal = df[['IM']]
y_ordinal = df['Damage State']
# Create and fit the OrderedModel
ordinal_model = OrderedModel(y_ordinal, X_ordinal, distr='probit')
ordinal_results = ordinal_model.fit(
method='bfgs', disp=False) # Silent optimization
# Generate log-transformed IM values for prediction
log_IM_range = np.log(intensities)
X_range_ordinal = pd.DataFrame({'IM': log_IM_range})
# Predict probabilities for each damage state (PMF)
# Shape: (len(intensities), num_damage_states)
pmf_values = ordinal_results.predict(X_range_ordinal)
# Convert the PMF to exceedance probabilities:
# P(DS >= k) = 1 - cumulative sum of P(DS = 0), ..., P(DS = k-1)
poes = 1 - np.cumsum(pmf_values, axis=1)
return poes.values
def process_mca_results(self,
imls,
edps,
damage_thresholds,
lower_limit,
censored_limit,
sigma_build2build=0.3,
sigma_ds=0.3,
intensities=np.geomspace(0.05, 10, 50),
n_bootstrap=200,
random_seed=None,
fragility_rotation=False,
rotation_percentile=0.10,
fragility_method='lognormal',
cloud_method='bootstrap',
n_mcmc=5000,
n_mcmc_burnin=1000,
confidence_k=2):
"""
Perform a Modified Cloud Analysis (MCA) to derive seismic
fragility functions. This method extends classical cloud analysis
by incorporating logistic regression to account for structural
collapse cases. It supports lognormal, ordinal, and GLM-based
fragility fitting.
When ``fragility_method='lognormal'``, two ``cloud_method``
options are available:
- ``'bootstrap'`` (default): uses non-parametric bootstrapping
to estimate the mean fragility and parameter uncertainty.
- ``'classical'``: implements the Bayesian Robust Fragility
procedure of Jalayer et al. (2017), sampling the 5-parameter
vector chi = [ln a, b, beta, alpha0, alpha1] from its
posterior distribution via MCMC (Metropolis-Hastings), and
computing the Robust Fragility as the posterior expected value
of the fragility model together with confidence bands.
Parameters
----------
imls : array_like
Intensity Measure Levels (e.g., Sa, AvgSA) from the cloud.
edps : array_like
Engineering Demand Parameters (e.g., maximum interstory
drift) from the cloud.
damage_thresholds : list of float
The demand-based thresholds defining the onset of different
damage states.
lower_limit : float
The EDP value below which data is ignored for regression
(demand is considered negligible for damage).
censored_limit : float
The "Collapse" threshold. EDP values above this are treated
as collapse instances in the logistic regression.
sigma_build2build : float, optional
Additional modeling uncertainty (building-to-building
variability). Default is 0.3.
sigma_ds : float, optional
The logarithmic standard deviation representing uncertainty
in damage-state thresholds. Default value is 0.30.
intensities : np.array, optional
The seismic intensity range over which to evaluate the
fragility functions. Default is a geometric space from
0.05 to 10.
n_bootstrap : int, optional
Number of bootstrap samples to draw for statistical
stability (used only when ``cloud_method='bootstrap'``).
Default is 200.
random_seed : int, optional
Seed for reproducibility of the bootstrap sampling or MCMC.
Default is None.
fragility_rotation : bool, optional
Whether to rotate the fragility curve around a specific
percentile to adjust for target reliability. Default is
False.
rotation_percentile : float, optional
The target percentile for fragility function rotation.
Default is 0.10.
fragility_method : {'lognormal', 'ordinal', 'probit', 'logit'}, optional
The methodology used to fit the fragility functions.
Default is 'lognormal'.
cloud_method : {'bootstrap', 'classical'}, optional
When ``fragility_method='lognormal'``, selects the
uncertainty quantification approach.
- ``'bootstrap'``: non-parametric bootstrapping (default).
- ``'classical'``: Bayesian MCMC (Metropolis-Hastings) on
the full 5-parameter model chi = [ln a, b, beta,
alpha0, alpha1] following Jalayer et al. (2017). The
Robust Fragility is computed as the posterior expected
value of the fragility model (Eq. 10), and confidence
bands are derived from the posterior variance (Eq. 11).
n_mcmc : int, optional
Total number of MCMC samples (including burn-in) used
when ``cloud_method='classical'``. Default is 5000.
n_mcmc_burnin : int, optional
Number of initial MCMC samples discarded as burn-in.
Default is 1000.
confidence_k : float, optional
Half-width of the robust confidence band in units of the
posterior standard deviation. Default is 2.
Returns
-------
cloud_dict : dict
A nested dictionary. Keys present for all fragility methods:
``'cloud inputs'``, ``'fragility'``, ``'regression'``.
Additional keys for ``fragility_method='lognormal'``:
``'raw_data'``; and either ``'bootstraps'``
(``cloud_method='bootstrap'``) or ``'mcmc'``
(``cloud_method='classical'``).
``'fragility'`` always contains: ``fragility_method``,
``intensities``, ``poes``, ``medians``, ``sigma_record2record``,
``sigma_build2build``, ``sigma_ds``, ``betas_total``. When
``fragility_method='lognormal'`` it also contains
``cloud_method``, and when ``cloud_method='classical'`` it
additionally includes ``poes_robust_plus``,
``poes_robust_minus``, and ``confidence_k``.
Notes
-----
The 'lognormal' method implements a dual-regression approach:
1. **Linear Regression**: Performed in log-log space on
non-collapse data (log(EDP) = log(a) + b * log(IM)).
2. **Logistic Regression**: Used to predict P(C|IM).
3. **Total Fragility**: P(DS|IM) = P(DS|NC,IM)*P(NC|IM) +
P(C|IM).
The ``'classical'`` cloud method follows the 5-parameter
fragility model from Jalayer et al. (2017), Eq. 7:
P(DCR>1|Sa,chi) = Phi(ln(eta)/beta) * P(NC|Sa) + P(C|Sa)
where chi = [ln a, b, beta, alpha0, alpha1]. The posterior
joint distribution f(chi|D) is sampled via MCMC using the
likelihood from Eq. A1.2 (Jalayer et al. 2017). The Robust
Fragility (Eq. 10) is the posterior mean of the fragility
model over the sampled chi vectors, and the confidence bands
(Eq. 11) are derived from the posterior variance.
References
----------
1) Jalayer, F., Ebrahimian, H., Miano, A., Manfredi, G., and
Sezen, H. (2017). Analytical fragility assessment using
unscaled ground motion records. Earthquake Engng Struct.
Dyn., 46, 2639-2663. https://doi.org/10.1002/eqe.2922
2) Jalayer, F., Elefante, L., De Risi, R., and Manfredi, G.
(2014). Cloud Analysis revisited: Efficient fragility
calculation and uncertainty propagation using simple linear
regression. Proceedings of the 10th NCEE, Anchorage, AK.
3) Jalayer, F., De Risi, R., and Manfredi, G. (2015). Bayesian
Cloud Analysis: efficient structural fragility assessment
using linear regression. Bulletin of Earthquake Engineering,
13(4), 1183-1203.
4) Miano, A., Jalayer, F., and Prota, A. (2019). Considering
structural modelling uncertainties using Bayesian cloud
analysis. COMPDYN 2019.
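Examples
--------
A minimal sketch on a small, purely illustrative cloud (real
applications use many more NLTHA runs). The 'logit' path is shown
because it bypasses the bootstrap and MCMC machinery; in this branch
``lower_limit`` and ``censored_limit`` are accepted but not used in
the fit:

>>> import numpy as np
>>> pp = postprocessor()
>>> imls = np.array([0.1, 0.2, 0.3, 0.4, 0.6, 0.8, 1.0, 1.2])
>>> edps = np.array([0.002, 0.001, 0.006, 0.004,
...                  0.012, 0.018, 0.020, 0.010])
>>> out = pp.process_mca_results(
...     imls, edps, damage_thresholds=[0.005, 0.015],
...     lower_limit=0.001, censored_limit=0.1,
...     fragility_method='logit')
>>> out['fragility']['poes'].shape
(50, 2)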
"""
def cond_fragility(x, a, b):
"""
Parametric form used to fit the mean bootstrap fragility
curves, which combine the collapse and non-collapse
contributions.
"""
return (1-np.exp(-a*x))**b
def prepare_mca_data(imls, edps, collapse_limit, bootstrap=True):
"""
Helper function that standardizes the cloud inputs, optionally
draws a bootstrap resample, tops up collapse cases when the
resample contains too few of them (to keep the logistic
regression stable), and splits IMs and EDPs into collapse and
non-collapse cases.
"""
# Ensure inputs are arrays
imls = np.array(imls)
edps = np.array(edps)
n_total = len(imls)
# Identify indices where collapse occurs in the original dataset
mask_coll_ori = edps > collapse_limit
im_coll_ori = imls[mask_coll_ori]
edp_coll_ori = edps[mask_coll_ori]
npt_ori = len(im_coll_ori)
# Bootstrap sampling
if bootstrap:
idx_boot = np.random.randint(0, n_total, size=n_total)
sample_im = imls[idx_boot]
sample_edp = edps[idx_boot]
else:
sample_im = imls.copy()
sample_edp = edps.copy()
# Standardization
npts_sample_coll = np.sum(sample_edp > collapse_limit)
target_min = math.ceil(0.5*npt_ori)
# Stabilize Logistic Regression if bootstrap has few collapses
if npts_sample_coll < target_min and npt_ori > 0:
add_count = int(target_min - npts_sample_coll)
# Resample from original collapse points (1D)
extra_indices = np.random.choice(
len(im_coll_ori), size=add_count, replace=True)
# Use 1D indexing: im_coll_ori[extra_indices]
sample_im = np.concatenate(
[sample_im, im_coll_ori[extra_indices]])
sample_edp = np.concatenate(
[sample_edp, edp_coll_ori[extra_indices]])
# Final split
is_coll = sample_edp > collapse_limit # Collapse mask
im_nc = sample_im[~is_coll] # Non-collapse IMs
edp_nc = sample_edp[~is_coll] # Non-collapse EDPs
im_c = sample_im[is_coll] # Collapse IMs
return im_nc, edp_nc, im_c
# Compute exceedance probabilities using the specified fragility method
if fragility_method.lower() in ['probit', 'logit']:
# Get the probabilities of exceedance
poes = self.calculate_glm_fragility(
imls, edps, damage_thresholds,
intensities=intensities,
fragility_method=fragility_method)
# Compute lognormal equivalent fragility parameters
# Equivalent median intensities
thetas = [np.interp(0.50, poes[:, ds], intensities)
for ds in range(len(damage_thresholds))]
sigmas_record2record = [
np.abs(0.50 * (
np.log(np.interp(0.84, poes[:, ds], intensities))
- np.log(np.interp(
0.16, poes[:, ds], intensities))))
for ds in range(len(damage_thresholds))
] # Equivalent record-to-record variability
# Modelling uncertainty
sigmas_build2build = np.full(
len(damage_thresholds), sigma_build2build)
# Uncertainty in DS thresholds
sigmas_ds = np.full(len(damage_thresholds), sigma_ds)
betas_total = [
np.sqrt(
sigma_record2record**2
+ sigma_build2build**2
+ sigma_ds**2)
for sigma_record2record, sigma_build2build, sigma_ds
in zip(
sigmas_record2record,
sigmas_build2build,
sigmas_ds)
] # Total dispersion
# Create the dictionary
cloud_dict = {
# Add a nested dictionary for the inputs of the regression
'cloud inputs': {'imls': imls,
'edps': edps,
'lower_limit': None,
'upper_limit': None,
'damage_thresholds': damage_thresholds},
# Add a nested dictionary for fragility functions parameters
'fragility': {'fragility_method': fragility_method.lower(),
'intensities': intensities,
'poes': poes,
'medians': thetas,
'sigma_record2record': sigmas_record2record,
'sigma_build2build': sigmas_build2build,
'sigma_ds': sigmas_ds,
'betas_total': betas_total},
# Add a nested dictionary for regression coefficients
'regression': {'b1': None, # Store 'b1' coefficient
'b0': None, # Store 'b0' coefficient
'sigma': None, # Store 'sigma' value
'fitted_x': None, # Store the fitted x-values
'fitted_y': None} # Store the fitted y-values
}
return cloud_dict
elif fragility_method.lower() == 'ordinal':
# Compute exceedance probabilities via ordinal fragility
poes = self.calculate_ordinal_fragility(
imls, edps, damage_thresholds,
intensities=intensities)
# Compute lognormal equivalent fragility parameters
# Equivalent median intensities
thetas = [np.interp(0.50, poes[:, ds], intensities)
for ds in range(len(damage_thresholds))]
sigmas_record2record = [
np.abs(0.50 * (
np.log(np.interp(0.84, poes[:, ds], intensities))
- np.log(np.interp(
0.16, poes[:, ds], intensities))))
for ds in range(len(damage_thresholds))
] # Equivalent record-to-record variability
# Modelling uncertainty
sigmas_build2build = np.full(
len(damage_thresholds), sigma_build2build)
# Uncertainty in DS thresholds
sigmas_ds = np.full(len(damage_thresholds), sigma_ds)
betas_total = [
np.sqrt(
sigma_record2record**2
+ sigma_build2build**2
+ sigma_ds**2)
for sigma_record2record, sigma_build2build, sigma_ds
in zip(
sigmas_record2record,
sigmas_build2build,
sigmas_ds)
] # Total dispersion
# Create the dictionary
cloud_dict = {
# Add a nested dictionary for the inputs of the regression
'cloud inputs': {
'imls': imls,
'edps': edps,
'lower_limit': None,
'upper_limit': None,
'damage_thresholds': damage_thresholds},
# Fragility functions parameters
'fragility': {
'fragility_method': fragility_method.lower(),
'intensities': intensities,
'poes': poes,
'medians': thetas,
'sigma_record2record': sigmas_record2record,
'sigma_build2build': sigmas_build2build,
'sigma_ds': sigmas_ds,
'betas_total': betas_total},
# Add a nested dictionary for regression coefficients
'regression': {'b1': None, # Store 'b1' coefficient
'b0': None, # Store 'b0' coefficient
'sigma': None, # Store 'sigma' value
'fitted_x': None, # Store the fitted x-values
'fitted_y': None} # Store the fitted y-values
}
return cloud_dict
elif fragility_method.lower() == 'lognormal':
# Initialise seed for reproducibility
if random_seed is not None:
np.random.seed(random_seed)
# Ensure inputs are in the right format
imls, edps = np.asarray(imls), np.asarray(edps)
# Common dimensions (shared by both cloud methods)
n_ds = len(damage_thresholds)
n_im = len(intensities)
# =============================================================
# BOOTSTRAP APPROACH
# =============================================================
if cloud_method.lower() == 'bootstrap':
# --- Full-dataset OLS (regression dict) — raw data, no
# bootstrap resampling or collapse-stabilisation ---
_is_coll_f = edps > censored_limit
_im_nc_f = imls[~_is_coll_f]
_edp_nc_f = edps[~_is_coll_f]
_mask_f = _edp_nc_f >= lower_limit
_ln_im_f = np.log(_im_nc_f[_mask_f])
_ln_edp_f = np.log(_edp_nc_f[_mask_f])
b_full = (
np.sum((_ln_im_f - _ln_im_f.mean())
* (_ln_edp_f - _ln_edp_f.mean()))
/ np.sum((_ln_im_f - _ln_im_f.mean()) ** 2))
a_full = np.exp(_ln_edp_f.mean() - b_full * _ln_im_f.mean())
_res_full = _ln_edp_f - np.log(a_full * _im_nc_f[_mask_f] ** b_full)
sig_full = np.linalg.norm(_res_full) / np.sqrt(len(_res_full) - 2)
# Storage for exceedance probabilities and regression params
poes_s = np.zeros((n_bootstrap, n_im, n_ds + 1))
a_s, b_s, sig_s = np.zeros(n_bootstrap), np.zeros(
n_bootstrap), np.zeros(n_bootstrap)
al0_s, al1_s = (np.zeros(n_bootstrap),
np.zeros(n_bootstrap))
# Bootstrapping loop
for i in range(n_bootstrap):
# Prepare bootstrap samples
im_nc_b, edp_nc_b, im_c_b = prepare_mca_data(
imls, edps, censored_limit, bootstrap=True)
# Cloud regression on non-collapse cases
# above lower_limit
mask_lower = edp_nc_b >= lower_limit
ln_im = np.log(im_nc_b[mask_lower])
ln_edp = np.log(edp_nc_b[mask_lower])
b = (
np.sum(
(ln_im - ln_im.mean())
* (ln_edp - ln_edp.mean()))
/ np.sum((ln_im - ln_im.mean()) ** 2)
) # b-parameter
# Calculate the a-parameter
a = np.exp(ln_edp.mean() - b * ln_im.mean())
# Apply the regression to get the mean
res = ln_edp - np.log(
a * im_nc_b[mask_lower]**b)
# Get the standard error
sig = np.linalg.norm(res) / np.sqrt(
len(res) - 2)
# Store cloud analysis coefficients
a_s[i], b_s[i], sig_s[i] = a, b, sig
# Logistic regression for collapse cases
y_logit = np.concatenate(
[np.zeros(len(im_nc_b)),
np.ones(len(im_c_b))])
x_logit = sm.add_constant(
np.log(np.concatenate(
[im_nc_b, im_c_b])))
logit_mod = sm.GLM(
y_logit, x_logit,
family=sm.families.Binomial()).fit(disp=0)
# Store logistic regression coefficients
al0_s[i], al1_s[i] = logit_mod.params
# Calculate the probabilities of exceedance
p_collapse = logit_mod.predict(
sm.add_constant(np.log(intensities)))
mu_ln = np.log(a * intensities**b)
sig_total = np.sqrt(
sig**2 + sigma_build2build**2
+ sigma_ds**2)
# Loop over damage states
for ds in range(n_ds):
poe_nc = 1 - norm.cdf(
np.log(damage_thresholds[ds]),
loc=mu_ln, scale=sig_total)
poes_s[i, :, ds] = (
poe_nc * (1 - p_collapse)
+ p_collapse)
# Store the collapse fragility
poes_s[i, :, -1] = p_collapse
# Mean statistics and lognormal CDF parameters
poes_mean = poes_s.mean(axis=0)
poes_fitted = np.zeros_like(poes_mean)
params_a = np.zeros(n_ds + 1)
params_b = np.zeros(n_ds + 1)
medians = np.zeros(n_ds + 1)
betas_total = np.zeros(n_ds + 1)
sigmas_ds = np.full(
len(damage_thresholds), sigma_ds)
# Loop over damage states and collapse
for ds in range(n_ds + 1):
try:
popt, _ = curve_fit(
cond_fragility,
intensities,
poes_mean[:, ds],
bounds=((0, 0),
(np.inf, np.inf)))
params_a[ds], params_b[ds] = popt
except Exception as e:
raise RuntimeError(
f'ERROR! Curve fitting failed for '
f'DS {ds}: {e}')
poes_fitted[:, ds] = cond_fragility(
intensities, params_a[ds], params_b[ds])
# Lognormal equivalents at 16%, 50%, 84%
f_interp = interp1d(
poes_fitted[:, ds], intensities,
bounds_error=False,
fill_value='extrapolate')
medians[ds] = f_interp(0.5)
im16 = f_interp(0.16)
im84 = f_interp(0.84)
if im16 > 0 and im84 > im16:
betas_total[ds] = np.log(im84 / im16) / 2
else:
betas_total[ds] = np.nan
# Fill NaN betas from the next valid DS
for ds in range(n_ds):
if np.isnan(betas_total[ds]):
for ds_next in range(ds + 1, n_ds + 1):
if not np.isnan(betas_total[ds_next]):
betas_total[ds] = (
betas_total[ds_next])
break
# Recalculate lognormal fragility functions
for ds in range(n_ds + 1):
if fragility_rotation:
fragility_method = (
f'lognormal - rotated around the '
f'{int(round(rotation_percentile * 100))}th percentile')
(medians[ds],
betas_total[ds],
poes_fitted[:, ds]) = (
self.calculate_rotated_fragility(
rotation_percentile,
medians[ds],
betas_total[ds],
sigma_build2build=0.0,
sigma_ds=0.0,
intensities=intensities))
else:
poes_fitted[:, ds] = (
self.calculate_lognormal_fragility(
medians[ds],
betas_total[ds],
sigma_build2build=0.0,
sigma_ds=0.0,
intensities=intensities))
# Final cleanup: prevent fragility crossing
for i in range(n_ds - 1, -1, -1):
poes_fitted[:, i] = np.maximum(
poes_fitted[:, i], poes_fitted[:, i + 1])
# Store everything
is_collapse = edps >= censored_limit
is_nc_plot = (
(~is_collapse) & (edps >= lower_limit))
cloud_dict = {
'cloud inputs': {
'imls': imls,
'edps': edps,
'lower_limit': lower_limit,
'upper_limit': censored_limit,
'damage_thresholds': damage_thresholds},
'fragility': {
'fragility_method': (
fragility_method.lower()),
'cloud_method': 'bootstrap',
'intensities': intensities,
'poes': poes_fitted,
'medians': medians[:n_ds],
'sigma_record2record': sig_s.mean(),
'sigma_build2build': sigma_build2build,
'sigma_ds': sigmas_ds,
'betas_total': betas_total[:n_ds]},
'regression': {
'b1': b_full,
'b0': np.log(a_full),
'sigma': sig_full,
'alpha0': al0_s.mean(),
'alpha1': al1_s.mean(),
'fitted_x': np.log(intensities),
'fitted_y': (
np.log(a_full)
+ b_full
* np.log(intensities))},
'bootstraps': {
'b1': b_s,
'a': a_s,
'sigma_rr': sig_s,
'alpha0': al0_s,
'alpha1': al1_s,
'poes_all': poes_s},
'raw_data': {
'im_nc': imls[is_nc_plot],
'edp_nc': edps[is_nc_plot],
'im_c': imls[is_collapse]}
}
return cloud_dict
# =============================================================
# CLASSICAL APPROACH (Jalayer et al. 2017, Bayesian MCMC)
# =============================================================
elif cloud_method.lower() == 'classical':
# ---------------------------------------------------------
# Step 1: Partition data into NoC and C
# ---------------------------------------------------------
is_collapse = edps >= censored_limit
is_nc = ~is_collapse
# For regression: also exclude sub-lower_limit data
is_nc_reg = is_nc & (edps >= lower_limit)
im_nc = imls[is_nc_reg]
edp_nc = edps[is_nc_reg]
im_c = imls[is_collapse]
n_nc = len(im_nc)
n_c = len(im_c)
if n_nc < 3:
raise ValueError(
'ERROR! Fewer than 3 non-collapse data '
'points above lower_limit. Cloud '
'regression cannot be performed.')
# ---------------------------------------------------------
# Step 2: OLS point estimates for [ln a, b, beta]
# Jalayer et al. 2017, Eq. 2
# ---------------------------------------------------------
ln_im_nc = np.log(im_nc)
ln_edp_nc = np.log(edp_nc)
nu = n_nc - 2 # degrees of freedom
# Slope b
b_hat = (
np.sum((ln_im_nc - ln_im_nc.mean())
* (ln_edp_nc - ln_edp_nc.mean()))
/ np.sum(
(ln_im_nc - ln_im_nc.mean()) ** 2)
)
# Intercept ln(a)
lna_hat = (ln_edp_nc.mean()
- b_hat * ln_im_nc.mean())
# Residuals and standard error (beta)
residuals = (ln_edp_nc
- (lna_hat + b_hat * ln_im_nc))
s2 = np.sum(residuals**2) / nu
beta_hat = np.sqrt(s2)
# Design matrix quantities
X_design = np.column_stack(
[np.ones(n_nc), ln_im_nc])
XtX = X_design.T @ X_design
XtX_inv = np.linalg.inv(XtX)
# ---------------------------------------------------------
# Step 3: Logistic regression for [alpha0, alpha1]
# Jalayer et al. 2017, Eq. 6 and Appendix B
# ---------------------------------------------------------
im_all = np.concatenate([imls[is_nc], imls[is_collapse]])
y_all = np.concatenate(
[np.zeros(np.sum(is_nc)),
np.ones(n_c)])
x_logit_all = sm.add_constant(np.log(im_all))
logit_fit = sm.GLM(
y_all, x_logit_all,
family=sm.families.Binomial()).fit(disp=0)
alpha0_hat, alpha1_hat = logit_fit.params
# ---------------------------------------------------------
# Step 4: Define the log-likelihood function
# Jalayer et al. 2017, Eq. A1.2
# ---------------------------------------------------------
sa_nc = imls[is_nc_reg]
dcr_nc = edps[is_nc_reg]
ln_sa_nc = np.log(sa_nc)
ln_dcr_nc = np.log(dcr_nc)
sa_c = imls[is_collapse]
ln_sa_c = np.log(sa_c)
def log_likelihood(chi):
"""
Log-likelihood f(D|chi) from Eq. A1.2 of
Jalayer et al. (2017).
Parameters
----------
chi : array of length 5
[ln_a, b, beta, alpha0, alpha1]
Returns
-------
ll : float
The log-likelihood value.
"""
ln_a, b_val, beta_val, a0, a1 = chi
if beta_val <= 0:
return -np.inf
# NoC contribution
mu_nc = ln_a + b_val * ln_sa_nc
z_nc = (ln_dcr_nc - mu_nc) / beta_val
ll_nc = np.sum(
norm.logpdf(z_nc)
- np.log(beta_val))
eta_nc = a0 + a1 * ln_sa_nc
ll_nc += np.sum(
np.log(1.0 / (1.0 + np.exp(eta_nc))
+ 1e-300))
# C contribution
eta_c = a0 + a1 * ln_sa_c
ll_c = np.sum(
np.log(1.0 / (1.0 + np.exp(-eta_c))
+ 1e-300))
return ll_nc + ll_c
# ---------------------------------------------------------
# Step 5: MCMC sampling (Metropolis-Hastings)
# Jalayer et al. 2017, Appendix A
# ---------------------------------------------------------
chi_current = np.array([
lna_hat, b_hat, beta_hat,
alpha0_hat, alpha1_hat])
n_params = len(chi_current)
se_lna = np.sqrt(XtX_inv[0, 0] * s2)
se_b = np.sqrt(XtX_inv[1, 1] * s2)
se_beta = beta_hat / np.sqrt(2.0 * nu)
se_a0 = logit_fit.bse[0]
se_a1 = logit_fit.bse[1]
proposal_scale = np.array(
[se_lna, se_b, se_beta, se_a0, se_a1])
prop_factor = 2.4 / np.sqrt(n_params)
proposal_cov = np.diag(
(prop_factor * proposal_scale) ** 2)
chain = np.zeros((n_mcmc, n_params))
ll_current = log_likelihood(chi_current)
n_accepted = 0
n_adapt = min(n_mcmc_burnin, 500)
for step in range(n_mcmc):
chi_candidate = np.random.multivariate_normal(
chi_current, proposal_cov)
if chi_candidate[2] <= 0:
chain[step] = chi_current
continue
ll_candidate = log_likelihood(chi_candidate)
log_r = ll_candidate - ll_current
if np.log(np.random.uniform()) < log_r:
chi_current = chi_candidate
ll_current = ll_candidate
n_accepted += 1
chain[step] = chi_current
if step == n_adapt - 1 and step > 50:
emp_cov = np.cov(
chain[:n_adapt].T)
proposal_cov = (
prop_factor**2 * emp_cov
+ 1e-8 * np.eye(n_params))
# Discard burn-in
chain_post = chain[n_mcmc_burnin:]
n_post = len(chain_post)
acceptance_rate = n_accepted / n_mcmc
# ---------------------------------------------------------
# Step 6: Robust Fragility (Eq. 10) and variance (Eq. 11)
# ---------------------------------------------------------
poes_mcmc = np.zeros((n_post, n_im, n_ds + 1))
ln_intensities = np.log(intensities)
for j in range(n_post):
ln_a_j, b_j, beta_j, a0_j, a1_j = chain_post[j]
eta_j = a0_j + a1_j * ln_intensities
p_c_j = 1.0 / (1.0 + np.exp(-eta_j))
mu_ln_j = ln_a_j + b_j * ln_intensities
for ds in range(n_ds):
poe_nc_j = 1.0 - norm.cdf(
np.log(damage_thresholds[ds]),
loc=mu_ln_j, scale=beta_j)
poes_mcmc[j, :, ds] = (
poe_nc_j * (1.0 - p_c_j) + p_c_j)
poes_mcmc[j, :, -1] = p_c_j
# Posterior mean (Eq. 10) and variance (Eq. 11)
poes_robust = poes_mcmc.mean(axis=0)
poes_std = np.sqrt(poes_mcmc.var(axis=0))
poes_robust_plus = np.clip(
poes_robust + confidence_k * poes_std, 0.0, 1.0)
poes_robust_minus = np.clip(
poes_robust - confidence_k * poes_std, 0.0, 1.0)
# ---------------------------------------------------------
# Step 7: Lognormal-equivalent parameters
# ---------------------------------------------------------
medians = np.zeros(n_ds + 1)
betas_total = np.zeros(n_ds + 1)
sigmas_ds_arr = np.full(
len(damage_thresholds), sigma_ds)
for ds in range(n_ds + 1):
f_interp = interp1d(
poes_robust[:, ds], intensities,
bounds_error=False,
fill_value='extrapolate')
medians[ds] = f_interp(0.5)
im16 = f_interp(0.16)
im84 = f_interp(0.84)
if im16 > 0 and im84 > im16:
betas_total[ds] = (
np.log(im84 / im16) / 2)
else:
betas_total[ds] = np.nan
# Fill NaN betas from the next valid DS
for ds in range(n_ds):
if np.isnan(betas_total[ds]):
for ds_next in range(ds + 1, n_ds + 1):
if not np.isnan(betas_total[ds_next]):
betas_total[ds] = (
betas_total[ds_next])
break
# ---------------------------------------------------------
# Step 8: Optional rotation and crossing cleanup
# ---------------------------------------------------------
poes_fitted = poes_robust.copy()
for ds in range(n_ds + 1):
if fragility_rotation:
fragility_method = (
f'lognormal - rotated around the '
f'{int(round(rotation_percentile * 100))}th percentile')
(medians[ds],
betas_total[ds],
poes_fitted[:, ds]) = (
self.calculate_rotated_fragility(
rotation_percentile,
medians[ds],
betas_total[ds],
sigma_build2build=0.0,
sigma_ds=0.0,
intensities=intensities))
# Final cleanup: prevent fragility crossing
for i in range(n_ds - 1, -1, -1):
poes_fitted[:, i] = np.maximum(
poes_fitted[:, i], poes_fitted[:, i + 1])
# Apply crossing cleanup to confidence bands
for i in range(n_ds - 1, -1, -1):
poes_robust_plus[:, i] = np.maximum(
poes_robust_plus[:, i],
poes_robust_plus[:, i + 1])
poes_robust_minus[:, i] = np.maximum(
poes_robust_minus[:, i],
poes_robust_minus[:, i + 1])
chi_mean = chain_post.mean(axis=0)
is_nc_plot = (
(~is_collapse) & (edps >= lower_limit))
cloud_dict = {
'cloud inputs': {
'imls': imls,
'edps': edps,
'lower_limit': lower_limit,
'upper_limit': censored_limit,
'damage_thresholds': damage_thresholds},
'fragility': {
'fragility_method': (
fragility_method.lower()),
'cloud_method': 'classical',
'intensities': intensities,
'poes': poes_fitted,
'poes_robust_plus': poes_robust_plus,
'poes_robust_minus': poes_robust_minus,
'medians': medians[:n_ds],
'sigma_record2record': beta_hat,
'sigma_build2build': sigma_build2build,
'sigma_ds': sigmas_ds_arr,
'betas_total': betas_total[:n_ds],
'confidence_k': confidence_k},
'regression': {
'b1': chi_mean[1],
'b0': chi_mean[0],
'sigma': chi_mean[2],
'alpha0': chi_mean[3],
'alpha1': chi_mean[4],
'fitted_x': np.log(intensities),
'fitted_y': (
chi_mean[0]
+ chi_mean[1]
* np.log(intensities))},
'mcmc': {
'chain': chain_post,
'acceptance_rate': acceptance_rate,
'n_mcmc': n_mcmc,
'n_burnin': n_mcmc_burnin,
'chi_labels': [
'ln_a', 'b', 'beta',
'alpha0', 'alpha1'],
'chi_mean': chi_mean,
'chi_std': chain_post.std(axis=0),
'chi_median': np.median(
chain_post, axis=0),
'poes_all': poes_mcmc},
'raw_data': {
'im_nc': imls[is_nc_plot],
'edp_nc': edps[is_nc_plot],
'im_c': imls[is_collapse]}
}
return cloud_dict
else:
raise ValueError(
f"ERROR! Unknown cloud_method: "
f"'{cloud_method}'. Expected 'bootstrap' "
f"or 'classical'.")
# ---------------------------------------------------------------
# POSTPROCESS MULTIPLE STRIPE ANALYSIS RESULTS
# ---------------------------------------------------------------
def process_msa_results(self,
imls,
edps,
damage_thresholds,
sigma_build2build=0.3,
sigma_ds=0.3,
intensities=np.round(
np.geomspace(0.05, 10.0, 50), 3),
fragility_rotation=False,
rotation_percentile=0.10):
"""
Perform maximum likelihood estimation (MLE) for fragility curve
fitting following a multiple stripe analysis. This method
calculates the fragility function by fitting to the provided
intensity measure levels (IMLs) and engineering demand parameters
(EDPs) "stripes", with the option to rotate the fragility curve
around a target percentile.
The method is useful for deriving fragility functions by
determining the probability of exceedance for various damage
states based on the provided data.
Parameters
----------
imls : list or array
A list or array of intensity measure levels (IMLs)
representing the seismic intensity levels used for sampling
the fragility functions.
edps : list or array
A list or array of engineering demand parameters (EDPs),
which describe the structural response to seismic events.
Examples include maximum interstorey drifts, maximum peak
floor acceleration, or top displacements.
damage_thresholds : list
A list of EDP-based damage thresholds that correspond to
different levels of structural damage, such as slight,
moderate, extensive, and complete. These thresholds help
categorize the severity of damage based on EDP values.
sigma_build2build : float, optional, default=0.3
The building-to-building variability or modeling uncertainty.
It accounts for differences in performance between buildings
with similar characteristics due to random variations or
model uncertainties.
sigma_ds : float, optional
The logarithmic standard deviation representing uncertainty
in damage-state thresholds. Default value is 0.30.
intensities : array, optional, default=np.geomspace(0.05, 10.0, 50)
An array of intensity measure levels over which the fragility
function will be sampled. By default, this is a logarithmic
space ranging from 0.05 to 10.0, with 50 sample points.
fragility_rotation : bool, optional, default=False
A boolean flag that determines whether or not to rotate the
fragility curve about a given percentile. If `True`, the
fragility curve will be adjusted based on the specified
`rotation_percentile`.
rotation_percentile : float, optional, default=0.10
The target percentile (between 0 and 1) around which the
fragility function will be rotated. A value of 0.10
corresponds to rotating the curve to the 10th percentile.
Returns
-------
msa_dict : dict
A nested dictionary with the following top-level keys:
**'msa_inputs'** : dict
Inputs passed to the analysis.
- ``'imls'`` : array — intensity measure levels
used for each stripe.
- ``'edps'`` : array — engineering demand
parameters recorded at each stripe.
- ``'damage_thresholds'`` : list — EDP values
defining each damage state boundary.
- ``'sigma_build2build'`` : float — building-to-
building modelling uncertainty.
- ``'sigma_ds'`` : float — uncertainty in the
damage-state threshold definition.
- ``'is_rotated'`` : bool — whether fragility
rotation was applied.
**'fragility'** : dict
MLE-fitted fragility function results.
- ``'fragility_method'`` : str — always 'mle'.
- ``'intensities'`` : array — IM levels at which
PoEs are evaluated.
- ``'poes'`` : ndarray, shape (n_IM, n_DS) —
probabilities of exceedance per damage state.
- ``'medians'`` : list, length n_DS — median IM
(theta) for each damage state.
- ``'sigma_record2record'`` : list, length n_DS —
record-to-record dispersion per damage state.
- ``'betas_total'`` : list, length n_DS — total
logarithmic standard deviation per damage state,
combining record-to-record, building-to-building,
and damage-state threshold uncertainties.
**'metadata'** : dict
Stripe-level summary information.
- ``'stripe_levels'`` : array — mean IM value for
each stripe.
- ``'observed_fractions'`` : list of lists,
shape (n_DS, n_stripes) — observed fraction of
ground-motion records exceeding each damage
threshold at each stripe level.
Notes
-----
This method fits the fragility curve using MLE, which minimizes
the difference between observed and predicted exceedance
probabilities. The option for fragility curve rotation allows for
adjusting the curve to better match the expected percentile of
damage occurrence, offering greater flexibility in representing
the fragility of the structure.
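Examples
--------
A minimal sketch with three hypothetical stripes and four
ground-motion records per stripe. Rows of ``edps`` are taken as
ground-motion records and columns as stripes; all values are
illustrative:

>>> import numpy as np
>>> pp = postprocessor()
>>> imls = np.array([0.2, 0.5, 1.0])  # one IM level per stripe
>>> edps = np.array([[0.002, 0.006, 0.020],
...                  [0.001, 0.004, 0.012],
...                  [0.003, 0.008, 0.030],
...                  [0.002, 0.007, 0.016]])
>>> out = pp.process_msa_results(
...     imls, edps, damage_thresholds=[0.005, 0.015])
>>> out['fragility']['poes'].shape
(50, 2)
>>> len(out['metadata']['observed_fractions'])
2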
"""
# Convert to numpy arrays
imls = np.array(imls)
edps = np.array(edps)
if intensities is None:
intensities = np.round(np.geomspace(0.05, 10.0, 50), 3)
# Handle IM levels: Extract stripe values (one per column)
stripe_imls = np.mean(imls, axis=0) if imls.ndim > 1 else imls
num_stripes = len(stripe_imls)
num_gmrs_per_stripe = np.array(
[len(edps[:, i]) for i in range(num_stripes)])
def negative_log_likelihood(params, x, n, z):
"""
Negative Log-Likelihood for Binomial Distribution.
Fits the aleatory (record-to-record) parameters to the data.
"""
theta, beta_r2r = params
# Probability of exceedance based on lognormal CDF
# We use the standard normal CDF on the log-transformed data
p = stats.norm.cdf(np.log(x / theta) / beta_r2r)
# Numerical stability: clip p to avoid log(0) in binomial logpmf
p = np.clip(p, 1e-15, 1 - 1e-15)
# log-likelihood of observing z exceedances in n trials
log_f = stats.binom.logpmf(z, n, p)
return -np.sum(log_f)
# Storage lists
thetas = []
sigmas_record2record = []
betas_total = []
# Iterate through each Damage State threshold
for threshold in damage_thresholds:
# Count exceedances per stripe (z values)
num_exc = np.array([np.sum(edps[:, i] >= threshold)
for i in range(num_stripes)])
# Initial guess: theta = median of stripes, beta = 0.4
initial_guess = [np.median(stripe_imls), 0.4]
# Bounds: prevent median=0 and keep beta in physical range
bounds = optimize.Bounds([0.001 * np.min(stripe_imls), 0.05],
[10.0 * np.max(stripe_imls), 1.5])
# Optimize the aleatory fit
sol = optimize.minimize(
negative_log_likelihood,
initial_guess,
args=(stripe_imls, num_gmrs_per_stripe, num_exc),
bounds=bounds,
method='L-BFGS-B'
)
t_val = sol.x[0] # Fitted Median
s_val = sol.x[1] # Fitted Beta (Record-to-Record)
# Combine uncertainties AFTER fitting using SRSS
b_tot = np.sqrt(s_val**2 + sigma_build2build**2 + sigma_ds**2)
thetas.append(t_val)
sigmas_record2record.append(s_val)
betas_total.append(b_tot)
# Calculate Fragility Curves (POEs) over the full intensity range
poes = np.zeros((len(intensities), len(damage_thresholds)))
for i in range(len(damage_thresholds)):
if fragility_rotation:
_, _, poes[:, i] = self.calculate_rotated_fragility(
rotation_percentile,
thetas[i],
sigmas_record2record[i],
sigma_build2build=sigma_build2build,
sigma_ds=sigma_ds,
intensities=intensities
)
else:
# Standard lognormal PoE
poes[:, i] = stats.norm.cdf(
np.log(intensities / thetas[i]) / betas_total[i])
# Formatting Output
# observed_fractions: list of arrays (one array per DS)
msa_dict = {
'msa_inputs': {
'imls': imls,
'edps': edps,
'damage_thresholds': damage_thresholds,
'sigma_build2build': sigma_build2build,
'sigma_ds': sigma_ds,
'is_rotated': fragility_rotation
},
'fragility': {
'fragility_method': 'mle',
'intensities': intensities,
'poes': poes,
'medians': thetas,
'sigma_record2record': sigmas_record2record,
'betas_total': betas_total
},
'metadata': {
'stripe_levels': stripe_imls,
'observed_fractions': [
[np.sum(edps[:, j] >= thresh) / num_gmrs_per_stripe[j]
for j in range(num_stripes)]
for thresh in damage_thresholds
]
}
}
return msa_dict
# ---------------------------------------------------------------
# POSTPROCESS INCREMENTAL DYNAMIC ANALYSIS RESULTS
# ---------------------------------------------------------------
def process_ida_results(self,
ansys_dict,
im_matrix,
damage_thresholds,
edp_key,
sigma_build2build=0.3,
sigma_ds=0.3,
intensities=np.round(
np.geomspace(0.05, 10.0, 50), 3),
edp_range=np.linspace(0.00, 0.05, 101),
fragility_rotation=False,
rotation_percentile=0.10):
"""
Perform fragility function fitting and statistical processing on
Incremental Dynamic Analysis (IDA) results.
This method processes raw IDA data by interpolating individual
record response curves to a continuous Engineering Demand
Parameter (EDP) range. It accounts for "flatlining" (global
dynamic instability) using Maximum Likelihood Estimation (MLE)
for censored data to estimate the fragility parameters (median
and dispersion) for multiple damage states. It also supports
fragility curve rotation around a target percentile to account
for modeling uncertainties.
Parameters
----------
ansys_dict : dict
A dictionary containing the structural response data with keys:
- ``'max_peak_drift_list'`` or ``'max_peak_accel_list'``: a list
where each entry maps Scale Factors (SF) to peak drift or
acceleration values.
- ``'sf_matrix'``: a 2D numpy array (n_records × max_runs)
of scale factors used for each analysis run.
im_matrix : numpy.ndarray
A 2D array of intensity measure levels corresponding to the
ground motion records and number of runs carried out in IDA.
damage_thresholds : list of float
A list of EDP-based damage thresholds (e.g., interstorey
drift ratios) defining different limit states (e.g., Slight,
Moderate, Extensive, Collapse).
edp_key : str
The key in `ansys_dict` used to retrieve the engineering
demand parameter data, e.g. 'max_peak_drift_list' or
'max_peak_accel_list'.
sigma_build2build : float, optional, default=0.3
The modeling uncertainty or building-to-building variability.
This is combined with the record-to-record variability to
calculate total fragility dispersion.
sigma_ds : float, optional
The logarithmic standard deviation representing uncertainty
in damage-state thresholds. Default value is 0.30.
intensities : numpy.ndarray, optional,
default=np.geomspace(0.05, 10.0, 50)
The array of intensity measure levels over which the final
fragility functions (POEs) will be sampled.
edp_range : numpy.ndarray, optional,
default=np.linspace(0.00, 0.05, 101) (0% to 5% drift)
The array of engineering demand parameters over which the
IDA curves will be evaluated.
fragility_rotation : bool, optional, default=False
Flag to determine if the fragility curves should be rotated
around a specific percentile to adjust for modeling bias or
target reliability levels.
rotation_percentile : float, optional, default=0.10
The target percentile (0.0 to 1.0) around which the fragility
curve rotation is anchored if `fragility_rotation` is True.
Returns
-------
ida_dict : dict
A nested dictionary with the following top-level keys:
**'ida_inputs'** : dict
Raw IDA data and analysis configuration.
- ``'target_edps'`` : array — continuous EDP axis
over which IM values are interpolated.
- ``'raw_curves'`` : list of dicts, one per record,
each containing ``'im'`` (sorted IM array) and
``'edp'`` (monotonic EDP array).
- ``'damage_thresholds'`` : list — EDP values that
define each damage state boundary.
- ``'im_matrix'`` : array — IM values applied to
each record at each run.
- ``'n_records'`` : int — number of ground-motion
records in the analysis.
- ``'im_max_analyzed'`` : float — maximum IM level
reached across all records and runs.
**'fragility'** : dict
MLE-fitted fragility parameters and PoEs.
- ``'fragility_method'`` : str — always
'mle_ida_censored'.
- ``'intensities'`` : array — IM levels at which
PoEs are evaluated.
- ``'poes'`` : ndarray, shape (n_IM, n_DS) —
probabilities of exceedance per damage state.
- ``'medians'`` : list, length n_DS — median IM
(theta) for each damage state.
- ``'sigma_record2record'`` : list, length n_DS —
record-to-record dispersion per damage state,
estimated via MLE on censored capacities.
- ``'sigma_build2build'`` : list, length n_DS —
building-to-building modelling uncertainty.
- ``'sigma_ds'`` : list, length n_DS — uncertainty
in the damage-state threshold definition.
- ``'betas_total'`` : list, length n_DS — total
logarithmic standard deviation per damage state.
- ``'rotation_active'`` : bool — whether fragility
rotation around a percentile was applied.
- ``'rotation_percentile'`` : float or None — the
percentile used for rotation, or None if inactive.
**'stats'** : dict
Statistical IDA curves across all records.
- ``'fitted_edps'`` : array — EDP axis shared with
``'target_edps'`` in ``'ida_inputs'``.
- ``'median_im'`` : array — median IM across all
records at each EDP level (50th percentile curve).
- ``'p16_im'`` : array — 16th percentile IM across
records at each EDP level.
- ``'p84_im'`` : array — 84th percentile IM across
records at each EDP level.
Notes
-----
The method uses a log-likelihood minimization approach to handle
records that do not reach a specific damage threshold within the
analyzed range (right-censored data), ensuring the fragility
curves remain statistically robust even near collapse.
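Examples
--------
A minimal, purely illustrative sketch with two ground-motion records
and three runs each. The ``ansys_dict`` layout follows the structure
described above (each entry of ``'max_peak_drift_list'`` maps a
scale factor to a peak drift); all numbers are hypothetical:

>>> import numpy as np
>>> pp = postprocessor()
>>> sf_matrix = np.array([[1.0, 2.0, 3.0],
...                       [1.0, 2.0, 3.0]])
>>> im_matrix = np.array([[0.2, 0.4, 0.6],
...                       [0.3, 0.6, 0.9]])
>>> ansys_dict = {
...     'max_peak_drift_list': [
...         {1.0: 0.002, 2.0: 0.010, 3.0: 0.035},
...         {1.0: 0.004, 2.0: 0.015, 3.0: 0.050}],
...     'sf_matrix': sf_matrix}
>>> out = pp.process_ida_results(
...     ansys_dict, im_matrix, damage_thresholds=[0.005, 0.03],
...     edp_key='max_peak_drift_list')
>>> out['fragility']['poes'].shape
(50, 2)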
"""
drifts_data = ansys_dict[edp_key]
sf_matrix = ansys_dict['sf_matrix']
n_records = len(drifts_data)
# Define Continuous EDP range (X-axis for the statistical lines)
im_at_edp_matrix = []
raw_curves = []
for i in range(n_records):
rec_ims, rec_edps = [], []
for j, sf in enumerate(sf_matrix[i, :]):
if not np.isnan(sf) and sf in drifts_data[i]:
rec_ims.append(im_matrix[i, j])
rec_edps.append(drifts_data[i][sf])
if len(rec_ims) > 1:
# Sort by IM (not EDP) to preserve analysis sequence
idx = np.argsort(rec_ims)
sorted_ims = np.array(rec_ims)[idx]
sorted_edps = np.array(rec_edps)[idx]
# Handle structural resurrection
# Ensures that once a limit state is reached, it stays reached.
monotonic_edps = np.maximum.accumulate(sorted_edps)
raw_curves.append({'im': sorted_ims, 'edp': monotonic_edps})
# Interpolate IM = f(EDP)
# We interpolate against the monotonic EDPs to find the FIRST
# occurrence of each threshold.
# Use fill_value=np.nan for points beyond the analyzed range
# to allow the MLE to handle censoring correctly.
f_im_cap = interp1d(monotonic_edps, sorted_ims,
bounds_error=False, fill_value=np.nan)
im_at_edp_matrix.append(f_im_cap(edp_range))
else:
im_at_edp_matrix.append(np.full_like(edp_range, np.nan))
im_at_edp_matrix = np.array(im_at_edp_matrix)
# Handle collapse cases
# Replace NaNs that occur AFTER a record has flatlined with the max IM
# achieved for that record to represent the capacity 'ceiling'.
for i in range(n_records):
# Find where the record stops having data
mask = ~np.isnan(im_at_edp_matrix[i, :])
if np.any(mask):
last_valid_idx = np.where(mask)[0][-1]
last_valid_im = im_at_edp_matrix[i, last_valid_idx]
# Fill the remaining EDP range with the collapse IM
im_at_edp_matrix[i, last_valid_idx+1:] = last_valid_im
# Calculate percentiles
# Now that collapse region is filled, no All-NaN slices
# unless a record never started at all.
median_ida_im = np.nanmedian(im_at_edp_matrix, axis=0)
p16_ida_im = np.nanpercentile(im_at_edp_matrix, 16, axis=0)
p84_ida_im = np.nanpercentile(im_at_edp_matrix, 84, axis=0)
# Fragility Fitting (MLE for Censored Data)
im_max = np.nanmax(im_matrix)
thetas = []
sigmas_rec2rec = []
sigmas_build2build = []
sigmas_ds = []
for threshold in damage_thresholds:
thresh_idx = np.argmin(np.abs(edp_range - threshold))
capacities = im_at_edp_matrix[:, thresh_idx]
num_collapsed = np.sum(~np.isnan(capacities))
if num_collapsed == n_records:
ln_cap = np.log(capacities)
theta = np.exp(np.mean(ln_cap))
beta_rec = np.std(ln_cap, ddof=1)
else:
def neg_log_likelihood(params):
t, b = params
if t <= 0 or b <= 0:
return 1e10
collapsed = capacities[~np.isnan(capacities)]
term1 = np.sum(np.log(stats.norm.pdf(
(np.log(collapsed)-np.log(t))/b)/(collapsed*b)))
term2 = ((n_records - num_collapsed) * np.log(
1 - stats.norm.cdf(
(np.log(im_max) - np.log(t)) / b)))
return -(term1 + term2)
sol = optimize.minimize(
neg_log_likelihood, [im_max, 0.4], method='Nelder-Mead')
theta, beta_rec = sol.x[0], sol.x[1]
thetas.append(theta)
sigmas_rec2rec.append(beta_rec)
sigmas_build2build.append(sigma_build2build)
sigmas_ds.append(sigma_ds)
# Generate Probabilities of Exceedance with Rotation Option
poes = np.zeros((len(intensities), len(damage_thresholds)))
betas_total = []
for i, threshold in enumerate(damage_thresholds):
theta = thetas[i]
beta_rec = sigmas_rec2rec[i]
if fragility_rotation:
# Under rotation the combined dispersion is not a simple SRSS,
# but the SRSS total is still reported for consistency in the dict
betas_total.append(
np.sqrt(beta_rec**2 + sigma_build2build**2 + sigma_ds**2))
(_, _, poes[:, i]) = (
self.calculate_rotated_fragility(
rotation_percentile,
theta,
beta_rec,
sigma_build2build,
sigma_ds,
intensities))
else:
beta_total = np.sqrt(
beta_rec**2 + sigma_build2build**2 + sigma_ds**2)
betas_total.append(beta_total)
# Pass the individual dispersion components and the IM grid explicitly
poes[:, i] = self.calculate_lognormal_fragility(
theta, beta_rec, sigma_build2build, sigma_ds, intensities)
# Construct the final nested dictionary (Cloud-style)
ida_dict = {
'ida_inputs': {
'target_edps': edp_range,
'raw_curves': raw_curves,
'damage_thresholds': damage_thresholds,
'im_matrix': im_matrix,
'n_records': n_records,
'im_max_analyzed': im_max
},
'fragility': {
'fragility_method': 'mle_ida_censored',
'intensities': intensities,
'poes': poes,
'medians': thetas,
'sigma_record2record': sigmas_rec2rec,
'sigma_build2build': sigmas_build2build,
'sigma_ds': sigmas_ds,
'betas_total': betas_total,
'rotation_active': fragility_rotation,
'rotation_percentile': (
rotation_percentile if fragility_rotation
else None)
},
'stats': {
'fitted_edps': edp_range,
'median_im': median_ida_im,
'p16_im': p16_ida_im,
'p84_im': p84_ida_im
}
}
return ida_dict
def calculate_vulnerability_function(self,
poes,
consequence_model,
cov_consequence=None,
uncertainty=True,
method=None,
intensities=np.round(
np.geomspace(
0.05, 10.0, 50),
3)):
"""
Compute a vulnerability function (mean loss ratio and associated
uncertainty) by convolving fragility functions with a consequence
(damage-to-loss) model.
The expected loss ratio is computed as the convolution of mutually
exclusive damage-state probabilities with damage-to-loss ratios.
Uncertainty in the loss ratio conditional on intensity measure
level (Loss | IM) can be computed either explicitly using the law
of total variance or via an empirical Silva-type envelope.
Parameters
----------
poes : ndarray, shape (n_IM, n_DS)
Probabilities of exceedance of each damage state conditional
on the intensity measure level (P[DS >= k | IM]). Damage
states must be ordered from least to most severe.
consequence_model : array-like, length n_DS
Mean damage-to-loss ratios associated with each damage state.
Values must lie in the interval [0, 1].
cov_consequence : array-like, length n_DS, optional
Coefficient of variation of the damage-to-loss ratio for each
damage state. Required when ``method="explicit"``. Each entry
represents the conditional uncertainty of loss given the
damage state.
uncertainty : bool, optional
Flag indicating whether to compute uncertainty (coefficient of
variation) of the loss ratio conditional on IM. If False, the
COV column is still returned and filled with zeros.
Default is True.
method : {"explicit", "silva"}, optional
Method used to compute uncertainty when ``uncertainty=True``.
- "explicit" (default):
Computes uncertainty using the law of total variance,
accounting for both damage-state mixing and uncertainty
within each damage state. Requires ``cov_consequence``.
- "silva":
Computes uncertainty using a Silva-type empirical envelope
based only on the mean loss ratio.
If ``uncertainty=True`` and ``method=None``, the method
defaults to "explicit".
intensities : ndarray, optional
Intensity measure levels corresponding to the rows of
``poes``. Default is a geometric sequence between 0.05
and 10.0.
Returns
-------
df : pandas.DataFrame
DataFrame with the following columns:
- ``IML`` : Intensity measure level
- ``Loss`` : Expected loss ratio at the given IML
- ``COV`` : Coefficient of variation of the loss ratio
at the given IML
The ``COV`` column is always returned. If
``uncertainty=False``, it contains zeros.
Raises
------
Exception
If the dimensions of ``poes``, ``consequence_model``, or
``cov_consequence`` are inconsistent, or if
``method="explicit"`` is selected without providing
``cov_consequence``.
Notes
-----
For the explicit uncertainty method, the variance of the loss
ratio is computed using the law of total variance:
Var(LR | IM) = sum_k p_k [ sigma_k^2 + (mu_k - mu)^2 ]
where:
- p_k is the probability of being in damage state k
given IM,
- mu_k is the mean loss ratio for damage state k,
- sigma_k^2 is the variance of the loss ratio within
damage state k,
- mu is the mean loss ratio at the given IM.
This formulation is consistent with performance-based earthquake
engineering (PBEE) frameworks and produces physically meaningful,
IM-dependent uncertainty.
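Examples
--------
A minimal, illustrative call with two damage states and three
intensity levels; all input values are arbitrary and serve only to
demonstrate the expected array shapes:
>>> import numpy as np
>>> pp = postprocessor()
>>> ims = np.array([0.1, 0.5, 1.0])
>>> poes = np.array([[0.20, 0.05],
...                  [0.70, 0.30],
...                  [0.95, 0.60]])
>>> df = pp.calculate_vulnerability_function(
...     poes, consequence_model=[0.3, 1.0],
...     cov_consequence=[0.5, 0.0], uncertainty=True,
...     method="explicit", intensities=ims)
>>> list(df.columns)
['IML', 'Loss', 'COV']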
"""
def calculate_sigma_silva(loss):
"""
Helper function to calculate uncertainty in the loss
estimates based on the method proposed in Silva (2019),
which incorporates the sigma (standard deviation) for loss
ratios within seismic vulnerability functions.
This method computes the sigma of the loss ratio for each expected
loss ratio, and returns placeholder arrays for the coefficients 'a'
and 'b' of a beta distribution that could be used to model the
variability in the loss estimates. The sigma formula follows
Silva (2019).
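For loss ratios strictly between the bounds, the sigma is
    sigma_LR = 0.5 * sqrt(LR * (-0.7 - 2 * LR + sqrt(6.8 * LR + 0.5)))
with the boundary cases LR = 1e-8 and LR = 1 mapped to 1e-8 and 1,
respectively.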
Parameters
----------
loss : list or array
A list or array of expected loss ratios. The expected
loss ratio represents the proportion of the building's
value that is expected to be lost due to an earthquake
event, ranging from 0 to 1.
Returns
-------
sigma_loss_ratio : list or array
The calculated uncertainty (sigma) associated with the
mean loss ratio for each input loss value. The sigma
loss ratio represents the variability of the loss
estimates and is computed based on the loss ratios
provided.
a_beta_dist : list or array
Placeholder for the coefficient 'a' of a beta distribution that
could be used to model uncertainty in the loss estimates.
Currently returned as an array of zeros.
b_beta_dist : list or array
Placeholder for the coefficient 'b' of the beta distribution,
complementing 'a'. Currently returned as an array of zeros.
References
----------
1) Silva, V. (2019) "Uncertainty and correlation in seismic
vulnerability functions of building classes." Earthquake
Spectra. DOI: 10.1193/013018eqs031m.
"""
sigma_loss_ratio = np.where(
loss == 1e-8, 1e-8, np.where(
loss == 1, 1,
0.5 * np.sqrt(
loss * (-0.7 - 2 * loss
+ np.sqrt(6.8 * loss + 0.5)))))
a_beta_dist = np.zeros(loss.shape)
b_beta_dist = np.zeros(loss.shape)
return sigma_loss_ratio, a_beta_dist, b_beta_dist
# Default behavior
if uncertainty and method is None:
method = "explicit"
# Consistency checks
n_im, n_ds = poes.shape
if len(consequence_model) != n_ds:
raise Exception(
'ERROR! Mismatch between fragility and consequence models!')
if len(intensities) != n_im:
raise Exception(
'ERROR! Mismatch between number of IMLs and fragility models!')
if uncertainty and method == "explicit":
if cov_consequence is None:
raise Exception(
'ERROR! Explicit uncertainty method requires '
'cov_consequence.')
if len(cov_consequence) != n_ds:
raise Exception(
'ERROR! Length of cov_consequence must match '
'consequence_model.')
# Convert to arrays
mu_k = np.asarray(consequence_model, dtype=float)
if cov_consequence is not None:
cov_k = np.asarray(cov_consequence, dtype=float)
var_k = (cov_k * mu_k) ** 2
else:
var_k = np.zeros_like(mu_k)
# Damage-state probabilities from POEs
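# P[DS = k | IM] = P[DS >= k | IM] - P[DS >= k+1 | IM]; the most
# severe damage state keeps its exceedance probability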
p_ds = np.zeros_like(poes)
for j in range(n_ds):
if j == n_ds - 1:
p_ds[:, j] = poes[:, j]
else:
p_ds[:, j] = poes[:, j] - poes[:, j + 1]
# Mean loss ratio (convolution)
loss = np.dot(p_ds, mu_k)
# Initialize COV
cov = np.zeros_like(loss)
if uncertainty:
if method.lower() == "explicit":
# Law of total variance
diff = mu_k - loss[:, None]
var_loss = np.sum(p_ds * (var_k + diff ** 2), axis=1)
cov = np.sqrt(var_loss) / (loss + 1e-12)
elif method.lower() == "silva":
# Semi-empirical derivation (Silva, 2019)
for i, mu in enumerate(loss):
if mu <= 0:
# Zero expected loss: avoid division by zero in the COV
cov[i] = 0.0
continue
sigma_loss_ratio, _, _ = calculate_sigma_silva(mu)
cov[i] = np.min(
[sigma_loss_ratio / mu,
0.90 * np.sqrt(mu * (1 - mu)) / mu])
else:
raise Exception(f"ERROR! Unknown uncertainty method: {method}")
# Output
df = pd.DataFrame({'IML': intensities,
'Loss': loss,
'COV': cov})
return df
def calculate_average_annual_damage_probability(self,
fragility_array,
hazard_array,
return_period=1,
max_return_period=5000):
"""
Calculate the Average Annual Damage State Probability (AADP)
based on fragility and hazard curves.
This function estimates the average annual probability of damage
states occurring over a given return period, using the fragility
curve (which relates intensity measure levels to damage state
probabilities) and the hazard curve (which relates intensity
measure levels to annual rates of exceedance).
The calculation integrates the product of the fragility function
and the hazard curve over the specified range of intensity measure
levels, accounting for the return period and a maximum return
period threshold.
Parameters
----------
fragility_array : 2D array
A 2D array where the first column contains intensity measure
levels, and the second column contains the corresponding
probabilities of exceedance for each intensity level.
hazard_array : 2D array
A 2D array where the first column contains intensity measure
levels, and the second column contains the annual rates of
exceedance (i.e., the probability of exceedance per year)
for each intensity level.
return_period : float, optional, default=1
The return period used to scale the hazard rate. This is the
time span (in years) over which the average annual damage
probability is calculated. A typical value is 1 year, but
longer periods can be used for multi-year assessments.
max_return_period : float, optional, default=5000
The maximum return period threshold used to filter out very
low hazard rates. The hazard curve is truncated to include
only intensity levels with exceedance rates above this
threshold.
Returns
-------
average_annual_damage_probability : float
The average annual damage state probability, calculated by
integrating the product of the fragility function and the
hazard curve over the given intensity measure levels.
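Examples
--------
Illustrative sketch with a small synthetic fragility curve and
hazard curve; the numbers are arbitrary and not tied to any real
site or building class:
>>> import numpy as np
>>> pp = postprocessor()
>>> ims = np.array([0.1, 0.3, 0.6, 1.0])
>>> fragility_array = np.column_stack(
...     (ims, [0.01, 0.20, 0.60, 0.90]))
>>> hazard_array = np.column_stack(
...     (ims, [0.05, 0.01, 0.002, 0.0005]))
>>> aadp = pp.calculate_average_annual_damage_probability(
...     fragility_array, hazard_array)
>>> float(aadp) > 0.0
True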
"""
# Ensure arrays are sorted by Intensity (Column 0)
hazard_array = hazard_array[hazard_array[:, 0].argsort()]
fragility_array = fragility_array[fragility_array[:, 0].argsort()]
# Filter out hazard points rarer than the maximum return period
# (with the defaults, rates below 1/5000 = 2e-4 per year are dropped)
min_rate_threshold = return_period / max_return_period
hazard_array = hazard_array[hazard_array[:, 1] >= min_rate_threshold]
# Need at least 2 points to calculate a rate difference
if len(hazard_array) < 2:
return 0.0
# Compute midpoints and rate of occurrences (|d_lambda|)
mean_imls = (hazard_array[:-1, 0] + hazard_array[1:, 0]) / 2
# abs ensures positive probability mass regardless of sort order
rate_occ = np.abs(np.diff(hazard_array[:, 1])) / return_period
# Define fragility curve with dynamic upper boundary
# We assume Probability=0 at IM=0 and Probability=1 at high IM
upper_im_bound = max(20.0, fragility_array[:, 0].max() * 1.5)
curve_imls = np.hstack(
([0.0], fragility_array[:, 0], [upper_im_bound]))
curve_ordinates = np.hstack(([0.0], fragility_array[:, 1], [1.0]))
# Interpolate and Integrate
interpolated_values = np.interp(mean_imls, curve_imls, curve_ordinates)
# Result: Sum of (Probability of Damage * Frequency of Occurrence)
return np.dot(interpolated_values, rate_occ)
def calculate_average_annual_loss(self,
vulnerability_array,
hazard_array,
return_period=1,
max_return_period=5000):
"""
Calculate the Average Annual Loss Ratio (AALR) based on
vulnerability and hazard curves.
This function estimates the average loss ratio occurring over a
given return period (typically annual where return_period = 1),
using the vulnerability curve (which relates intensity measure
levels to an expected loss ratio) and the hazard curve (which
relates intensity measure levels to annual rates of exceedance).
The calculation integrates the product of the vulnerability
function and the hazard curve over the specified range of
intensity measure levels, accounting for the return period and
a maximum return period threshold.
Parameters
----------
vulnerability_array : 2D array
A 2D array where the first column contains intensity measure
levels, and the second column contains the corresponding
expected loss ratios for each intensity level.
hazard_array : 2D array
A 2D array where the first column contains intensity measure
levels, and the second column contains the annual rates of
exceedance (i.e., the probability of exceedance per year)
for each intensity level.
return_period : float, optional, default=1
The return period used to scale the hazard rate. This is the
time span (in years) over which the average annual damage
probability is calculated. A typical value is 1 year, but
longer periods can be used for multi-year assessments.
max_return_period : float, optional, default=5000
The maximum return period threshold used to filter out very
low hazard rates. The hazard curve is truncated to include
only intensity levels with exceedance rates above this
threshold.
Returns
-------
average_annual_loss_ratio : float
The average annual loss ratio, calculated by integrating
the product of the vulnerability function and the hazard
curve over the given intensity measure levels.
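Examples
--------
Illustrative sketch with synthetic vulnerability and hazard curves;
all values are arbitrary:
>>> import numpy as np
>>> pp = postprocessor()
>>> ims = np.array([0.1, 0.3, 0.6, 1.0])
>>> vulnerability_array = np.column_stack(
...     (ims, [0.005, 0.08, 0.35, 0.70]))
>>> hazard_array = np.column_stack(
...     (ims, [0.05, 0.01, 0.002, 0.0005]))
>>> aal = pp.calculate_average_annual_loss(
...     vulnerability_array, hazard_array)
>>> float(aal) > 0.0
True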
"""
# Ensure hazard data is sorted by Intensity Measure (IM)
hazard_array = hazard_array[hazard_array[:, 0].argsort()]
vulnerability_array = vulnerability_array[
vulnerability_array[:, 0].argsort()]
# Filter hazard based on max return period (min frequency threshold)
min_rate_threshold = return_period / max_return_period
hazard_filtered = hazard_array[hazard_array[:, 1]
>= min_rate_threshold]
if len(hazard_filtered) < 2:
return 0.0
# Calculate midpoints (mean IMs) and the change in rates (d_lambda)
# abs difference gives the rate of occurrence for each interval
mean_imls = (hazard_filtered[:-1, 0] + hazard_filtered[1:, 0]) / 2
rate_occ = np.abs(np.diff(hazard_filtered[:, 1])) / return_period
# Prepare vulnerability curve for interpolation
# Anchoring the curve at IM=0 (Loss=0) and a high IM cap (Loss=1.0)
v_im = vulnerability_array[:, 0]
v_loss = vulnerability_array[:, 1]
curve_imls = np.concatenate(
([0.0], v_im, [max(20.0, v_im.max() * 1.5)]))
curve_ordinates = np.concatenate(([0.0], v_loss, [1.0]))
# Interpolate vulnerability at the hazard midpoints
interpolated_losses = np.interp(mean_imls, curve_imls, curve_ordinates)
# Final Integration (Dot product of Losses and Probabilities)
average_annual_loss = np.dot(interpolated_losses, rate_occ)
return average_annual_loss