FeaturesCollection
py3r.behaviour.features.features_collection.FeaturesCollection ¶
FeaturesCollection(features_dict: dict[str, Features])
Bases: BaseCollection
Collection of Features objects, keyed by name. note: type-hints refer to Features, but factory methods allow for other classes these are intended ONLY for subclasses of Features, and this is enforced.
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
... tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> list(sorted(fc.keys()))
['A', 'B']
group_keys
property
¶
group_keys
Keys for the groups in a grouped view. Empty list if not grouped.
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
... coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G2')
>>> g = coll.groupby('group')
>>> sorted(g.group_keys)
[('G1',), ('G2',)]
groupby_tags
property
¶
groupby_tags
The tag names used to form this grouped view (or None if flat).
is_grouped
property
¶
is_grouped
True if this collection is a grouped view.
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> coll.is_grouped
False
cluster_diagnostics ¶
cluster_diagnostics(
labels_result: BatchResult | dict,
n_clusters: int | None = None,
*,
low: float = 0.05,
high: float = 0.9,
verbose: bool = True,
) -> dict
Compute diagnostic stats for cluster label assignments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
|
BatchResult | dict
|
Mapping from handle (or group→handle) to FeaturesResult of
integer labels (with NA). Accepts the return shape of
|
required |
|
int | None
|
Number of clusters. If None, inferred from labels (max label + 1). |
None
|
|
float
|
Prevalence threshold below which a cluster is flagged as low. |
0.05
|
|
float
|
Prevalence threshold above which a cluster is flagged as high. |
0.9
|
|
bool
|
If True, print a compact summary. |
True
|
Returns:
| Type | Description |
|---|---|
dict
|
Dict with keys:
|
cluster_embedding ¶
cluster_embedding(*args, **kwargs)
Removed in py3r.behaviour 3.3.0. Use :meth:cluster_embedding_stream instead.
cluster_embedding_stream ¶
cluster_embedding_stream(
embedding_dict: dict[str, list[int]],
n_clusters: int,
random_state: int = 0,
*,
normalize: bool = False,
normalize_details: dict[
str, Literal["individual", "global", "none"]
]
| None = None,
feature_weights: dict[str, float] | None = None,
missing_policy: Literal[
"drop", "impute_weight"
] = "drop",
chunk_size: int = 10000,
n_epochs: int = 3,
batch_size: int = 1024,
max_group_rows: int | None = 300000,
) -> tuple[BatchResult, CentroidsDf]
K-means clustering via streaming MiniBatchKMeans.
Each epoch, Features are shuffled and packed into row-bounded groups
(max_group_rows); each group's embeddings are concatenated,
row-shuffled together, then split into fixed-size chunks and fed to
MiniBatchKMeans.partial_fit. This reduces cluster-fitting bias
arising from autocorrelation within a single recording. The dataset is
never fully concatenated at once, so memory usage scales with
max_group_rows rather than the whole collection.
Note
This method uses MiniBatchKMeans (stochastic online updates), which
replaced the full-batch KMeans removed in py3r 3.3.0. Results are not
bit-for-bit identical to the old cluster_embedding: on well-separated
data the partition will be the same; on harder problems, increasing
n_epochs (5–10+) and batch_size (e.g. 2048–8192) improves
convergence. To reproduce results from py3r ≤ 3.2.1, pin to that version.
Note
Prior to introducing row-shuffling, batches were built by streaming one Features at a time in contiguous chunks, with no shuffling. Results are not reproducible against that older behaviour. To reproduce results from py3r.behaviour ≤ 3.4.0 exactly, pin to that version.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
|
dict[str, list[int]]
|
Feature columns and their time shifts for the embedding. |
required |
|
int
|
Number of clusters. |
required |
|
int
|
Seed for reproducibility. |
0
|
|
bool
|
Divide each base feature by its global std before embedding.
Equivalent to |
False
|
|
dict[str, Literal['individual', 'global', 'none']] | None
|
Per-column normalisation modes, keyed by substring matched against embedding column names.
Rules must not overlap; each rule must match at least one column.
Unmatched columns default to |
None
|
|
dict[str, float] | None
|
Substring → weight mapping, e.g.
|
None
|
|
Literal['drop', 'impute_weight']
|
How to handle NaN rows during training. |
'drop'
|
|
int
|
Maximum number of rows passed to a single |
10000
|
|
int
|
Number of full passes over the data. More epochs → better convergence. 3–5 is usually sufficient; increase for small or noisy datasets, or to approximate full-batch KMeans more closely. |
3
|
|
int
|
|
1024
|
|
int | None
|
Upper bound on the number of rows held in memory at
once while shuffling (a group of Features is filled until adding
the next one would exceed this). Defaults to 300,000, which is
roughly 1.2 GB for a ~1000-column embedding — safe headroom on an
8 GB+ laptop. Pass |
300000
|
Returns:
| Type | Description |
|---|---|
tuple[BatchResult, CentroidsDf]
|
Batch cluster labels and fitted centroids. The |
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
... tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> for f in fc.values():
... s = pd.Series(range(len(f.tracking.data)), index=f.tracking.data.index)
... f.store(s, 'counter')
>>> batch, centroids = fc.cluster_embedding_stream(
... {'counter': [0]}, n_clusters=2)
>>> hasattr(centroids, 'columns') and centroids.shape[0] == 2
True
concat
classmethod
¶
concat(
collections: list[FeaturesCollection],
*,
reindex: Literal[
"rezero", "follow_previous", "keep_original"
] = "follow_previous",
) -> FeaturesCollection
Concatenate multiple FeaturesCollections along the time (frame) axis.
Each collection must have the same handles (keys). For each handle, the corresponding Features objects are concatenated in order. Supports both flat and grouped collections.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
|
list[FeaturesCollection]
|
List of FeaturesCollection objects to concatenate, in temporal order. All must have matching keys (handles) and feature columns. |
required |
|
Literal['rezero', 'follow_previous', 'keep_original']
|
How to handle frame indices:
|
'follow_previous'
|
Returns:
| Type | Description |
|---|---|
FeaturesCollection
|
A new collection with concatenated Features objects for each handle. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If collections is empty, keys don't match, or grouping structure differs. |
Note
For context-dependent features (normalization, embeddings with temporal windows, etc.), consider whether you need to recompute features on concatenated Tracking data rather than concatenating pre-computed features.
Examples¶
Concatenate two flat collections:
>>> import tempfile, shutil
>>> from pathlib import Path
>>> import pandas as pd
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> from py3r.behaviour.features.features_collection import FeaturesCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
... tc1 = TrackingCollection.from_dlc({'A': str(d/'A.csv'),
... 'B': str(d/'B.csv')}, fps=30)
... tc2 = TrackingCollection.from_dlc({'A': str(d/'A.csv'),
... 'B': str(d/'B.csv')}, fps=30)
>>> fc1 = FeaturesCollection.from_tracking_collection(tc1)
>>> fc2 = FeaturesCollection.from_tracking_collection(tc2)
>>> # Add a feature to all
>>> for f in list(fc1.values()) + list(fc2.values()):
... s = pd.Series(range(len(f.tracking.data)), index=f.tracking.data.index)
... f.store(s, 'counter', meta={})
>>> combined = FeaturesCollection.concat([fc1, fc2])
>>> len(combined['A'].data) == len(fc1['A'].data) + len(fc2['A'].data)
True
>>> 'concat' in combined['A'].meta
True
copy ¶
copy()
Creates a copy of the BaseCollection. Raises NotImplementedError if any leaf does not implement copy().
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking import Tracking
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv')
... _ = shutil.copy(p, d / 'B.csv')
... coll = TrackingCollection.from_folder(
... str(d), tracking_loader=Tracking.from_dlc, fps=30
... )
>>> coll_copy = coll.copy()
>>> sorted(coll_copy.keys())
['A', 'B']
cross_predict_rms ¶
cross_predict_rms(
source_embedding: dict[str, list[int]],
target_embedding: dict[str, list[int]],
normalize_source: bool = False,
normalize_pred: dict | str = None,
set1: list | None = None,
set2: list | None = None,
predictor_cls=None,
predictor_kwargs=None,
)
Dev mode only: not available in public release yet.
dumbbell_plot_cross_predict
staticmethod
¶
dumbbell_plot_cross_predict(
results,
within_key,
between_key,
figsize=(3, 3),
show=True,
)
Dev mode only: not available in public release yet.
flatten ¶
flatten()
Flatten a MultipleCollection to a flat Collection. If already flat, return self.
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
... coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G1')
... g = coll.groupby('group')
>>> flat = g.flatten()
>>> flat.is_grouped
False
>>> sorted(flat.keys())
['A', 'B']
from_list
classmethod
¶
from_list(features_list: list[Features])
Create a FeaturesCollection from a list of Features objects, keyed by handle.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
|
list[Features]
|
Features objects to collect. All handles must be unique. |
required |
Examples¶
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking import Tracking
>>> with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... t1 = Tracking.from_dlc(str(p), handle='A', fps=30)
... t2 = Tracking.from_dlc(str(p), handle='B', fps=30)
>>> f1, f2 = Features(t1), Features(t2)
>>> fc = FeaturesCollection.from_list([f1, f2])
>>> list(sorted(fc.keys()))
['A', 'B']
from_tracking_collection
classmethod
¶
from_tracking_collection(
tracking_collection: TrackingCollection,
feature_cls: type[Features] = Features,
)
Create a FeaturesCollection from a TrackingCollection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
|
TrackingCollection
|
Source collection. Grouped structure is preserved. |
required |
|
type[Features]
|
|
Features
|
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
... tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> isinstance(fc['A'], Features) and isinstance(fc['B'], Features)
True
get_group ¶
get_group(key)
Get a sub-collection by group key from a grouped view.
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
... coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G2')
>>> g = coll.groupby('group')
>>> sub = g.get_group(('G1',))
>>> list(sub.keys())
['A']
groupby ¶
groupby(tags)
Group the collection by one or more existing tag names. Returns a grouped view (this same collection type) whose values are sub-collections keyed by a tuple of tag values in the order provided.
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
... coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G2')
>>> g = coll.groupby('group')
>>> g.is_grouped
True
>>> sorted(g.group_keys)
[('G1',), ('G2',)]
items ¶
items()
Items iterator (handle, element).
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> sorted([h for h, _ in coll.items()])
['A', 'B']
keys ¶
keys()
Keys iterator (handles or group keys).
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> list(sorted(coll.keys()))
['A', 'B']
load
classmethod
¶
load(dirpath: str)
Load a collection previously saved with save(). Uses the class's _element_type.load to reconstruct leaves.
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
... out = d / 'coll'
... coll.save(str(out), overwrite=True, data_format='csv')
... coll2 = TrackingCollection.load(str(out))
>>> list(sorted(coll2.keys()))
['A', 'B']
map_leaves ¶
map_leaves(fn: Callable[[Any], Any])
Apply a function to every leaf element and return a new collection of the same type. Preserves grouping shape and groupby metadata when grouped.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
|
Callable[[Any], Any]
|
Callable applied to each leaf element. Must return an element compatible with this collection type. |
required |
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> sub = coll.map_leaves(lambda t: t.loc[0:1])
>>> all(len(t.data) == 2 for t in sub.values())
True
merge
classmethod
¶
merge(
collections: list[Self], *, copy: bool = False
) -> Self
Merge multiple collections into a single flat collection containing all leaf elements from each input.
Each input collection is flattened before merging, so grouped inputs
are supported. The result is always a new flat collection. Leaves are
shared by reference unless copy=True.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
|
list[Self]
|
Two or more collections of the same concrete type. Every element across all collections must have a unique handle. |
required |
|
bool
|
If True, each leaf is copied (via its |
False
|
Returns:
| Type | Description |
|---|---|
Self
|
A new flat collection containing all leaves. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If collections is empty, or if any handles are duplicated. |
TypeError
|
If any input is not an instance of the calling class. |
Warns:
| Type | Description |
|---|---|
UserWarning
|
If the tag key sets differ across input collections (the merged collection will have mixed tag coverage). |
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
... _ = shutil.copy(p, d / 'C.csv'); _ = shutil.copy(p, d / 'D.csv')
... c1 = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
... c2 = TrackingCollection.from_dlc({'C': str(d/'C.csv'), 'D': str(d/'D.csv')}, fps=30)
>>> merged = TrackingCollection.merge([c1, c2])
>>> sorted(merged.keys())
['A', 'B', 'C', 'D']
>>> len(merged)
4
plot ¶
plot(
arg=None,
figsize=(8, 2),
show: bool = True,
title: str = None,
)
Plot features for all collections in the MultipleFeaturesCollection. - If arg is a BatchResult or dict: treat as batch result and plot for each collection. - Otherwise: treat as column name(s) or None and plot for each collection. - If title is provided, it will be used as the overall title for the figure.
plot_cross_predict_results
staticmethod
¶
plot_cross_predict_results(
results,
within_keys=None,
between_keys=None,
plot_type="bar",
figsize=(10, 6),
show=True,
)
Dev mode only: not available in public release yet.
plot_cross_predict_vs_within
staticmethod
¶
plot_cross_predict_vs_within(
results, from_group, to_group, show=True
)
Dev mode only: not available in public release yet.
predict_knn ¶
predict_knn(
model,
source_embedding: dict[str, list[int]],
target_embedding: dict[str, list[int]],
rescale_factors: dict = None,
) -> pd.DataFrame
Dev mode only: not available in public release yet.
regroup ¶
regroup()
Recompute the same grouping using the current tags and the original grouping tag order. If not grouped, returns self.
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
... coll['A'].add_tag('group','G1'); coll['B'].add_tag('group','G1')
... g = coll.groupby('group')
... coll['B'].add_tag('group','G2', overwrite=True) # change tag
>>> g2 = g.regroup()
>>> sorted(g2.group_keys)
[('G1',), ('G2',)]
save ¶
save(
dirpath: str,
*,
overwrite: bool = False,
data_format: str = "parquet",
) -> None
Save this collection to a directory. Preserves grouping and delegates to leaf objects' save(dirpath, data_format, overwrite=True).
Examples¶
>>> import tempfile, shutil, os
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
... out = d / 'coll'
... coll.save(str(out), overwrite=True, data_format='csv')
... # collection-level manifest at top-level
... assert os.path.exists(os.path.join(str(out), 'manifest.json'))
... # element-level manifests under elements/<handle>/
... el_manifest = os.path.join(str(out), 'elements', 'A', 'manifest.json')
... assert os.path.exists(el_manifest)
store ¶
store(
results_dict: BatchResult | dict,
name: str | None = None,
meta: dict | None = None,
overwrite: bool = False,
) -> str
Store FeaturesResult objects returned by batch methods.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
|
BatchResult | dict
|
Batch results to store. Flat: |
required |
|
str | None
|
Column name to store under. If None, resolved automatically from the result objects (all must agree on a single name). |
None
|
|
dict | None
|
Metadata dict to attach alongside the stored column. |
None
|
|
bool
|
If True, overwrite an existing column with the same name. |
False
|
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... _ = shutil.copy(p, d / 'A.csv'); _ = shutil.copy(p, d / 'B.csv')
... tc = TrackingCollection.from_dlc({'A': str(d/'A.csv'), 'B': str(d/'B.csv')}, fps=30)
>>> fc = FeaturesCollection.from_tracking_collection(tc)
>>> # Build a simple FeaturesResult dict from distance_between
>>> rd = {h: feat.distance_between('p1','p2') for h, feat in fc.items()}
>>> fc.store(rd, name='d12')
>>> all('d12' in feat.data.columns for feat in fc.values())
True
Returns:
| Type | Description |
|---|---|
str
|
The resolved stored column name. Raises |
str
|
resolves to multiple different names across leaves. |
stored_info ¶
stored_info() -> pd.DataFrame
Summarize stored feature columns across the collection's leaf Features objects.
Returns a DataFrame indexed by feature with columns:
- attached_to: number of recordings containing the feature
- missing_from: number of recordings not containing the feature
- type: pandas dtype string for the feature column when consistent, or a
list of dtype strings when mixed across recordings.
tags_info ¶
tags_info(
*, include_value_counts: bool = False
) -> pd.DataFrame
Summarize tag presence across the collection's leaf objects.
Works for flat and grouped collections. If include_value_counts is True,
include a column 'value_counts' with a dict of value->count for each tag.
Returns a pandas.DataFrame with columns:
['tag', 'attached_to', 'missing_from', 'unique_values', ('value_counts')].
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
... coll['A'].add_tag('genotype', 'WT')
... coll['B'].add_tag('timepoint', 'T1')
>>> info = coll.tags_info(include_value_counts=True)
>>> int(info.loc['genotype','attached_to'])
1
>>> int(info.loc['genotype','missing_from'])
1
>>> int(info.loc['genotype','unique_values'])
1
>>> info.loc['genotype','value_counts']
{'WT': 1}
>>> int(info.loc['timepoint','attached_to'])
1
to_summary ¶
to_summary() -> SummaryCollection
Create a SummaryCollection from this FeaturesCollection.
This is a convenience wrapper around
SummaryCollection.from_features_collection(self) and preserves grouped
structure when the collection is grouped.
Returns:
| Type | Description |
|---|---|
SummaryCollection
|
Collection containing one |
SummaryCollection
|
this collection. |
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.features.features_collection import FeaturesCollection
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> from py3r.behaviour.util.docdata import data_path
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... tc = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
... fc = FeaturesCollection.from_tracking_collection(tc)
... sc = fc.to_summary()
>>> from py3r.behaviour.summary.summary_collection import SummaryCollection
>>> isinstance(sc, SummaryCollection)
True
>>> sorted(sc.keys())
['A', 'B']
train_knn_regressor ¶
train_knn_regressor(
*,
source_embedding: dict[str, list[int]],
target_embedding: dict[str, list[int]],
predictor_cls=None,
predictor_kwargs=None,
normalize_source: bool = False,
**kwargs,
)
Dev mode only: not available in public release yet.
values ¶
values()
Values iterator (elements or sub-collections).
Examples¶
>>> import tempfile, shutil
>>> from pathlib import Path
>>> from py3r.behaviour.util.docdata import data_path
>>> from py3r.behaviour.tracking.tracking_collection import TrackingCollection
>>> with tempfile.TemporaryDirectory() as d:
... d = Path(d)
... with data_path('py3r.behaviour.tracking._data', 'dlc_single.csv') as p:
... a = d / 'A.csv'; b = d / 'B.csv'
... _ = shutil.copy(p, a); _ = shutil.copy(p, b)
... coll = TrackingCollection.from_dlc({'A': str(a), 'B': str(b)}, fps=30)
>>> len(list(coll.values())) == 2
True