EPM pipeline
End‑to‑end example computing various measures for different maze arms using folder‑based loaders, batch preprocessing, feature generation, and summary export. Paths are illustrative; adapt to your environment.
# Load a dataset of single‑view DLC CSVs into a TrackingCollection
import py3r.behaviour as p3b
DATA_DIR = "/data/recordings" # e.g. contains EPM_id1.csv, EPM_id2.csv, ...
TAGS_CSV = "/data/tags.csv" # optional, with columns: handle, treatment, genotype, ...
OUT_DIR = "/outputs" # where to save summary outputs
tc = p3b.TrackingCollection.from_dlc_folder(folder_path=DATA_DIR, fps=30)
# (Optional) Add tags from a CSV for grouping/analysis
# CSV must contain a 'handle' column matching filenames (without extension)
# other column names are the tag names, and those column values are the tag values
# e.g. handle, sex, treatment
# filename1, m, control
# filename2, f, crs
# ...etc
try:
tc.add_tags_from_csv(csv_path=TAGS_CSV)
except FileNotFoundError:
pass
# Remove low-confidence detections (thresholds depend on your tracking software/data)
tc.filter_likelihood(threshold=0.5)
# Interpolate missing points before smoothing
tc.interpolate(limit=5)
# Smooth all points with mean centre window 3
tc.smooth_all(window=3, method="mean")
# Rescale distance to metres according to two corners of the EPM, here named 'tl' and 'br'
tc.rescale_by_known_distance(point1="tl", point2="br", distance_in_metres=0.655)
# Trim ends of recordings if needed
tc.trim(endframe=-10 * 30) # drop 10s from end at 30 fps
# 4) Basic QA such as checking length of recordings and ploting tracking trajectories
# Length check (per recording, assuming 5 min, time in seconds)
timecheck = tc.time_as_expected(mintime=300 - (0.1 * 300), maxtime=300 + (0.1 * 300))
for key, val in timecheck.items():
if not val:
raise Exception(f"file {key} failed timecheck")
# Plot trajectories (per recording, using 'bodycentre' for trajectory of mouse and corners of EPM as static frame)
# (note that point names will likely be different)
tc.plot(
trajectories=["bodycentre"],
static=["tl", "tr", "ctr", "rt", "rb", "cbr", "br", "bl", "cbl", "lb", "lt", "ctl"],
lines=[
("tl", "tr"),
("tr", "ctr"),
("ctr", "rt"),
("rt", "rb"),
("rb", "cbr"),
("cbr", "br"),
("br", "bl"),
("bl", "cbl"),
("cbl", "lb"),
("lb", "lt"),
("lt", "ctl"),
("ctl", "tl"),
],
show=False,
savedir=OUT_DIR,
)
# 5) Create FeaturesCollection object
fc = p3b.FeaturesCollection.from_tracking_collection(tc)
# 6) Compute features necessary to get different EPM measures
# Define different boundaries (open arms, closed arms) and check if mouse (defined by 'bodycentre') is inside defined boundary
# Adjust boundaries so they match orientation of your EPM.
# Open arms
_oa_boundary = fc.define_boundary(
["tl", "tr", "ctr", "ctl"], scaling=1.1, centre=["ctr", "ctl"]
)
on_oa1 = fc.within_boundary_static(point="bodycentre", boundary=_oa_boundary)
_oa_boundary = fc.define_boundary(
["cbl", "cbr", "br", "bl"], scaling=1.1, centre=["cbr", "cbl"]
)
on_oa2 = fc.within_boundary_static(point="bodycentre", boundary=_oa_boundary)
on_oa = on_oa1 | on_oa2
on_oa.store(name="bodycentre_on_open_arms")
dist_change_on_oa = on_oa.astype("Int64") * fc.distance_change("bodycentre")
dist_change_on_oa.store(name="dist_change_bodycentre_on_oa")
# Closed arms
_ca_boundary = fc.define_boundary(
["ctr", "rt", "rb", "cbr"], scaling=1.1, centre=["ctr", "cbr"]
)
on_ca1 = fc.within_boundary_static(point="bodycentre", boundary=_ca_boundary)
_ca_boundary = fc.define_boundary(
["lt", "ctl", "cbl", "lb"], scaling=1.1, centre=["ctl", "cbl"]
)
on_ca2 = fc.within_boundary_static(point="bodycentre", boundary=_ca_boundary)
# you can apply binary operators to BatchResult objects
on_ca = on_ca1 | on_ca2
on_oa.store(name="bodycentre_on_closed_arms")
dist_change_on_ca = on_ca.astype("Int64") * fc.distance_change("bodycentre")
dist_change_on_ca.store(name="dist_change_bodycentre_on_ca")
# 7) (Optional) Save features to csv
fc.save(f"{OUT_DIR}/features", data_format="csv", overwrite=True)
# 8) Create SummaryCollection object
sc = p3b.SummaryCollection.from_features_collection(fc)
# 9) Compute summary measures per recording
# Total distance moved
sc.total_distance("bodycentre").store()
# Time on open arms
sc.time_true("bodycentre_on_open_arms").store("time_on_open_arms")
# Distance moved on open arms
sc.sum_column("dist_change_bodycentre_on_oa").store(name="distance_moved_on_open_arms")
# Time on closed arms
sc.time_true("bodycentre_on_closed_arms").store("time_on_closed_arms")
# Distance moved on closed arms
sc.sum_column("dist_change_bodycentre_on_ca").store(
name="distance_moved_on_closed_arms"
)
# 10) Collate scalar outputs into DataFrame and save results in CSV
summary_df = sc.to_df(include_tags=True)
summary_df.to_csv(f"{OUT_DIR}/EPM_results.csv")