"""
Computation of the fairness (i.e. measurement of the similarity in prediction distribution between
different population groups according to their sensitive attributes).
"""
# Authors: Agathe F, Suzie G, Francois H, Philipp R, Arthur C
# License: BSD 3 clause
import warnings
from typing import Union

import numpy as np
import pandas as pd
from scipy.interpolate import interp1d
# WARNING: the EQF of a single value cannot be meaningfully computed: if only
# one individual carries a specific sensitive value, the transform function
# cannot be used for that group.
class EQF:
"""
Empirical Quantile Function (EQF) Class.
This class computes the linear interpolation of the empirical quantile function for a given set
of sample data.
Parameters
----------
sample_data : array-like
A 1-D array or list-like object containing the sample data.
Attributes
----------
interpolater : scipy.interpolate.interp1d
An interpolation function that maps quantiles to values.
min_val : float
The minimum value in the sample data.
max_val : float
The maximum value in the sample data.
Methods
-------
__init__(sample_data)
Initializes the EQF object by calculating the interpolater, min_val, and max_val.
_calculate_eqf(sample_data)
Private method to calculate interpolater, min_val, and max_val.
__call__(value_)
Callable method to compute the interpolated value for a given quantile.
Raises
------
ValueError
If the input value_ is outside the range [0, 1].
Example
-------
>>> sample_data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> eqf = EQF(sample_data)
>>> print(eqf([0.2, 0.5, 0.8])) # Interpolated value at quantiles 0.2, 0.5, and 0.8
[2.8 5.5 8.2]
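
    A scalar quantile can be evaluated in the same way:

    >>> float(eqf(0.5))
    5.5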
Note
----
- The EQF interpolates values within the range [0, 1] representing quantiles.
- The input sample_data should be a list or array-like containing numerical values.
"""
    def __init__(self, sample_data: Union[np.ndarray, list[float]]):
        self._calculate_eqf(sample_data)
        if len(sample_data) == 1:
            warnings.warn('sample_data contains a single value; '
                          'the resulting EQF is constant')
def _calculate_eqf(self, sample_data: Union[np.ndarray, list[float]]) -> None:
"""
Calculate the Empirical Quantile Function for the given sample data.
Parameters
----------
sample_data : array-like
A 1-D array or list-like object containing the sample data.
        Returns
        -------
        None
            Sets the ``interpolater``, ``min_val`` and ``max_val`` attributes
            in place.
Notes
-----
The EQF interpolates values within the range [0, 1] representing quantiles.
The input sample_data should be a list or array-like containing numerical values.
"""
        sorted_data = np.sort(sample_data)
        if len(sample_data) == 1:
            linspace = np.linspace(0, 1, num=2)
            self.interpolater = interp1d(linspace, [sorted_data[0]] * 2)
        else:
            linspace = np.linspace(0, 1, num=len(sample_data))
            self.interpolater = interp1d(linspace, sorted_data)
self.min_val = sorted_data[0]
self.max_val = sorted_data[-1]
    def __call__(self, value_: Union[float, np.ndarray]) -> Union[float, np.ndarray]:
"""
Compute the interpolated value for a given quantile.
Parameters
----------
        value_ : float or np.ndarray
            Quantile value(s) between 0 and 1.
Returns
-------
        float or np.ndarray
            Interpolated value(s) corresponding to the input quantile(s).
Raises
------
ValueError
If the input value_ is outside the range [0, 1].
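
        Example
        -------
        Evaluate the EQF of a small sample at the median:

        >>> eqf = EQF([1, 2, 3])
        >>> float(eqf(0.5))
        2.0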
"""
        try:
            return self.interpolater(value_)
        except ValueError:
            if not isinstance(value_, (np.ndarray, float, int)):
                raise ValueError(
                    'value_ can only be an array, a float or an integer number')
            elif isinstance(value_, np.ndarray) and not (
                    np.issubdtype(value_.dtype, np.floating)
                    or np.issubdtype(value_.dtype, np.integer)):
                raise ValueError(
                    'value_ should contain only float or integer numbers')
            elif np.any(value_ < 0) or np.any(value_ > 1):
                raise ValueError(
                    'value_ should contain only numbers between 0 and 1')
            else:
                raise ValueError('Error with input value')
def diff_quantile(data1: np.ndarray,
                  data2: np.ndarray,
                  approximate: bool = True) -> float:
"""
Compute the unfairness between two populations based on their quantile functions. If the option
approximate is set to False, compute the compute the Wasserstein distance using the POT package
which requires an install. Else, determine unfairness as the maximum difference in quantiles
between the two populations.
Parameters
----------
data1 : np.ndarray
The first set of data points.
data2 : np.ndarray
The second set of data points.
    approximate : bool, default True
        If False, compute the distance with an optimal transport plan
        (requires POT); otherwise use the quantile-based approximation.
Returns
-------
float
The unfairness value between the two populations.
    Example
    -------
    >>> data1 = np.array([5, 2, 4, 6, 1])
    >>> data2 = np.array([9, 6, 4, 7, 6])
    >>> diff = diff_quantile(data1, data2)
    >>> print(round(float(diff), 2))
    3.99
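
    With the optional POT package installed, the exact Wasserstein distance on
    a normalized cost matrix can be requested instead (sketch only; its scale
    differs from that of the approximate value):

    >>> # diff_exact = diff_quantile(data1, data2, approximate=False)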
"""
if approximate:
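        # Approximate the distance as the largest gap between the two
        # empirical quantile functions, evaluated on a fixed grid of 100
        # quantile levels (the extreme levels 0 and 1 are avoided for
        # robustness to outliers).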
probs = np.linspace(0.01, 0.99, num=100)
eqf1 = np.quantile(data1, probs)
eqf2 = np.quantile(data2, probs)
unfair_value = np.max(np.abs(eqf1-eqf2))
    else:
        try:
            import ot
        except ModuleNotFoundError as exc:
            raise ImportError(
                'POT is not installed; install it before using the '
                'approximate=False option') from exc
        except Exception as e:
            print(f"Unexpected {e=}, {type(e)=}")
            raise
        n1 = len(data1)  # data1 corresponds to y
        n2 = len(data2)
        # Uniform weights on each point of the two empirical distributions.
        a, b = np.ones((n1,)) / n1, np.ones((n2,)) / n2
        M = ot.dist(data1.reshape((n1, 1)), data2.reshape((n2, 1)),
                    metric='euclidean')  # Euclidean distance matrix
        # Normalizing the cost matrix makes the exact distance scale-free, so
        # it is not directly comparable to the approximate value.
        M = M / M.max()
        unfair_value = ot.emd2(a, b, M)
return unfair_value
def unfairness(y: np.ndarray,
               sensitive_features: pd.DataFrame,
               n_min: int = 1000) -> float:
"""
Compute the unfairness value for a given fair output (y) and multiple sensitive attributes data
(sensitive_features) containing several modalities. If there is a single sensitive feature,
it calculates the maximum quantile difference between different modalities of that single
sensitive feature. If there are multiple sensitive features, it calculates the maximum quantile
difference for each sensitive feature and then takes the sum of these maximums.
Parameters
----------
y : np.ndarray
Predicted (fair or not) output data.
sensitive_features : pd.DataFrame
Sensitive attribute data.
    n_min : int
        Minimum group size: modalities with fewer than n_min observations are
        compared with the exact Wasserstein distance instead of the quantile
        approximation.
Returns
-------
float
Unfairness value in the dataset.
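
    Notes
    -----
    Formally, ``unfairness = sum_k max_m D(y, y[s_k == m])``, where ``D`` is
    the distance computed by ``diff_quantile``, ``s_k`` is the k-th sensitive
    feature and ``m`` ranges over its modalities.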
    Example
    -------
    >>> y = np.array([5., 0., 6., 7.])
    >>> sensitive_features = pd.DataFrame({'color': ['red', 'blue', 'green', 'blue'],
    ...                                    'nb_child': [1, 2, 0, 2]})
    >>> unf = unfairness(y, sensitive_features, n_min=1)
    >>> print(round(float(unf), 2))
    11.7
"""
    max_diff_per_feature = []
    for col in sensitive_features.columns:
        sensitive_feature = sensitive_features[col]
        modalities = list(sensitive_feature.unique())
        lst_unfairness = []
        for modality in modalities:
            y_modality = y[sensitive_feature == modality]
            # Groups smaller than n_min fall back to the exact Wasserstein
            # distance, which is more reliable for small samples.
            approximate = len(y_modality) >= n_min
            lst_unfairness.append(
                diff_quantile(y, y_modality, approximate=approximate))
        max_diff_per_feature.append(max(lst_unfairness))
    return np.sum(max_diff_per_feature)
def unfairness_dict(y_fair_dict: dict[str, np.ndarray],
sensitive_features: pd.DataFrame,
                    n_min: int = 1000) -> dict[str, float]:
"""
Compute unfairness values for sequentially fair output datasets and multiple sensitive
attributes datasets.
Parameters
----------
y_fair_dict : dict
A dictionary where keys represent sensitive features and values are arrays
containing the fair predictions corresponding to each sensitive feature.
Each sensitive feature's fairness adjustment is performed sequentially,
ensuring that each feature is treated fairly relative to the previous ones.
sensitive_features : pd.DataFrame
Sensitive attribute data.
    n_min : int
        Minimum group size passed to ``unfairness``; smaller groups are
        compared with the exact Wasserstein distance.
Returns
-------
dict
A dictionary containing unfairness values for each level of fairness.
The level of fairness corresponds to the number of sensitive attributes to which
fairness has been applied.
    Example
    -------
    >>> y_fair_dict = {'Base model': np.array([19., 39., 65.]),
    ...                'color': np.array([22., 40., 50.]),
    ...                'nb_child': np.array([28., 39., 42.])}
    >>> sensitive_features = pd.DataFrame({'color': ['red', 'blue', 'green'],
    ...                                    'nb_child': [1, 2, 0]})
    >>> unfs_dict = unfairness_dict(y_fair_dict, sensitive_features, n_min=1)
    >>> print({key: round(float(val), 1) for key, val in unfs_dict.items()})
    {'Base model': 91.2, 'color': 55.6, 'nb_child': 27.9}
"""
    unfs_dict = {}
    for key, y_fair in y_fair_dict.items():
        # Score each prediction vector independently with the unfairness metric.
        unfs_dict[key] = unfairness(y_fair, sensitive_features, n_min)
    return unfs_dict