FeaturesCollection

py3r.behaviour.features.features_collection.FeaturesCollection

FeaturesCollection(features_dict: dict[str, Features])

Bases: BaseCollection

Collection of Features objects, keyed by name. Note: the type hints refer to Features, but the factory methods also accept other classes. These must be subclasses of Features, and this is enforced.

Examples

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> from py3r.behaviour.features.features_collection import FeaturesCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> list(sorted(fc.keys()))
['A', 'B']

each instance-attribute

each: Features

each_forcebatch instance-attribute

each_forcebatch: Features

features_dict property

features_dict

group_keys property

group_keys

Keys for the groups in a grouped view. Empty list if not grouped.

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G2')
>>> g = coll.groupby('group')
>>> sorted(g.group_keys)
[('G1',), ('G2',)]

groupby_tags property

groupby_tags

The tag names used to form this grouped view (or None if flat).

iloc property

iloc

is_grouped property

is_grouped

True if this collection is a grouped view.

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> coll.is_grouped
False

loc property

loc

cluster_diagnostics

cluster_diagnostics(
    labels_result: BatchResult | dict,
    n_clusters: int | None = None,
    *,
    low: float = 0.05,
    high: float = 0.9,
    verbose: bool = True,
) -> dict

Compute diagnostic stats for cluster label assignments.

Parameters:

Name Type Description Default

labels_result

BatchResult | dict

Mapping from handle (or group→handle) to FeaturesResult of integer labels (with NA). Accepts the return shape of cluster_embedding_stream(...)[0] (BatchResult or dict).

required

n_clusters

int | None

Number of clusters. If None, inferred from labels (max label + 1).

None

low

float

Prevalence threshold below which a cluster is flagged as low.

0.05

high

float

Prevalence threshold above which a cluster is flagged as high.

0.9

verbose

bool

If True, print a compact summary.

True

Returns:

Type Description
dict

Dict with keys:

  • 'global': {'cluster_prevalence': {label: frac}, 'percent_nan': frac}
  • 'per_recording': DataFrame with columns ['percent_nan', 'num_missing', 'num_low', 'num_high']
  • 'summary': min/median/max for the per_recording columns
  • 'per_group': (grouped only) {group_key: {'per_recording': df, 'summary': {...}}}
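
The per-recording flags can be illustrated with a small pure-Python sketch. It is not the library's implementation: the helper name `diagnose_labels`, the use of `None` for NA, and the exact flag definitions (a cluster is "missing" when it never occurs in a recording, "low" when its prevalence is positive but below `low`, "high" when above `high`, all measured over non-NA labels) are assumptions made for illustration.

```python
from collections import Counter

def diagnose_labels(labels_by_recording, n_clusters=None, low=0.05, high=0.9):
    """Hypothetical helper: prevalence-based flags per recording.

    labels_by_recording maps a handle to a list of integer labels,
    with None standing in for NA.
    """
    valid_all = [x for labels in labels_by_recording.values()
                 for x in labels if x is not None]
    total = sum(len(labels) for labels in labels_by_recording.values())
    if n_clusters is None:
        n_clusters = max(valid_all) + 1  # inferred as max label + 1
    global_counts = Counter(valid_all)
    report = {
        "global": {
            "cluster_prevalence": {k: global_counts[k] / len(valid_all)
                                   for k in range(n_clusters)},
            "percent_nan": (total - len(valid_all)) / total,
        },
        "per_recording": {},
    }
    for handle, labels in labels_by_recording.items():
        valid = [x for x in labels if x is not None]
        counts = Counter(valid)
        denom = len(valid) or 1  # avoid dividing by zero for all-NA recordings
        prevalence = [counts[k] / denom for k in range(n_clusters)]
        report["per_recording"][handle] = {
            "percent_nan": (len(labels) - len(valid)) / len(labels),
            "num_missing": sum(p == 0 for p in prevalence),
            "num_low": sum(0 < p < low for p in prevalence),
            "num_high": sum(p > high for p in prevalence),
        }
    return report

report = diagnose_labels({
    "A": [0] * 19 + [None],                       # dominated by cluster 0
    "B": [0] * 1 + [1] * 30 + [2] * 9 + [None] * 10,
})
```

In this toy input, recording A never visits clusters 1 and 2 and spends all of its valid frames in cluster 0, while recording B visits every cluster but barely touches cluster 0.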

cluster_embedding

cluster_embedding(*args, **kwargs)

Removed in py3r.behaviour 3.3.0. Use cluster_embedding_stream instead.

cluster_embedding_stream

cluster_embedding_stream(
    embedding_dict: dict[str, list[int]],
    n_clusters: int,
    random_state: int = 0,
    *,
    normalize: bool = False,
    normalize_details: dict[
        str, Literal["individual", "global", "none"]
    ]
    | None = None,
    feature_weights: dict[str, float] | None = None,
    missing_policy: Literal[
        "drop", "impute_weight"
    ] = "drop",
    chunk_size: int = 10000,
    n_epochs: int = 3,
    batch_size: int = 1024,
    max_group_rows: int | None = 300000,
) -> tuple[BatchResult, CentroidsDf]

K-means clustering via streaming MiniBatchKMeans.

Each epoch, Features are shuffled and packed into row-bounded groups (max_group_rows); each group's embeddings are concatenated, row-shuffled together, then split into fixed-size chunks and fed to MiniBatchKMeans.partial_fit. This reduces cluster-fitting bias arising from autocorrelation within a single recording. The dataset is never fully concatenated at once, so memory usage scales with max_group_rows rather than the whole collection.
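
The packing and chunking steps described above can be sketched in plain Python. This is an illustration under stated assumptions rather than the library's code: `pack_into_groups` and `iter_chunks` are hypothetical helpers, recordings are represented only by their row counts, and the call to `MiniBatchKMeans.partial_fit` is left as a comment.

```python
import random

def pack_into_groups(row_counts, max_group_rows, rng):
    """Shuffle handles, then greedily fill groups bounded by max_group_rows.

    row_counts maps a handle to its number of embedding rows. A recording
    longer than the bound still ends up in a group of its own.
    """
    handles = list(row_counts)
    rng.shuffle(handles)
    if max_group_rows is None:
        return [handles]  # one dataset-wide group
    groups, current, current_rows = [], [], 0
    for handle in handles:
        n = row_counts[handle]
        # Close the current group if adding this recording would exceed the bound.
        if current and current_rows + n > max_group_rows:
            groups.append(current)
            current, current_rows = [], 0
        current.append(handle)
        current_rows += n
    if current:
        groups.append(current)
    return groups

def iter_chunks(rows, chunk_size, rng):
    """Row-shuffle one group's pooled rows, then yield fixed-size chunks."""
    rows = list(rows)
    rng.shuffle(rows)
    for start in range(0, len(rows), chunk_size):
        yield rows[start:start + chunk_size]

rng = random.Random(0)
row_counts = {"A": 120, "B": 80, "C": 250, "D": 40}  # hypothetical recordings
groups = pack_into_groups(row_counts, max_group_rows=200, rng=rng)
chunk_sizes = []
for group in groups:
    # Pool (handle, row) pairs so rows from different recordings get mixed.
    pooled = [(h, i) for h in group for i in range(row_counts[h])]
    for chunk in iter_chunks(pooled, chunk_size=64, rng=rng):
        chunk_sizes.append(len(chunk))  # each chunk would go to partial_fit
```

Because only one group is pooled at a time, the peak number of rows in memory is bounded by the largest group, not by the whole collection.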

Note

This method uses MiniBatchKMeans (stochastic online updates), which replaced the full-batch KMeans removed in py3r.behaviour 3.3.0. Results are not bit-for-bit identical to the old cluster_embedding: on well-separated data the partition will be the same, while on harder problems increasing n_epochs (5–10 or more) and batch_size (e.g. 2048–8192) improves convergence. To reproduce results from py3r.behaviour ≤ 3.2.1, pin to that version.

Note

Prior to introducing row-shuffling, batches were built by streaming one Features at a time in contiguous chunks, with no shuffling. Results are not reproducible against that older behaviour. To reproduce results from py3r.behaviour ≤ 3.4.0 exactly, pin to that version.

Parameters:

Name Type Description Default

embedding_dict

dict[str, list[int]]

Feature columns and their time shifts for the embedding.

required

n_clusters

int

Number of clusters.

required

random_state

int

Seed for reproducibility.

0

normalize

bool

Divide each base feature by its global std before embedding. Equivalent to normalize_details={"<all>": "global"}.

False

normalize_details

dict[str, Literal['individual', 'global', 'none']] | None

Per-column normalisation modes, keyed by substring matched against embedding column names.

  • "global" — divide by std pooled across the whole collection.
  • "individual" — divide by std computed within each Features.
  • "none" — no normalisation for matching columns.

Rules must not overlap; each rule must match at least one column. Unmatched columns default to "global" if normalize is True, otherwise "none".

None

feature_weights

dict[str, float] | None

Substring → weight mapping, e.g. {"speed": 4.0, "accel": 2.0}. Matched columns are multiplied by the value after normalisation. Raises if a rule matches no column.

None

missing_policy

Literal['drop', 'impute_weight']

How to handle NaN rows during training. "drop" excludes them; "impute_weight" fills with training-set column means and up-weights complete rows proportionally. The chosen means are stored in the scaling_recipe for automatic reuse when assigning clusters to future recordings.

'drop'

chunk_size

int

Maximum number of rows passed to a single partial_fit call. Larger values reduce noise in centroid updates at the cost of higher per-iteration memory use.

10000

n_epochs

int

Number of full passes over the data. More epochs → better convergence. 3–5 is usually sufficient; increase for small or noisy datasets, or to approximate full-batch KMeans more closely.

3

batch_size

int

MiniBatchKMeans internal mini-batch size (passed directly to sklearn). Larger values reduce variance in updates.

1024

max_group_rows

int | None

Upper bound on the number of rows held in memory at once while shuffling (a group of Features is filled until adding the next one would exceed this). Defaults to 300,000, which is roughly 1.2 GB for a ~1000-column embedding — safe headroom on an 8 GB+ laptop. Pass None for a true dataset-wide shuffle (every Features in one group), but only if the whole embedding matrix is known to fit in memory; a much larger embedding column count than ~1000 may need a smaller value instead. A single Features longer than this bound still forms its own group rather than being split.

300000
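
The memory figure quoted for max_group_rows can be checked with simple arithmetic. The sketch below assumes a dense matrix of 4-byte (float32) values, which is what the 1.2 GB figure implies; the documentation does not state the dtype, and with 8-byte values the same group would need twice as much. The helper names are hypothetical.

```python
def group_bytes(max_group_rows, n_columns, bytes_per_value=4):
    """Bytes needed to hold one dense group of embedding rows in memory."""
    return max_group_rows * n_columns * bytes_per_value

def rows_for_budget(budget_bytes, n_columns, bytes_per_value=4):
    """Largest max_group_rows that fits in a given memory budget."""
    return budget_bytes // (n_columns * bytes_per_value)

# Default bound with a ~1000-column embedding, 4 bytes per value (assumed).
default_gb = group_bytes(300_000, 1_000) / 1e9

# The same group with 8-byte values.
float64_gb = group_bytes(300_000, 1_000, bytes_per_value=8) / 1e9

# Keeping the same 1.2 GB budget with a 4000-column embedding.
rows_4000_cols = rows_for_budget(1_200_000_000, 4_000)
```

Under these assumptions a wider embedding calls for a proportionally smaller max_group_rows to stay within the same budget.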

Returns:

Type Description
tuple[BatchResult, CentroidsDf]

Batch cluster labels and fitted centroids. The CentroidsDf carries a scaling_recipe for future use with Features.assign_clusters_by_centroids.

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> from py3r.behaviour.features.features_collection import FeaturesCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> for f in fc.values():
...     s = pd.Series(range(len(f.tracking.data)), index=f.tracking.data.index)
...     f.store(s, 'counter')
>>> batch, centroids = fc.cluster_embedding_stream(
...     {'counter': [0]}, n_clusters=2)
>>> hasattr(centroids, 'columns') and centroids.shape[0] == 2
True
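
The substring rules for normalize_details described in the parameter list can be sketched as follows. This is a minimal illustration, not the library's resolver: `resolve_modes` and the column names are hypothetical, and the special "<all>" key mentioned under normalize is not modelled.

```python
def resolve_modes(columns, normalize=False, normalize_details=None):
    """Map each column to 'global', 'individual' or 'none' via substring rules."""
    rules = normalize_details or {}
    default = "global" if normalize else "none"
    modes = {}
    for rule, mode in rules.items():
        matched = [c for c in columns if rule in c]
        if not matched:
            raise ValueError(f"rule {rule!r} matches no column")
        for column in matched:
            if column in modes:
                raise ValueError(f"rules overlap on column {column!r}")
            modes[column] = mode
    # Columns not matched by any rule fall back to the default mode.
    return {c: modes.get(c, default) for c in columns}

columns = ["speed_t0", "speed_t-1", "accel_t0", "angle_t0"]  # hypothetical names
modes = resolve_modes(
    columns,
    normalize=True,
    normalize_details={"speed": "individual", "angle": "none"},
)
```

Here both speed columns are normalised per recording, the angle column is left untouched, and the unmatched accel column falls back to "global" because normalize is True.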

concat classmethod

concat(
    collections: list[FeaturesCollection],
    *,
    reindex: Literal[
        "rezero", "follow_previous", "keep_original"
    ] = "follow_previous",
) -> FeaturesCollection

Concatenate multiple FeaturesCollections along the time (frame) axis.

Each collection must have the same handles (keys). For each handle, the corresponding Features objects are concatenated in order. Supports both flat and grouped collections.

Parameters:

Name Type Description Default

collections

list[FeaturesCollection]

List of FeaturesCollection objects to concatenate, in temporal order. All must have matching keys (handles) and feature columns.

required

reindex

Literal['rezero', 'follow_previous', 'keep_original']

How to handle frame indices:

  • "rezero" — Reindex all frames starting from 0.
  • "follow_previous" — Each chunk continues from where the previous ended. If chunk 1 ends at frame n, chunk 2 starts at n+1.
  • "keep_original" — Leave indices untouched; duplicates are allowed.

'follow_previous'
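
The three reindex modes can be compared on plain lists of frame indices. The sketch below is an assumption-laden illustration: `concat_indices` is a hypothetical helper, frames within each chunk are taken to be contiguous, and under "follow_previous" the first chunk is assumed to keep its original indices.

```python
def concat_indices(chunks, reindex="follow_previous"):
    """Return the combined frame index for a list of per-chunk index lists."""
    if reindex == "keep_original":
        # Indices are left untouched, so duplicates can occur.
        return [i for chunk in chunks for i in chunk]
    if reindex == "rezero":
        # The whole result is renumbered from 0.
        return list(range(sum(len(chunk) for chunk in chunks)))
    if reindex == "follow_previous":
        combined = list(chunks[0]) if chunks else []
        for chunk in chunks[1:]:
            # Continue one frame after the last index seen so far.
            start = combined[-1] + 1 if combined else 0
            combined.extend(range(start, start + len(chunk)))
        return combined
    raise ValueError(f"unknown reindex mode: {reindex!r}")

chunks = [[10, 11, 12], [0, 1]]
kept = concat_indices(chunks, "keep_original")
rezeroed = concat_indices(chunks, "rezero")
followed = concat_indices(chunks, "follow_previous")
```

With these inputs, only "keep_original" can produce repeated or out-of-order indices, which matters for any later step that relies on a unique, increasing frame index.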

Returns:

Type Description
FeaturesCollection

A new collection with concatenated Features objects for each handle.

Raises:

Type Description
ValueError

If collections is empty, keys don't match, or grouping structure differs.

Note

For context-dependent features (normalization, embeddings with temporal windows, etc.), consider whether you need to recompute features on concatenated Tracking data rather than concatenating pre-computed features.

Examples

Concatenate two flat collections:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> from py3r.behaviour.features.features_collection import FeaturesCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc1 = TrackingCollection.from_dlc({'A': str(d/'A.csv'),
...                                       'B': str(d/'B.csv')}, fps=30)
...     tc2 = TrackingCollection.from_dlc({'A': str(d/'A.csv'),
...                                        'B': str(d/'B.csv')}, fps=30)
>>> fc1 = FeaturesCollection.from_tracking_collection(tc1)
>>> fc2 = FeaturesCollection.from_tracking_collection(tc2)
>>> # Add a feature to all
>>> for f in list(fc1.values()) + list(fc2.values()):
...     s = pd.Series(range(len(f.tracking.data)), index=f.tracking.data.index)
...     f.store(s, 'counter', meta={})
>>> combined = FeaturesCollection.concat([fc1, fc2])
>>> len(combined['A'].data) == len(fc1['A'].data) + len(fc2['A'].data)
True
>>> 'concat' in combined['A'].meta
True

copy

copy()

Creates a copy of the BaseCollection. Raises NotImplementedError if any leaf does not implement copy().

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking import Tracking
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv')
...         _ = shutil.copy(p, d / 'B.csv')
...     coll = TrackingCollection.from_folder(
...         str(d), tracking_loader=Tracking.from_dlc, fps=30
...     )
>>> coll_copy = coll.copy()
>>> sorted(coll_copy.keys())
['A', 'B']

cross_predict_rms

cross_predict_rms(
    source_embedding: dict[str, list[int]],
    target_embedding: dict[str, list[int]],
    normalize_source: bool = False,
    normalize_pred: dict | str = None,
    set1: list | None = None,
    set2: list | None = None,
    predictor_cls=None,
    predictor_kwargs=None,
)

Dev mode only: not available in public release yet.

dumbbell_plot_cross_predict staticmethod

dumbbell_plot_cross_predict(
    results,
    within_key,
    between_key,
    figsize=(3, 3),
    show=True,
)

Dev mode only: not available in public release yet.

flatten

flatten()

Flatten a MultipleCollection to a flat Collection. If already flat, return self.

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G1')
...     g = coll.groupby('group')
>>> flat = g.flatten()
>>> flat.is_grouped
False
>>> sorted(flat.keys())
['A', 'B']

from_list classmethod

from_list(features_list: list[Features])

Create a FeaturesCollection from a list of Features objects, keyed by handle.

Parameters:

Name Type Description Default

features_list

list[Features]

Features objects to collect. All handles must be unique.

required

Examples
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking import Tracking
>>> with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...     t1 = Tracking.from_dlc(str(p), handle='A', fps=30)
...     t2 = Tracking.from_dlc(str(p), handle='B', fps=30)
>>> f1, f2 = Features(t1), Features(t2)
>>> fc = FeaturesCollection.from_list([f1, f2])
>>> list(sorted(fc.keys()))
['A', 'B']

from_tracking_collection classmethod

from_tracking_collection(
    tracking_collection: TrackingCollection,
    feature_cls: type[Features] = Features,
)

Create a FeaturesCollection from a TrackingCollection.

Parameters:

Name Type Description Default

tracking_collection

TrackingCollection

Source collection. Grouped structure is preserved.

required

feature_cls

type[Features]

Features subclass to instantiate for each session.

Features

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> isinstance(fc['A'], Features) and isinstance(fc['B'], Features)
True

get_group

get_group(key)

Get a sub-collection by group key from a grouped view.

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G2')
>>> g = coll.groupby('group')
>>> sub = g.get_group(('G1',))
>>> list(sub.keys())
['A']

groupby

groupby(tags)

Group the collection by one or more existing tag names. Returns a grouped view (this same collection type) whose values are sub-collections keyed by a tuple of tag values in the order provided.

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G2')
>>> g = coll.groupby('group')
>>> g.is_grouped
True
>>> sorted(g.group_keys)
[('G1',), ('G2',)]
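
The shape of a grouped view, with tuple keys in the order of the requested tags, can be modelled with ordinary dictionaries. This is only a sketch of the keying scheme: `group_by_tags` is a hypothetical helper, and how the library treats recordings that lack one of the tags is not modelled here.

```python
def group_by_tags(tags_by_handle, tag_names):
    """Group handles by the tuple of their values for tag_names, in order."""
    if isinstance(tag_names, str):
        tag_names = [tag_names]
    groups = {}
    for handle, tags in tags_by_handle.items():
        key = tuple(tags[name] for name in tag_names)
        groups.setdefault(key, []).append(handle)
    return groups

tags_by_handle = {  # hypothetical tags
    "A": {"group": "G1", "sex": "F"},
    "B": {"group": "G2", "sex": "F"},
    "C": {"group": "G1", "sex": "M"},
}
by_group = group_by_tags(tags_by_handle, "group")
by_both = group_by_tags(tags_by_handle, ["group", "sex"])
```

A single tag name still yields one-element tuple keys, which matches the `('G1',)` form shown in the example above.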

items

items()

Items iterator (handle, element).

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> sorted([h for h, _ in coll.items()])
['A', 'B']

keys

keys()

Keys iterator (handles or group keys).

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> list(sorted(coll.keys()))
['A', 'B']

load classmethod

load(dirpath: str)

Load a collection previously saved with save(). Uses the class's _element_type.load to reconstruct leaves.

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     out = d / 'coll'
...     coll.save(str(out), overwrite=True, data_format='csv')
...     coll2 = TrackingCollection.load(str(out))
>>> list(sorted(coll2.keys()))
['A', 'B']

map_leaves

map_leaves(fn: Callable[[Any], Any])

Apply a function to every leaf element and return a new collection of the same type. Preserves grouping shape and groupby metadata when grouped.

Parameters:

Name Type Description Default

fn

Callable[[Any], Any]

Callable applied to each leaf element. Must return an element compatible with this collection type.

required

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> sub = coll.map_leaves(lambda t: t.loc[0:1])
>>> all(len(t.data) == 2 for t in sub.values())
True
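
The idea of preserving the grouping shape can be shown with nested dictionaries standing in for collections. This is a sketch, not the library's implementation: `map_leaves_sketch` is a hypothetical function, leaves are modelled as plain lists of frames, and groupby metadata is not represented.

```python
def map_leaves_sketch(collection, fn, grouped=False):
    """Apply fn to every leaf, returning a new mapping of the same shape."""
    if grouped:
        # Grouped shape: {group_key: {handle: leaf}}
        return {
            key: {handle: fn(leaf) for handle, leaf in members.items()}
            for key, members in collection.items()
        }
    # Flat shape: {handle: leaf}
    return {handle: fn(leaf) for handle, leaf in collection.items()}

flat = {"A": [0, 1, 2, 3], "B": [0, 1, 2]}
grouped = {("G1",): {"A": [0, 1, 2, 3]}, ("G2",): {"B": [0, 1, 2]}}

first_two_flat = map_leaves_sketch(flat, lambda frames: frames[:2])
first_two_grouped = map_leaves_sketch(grouped, lambda frames: frames[:2], grouped=True)
```

In both cases the input mapping is left unchanged and the result has the same keys as the input.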

merge classmethod

merge(
    collections: list[Self], *, copy: bool = False
) -> Self

Merge multiple collections into a single flat collection containing all leaf elements from each input.

Each input collection is flattened before merging, so grouped inputs are supported. The result is always a new flat collection. Leaves are shared by reference unless copy=True.

Parameters:

Name Type Description Default

collections

list[Self]

Two or more collections of the same concrete type. Every element across all collections must have a unique handle.

required

copy

bool

If True, each leaf is copied (via its .copy() method) so that the merged collection is fully independent of the originals.

False

Returns:

Type Description
Self

A new flat collection containing all leaves.

Raises:

Type Description
ValueError

If collections is empty, or if any handles are duplicated.

TypeError

If any input is not an instance of the calling class.

Warns:

Type Description
UserWarning

If the tag key sets differ across input collections (the merged collection will have mixed tag coverage).

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...         _ = shutil.copy(p, d / 'C.csv'); _ = shutil.copy(p, d / 'D.csv')
...     c1 = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
...     c2 = TrackingCollection.from_dlc({'C': str(d/'C.csv'), 'D': str(d/'D.csv')}, fps=30)
>>> merged = TrackingCollection.merge([c1, c2])
>>> sorted(merged.keys())
['A', 'B', 'C', 'D']
>>> len(merged)
4
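
The merge rules (unique handles, sharing by reference unless copy=True, and a warning when tag keys differ) can be sketched with dictionaries. The function `merge_flat` and the leaf layout (a dict with a "tags" entry) are hypothetical stand-ins, and inputs are assumed to be flat already.

```python
import warnings
from copy import deepcopy

def merge_flat(collections, copy=False):
    """Merge flat {handle: leaf} mappings into one new mapping."""
    if not collections:
        raise ValueError("collections is empty")
    merged = {}
    tag_key_sets = set()
    for coll in collections:
        # Tag keys present anywhere in this input collection.
        tag_key_sets.add(frozenset(k for leaf in coll.values() for k in leaf["tags"]))
        for handle, leaf in coll.items():
            if handle in merged:
                raise ValueError(f"duplicate handle: {handle!r}")
            merged[handle] = deepcopy(leaf) if copy else leaf
    if len(tag_key_sets) > 1:
        warnings.warn("tag key sets differ across inputs", UserWarning)
    return merged

c1 = {"A": {"tags": {"group": "G1"}}}
c2 = {"B": {"tags": {"group": "G2"}}}
shared = merge_flat([c1, c2])
independent = merge_flat([c1, c2], copy=True)
```

With the default copy=False, a later change to a leaf through either collection is visible in both, which is why copy=True is offered when the merged collection should be independent.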

plot

plot(
    arg=None,
    figsize=(8, 2),
    show: bool = True,
    title: str = None,
)

Plot features for all collections in the MultipleFeaturesCollection.

  • If arg is a BatchResult or dict: treat as batch result and plot for each collection.
  • Otherwise: treat as column name(s) or None and plot for each collection.
  • If title is provided, it will be used as the overall title for the figure.

plot_cross_predict_results staticmethod

plot_cross_predict_results(
    results,
    within_keys=None,
    between_keys=None,
    plot_type="bar",
    figsize=(10, 6),
    show=True,
)

Dev mode only: not available in public release yet.

plot_cross_predict_vs_within staticmethod

plot_cross_predict_vs_within(
    results, from_group, to_group, show=True
)

Dev mode only: not available in public release yet.

predict_knn

predict_knn(
    model,
    source_embedding: dict[str, list[int]],
    target_embedding: dict[str, list[int]],
    rescale_factors: dict = None,
) -> pd.DataFrame

Dev mode only: not available in public release yet.

regroup

regroup()

Recompute the same grouping using the current tags and the original grouping tag order. If not grouped, returns self.

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G1')
...     g = coll.groupby('group')
...     coll['B'].add_tag('group','G2', overwrite=True)  # change tag
>>> g2 = g.regroup()
>>> sorted(g2.group_keys)
[('G1',), ('G2',)]

save

save(
    dirpath: str,
    *,
    overwrite: bool = False,
    data_format: str = "parquet",
) -> None

Save this collection to a directory. Preserves grouping and delegates to leaf objects' save(dirpath, data_format, overwrite=True).

Examples
>>> import tempfile, shutil, os
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     out = d / 'coll'
...     coll.save(str(out), overwrite=True, data_format='csv')
...     # collection-level manifest at top-level
...     assert os.path.exists(os.path.join(str(out), 'manifest.json'))
...     # element-level manifests under elements/<handle>/
...     el_manifest = os.path.join(str(out), 'elements', 'A', 'manifest.json')
...     assert os.path.exists(el_manifest)

store

store(
    results_dict: BatchResult | dict,
    name: str | None = None,
    meta: dict | None = None,
    overwrite: bool = False,
) -> str

Store FeaturesResult objects returned by batch methods.

Parameters:

Name Type Description Default

results_dict

BatchResult | dict

Batch results to store. Flat: {handle: FeaturesResult}. Grouped: {group_key: {handle: FeaturesResult}}.

required

name

str | None

Column name to store under. If None, resolved automatically from the result objects (all must agree on a single name).

None

meta

dict | None

Metadata dict to attach alongside the stored column.

None

overwrite

bool

If True, overwrite an existing column with the same name.

False

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> from py3r.behaviour.features.features_collection import FeaturesCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> # Build a simple FeaturesResult dict from distance_between
>>> rd = {h: feat.distance_between('p1','p2') for h, feat in fc.items()}
>>> _ = fc.store(rd, name='d12')
>>> all('d12' in feat.data.columns for feat in fc.values())
True
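
The flat and grouped input shapes, and the rule that automatic naming must resolve to a single name, can be sketched as below. Both helpers are hypothetical, and each result is modelled as a dict with a "name" entry, which is an assumption about how a result carries its name rather than the real FeaturesResult interface.

```python
def iter_leaf_results(results, grouped=False):
    """Yield (handle, result) pairs from a flat or grouped results mapping."""
    if grouped:
        # Grouped shape: {group_key: {handle: result}}
        for members in results.values():
            yield from members.items()
    else:
        # Flat shape: {handle: result}
        yield from results.items()

def resolve_name(results, name=None, grouped=False):
    """Return the column name to store under, or raise if it is ambiguous."""
    if name is not None:
        return name
    names = {res["name"] for _, res in iter_leaf_results(results, grouped)}
    if len(names) != 1:
        raise ValueError(f"cannot auto-name, found: {sorted(names)}")
    return names.pop()

flat = {"A": {"name": "d12"}, "B": {"name": "d12"}}
grouped = {("G1",): {"A": {"name": "d12"}}, ("G2",): {"B": {"name": "d13"}}}

auto_name = resolve_name(flat)
explicit_name = resolve_name(grouped, name="dist", grouped=True)
```

Passing name explicitly sidesteps the ambiguity when results produced by different calls are stored together.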

Returns:

Type Description
str

The resolved stored column name. Raises ValueError if auto-naming resolves to multiple different names across leaves.

stored_info

stored_info() -> pd.DataFrame

Summarize stored feature columns across the collection's leaf Features objects.

Returns a DataFrame indexed by feature with columns:

  • attached_to: number of recordings containing the feature
  • missing_from: number of recordings not containing the feature
  • type: pandas dtype string for the feature column when consistent, or a list of dtype strings when mixed across recordings.
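
The three summary columns can be reproduced on a toy input to show how mixed dtypes are reported. This sketch returns a plain dict instead of a DataFrame, and `stored_info_sketch` and its input layout (handle to {feature: dtype string}) are hypothetical.

```python
def stored_info_sketch(columns_by_recording):
    """Summarise which features are stored in which recordings."""
    n_recordings = len(columns_by_recording)
    features = sorted({f for cols in columns_by_recording.values() for f in cols})
    info = {}
    for feature in features:
        dtypes = [cols[feature] for cols in columns_by_recording.values()
                  if feature in cols]
        unique = sorted(set(dtypes))
        info[feature] = {
            "attached_to": len(dtypes),
            "missing_from": n_recordings - len(dtypes),
            # A single string when consistent, a list when mixed.
            "type": unique[0] if len(unique) == 1 else unique,
        }
    return info

info = stored_info_sketch({
    "A": {"speed": "float64", "label": "Int64"},
    "B": {"speed": "float64", "label": "object"},
    "C": {"speed": "float64"},
})
```

A list in the type column is a quick signal that the same feature name was stored with different dtypes in different recordings.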

tags_info

tags_info(
    *, include_value_counts: bool = False
) -> pd.DataFrame

Summarize tag presence across the collection's leaf objects. Works for flat and grouped collections. If include_value_counts is True, include a column 'value_counts' with a dict of value->count for each tag. Returns a pandas.DataFrame with columns: ['tag', 'attached_to', 'missing_from', 'unique_values', ('value_counts')].

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('genotype', 'WT')
...     coll['B'].add_tag('timepoint', 'T1')
>>> info = coll.tags_info(include_value_counts=True)
>>> int(info.loc['genotype','attached_to'])
1
>>> int(info.loc['genotype','missing_from'])
1
>>> int(info.loc['genotype','unique_values'])
1
>>> info.loc['genotype','value_counts']
{'WT': 1}
>>> int(info.loc['timepoint','attached_to'])
1

to_summary

to_summary() -> SummaryCollection

Create a SummaryCollection from this FeaturesCollection.

This is a convenience wrapper around SummaryCollection.from_features_collection(self) and preserves grouped structure when the collection is grouped.

Returns:

Type Description
SummaryCollection

Collection containing one Summary object per features object in this collection.

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.features.features_collection import FeaturesCollection
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> from py3r.behaviour.util.docdata import data_path
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     tc = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     fc = FeaturesCollection.from_tracking_collection(tc)
...     sc = fc.to_summary()
>>> from py3r.behaviour.summary.summary_collection import SummaryCollection
>>> isinstance(sc, SummaryCollection)
True
>>> sorted(sc.keys())
['A', 'B']

train_knn_regressor

train_knn_regressor(
    *,
    source_embedding: dict[str, list[int]],
    target_embedding: dict[str, list[int]],
    predictor_cls=None,
    predictor_kwargs=None,
    normalize_source: bool = False,
    **kwargs,
)

Dev mode only: not available in public release yet.

values

values()

Values iterator (elements or sub-collections).

Examples
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> len(list(coll.values())) == 2
True