Skip to content

FeaturesCollection

py3r.behaviour.features.features_collection.FeaturesCollection

FeaturesCollection(features_dict: dict[str, Features])

Bases: BaseCollection

Collection of Features objects, keyed by name. Note: type hints refer to Features, but factory methods allow for other classes; these are intended ONLY for subclasses of Features, and this is enforced.

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> list(sorted(fc.keys()))
['A', 'B']

each instance-attribute

each: Features

each_forcebatch instance-attribute

each_forcebatch: Features

features_dict property

features_dict

loc property

loc

iloc property

iloc

is_grouped property

is_grouped

True if this collection is a grouped view.

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> coll.is_grouped
False

groupby_tags property

groupby_tags

The tag names used to form this grouped view (or None if flat).

group_keys property

group_keys

Keys for the groups in a grouped view. Empty list if not grouped.

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G2')
>>> g = coll.groupby('group')
>>> sorted(g.group_keys)
[('G1',), ('G2',)]

from_tracking_collection classmethod

from_tracking_collection(
    tracking_collection: TrackingCollection,
    feature_cls=Features,
)

Create a FeaturesCollection from a TrackingCollection.

Parameters

tracking_collection : TrackingCollection
    Source collection. Grouped structure is preserved.
feature_cls : type, default=Features
    Features subclass to instantiate for each session.

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> isinstance(fc['A'], Features) and isinstance(fc['B'], Features)
True

concat classmethod

concat(
    collections: list[FeaturesCollection],
    *,
    reindex: Literal[
        "rezero", "follow_previous", "keep_original"
    ] = "follow_previous",
) -> FeaturesCollection

Concatenate multiple FeaturesCollections along the time (frame) axis.

Each collection must have the same handles (keys). For each handle, the corresponding Features objects are concatenated in order. Supports both flat and grouped collections.

Parameters

collections : list[FeaturesCollection]
    List of FeaturesCollection objects to concatenate, in temporal order.
    All must have matching keys (handles) and feature columns.
reindex : {"rezero", "follow_previous", "keep_original"}, default "follow_previous"
    How to handle frame indices:
    - "rezero": Reindex all frames starting from 0 (0, 1, 2, ...).
    - "follow_previous": Each chunk continues from where the previous ended.
      If chunk 1 ends at frame n, chunk 2 starts at n+1.
    - "keep_original": Leave indices untouched; duplicates are allowed.

Returns

FeaturesCollection
    A new collection with concatenated Features objects for each handle.

Raises

ValueError
    If collections is empty, keys don't match, or grouping structure differs.

Notes

For context-dependent features (normalization, embeddings with temporal windows, etc.), consider whether you need to recompute features on concatenated Tracking data rather than concatenating pre-computed features.

Examples

Concatenate two flat collections:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> from py3r.behaviour.features.features_collection import FeaturesCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc1 = TrackingCollection.from_dlc({'A': str(d/'A.csv'),
...                                       'B': str(d/'B.csv')}, fps=30)
...     tc2 = TrackingCollection.from_dlc({'A': str(d/'A.csv'),
...                                        'B': str(d/'B.csv')}, fps=30)
>>> fc1 = FeaturesCollection.from_tracking_collection(tc1)
>>> fc2 = FeaturesCollection.from_tracking_collection(tc2)
>>> # Add a feature to all
>>> for f in list(fc1.values()) + list(fc2.values()):
...     s = pd.Series(range(len(f.tracking.data)), index=f.tracking.data.index)
...     f.store(s, 'counter', meta={})
>>> combined = FeaturesCollection.concat([fc1, fc2])
>>> len(combined['A'].data) == len(fc1['A'].data) + len(fc2['A'].data)
True
>>> 'concat' in combined['A'].meta
True

from_list classmethod

from_list(features_list: list[Features])

Create a FeaturesCollection from a list of Features objects, keyed by handle.

Parameters

features_list : list[Features]
    Features objects to collect. All handles must be unique.

Examples
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking import Tracking
>>> with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...     t1 = Tracking.from_dlc(str(p), handle='A', fps=30)
...     t2 = Tracking.from_dlc(str(p), handle='B', fps=30)
>>> f1, f2 = Features(t1), Features(t2)
>>> fc = FeaturesCollection.from_list([f1, f2])
>>> list(sorted(fc.keys()))
['A', 'B']

to_summary

to_summary() -> SummaryCollection

Create a SummaryCollection from this FeaturesCollection.

This is a convenience wrapper around SummaryCollection.from_features_collection(self) and preserves grouped structure when the collection is grouped.

Returns

SummaryCollection
    Collection containing one Summary object per features object in this collection.

Examples

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.features.features_collection import FeaturesCollection
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> from py3r.behaviour.util.docdata import data_path
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     tc = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     fc = FeaturesCollection.from_tracking_collection(tc)
...     sc = fc.to_summary()
>>> from py3r.behaviour.summary.summary_collection import SummaryCollection
>>> isinstance(sc, SummaryCollection)
True
>>> sorted(sc.keys())
['A', 'B']

cluster_embedding

cluster_embedding(
    embedding_dict: dict[str, list[int]],
    n_clusters: int,
    random_state: int = 0,
    *,
    normalize: bool = False,
    feature_weights: dict[str, float] | None = None,
    lowmem: bool = False,
    decimation_factor: int = 10,
    missing_policy: Literal[
        "drop", "impute_weight"
    ] = "drop",
    auto_normalize: bool = False,
    rescale_factors: dict | None = None,
    custom_scaling: dict[str, dict] | None = None,
)

Perform k-means clustering using the specified embedding.

Returns (BatchResult, centroids, scaling_factors) where scaling_factors is a dict of one float per embedding column — the combined effect of normalisation and feature weights.

Parameters

normalize : bool
    Divide each base feature by its global std before embedding.
feature_weights : dict[str, float] | None
    Substring → weight mapping, e.g. {"speed": 4.0, "accel": 2.0}. Each key is
    matched against embedding column names by substring; matched columns are
    multiplied by the value. Resolved internally via
    :func:`~py3r.behaviour.util.series_utils.build_column_weights`.
    Raises if a rule matches no column (likely typo).

Examples

>>> import tempfile, shutil
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> # Create a trivial feature 'counter' in each Features to embed
>>> for f in fc.values():
...     s = pd.Series(range(len(f.tracking.data)), index=f.tracking.data.index)
...     f.store(s, 'counter')
>>> batch, centroids, norm = fc.cluster_embedding(
...     {'counter':[0]}, n_clusters=2, lowmem=True)
>>> isinstance(centroids, pd.DataFrame)
True
>>> batch, centroids, norm = fc.cluster_embedding(
...     {'counter':[0]}, n_clusters=2, lowmem=True,
...     missing_policy='impute_weight')
>>> isinstance(centroids, pd.DataFrame)
True
>>> batch, centroids, norm = fc.cluster_embedding(
...     {'counter':[0]}, n_clusters=2, lowmem=True,
...     missing_policy='drop')
>>> isinstance(centroids, pd.DataFrame)
True

cluster_embedding_stream

cluster_embedding_stream(
    embedding_dict: dict[str, list[int]],
    n_clusters: int,
    random_state: int = 0,
    *,
    normalize: bool = False,
    feature_weights: dict[str, float] | None = None,
    missing_policy: Literal[
        "drop", "impute_weight"
    ] = "drop",
    chunk_size: int = 10000,
    n_epochs: int = 3,
    batch_size: int = 1024,
)

Memory-friendly clustering via streaming MiniBatchKMeans.

Unlike cluster_embedding, this never builds a combined DataFrame. Embeddings are extracted one Features at a time, sliced into fixed-size chunks, and fed to MiniBatchKMeans.partial_fit. Multiple epochs improve convergence; uniform chunk sizes prevent large recordings from dominating centroid updates.

Normalisation is computed on base feature columns (before embedding) so that all time-shifts of the same feature share the same std. The returned scaling_factors is a dict of one float per embedding column — the combined effect of normalisation and feature weights. Multiply raw embedding values by these to reproduce the transform.

Returns (BatchResult, centroids, scaling_factors).

Parameters

embedding_dict : dict[str, list[int]]
    Feature columns and their time shifts for the embedding.
n_clusters : int
    Number of clusters.
random_state : int
    Seed for reproducibility.
normalize : bool
    Divide each base feature by its global std before embedding.
feature_weights : dict[str, float] | None
    Substring → weight mapping, e.g. {"speed": 4.0}. Resolved internally into
    per-column weights. Raises if a rule matches no column (likely typo).
missing_policy : {"drop", "impute_weight"}
    How to handle NaN rows.
chunk_size : int
    Max rows per partial_fit call.
n_epochs : int
    Number of full passes over the data.
batch_size : int
    MiniBatchKMeans internal mini-batch size.

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> for f in fc.values():
...     s = pd.Series(range(len(f.tracking.data)), index=f.tracking.data.index)
...     f.store(s, 'counter')
>>> batch, centroids, norm = fc.cluster_embedding_stream(
...     {'counter': [0]}, n_clusters=2)
>>> isinstance(centroids, pd.DataFrame) and centroids.shape[0] == 2
True

cluster_diagnostics

cluster_diagnostics(
    labels_result,
    n_clusters: int | None = None,
    *,
    low: float = 0.05,
    high: float = 0.9,
    verbose: bool = True,
)

Compute diagnostic stats for cluster label assignments.

Parameters

labels_result :
    Mapping from handle (or group->handle) to FeaturesResult of integer labels
    (with NA). Accepts the return shape of cluster_embedding(...)[0]
    (BatchResult or dict).
n_clusters :
    Optional number of clusters. If None, inferred from labels (max label + 1).
low, high :
    Prevalence thresholds for low/high cluster labels per recording.
verbose :
    If True, print a compact summary.

Returns

dict with:
- 'global': {'cluster_prevalence': {label: frac, ...}, 'percent_nan': frac}
- 'per_recording': DataFrame, rows per recording, cols
  ['percent_nan', 'num_missing', 'num_low', 'num_high']
- 'summary': min/median/max for the per_recording columns
- if grouped: 'per_group': {group_key: {'per_recording': df, 'summary': {...}}}

cross_predict_rms

cross_predict_rms(
    source_embedding: dict[str, list[int]],
    target_embedding: dict[str, list[int]],
    normalize_source: bool = False,
    normalize_pred: dict | str = None,
    set1: list | None = None,
    set2: list | None = None,
    predictor_cls=None,
    predictor_kwargs=None,
)

Dev mode only: not available in public release yet.

plot_cross_predict_vs_within staticmethod

plot_cross_predict_vs_within(
    results, from_group, to_group, show=True
)

Dev mode only: not available in public release yet.

plot_cross_predict_results staticmethod

plot_cross_predict_results(
    results,
    within_keys=None,
    between_keys=None,
    plot_type="bar",
    figsize=(10, 6),
    show=True,
)

Dev mode only: not available in public release yet.

dumbbell_plot_cross_predict staticmethod

dumbbell_plot_cross_predict(
    results,
    within_key,
    between_key,
    figsize=(3, 3),
    show=True,
)

Dev mode only: not available in public release yet.

train_knn_regressor

train_knn_regressor(
    *,
    source_embedding: dict[str, list[int]],
    target_embedding: dict[str, list[int]],
    predictor_cls=None,
    predictor_kwargs=None,
    normalize_source: bool = False,
    **kwargs,
)

Dev mode only: not available in public release yet.

predict_knn

predict_knn(
    model,
    source_embedding: dict[str, list[int]],
    target_embedding: dict[str, list[int]],
    rescale_factors: dict = None,
) -> pd.DataFrame

Dev mode only: not available in public release yet.

plot

plot(
    arg=None,
    figsize=(8, 2),
    show: bool = True,
    title: str = None,
)

Plot features for all collections in the collection. - If arg is a BatchResult or dict: treat as batch result and plot for each collection. - Otherwise: treat as column name(s) or None and plot for each collection. - If title is provided, it will be used as the overall title for the figure.

store

store(
    results_dict,
    name: str = None,
    meta: dict = None,
    overwrite: bool = False,
)

Store FeaturesResult objects returned by batch methods.

Parameters

results_dict : dict
    Batch results to store. Flat: {handle: FeaturesResult}.
    Grouped: {group_key: {handle: FeaturesResult}}.
name : str | None, default=None
    Column name to store under. If None, resolved automatically from the
    result objects (all must agree on a single name).
meta : dict | None, default=None
    Metadata dict to attach alongside the stored column.
overwrite : bool, default=False
    If True, overwrite an existing column with the same name.

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> # Build a simple FeaturesResult dict from distance_between
>>> rd = {h: feat.distance_between('p1','p2') for h, feat in fc.items()}
>>> fc.store(rd, name='d12')
>>> all('d12' in feat.data.columns for feat in fc.values())
True

Returns

str
    The resolved stored column name. If auto-naming would resolve to multiple
    different names across leaves, raises ValueError.

stored_info

stored_info() -> pd.DataFrame

Summarize stored feature columns across the collection's leaf Features objects.

Returns a DataFrame indexed by feature with columns: - attached_to: number of recordings containing the feature - missing_from: number of recordings not containing the feature - type: pandas dtype string for the feature column when consistent, or a list of dtype strings when mixed across recordings.

values

values()

Values iterator (elements or sub-collections).

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> len(list(coll.values())) == 2
True

items

items()

Items iterator (handle, element).

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> sorted([h for h, _ in coll.items()])
['A', 'B']

keys

keys()

Keys iterator (handles or group keys).

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> list(sorted(coll.keys()))
['A', 'B']

merge classmethod

merge(collections, *, copy=False)

Merge multiple collections into a single flat collection containing all leaf elements from each input.

Each input collection is flattened before merging, so grouped inputs are supported. The result is always a new flat collection. Leaves are shared by reference unless copy=True.

Parameters

collections : list[BaseCollection]
    Two or more collections of the same concrete type. Every element across
    all collections must have a unique handle.
copy : bool, default False
    If True, each leaf is copied (via its .copy() method) so that the merged
    collection is fully independent of the originals.

Returns

BaseCollection
    A new flat collection containing all leaves.

Raises

ValueError
    If collections is empty, or if any handles are duplicated.
TypeError
    If any input is not an instance of the calling class.

Warns

UserWarning
    If the tag key sets differ across input collections (the merged
    collection will have mixed tag coverage).

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...         _ = shutil.copy(p, d / 'C.csv'); _ = shutil.copy(p, d / 'D.csv')
...     c1 = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
...     c2 = TrackingCollection.from_dlc({'C': str(d/'C.csv'), 'D': str(d/'D.csv')}, fps=30)
>>> merged = TrackingCollection.merge([c1, c2])
>>> sorted(merged.keys())
['A', 'B', 'C', 'D']
>>> len(merged)
4

groupby

groupby(tags)

Group the collection by one or more existing tag names. Returns a grouped view (this same collection type) whose values are sub-collections keyed by a tuple of tag values in the order provided.

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G2')
>>> g = coll.groupby('group')
>>> g.is_grouped
True
>>> sorted(g.group_keys)
[('G1',), ('G2',)]

flatten

flatten()

Flatten a MultipleCollection to a flat Collection. If already flat, return self.

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G1')
...     g = coll.groupby('group')
>>> flat = g.flatten()
>>> flat.is_grouped
False
>>> sorted(flat.keys())
['A', 'B']

get_group

get_group(key)

Get a sub-collection by group key from a grouped view.

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G2')
>>> g = coll.groupby('group')
>>> sub = g.get_group(('G1',))
>>> list(sub.keys())
['A']

regroup

regroup()

Recompute the same grouping using the current tags and the original grouping tag order. If not grouped, returns self.

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G1')
...     g = coll.groupby('group')
...     coll['B'].add_tag('group','G2', overwrite=True)  # change tag
>>> g2 = g.regroup()
>>> sorted(g2.group_keys)
[('G1',), ('G2',)]

tags_info

tags_info(
    *, include_value_counts: bool = False
) -> pd.DataFrame

Summarize tag presence across the collection's leaf objects. Works for flat and grouped collections. If include_value_counts is True, include a column 'value_counts' with a dict of value->count for each tag. Returns a pandas.DataFrame with columns: ['tag', 'attached_to', 'missing_from', 'unique_values', ('value_counts')]

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('genotype', 'WT')
...     coll['B'].add_tag('timepoint', 'T1')
>>> info = coll.tags_info(include_value_counts=True)
>>> int(info.loc['genotype','attached_to'])
1
>>> int(info.loc['genotype','missing_from'])
1
>>> int(info.loc['genotype','unique_values'])
1
>>> info.loc['genotype','value_counts']
{'WT': 1}
>>> int(info.loc['timepoint','attached_to'])
1

map_leaves

map_leaves(fn)

Apply a function to every leaf element and return a new collection of the same type. Preserves grouping shape and groupby metadata when grouped.

fn: callable(Element) -> ElementLike

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> sub = coll.map_leaves(lambda t: t.loc[0:1])
>>> all(len(t.data) == 2 for t in sub.values())
True

copy

copy()

Creates a copy of the BaseCollection. Raises NotImplementedError if any leaf does not implement copy().

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking import Tracking
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv')
...         _ = shutil.copy(p, d / 'B.csv')
...     coll = TrackingCollection.from_folder(
...         str(d), tracking_loader=Tracking.from_dlc, fps=30
...     )
>>> coll_copy = coll.copy()
>>> sorted(coll_copy.keys())
['A', 'B']

save

save(
    dirpath: str,
    *,
    overwrite: bool = False,
    data_format: str = "parquet",
) -> None

Save this collection to a directory. Preserves grouping and delegates to leaf objects' save(dirpath, data_format, overwrite=True).

Examples
>>> import tempfile, shutil, os
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     out = d / 'coll'
...     coll.save(str(out), overwrite=True, data_format='csv')
...     # collection-level manifest at top-level
...     assert os.path.exists(os.path.join(str(out), 'manifest.json'))
...     # element-level manifests under elements/<handle>/
...     el_manifest = os.path.join(str(out), 'elements', 'A', 'manifest.json')
...     assert os.path.exists(el_manifest)

load classmethod

load(dirpath: str)

Load a collection previously saved with save(). Uses the class's _element_type.load to reconstruct leaves.

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     out = d / 'coll'
...     coll.save(str(out), overwrite=True, data_format='csv')
...     coll2 = TrackingCollection.load(str(out))
>>> list(sorted(coll2.keys()))
['A', 'B']