Skip to content

Reference

TDLM.tdlm

TDLM: Systematic comparison of trip distribution laws and models in Python

Author: Rémi Perrier (2025)

A Python port of the TDLM R package, with numpy-based implementations and parallel processing support for multiple exponent values.

This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License version 3.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

Functions

extract_opportunities(mass_destination, distance, processes=None, verbose=True)

Compute the opportunities matrix Si,jS_{i,j} Number of opportunities located in a circle of radius di,jd_{i,j} centered in ii (excluding the source and the destination).

Parameters:

Name Type Description Default
mass_destination ndarray

Number of inhabitants at destination mjm_j

required
distance ndarray

Distance matrix di,jd_{i,j} (n x n)

required
processes int

Number of processes for parallel computation. Default: CPU count - 2

None
verbose bool

Controls the verbosity

True

Returns:

Type Description
ndarray

Opportunities matrix Si,jS_{i,j} (n x n)

Source code in TDLM/tdlm.py
def extract_opportunities(
    mass_destination: np.ndarray,
    distance: np.ndarray,
    processes: Optional[int] = None,
    verbose: bool = True
) -> np.ndarray:
    r"""
    Compute the opportunities matrix $`S_{i,j}`$ Number of opportunities located in a circle 
    of radius $`d_{i,j}`$ centered in $`i`$ (excluding the source and the destination).

    Parameters
    ----------
    mass_destination : np.ndarray
        Number of inhabitants at destination $`m_j`$
    distance : np.ndarray
        Distance matrix $`d_{i,j}`$ (n x n)
    processes : int, optional
        Number of processes for parallel computation. Default: CPU count - 2
    verbose : bool, default True
        Controls the verbosity

    Returns
    -------
    np.ndarray
        Opportunities matrix $`S_{i,j}`$ (n x n)
    """
    n = len(mass_destination)

    # Validate inputs
    if distance.shape != (n, n):
        raise _TDLMError(f"distance matrix must be {n}x{n}")

    _vprint(f"Computing opportunities matrix for {n} regions...", verbose)

    # Setup multiprocessing
    num_processes = processes if processes is not None else max(1, mp.cpu_count() - 2)
    _vprint(f'Using {num_processes} parallel processes', verbose)

    # Prepare arguments for parallel processing
    args_list = [(i, distance[i,:], mass_destination, n) for i in range(n)]

    # Use multiprocessing to compute S matrix rows in parallel
    with mp.Pool(processes=num_processes) as pool:
        results = list(tqdm(pool.imap(_process_opportunities_row, args_list), 
                           total=n, desc="Computing opportunities", disable=not verbose))

    # Collect results into S matrix
    S = np.zeros((n, n))
    for i, row_S in results:
        S[i, :] = row_S

    _vprint("Done\n", verbose)
    return S

run_optimization(mass_origin, mass_destination, distance, obs, opportunity=None, law='all', model='all', out_trips=None, in_trips=None, average=False, repli=1, measure='CPC', processes=None, random_seed=None, verbose=True)

Run trip distribution law and model simulations, compute the goodness-of-fit measures, and determine the optimal exponent with respect to the specified measure.

Parameters:

Name Type Description Default
mass_origin ndarray

Number of inhabitants at origin mim_i

required
mass_destination ndarray

Number of inhabitants at destination mjm_j

required
distance ndarray

Distance matrix di,jd_{i,j} (n x n)

required
obs ndarray

Observed trip matrix Ti,jT_{i,j} (n x n)

required
opportunity ndarray

Matrix of opportunities Si,jS_{i,j} (n x n). Required for "Rad", "RadExt", "Schneider". If not provided and required, will be computed automatically.

None
law str or List[str]

Trip distribution law. "all" or subset of: ["GravExp", "NGravExp", "GravPow", "NGravPow", "Schneider", "Rad", "RadExt", "Rand"]

"all"
model str or List[str]

Distribution model. "all" or subset of: ["UM", "PCM", "ACM", "DCM"]

"all"
out_trips ndarray

Number of out-commuters OiO_i. Required for constrained models.

None
in_trips ndarray

Number of in-commuters DjD_j. Required for ACM and DCM models.

None
average bool

Whether the average mobility flow matrix should be generated instead of the repli matrices based on random draws.

False
repli int

Number of replications. Note that repli = 1 if average = True.

1
measure str

Good-of-fit metric with respect to which to determine the optimal exponent

'CPC'
processes int

Number of processes for parallel computation. Default: CPU count - 2

None
random_seed int

Random seed for reproducibility

None
verbose bool

Controls the verbosity

True

Returns:

Type Description
DataFrame

For each combination of law and model, resulting optimal exponent and corresponding goodness-of-fit measures.

Source code in TDLM/tdlm.py
def run_optimization(
    mass_origin: np.ndarray,
    mass_destination: np.ndarray, 
    distance: np.ndarray,
    obs: np.ndarray,
    opportunity: Optional[np.ndarray] = None,
    law: Union[str, List[str]] = "all",
    model: Union[str, List[str]] = "all",
    out_trips: Optional[np.ndarray] = None,
    in_trips: Optional[np.ndarray] = None,
    average: bool = False,
    repli: int = 1,
    measure: str = "CPC",
    processes: Optional[int] = None,
    random_seed: Optional[int] = None,
    verbose: bool = True
) -> Union[np.ndarray, Dict[float, np.ndarray]]:
    r"""
    Run trip distribution law and model simulations, compute the goodness-of-fit measures, and determine the optimal exponent with respect to the specified measure.

    Parameters
    ----------
    mass_origin : np.ndarray
        Number of inhabitants at origin $`m_i`$
    mass_destination : np.ndarray  
        Number of inhabitants at destination $`m_j`$
    distance : np.ndarray
        Distance matrix $`d_{i,j}`$ (n x n)
    obs : np.ndarray
        Observed trip matrix $`T_{i,j}`$ (n x n)
    opportunity : np.ndarray, optional
        Matrix of opportunities $`S_{i,j}`$ (n x n). Required for "Rad", "RadExt", "Schneider".
        If not provided and required, will be computed automatically.
    law : str or List[str], default "all"
        Trip distribution law. "all" or subset of: 
        ["GravExp", "NGravExp", "GravPow", "NGravPow", "Schneider", "Rad", "RadExt", "Rand"]
    model : str or List[str], default "all"
        Distribution model. "all" or subset of:
        ["UM", "PCM", "ACM", "DCM"]
    out_trips : np.ndarray, optional
        Number of out-commuters $`O_i`$. Required for constrained models.
    in_trips : np.ndarray, optional
        Number of in-commuters $`D_j`$. Required for ACM and DCM models.
    average : bool, default False
        Whether the average mobility flow matrix should be generated instead of the `repli` matrices based on random draws.
    repli : int, default 1
        Number of replications. Note that `repli = 1` if `average = True`.
    measure: str, default "CPC"
        Good-of-fit metric with respect to which to determine the optimal exponent
    processes : int, optional
        Number of processes for parallel computation. Default: CPU count - 2
    random_seed : int, optional
        Random seed for reproducibility
    verbose : bool, default True
        Controls the verbosity

    Returns
    -------
    pd.DataFrame
        For each combination of law and model, resulting optimal exponent and corresponding goodness-of-fit measures.
    """
    if average:
        repli = 1

    n = len(mass_origin)

    #Check laws
    all_laws = ["GravExp", "NGravExp", "GravPow", "NGravPow", "Schneider", "Rad", "RadExt", "Rand"]
    laws_requiring_opportunities = ["Rad", "RadExt", "Schneider"]

    if law == "all":
        selected_laws = all_laws
    else:
        selected_laws = law if isinstance(law, list) else [law]
        invalid = set(selected_laws) - set(all_laws)
        if invalid:
            raise _TDLMError(f"Invalid laws: {invalid}. Available: {['all']+all_laws}")

    require_opportunity = set(selected_laws).intersection(set(laws_requiring_opportunities))
    if require_opportunity:
        if opportunity is None:
            _vprint(f"Laws {require_opportunity} require opportunities matrix. Computing automatically...", verbose)
            opportunity = extract_opportunities(mass_destination, distance, processes, verbose)
        if opportunity.shape != (n, n):
            raise _TDLMError(f"opportunities matrix must be {n}x{n}")

    #Check models
    all_models = ["UM", "PCM", "ACM", "DCM"]
    if model == "all":
        selected_models = all_models
    else:
        selected_models = model if isinstance(model, list) else [model]
        invalid = set(selected_models) - set(all_models)
        if invalid:
            raise _TDLMError(f"Invalid models: {invalid}. Available: {['all']+all_models}")

    # Check array dimensions
    if len(mass_destination) != n:
        raise _TDLMError("mass_origin and mass_destination must have same length")

    if distance.shape != (n, n):
        raise _TDLMError(f"distance matrix must be {n}x{n}")

    # Check trip constraints for models
    if set(selected_models).intersection(set(["PCM", "DCM"])) and out_trips is None:
        raise _TDLMError(f"out_trips required for model '{model}'")

    if set(selected_models).intersection(set(["ACM", "DCM"])) and in_trips is None:
        raise _TDLMError(f"in_trips required for model '{model}'")

    if out_trips is not None and len(out_trips) != n:
        raise _TDLMError("out_trips must have same length as mass arrays")

    if in_trips is not None and len(in_trips) != n:
        raise _TDLMError("in_trips must have same length as mass arrays")

    #Check measure
    all_measures = ["CPC", "CPL", "CPCd", "KS_stat", "KS_pval", "KL_div", "RMSE"]
    if measure not in all_measures:
        raise _TDLMError(f"Invalid measure: {measure}. Available: {all_measures}")
    # Set random seed if provided
    if random_seed is not None:
        np.random.seed(random_seed)

    results = []
    for i, l in enumerate(selected_laws):
        for j, m in enumerate(selected_models):
            result_dict= {'Law':l, 'Model':m}
            _vprint(f'Calculating average {measure} for {l} with {m} over {repli} realizations', verbose)
            params = [l, m, measure, mass_origin, mass_destination, distance, opportunity, out_trips, in_trips, obs, average, repli, processes, verbose]
            res = minimize_scalar(_single_metric_average, args=(params))
            if res.success:
                result_dict['Exponent'] = res.x
            else:
                result_dict['Exponent'] = np.nan
            results.append(result_dict)

    return pd.DataFrame(results)

run_law_model_gof(law, mass_origin, mass_destination, distance, obs, opportunity=None, exponent=1.0, model='UM', out_trips=None, in_trips=None, average=False, repli=1, measures='all', processes=None, random_seed=None, verbose=True)

Run trip distribution law and model simulations, and compute the goodness-of-fit measures, all in one go.

Parameters:

Name Type Description Default
law str

Trip distribution law. One of: "GravExp", "NGravExp", "GravPow", "NGravPow", "Schneider", "Rad", "RadExt", "Rand"

required
mass_origin ndarray

Number of inhabitants at origin mim_i

required
mass_destination ndarray

Number of inhabitants at destination mjm_j

required
distance ndarray

Distance matrix di,jd_{i,j} (n x n)

required
obs ndarray

Observed trip matrix Ti,jT_{i,j} (n x n)

required
opportunity ndarray

Matrix of opportunities Si,jS_{i,j} (n x n). Required for "Rad", "RadExt", "Schneider". If not provided and required, will be computed automatically.

None
exponent float or ndarray

Exponent parameter(s) for the distribution law

1.0
model str

Distribution model. One of: "UM", "PCM", "ACM", "DCM"

"UM"
out_trips ndarray

Number of out-commuters OiO_i. Required for constrained models.

None
in_trips ndarray

Number of in-commuters DjD_j. Required for ACM and DCM models.

None
average bool

Whether the average mobility flow matrix should be generated instead of the repli matrices based on random draws.

False
repli int

Number of replications. Note that repli = 1 if average = True.

1
measures str or List[str]

Measures to calculate. "all" or subset of: ["CPC", "CPL", "CPCd", "KS_stat", "KS_pval", "KL_div", "RMSE"]

"all"
processes int

Number of processes for parallel computation. Default: CPU count - 2

None
random_seed int

Random seed for reproducibility

None
verbose bool

Controls the verbosity

True

Returns:

Type Description
Union[DataFrame, Dict[float, DataFrame]]

If single exponent: DataFrame with measures. If multiple exponents: Dict with exponents as keys, DataFrames as values.

Source code in TDLM/tdlm.py
def run_law_model_gof(
    law: str,
    mass_origin: np.ndarray,
    mass_destination: np.ndarray, 
    distance: np.ndarray,
    obs: np.ndarray,
    opportunity: Optional[np.ndarray] = None,
    exponent: Union[float, np.ndarray] = 1.0,
    model: str = "UM",
    out_trips: Optional[np.ndarray] = None,
    in_trips: Optional[np.ndarray] = None,
    average: bool = False,
    repli: int = 1,
    measures: Union[str, List[str]] = "all",
    processes: Optional[int] = None,
    random_seed: Optional[int] = None,
    verbose: bool = True
) -> Union[pd.DataFrame, Dict[float, pd.DataFrame]]:
    r"""
    Run trip distribution law and model simulations, and compute the goodness-of-fit measures, all in one go.

    Parameters
    ----------
    law : str
        Trip distribution law. One of: "GravExp", "NGravExp", "GravPow", 
        "NGravPow", "Schneider", "Rad", "RadExt", "Rand"
    mass_origin : np.ndarray
        Number of inhabitants at origin $`m_i`$
    mass_destination : np.ndarray  
        Number of inhabitants at destination $`m_j`$
    distance : np.ndarray
        Distance matrix $`d_{i,j}`$ (n x n)
    obs : np.ndarray
        Observed trip matrix $`T_{i,j}`$ (n x n)
    opportunity : np.ndarray, optional
        Matrix of opportunities $`S_{i,j}`$ (n x n). Required for "Rad", "RadExt", "Schneider".
        If not provided and required, will be computed automatically.
    exponent : float or np.ndarray
        Exponent parameter(s) for the distribution law
    model : str, default "UM"
        Distribution model. One of: "UM", "PCM", "ACM", "DCM"
    out_trips : np.ndarray, optional
        Number of out-commuters $`O_i`$. Required for constrained models.
    in_trips : np.ndarray, optional
        Number of in-commuters $`D_j`$. Required for ACM and DCM models.
    average : bool, default False
        Whether the average mobility flow matrix should be generated instead of the `repli` matrices based on random draws.
    repli : int, default 1
        Number of replications. Note that `repli = 1` if `average = True`.
    measures : str or List[str], default "all"
        Measures to calculate. "all" or subset of:
        ["CPC", "CPL", "CPCd", "KS_stat", "KS_pval", "KL_div", "RMSE"]
    processes : int, optional
        Number of processes for parallel computation. Default: CPU count - 2
    random_seed : int, optional
        Random seed for reproducibility
    verbose : bool, default True
        Controls the verbosity

    Returns
    -------
    Union[pd.DataFrame, Dict[float, pd.DataFrame]]
        If single exponent: DataFrame with measures.
        If multiple exponents: Dict with exponents as keys, DataFrames as values.
    """
    if average:
        repli = 1
    pij = None

    # Check if opportunities matrix is needed and compute if not provided
    laws_requiring_opportunities = ["Rad", "RadExt", "Schneider"]
    if law in laws_requiring_opportunities and opportunity is None:
        _vprint(f"Law '{law}' requires opportunities matrix. Computing automatically...", verbose)
        opportunity = extract_opportunities(mass_destination, distance, processes, verbose)

    # Input validation
    _validate_inputs(law, model, mass_origin, mass_destination, distance, 
                    opportunity, out_trips, in_trips)
    # Available measures
    all_measures = ["CPC", "CPL", "CPCd", "KS_stat", "KS_pval", "KL_div", "RMSE"]

    if measures == "all":
        selected_measures = all_measures
    else:
        selected_measures = measures if isinstance(measures, list) else [measures]
        invalid = set(selected_measures) - set(all_measures)
        if invalid:
            raise _TDLMError(f"Invalid measures: {invalid}. Available: {['all']+all_measures}")

    # Set random seed if provided
    if random_seed is not None:
        np.random.seed(random_seed)

    # Handle single vs multiple exponents
    exponents = np.atleast_1d(exponent)
    single_exponent = len(exponents) == 1

    # Setup multiprocessing
    num_processes = processes if processes is not None else max(1, mp.cpu_count() - 2)

    if len(exponents) > 1 and num_processes > 1:
        num_processes_needed = min(len(exponent), num_processes)
        # Parallel processing for multiple exponents
        _vprint(f'Running simulations and GOF for {law} with {model} model ({repli} replications)', verbose)
        _vprint(f'Using {num_processes_needed} parallel processes', verbose)

        shm_list = [] # To track shared memory blocks for cleanup
        try:
            # Create shared memory for all large NumPy arrays
            shm_info = {
                'mass_origin': _numpy_to_shm(mass_origin, shm_list),
                'mass_destination': _numpy_to_shm(mass_destination, shm_list),
                'distance': _numpy_to_shm(distance, shm_list),
                'opportunity': _numpy_to_shm(opportunity, shm_list),
                'out_trips': _numpy_to_shm(out_trips, shm_list),
                'in_trips': _numpy_to_shm(in_trips, shm_list),
                'obs': _numpy_to_shm(obs, shm_list)
            }

            with mp.Pool(processes=num_processes_needed) as pool:
                params = [(shm_info, pij, law, model, beta, repli, average, selected_measures) for beta in exponents]
                results = list(tqdm(pool.imap(_process_exponent_gof_shared, params),
                                    total=len(exponents), desc='Computing exponents', disable=not verbose))
        finally:
            # CRITICAL: Clean up! Unlink all shared memory blocks
            # This must be in a 'finally' block to run even if the pool fails
            _vprint("Cleaning up shared memory blocks...", verbose)
            for shm in shm_list:
                shm.close()
                shm.unlink() # Destroys the shared memory block

        # Organize results
        output = {}
        for i, exponent in enumerate(exponents):
            output[exponent] = results[i]
        _vprint('Done\n', verbose)
        return output
    else:
        # Sequential processing
        data = (mass_origin, mass_destination, distance, opportunity, out_trips, in_trips, obs)
        if single_exponent:
            beta = exponents[0]
            _vprint(f'Simulating matrix and GOF for {law} β = {beta:.2g} with {model}', verbose)
            params = (data, pij, law, model, beta, repli, average, selected_measures)
            result = _process_exponent_gof(params)
            return result
        else:
            results = {}
            _vprint(f'Running simulations and GOF for {law} with {model} model ({repli} replications)', verbose)
            for i, beta in enumerate(tqdm(exponents, desc='Computing exponents', disable=not verbose)):
                params = (data, pij, law, model, beta, repli, average, selected_measures)
                results[beta] = _process_exponent_gof(params)
            return results

run_law_model(law, mass_origin, mass_destination, distance, opportunity=None, exponent=1.0, return_proba=False, model='UM', out_trips=None, in_trips=None, average=False, repli=1, processes=None, random_seed=None, verbose=True)

Run trip distribution law and model simulations.

Parameters:

Name Type Description Default
law str

Trip distribution law. One of: "GravExp", "NGravExp", "GravPow", "NGravPow", "Schneider", "Rad", "RadExt", "Rand"

required
mass_origin ndarray

Number of inhabitants at origin mim_i

required
mass_destination ndarray

Number of inhabitants at destination mjm_j

required
distance ndarray

Distance matrix di,jd_{i,j} (n x n)

required
opportunity ndarray

Matrix of opportunities Si,jS_{i,j} (n x n). Required for "Rad", "RadExt", "Schneider". If not provided and required, will be computed automatically.

None
exponent float or ndarray

Exponent parameter(s) for the distribution law

1.0
return_proba bool

Whether to return probability matrices

False
model str

Distribution model. One of: "UM", "PCM", "ACM", "DCM"

"UM"
out_trips ndarray

Number of out-commuters OiO_i. Required for constrained models.

None
in_trips ndarray

Number of in-commuters DjD_j. Required for ACM and DCM models.

None
average bool

Whether the average mobility flow matrix should be generated instead of the repli matrices based on random draws.

False
repli int

Number of replications. Note that repli = 1 if average = True.

1
processes int

Number of processes for parallel computation. Default: CPU count - 2

None
random_seed int

Random seed for reproducibility

None
verbose bool

Controls the verbosity

True

Returns:

Type Description
Union[ndarray, Dict[float, ndarray]]

Simulated trip matrix or matrices T~i,j\tilde{T}_{i,j}. If single exponent: np.ndarray of shape (repli, n, n). If multiple exponents: Dict with exponents as keys, arrays as values.

Source code in TDLM/tdlm.py
def run_law_model(
    law: str,
    mass_origin: np.ndarray,
    mass_destination: np.ndarray, 
    distance: np.ndarray,
    opportunity: Optional[np.ndarray] = None,
    exponent: Union[float, np.ndarray] = 1.0,
    return_proba: bool = False,
    model: str = "UM",
    out_trips: Optional[np.ndarray] = None,
    in_trips: Optional[np.ndarray] = None,
    average: bool = False,
    repli: int = 1,
    processes: Optional[int] = None,
    random_seed: Optional[int] = None,
    verbose: bool = True    
) -> Union[np.ndarray, Dict[float, np.ndarray]]:
    r"""
    Run trip distribution law and model simulations.

    Parameters
    ----------
    law : str
        Trip distribution law. One of: "GravExp", "NGravExp", "GravPow", 
        "NGravPow", "Schneider", "Rad", "RadExt", "Rand"
    mass_origin : np.ndarray
        Number of inhabitants at origin $`m_i`$
    mass_destination : np.ndarray  
        Number of inhabitants at destination $`m_j`$
    distance : np.ndarray
        Distance matrix $`d_{i,j}`$ (n x n)
    opportunity : np.ndarray, optional
        Matrix of opportunities $`S_{i,j}`$ (n x n). Required for "Rad", "RadExt", "Schneider".
        If not provided and required, will be computed automatically.
    exponent : float or np.ndarray
        Exponent parameter(s) for the distribution law
    return_proba : bool, default False
        Whether to return probability matrices
    model : str, default "UM"
        Distribution model. One of: "UM", "PCM", "ACM", "DCM"
    out_trips : np.ndarray, optional
        Number of out-commuters $`O_i`$. Required for constrained models.
    in_trips : np.ndarray, optional
        Number of in-commuters $`D_j`$. Required for ACM and DCM models.
    average : bool, default False
        Whether the average mobility flow matrix should be generated instead of the `repli` matrices based on random draws.
    repli : int, default 1
        Number of replications. Note that `repli = 1` if `average = True`.
    processes : int, optional
        Number of processes for parallel computation. Default: CPU count - 2
    random_seed : int, optional
        Random seed for reproducibility
    verbose : bool, default True
        Controls the verbosity

    Returns
    -------
    Union[np.ndarray, Dict[float, np.ndarray]]
        Simulated trip matrix or matrices $`\tilde{T}_{i,j}`$.
        If single exponent: np.ndarray of shape (repli, n, n).
        If multiple exponents: Dict with exponents as keys, arrays as values.
    """
    if average:
        repli = 1
    pij = None

    # Check if opportunities matrix is needed and compute if not provided
    laws_requiring_opportunities = ["Rad", "RadExt", "Schneider"]
    if law in laws_requiring_opportunities and opportunity is None:
        _vprint(f"Law '{law}' requires opportunities matrix. Computing automatically...", verbose)
        opportunity = extract_opportunities(mass_destination, distance, processes, verbose)

    # Input validation
    _validate_inputs(law, model, mass_origin, mass_destination, distance, 
                    opportunity, out_trips, in_trips)

    # Set random seed if provided
    if random_seed is not None:
        np.random.seed(random_seed)

    # Handle single vs multiple exponents
    exponents = np.atleast_1d(exponent)
    single_exponent = len(exponents) == 1

    # Setup multiprocessing
    num_processes = processes if processes is not None else max(1, mp.cpu_count() - 2)

    if len(exponents) > 1 and num_processes > 1:
        num_processes_needed = min(len(exponent), num_processes)
        _vprint(f'Running simulations for {law} with {model} model ({repli} replications)', verbose)
        _vprint(f'Using {num_processes_needed} parallel processes', verbose)

        shm_list = [] # To track shared memory blocks for cleanup
        try:
            # Create shared memory for all large NumPy arrays
            shm_info = {
                'mass_origin': _numpy_to_shm(mass_origin, shm_list),
                'mass_destination': _numpy_to_shm(mass_destination, shm_list),
                'distance': _numpy_to_shm(distance, shm_list),
                'opportunity': _numpy_to_shm(opportunity, shm_list),
                'out_trips': _numpy_to_shm(out_trips, shm_list),
                'in_trips': _numpy_to_shm(in_trips, shm_list),
            }

            with mp.Pool(processes=num_processes_needed) as pool:
                # Prepare parameters for the shared worker function
                # Pass the dict of shared memory info instead of the raw data
                params = [(shm_info, pij, law, model, beta, repli, return_proba, average) for beta in exponents]

                results = list(tqdm(pool.imap(_process_exponent_shared, params),
                                      total=len(exponents), desc='Computing exponents', disable=not verbose))
        finally:
            # CRITICAL: Clean up! Unlink all shared memory blocks
            # This must be in a 'finally' block to run even if the pool fails
            _vprint("Cleaning up shared memory blocks...", verbose)
            for shm in shm_list:
                shm.close()
                shm.unlink() # Destroys the shared memory block

        # Organize results
        output = {}
        for i, beta in enumerate(exponents):
            if return_proba:
                output[beta] = results[i]
            else:
                output[beta] = results[i]['simulations']

    else:
        # Sequential processing
        n = len(mass_origin)
        data = (n, mass_origin, mass_destination, out_trips, in_trips, distance, opportunity)

        output = {}
        if single_exponent:
            beta = exponents[0]
            _vprint(f'Simulating matrix for {law} β = {beta:.2g} with {model}', verbose)
            params = (data, pij, law, model, beta, repli, return_proba, average)
            result = _process_exponent(params) # Original function call
            if return_proba:
                output[beta] = result
            else:
                output[beta] = result['simulations']
        else:
            _vprint(f'Running simulations for {law} with {model} model ({repli} replications)', verbose)
            for i, beta in enumerate(tqdm(exponents, desc='Computing exponents', disable=not verbose)):
                params = (data, pij, law, model, beta, repli, return_proba, average)
                result = _process_exponent(params) # Original function call
                if return_proba:
                    output[beta] = result
                else:
                    output[beta] = result['simulations']

    _vprint('Done\n', verbose)
    if single_exponent:
        return list(output.values())[0]
    else:
        return output

run_law(law, mass_origin, mass_destination, distance, opportunity=None, exponent=1.0, processes=None, random_seed=None, verbose=True)

Estimate the probability matrix pi,jp_{i,j} according to the trip distribution law.

Parameters:

Name Type Description Default
law str

Trip distribution law. One of: "GravExp", "NGravExp", "GravPow", "NGravPow", "Schneider", "Rad", "RadExt", "Rand"

required
mass_origin ndarray

Number of inhabitants at origin mim_i

required
mass_destination ndarray

Number of inhabitants at destination mjm_j

required
distance ndarray

Distance matrix di,jd_{i,j} (n x n)

required
opportunity ndarray

Matrix of opportunities Si,jS_{i,j} (n x n). Required for "Rad", "RadExt", "Schneider". If not provided and required, will be computed automatically.

None
exponent float or ndarray

Exponent parameter(s) for the distribution law

1.0
processes int

Number of processes for parallel computation. Default: CPU count - 2

None
random_seed int

Random seed for reproducibility

None
verbose bool

Controls the verbosity

True

Returns:

Type Description
Union[ndarray, Dict[float, ndarray]]

Estimated matrix or matrices of probabilities pi,jp_{i,j}. If single exponent: np.ndarray of shape (n, n). If multiple exponents: Dict with exponents as keys, arrays as values.

Source code in TDLM/tdlm.py
def run_law(
    law: str,
    mass_origin: np.ndarray,
    mass_destination: np.ndarray, 
    distance: np.ndarray,
    opportunity: Optional[np.ndarray] = None,
    exponent: Union[float, np.ndarray] = 1.0,
    processes: Optional[int] = None,
    random_seed: Optional[int] = None,
    verbose: bool = True
) -> Union[np.ndarray, Dict[float, np.ndarray]]:
    r"""
    Estimate the probability matrix $`p_{i,j}`$ according to the trip distribution law.

    Parameters
    ----------
    law : str
        Trip distribution law. One of: "GravExp", "NGravExp", "GravPow", 
        "NGravPow", "Schneider", "Rad", "RadExt", "Rand"
    mass_origin : np.ndarray
        Number of inhabitants at origin $`m_i`$
    mass_destination : np.ndarray  
        Number of inhabitants at destination $`m_j`$
    distance : np.ndarray
        Distance matrix $`d_{i,j}`$ (n x n)
    opportunity : np.ndarray, optional
        Matrix of opportunities $`S_{i,j}`$ (n x n). Required for "Rad", "RadExt", "Schneider".
        If not provided and required, will be computed automatically.
    exponent : float or np.ndarray
        Exponent parameter(s) for the distribution law
    processes : int, optional
        Number of processes for parallel computation. Default: CPU count - 2
    random_seed : int, optional
        Random seed for reproducibility
    verbose : bool, default True
        Controls the verbosity

    Returns
    -------
    Union[np.ndarray, Dict[float, np.ndarray]]
        Estimated matrix or matrices of probabilities $`p_{i,j}`$.
        If single exponent: np.ndarray of shape (n, n).
        If multiple exponents: Dict with exponents as keys, arrays as values.
    """

    # Check if opportunities matrix is needed and compute if not provided
    laws_requiring_opportunities = ["Rad", "RadExt", "Schneider"]
    if law in laws_requiring_opportunities and opportunity is None:
        _vprint(f"Law '{law}' requires opportunities matrix. Computing automatically...", verbose)
        opportunity = extract_opportunities(mass_destination, distance, processes, verbose)

    # Input validation
    _validate_inputs(law, "UM", mass_origin, mass_destination, distance, 
                    opportunity, None, None)

    # Set random seed if provided
    if random_seed is not None:
        np.random.seed(random_seed)

    # Handle single vs multiple exponents
    exponents = np.atleast_1d(exponent)
    single_exponent = len(exponents) == 1


    # Setup multiprocessing
    num_processes = processes if processes is not None else max(1, mp.cpu_count() - 2)

    if len(exponents) > 1 and num_processes > 1:
        num_processes_needed = min(len(exponent), num_processes)
        # Parallel processing for multiple exponents
        _vprint(f'Estimating probabilities for {law}', verbose)
        _vprint(f'Using {num_processes_needed} parallel processes', verbose)

        with mp.Pool(processes=num_processes_needed) as pool:
            params = [(law, distance, opportunity, mass_origin, mass_destination, beta) for beta in exponents]
            results = list(tqdm(pool.starmap(_proba, params), 
                              total=len(exponents), desc='Computing exponents', disable=not verbose))

        # Organize results
        output = {}
        for i, beta in enumerate(exponents):
            output[beta] = results[i]

    else:
        # Sequential processing
        output = {}
        if single_exponent:
            beta = exponents[0]
            _vprint(f'Estimating probabilities for {law} with β = {beta:.2g}', verbose)
            return _proba(law, distance, opportunity, mass_origin, mass_destination, beta)

        else:
            _vprint(f'Estimating probabilities for {law}', verbose)

            for i, beta in enumerate(tqdm(exponents, desc='Computing exponents', disable=not verbose)):
                output[beta] = _proba(law, distance, opportunity, mass_origin, mass_destination, beta)
    _vprint('Done\n', verbose)

    return output

run_model(probabilities, mass_origin, mass_destination, distance, model='UM', out_trips=None, in_trips=None, average=False, repli=1, processes=None, random_seed=None, verbose=True)

Run trip distribution model simulations with provided probability matrix or matrices pi,jp_{i,j}.

Parameters:

Name Type Description Default
probabilities Dict[float, ndarray]

Estimated matrix or matrices of probabilities pi,jp_{i,j}. Dict with exponent(s) as key(s), arrays as values.

required
mass_origin ndarray

Number of inhabitants at origin mim_i

required
mass_destination ndarray

Number of inhabitants at destination mjm_j

required
distance ndarray

Distance matrix di,jd_{i,j} (n x n)

required
model str

Distribution model. One of: "UM", "PCM", "ACM", "DCM"

"UM"
out_trips ndarray

Number of out-commuters OiO_i. Required for constrained models.

None
in_trips ndarray

Number of in-commuters DjD_j. Required for ACM and DCM models.

None
average bool

Whether the average mobility flow matrix should be generated instead of the repli matrices based on random draws.

False
repli int

Number of replications. Note that repli = 1 if average = True.

1
processes int

Number of processes for parallel computation. Default: CPU count - 2

None
random_seed int

Random seed for reproducibility

None
verbose bool

Controls the verbosity

True

Returns:

Type Description
Union[ndarray, Dict[float, ndarray]]

Simulated trip matrix or matrices T~i,j\tilde{T}_{i,j}. If single exponent: np.ndarray of shape (repli, n, n). If multiple exponents: Dict with exponents as keys, arrays as values.

Source code in TDLM/tdlm.py
def run_model(
    probabilities: Dict[float, np.ndarray],
    mass_origin: np.ndarray,
    mass_destination: np.ndarray, 
    distance: np.ndarray,
    model: str = "UM",
    out_trips: Optional[np.ndarray] = None,
    in_trips: Optional[np.ndarray] = None,
    average: bool = False,
    repli: int = 1,
    processes: Optional[int] = None,
    random_seed: Optional[int] = None,
    verbose: bool = True
) -> Union[np.ndarray, Dict[float, np.ndarray]]:
    r"""
    Run trip distribution model simulations with provided probability matrix or matrices $`p_{i,j}`$.

    Parameters
    ----------
    probabilities : Dict[float, np.ndarray]
        Estimated matrix or matrices of probabilities $`p_{i,j}`$.
        Dict with exponent(s) as key(s), arrays as values.
    mass_origin : np.ndarray
        Number of inhabitants at origin $`m_i`$
    mass_destination : np.ndarray  
        Number of inhabitants at destination $`m_j`$
    distance : np.ndarray
        Distance matrix $`d_{i,j}`$ (n x n)
    model : str, default "UM"
        Distribution model. One of: "UM", "PCM", "ACM", "DCM"
    out_trips : np.ndarray, optional
        Number of out-commuters $`O_i`$. Required for constrained models.
    in_trips : np.ndarray, optional
        Number of in-commuters $`D_j`$. Required for ACM and DCM models.
    average : bool, default False
        Whether the average mobility flow matrix should be generated instead of the `repli` matrices based on random draws.
    repli : int, default 1
        Number of replications. Note that `repli = 1` if `average = True`.
    processes : int, optional
        Number of processes for parallel computation. Default: CPU count - 2
    random_seed : int, optional
        Random seed for reproducibility
    verbose : bool, default True
        Controls the verbosity

    Returns
    -------
    Union[np.ndarray, Dict[float, np.ndarray]]
        Simulated trip matrix or matrices $`\tilde{T}_{i,j}`$.
        If single exponent: np.ndarray of shape (repli, n, n).
        If multiple exponents: Dict with exponents as keys, arrays as values.
    """
    if average:
        repli = 1
    # Reconstruct exponent or exponents array from provided probabilities
    exponent = np.fromiter(probabilities.keys(), dtype=float)
    exponent.sort()

    # Input validation
    _validate_inputs("Rand", model, mass_origin, mass_destination, distance, 
                    None, out_trips, in_trips)

    # Set random seed if provided
    if random_seed is not None:
        np.random.seed(random_seed)

    # Handle single vs multiple exponents
    exponents = np.atleast_1d(exponent)
    single_exponent = len(exponents) == 1

    opportunity = None
    return_proba = False
    law = None
    # Setup multiprocessing
    num_processes = processes if processes is not None else max(1, mp.cpu_count() - 2)

    if len(exponents) > 1 and num_processes > 1:
        num_processes_needed = min(len(exponent), num_processes)
        # Parallel processing for multiple exponents
        _vprint(f'Running simulations with {model} model ({repli} replications)', verbose)
        _vprint(f'Using {num_processes_needed} parallel processes', verbose)
        shm_list = [] # To track shared memory blocks for cleanup
        try:
            # Create shared memory for all large NumPy arrays
            shm_info = {
                'mass_origin': _numpy_to_shm(mass_origin, shm_list),
                'mass_destination': _numpy_to_shm(mass_destination, shm_list),
                'distance': _numpy_to_shm(distance, shm_list),
                'opportunity': _numpy_to_shm(opportunity, shm_list),
                'out_trips': _numpy_to_shm(out_trips, shm_list),
                'in_trips': _numpy_to_shm(in_trips, shm_list),
            }

            with mp.Pool(processes=num_processes_needed) as pool:
                # Prepare parameters for the shared worker function
                # Pass the dict of shared memory info instead of the raw data
                params = [(shm_info, probabilities[beta], law, model, beta, repli, return_proba, average) for beta in exponents]

                results = list(tqdm(pool.imap(_process_exponent_shared, params),
                                      total=len(exponents), desc='Computing exponents', disable=not verbose))

        finally:
            # CRITICAL: Clean up! Unlink all shared memory blocks
            # This must be in a 'finally' block to run even if the pool fails
            _vprint("Cleaning up shared memory blocks...", verbose)
            for shm in shm_list:
                shm.close()
                shm.unlink() # Destroys the shared memory block
        # Organize results
        output = {}
        for i, beta in enumerate(exponents):
            output[beta] = results[i]['simulations']

    else:
        # Sequential processing
        n = len(mass_origin)
        data = (n, mass_origin, mass_destination, out_trips, in_trips, distance, opportunity)

        output = {}
        if single_exponent:
            beta = exponents[0]
            _vprint(f'Simulating matrix with {model}', verbose)
            params = (data, probabilities[beta], law, model, beta, repli, return_proba, average)
            result = _process_exponent(params)
            output[beta] = result['simulations']
        else:
            _vprint(f'Running simulations with {model} model ({repli} replications)', verbose)
            for i, beta in enumerate(tqdm(exponents, desc='Computing exponents', disable=not verbose)):
                params = (data, probabilities[beta], law, model, beta, repli, return_proba, average)
                result = _process_exponent(params)
                output[beta] = result['simulations']
    _vprint('Done\n', verbose)

    # Return format based on input
    if single_exponent:
        return list(output.values())[0]
    else:
        return output

gof(sim, obs, distance, measures='all', processes=None, verbose=True)

Calculate goodness-of-fit measures for simulated vs observed trip matrices.

Parameters:

Name Type Description Default
sim ndarray or Dict[float, ndarray]

Simulated trip matrix or matrices T~i,j\tilde{T}_{i,j}. If Dict, keys should be exponent values

required
obs ndarray

Observed trip matrix Ti,jT_{i,j} (n x n)

required
distance ndarray

Distance matrix di,jd_{i,j} (n x n)

required
measures str or List[str]

Measures to calculate. "all" or subset of: ["CPC", "CPL", "CPCd", "KS_stat", "KS_pval", "KL_div", "RMSE"]

"all"
processes int

Number of processes for parallel computation. Default: CPU count - 2

None
verbose bool

Controls the verbosity

True

Returns:

Type Description
Union[DataFrame, Dict[float, DataFrame]]

If single exponent: DataFrame with measures. If multiple exponents: Dict with exponents as keys, DataFrames as values.

Source code in TDLM/tdlm.py
def gof(
    sim: Union[np.ndarray, Dict[float, np.ndarray]], 
    obs: np.ndarray,
    distance: np.ndarray,
    measures: Union[str, List[str]] = "all",
    processes: Optional[int] = None,
    verbose: bool = True
) -> Union[pd.DataFrame, Dict[float, pd.DataFrame]]:
    r"""
    Calculate goodness-of-fit measures for simulated vs observed trip matrices.

    Parameters
    ----------
    sim : np.ndarray or Dict[float, np.ndarray]
        Simulated trip matrix or matrices $`\tilde{T}_{i,j}`$. If Dict, keys should be exponent values
    obs : np.ndarray
        Observed trip matrix $`T_{i,j}`$ (n x n)
    distance : np.ndarray
        Distance matrix $`d_{i,j}`$ (n x n)
    measures : str or List[str], default "all"
        Measures to calculate. "all" or subset of:
        ["CPC", "CPL", "CPCd", "KS_stat", "KS_pval", "KL_div", "RMSE"]
    processes : int, optional
        Number of processes for parallel computation. Default: CPU count - 2
    verbose : bool, default True
        Controls the verbosity

    Returns
    -------
    Union[pd.DataFrame, Dict[float, pd.DataFrame]]
        If single exponent: DataFrame with measures.
        If multiple exponents: Dict with exponents as keys, DataFrames as values.
    """

    # Available measures
    all_measures = ["CPC", "CPL", "CPCd", "KS_stat", "KS_pval", "KL_div", "RMSE"]

    if measures == "all":
        selected_measures = all_measures
    else:
        selected_measures = measures if isinstance(measures, list) else [measures]
        invalid = set(selected_measures) - set(all_measures)
        if invalid:
            raise _TDLMError(f"Invalid measures: {invalid}. Available: {['all']+all_measures}")

    # Setup multiprocessing
    num_processes = processes if processes is not None else max(1, mp.cpu_count() - 2)

    # Handle single vs multiple simulations
    if isinstance(sim, dict):
        exponents = list(sim.keys())
        single_simulation = len(exponents) == 1

        if len(exponents) > 1 and num_processes > 1:
            num_processes_needed = min(len(exponents), num_processes)
            # Parallel processing for multiple exponents
            _vprint(f'Calculating GOF measures for {len(exponents)} exponents', verbose)
            _vprint(f'Using {num_processes_needed} parallel processes', verbose)

            shm_list = [] # To track shared memory blocks for cleanup
            try:
                # Create shared memory for all large NumPy arrays
                shm_info = {
                    'obs': _numpy_to_shm(obs, shm_list),
                    'distance': _numpy_to_shm(distance, shm_list),
                }

                with mp.Pool(processes=num_processes_needed) as pool:
                    # Prepare parameters for the new shared worker function
                    # Pass the dict of shared memory info instead of the raw data
                    params = [(shm_info, sim[exponent], selected_measures) for exponent in exponents]

                    results = list(tqdm(pool.imap(_calculate_gof_shared, params), 
                                  total=len(exponents), desc='Computing GOF measures', disable=not verbose))
            finally:
                # CRITICAL: Clean up! Unlink all shared memory blocks
                # This must be in a 'finally' block to run even if the pool fails
                _vprint("Cleaning up shared memory blocks...", verbose)
                for shm in shm_list:
                    shm.close()
                    shm.unlink() # Destroys the shared memory block

            # Organize results
            output = {}
            for i, exponent in enumerate(exponents):
                output[exponent] = results[i]

            _vprint('Done\n', verbose)
            return output

        else:
            # Sequential processing
            results = {}
            if single_simulation:
                exponent = exponents[0]
                _vprint(f'Calculating GOF measures for exponent {exponent}', verbose)
                params = (sim[exponent], obs, distance, selected_measures)
                results[exponent] = _calculate_gof(params)
            else:
                _vprint(f'Calculating GOF measures for {len(exponents)} exponents', verbose)
                for exponent in tqdm(exponents, desc='Computing GOF measures', disable=not verbose):
                    params = (sim[exponent], obs, distance, selected_measures)
                    results[exponent] = _calculate_gof(params)

            _vprint('Done\n', verbose)
            return results
    else:
        # Single simulation matrix
        _vprint('Calculating GOF measures', verbose)
        result = _calculate_gof((sim, obs, distance, selected_measures))
        _vprint('Done\n', verbose)
        return result