Source code for neurodesign.classes

from __future__ import annotations

import copy
import math
import random
import shutil
import warnings
import zipfile
from collections import Counter
from io import BytesIO
from pathlib import Path

import numpy as np
import scipy
import scipy.linalg
import sklearn.cluster
from numpy import transpose as t
from rich import print
from rich.progress import (
    BarColumn,
    MofNCompleteColumn,
    Progress,
    SpinnerColumn,
    TaskProgressColumn,
    TextColumn,
    TimeElapsedColumn,
    TimeRemainingColumn,
)
from scipy.special import gamma

from neurodesign import generate, report


def progress_bar(text: str, color: str = "green") -> Progress:
    """Return a rich progress bar instance."""
    return Progress(
        TextColumn(f"[{color}]{text}"),
        SpinnerColumn("dots"),
        TimeElapsedColumn(),
        BarColumn(),
        MofNCompleteColumn(),
        TaskProgressColumn(),
        TimeRemainingColumn(),
    )


[docs] class Design: """ This class represents an experimental design for an fMRI experiment. :param order: The stimulus order. :type order: list of integers :param ITI: The ITI's between all stimuli. :type ITI: list of floats :param experiment: The experimental setup. :type experiment: experiment object :param onsets: The onsets of all stimuli. :type onsets: list of floats """ def __init__(self, order, ITI, experiment, onsets=None, all_stim_durations=None): self.order = order self.ITI = ITI self.onsets = onsets self.Fe = 0 self.Fd = 0 # Per-design variable stimulus durations (None = use experiment.stim_duration) self.all_stim_durations = all_stim_durations self.experiment = experiment # assert whether design is valid if len(self.ITI) != experiment.n_trials: raise ValueError("length of design (ITI's) does not comply with experiment") if len(self.order) != experiment.n_trials: raise ValueError("length of design (orders) does not comply with experiment")
[docs] def check_maxrep(self, maxrep): """Check whether design does not exceed maximum repeats within design. :param maxrep: How many times should a stimulus maximally be repeated. :type maxrep: integer :returns repcheck: Boolean indicating maximum repeats are respected """ for stim in range(self.experiment.n_stimuli): repcheck = "".join(str(e) for e in [stim] * maxrep) not in "".join( str(e) for e in self.order ) if not repcheck: break return repcheck
[docs] def check_hardprob(self): """Check whether frequencies match the prespecified frequencies. :returns probcheck: Boolean indicating probabilities are respected """ obscnt = Counter(self.order).values() obsprob = np.round(obscnt / np.sum(obscnt), decimals=2) if len(self.experiment.P) != len(obsprob): return False close = np.isclose(np.array(self.experiment.P), np.array(obsprob), atol=0.001) return np.sum(close) == len(obsprob)
[docs] def crossover(self, other, seed=1234): """Crossover design with other design and create offspring. :param other: The design with which the design will be mixed :type other: design object :param seed: The seed with which the change point will be sampled. :type seed: integer or None :returns offspring: List of two offspring designs. """ # check whether designs are compatible assert len(self.order) == len(other.order) np.random.seed(seed) changepoint = np.random.choice(len(self.order), 1)[0] offspringorder1 = None offspringorder2 = None # Making sure the order doesn't change for the crossover if self.experiment.order_fixed: offspringorder1 = self.order offspringorder2 = other.order elif self.experiment.order_probabilities is not None: offspringorder1 = Experiment.sample_from_probabilities( self.experiment.order_probabilities, self.experiment.order_keys, self.experiment.order_length, ) offspringorder2 = Experiment.sample_from_probabilities( self.experiment.order_probabilities, self.experiment.order_keys, self.experiment.order_length, ) else: offspringorder1 = ( list(self.order)[:changepoint] + list(other.order)[changepoint:] ) offspringorder2 = ( list(other.order)[:changepoint] + list(self.order)[changepoint:] ) # Re-sample stim durations for new orders if variable durations are used asd1 = self.all_stim_durations # default: inherit from parent asd2 = other.all_stim_durations if ( self.experiment.stimuli_durations is not None and not self.experiment.order_fixed ): # Order changed, so re-sample durations to match new order asd1 = Experiment.sample_stim_durations( offspringorder1, self.experiment.stimuli_durations, self.experiment.t_pre, self.experiment.t_post, ) asd2 = Experiment.sample_stim_durations( offspringorder2, self.experiment.stimuli_durations, self.experiment.t_pre, self.experiment.t_post, ) offspring1 = Design( order=offspringorder1, ITI=self.ITI, experiment=self.experiment, all_stim_durations=asd1, ) offspring2 = Design( order=offspringorder2, ITI=other.ITI, experiment=self.experiment, all_stim_durations=asd2, ) return [offspring1, offspring2]
[docs] def mutation(self, q, seed=1234): """Mutate q% of the stimuli with another stimulus. :param q: The percentage of stimuli that should be mutated :type q: float :param seed: The seed with which the mutation points are sampled. :type seed: integer or None :returns mutated: Mutated design """ np.random.seed(seed) mut_ind = np.random.choice( len(self.order), int(len(self.order) * q), replace=False ) mutated = copy.copy(self.order) if ( not self.experiment.order_fixed and self.experiment.order_probabilities is None ): for mut in mut_ind: np.random.seed(seed) mut_stim = np.random.choice(self.experiment.n_stimuli, 1, replace=True)[0] mutated[mut] = mut_stim # Re-sample stim durations if order changed and variable durations are used asd = self.all_stim_durations if ( self.experiment.stimuli_durations is not None and not self.experiment.order_fixed ): asd = Experiment.sample_stim_durations( mutated, self.experiment.stimuli_durations, self.experiment.t_pre, self.experiment.t_post, ) offspring = Design( order=mutated, ITI=self.ITI, experiment=self.experiment, all_stim_durations=asd, ) return offspring
[docs] def designmatrix(self): """Expand from order of stimuli to a fMRI timeseries. Returns self on success, or False if the design's actual timing exceeds the experiment's container (e.g. from extreme ITI draws). """ # ITIs to onsets orderli = list(self.order) ITIli = list(self.ITI) if self.experiment.restnum > 0: if self.all_stim_durations is None: # Old Package Code ITIli = [ y + self.experiment.trial_duration if not x == "R" else y for x, y in zip(orderli, ITIli) ] onsets = np.cumsum(ITIli) - self.experiment.trial_duration self.onsets = [y for x, y in zip(orderli, onsets) if not x == "R"] else: for x in np.arange(0, self.experiment.n_trials, self.experiment.restnum)[ 1: ][::-1]: orderli.insert(x, "R") ITIli.insert(x, self.experiment.restdur) # Calculate ITI_li based on rest numbers and # varied trial duration. Use a separate trial # counter since orderli now has "R" entries # but all_stim_durations has n_trials entries. ITIli_new = [] onsets = [] trial_idx = 0 for i, (x, y) in enumerate(zip(orderli, ITIli)): if not x == "R": ITIli_new.append(y + self.all_stim_durations[trial_idx]) trial_idx += 1 else: ITIli_new.append(y) ITIli_cumsum = np.cumsum(ITIli_new) # Subtracts the cumulative sum by the respective trial durations # while excluding the rest trials. trial_idx = 0 for i, (x, y) in enumerate(zip(orderli, ITIli_cumsum)): if not x == "R": onsets.append(y - self.all_stim_durations[trial_idx]) trial_idx += 1 self.onsets = onsets else: if self.all_stim_durations is None: # Old Package Code ITIli = np.array(self.ITI) + self.experiment.trial_duration self.onsets = np.cumsum(ITIli) - self.experiment.trial_duration else: # Modified by Atharv Umap ITIli_new = [y + x for x, y in zip(self.all_stim_durations, ITIli)] ITIli_cumsum = np.cumsum(ITIli_new) onsets_temp = [] for x, y in zip(self.all_stim_durations, list(ITIli_cumsum)): onsets_temp.append(y - x) self.onsets = onsets_temp stimonsets = [x + self.experiment.t_pre for x in self.onsets] # round onsets to resolution self.ITI, x = _round_to_resolution(self.ITI, self.experiment.resolution) onsetX, XindStim = _round_to_resolution(stimonsets, self.experiment.resolution) if self.all_stim_durations is None: stim_duration_tp = int( self.experiment.stim_duration / self.experiment.resolution ) # Check if design fits in container — reject if not if np.max(XindStim) + stim_duration_tp > self.experiment.n_tp: return False # create design matrix in resolution scale (=deltasM in Kao toolbox) X_X = np.zeros([self.experiment.n_tp, self.experiment.n_stimuli]) for stimulus in range(self.experiment.n_stimuli): for dur in range(stim_duration_tp): X_X[np.array(XindStim) + dur, int(stimulus)] = [ 1 if z == stimulus else 0 for z in self.order ] else: stim_duration_tp = ( np.array(self.all_stim_durations) / self.experiment.resolution ) stim_duration_tp = [int(x) for x in stim_duration_tp] # Check if design fits in container — reject if not max_endpoint = max( XindStim[i] + stim_duration_tp[i] for i in range(len(self.order)) ) if max_endpoint > self.experiment.n_tp: return False X_X = np.zeros([self.experiment.n_tp, self.experiment.n_stimuli]) # indexing and traversing through the order for i, stim in enumerate(self.order): # the current onset onset = XindStim[i] # the duration between the onsets dur = stim_duration_tp[i] # labeling the binary values of the indices with for j in range(dur): t_idx = onset + j if t_idx < self.experiment.n_tp: X_X[t_idx, stim] = 1 # deconvolved matrix in resolution units deconvM = np.zeros( [ self.experiment.n_tp, int(self.experiment.laghrf * self.experiment.n_stimuli), ] ) for stim in range(self.experiment.n_stimuli): for j in range(int(self.experiment.laghrf)): deconvM[j:, self.experiment.laghrf * stim + j] = X_X[ : (self.experiment.n_tp - j), stim ] # downsample and whiten deconvM idxX = [ int(x) for x in np.arange( 0, self.experiment.n_tp, self.experiment.TR / self.experiment.resolution ) ] if len(idxX) - self.experiment.white.shape[0] == 1: idxX = idxX[: self.experiment.white.shape[0]] deconvMdown = deconvM[idxX, :] Xwhite = np.dot(np.dot(t(deconvMdown), self.experiment.white), deconvMdown) # convolve design matrix X_Z = np.zeros([self.experiment.n_tp, self.experiment.n_stimuli]) for stim in range(self.experiment.n_stimuli): X_Z[:, stim] = deconvM[ :, (stim * self.experiment.laghrf) : ((stim + 1) * self.experiment.laghrf) ].dot(self.experiment.basishrf) X_Z = X_Z[idxX, :] X_X = X_X[idxX, :] Zwhite = t(X_Z) @ self.experiment.white @ X_Z self.X = Xwhite self.Z = Zwhite self.Xconv = X_Z self.Xnonconv = X_X self.CX = self.experiment.CX self.C = self.experiment.C return self
[docs] def FeCalc(self, Aoptimality=True): """ Compute estimation efficiency. :param Aoptimality: Kind of optimality to optimize, A- or D-optimality :type Aoptimality: boolean """ try: invM = scipy.linalg.inv(self.X) except scipy.linalg.LinAlgError: try: invM = scipy.linalg.pinv(self.X) except np.linalg.linalg.LinAlgError: invM = np.nan invM = np.array(invM) st1 = np.dot(self.CX, invM) CMC = np.dot(st1, t(self.CX)) if Aoptimality is True: self.Fe = float(self.CX.shape[0] / np.trace(CMC)) else: self.Fe = float(np.linalg.det(CMC) ** (-1 / len(self.C))) self.Fe = self.Fe / self.experiment.FeMax return self
[docs] def FdCalc(self, Aoptimality=True): """ Compute detection power. :param Aoptimality: Kind of optimality to optimize: A- or D-optimality :type Aoptimality: boolean """ try: invM = scipy.linalg.inv(self.Z) except scipy.linalg.LinAlgError: try: invM = scipy.linalg.pinv(self.Z) except np.linalg.linalg.LinAlgError: invM = np.nan invM = np.array(invM) CMC = self.C @ invM @ t(self.C) if Aoptimality is True: self.Fd = float(len(self.C) / np.trace(CMC)) else: self.Fd = float(np.linalg.det(CMC) ** (-1 / len(self.C))) self.Fd = self.Fd / self.experiment.FdMax return self
[docs] def FcCalc(self, confoundorder=3): """ Compute confounding efficiency. :param confoundorder: To what order should confounding be protected :type confoundorder: integer """ Q = np.zeros( [self.experiment.n_stimuli, self.experiment.n_stimuli, confoundorder] ) for n in range(len(self.order)): for r in np.arange(1, confoundorder + 1): if n > (r - 1): Q[self.order[n], self.order[n - r], r - 1] += 1 Qexp = np.zeros( [self.experiment.n_stimuli, self.experiment.n_stimuli, confoundorder] ) for si in range(self.experiment.n_stimuli): for sj in range(self.experiment.n_stimuli): for r in np.arange(1, confoundorder + 1): Qexp[si, sj, r - 1] = ( self.experiment.P[si] * self.experiment.P[sj] * (self.experiment.n_trials + 1) ) Qmatch = np.sum(abs(Q - Qexp)) self.Fc = Qmatch self.Fc = 1 - self.Fc / self.experiment.FcMax return self
[docs] def FfCalc(self): """Compute efficiency of frequencies.""" trialcount = Counter(self.order) Pobs = [trialcount[x] for x in range(self.experiment.n_stimuli)] self.Ff = np.sum( abs( np.array(Pobs) - np.array(self.experiment.n_trials * np.array(self.experiment.P)) ) ) self.Ff = 1 - self.Ff / self.experiment.FfMax return self
[docs] def FCalc(self, weights, Aoptimality=True, confoundorder=3): """ Compute weighted average of efficiencies. :param weights: Weights given to each of the efficiency metrics in this order: Estimation, Detection, Frequencies, Confounders. :type weights: list of floats """ if weights[0] > 0: self.FeCalc(Aoptimality) if weights[1] > 0: self.FdCalc(Aoptimality) self.FfCalc() self.FcCalc(confoundorder) matr = np.array([self.Fe, self.Fd, self.Ff, self.Fc]) self.F = np.sum(weights * matr) return self
[docs] class Experiment: """ This class represents an fMRI experiment. :param TR: The repetition time. :type TR: float :param P: The probabilities of each trialtype. :type P: ndarray :param C: The contrast matrix. Example: np.array([[1,-1,0],[0,1,-1]]) :type C: ndarray :param rho: AR(1) correlation coefficient :type rho: float :param n_stimuli: The number of stimuli (or conditions) in the experiment. :type n_stimuli: integer :param n_trials: The number of trials in the experiment. Either specify n_trials **or** duration. :type n_trials: integer :param duration: The total duration (seconds) of the experiment. Either specify n_trials **or** duration. :type duration: float :param resolution: the maximum resolution of design matrix :type resolution: float :param stim_duration: duration (seconds) of stimulus :type stim_duration: float :param t_pre: duration (seconds) of trial part before stimulus presentation (eg. fixation cross) :type t_pre: float :param t_post: duration (seconds) of trial part after stimulus presentation :type t_post: float :param maxrep: maximum number of repetitions :type maxrep: integer or None :param hardprob: can the probabilities differ from the nominal value? :type hardprob: boolean :param confoundorder: The order to which confounding is controlled. :type confoundorder: integer :param restnum: Number of trials between restblocks :type restnum: integer :param restdur: duration (seconds) of the rest blocks :type restdur: float :param ITImodel: Which model to sample from. Possibilities: "fixed","uniform","exponential" :type ITImodel: string :param ITImin: The minimum ITI (required with "uniform" or "exponential") :type ITImin: float :param ITImean: The mean ITI (required with "fixed" or "exponential") :type ITImean: float :param ITImax: The max ITI (required with "uniform" or "exponential") :type ITImax: float """ def __init__( self, TR: float, P, C, rho: float, stim_duration, n_stimuli: int, stimuli_durations=None, conditional_ITI=None, # Specification for condition-dependent ITI distributions ITImodel=None, ITImin=None, ITImax=None, ITImean=None, restnum=0, restdur=0, t_pre=0, t_post=0, n_trials: int | None = None, duration=None, resolution=0.1, FeMax=1, FdMax=1, FcMax=1, FfMax=1, maxrep=None, hardprob=False, confoundorder=3, order=None, order_probabilities=None, order_keys=None, order_length=None, order_fixed=False, trial_max=None, ): self.TR = TR self.P = P self.C = np.array(C) self.rho = rho self.n_stimuli = n_stimuli self.t_pre = t_pre self.t_post = t_post self.n_trials = n_trials self.duration = duration self.resolution = resolution self.stim_duration = stim_duration # We will calculate all stimuli durations based on the given values # NOTE: all_stim_durations is now stored per-Design, not per-Experiment. # The Experiment only stores the *specification* (stimuli_durations dict/list). # Modification # Working with multiple stimuli durations if stimuli_durations is not None: assert ( len(stimuli_durations) == n_stimuli ), "Must specify a duration for each stimulus" assert ( trial_max is not None ), "Must provide a trial_max given stimuli_durations" self.trial_max = trial_max self.stimuli_durations = stimuli_durations else: self.trial_max = stim_duration self.stimuli_durations = None self.order_probabilities = order_probabilities self.order_keys = order_keys self.order_length = order_length self.order_fixed = order_fixed # Adding the custom order if order is not None: self.order = order self.order_fixed = True else: self.order = None if order_probabilities is not None: self.order = self.sample_from_probabilities( order_probabilities, order_keys, order_length ) self.maxrep = maxrep self.hardprob = hardprob self.confoundorder = confoundorder # Adding a conditional ITI for stimulus-dependent intervals self.conditional_ITI = conditional_ITI self.ITImodel = ITImodel self.ITImin = ITImin self.ITImean = ITImean self.ITImax = ITImax self.ITIlam = None self.restnum = restnum self.restdur = restdur self.FeMax = FeMax self.FdMax = FdMax self.FcMax = FcMax self.FfMax = FfMax # make sure resolution is a divisor of TR (up to ) if not np.isclose(self.TR % self.resolution, 0): self.resolution = _find_new_resolution(self.TR, self.resolution) warnings.warn( "the resolution is adjusted to be a multiple of the TR. " f"New resolution: {self.resolution}" ) self.countstim() self.CreateTsComp() self.CreateLmComp() self.max_eff()
[docs] def max_eff(self): """Compute maximum efficiency for Confounding and Frequency efficiency.""" NulDesign = Design( order=[np.argmin(self.P)] * self.n_trials, ITI=[0] + [self.ITImean] * (self.n_trials - 1), experiment=self, ) NulDesign.designmatrix() NulDesign.FcCalc(self.confoundorder) self.FcMax = 1 - NulDesign.Fc NulDesign.FfCalc() self.FfMax = 1 - NulDesign.Ff return self
[docs] def countstim(self): """Compute some arguments depending on other arguments. Duration is always computed from EXPECTED values (trial_max + ITImean) so the whitening matrix is stable across all designs in a population. Individual designs may have shorter actual timing — the unused timepoints in the design matrix are simply zeros (equivalent to rest). """ self.trial_duration = self.trial_max + self.t_pre + self.t_post if self.ITImodel == "uniform": self.ITImean = (self.ITImax + self.ITImin) / 2 # Always compute duration from expected values, regardless of # whether stimuli_durations is set. This keeps the container stable. if self.n_trials is not None and self.duration is None: ITIdur = self.n_trials * self.ITImean TRIALdur = self.n_trials * self.trial_duration duration = ITIdur + TRIALdur if self.restnum > 0: duration = duration + ( np.floor(self.n_trials / self.restnum) * self.restdur ) self.duration = duration elif self.duration is not None and self.n_trials is None: self.n_trials = self._compute_n_trials()
# Computes the n_trials given the duration def _compute_n_trials(self): if self.restnum == 0: return int(self.duration / (self.ITImean + self.trial_duration)) # duration of block between rest blockdurNR = self.restnum * (self.ITImean + self.trial_duration) # duration of block including rest blockdurWR = blockdurNR + self.restdur # number of blocks blocknum = np.floor(self.duration / blockdurWR) n_trials = blocknum * self.restnum remain = self.duration - (blocknum * blockdurWR) if remain >= blockdurNR: n_trials = n_trials + self.restnum else: extratrials = np.floor(remain / (self.ITImean + self.trial_duration)) n_trials = n_trials + extratrials return int(n_trials)
[docs] def CreateTsComp(self): """Compute the number of scans and timepoints.""" self.n_scans = int(np.ceil(self.duration / self.TR)) # number of scans # number of timepoints (in resolution) self.n_tp = int(np.ceil(self.duration / self.resolution)) self.r_scans = np.arange(0, self.duration, self.TR) self.r_tp = np.arange(0, self.duration, self.resolution) return self
[docs] def CreateLmComp(self): """Generate components for the linear model. Components: hrf, whitening matrix, autocorrelation matrix, CX. """ # hrf self.canonical() # contrasts # expand contrasts to resolution self.CX = np.array(np.kron(self.C, np.eye(self.laghrf))) assert self.CX.shape[0] == self.C.shape[0] * self.laghrf assert self.CX.shape[1] == self.n_stimuli * self.laghrf # drift self.S = self.drift(np.arange(0, self.n_scans)) # [tp x 1] assert self.S.shape == (3, self.n_scans) self.S = np.asarray(self.S) # square of the whitening matrix base = [1 + self.rho**2, -1 * self.rho] + [0] * (self.n_scans - 2) self.V2 = scipy.linalg.toeplitz(base) # set first and last to 1 self.V2[0, 0] = 1 self.V2[self.n_scans - 1, self.n_scans - 1] = 1 self.V2 = np.asarray(self.V2) self.white = ( self.V2 - self.V2
[docs] @ t(self.S) @ np.linalg.pinv(self.S @ self.V2 @ t(self.S)) @ self.S @ self.V2 ) return self
def canonical(self): """Generate the canonical hrf. :param resolution: resolution to sample the canonical hrf :type resolution: float """ # translated from spm_hrf p = [6, 16, 1, 1, 6, 0, 32] dt = self.resolution s = np.array(range(int(np.ceil(p[6] / dt)))) # HRF sampled at resolution hrf = ( self.spm_Gpdf(s, p[0] / p[2], dt / p[2]) - self.spm_Gpdf(s, p[1] / p[3], dt / p[3]) / p[4] ) self.basishrf = hrf / np.sum(hrf) s # duration of the HRF self.durhrf = p[6] # length of the HRF parameters in resolution scale self.laghrf = int(np.ceil(self.durhrf / self.resolution)) assert self.laghrf == len(s) return self
[docs] @staticmethod def drift(s, deg=3): """Compute a drift component.""" S = np.ones([deg, len(s)]) s = np.array(s) tmpt = np.array(2.0 * s / float(len(s) - 1) - 1) S[1] = tmpt for k in np.arange(2, deg): S[k] = ((2.0 * k - 1.0) / k) * tmpt * S[k - 1] - ((k - 1) / float(k)) * S[ k - 2 ] return S
[docs] @staticmethod def spm_Gpdf(s, h, l): """Generate gamma pdf.""" s = np.array(s) res = (h - 1) * np.log(s) + h * np.log(l) - l * s - np.log(gamma(h)) return np.exp(res)
[docs] @staticmethod def sample_stim_durations(order, stimuli_durations, t_pre, t_post): """Sample concrete stimulus durations for a specific trial order. Called per-Design (not per-Experiment) so each design gets its own random draw, but they all share the same Experiment container. Returns a list of durations (including t_pre + t_post per trial). """ all_stim_durations = [] for i in range(len(order)): stimuli = order[i] key = stimuli_durations[stimuli] if isinstance(key, dict): params = key if params["model"] == "fixed": all_stim_durations.append(params["mean"]) elif params["model"] == "exponential": val = np.random.exponential(scale=params["mean"]) if "min" in params: val = max(val, params["min"]) if "max" in params: val = min(val, params["max"]) all_stim_durations.append(val) elif params["model"] == "uniform": all_stim_durations.append( np.random.uniform(params["min"], params["max"]) ) elif params["model"] == "gaussian": mean = params.get("mean", 0) std = params.get("std", 1) val = np.random.normal(loc=mean, scale=std) if "min" in params: val = max(val, params["min"]) if "max" in params: val = min(val, params["max"]) all_stim_durations.append(val) else: all_stim_durations.append(key) assert len(all_stim_durations) == len(order) # Add pre/post time to each trial duration all_stim_durations = [d + t_pre + t_post for d in all_stim_durations] return all_stim_durations
# Generates ITI with default first value then # the order (ensures n_trials and ITI length match)
[docs] @staticmethod def generate_iti(order, conditional_iti): ITI = [] for i in range(len(order)): stim_prev = order[i - 1] if i > 0 else None stim_curr = order[i] if stim_prev is not None or stim_curr is not None: # Determine key for condition-based ITI key = (stim_prev, stim_curr) if stim_prev is not None else "default" params = conditional_iti.get(key, conditional_iti.get("default")) # Generate ITI based on parameters if params["model"] == "fixed": ITI.append(params["mean"]) elif params["model"] == "exponential": val = np.random.exponential(scale=params["mean"]) if "min" in params: val = max(val, params["min"]) if "max" in params: val = min(val, params["max"]) ITI.append(val) elif params["model"] == "uniform": ITI.append(np.random.uniform(params["min"], params["max"])) elif params["model"] == "gaussian": mean = params.get("mean", 0) std = params.get("std", 1) val = np.random.normal(loc=mean, scale=std) if "min" in params: val = max(val, params["min"]) if "max" in params: val = min(val, params["max"]) ITI.append(val) return ITI
[docs] @staticmethod def calculate_duration(ITI, dur): """Calculate total duration from ITI and trial durations.""" total_sum = sum(dur) total_sum = total_sum + sum(ITI) return total_sum
# Generates an order based on a given probability distribution of certain keys
[docs] @staticmethod def sample_from_probabilities(prob, key, length): # random.choices picks elements from key with weights = prob_array\ samples = random.choices(key, weights=prob, k=length) merged = [item for sublist in samples for item in sublist] return merged[:length]
[docs] class Optimisation: """Represent the population of experimental designs for fMRI. :param experiment: The experimental setup of the fMRI experiment. :type experiment: experiment :param G: The size of the generation :type G: integer :param R: with which rate are the orders generated from ['blocked','random','mseq'] :type R: list :param q: percentage of mutations :type q: float :param weights: weights attached to Fe, Fd, Ff, Fc :type weights: list :param I: number of immigrants :type I: integer :param preruncycles: number of prerun cycles (to find maximum Fe and Fd) :type preruncycles: integer :param cycles: number of cycles :type cycles: integer :param seed: seed :type seed: integer :param Aoptimality: optimises A-optimality if true, else D-optimality :type Aoptimality: boolean :param convergence: after how many stable iterations is there convergence :type convergence: integer :param folder: folder to save output :type folder: string :param outdes: number of designs to be saved :type outdes: integer :param optimisation: The type of optimisation - 'GA' or 'simulation' :type optimisation: string """ def __init__( self, experiment: Experiment, weights: list[float], preruncycles: int, cycles: int, seed: int | None = None, I: int = 4, G: int = 20, R: list[float] | None = None, q: float = 0.01, Aoptimality: bool = True, folder: str | Path | None = None, outdes: int = 3, convergence: int = 1000, optimisation: str = "GA", ): self.exp = experiment self.G = G self.R = [0.4, 0.4, 0.2] if R is None else R self.q = q self.weights = weights self.I = I self.preruncycles = preruncycles self.cycles = cycles self.convergence = convergence self.Aoptimality = Aoptimality self.outdes = outdes self.folder = Path(folder).absolute() if folder else None self.optimisation = optimisation self.seed = seed or np.random.randint(10000) self.designs = [] self.optima = [] self.bestdesign = None self.cov = None
[docs] def change_seed(self): """Change the seed.""" self.seed = self.seed + 1000 if self.seed < 4 * 10**9 else 1 return self
[docs] def check_develop(self, design, weights=None): """Check and develop a design to the population. Function will check design against strict options and develop the design if valid. :param design: Design to be added to population. :type design: esign object :param weights: weights for efficiency calculation. :type weights: list of floats, summing to 1 """ # weights if weights is None: weights = self.weights # check maxrep, hardprob, every stimulus at least once if self.exp.maxrep is not None and not design.check_maxrep(self.exp.maxrep): return False if self.exp.hardprob and not design.check_hardprob(): return False if len(np.unique(design.order)) < self.exp.n_stimuli: return False # develop out = design.designmatrix() if out is False: return False design.FCalc( weights, confoundorder=self.exp.confoundorder, Aoptimality=self.Aoptimality ) return False if np.isnan(design.F) else design
[docs] def add_new_designs(self, weights=None, R=None): """Generate the population. :param experiment: The experimental setup of the fMRI experiment. :type experiment: experiment :param weights: weights for efficiency calculation. :type weights: list of floats, summing to 1 :param seed: The seed for random processes. :type seed: integer or None """ # weights if weights is None: weights = self.weights if not R: R = np.round(np.array(self.R) * self.G).tolist() if self.exp.n_stimuli in [6, 10] and R[2] > 0: warnings.warns( "for this number of conditions/stimuli, " "there are no msequences possible.\n" "Replaced by random designs." ) R[1] = R[1] + R[2] R[2] = 0 NDes = 0 self.change_seed() while NDes < np.sum(R): self.change_seed() ind = np.sum(NDes >= np.cumsum(R)) ordertype = ["blocked", "random", "msequence"][ind] # --- Sample order --- order = None if self.exp.order_fixed: order = self.exp.order elif self.exp.order_probabilities is None: order = generate.order( self.exp.n_stimuli, self.exp.n_trials, self.exp.P, ordertype=ordertype, seed=self.seed, ) else: order = Experiment.sample_from_probabilities( self.exp.order_probabilities, self.exp.order_keys, self.exp.order_length, ) # --- Sample ITI --- ITI = [] if self.exp.conditional_ITI is None: ITI, ITIlam = generate.iti( ntrials=self.exp.n_trials, model=self.exp.ITImodel, min=self.exp.ITImin, max=self.exp.ITImax, mean=self.exp.ITImean, lam=self.exp.ITIlam, seed=self.seed, resolution=self.exp.resolution, ) if ITIlam: self.exp.ITIlam = ITIlam else: ITI = Experiment.generate_iti(order, self.exp.conditional_ITI) # --- Sample per-trial stimulus durations (if variable) --- all_stim_durations = None if self.exp.stimuli_durations is not None: all_stim_durations = Experiment.sample_stim_durations( order, self.exp.stimuli_durations, self.exp.t_pre, self.exp.t_post, ) # --- Create Design with the ONE shared Experiment --- des = Design( order=order, ITI=np.array(ITI), experiment=self.exp, all_stim_durations=all_stim_durations, ) fulldes = self.check_develop(des, weights) if fulldes is False: continue self.designs.append(fulldes) NDes += 1 return self
def _clean_designs(self, weights): n = 0 rm = 0 while n == 0: orders = [x.order for x in self.designs] cors = np.corrcoef(orders) isone = np.isclose(cors, 1.0) if len(isone) == 1: n = 1 else: np.fill_diagonal(isone, 0) if np.sum(isone) == 0: n = 1 else: ind = np.where(isone) remove = ind[1][ind[0] == ind[0][0]] self.designs = [ des for ind, des in enumerate(self.designs) if ind not in remove ] rm = rm + len(remove) self.add_new_designs(R=[0, rm, 0], weights=weights) return self def _mutation(self, weights, seed): # Mutation: # if: Best design: stay untouched # elif Correlation between all is > 0.8: mutate with 20% mutations # else: mutate with 5% mutations # for all: if conditions are not fulfilled: not mutated signals = [x.Xconv for x in self.designs] efficiencies = [x.F for x in self.designs] cors = self.pearsonr(signals, self.exp.n_stimuli) mncor = np.mean(cors) for idx in range(len(self.designs)): design = self.designs[idx] if design.F == np.max(efficiencies): offspring = design elif mncor > 0.6: offspring = design.mutation(0.2, seed=seed) offspring = self.check_develop(offspring, weights) else: offspring = design.mutation(self.q, seed=seed) offspring = self.check_develop(offspring, weights) if offspring is False: continue else: self.designs[idx] = offspring return self def _crossover(self, weights, seed): # select designs with F>median(F): crossind = range(len(self.designs)) nparents = len(crossind) npairs = int(nparents / 2.0) np.random.seed(seed) CouplingRnd = np.random.choice(nparents, size=(npairs * 2), replace=False) CouplingRnd = [crossind[x] for x in CouplingRnd] CouplingRnd = [ [CouplingRnd[i], CouplingRnd[i + 1]] for i in np.arange(0, npairs * 2, 2) ] count = 0 for couple in CouplingRnd: baby1, baby2 = self.designs[couple[0]].crossover( self.designs[couple[1]], seed=seed ) for baby in [baby1, baby2]: baby = self.check_develop(baby, weights) if baby is False: continue self.designs.append(baby) count = count + 1 return self def _immigration(self, weights, noim): R = np.ceil(np.array(self.R) * noim).tolist() self.add_new_designs(R=R, weights=weights) return self
[docs] def to_next_generation(self, weights=None, seed=1234, optimisation=None): """Go from one generation to the next. :param weights: weights for efficiency calculation. :type weights: list of floats, summing to 1 :param seed: The seed for random processes. :type seed: integer or None :param optimisation: The type of optimisation - 'GA' or 'simulation' :type optimisation: string """ if optimisation is None: optimisation = self.optimisation # weights if weights is None: weights = self.weights # Skip crossover/mutation when order is fixed if not self.exp.order_fixed: self._clean_designs(weights) if optimisation == "GA": self._mutation(weights, seed) self._crossover(weights, seed) self._immigration(weights, noim=self.I) elif optimisation == "simulation": self._immigration(weights, noim=self.I) else: print("Unknown optimisation type") else: self._immigration(weights, noim=self.I) # inspect efficiencies efficiencies = [x.F for x in self.designs] maximum = np.max(efficiencies) self.optima.append(maximum) bestind = [ind for ind, val in enumerate(efficiencies) if val == maximum][0] self.bestdesign = self.designs[bestind] # append best designs to lists # check convergence gen = len(self.optima) if gen > 1000 and self.optima[-1] > self.optima[gen - 1000]: self.finished = True # select best G cutoff = np.sort(efficiencies)[::-1][self.G] self.designs = [des for des in self.designs if des.F >= cutoff] return self
[docs] def clear(self): """Clear results between optimisations (maximum Fe, Fd or opt).""" self.designs = [] self.optima = [] self.finished = False self.change_seed() if self.bestdesign: bestdes = Design( order=self.bestdesign.order, ITI=self.bestdesign.ITI, experiment=self.exp, all_stim_durations=self.bestdesign.all_stim_durations, ) bestdes = self.check_develop(bestdes) if bestdes is not False: self.designs.append(bestdes) self.bestdesign = None return self
[docs] def optimise(self): """Run design optimization.""" if self.exp.FcMax == 1 and self.exp.FfMax == 1: self.exp.max_eff() if self.exp.FeMax == 1 and self.weights[0] > 0: # add new designs self.clear() self.add_new_designs(weights=[1, 0, 0, 0]) with progress_bar(text="Optimizing") as progress: task = progress.add_task( description="optimize", total=len(range(self.preruncycles)) ) for _ in range(self.preruncycles): self.to_next_generation(seed=self.seed, weights=[1, 0, 0, 0]) progress.update(task, advance=1) if self.finished: continue self.exp.FeMax = np.max(self.bestdesign.F) if self.exp.FdMax == 1 and self.weights[1] > 0: self.clear() self.add_new_designs(weights=[0, 1, 0, 0]) with progress_bar(text="Optimizing") as progress: task = progress.add_task( description="optimize", total=len(range(self.preruncycles)) ) for _ in range(self.preruncycles): self.to_next_generation(seed=self.seed, weights=[0, 1, 0, 0]) progress.update(task, advance=1) if self.finished: continue self.exp.FdMax = np.max(self.bestdesign.F) # clear all attributes self.clear() self.add_new_designs() # loop with progress_bar(text="Optimizing") as progress: task = progress.add_task( description="optimize", total=len(range(self.cycles)) ) for _ in range(self.cycles): self.to_next_generation(seed=self.seed) progress.update(task, advance=1) if self.finished: continue return self
[docs] def evaluate(self): # select designs: best from k-means clusters shape = self.bestdesign.Xconv.shape des = np.zeros([np.prod(shape), len(self.designs)]) efficiencies = np.array([x.F for x in self.designs]) for d in range(len(self.designs)): hrf = [] for stim in range(shape[1]): hrf = hrf + self.designs[d].Xconv[:, stim].tolist() des[:, d] = hrf clus = sklearn.cluster.k_means(des.T, self.outdes, random_state=self.seed)[1] out = [] des = [] cl = [] first = 0 for c in range(self.outdes): ids = np.where(clus == c)[0] id_ordered = ids[np.flipud(np.argsort(efficiencies[ids]))] out.append(first) for d in id_ordered: cl.append(c) des.append(self.designs[d]) first = first + 1 self.designs = des self.out = out self.clus = cl signals = [x.Xconv for x in self.designs] co = self.pearsonr(signals, 3) self.cov = co return self
[docs] def download(self): if not self.folder: raise ValueError("No folder defined to download output.") if self.cov is None: self.evaluate() # empty folder if self.folder.exists(): files = self.folder.glob("**/design_*") for f in files: shutil.rmtree(f) else: self.folder.mkdir(parents=True, exist_ok=True) reportfile = "report.pdf" report.make_report(self, self.folder / reportfile) files = [] for des in range(self.outdes): (self.folder / f"design_{str(des)}").mkdir(parents=True) design = self.designs[self.out[des]] for stim in range(self.exp.n_stimuli): onsetsfile = Path(f"design_{str(des)}") / f"stimulus_{str(stim)}.txt" onsubsets = [ str(x) for x in np.array(design.onsets)[np.array(design.order) == stim] ] with open(self.folder / onsetsfile, "w+") as f: for line in onsubsets: f.write(line) f.write("\n") files.append(onsetsfile) itifile = Path(f"design_{str(des)}") / "ITIs.txt" with open(self.folder / itifile, "w+") as f: for line in design.ITI: f.write(str(line)) f.write("\n") files.append(itifile) files.append(reportfile) # zip up zip_subdir = "OptimalDesign" self.zip_filename = f"{zip_subdir}.zip" self.file = BytesIO() zf = zipfile.ZipFile(self.file, "w") for fpath in files: zf.write(self.folder / fpath, Path(zip_subdir) / fpath) zf.close() return self
[docs] @staticmethod def pearsonr(signals, nstim): varcov = np.zeros([len(signals), len(signals)]) for sig1 in range(len(signals)): for sig2 in range(sig1, len(signals)): cors = np.diag( np.corrcoef(t(signals[sig1]), t(signals[sig2]))[nstim:, :nstim] ) varcov[sig1, sig2] = np.mean(cors) varcov[sig2, sig1] = np.mean(cors) return varcov
def _find_new_resolution(TR, res): n = TR * 1000.0 # find divisors of TR*1000 large_divisors = [] for i in range(1, int(math.sqrt(n) + 1)): if n % i == 0: large_divisors.append(i) if i * i != n: large_divisors.append(int(n / i)) sorted = np.sort(large_divisors) # closest to res resdivisor = TR / float(res) difs = np.abs(resdivisor - sorted) minind = np.where(difs == np.min(difs))[0] divisor = sorted[minind][0] newres = TR / divisor return newres def _round_to_resolution(inmat, res): out = res * np.floor(np.array(inmat) / res) ind = out / res ind = [int(x) for x in ind] return out, ind