FeaturesCollection
py3r.behaviour.features.features_collection.FeaturesCollection ¶
FeaturesCollection(features_dict: dict[str, Features])
Bases: BaseCollection
Collection of Features objects, keyed by name. Note: the type hints refer to Features, but the factory methods also accept other classes. These are intended ONLY for subclasses of Features, and this is enforced.
Examples:
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
... tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> list(sorted(fc.keys()))
['A', 'B']
is_grouped
property
¶
is_grouped
True if this collection is a grouped view.
Examples:
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> coll.is_grouped
False
groupby_tags
property
¶
groupby_tags
The tag names used to form this grouped view (or None if flat).
group_keys
property
¶
group_keys
Keys for the groups in a grouped view. Empty list if not grouped.
Examples:
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
... coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G2')
>>> g = coll.groupby('group')
>>> sorted(g.group_keys)
[('G1',), ('G2',)]
from_tracking_collection
classmethod
¶
from_tracking_collection(
tracking_collection: TrackingCollection,
feature_cls=Features,
)
Create a FeaturesCollection from a TrackingCollection.
Examples:
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
... tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> isinstance(fc['A'], Features) and isinstance(fc['B'], Features)
True
concat
classmethod
¶
concat(
collections: list[FeaturesCollection],
*,
reindex: Literal[
"rezero", "follow_previous", "keep_original"
] = "follow_previous",
) -> FeaturesCollection
Concatenate multiple FeaturesCollections along the time (frame) axis.
Each collection must have the same handles (keys). For each handle, the corresponding Features objects are concatenated in order. Supports both flat and grouped collections.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `collections` | `list[FeaturesCollection]` | List of FeaturesCollection objects to concatenate, in temporal order. All must have matching keys (handles) and feature columns. | required |
| `reindex` | `('rezero', 'follow_previous', 'keep_original')` | How to handle frame indices: - "rezero": Reindex all frames starting from 0 (0, 1, 2, ...). - "follow_previous": Each chunk continues from where the previous ended. If chunk 1 ends at frame n, chunk 2 starts at n+1. - "keep_original": Leave indices untouched; duplicates are allowed. | "follow_previous" |
Returns:
| Type | Description |
|---|---|
FeaturesCollection
|
A new collection with concatenated Features objects for each handle. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If collections is empty, keys don't match, or grouping structure differs. |
Notes
For context-dependent features (normalization, embeddings with temporal windows, etc.), consider whether you need to recompute features on concatenated Tracking data rather than concatenating pre-computed features.
Examples:
Concatenate two flat collections:
>>> import tempfile, shutil
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> from py3r.behaviour.features.features_collection import FeaturesCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
... tc1 = TrackingCollection.from_dlc({'A': str(d/'A.csv'),
... 'B': str(d/'B.csv')}, fps=30)
... tc2 = TrackingCollection.from_dlc({'A': str(d/'A.csv'),
... 'B': str(d/'B.csv')}, fps=30)
>>> fc1 = FeaturesCollection.from_tracking_collection(tc1)
>>> fc2 = FeaturesCollection.from_tracking_collection(tc2)
>>> # Add a feature to all
>>> for f in list(fc1.values()) + list(fc2.values()):
... s = pd.Series(range(len(f.tracking.data)), index=f.tracking.data.index)
... f.store(s, 'counter', meta={})
>>> combined = FeaturesCollection.concat([fc1, fc2])
>>> len(combined['A'].data) == len(fc1['A'].data) + len(fc2['A'].data)
True
>>> 'concat' in combined['A'].meta
True
from_list
classmethod
¶
from_list(features_list: list[Features])
Create a FeaturesCollection from a list of Features objects, keyed by handle.
Examples:
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking import Tracking
>>> with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... t1 = Tracking.from_dlc(str(p), handle='A', fps=30)
... t2 = Tracking.from_dlc(str(p), handle='B', fps=30)
>>> f1, f2 = Features(t1), Features(t2)
>>> fc = FeaturesCollection.from_list([f1, f2])
>>> list(sorted(fc.keys()))
['A', 'B']
cluster_embedding ¶
cluster_embedding(
embedding_dict: dict[str, list[int]],
n_clusters: int,
random_state: int = 0,
*,
normalize: bool = False,
feature_weights: dict[str, float] | None = None,
lowmem: bool = False,
decimation_factor: int = 10,
missing_policy: Literal[
"drop", "impute_weight"
] = "drop",
auto_normalize: bool = False,
rescale_factors: dict | None = None,
custom_scaling: dict[str, dict] | None = None,
)
Perform k-means clustering using the specified embedding.
Returns (BatchResult, centroids, scaling_factors) where
scaling_factors is a dict of one float per embedding column —
the combined effect of normalisation and feature weights.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `normalize` | `bool` | Divide each base feature by its global std before embedding. | False |
| `feature_weights` | `dict[str, float] \| None` | Substring → weight mapping. | None |
Examples:
>>> import tempfile, shutil
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
... tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> # Create a trivial feature 'counter' in each Features to embed
>>> for f in fc.values():
... s = pd.Series(range(len(f.tracking.data)), index=f.tracking.data.index)
... f.store(s, 'counter')
>>> batch, centroids, norm = fc.cluster_embedding(
... {'counter':[0]}, n_clusters=2, lowmem=True)
>>> isinstance(centroids, pd.DataFrame)
True
>>> batch, centroids, norm = fc.cluster_embedding(
... {'counter':[0]}, n_clusters=2, lowmem=True,
... missing_policy='impute_weight')
>>> isinstance(centroids, pd.DataFrame)
True
>>> batch, centroids, norm = fc.cluster_embedding(
... {'counter':[0]}, n_clusters=2, lowmem=True,
... missing_policy='drop')
>>> isinstance(centroids, pd.DataFrame)
True
cluster_embedding_stream ¶
cluster_embedding_stream(
embedding_dict: dict[str, list[int]],
n_clusters: int,
random_state: int = 0,
*,
normalize: bool = False,
feature_weights: dict[str, float] | None = None,
missing_policy: Literal[
"drop", "impute_weight"
] = "drop",
chunk_size: int = 10000,
n_epochs: int = 3,
batch_size: int = 1024,
)
Memory-friendly clustering via streaming MiniBatchKMeans.
Unlike cluster_embedding, this never builds a combined DataFrame.
Embeddings are extracted one Features at a time, sliced into
fixed-size chunks, and fed to MiniBatchKMeans.partial_fit.
Multiple epochs improve convergence; uniform chunk sizes prevent
large recordings from dominating centroid updates.
Normalisation is computed on base feature columns (before embedding)
so that all time-shifts of the same feature share the same std.
The returned scaling_factors is a dict of one float per
embedding column — the combined effect of normalisation and
feature weights. Multiply raw embedding values by these to reproduce
the transform.
Returns (BatchResult, centroids, scaling_factors).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `embedding_dict` | `dict[str, list[int]]` | Feature columns and their time shifts for the embedding. | required |
| `n_clusters` | `int` | Number of clusters. | required |
| `random_state` | `int` | Seed for reproducibility. | 0 |
| `normalize` | `bool` | Divide each base feature by its global std before embedding. | False |
| `feature_weights` | `dict[str, float] \| None` | Substring → weight mapping. | None |
| `missing_policy` | `('drop', 'impute_weight')` | How to handle NaN rows. | "drop" |
| `chunk_size` | `int` | Max rows per partial_fit call. | 10000 |
| `n_epochs` | `int` | Number of full passes over the data. | 3 |
| `batch_size` | `int` | MiniBatchKMeans internal mini-batch size. | 1024 |
Examples:
>>> import tempfile, shutil
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
... tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> for f in fc.values():
... s = pd.Series(range(len(f.tracking.data)), index=f.tracking.data.index)
... f.store(s, 'counter')
>>> batch, centroids, norm = fc.cluster_embedding_stream(
... {'counter': [0]}, n_clusters=2)
>>> isinstance(centroids, pd.DataFrame) and centroids.shape[0] == 2
True
cluster_diagnostics ¶
cluster_diagnostics(
labels_result,
n_clusters: int | None = None,
*,
low: float = 0.05,
high: float = 0.9,
verbose: bool = True,
)
Compute diagnostic stats for cluster label assignments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
|
Mapping from handle (or group->handle) to FeaturesResult of integer labels (with NA).
Accepts the return shape of |
required | |
| `n_clusters` | `int \| None` | Optional number of clusters. If None, inferred from labels (max label + 1). | None |
| `low` | `float` | Prevalence thresholds for low/high cluster labels per recording. | 0.05 |
| `high` | `float` | Prevalence thresholds for low/high cluster labels per recording. | 0.9 |
| `verbose` | `bool` | If True, print a compact summary. | True |
Returns:
| Type | Description |
|---|---|
dict with:
|
|
cross_predict_rms ¶
cross_predict_rms(
source_embedding: dict[str, list[int]],
target_embedding: dict[str, list[int]],
normalize_source: bool = False,
normalize_pred: dict | str = None,
set1: list | None = None,
set2: list | None = None,
predictor_cls=None,
predictor_kwargs=None,
)
Dev mode only: not available in public release yet.
plot_cross_predict_vs_within
staticmethod
¶
plot_cross_predict_vs_within(
results, from_group, to_group, show=True
)
Dev mode only: not available in public release yet.
plot_cross_predict_results
staticmethod
¶
plot_cross_predict_results(
results,
within_keys=None,
between_keys=None,
plot_type="bar",
figsize=(10, 6),
show=True,
)
Dev mode only: not available in public release yet.
dumbbell_plot_cross_predict
staticmethod
¶
dumbbell_plot_cross_predict(
results,
within_key,
between_key,
figsize=(3, 3),
show=True,
)
Dev mode only: not available in public release yet.
train_knn_regressor ¶
train_knn_regressor(
*,
source_embedding: dict[str, list[int]],
target_embedding: dict[str, list[int]],
predictor_cls=None,
predictor_kwargs=None,
normalize_source: bool = False,
**kwargs,
)
Dev mode only: not available in public release yet.
predict_knn ¶
predict_knn(
model,
source_embedding: dict[str, list[int]],
target_embedding: dict[str, list[int]],
rescale_factors: dict = None,
) -> pd.DataFrame
Dev mode only: not available in public release yet.
plot ¶
plot(
arg=None,
figsize=(8, 2),
show: bool = True,
title: str = None,
)
Plot features for all collections in the MultipleFeaturesCollection.
- If arg is a BatchResult or dict: treat as batch result and plot for each collection.
- Otherwise: treat as column name(s) or None and plot for each collection.
- If title is provided, it will be used as the overall title for the figure.
store ¶
store(
results_dict,
name: str = None,
meta: dict = None,
overwrite: bool = False,
)
Store FeaturesResult objects returned by batch methods.
- Flat collection: results_dict is {handle: FeaturesResult}
- Grouped collection: results_dict is {group_key: {handle: FeaturesResult}}
Examples:
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
... tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> # Build a simple FeaturesResult dict from distance_between
>>> rd = {h: feat.distance_between('p1','p2') for h, feat in fc.items()}
>>> fc.store(rd, name='d12')
>>> all('d12' in feat.data.columns for feat in fc.values())
True
Returns:
| Type | Description |
|---|---|
str
|
The resolved stored column name. If auto-naming would resolve to multiple different names across leaves, raises ValueError. |
stored_info ¶
stored_info() -> pd.DataFrame
Summarize stored feature columns across the collection's leaf Features objects.
Returns a DataFrame indexed by feature with columns:
- attached_to: number of recordings containing the feature
- missing_from: number of recordings not containing the feature
- type: pandas dtype string for the feature column when consistent, or a
list of dtype strings when mixed across recordings.
values ¶
values()
Values iterator (elements or sub-collections).
Examples:
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> len(list(coll.values())) == 2
True
items ¶
items()
Items iterator (handle, element).
Examples:
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> sorted([h for h, _ in coll.items()])
['A', 'B']
keys ¶
keys()
Keys iterator (handles or group keys).
Examples:
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> list(sorted(coll.keys()))
['A', 'B']
merge
classmethod
¶
merge(collections, *, copy=False)
Merge multiple collections into a single flat collection containing all leaf elements from each input.
Each input collection is flattened before merging, so grouped inputs
are supported. The result is always a new flat collection. Leaves are
shared by reference unless copy=True.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `collections` | `list[BaseCollection]` | Two or more collections of the same concrete type. Every element across all collections must have a unique handle. | required |
| `copy` | `bool` | If True, each leaf is copied; otherwise leaves are shared by reference. | False |
Returns:
| Type | Description |
|---|---|
BaseCollection
|
A new flat collection containing all leaves. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If collections is empty, or if any handles are duplicated. |
TypeError
|
If any input is not an instance of the calling class. |
Warns:
| Type | Description |
|---|---|
UserWarning
|
If the tag key sets differ across input collections (the merged collection will have mixed tag coverage). |
Examples:
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
... _ = shutil.copy(p, d / 'C.csv'); _ = shutil.copy(p, d / 'D.csv')
... c1 = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
... c2 = TrackingCollection.from_dlc({'C': str(d/'C.csv'), 'D': str(d/'D.csv')}, fps=30)
>>> merged = TrackingCollection.merge([c1, c2])
>>> sorted(merged.keys())
['A', 'B', 'C', 'D']
>>> len(merged)
4
groupby ¶
groupby(tags)
Group the collection by one or more existing tag names. Returns a grouped view (this same collection type) whose values are sub-collections keyed by a tuple of tag values in the order provided.
Examples:
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
... coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G2')
>>> g = coll.groupby('group')
>>> g.is_grouped
True
>>> sorted(g.group_keys)
[('G1',), ('G2',)]
flatten ¶
flatten()
Flatten a MultipleCollection to a flat Collection. If already flat, return self.
Examples:
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
... coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G1')
... g = coll.groupby('group')
>>> flat = g.flatten()
>>> flat.is_grouped
False
>>> sorted(flat.keys())
['A', 'B']
get_group ¶
get_group(key)
Get a sub-collection by group key from a grouped view.
Examples:
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
... coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G2')
>>> g = coll.groupby('group')
>>> sub = g.get_group(('G1',))
>>> list(sub.keys())
['A']
regroup ¶
regroup()
Recompute the same grouping using the current tags and the original grouping tag order. If not grouped, returns self.
Examples:
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
... coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G1')
... g = coll.groupby('group')
... coll['B'].add_tag('group','G2', overwrite=True) # change tag
>>> g2 = g.regroup()
>>> sorted(g2.group_keys)
[('G1',), ('G2',)]
tags_info ¶
tags_info(
*, include_value_counts: bool = False
) -> pd.DataFrame
Summarize tag presence across the collection's leaf objects.
Works for flat and grouped collections. If include_value_counts is True,
include a column 'value_counts' with a dict of value->count for each tag.
Returns a pandas.DataFrame with columns:
['tag', 'attached_to', 'missing_from', 'unique_values', ('value_counts')]
Examples:
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
... coll['A'].add_tag('genotype', 'WT')
... coll['B'].add_tag('timepoint', 'T1')
>>> info = coll.tags_info(include_value_counts=True)
>>> int(info.loc['genotype','attached_to'])
1
>>> int(info.loc['genotype','missing_from'])
1
>>> int(info.loc['genotype','unique_values'])
1
>>> info.loc['genotype','value_counts']
{'WT': 1}
>>> int(info.loc['timepoint','attached_to'])
1
map_leaves ¶
map_leaves(fn)
Apply a function to every leaf element and return a new collection of the same type. Preserves grouping shape and groupby metadata when grouped.
fn: callable(Element) -> ElementLike
Examples:
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> sub = coll.map_leaves(lambda t: t.loc[0:1])
>>> all(len(t.data) == 2 for t in sub.values())
True
copy ¶
copy()
Creates a copy of the BaseCollection. Raises NotImplementedError if any leaf does not implement copy().
Examples:
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking import Tracking
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv')
... _ = shutil.copy(p, d / 'B.csv')
... coll = TrackingCollection.from_folder(
... str(d), tracking_loader=Tracking.from_dlc, fps=30
... )
>>> coll_copy = coll.copy()
>>> sorted(coll_copy.keys())
['A', 'B']
save ¶
save(
dirpath: str,
*,
overwrite: bool = False,
data_format: str = "parquet",
) -> None
Save this collection to a directory. Preserves grouping and delegates to leaf objects' save(dirpath, data_format, overwrite=True).
Examples:
>>> import tempfile, shutil, os
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
... out = d / 'coll'
... coll.save(str(out), overwrite=True, data_format='csv')
... # collection-level manifest at top-level
... assert os.path.exists(os.path.join(str(out), 'manifest.json'))
... # element-level manifests under elements/<handle>/
... el_manifest = os.path.join(str(out), 'elements', 'A', 'manifest.json')
... assert os.path.exists(el_manifest)
load
classmethod
¶
load(dirpath: str)
Load a collection previously saved with save(). Uses the class's _element_type.load to reconstruct leaves.
Examples:
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
... out = d / 'coll'
... coll.save(str(out), overwrite=True, data_format='csv')
... coll2 = TrackingCollection.load(str(out))
>>> list(sorted(coll2.keys()))
['A', 'B']