SummaryCollection
py3r.behaviour.summary.summary_collection.SummaryCollection ¶
SummaryCollection(summary_dict: dict[str, Summary])
Bases: BaseCollection, SummaryCollectionPlotMixin
Collection of Summary objects (e.g. for grouping individuals). Note: type hints refer to Summary, but the factory methods allow other classes; these are intended ONLY for subclasses of Summary, and this is enforced.
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> from py3r.behaviour.features.features_collection import FeaturesCollection
>>> from py3r.behaviour.summary.summary_collection import SummaryCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
... tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> # add a simple boolean feature to each Features for summaries to consume
>>> for f in fc.values():
... s = pd.Series([True, False] * (len(f.tracking.data)//2 + 1))[:len(f.tracking.data)]
... s.index = f.tracking.data.index
... f.store(s, 'flag', meta={})
>>> sc = SummaryCollection.from_features_collection(fc)
>>> list(sorted(sc.keys()))
['A', 'B']
is_grouped property ¶
True if this collection is a grouped view.
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> coll.is_grouped
False
groupby_tags property ¶
The tag names used to form this grouped view (or None if flat).
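Examples¶
A minimal sketch; only None-ness is asserted here, since the exact container type of the returned tag names is not fixed by this description:
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G2')
>>> coll.groupby_tags is None
True
>>> g = coll.groupby('group')
>>> g.groupby_tags is not None
True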
group_keys property ¶
Keys for the groups in a grouped view. Empty list if not grouped.
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
... coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G2')
>>> g = coll.groupby('group')
>>> sorted(g.group_keys)
[('G1',), ('G2',)]
from_features_collection classmethod ¶
from_features_collection(
features_collection: FeaturesCollection,
summary_cls=Summary,
)
Create a SummaryCollection from a FeaturesCollection.
Parameters¶
features_collection : FeaturesCollection
Source collection. Grouped structure is preserved.
summary_cls : type, default=Summary
Summary subclass to instantiate for each session.
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> from py3r.behaviour.features.features_collection import FeaturesCollection
>>> from py3r.behaviour.summary.summary_collection import SummaryCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
... tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> # add a simple numeric feature to each Features
>>> for f in fc.values():
... s = pd.Series(range(len(f.tracking.data)), index=f.tracking.data.index)
... f.store(s, 'counter', meta={})
>>> sc = SummaryCollection.from_features_collection(fc)
>>> isinstance(sc['A'], Summary) and isinstance(sc['B'], Summary)
True
from_list classmethod ¶
from_list(summary_list: list[Summary])
Create a SummaryCollection from a list of Summary objects, keyed by handle.
Parameters¶
summary_list : list[Summary]
Summary objects to collect. All handles must be unique.
Examples¶
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking import Tracking
>>> from py3r.behaviour.features.features import Features
>>> from py3r.behaviour.summary.summary import Summary
>>> from py3r.behaviour.summary.summary_collection import SummaryCollection
>>> with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... t1 = Tracking.from_dlc(str(p), handle='A', fps=30)
... t2 = Tracking.from_dlc(str(p), handle='B', fps=30)
>>> f1, f2 = Features(t1), Features(t2)
>>> # store simple scalar summaries
>>> s1, s2 = Summary(f1), Summary(f2)
>>> s1.store(1, 'count'); s2.store(2, 'count')
>>> sc = SummaryCollection.from_list([s1, s2])
>>> list(sorted(sc.keys()))
['A', 'B']
to_df ¶
to_df(
include_tags: bool = False,
tag_prefix: str = "tag_",
series: Literal["ignore", "separate"] = "ignore",
) -> (
pd.DataFrame
| tuple[pd.DataFrame, dict[str, pd.DataFrame]]
)
Collate values from each Summary.data into tabular output.
- Index: handles of the Summary objects
- Scalar columns: keys from each Summary.data with scalar values
- If include_tags is True, include tag columns with the given prefix
- If series='ignore' (default), Series entries are skipped
- If series='separate', return (scalars_df, series_tables), where series_tables is {metric_name: dataframe} and each dataframe has one row per handle and one column per Series index value.
Examples¶
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking import Tracking
>>> from py3r.behaviour.features.features import Features
>>> from py3r.behaviour.summary.summary import Summary
>>> from py3r.behaviour.summary.summary_collection import SummaryCollection
>>> with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... t1 = Tracking.from_dlc(str(p), handle='A', fps=30)
... t2 = Tracking.from_dlc(str(p), handle='B', fps=30)
>>> s1, s2 = Summary(Features(t1)), Summary(Features(t2))
>>> s1.store(1.0, 'score'); s2.store(2.0, 'score')
>>> s1.features.tracking.add_tag('group', 'G1'); s2.features.tracking.add_tag('group', 'G2')
>>> sc = SummaryCollection.from_list([s1, s2])
>>> df = sc.to_df(include_tags=True)
>>> set(df.columns) >= {'score', 'tag_group'}
True
>>> s1.store(pd.Series([1.0, 2.0], index=['A', 'B']), 'speed_by_state')
>>> s2.store(pd.Series([3.0, 4.0], index=['A', 'B']), 'speed_by_state')
>>> scalars, series_tables = sc.to_df(series='separate')
>>> isinstance(scalars, pd.DataFrame) and 'speed_by_state' in series_tables
True
make_bin ¶
make_bin(startframe: int, endframe: int)
Return a new SummaryCollection restricted to frames in [startframe, endframe).
Parameters¶
startframe : int
First frame index of the bin (inclusive).
endframe : int
End frame index of the bin (exclusive).
Examples¶
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking import Tracking
>>> from py3r.behaviour.features.features import Features
>>> from py3r.behaviour.summary.summary import Summary
>>> from py3r.behaviour.summary.summary_collection import SummaryCollection
>>> with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... t = Tracking.from_dlc(str(p), handle='A', fps=30)
>>> s = Summary(Features(t))
>>> sc = SummaryCollection.from_list([s])
>>> b = sc.make_bin(0, 2)
>>> isinstance(b, SummaryCollection)
True
make_bins ¶
make_bins(numbins)
Divide the collection into equal time bins and return one SummaryCollection per bin.
Parameters¶
numbins : int
Number of equal-length bins to split each session into.
Examples¶
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking import Tracking
>>> from py3r.behaviour.features.features import Features
>>> from py3r.behaviour.summary.summary import Summary
>>> from py3r.behaviour.summary.summary_collection import SummaryCollection
>>> with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... t = Tracking.from_dlc(str(p), handle='A', fps=30)
>>> sc = SummaryCollection.from_list([Summary(Features(t))])
>>> bins = sc.make_bins(3)
>>> len(bins) == 3 and all(isinstance(b, SummaryCollection) for b in bins)
True
store ¶
store(
results_dict,
name: str = None,
meta: dict = None,
overwrite: bool = False,
)
Store SummaryResult objects returned by batch methods.
Parameters¶
results_dict : dict
Batch results to store. Flat: {handle: SummaryResult}.
Grouped: {group_key: {handle: SummaryResult}}.
name : str | None, default=None
Metric name to store under. If None, resolved automatically from
the result objects (all must agree on a single name).
meta : dict | None, default=None
Metadata dict to attach alongside the stored metric.
overwrite : bool, default=False
If True, overwrite an existing metric with the same name.
Returns¶
str
The resolved stored metric name. If auto-naming would resolve to multiple different names across leaves, a ValueError is raised.
Examples¶
>>> import pandas as pd, tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> from py3r.behaviour.features.features_collection import FeaturesCollection
>>> from py3r.behaviour.summary.summary_collection import SummaryCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
... tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> # add a boolean column for summaries
>>> for f in fc.values():
... m = pd.Series([True, False] * (len(f.tracking.data)//2 + 1))[:len(f.tracking.data)]
... m.index = f.tracking.data.index
... f.store(m, 'mask', meta={})
>>> sc = SummaryCollection.from_features_collection(fc)
>>> rd = {h: s.time_true('mask') for h, s in sc.items()}
>>> sc.store(rd, name='t_mask')
>>> all('t_mask' in s.data for s in sc.values())
True
stored_info ¶
stored_info() -> pd.DataFrame
Summarize stored summary metrics across the collection's leaf Summary objects.
Returns a DataFrame indexed by summary key with columns:
- attached_to: number of recordings containing the summary key
- missing_from: number of recordings not containing the summary key
- type: value datatype name when consistent, or a list of datatype names
when mixed across recordings.
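Examples¶
A minimal sketch; by analogy with tags_info, this assumes the returned frame is indexed by the stored metric name:
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking import Tracking
>>> from py3r.behaviour.features.features import Features
>>> from py3r.behaviour.summary.summary import Summary
>>> from py3r.behaviour.summary.summary_collection import SummaryCollection
>>> with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...     t1 = Tracking.from_dlc(str(p), handle='A', fps=30)
...     t2 = Tracking.from_dlc(str(p), handle='B', fps=30)
>>> s1, s2 = Summary(Features(t1)), Summary(Features(t2))
>>> s1.store(1.0, 'score'); s2.store(2.0, 'score')
>>> sc = SummaryCollection.from_list([s1, s2])
>>> info = sc.stored_info()
>>> int(info.loc['score', 'attached_to'])
2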
bfa ¶
bfa(
column: str,
all_states=None,
numshuffles: int = 1000,
pairs: list[tuple[str, str]] | None = None,
random_state: int | None = 0,
scale_by_transitions: bool = False,
)
Behaviour Flow Analysis between groups for a grouped SummaryCollection.
Requires the collection to be grouped (via groupby). Computes a transition matrix per Summary within each group, then computes Manhattan distances between group means and builds surrogate null distributions via shuffling.
If pairs is provided, only those group pairs are analyzed; otherwise all
unique pairs in self.group_keys are evaluated.
Parameters¶
column : str
Name of the column containing discrete state labels.
all_states : list | None
Explicit state ordering for the transition matrix. None infers
states from the data.
numshuffles : int
Number of surrogate shuffles used to build the null distribution.
pairs : list[tuple[str, str]] | None
Group pairs to compare. None evaluates all unique pairs.
random_state : int | None
Seed for reproducible surrogate shuffling. None keeps
non-deterministic behaviour. Pass the same seed to each bfa()
call when combining scales so that surrogate shuffles are
synchronised; see combine_bfa_results().
scale_by_transitions : bool, default False
If True, each pairwise Manhattan distance (observed and all
surrogates) is divided by the total number of transitions across
both groups for that pair. This rescales raw-count distances to a
per-transition unit, making distances comparable across temporal
resolutions with different numbers of observations. Defaults to
False to preserve legacy behaviour and retain the information
contained in total transition counts.
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> from py3r.behaviour.features.features_collection import FeaturesCollection
>>> from py3r.behaviour.summary.summary_collection import SummaryCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
... tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> # inject simple 2-state labels and tags to build groups
>>> for i, (h, f) in enumerate(fc.items()):
... pat = ['A','A','B','B','A'] * (len(f.tracking.data)//5 + 1)
... states = pd.Series(pat[:len(f.tracking.data)], index=f.tracking.data.index)
... f.store(states, 'state', meta={})
... f.tracking.add_tag('group', f'G{i+1}')
>>> gfc = fc.groupby('group')
>>> sc = SummaryCollection.from_features_collection(gfc)
>>> # compute all pairs (raw transition counts)
>>> res = sc.bfa('state', all_states=['A','B'], numshuffles=2)
>>> isinstance(res, dict) and 'observed' in next(iter(res.values()))
True
>>> # compute only specific pair(s)
>>> res2 = sc.bfa('state', all_states=['A','B'], numshuffles=2, pairs=[('G1','G2')])
>>> list(res2.keys()) == ['G1_vs_G2']
True
>>> # scale distances by total transition count (comparable across resolutions)
>>> res3 = sc.bfa('state', all_states=['A','B'], numshuffles=2, scale_by_transitions=True)
>>> isinstance(res3, dict) and 'observed' in next(iter(res3.values()))
True
bfa_stats staticmethod ¶
bfa_stats(
bfa_results: dict[str, dict[str, float]],
) -> dict[str, dict[str, float]]
Compute simple statistics (percentile, zscore, right_tail_p) from bfa results.
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> from py3r.behaviour.features.features_collection import FeaturesCollection
>>> from py3r.behaviour.summary.summary_collection import SummaryCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
... tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> for i, (h, f) in enumerate(fc.items()):
... pat = ['A','A','B','B','A'] * (len(f.tracking.data)//5 + 1)
... states = pd.Series(pat[:len(f.tracking.data)], index=f.tracking.data.index)
... f.store(states, 'state', meta={})
... f.tracking.add_tag('group', f'G{i+1}')
>>> sc = SummaryCollection.from_features_collection(fc.groupby('group'))
>>> bfa_out = sc.bfa('state', all_states=['A','B'], numshuffles=2)
>>> stats = SummaryCollection.bfa_stats(bfa_out)
>>> set(next(iter(stats.values())).keys()) >= {'percentile','zscore','right_tail_p'}
True
plot_bfa_results staticmethod ¶
plot_bfa_results(
results: dict[str, dict[str, float]],
compares: str | list[str] | None = None,
add_stats: bool = True,
stats: dict[str, dict[str, float]] | None = None,
bins: int = 50,
figsize: tuple[float, float] = (4, 3),
save_dir: str | None = None,
show: bool = True,
compare: str | None = None,
)
Plot one or more BFA result comparisons as separate single-panel figures.
- If compares is None and the results contain a single comparison, that one is plotted.
- If compares is a string, only that comparison is plotted.
- If compares is a list of strings, each comparison is plotted separately.
- If add_stats is True and stats is not provided, statistics are computed via SummaryCollection.bfa_stats(results) and annotated on each plot.
Returns (fig, ax) for a single comparison, or a dict {compare: (fig, ax)}
for multiple.
Examples¶
>>> import tempfile, shutil, os
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> from py3r.behaviour.features.features_collection import FeaturesCollection
>>> from py3r.behaviour.summary.summary_collection import SummaryCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
... tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> # add simple 2-state labels and tags to build two groups
>>> for i, (h, f) in enumerate(fc.items()):
... pat = ['A','A','B','B','A'] * (len(f.tracking.data)//5 + 1)
... states = pd.Series(pat[:len(f.tracking.data)], index=f.tracking.data.index)
... f.store(states, 'state', meta={})
... f.tracking.add_tag('group', f'G{i+1}')
>>> sc = SummaryCollection.from_features_collection(fc.groupby('group'))
>>> bfa_out = sc.bfa('state', all_states=['A','B'], numshuffles=5)
>>> # plot a single comparison and save it
>>> with tempfile.TemporaryDirectory() as outdir:
... fig, ax = SummaryCollection.plot_bfa_results(
... bfa_out, compare='G1_vs_G2', show=False, save_dir=outdir)
... os.path.exists(os.path.join(outdir, 'G1_vs_G2.png'))
True
plot_transition_umap ¶
plot_transition_umap(
column: str,
all_states=None,
groups: list[str | tuple[str, ...]]
| list[list[str | tuple[str, ...]]]
| None = None,
n_neighbors: int = 15,
min_dist: float = 0.1,
random_state: int = 0,
figsize: tuple[float, float] = (4.5, 4),
show: bool = True,
save_dir: str | None = None,
)
Plot a UMAP embedding of per-subject transition matrices for selected groups.
Transition matrices are computed for each subject within each group, flattened,
scaled, and embedded with UMAP. The collection must already be grouped, for
example via groupby.
Parameters¶
column
Name of the categorical column used to compute transition matrices.
all_states
Optional explicit state ordering used when constructing transition matrices.
groups
Optional group selection. If omitted, all groups are included.
This argument supports three forms:
- A flat list of single-tag group labels, for example
``['control', 'treatment']``.
- A flat list of multi-tag group keys (tuples), for example
``[('control', 'time1'), ('control', 'time2')]``.
- A list of lists defining ordered sequences of groups, for example
``[[('control', 'time1'), ('control', 'time2')],
[('treatment', 'time1'), ('treatment', 'time2')]]``.
When sequences are provided, each sequence is plotted using a monochrome
gradient to indicate progression within that sequence.
n_neighbors
Number of neighbors used by UMAP.
min_dist
Minimum distance parameter passed to UMAP.
random_state
Seed for reproducible UMAP embeddings.
figsize
Figure size passed to Matplotlib.
show
If True, display the figure.
save_dir
Optional directory in which to save the plot as
transition_umap.png.
Returns¶
fig, ax
Matplotlib figure and axis.
Raises¶
ValueError
If the collection is not grouped, or if no data are found for the
requested groups.
ImportError
If umap-learn is not installed.
Examples¶
>>> # xdoctest: +REQUIRES(module: umap)
>>> import os, shutil, tempfile
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> from py3r.behaviour.features.features_collection import FeaturesCollection
>>> from py3r.behaviour.summary.summary_collection import SummaryCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... paths = {}
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... for name in ['A', 'B', 'C', 'D']:
... dst = d / f'{name}.csv'
... _ = shutil.copy(p, dst)
... paths[name] = str(dst)
... tc = TrackingCollection.from_dlc(paths, fps=30)
... fc = FeaturesCollection.from_tracking_collection(tc)
...
... tags = {
... 'A': ('control', 'time1'),
... 'B': ('control', 'time2'),
... 'C': ('treatment', 'time1'),
... 'D': ('treatment', 'time2'),
... }
...
... for h, f in fc.items():
... pat = ['A', 'A', 'B', 'B', 'A'] * (len(f.tracking.data) // 5 + 1)
... states = pd.Series(pat[:len(f.tracking.data)], index=f.tracking.data.index)
... f.store(states, 'state', meta={})
... condition, time = tags[h]
... f.tracking.add_tag('condition', condition)
... f.tracking.add_tag('time', time)
...
... sc = SummaryCollection.from_features_collection(fc.groupby(['condition', 'time']))
...
... with tempfile.TemporaryDirectory() as outdir:
... fig, ax = sc.plot_transition_umap(
... column='state',
... all_states=['A', 'B'],
... groups=[('control', 'time1'), ('control', 'time2')],
... show=False,
... save_dir=outdir,
... )
... os.path.exists(os.path.join(outdir, 'transition_umap.png'))
True
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... paths = {}
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... for name in ['A', 'B', 'C', 'D']:
... dst = d / f'{name}.csv'
... _ = shutil.copy(p, dst)
... paths[name] = str(dst)
... tc = TrackingCollection.from_dlc(paths, fps=30)
... fc = FeaturesCollection.from_tracking_collection(tc)
...
... tags = {
... 'A': ('control', 'time1'),
... 'B': ('control', 'time2'),
... 'C': ('treatment', 'time1'),
... 'D': ('treatment', 'time2'),
... }
...
... for h, f in fc.items():
... pat = ['A', 'A', 'B', 'B', 'A'] * (len(f.tracking.data) // 5 + 1)
... states = pd.Series(pat[:len(f.tracking.data)], index=f.tracking.data.index)
... f.store(states, 'state', meta={})
... condition, time = tags[h]
... f.tracking.add_tag('condition', condition)
... f.tracking.add_tag('time', time)
...
... sc = SummaryCollection.from_features_collection(fc.groupby(['condition', 'time']))
...
... fig, ax = sc.plot_transition_umap(
... column='state',
... all_states=['A', 'B'],
... groups=[
... [('control', 'time1'), ('control', 'time2')],
... [('treatment', 'time1'), ('treatment', 'time2')],
... ],
... show=False,
... )
... fig is not None and ax is not None
True
combine_bfa_results staticmethod ¶
combine_bfa_results(
results_list: list[dict],
*,
scale_weights: list[float] | None = None,
per_scale: bool = True,
) -> dict
Combine BFA results from multiple temporal scales into a single result.
Note: this is an escape hatch for advanced workflows. If you are
starting a multi-scale BFA from scratch, use bfa_multiscale()
instead; it handles scale generation, surrogate synchronisation,
and result combination automatically.
Only use this helper directly when you have already computed
per-scale results through a custom pipeline and know that their
surrogate shuffles are synchronised (same ``random_state``, same
group/handle order, same ``pairs``, same ``numshuffles``).
Each entry in results_list is a dict returned by bfa(). The
observed distances and the per-surrogate distances are summed
(optionally weighted) across scales, yielding a combined result
in the same format as a single bfa() call.
For valid multi-scale statistics the surrogate shuffles must be
synchronised across scales: pass the same random_state to every
bfa() call, use the same group structure and the same pairs
ordering, and the shuffles will be identical by construction.
Parameters¶
results_list : list[dict]
BFA result dicts, one per scale, in the same format returned by
bfa(). All dicts must contain the same pair keys and the
same number of surrogates.
scale_weights : list[float] | None
Optional per-scale multiplicative weights (must have the same length
as results_list). Defaults to uniform weighting (all 1.0).
per_scale : bool, default True
If True, each combined pair entry includes a "per_scale_observed"
list containing the individual scale contributions.
Returns¶
dict
Same structure as bfa() output, with an optional extra key
"per_scale_observed" per comparison when per_scale=True.
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> from py3r.behaviour.features.features_collection import FeaturesCollection
>>> from py3r.behaviour.summary.summary_collection import SummaryCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
... tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> for i, (h, f) in enumerate(fc.items()):
... pat = ['A','A','B','B','A'] * (len(f.tracking.data)//5 + 1)
... states = pd.Series(pat[:len(f.tracking.data)], index=f.tracking.data.index)
... f.store(states, 'state', meta={})
... f.tracking.add_tag('group', f'G{i+1}')
>>> gfc = fc.groupby('group')
>>> sc1 = SummaryCollection.from_features_collection(gfc)
>>> sc4 = SummaryCollection.from_features_collection(
... gfc.each.coarse_grain(4, non_numeric='mode'))
>>> res1 = sc1.bfa('state', all_states=['A','B'], numshuffles=2, random_state=0)
>>> res4 = sc4.bfa('state', all_states=['A','B'], numshuffles=2, random_state=0)
>>> combined = SummaryCollection.combine_bfa_results([res1, res4])
>>> 'observed' in combined['G1_vs_G2'] and 'surrogates' in combined['G1_vs_G2']
True
>>> len(combined['G1_vs_G2']['surrogates']) == 2
True
>>> 'per_scale_observed' in combined['G1_vs_G2']
True
>>> len(combined['G1_vs_G2']['per_scale_observed']) == 2
True
bfa_multiscale staticmethod ¶
bfa_multiscale(
scs: list[SummaryCollection],
columns: list[str] | str,
all_states: list[list | None] | list | None = None,
numshuffles: int = 1000,
pairs: list[tuple[str, str]] | None = None,
random_state: int | None = 0,
scale_by_transitions: bool = True,
scale_weights: list[float] | None = None,
) -> dict
Multi-scale Behaviour Flow Analysis across pre-built SummaryCollections.
Each entry in scs is an independently prepared grouped
SummaryCollection — typically derived from data at a different
temporal resolution (e.g. raw, 4x coarse-grained, 16x coarse-grained).
The relevant state column at each scale is specified via columns.
The state column is expected to have been computed directly on the data at that scale (e.g. via cluster labels computed on coarse-grained features), not simply aggregated from a finer scale.
Surrogate shuffles are automatically synchronised: the same
random_state is passed to every bfa() call, which (given
that QC has verified identical group/handle order) guarantees that
surrogate i at scale A and surrogate i at scale B used the same
animal shuffle. The combined surrogate distribution is therefore the
correct null for the combined statistic.
Parameters¶
scs : list[SummaryCollection]
Grouped SummaryCollection objects, one per scale, in the order
they should be combined.
columns : list[str] or str
State column name to use for each scale's transition matrix. Pass
a single string to use the same column name for all scales.
all_states : list[list | None] | list | None, default None
Explicit state ordering for transition matrices. Three forms are
accepted:
- ``None`` — states are inferred from the data at every scale.
- A flat list (e.g. ``[0, 1, 2]``) — the same state set is used
for all scales.
- A list of lists / ``None`` values whose length equals ``len(scs)``
(e.g. ``[[0,...,49], [0,...,9], None]``) — each scale uses its
own state set, or ``None`` to infer for that scale.
The per-scale form is detected when every element of the outer list
is itself a list or ``None``.
numshuffles : int
Number of surrogate shuffles per scale.
pairs : list[tuple[str, str]] | None
Group pairs to compare. None evaluates all unique pairs.
Must be the same for all scales.
random_state : int | None
Seed for reproducible surrogate shuffling. The same seed is used
at every scale to synchronise surrogates.
scale_by_transitions : bool, default True
Divide each pairwise Manhattan distance by the total number of
transitions across both groups for that pair. This is enabled by
default here because distances across scales must be on a common
per-transition unit before they can be meaningfully combined.
Set to False only if you need raw-count distances and are
handling comparability yourself.
scale_weights : list[float] | None
Per-scale multipliers for the combined distance, in the same order
as scs. Defaults to uniform weighting.
Returns¶
dict with keys:
"combined": combined result in :meth:bfaformat, with an additional"per_scale_observed"list per comparison."scales": dict mapping integer index (0, 1, …) to the individual :meth:bfaresult for that scale.
Raises¶
ValueError
If any SC is not grouped, group keys / handle order differ across
SCs, a requested column is missing, or the length of columns does
not match scs.
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> from py3r.behaviour.features.features_collection import FeaturesCollection
>>> from py3r.behaviour.summary.summary_collection import SummaryCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
... tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> for i, (h, f) in enumerate(fc.items()):
... pat = ['A','A','B','B','A'] * (len(f.tracking.data)//5 + 1)
... states = pd.Series(pat[:len(f.tracking.data)], index=f.tracking.data.index)
... f.store(states, 'state', meta={})
... f.tracking.add_tag('group', f'G{i+1}')
>>> gfc = fc.groupby('group')
>>> sc1 = SummaryCollection.from_features_collection(gfc)
>>> # sc2 would normally come from independently-clustered coarse-grained data;
>>> # here we reuse sc1 with the same column purely for doctest purposes.
>>> # shared all_states (broadcast form)
>>> ms = SummaryCollection.bfa_multiscale(
... [sc1, sc1], 'state', all_states=['A', 'B'], numshuffles=2)
>>> # per-scale all_states (list-of-lists form)
>>> ms2 = SummaryCollection.bfa_multiscale(
... [sc1, sc1], 'state',
... all_states=[['A', 'B'], ['A', 'B']],
... numshuffles=2)
>>> bool(ms['combined']['G1_vs_G2']['observed'] == ms2['combined']['G1_vs_G2']['observed'])
True
>>> set(ms.keys()) == {'combined', 'scales'}
True
>>> set(ms['scales'].keys()) == {0, 1}
True
>>> 'observed' in ms['combined']['G1_vs_G2']
True
>>> 'per_scale_observed' in ms['combined']['G1_vs_G2']
True
>>> len(ms['combined']['G1_vs_G2']['per_scale_observed']) == 2
True
plot_chord ¶
plot_chord(
column: str,
all_states: list[str | int] | None = None,
*,
fromkey: str | None = None,
plot_individual: bool = False,
show: bool = True,
save_dir: str | None = None,
cmap: str | list | None = None,
**kwargs,
)
Plot chord diagrams of state transitions using a minimal pattern.
- If not grouped:
  - plot_individual=False: sum over the collection and plot a single chord.
  - plot_individual=True: plot one chord per recording.
- If grouped:
  - plot_individual=False: sum within each group and plot one chord per group.
  - plot_individual=True: plot one chord per recording per group.
Parameters¶
column:
Name of the categorical column used to compute transitions.
all_states:
Optional explicit state ordering for transition matrices.
Required when fromkey is not provided.
fromkey:
Optional key in each Summary.data containing a precomputed transition DataFrame.
If provided, this key is used directly instead of computing transitions from column.
plot_individual:
If True, plot per recording; otherwise plot summed aggregate.
show:
If True, display figures.
save_dir:
Optional directory to save figures; created if missing.
kwargs:
Additional keyword arguments to pass to pycirclize.Circos.chord_diagram.
Returns¶
object:
- flat & plot_individual=False: single fig
- flat & plot_individual=True: dict {handle: fig}
- grouped & plot_individual=False: dict {group: fig}
- grouped & plot_individual=True: dict {group: {handle: fig}}
Examples¶
>>> # xdoctest: +REQUIRES(module: pycirclize)
>>> import tempfile, os, shutil
>>> import pandas as pd
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> from py3r.behaviour.features.features_collection import FeaturesCollection
>>> from py3r.behaviour.summary.summary_collection import SummaryCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... # create two recordings from the sample csv
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
... tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
... # build features and inject a simple 3-state sequence
... fc = FeaturesCollection.from_tracking_collection(tc)
... for _, f in fc.items():
... pat = ['0','1','2','1','0'] * (len(f.tracking.data)//5 + 1)
... seq = pd.Series(pat[:len(f.tracking.data)], index=f.tracking.data.index)
... f.store(seq, 'state', meta={})
... sc = SummaryCollection.from_features_collection(fc)
... # plot flat aggregate and save it
... with tempfile.TemporaryDirectory() as outdir:
... _ = sc.plot_chord(
... 'state', all_states=['0','1','2'], show=False, save_dir=outdir)
... os.path.exists(os.path.join(outdir, 'chord_state.png'))
True
prepare_plot ¶
prepare_plot(
metric,
*,
group_order: dict | None = None,
sort_by: list | str | None = None,
merge_by: str | None = "metric",
ax=None,
figsize=None,
)
Prepare a tidy DataFrame and seaborn kwargs without drawing anything.
This is the single entry point for all plot data preparation. The
convenience sns* methods call this internally; power users can
call it directly for full control over the seaborn call.
Parameters¶
metric : str or BatchResult or list[str | BatchResult]
Metric to prepare. Lists are merged into a single plot-ready metric.
group_order : dict[str, list] | None
{tag_name: [value, ...]} controlling within-tag value ordering.
sort_by : list[str] | str | None
Override spatial sort priority (which tag is the primary x-axis
sort dimension). Colours are unaffected — they always follow
the groupby(tags=...) order. Accepts a single tag name or a
list. See _sns_plot_common() for details.
merge_by : {"metric", "component"} | None
Used only when metric is a list with more than one item. Controls
whether merged labels are arranged as metric::component
("metric"), component::metric ("component"), or kept
as flat merged labels without two-level axis formatting (None).
ax : matplotlib.axes.Axes, optional
Axes to plot on. If None, a new figure is created with
auto-calculated size.
figsize : tuple[float, float], optional
Override the automatic figure size.
Returns¶
PlotSpec
A namespace with the following attributes:
- **fig**: the matplotlib.figure.Figure
- **ax**: the matplotlib.axes.Axes
- **df**: tidy long-form pandas.DataFrame
- **sns_kwargs**: ``dict`` ready to unpack into any seaborn categorical plot function (contains ``data``, ``x``, ``y``, ``hue``, ``order``, ``hue_order``, ``palette``, ``dodge``, ``ax``)
- **metric_name**: raw metric name string
- **ylabel**: auto-detected y-axis label (or ``"Value"``)
- **hide_legend**: ``bool`` hint for legend handling
- **created_fig**: ``bool`` whether the figure was created here
- **n_components**: ``int`` number of unique components
- **n_groups**: ``int`` number of unique groups (1 if ungrouped)
- **filename_prefix**: ``str | None`` handle slug for auto-filenames
Examples¶
Basic power-user workflow:
import seaborn as sns
spec = sc_grouped.prepare_plot(
"total_distance",
group_order=GROUP_ORDER,
sort_by="timepoint",
)
# Full seaborn control — override anything you like
sns.boxplot(**spec.sns_kwargs, width=0.6)
spec.ax.set_title("My custom title")
spec.fig.savefig("custom.png", dpi=300)
Composing multiple layers:
spec = sc_grouped.prepare_plot(metric, group_order=ORDER)
sns.barplot(**spec.sns_kwargs, errorbar=None, alpha=0.4)
sns.stripplot(**spec.sns_kwargs, size=4, jitter=True)
snsstrip ¶
snsstrip(
metric,
*,
group_order: dict | None = None,
sort_by: list | str | None = None,
annotate=None,
ax=None,
show: bool = True,
savedir: str | None = None,
filename: str | None = None,
title: str | None = None,
**kwargs,
)
Strip plot (jittered scatter) using seaborn.
Parameters¶
metric : str or BatchResult
Either a key from Summary.data, or a BatchResult from a batch method.
group_order : dict[str, list] | None
Control group display order. See _sns_plot_common().
sort_by : list[str] | str | None
Override spatial sort priority. See _sns_plot_common().
ax : matplotlib.axes.Axes, optional
Axes to plot on. If None, creates new figure.
show : bool
Display the plot. Default True.
savedir : str | None
Directory to save figure.
filename : str | None
Custom filename.
title : str | None
Plot title.
**kwargs
Passed to seaborn.stripplot (e.g., jitter, alpha, size, palette).
Also accepts random_state for deterministic jitter placement.
Returns¶
tuple[Figure, Axes, DataFrame]
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> from py3r.behaviour.features.features_collection import FeaturesCollection
>>> from py3r.behaviour.summary.summary_collection import SummaryCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
... tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> for f in fc.values():
... idx = f.tracking.data.index[:30]
... f.store(pd.Series(([True, False] * 15)[:len(idx)], index=idx),
... 'active', meta={})
>>> sc = SummaryCollection.from_features_collection(fc)
>>> fig, ax, df = sc.snsstrip(sc.each.time_in_state('active'), show=False)
>>> isinstance(df, pd.DataFrame)
True
snsswarm ¶
snsswarm(
metric,
*,
group_order: dict | None = None,
sort_by: list | str | None = None,
annotate=None,
ax=None,
show: bool = True,
savedir: str | None = None,
filename: str | None = None,
title: str | None = None,
**kwargs,
)
Swarm plot (non-overlapping scatter) using seaborn.
Parameters¶
metric : str or BatchResult
Either a key from Summary.data, or a BatchResult from a batch method.
group_order : dict[str, list] | None
Control group display order. See _sns_plot_common().
sort_by : list[str] | str | None
Override spatial sort priority. See _sns_plot_common().
ax, show, savedir, filename, title
Save/display options.
**kwargs
Passed to seaborn.swarmplot (e.g., size, palette).
Returns¶
tuple[Figure, Axes, DataFrame]
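Examples¶
A minimal sketch mirroring the snsstrip example; the 'score' scalar metric is illustrative, show=False keeps the doctest headless, and size is forwarded to seaborn.swarmplot:
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking import Tracking
>>> from py3r.behaviour.features.features import Features
>>> from py3r.behaviour.summary.summary import Summary
>>> from py3r.behaviour.summary.summary_collection import SummaryCollection
>>> with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...     t1 = Tracking.from_dlc(str(p), handle='A', fps=30)
...     t2 = Tracking.from_dlc(str(p), handle='B', fps=30)
>>> s1, s2 = Summary(Features(t1)), Summary(Features(t2))
>>> s1.store(1.0, 'score'); s2.store(2.0, 'score')
>>> sc = SummaryCollection.from_list([s1, s2])
>>> fig, ax, df = sc.snsswarm('score', show=False, size=5)
>>> isinstance(df, pd.DataFrame)
True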
snsbar ¶
snsbar(
metric,
*,
group_order: dict | None = None,
sort_by: list | str | None = None,
annotate=None,
ax=None,
show: bool = True,
savedir: str | None = None,
filename: str | None = None,
title: str | None = None,
**kwargs,
)
Bar plot with error bars using seaborn.
Parameters¶
metric : str or BatchResult
Either a key from Summary.data, or a BatchResult from a batch method.
group_order : dict[str, list] | None
Control group display order. See _sns_plot_common().
sort_by : list[str] | str | None
Override spatial sort priority. See _sns_plot_common().
ax, show, savedir, filename, title
Save/display options.
**kwargs
Passed to seaborn.barplot (e.g., errorbar, palette, saturation).
Returns¶
tuple[Figure, Axes, DataFrame]
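Examples¶
A minimal sketch mirroring the snsstrip example; the 'score' scalar metric is illustrative, and errorbar=None is forwarded to seaborn.barplot:
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking import Tracking
>>> from py3r.behaviour.features.features import Features
>>> from py3r.behaviour.summary.summary import Summary
>>> from py3r.behaviour.summary.summary_collection import SummaryCollection
>>> with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...     t1 = Tracking.from_dlc(str(p), handle='A', fps=30)
...     t2 = Tracking.from_dlc(str(p), handle='B', fps=30)
>>> s1, s2 = Summary(Features(t1)), Summary(Features(t2))
>>> s1.store(1.0, 'score'); s2.store(2.0, 'score')
>>> sc = SummaryCollection.from_list([s1, s2])
>>> fig, ax, df = sc.snsbar('score', show=False, errorbar=None)
>>> isinstance(df, pd.DataFrame)
True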
snsbox ¶
snsbox(
metric,
*,
group_order: dict | None = None,
sort_by: list | str | None = None,
annotate=None,
ax=None,
show: bool = True,
savedir: str | None = None,
filename: str | None = None,
title: str | None = None,
**kwargs,
)
Box plot using seaborn.
Parameters¶
metric : str or BatchResult
Either a key from Summary.data, or a BatchResult from a batch method.
group_order : dict[str, list] | None
Control group display order. See _sns_plot_common().
sort_by : list[str] | str | None
Override spatial sort priority. See _sns_plot_common().
ax, show, savedir, filename, title
Save/display options.
**kwargs
Passed to seaborn.boxplot (e.g., width, palette, fliersize).
Returns¶
tuple[Figure, Axes, DataFrame]
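Examples¶
A minimal sketch mirroring the snsstrip example; the 'score' scalar metric is illustrative, and width is forwarded to seaborn.boxplot:
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking import Tracking
>>> from py3r.behaviour.features.features import Features
>>> from py3r.behaviour.summary.summary import Summary
>>> from py3r.behaviour.summary.summary_collection import SummaryCollection
>>> with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...     t1 = Tracking.from_dlc(str(p), handle='A', fps=30)
...     t2 = Tracking.from_dlc(str(p), handle='B', fps=30)
>>> s1, s2 = Summary(Features(t1)), Summary(Features(t2))
>>> s1.store(1.0, 'score'); s2.store(2.0, 'score')
>>> sc = SummaryCollection.from_list([s1, s2])
>>> fig, ax, df = sc.snsbox('score', show=False, width=0.5)
>>> isinstance(df, pd.DataFrame)
True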
snsviolin ¶
snsviolin(
metric,
*,
group_order: dict | None = None,
sort_by: list | str | None = None,
annotate=None,
ax=None,
show: bool = True,
savedir: str | None = None,
filename: str | None = None,
title: str | None = None,
**kwargs,
)
Violin plot using seaborn.
Parameters¶
metric : str or BatchResult
Either a key from Summary.data, or a BatchResult from a batch method.
group_order : dict[str, list] | None
Control group display order. See _sns_plot_common().
sort_by : list[str] | str | None
Override spatial sort priority. See _sns_plot_common().
ax, show, savedir, filename, title
Save/display options.
**kwargs
Passed to seaborn.violinplot (e.g., inner, split, palette).
Returns¶
tuple[Figure, Axes, DataFrame]
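Examples¶
A minimal sketch mirroring the snsstrip example; the 'score' scalar metric is illustrative, and inner=None is forwarded to seaborn.violinplot:
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking import Tracking
>>> from py3r.behaviour.features.features import Features
>>> from py3r.behaviour.summary.summary import Summary
>>> from py3r.behaviour.summary.summary_collection import SummaryCollection
>>> with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...     t1 = Tracking.from_dlc(str(p), handle='A', fps=30)
...     t2 = Tracking.from_dlc(str(p), handle='B', fps=30)
>>> s1, s2 = Summary(Features(t1)), Summary(Features(t2))
>>> s1.store(1.0, 'score'); s2.store(2.0, 'score')
>>> sc = SummaryCollection.from_list([s1, s2])
>>> fig, ax, df = sc.snsviolin('score', show=False, inner=None)
>>> isinstance(df, pd.DataFrame)
True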
snspoint ¶
snspoint(
metric,
*,
group_order: dict | None = None,
sort_by: list | str | None = None,
annotate=None,
ax=None,
show: bool = True,
savedir: str | None = None,
filename: str | None = None,
title: str | None = None,
**kwargs,
)
Point plot (mean + CI) using seaborn.
Parameters¶
metric : str or BatchResult
Either a key from Summary.data, or a BatchResult from a batch method.
group_order : dict[str, list] | None
Control group display order. See _sns_plot_common().
sort_by : list[str] | str | None
Override spatial sort priority. See _sns_plot_common().
ax, show, savedir, filename, title
Save/display options.
**kwargs
Passed to seaborn.pointplot (e.g., errorbar, markers, linestyles).
Returns¶
tuple[Figure, Axes, DataFrame]
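Examples¶
A minimal sketch mirroring the snsstrip example; the 'score' scalar metric is illustrative and show=False keeps the doctest headless:
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking import Tracking
>>> from py3r.behaviour.features.features import Features
>>> from py3r.behaviour.summary.summary import Summary
>>> from py3r.behaviour.summary.summary_collection import SummaryCollection
>>> with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...     t1 = Tracking.from_dlc(str(p), handle='A', fps=30)
...     t2 = Tracking.from_dlc(str(p), handle='B', fps=30)
>>> s1, s2 = Summary(Features(t1)), Summary(Features(t2))
>>> s1.store(1.0, 'score'); s2.store(2.0, 'score')
>>> sc = SummaryCollection.from_list([s1, s2])
>>> fig, ax, df = sc.snspoint('score', show=False)
>>> isinstance(df, pd.DataFrame)
True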
snssuperplot ¶
snssuperplot(
metric,
*,
group_order: dict | None = None,
sort_by: list | str | None = None,
annotate=None,
ax=None,
show: bool = True,
savedir: str | None = None,
filename: str | None = None,
title: str | None = None,
ylabel: str | None = None,
bar_kwargs: dict | None = None,
strip_kwargs: dict | None = None,
**kwargs,
)
Superplot: bar plot (mean) with strip plot (individual dots) overlay.
This is the "publication-ready" visualization showing mean bars with individual data points scattered on top, commonly used in scientific papers. The dots are constrained within the bar width by default.
Parameters¶
metric : str or BatchResult
Either a key from Summary.data, or a BatchResult from a batch method.
group_order : dict[str, list] | None
Control group display order. See _sns_plot_common().
sort_by : list[str] | str | None
Override spatial sort priority. See _sns_plot_common().
annotate : str or dict or None
Statistical annotations. See _sns_plot_common().
ax : matplotlib.axes.Axes, optional
Axes to plot on. If None, creates new figure.
show : bool
Display the plot. Default True.
savedir : str | None
Directory to save figure.
filename : str | None
Custom filename.
title : str | None
Plot title.
ylabel : str | None
Y-axis label. Auto-detected from metric when None.
bar_kwargs : dict | None
Extra kwargs for barplot (e.g., errorbar, capsize, saturation).
strip_kwargs : dict | None
Extra kwargs for stripplot (e.g., alpha, size, jitter).
**kwargs
Common kwargs passed to both plots (e.g., palette, dodge).
Also accepts random_state for deterministic jitter placement.
Returns¶
tuple[Figure, Axes, DataFrame]
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> from py3r.behaviour.features.features_collection import FeaturesCollection
>>> from py3r.behaviour.summary.summary_collection import SummaryCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
... tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> for f in fc.values():
... n = len(f.tracking.data)
... states = pd.Series((['A', 'B', 'A'] * (n // 3 + 1))[:n],
... index=f.tracking.data.index)
... f.store(states, 'zone', meta={})
>>> sc = SummaryCollection.from_features_collection(fc)
>>> fig, ax, df = sc.snssuperplot(sc.each.time_in_state('zone'), show=False)
>>> isinstance(df, pd.DataFrame)
True
values ¶
values()
Values iterator (elements or sub-collections).
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> len(list(coll.values())) == 2
True
items ¶
items()
Items iterator (handle, element).
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> sorted([h for h, _ in coll.items()])
['A', 'B']
keys ¶
keys()
Keys iterator (handles or group keys).
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> list(sorted(coll.keys()))
['A', 'B']
merge classmethod ¶
merge(collections, *, copy=False)
Merge multiple collections into a single flat collection containing all leaf elements from each input.
Each input collection is flattened before merging, so grouped inputs
are supported. The result is always a new flat collection. Leaves are
shared by reference unless copy=True.
Parameters¶
collections : list[BaseCollection]
Two or more collections of the same concrete type. Every element
across all collections must have a unique handle.
copy : bool, default False
If True, each leaf is copied (via its .copy() method) so that
the merged collection is fully independent of the originals.
Returns¶
BaseCollection
A new flat collection containing all leaves.
Raises¶
ValueError
If collections is empty, or if any handles are duplicated.
TypeError
If any input is not an instance of the calling class.
Warns¶
UserWarning
If the tag key sets differ across input collections (the merged collection will have mixed tag coverage).
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
... _ = shutil.copy(p, d / 'C.csv'); _ = shutil.copy(p, d / 'D.csv')
... c1 = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
... c2 = TrackingCollection.from_dlc({'C': str(d/'C.csv'), 'D': str(d/'D.csv')}, fps=30)
>>> merged = TrackingCollection.merge([c1, c2])
>>> sorted(merged.keys())
['A', 'B', 'C', 'D']
>>> len(merged)
4
groupby ¶
groupby(tags)
Group the collection by one or more existing tag names. Returns a grouped view (this same collection type) whose values are sub-collections keyed by a tuple of tag values in the order provided.
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
... coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G2')
>>> g = coll.groupby('group')
>>> g.is_grouped
True
>>> sorted(g.group_keys)
[('G1',), ('G2',)]
flatten ¶
flatten()
Flatten a MultipleCollection to a flat Collection. If already flat, return self.
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
... coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G1')
... g = coll.groupby('group')
>>> flat = g.flatten()
>>> flat.is_grouped
False
>>> sorted(flat.keys())
['A', 'B']
get_group ¶
get_group(key)
Get a sub-collection by group key from a grouped view.
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
... coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G2')
>>> g = coll.groupby('group')
>>> sub = g.get_group(('G1',))
>>> list(sub.keys())
['A']
regroup ¶
regroup()
Recompute the same grouping using the current tags and the original grouping tag order. If not grouped, returns self.
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
... coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G1')
... g = coll.groupby('group')
... coll['B'].add_tag('group','G2', overwrite=True) # change tag
>>> g2 = g.regroup()
>>> sorted(g2.group_keys)
[('G1',), ('G2',)]
tags_info ¶
tags_info(
*, include_value_counts: bool = False
) -> pd.DataFrame
Summarize tag presence across the collection's leaf objects.
Works for flat and grouped collections. If include_value_counts is True,
include a column 'value_counts' with a dict of value->count for each tag.
Returns a pandas.DataFrame with columns:
['tag', 'attached_to', 'missing_from', 'unique_values', ('value_counts')]
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
... coll['A'].add_tag('genotype', 'WT')
... coll['B'].add_tag('timepoint', 'T1')
>>> info = coll.tags_info(include_value_counts=True)
>>> int(info.loc['genotype','attached_to'])
1
>>> int(info.loc['genotype','missing_from'])
1
>>> int(info.loc['genotype','unique_values'])
1
>>> info.loc['genotype','value_counts']
{'WT': 1}
>>> int(info.loc['timepoint','attached_to'])
1
map_leaves ¶
map_leaves(fn)
Apply a function to every leaf element and return a new collection of the same type. Preserves grouping shape and groupby metadata when grouped.
fn: callable(Element) -> ElementLike
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> sub = coll.map_leaves(lambda t: t.loc[0:1])
>>> all(len(t.data) == 2 for t in sub.values())
True
copy ¶
copy()
Creates a copy of the BaseCollection. Raises NotImplementedError if any leaf does not implement copy().
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking import Tracking
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv')
... _ = shutil.copy(p, d / 'B.csv')
... coll = TrackingCollection.from_folder(
... str(d), tracking_loader=Tracking.from_dlc, fps=30
... )
>>> coll_copy = coll.copy()
>>> sorted(coll_copy.keys())
['A', 'B']
save ¶
save(
dirpath: str,
*,
overwrite: bool = False,
data_format: str = "parquet",
) -> None
Save this collection to a directory. Preserves grouping and delegates to leaf objects' save(dirpath, data_format, overwrite=True).
Examples¶
>>> import tempfile, shutil, os
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
... out = d / 'coll'
... coll.save(str(out), overwrite=True, data_format='csv')
... # collection-level manifest at top-level
... assert os.path.exists(os.path.join(str(out), 'manifest.json'))
... # element-level manifests under elements/<handle>/
... el_manifest = os.path.join(str(out), 'elements', 'A', 'manifest.json')
... assert os.path.exists(el_manifest)
load classmethod ¶
load(dirpath: str)
Load a collection previously saved with save(). Uses the class's _element_type.load to reconstruct leaves.
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
... out = d / 'coll'
... coll.save(str(out), overwrite=True, data_format='csv')
... coll2 = TrackingCollection.load(str(out))
>>> list(sorted(coll2.keys()))
['A', 'B']