Skip to content

FeaturesCollection

py3r.behaviour.features.features_collection.FeaturesCollection

FeaturesCollection(features_dict: dict[str, Features])

Bases: BaseCollection

Collection of Features objects, keyed by name. Note: type hints refer to Features, but the factory methods accept other classes; these are intended ONLY for subclasses of Features, and this is enforced.

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> list(sorted(fc.keys()))
['A', 'B']

each instance-attribute

each: Features

each_forcebatch instance-attribute

each_forcebatch: Features

features_dict property

features_dict

loc property

loc

iloc property

iloc

is_grouped property

is_grouped

True if this collection is a grouped view.

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> coll.is_grouped
False

groupby_tags property

groupby_tags

The tag names used to form this grouped view (or None if flat).

group_keys property

group_keys

Keys for the groups in a grouped view. Empty list if not grouped.

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G2')
>>> g = coll.groupby('group')
>>> sorted(g.group_keys)
[('G1',), ('G2',)]

from_tracking_collection classmethod

from_tracking_collection(
    tracking_collection: TrackingCollection,
    feature_cls=Features,
)

Create a FeaturesCollection from a TrackingCollection.

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> isinstance(fc['A'], Features) and isinstance(fc['B'], Features)
True

concat classmethod

concat(
    collections: list[FeaturesCollection],
    *,
    reindex: Literal[
        "rezero", "follow_previous", "keep_original"
    ] = "follow_previous",
) -> FeaturesCollection

Concatenate multiple FeaturesCollections along the time (frame) axis.

Each collection must have the same handles (keys). For each handle, the corresponding Features objects are concatenated in order. Supports both flat and grouped collections.

Parameters:

Name Type Description Default

collections

list[FeaturesCollection]

List of FeaturesCollection objects to concatenate, in temporal order. All must have matching keys (handles) and feature columns.

required

reindex

('rezero', 'follow_previous', 'keep_original')

How to handle frame indices: - "rezero": Reindex all frames starting from 0 (0, 1, 2, ...). - "follow_previous": Each chunk continues from where the previous ended. If chunk 1 ends at frame n, chunk 2 starts at n+1. - "keep_original": Leave indices untouched; duplicates are allowed.

"follow_previous"

Returns:

Type Description
FeaturesCollection

A new collection with concatenated Features objects for each handle.

Raises:

Type Description
ValueError

If collections is empty, keys don't match, or grouping structure differs.

Notes

For context-dependent features (normalization, embeddings with temporal windows, etc.), consider whether you need to recompute features on concatenated Tracking data rather than concatenating pre-computed features.

Examples:

Concatenate two flat collections:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> from py3r.behaviour.features.features_collection import FeaturesCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc1 = TrackingCollection.from_dlc({'A': str(d/'A.csv'),
...                                       'B': str(d/'B.csv')}, fps=30)
...     tc2 = TrackingCollection.from_dlc({'A': str(d/'A.csv'),
...                                        'B': str(d/'B.csv')}, fps=30)
>>> fc1 = FeaturesCollection.from_tracking_collection(tc1)
>>> fc2 = FeaturesCollection.from_tracking_collection(tc2)
>>> # Add a feature to all
>>> for f in list(fc1.values()) + list(fc2.values()):
...     s = pd.Series(range(len(f.tracking.data)), index=f.tracking.data.index)
...     f.store(s, 'counter', meta={})
>>> combined = FeaturesCollection.concat([fc1, fc2])
>>> len(combined['A'].data) == len(fc1['A'].data) + len(fc2['A'].data)
True
>>> 'concat' in combined['A'].meta
True

from_list classmethod

from_list(features_list: list[Features])

Create a FeaturesCollection from a list of Features objects, keyed by handle.

Examples:

>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking import Tracking
>>> with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...     t1 = Tracking.from_dlc(str(p), handle='A', fps=30)
...     t2 = Tracking.from_dlc(str(p), handle='B', fps=30)
>>> f1, f2 = Features(t1), Features(t2)
>>> fc = FeaturesCollection.from_list([f1, f2])
>>> list(sorted(fc.keys()))
['A', 'B']

cluster_embedding

cluster_embedding(
    embedding_dict: dict[str, list[int]],
    n_clusters: int,
    random_state: int = 0,
    *,
    normalize: bool = False,
    feature_weights: dict[str, float] | None = None,
    lowmem: bool = False,
    decimation_factor: int = 10,
    missing_policy: Literal[
        "drop", "impute_weight"
    ] = "drop",
    auto_normalize: bool = False,
    rescale_factors: dict | None = None,
    custom_scaling: dict[str, dict] | None = None,
)

Perform k-means clustering using the specified embedding.

Returns (BatchResult, centroids, scaling_factors) where scaling_factors is a dict of one float per embedding column — the combined effect of normalisation and feature weights.

Parameters:

Name Type Description Default

normalize

bool

Divide each base feature by its global std before embedding.

False

feature_weights

dict[str, float] | None

Substring → weight mapping, e.g. {"speed": 4.0, "accel": 2.0}. Each key is matched against embedding column names by substring; matched columns are multiplied by the value. Resolved internally via py3r.behaviour.util.series_utils.build_column_weights. Raises if a rule matches no column (likely typo).

None

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> # Create a trivial feature 'counter' in each Features to embed
>>> for f in fc.values():
...     s = pd.Series(range(len(f.tracking.data)), index=f.tracking.data.index)
...     f.store(s, 'counter')
>>> batch, centroids, norm = fc.cluster_embedding(
...     {'counter':[0]}, n_clusters=2, lowmem=True)
>>> isinstance(centroids, pd.DataFrame)
True
>>> batch, centroids, norm = fc.cluster_embedding(
...     {'counter':[0]}, n_clusters=2, lowmem=True,
...     missing_policy='impute_weight')
>>> isinstance(centroids, pd.DataFrame)
True
>>> batch, centroids, norm = fc.cluster_embedding(
...     {'counter':[0]}, n_clusters=2, lowmem=True,
...     missing_policy='drop')
>>> isinstance(centroids, pd.DataFrame)
True

cluster_embedding_stream

cluster_embedding_stream(
    embedding_dict: dict[str, list[int]],
    n_clusters: int,
    random_state: int = 0,
    *,
    normalize: bool = False,
    feature_weights: dict[str, float] | None = None,
    missing_policy: Literal[
        "drop", "impute_weight"
    ] = "drop",
    chunk_size: int = 10000,
    n_epochs: int = 3,
    batch_size: int = 1024,
)

Memory-friendly clustering via streaming MiniBatchKMeans.

Unlike cluster_embedding, this never builds a combined DataFrame. Embeddings are extracted one Features at a time, sliced into fixed-size chunks, and fed to MiniBatchKMeans.partial_fit. Multiple epochs improve convergence; uniform chunk sizes prevent large recordings from dominating centroid updates.

Normalisation is computed on base feature columns (before embedding) so that all time-shifts of the same feature share the same std. The returned scaling_factors is a dict of one float per embedding column — the combined effect of normalisation and feature weights. Multiply raw embedding values by these to reproduce the transform.

Returns (BatchResult, centroids, scaling_factors).

Parameters:

Name Type Description Default

embedding_dict

dict[str, list[int]]

Feature columns and their time shifts for the embedding.

required

n_clusters

int

Number of clusters.

required

random_state

int

Seed for reproducibility.

0

normalize

bool

Divide each base feature by its global std before embedding.

False

feature_weights

dict[str, float] | None

Substring → weight mapping, e.g. {"speed": 4.0}. Resolved internally into per-column weights. Raises if a rule matches no column (likely typo).

None

missing_policy

('drop', 'impute_weight')

How to handle NaN rows.

"drop"

chunk_size

int

Max rows per partial_fit call.

10000

n_epochs

int

Number of full passes over the data.

3

batch_size

int

MiniBatchKMeans internal mini-batch size.

1024

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> for f in fc.values():
...     s = pd.Series(range(len(f.tracking.data)), index=f.tracking.data.index)
...     f.store(s, 'counter')
>>> batch, centroids, norm = fc.cluster_embedding_stream(
...     {'counter': [0]}, n_clusters=2)
>>> isinstance(centroids, pd.DataFrame) and centroids.shape[0] == 2
True

cluster_diagnostics

cluster_diagnostics(
    labels_result,
    n_clusters: int | None = None,
    *,
    low: float = 0.05,
    high: float = 0.9,
    verbose: bool = True,
)

Compute diagnostic stats for cluster label assignments.

Parameters:

Name Type Description Default

labels_result

Mapping from handle (or group->handle) to FeaturesResult of integer labels (with NA). Accepts the return shape of cluster_embedding(...)[0] (BatchResult or dict).

required

n_clusters

int | None

Optional number of clusters. If None, inferred from labels (max label + 1).

None

low

float

Prevalence thresholds for low/high cluster labels per recording.

0.05

high

float

Prevalence thresholds for low/high cluster labels per recording.

0.9

verbose

bool

If True, print a compact summary.

True

Returns:

Type Description
dict with:
  • 'global': {'cluster_prevalence': {label: frac, ...}, 'percent_nan': frac}
  • 'per_recording': DataFrame, rows per recording, cols ['percent_nan', 'num_missing', 'num_low', 'num_high']
  • 'summary': min/median/max for the per_recording columns
  • if grouped: 'per_group': {group_key: {'per_recording': df, 'summary': {...}}}

cross_predict_rms

cross_predict_rms(
    source_embedding: dict[str, list[int]],
    target_embedding: dict[str, list[int]],
    normalize_source: bool = False,
    normalize_pred: dict | str = None,
    set1: list | None = None,
    set2: list | None = None,
    predictor_cls=None,
    predictor_kwargs=None,
)

Dev mode only: not available in public release yet.

plot_cross_predict_vs_within staticmethod

plot_cross_predict_vs_within(
    results, from_group, to_group, show=True
)

Dev mode only: not available in public release yet.

plot_cross_predict_results staticmethod

plot_cross_predict_results(
    results,
    within_keys=None,
    between_keys=None,
    plot_type="bar",
    figsize=(10, 6),
    show=True,
)

Dev mode only: not available in public release yet.

dumbbell_plot_cross_predict staticmethod

dumbbell_plot_cross_predict(
    results,
    within_key,
    between_key,
    figsize=(3, 3),
    show=True,
)

Dev mode only: not available in public release yet.

train_knn_regressor

train_knn_regressor(
    *,
    source_embedding: dict[str, list[int]],
    target_embedding: dict[str, list[int]],
    predictor_cls=None,
    predictor_kwargs=None,
    normalize_source: bool = False,
    **kwargs,
)

Dev mode only: not available in public release yet.

predict_knn

predict_knn(
    model,
    source_embedding: dict[str, list[int]],
    target_embedding: dict[str, list[int]],
    rescale_factors: dict = None,
) -> pd.DataFrame

Dev mode only: not available in public release yet.

plot

plot(
    arg=None,
    figsize=(8, 2),
    show: bool = True,
    title: str = None,
)

Plot features for all collections in the MultipleFeaturesCollection. - If arg is a BatchResult or dict: treat as batch result and plot for each collection. - Otherwise: treat as column name(s) or None and plot for each collection. - If title is provided, it will be used as the overall title for the figure.

store

store(
    results_dict,
    name: str = None,
    meta: dict = None,
    overwrite: bool = False,
)

Store FeaturesResult objects returned by batch methods.

  • Flat collection: results_dict is {handle: FeaturesResult}
  • Grouped collection: results_dict is {group_key: {handle: FeaturesResult}}

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> # Build a simple FeaturesResult dict from distance_between
>>> rd = {h: feat.distance_between('p1','p2') for h, feat in fc.items()}
>>> fc.store(rd, name='d12')
>>> all('d12' in feat.data.columns for feat in fc.values())
True

Returns:

Type Description
str

The resolved stored column name. If auto-naming would resolve to multiple different names across leaves, raises ValueError.

stored_info

stored_info() -> pd.DataFrame

Summarize stored feature columns across the collection's leaf Features objects.

Returns a DataFrame indexed by feature with columns: - attached_to: number of recordings containing the feature - missing_from: number of recordings not containing the feature - type: pandas dtype string for the feature column when consistent, or a list of dtype strings when mixed across recordings.

values

values()

Values iterator (elements or sub-collections).

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> len(list(coll.values())) == 2
True

items

items()

Items iterator (handle, element).

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> sorted([h for h, _ in coll.items()])
['A', 'B']

keys

keys()

Keys iterator (handles or group keys).

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> list(sorted(coll.keys()))
['A', 'B']

merge classmethod

merge(collections, *, copy=False)

Merge multiple collections into a single flat collection containing all leaf elements from each input.

Each input collection is flattened before merging, so grouped inputs are supported. The result is always a new flat collection. Leaves are shared by reference unless copy=True.

Parameters:

Name Type Description Default

collections

list[BaseCollection]

Two or more collections of the same concrete type. Every element across all collections must have a unique handle.

required

copy

bool

If True, each leaf is copied (via its .copy() method) so that the merged collection is fully independent of the originals.

False

Returns:

Type Description
BaseCollection

A new flat collection containing all leaves.

Raises:

Type Description
ValueError

If collections is empty, or if any handles are duplicated.

TypeError

If any input is not an instance of the calling class.

Warns:

Type Description
UserWarning

If the tag key sets differ across input collections (the merged collection will have mixed tag coverage).

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...         _ = shutil.copy(p, d / 'C.csv'); _ = shutil.copy(p, d / 'D.csv')
...     c1 = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
...     c2 = TrackingCollection.from_dlc({'C': str(d/'C.csv'), 'D': str(d/'D.csv')}, fps=30)
>>> merged = TrackingCollection.merge([c1, c2])
>>> sorted(merged.keys())
['A', 'B', 'C', 'D']
>>> len(merged)
4

groupby

groupby(tags)

Group the collection by one or more existing tag names. Returns a grouped view (this same collection type) whose values are sub-collections keyed by a tuple of tag values in the order provided.

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G2')
>>> g = coll.groupby('group')
>>> g.is_grouped
True
>>> sorted(g.group_keys)
[('G1',), ('G2',)]

flatten

flatten()

Flatten a MultipleCollection to a flat Collection. If already flat, return self.

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G1')
...     g = coll.groupby('group')
>>> flat = g.flatten()
>>> flat.is_grouped
False
>>> sorted(flat.keys())
['A', 'B']

get_group

get_group(key)

Get a sub-collection by group key from a grouped view.

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G2')
>>> g = coll.groupby('group')
>>> sub = g.get_group(('G1',))
>>> list(sub.keys())
['A']

regroup

regroup()

Recompute the same grouping using the current tags and the original grouping tag order. If not grouped, returns self.

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G1')
...     g = coll.groupby('group')
...     coll['B'].add_tag('group','G2', overwrite=True)  # change tag
>>> g2 = g.regroup()
>>> sorted(g2.group_keys)
[('G1',), ('G2',)]

tags_info

tags_info(
    *, include_value_counts: bool = False
) -> pd.DataFrame

Summarize tag presence across the collection's leaf objects. Works for flat and grouped collections. If include_value_counts is True, include a column 'value_counts' with a dict of value->count for each tag. Returns a pandas.DataFrame with columns: ['tag', 'attached_to', 'missing_from', 'unique_values', ('value_counts')]

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('genotype', 'WT')
...     coll['B'].add_tag('timepoint', 'T1')
>>> info = coll.tags_info(include_value_counts=True)
>>> int(info.loc['genotype','attached_to'])
1
>>> int(info.loc['genotype','missing_from'])
1
>>> int(info.loc['genotype','unique_values'])
1
>>> info.loc['genotype','value_counts']
{'WT': 1}
>>> int(info.loc['timepoint','attached_to'])
1

map_leaves

map_leaves(fn)

Apply a function to every leaf element and return a new collection of the same type. Preserves grouping shape and groupby metadata when grouped.

fn: callable(Element) -> ElementLike

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> sub = coll.map_leaves(lambda t: t.loc[0:1])
>>> all(len(t.data) == 2 for t in sub.values())
True

copy

copy()

Creates a copy of the BaseCollection. Raises NotImplementedError if any leaf does not implement copy().

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking import Tracking
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv')
...         _ = shutil.copy(p, d / 'B.csv')
...     coll = TrackingCollection.from_folder(
...         str(d), tracking_loader=Tracking.from_dlc, fps=30
...     )
>>> coll_copy = coll.copy()
>>> sorted(coll_copy.keys())
['A', 'B']

save

save(
    dirpath: str,
    *,
    overwrite: bool = False,
    data_format: str = "parquet",
) -> None

Save this collection to a directory. Preserves grouping and delegates to leaf objects' save(dirpath, data_format, overwrite=True).

Examples:

>>> import tempfile, shutil, os
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     out = d / 'coll'
...     coll.save(str(out), overwrite=True, data_format='csv')
...     # collection-level manifest at top-level
...     assert os.path.exists(os.path.join(str(out), 'manifest.json'))
...     # element-level manifests under elements/<handle>/
...     el_manifest = os.path.join(str(out), 'elements', 'A', 'manifest.json')
...     assert os.path.exists(el_manifest)

load classmethod

load(dirpath: str)

Load a collection previously saved with save(). Uses the class's _element_type.load to reconstruct leaves.

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     out = d / 'coll'
...     coll.save(str(out), overwrite=True, data_format='csv')
...     coll2 = TrackingCollection.load(str(out))
>>> list(sorted(coll2.keys()))
['A', 'B']