EPM pipeline

End‑to‑end example computing total distance moved plus time spent and distance moved on the open and closed arms of an elevated plus maze (EPM), using folder‑based loaders, batch preprocessing, feature generation, and summary export. Paths are illustrative; adapt them to your environment.

# 1) Load a dataset of single‑view DLC CSVs into a TrackingCollection
import py3r.behaviour as p3b

DATA_DIR = "/data/recordings"            # e.g. contains EPM_id1.csv, EPM_id2.csv, ...
TAGS_CSV = "/data/tags.csv"              # optional, with columns: handle, treatment, genotype, ...
OUT_DIR  = "/outputs"                    # where to save summary outputs

tc = p3b.TrackingCollection.from_dlc_folder(folder_path=DATA_DIR, fps=30)

# 2) (Optional) Add tags from a CSV for grouping/analysis
# The CSV must contain a 'handle' column matching the filenames (without extension);
# the remaining column names are the tag names and their values are the tag values,
# e.g. handle, sex, treatment
#      filename1, m, control
#      filename2, f, crs
#      ...etc
try:
    tc.add_tags_from_csv(csv_path=TAGS_CSV)
except FileNotFoundError:
    pass  # tags are optional; continue without them if the CSV is absent

# 3) Batch preprocessing of tracking files
# Remove low-confidence detections (method/thresholds depend on your DLC export)
tc.filter_likelihood(threshold=0.95)

# Smooth all points with a centred mean over a 3-frame window, except the environment
# (maze corner) points, which are smoothed with a 30-frame median instead
environment_points = ["tl", "tr", "ctr", "rt", "rb", "cbr", "br", "bl", "cbl", "lb", "lt", "ctl"]
tc.smooth_all(window=3, method="mean", overrides=[(environment_points, "median", 30)])

# Rescale coordinates to metres using the known distance between two EPM corners, here 'tl' and 'br'
tc.rescale_by_known_distance(point1="tl", point2="br", distance_in_metres=0.655)

# Trim ends of recordings if needed
tc.trim(endframe=-10*30)  # drop 10s from end at 30 fps

# 4) Basic QA: check recording lengths and plot tracking trajectories
# Length check (per recording; expected 5 min ± 10%, times in seconds)
timecheck = tc.time_as_expected(mintime=300 - (0.1 * 300), maxtime=300 + (0.1 * 300))
for key, val in timecheck.items():
    if not val:
        raise Exception(f"file {key} failed timecheck")
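
# Equivalent, optional variant of the check above (a sketch using only the
# `timecheck` dict already computed; no additional py3r API assumed): collect
# every failing recording and report them all at once rather than stopping at
# the first failure.
failed = [key for key, ok in timecheck.items() if not ok]
if failed:
    raise Exception(f"recordings failed the time check: {', '.join(failed)}")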

# Plot trajectories (per recording): 'bodycentre' traces the mouse, the EPM corner points form a static frame
tc.plot(trajectories=["bodycentre"], static=environment_points,
        lines=[("tl", "tr"), ("tr", "ctr"), ("ctr", "rt"), ("rt", "rb"),
               ("rb", "cbr"), ("cbr", "br"), ("br", "bl"), ("bl", "cbl"),
               ("cbl", "lb"), ("lb", "lt"), ("lt", "ctl"), ("ctl", "tl")])

# 5) Create a FeaturesCollection object
fc = p3b.FeaturesCollection.from_tracking_collection(tc)

# 6) Compute the features needed for the EPM measures
# Define boundaries for the open and closed arms and check whether the mouse
# (represented by 'bodycentre') is inside each boundary.
# Adjust the boundaries to match the orientation of your EPM.
# Open arms
_oa_boundary = fc.define_boundary(["tl", "tr", "ctr", "ctl"], scaling=1.1, centre=["ctr", "ctl"])
on_oa1 = fc.within_boundary_static(point="bodycentre", boundary=_oa_boundary)
_oa_boundary = fc.define_boundary(["cbl", "cbr", "br", "bl"], scaling=1.1, centre=["cbr", "cbl"])
on_oa2 = fc.within_boundary_static(point="bodycentre", boundary=_oa_boundary)
on_oa = on_oa1 | on_oa2
on_oa.store(name="bodycentre_on_open_arms")

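# Per-frame distance moved while on the open arms: cast the boolean on-arm
# series to Int64 (1/0) and multiply element-wise by the per-frame distance
# change of 'bodycentre', so frames off the open arms contribute zero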
dist_change_on_oa = on_oa.astype("Int64") * fc.distance_change("bodycentre")
dist_change_on_oa.store(name="dist_change_bodycentre_on_oa")

# Closed arms
_ca_boundary = fc.define_boundary(["ctr", "rt", "rb", "cbr"], scaling=1.1, centre=["ctr", "cbr"])
on_ca1 = fc.within_boundary_static(point="bodycentre", boundary=_ca_boundary)
_ca_boundary = fc.define_boundary(["lt", "ctl", "cbl", "lb"], scaling=1.1, centre=["ctl", "cbl"])
on_ca2 = fc.within_boundary_static(point="bodycentre", boundary=_ca_boundary)
# BatchResult objects support element-wise logical operators such as |
on_ca = on_ca1 | on_ca2
on_ca.store(name="bodycentre_on_closed_arms")

dist_change_on_ca = on_ca.astype("Int64") * fc.distance_change("bodycentre")
dist_change_on_ca.store(name="dist_change_bodycentre_on_ca")

# 7) (Optional) Save features to CSV
fc.save(f"{OUT_DIR}/features", data_format="csv", overwrite=True)

# 8) Create a SummaryCollection object
sc = p3b.SummaryCollection.from_features_collection(fc)

# 9) Compute summary measures per recording
# Total distance moved
sc.total_distance("bodycentre").store()

# Time on open arms
sc.time_true("bodycentre_on_open_arms").store("time_on_open_arms")

# Distance moved on open arms
sc.sum_column("dist_change_bodycentre_on_oa").store(name="distance_moved_on_open_arms")

# Time on closed arms
sc.time_true("bodycentre_on_closed_arms").store("time_on_closed_arms")

# Distance moved on closed arms
sc.sum_column("dist_change_bodycentre_on_ca").store(name="distance_moved_on_closed_arms")

# 10) Collate scalar outputs into a DataFrame and save the results as CSV
summary_df = sc.to_df(include_tags=True)
summary_df.to_csv(f"{OUT_DIR}/EPM_results.csv")
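
# 11) (Optional) Group-level aggregation with plain pandas. This is a sketch,
# not part of the py3r API: it assumes the tags CSV provided a 'treatment'
# column and that to_df() exposes the stored measure names as columns; swap in
# whichever tag and measure columns your data actually has.
measure_cols = [
    "time_on_open_arms", "distance_moved_on_open_arms",
    "time_on_closed_arms", "distance_moved_on_closed_arms",
]
group_means = summary_df.groupby("treatment")[measure_cols].mean()
group_means.to_csv(f"{OUT_DIR}/EPM_results_by_treatment.csv")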