EPM pipeline

End‑to‑end example computing various measures for different maze arms using folder‑based loaders, batch preprocessing, feature generation, and summary export. Paths are illustrative; adapt to your environment.

# Load a dataset of single‑view DLC CSVs into a TrackingCollection
import py3r.behaviour as p3b

DATA_DIR = "/data/recordings"  # e.g. contains EPM_id1.csv, EPM_id2.csv, ...
TAGS_CSV = "/data/tags.csv"  # optional, with columns: handle, treatment, genotype, ...
OUT_DIR = "/outputs"  # where to save summary outputs

tc = p3b.TrackingCollection.from_dlc_folder(folder_path=DATA_DIR, fps=30)

# (Optional) Add tags from a CSV for grouping/analysis
# CSV must contain a 'handle' column matching filenames (without extension)
# other column names are the tag names, and those column values are the tag values
# e.g. handle, sex, treatment
#      filename1, m, control
#      filename2, f, crs
#      ...etc
try:
    tc.add_tags_from_csv(csv_path=TAGS_CSV)
except FileNotFoundError:
    pass

# Remove low-confidence detections (thresholds depend on your tracking software/data)
tc.filter_likelihood(threshold=0.5)

# Interpolate missing points before smoothing
tc.interpolate(limit=5)

# Smooth all points with mean centre window 3
tc.smooth_all(window=3, method="mean")

# Rescale distance to metres according to two corners of the EPM, here named 'tl' and 'br'
tc.rescale_by_known_distance(point1="tl", point2="br", distance_in_metres=0.655)

# Trim ends of recordings if needed
tc.trim(endframe=-10 * 30)  # drop 10s from end at 30 fps

# 4) Basic QA such as checking length of recordings and ploting tracking trajectories
# Length check (per recording, assuming 5 min, time in seconds)
timecheck = tc.time_as_expected(mintime=300 - (0.1 * 300), maxtime=300 + (0.1 * 300))
for key, val in timecheck.items():
    if not val:
        raise Exception(f"file {key} failed timecheck")

# Plot trajectories (per recording, using 'bodycentre' for trajectory of mouse and corners of EPM as static frame)
# (note that point names will likely be different)
tc.plot(
    trajectories=["bodycentre"],
    static=["tl", "tr", "ctr", "rt", "rb", "cbr", "br", "bl", "cbl", "lb", "lt", "ctl"],
    lines=[
        ("tl", "tr"),
        ("tr", "ctr"),
        ("ctr", "rt"),
        ("rt", "rb"),
        ("rb", "cbr"),
        ("cbr", "br"),
        ("br", "bl"),
        ("bl", "cbl"),
        ("cbl", "lb"),
        ("lb", "lt"),
        ("lt", "ctl"),
        ("ctl", "tl"),
    ],
    show=False,
    savedir=OUT_DIR,
)

# 5) Create FeaturesCollection object
fc = p3b.FeaturesCollection.from_tracking_collection(tc)

# 6) Compute features necessary to get different EPM measures
# Define different boundaries (open arms, closed arms) and check if mouse (defined by 'bodycentre') is inside defined boundary
# Adjust boundaries so they match orientation of your EPM.
# Open arms
_oa_boundary = fc.define_boundary(
    ["tl", "tr", "ctr", "ctl"], scaling=1.1, centre=["ctr", "ctl"]
)
on_oa1 = fc.within_boundary_static(point="bodycentre", boundary=_oa_boundary)
_oa_boundary = fc.define_boundary(
    ["cbl", "cbr", "br", "bl"], scaling=1.1, centre=["cbr", "cbl"]
)
on_oa2 = fc.within_boundary_static(point="bodycentre", boundary=_oa_boundary)
on_oa = on_oa1 | on_oa2
on_oa.store(name="bodycentre_on_open_arms")

dist_change_on_oa = on_oa.astype("Int64") * fc.distance_change("bodycentre")
dist_change_on_oa.store(name="dist_change_bodycentre_on_oa")

# Closed arms
_ca_boundary = fc.define_boundary(
    ["ctr", "rt", "rb", "cbr"], scaling=1.1, centre=["ctr", "cbr"]
)
on_ca1 = fc.within_boundary_static(point="bodycentre", boundary=_ca_boundary)
_ca_boundary = fc.define_boundary(
    ["lt", "ctl", "cbl", "lb"], scaling=1.1, centre=["ctl", "cbl"]
)
on_ca2 = fc.within_boundary_static(point="bodycentre", boundary=_ca_boundary)
# you can apply binary operators to BatchResult objects
on_ca = on_ca1 | on_ca2
on_oa.store(name="bodycentre_on_closed_arms")

dist_change_on_ca = on_ca.astype("Int64") * fc.distance_change("bodycentre")
dist_change_on_ca.store(name="dist_change_bodycentre_on_ca")

# 7) (Optional) Save features to csv
fc.save(f"{OUT_DIR}/features", data_format="csv", overwrite=True)

# 8) Create SummaryCollection object
sc = p3b.SummaryCollection.from_features_collection(fc)

# 9) Compute summary measures per recording
# Total distance moved
sc.total_distance("bodycentre").store()

# Time on open arms
sc.time_true("bodycentre_on_open_arms").store("time_on_open_arms")

# Distance moved on open arms
sc.sum_column("dist_change_bodycentre_on_oa").store(name="distance_moved_on_open_arms")

# Time on closed arms
sc.time_true("bodycentre_on_closed_arms").store("time_on_closed_arms")

# Distance moved on closed arms
sc.sum_column("dist_change_bodycentre_on_ca").store(
    name="distance_moved_on_closed_arms"
)

# 10) Collate scalar outputs into DataFrame and save results in CSV
summary_df = sc.to_df(include_tags=True)
summary_df.to_csv(f"{OUT_DIR}/EPM_results.csv")