FeaturesCollection

py3r.behaviour.features.features_collection.FeaturesCollection

FeaturesCollection(features_dict: dict[str, Features])

Bases: BaseCollection, FeaturesCollectionBatchMixin

Collection of Features objects, keyed by name. Note: type hints refer to Features, but the factory methods also accept other classes; these are intended ONLY for subclasses of Features, and this is enforced.

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> list(sorted(fc.keys()))
['A', 'B']

features_dict property

features_dict

loc property

loc

iloc property

iloc

is_grouped property

is_grouped

True if this collection is a grouped view.

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> coll.is_grouped
False

groupby_tags property

groupby_tags

The tag names used to form this grouped view (or None if flat).
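
Example (a minimal sketch mirroring the group_keys example below; only the None/not-None distinction is asserted, since the exact container type is not documented here):

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G2')
>>> coll.groupby_tags is None  # flat collection
True
>>> coll.groupby('group').groupby_tags is not None  # grouped view records the tags
True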

group_keys property

group_keys

Keys for the groups in a grouped view. Empty list if not grouped.

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G2')
>>> g = coll.groupby('group')
>>> sorted(g.group_keys)
[('G1',), ('G2',)]

from_tracking_collection classmethod

from_tracking_collection(tracking_collection: TrackingCollection, feature_cls=Features)

Create a FeaturesCollection from a TrackingCollection.

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> isinstance(fc['A'], Features) and isinstance(fc['B'], Features)
True

within_boundary_static

within_boundary_static(point: str, boundary, boundary_name: str = None)

Collection-aware wrapper that supports:

  • a single static boundary (list[(x, y)]) applied to all items, or
  • a per-handle mapping of boundaries produced by batch define_boundary:
      • flat: {handle: list[(x, y)]}
      • grouped: {group_key: {handle: list[(x, y)]}}
      • a BatchResult in either of the above shapes

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> boundaries = fc.define_boundary(['p1','p2','p3'], scaling=1.0)
>>> res = fc.within_boundary_static('p1', boundaries)
>>> isinstance(res, dict)
True
>>> any(isinstance(v, pd.Series) for v in res.values())
True

>>> # Grouped case: add tags on Tracking, group, then build grouped FeaturesCollection
>>> # (boundaries BatchResult structure matches grouped layout)
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
...     tc['A'].add_tag('group', 'G1'); tc['B'].add_tag('group', 'G2')
...     gtc = tc.groupby('group')
...     gfc = FeaturesCollection.from_tracking_collection(gtc)
...     g_boundaries = gfc.define_boundary(['p1','p2','p3'], scaling=1.0)
...     g_res = gfc.within_boundary_static('p1', g_boundaries)
>>> isinstance(g_res, dict)
True
>>> any(any(isinstance(s, pd.Series) for s in sub.values()) for sub in g_res.values())
True

distance_to_boundary_static

distance_to_boundary_static(point: str, boundary, boundary_name: str = None)

Collection-aware wrapper that supports:

  • a single static boundary (list[(x, y)]) applied to all items, or
  • a per-handle mapping of boundaries produced by batch define_boundary:
      • flat: {handle: list[(x, y)]}
      • grouped: {group_key: {handle: list[(x, y)]}}
      • a BatchResult in either of the above shapes

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> boundaries = fc.define_boundary(['p1','p2','p3'], scaling=1.0)
>>> res = fc.distance_to_boundary_static('p1', boundaries)
>>> isinstance(res, dict)
True
>>> any(isinstance(v, pd.Series) for v in res.values())
True

>>> # Grouped case
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
...     tc['A'].add_tag('group', 'G1'); tc['B'].add_tag('group', 'G2')
...     gtc = tc.groupby('group')
...     gfc = FeaturesCollection.from_tracking_collection(gtc)
...     g_boundaries = gfc.define_boundary(['p1','p2','p3'], scaling=1.0)
...     g_res = gfc.distance_to_boundary_static('p1', g_boundaries)
>>> isinstance(g_res, dict)
True
>>> any(any(isinstance(s, pd.Series) for s in sub.values()) for sub in g_res.values())
True

from_list classmethod

from_list(features_list: list[Features])

Create a FeaturesCollection from a list of Features objects, keyed by handle.

Examples:

>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking import Tracking
>>> with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...     t1 = Tracking.from_dlc(str(p), handle='A', fps=30)
...     t2 = Tracking.from_dlc(str(p), handle='B', fps=30)
>>> f1, f2 = Features(t1), Features(t2)
>>> fc = FeaturesCollection.from_list([f1, f2])
>>> list(sorted(fc.keys()))
['A', 'B']

cluster_embedding

cluster_embedding(embedding_dict: dict[str, list[int]], n_clusters: int, random_state: int = 0, *, auto_normalize: bool = False, rescale_factors: dict | None = None, lowmem: bool = False, decimation_factor: int = 10, custom_scaling: dict[str, dict] | None = None)

Perform k-means clustering using the specified embedding.

Unified behaviour for flat and grouped collections. Returns a BatchResult mapping:

  • grouped: {group_key: {feature_handle: FeaturesResult}}
  • flat: {feature_handle: FeaturesResult}

along with (centroids, normalization_factors or None).

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> # Create a trivial feature 'counter' in each Features to embed
>>> for f in fc.values():
...     s = pd.Series(range(len(f.tracking.data)), index=f.tracking.data.index)
...     f.store(s, 'counter')
>>> batch, centroids, norm = fc.cluster_embedding({'counter':[0]}, n_clusters=2, lowmem=True)
>>> isinstance(centroids, pd.DataFrame)
True

cluster_diagnostics

cluster_diagnostics(labels_result, n_clusters: int | None = None, *, low: float = 0.05, high: float = 0.9, verbose: bool = True)

Compute diagnostic stats for cluster label assignments.

Parameters:

Name Type Description Default

labels_result

Mapping from handle (or group->handle) to FeaturesResult of integer labels (with NA). Accepts the return shape of cluster_embedding(...)[0] (BatchResult or dict).

required

n_clusters

int | None

Optional number of clusters. If None, inferred from labels (max label + 1).

None

low

float

Prevalence threshold below which a cluster's per-recording prevalence counts as 'low'.

0.05

high

float

Prevalence threshold above which a cluster's per-recording prevalence counts as 'high'.

0.9

verbose

bool

If True, print a compact summary.

True

Returns:

Type Description
dict with:
  • 'global': {'cluster_prevalence': {label: frac, ...}, 'percent_nan': frac}
  • 'per_recording': pandas.DataFrame with rows per recording and columns: ['percent_nan', 'num_missing', 'num_low', 'num_high']
  • 'summary': min/median/max for the per_recording columns
  • if grouped: 'per_group': {group_key: {'per_recording': df, 'summary': {...}}}
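
Example (a minimal sketch continuing from the cluster_embedding example above, reusing its fc and batch; the asserted keys follow the Returns description for a flat collection):

>>> diag = fc.cluster_diagnostics(batch, n_clusters=2, verbose=False)
>>> sorted(diag.keys())
['global', 'per_recording', 'summary']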

cross_predict_rms

cross_predict_rms(source_embedding: dict[str, list[int]], target_embedding: dict[str, list[int]], normalize_source: bool = False, normalize_pred: dict | str = None, set1: list | None = None, set2: list | None = None, predictor_cls=None, predictor_kwargs=None)

Dev mode only: not available in public release yet.

plot_cross_predict_vs_within staticmethod

plot_cross_predict_vs_within(results, from_group, to_group, show=True)

Dev mode only: not available in public release yet.

plot_cross_predict_results staticmethod

plot_cross_predict_results(results, within_keys=None, between_keys=None, plot_type='bar', figsize=(10, 6), show=True)

Dev mode only: not available in public release yet.

dumbbell_plot_cross_predict staticmethod

dumbbell_plot_cross_predict(results, within_key, between_key, figsize=(3, 3), show=True)

Dev mode only: not available in public release yet.

train_knn_regressor

train_knn_regressor(*, source_embedding: dict[str, list[int]], target_embedding: dict[str, list[int]], predictor_cls=None, predictor_kwargs=None, normalize_source: bool = False, **kwargs)

Dev mode only: not available in public release yet.

predict_knn

predict_knn(model, source_embedding: dict[str, list[int]], target_embedding: dict[str, list[int]], rescale_factors: dict = None) -> pd.DataFrame

Dev mode only: not available in public release yet.

plot

plot(arg=None, figsize=(8, 2), show: bool = True, title: str = None)

Plot features for all collections in the MultipleFeaturesCollection.

  • If arg is a BatchResult or dict: treat it as a batch result and plot it for each collection.
  • Otherwise: treat arg as column name(s) or None and plot for each collection.
  • If title is provided, it is used as the overall title for the figure.
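
Example (a minimal sketch; the stored feature 'd12' follows the store example below, and the return value is discarded since it is not documented here):

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> fc.store({h: f.distance_between('p1','p2') for h, f in fc.items()}, name='d12')
>>> _ = fc.plot('d12', show=False)  # suppress interactive display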

store

store(results_dict, name: str = None, meta: dict = None, overwrite: bool = False)

Store FeaturesResult objects returned by batch methods.

  • Flat collection: results_dict is {handle: FeaturesResult}
  • Grouped collection: results_dict is {group_key: {handle: FeaturesResult}}

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> # Build a simple FeaturesResult dict from distance_between
>>> rd = {h: feat.distance_between('p1','p2') for h, feat in fc.items()}
>>> fc.store(rd, name='d12')
>>> all('d12' in feat.data.columns for feat in fc.values())
True

save

save(dirpath: str, *, overwrite: bool = False, data_format: str = 'parquet') -> None

Save this collection to a directory. Preserves grouping and delegates to leaf objects' save(dirpath, data_format, overwrite=True).

Examples:

>>> import tempfile, shutil, os
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     out = d / 'coll'
...     coll.save(str(out), overwrite=True, data_format='csv')
...     # collection-level manifest at top-level
...     assert os.path.exists(os.path.join(str(out), 'manifest.json'))
...     # element-level manifests under elements/<handle>/
...     assert os.path.exists(os.path.join(str(out), 'elements', 'A', 'manifest.json'))

distance_between

distance_between(point1: str, point2: str, dims=('x', 'y')) -> BatchResult

Batch-mode wrapper for Features.distance_between across the collection.

See Features.distance_between for examples.

within_distance

within_distance(point1: str, point2: str, distance: float, dims=('x', 'y')) -> BatchResult

Batch-mode wrapper for Features.within_distance across the collection.

See Features.within_distance for examples.

get_point_median

get_point_median(point: str, dims=('x', 'y')) -> BatchResult

Batch-mode wrapper for Features.get_point_median across the collection.

Return the per-dimension median coordinate for a tracked point.

See Features.get_point_median for examples.

define_boundary

define_boundary(points: list[str], scaling: float, scaling_y: float = None, centre: str | list[str] = None) -> BatchResult

Batch-mode wrapper for Features.define_boundary across the collection.

Takes a list of defined points and creates a static, rescaled list of point coordinates based on the median location of those points. 'centre' (the point about which to scale) can be a string or a list of strings; if a list, the median of those points is used as the centre. If 'centre' is None, the median of all the boundary points is used as the centre. 'scaling' is the factor by which to scale the boundary points, and 'scaling_y' is the factor for the y-axis; if 'scaling_y' is not provided, 'scaling' is applied to both axes.

See Features.define_boundary for examples.
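
A minimal sketch (points p1..p3 come from the bundled sample used elsewhere on this page; only the per-handle keys of the returned mapping are asserted):

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> boundaries = fc.define_boundary(['p1','p2','p3'], scaling=0.5, centre='p1')
>>> sorted(boundaries.keys())
['A', 'B']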

within_boundary_dynamic

within_boundary_dynamic(point: str, boundary: list[str], boundary_name: str = None) -> BatchResult

Batch-mode wrapper for Features.within_boundary_dynamic across the collection.

Checks whether point is inside the polygon defined by an ordered list of boundary points. Boundary points must be specified as a list of names of tracked points.

See Features.within_boundary_dynamic for examples.
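
A minimal sketch (mirrors the within_boundary_static example above, but here the boundary is given as tracked-point names):

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
...     tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> res = fc.within_boundary_dynamic('p1', ['p1','p2','p3'])
>>> sorted(res.keys())
['A', 'B']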

within_boundary

within_boundary(point: str, boundary: list, median: bool = True, boundary_name: str = None) -> BatchResult

Batch-mode wrapper for Features.within_boundary across the collection.

Deprecated: use within_boundary_static or within_boundary_dynamic instead. Checks whether point is inside the polygon defined by an ordered list of boundary points. Boundary points may be specified either as a list of numerical tuples or as a list of names of tracked points. Optionally, pass boundary_name for a custom short name in the feature name/meta.

See Features.within_boundary for examples.

distance_to_boundary

distance_to_boundary(point: str, boundary: list[str], median: bool = True, boundary_name: str = None) -> BatchResult

Batch-mode wrapper for Features.distance_to_boundary across the collection.

Deprecated: use distance_to_boundary_static or distance_to_boundary_dynamic instead.

See Features.distance_to_boundary for examples.

distance_to_boundary_dynamic

distance_to_boundary_dynamic(point: str, boundary: list[str], boundary_name: str | None = None) -> BatchResult

Batch-mode wrapper for Features.distance_to_boundary_dynamic across the collection.

See Features.distance_to_boundary_dynamic for examples.

area_of_boundary

area_of_boundary(boundary: list[str], median: bool = True) -> BatchResult

Batch-mode wrapper for Features.area_of_boundary across the collection.

See Features.area_of_boundary for examples.

acceleration

acceleration(point: str, dims=('x', 'y')) -> BatchResult

Batch-mode wrapper for Features.acceleration across the collection.

See Features.acceleration for examples.

azimuth

azimuth(point1: str, point2: str) -> BatchResult

Batch-mode wrapper for Features.azimuth across the collection.

See Features.azimuth for examples.

azimuth_deviation

azimuth_deviation(basepoint: str, pointdirection1: str, pointdirection2: str) -> BatchResult

Batch-mode wrapper for Features.azimuth_deviation across the collection.

Compute the signed angular deviation (radians) between two directions from a common basepoint for each frame.

See Features.azimuth_deviation for examples.
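
A minimal sketch (reusing a two-item fc built exactly as in the from_tracking_collection example above; only the per-handle keys are asserted):

>>> res = fc.azimuth_deviation('p1', 'p2', 'p3')
>>> sorted(res.keys())
['A', 'B']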

within_azimuth_deviation

within_azimuth_deviation(basepoint: str, pointdirection1: str, pointdirection2: str, deviation: float) -> BatchResult

Batch-mode wrapper for Features.within_azimuth_deviation across the collection.

Return True for frames where the angular deviation between two rays from basepoint is <= deviation (radians). NA is propagated where inputs are missing (pd.NA).

See Features.within_azimuth_deviation for examples.
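
A minimal sketch (same fc as above; the 45-degree threshold is arbitrary):

>>> import math
>>> res = fc.within_azimuth_deviation('p1', 'p2', 'p3', deviation=math.pi / 4)
>>> sorted(res.keys())
['A', 'B']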

speed

speed(point: str, dims=('x', 'y')) -> BatchResult

Batch-mode wrapper for Features.speed across the collection.

See Features.speed for examples.

above_speed

above_speed(point: str, speed: float, dims=('x', 'y')) -> BatchResult

Batch-mode wrapper for Features.above_speed across the collection.

Return True for frames where the point's speed is >= threshold. NA is propagated where inputs are missing (pd.NA).

See Features.above_speed for examples.
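
A minimal sketch (same fc as above; the threshold of 10.0 is arbitrary). all_above_speed, below_speed and all_below_speed follow the same call pattern:

>>> res = fc.above_speed('p1', speed=10.0)
>>> sorted(res.keys())
['A', 'B']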

all_above_speed

all_above_speed(points: list, speed: float, dims=('x', 'y')) -> BatchResult

Batch-mode wrapper for Features.all_above_speed across the collection.

Return True for frames where all listed points are moving at least at the threshold speed. NA is propagated: if any input is NA at a frame, result is NA.

See Features.all_above_speed for examples.

below_speed

below_speed(point: str, speed: float, dims=('x', 'y')) -> BatchResult

Batch-mode wrapper for Features.below_speed across the collection.

Return True for frames where the point's speed is < threshold. NA is propagated where inputs are missing (pd.NA).

See Features.below_speed for examples.

all_below_speed

all_below_speed(points: list, speed: float, dims=('x', 'y')) -> BatchResult

Batch-mode wrapper for Features.all_below_speed across the collection.

Return True for frames where all listed points are moving slower than the threshold speed. NA is propagated: if any input is NA at a frame, result is NA.

See Features.all_below_speed for examples.

distance_change

distance_change(point: str, dims=('x', 'y')) -> BatchResult

Batch-mode wrapper for Features.distance_change across the collection.

See Features.distance_change for examples.

classify

classify(classifier: BaseClassifier, **kwargs) -> BatchResult

Batch-mode wrapper for Features.classify across the collection.

classify behaviour using a classifier with inputs from this Features object

See Features.classify for examples.

smooth

smooth(name: str, method: str, window: int, center: bool = True, inplace: bool = False) -> BatchResult

Batch-mode wrapper for Features.smooth across the collection.

Smooths the specified feature with the specified method over a rolling window. If inplace=True, the feature is edited directly and its metadata updated.

Methods:

  • 'median': median of the values in the window; requires numerical series values.
  • 'mean': mean of the values in the window; requires numerical series values.
  • 'mode': modal value in the window; works with numerical or non-numerical types.
  • 'block': removes labels that occur in blocks shorter than window and replaces them with the value from the previous block, or from the next block if there is no previous one. Note: after smoothing, all NaN values will be filled by this method (dangerous!).

See Features.smooth for examples.
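
A minimal sketch (same fc as above, with a 'd12' feature stored as in the store example below; window=5 is arbitrary):

>>> fc.store({h: f.distance_between('p1','p2') for h, f in fc.items()}, name='d12')
>>> sm = fc.smooth('d12', method='median', window=5)
>>> sorted(sm.keys())
['A', 'B']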

embedding_df

embedding_df(embedding: dict[str, list[int]]) -> BatchResult

Batch-mode wrapper for Features.embedding_df across the collection.

Generate a time-series embedding dataframe with specified time shifts for each column, where embedding is a dict mapping column names to lists of shifts. Positive shift: value from the future (t+n); negative shift: value from the past (t-n).

See Features.embedding_df for examples.
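
A minimal sketch (same fc and 'd12' feature as above; shifts of -1, 0 and 1 give the previous, current and next value of 'd12'):

>>> emb = fc.embedding_df({'d12': [-1, 0, 1]})
>>> sorted(emb.keys())
['A', 'B']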

assign_clusters_by_centroids

assign_clusters_by_centroids(embedding: dict[str, list[int]], centroids_df: DataFrame, *, rescale_factors: dict | None = None, custom_scaling: dict[str, dict] | None = None) -> BatchResult

Batch-mode wrapper for Features.assign_clusters_by_centroids across the collection.

new_embed_df: (n_samples, n_features) DataFrame of your new time-shifted embedding.
centroids_df: (n_clusters, n_features) DataFrame of cluster centers.

See Features.assign_clusters_by_centroids for examples.
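
A minimal sketch (continuing from the cluster_embedding example earlier on this page, reusing its fc, 'counter' feature and centroids):

>>> assigned = fc.assign_clusters_by_centroids({'counter': [0]}, centroids)
>>> sorted(assigned.keys())
['A', 'B']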

define_elliptical_boundary_from_params

define_elliptical_boundary_from_params(centre: str | list[str], major_axis_length: float, minor_axis_length: float, angle_in_radians: float = 0.0, n_points: int = 100) -> BatchResult

Batch-mode wrapper for Features.define_elliptical_boundary_from_params across the collection.

Generate a polygonal approximation of an ellipse, as a list of (x, y) tuples, around centre using explicit parameters. centre can be a single point name or a list of point names; if it is a list, the boundary is centred on the mean of the median coordinates of those points.

See Features.define_elliptical_boundary_from_params for examples.
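
A minimal sketch (same fc as above; the axis lengths and the default angle are arbitrary):

>>> ell = fc.define_elliptical_boundary_from_params('p1', major_axis_length=10.0, minor_axis_length=5.0)
>>> sorted(ell.keys())
['A', 'B']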

define_elliptical_boundary_from_points

define_elliptical_boundary_from_points(points: list[str], n_points: int = 100, scaling: float = 1.0, smallness_weight: float = 0.1) -> BatchResult

Batch-mode wrapper for Features.define_elliptical_boundary_from_points across the collection.

Fit an ellipse to the median coordinates of the given tracked points (at least 4) and return a polygonal approximation. After fitting, the ellipse is scaled by scaling.

See Features.define_elliptical_boundary_from_points for examples.
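
A sketch only (not runnable against the bundled sample, which only demonstrates points p1..p3 on this page; 'p4' is hypothetical, and the fit needs at least four points):

>>> ell = fc.define_elliptical_boundary_from_points(['p1', 'p2', 'p3', 'p4'], scaling=1.2)  # doctest: +SKIP
>>> sorted(ell.keys())  # doctest: +SKIP
['A', 'B']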

values

values()

Values iterator (elements or sub-collections).

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> len(list(coll.values())) == 2
True

items

items()

Items iterator (handle, element).

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> sorted([h for h, _ in coll.items()])
['A', 'B']

keys

keys()

Keys iterator (handles or group keys).

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> list(sorted(coll.keys()))
['A', 'B']

groupby

groupby(tags)

Group the collection by one or more existing tag names. Returns a grouped view (this same collection type) whose values are sub-collections keyed by a tuple of tag values in the order provided.

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G2')
>>> g = coll.groupby('group')
>>> g.is_grouped
True
>>> sorted(g.group_keys)
[('G1',), ('G2',)]

flatten

flatten()

Flatten a MultipleCollection to a flat Collection. If already flat, return self.

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G1')
...     g = coll.groupby('group')
>>> flat = g.flatten()
>>> flat.is_grouped
False
>>> sorted(flat.keys())
['A', 'B']

get_group

get_group(key)

Get a sub-collection by group key from a grouped view.

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G2')
>>> g = coll.groupby('group')
>>> sub = g.get_group(('G1',))
>>> list(sub.keys())
['A']

regroup

regroup()

Recompute the same grouping using the current tags and the original grouping tag order. If not grouped, returns self.

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G1')
...     g = coll.groupby('group')
...     coll['B'].add_tag('group','G2', overwrite=True)  # change tag
>>> g2 = g.regroup()
>>> sorted(g2.group_keys)
[('G1',), ('G2',)]

tags_info

tags_info(*, include_value_counts: bool = False) -> pd.DataFrame

Summarize tag presence across the collection's leaf objects. Works for flat and grouped collections. If include_value_counts is True, include a column 'value_counts' with a dict of value->count for each tag. Returns a pandas.DataFrame with columns: ['tag', 'attached_to', 'missing_from', 'unique_values', ('value_counts')]

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     coll['A'].add_tag('genotype', 'WT')
...     coll['B'].add_tag('timepoint', 'T1')
>>> info = coll.tags_info(include_value_counts=True)
>>> int(info.loc['genotype','attached_to'])
1
>>> int(info.loc['genotype','missing_from'])
1
>>> int(info.loc['genotype','unique_values'])
1
>>> info.loc['genotype','value_counts']
{'WT': 1}
>>> int(info.loc['timepoint','attached_to'])
1

map_leaves

map_leaves(fn)

Apply a function to every leaf element and return a new collection of the same type. Preserves grouping shape and groupby metadata when grouped.

fn: callable(Element) -> ElementLike

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> sub = coll.map_leaves(lambda t: t.loc[0:1])
>>> all(len(t.data) == 2 for t in sub.values())
True

load classmethod

load(dirpath: str)

Load a collection previously saved with save(). Uses the class's _element_type.load to reconstruct leaves.

Examples:

>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
...     d = Path(d)
...     with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
...         a = d / 'A.csv'; b = d / 'B.csv'
...         _ = shutil.copy(p, a); _ = shutil.copy(p, b)
...     coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
...     out = d / 'coll'
...     coll.save(str(out), overwrite=True, data_format='csv')
...     coll2 = TrackingCollection.load(str(out))
>>> list(sorted(coll2.keys()))
['A', 'B']