API Reference

Auto-generated reference documentation for the Flow3R public plugin API.


Plugin interface

flow3r.core.plugin.plugin.IPlugin

Bases: Protocol

Interface that every Flow3R plugin must satisfy.

A plugin is the single entry-point object that Flow3R instantiates when it discovers a flow3r.plugins entry-point group. Its only responsibility is to call the relevant api.*.register(...) methods during initialize so that the application becomes aware of the source types, pipeline types, visualizers, and settings the plugin provides.

Example package entry-point declaration (pyproject.toml):

[project.entry-points."flow3r.plugins"]
my_plugin = "my_package.plugin:MyPlugin"

Source code in src/flow3r/core/plugin/plugin.py
@runtime_checkable
class IPlugin(Protocol):
    """Interface that every Flow3R plugin must satisfy.

    A plugin is the single entry-point object that Flow3R instantiates when it
    discovers a ``flow3r.plugins`` entry-point group.  Its only responsibility
    is to call the relevant ``api.*.register(...)`` methods during
    :meth:`initialize` so that the application becomes aware of the source
    types, pipeline types, visualizers, and settings the plugin provides.

    Example package entry-point declaration (``pyproject.toml``)::

        [project.entry-points."flow3r.plugins"]
        my_plugin = "my_package.plugin:MyPlugin"
    """

    @property
    def name(self) -> str:
        """Human-readable name of the plugin (used in log messages)."""
        ...

    def initialize(self, api: IPluginAPI) -> None:
        """Register all plugin contributions with the application.

        Called exactly once, before the main window is shown.  Use *api* to
        register source types, pipeline types, visualizers, config types, and
        settings menus.

        Args:
            api: The plugin API surface provided by the application.
        """
        ...
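
The entry-point declaration in the docstring above is standard Python packaging metadata, so it can be read with the standard library. The following is a minimal sketch of how a host application might look up the flow3r.plugins group; it is an assumption about the mechanism, not Flow3R's actual loader, and the helper name discover_plugin_factories is hypothetical.

```python
from importlib.metadata import entry_points


def discover_plugin_factories(group: str = "flow3r.plugins") -> dict:
    """Return {entry_point_name: loaded_object} for every entry point in *group*."""
    all_entry_points = entry_points()
    if hasattr(all_entry_points, "select"):
        # Python 3.10+: entry points support selection by group.
        selected = all_entry_points.select(group=group)
    else:
        # Python 3.8/3.9: a plain dict keyed by group name.
        selected = all_entry_points.get(group, [])
    # ep.load() imports "my_package.plugin" and returns the "MyPlugin" object.
    return {ep.name: ep.load() for ep in selected}


# A group that no installed distribution declares yields an empty mapping.
factories = discover_plugin_factories("flow3r.plugins.example-unused-group")
```

Each loaded object would then be instantiated and handed the plugin API through initialize, as described above.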

name property

Human-readable name of the plugin (used in log messages).

initialize(api)

Register all plugin contributions with the application.

Called exactly once, before the main window is shown. Use api to register source types, pipeline types, visualizers, config types, and settings menus.

Parameters:

Name | Type | Description | Default
api | IPluginAPI | The plugin API surface provided by the application. | required

Source code in src/flow3r/core/plugin/plugin.py
def initialize(self, api: IPluginAPI) -> None:
    """Register all plugin contributions with the application.

    Called exactly once, before the main window is shown.  Use *api* to
    register source types, pipeline types, visualizers, config types, and
    settings menus.

    Args:
        api: The plugin API surface provided by the application.
    """
    ...
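
To make the protocol concrete, here is a minimal sketch of a class that satisfies it structurally. So that the snippet runs without Flow3R installed, IPlugin is re-declared locally as a stand-in, and FakeAPI and RecordingRegistry are hypothetical test doubles rather than Flow3R classes; a real plugin would receive the application's IPluginAPI instead.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class IPlugin(Protocol):
    """Local stand-in mirroring the documented protocol."""

    @property
    def name(self) -> str: ...

    def initialize(self, api: Any) -> None: ...


class RecordingRegistry:
    """Hypothetical registry double that only remembers what was registered."""

    def __init__(self) -> None:
        self.items = []

    def register(self, item: Any) -> None:
        self.items.append(item)


class FakeAPI:
    """Hypothetical API double exposing two of the documented registries."""

    def __init__(self) -> None:
        self.source_types = RecordingRegistry()
        self.pipeline_types = RecordingRegistry()


class MyPlugin:
    """No inheritance needed: matching members are enough for a Protocol."""

    @property
    def name(self) -> str:
        return "My Plugin"

    def initialize(self, api: Any) -> None:
        # The only job of initialize is to register contributions.
        api.source_types.register("my-source-type")


plugin = MyPlugin()
api = FakeAPI()
plugin.initialize(api)
```

Because the protocol is decorated with runtime_checkable, isinstance(plugin, IPlugin) checks only that the members exist, not that their signatures match.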

Plugin API

flow3r.core.api.plugins.plugins.IPluginAPI

Bases: Protocol

Read-only protocol describing the API surface available to plugins.

An instance of this protocol is passed to IPlugin.initialize for every loaded plugin. Plugins use the registry properties to register their contributions (source types, pipeline types, etc.) with the application.

The concrete implementation is flow3r.app.api.plugins.plugins.PluginAPI.

Source code in src/flow3r/core/api/plugins/plugins.py
class IPluginAPI(Protocol):
    """Read-only protocol describing the API surface available to plugins.

    An instance of this protocol is passed to :meth:`IPlugin.initialize` for
    every loaded plugin.  Plugins use the registry properties to register their
    contributions (source types, pipeline types, etc.) with the application.

    The concrete implementation is :class:`flow3r.app.api.plugins.plugins.PluginAPI`.
    """

    @property
    def config_types(self) -> IConfigTypeRegistry:
        """Registry for mapping ``TYPE_ID`` strings to config dataclasses.

        Plugins must register a config class here for every source config and
        pipeline config they introduce so that Flow3R can deserialise saved
        ``.f3r`` project files correctly.
        """
        ...

    @property
    def source_types(self) -> ISourceTypeRegistry:
        """Registry for :class:`~flow3r.core.source.abc.source_type.ISourceType` objects.

        Each registered source type appears in the *Add Source* dialog and can
        be instantiated by the user when building a recording project.
        """
        ...

    @property
    def visualizer_types(self) -> VisualizerTypeRegistry:
        """Registry for visualizer types (live preview widgets)."""
        ...

    @property
    def pipeline_types(self) -> IPipelineTypeRegistry:
        """Registry for :class:`~flow3r.core.pipeline.abc.pipeline_type.IPipelineType` objects.

        Each registered pipeline type appears in the *Add Pipeline* dialog and
        can be attached to a recording group.
        """
        ...

    @property
    def settings(self) -> ISettingsRegistry:
        """Registry for application-wide settings keys and their default values."""
        ...

    @property
    def settings_menus(self) -> ISettingsMenusRegistry:
        """Registry for plugin-contributed entries in the Settings menu."""
        ...

config_types property

Registry for mapping TYPE_ID strings to config dataclasses.

Plugins must register a config class here for every source config and pipeline config they introduce so that Flow3R can deserialise saved .f3r project files correctly.
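
The reason a TYPE_ID registry is needed for deserialisation can be shown with a small sketch. Everything here is hypothetical: the config registry's method signatures and the .f3r file layout are not shown in this reference, so the snippet uses a plain dict, JSON, and invented helper names (register_config, dump_config, load_config) purely to illustrate the lookup.

```python
import json
from dataclasses import asdict, dataclass

# Hypothetical registry: TYPE_ID string -> config dataclass.
CONFIG_TYPES = {}


def register_config(cls):
    """Record the class under its TYPE_ID so saved data can find it again."""
    CONFIG_TYPES[cls.TYPE_ID] = cls
    return cls


@register_config
@dataclass
class WebcamConfig:
    # Un-annotated class attribute, so it is not treated as a dataclass field.
    TYPE_ID = "example.webcam"
    device_index: int = 0
    fps: int = 30


def dump_config(config) -> str:
    """Serialise the fields together with the TYPE_ID tag."""
    return json.dumps({"type_id": config.TYPE_ID, "fields": asdict(config)})


def load_config(text: str):
    """Use the TYPE_ID tag to pick the right class, then rebuild the instance."""
    payload = json.loads(text)
    cls = CONFIG_TYPES[payload["type_id"]]  # KeyError if the type was never registered
    return cls(**payload["fields"])


original = WebcamConfig(device_index=2)
restored = load_config(dump_config(original))
```

Without the registration step, load_config would have no way to map the stored tag back to a class, which is the failure mode the requirement above guards against.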

source_types property

Registry for ISourceType objects.

Each registered source type appears in the Add Source dialog and can be instantiated by the user when building a recording project.

visualizer_types property

Registry for visualizer types (live preview widgets).

pipeline_types property

Registry for IPipelineType objects.

Each registered pipeline type appears in the Add Pipeline dialog and can be attached to a recording group.

settings property

Registry for application-wide settings keys and their default values.

settings_menus property

Registry for plugin-contributed entries in the Settings menu.

flow3r.app.api.plugins.plugins.PluginAPI

Bases: IPluginAPI

Source code in src/flow3r/app/api/plugins/plugins.py
class PluginAPI(IPluginAPI):
    def __init__(self):
        self._config_types = ConfigTypeRegistry()
        self._source_types = SourceTypeRegistry()
        self._visualizer_types = VisualizerTypeRegistry()
        self._pipeline_types = PipelineTypeRegistry()
        self._settings_registry = SettingsRegistry()
        self._settings_menus = SettingsMenuRegistry()

    @property
    def config_types(self) -> ConfigTypeRegistry:
        return self._config_types

    @property
    def source_types(self) -> SourceTypeRegistry:
        return self._source_types

    @property
    def visualizer_types(self) -> VisualizerTypeRegistry:
        return self._visualizer_types

    @property
    def pipeline_types(self) -> PipelineTypeRegistry:
        return self._pipeline_types

    @property
    def settings(self) -> SettingsRegistry:
        return self._settings_registry

    @property
    def settings_menus(self) -> SettingsMenuRegistry:
        return self._settings_menus

Source types

flow3r.core.source.abc.source_type.ISourceType

Bases: Protocol[TConfig, TDesc, TData]

Protocol that describes a kind of source (e.g. "Webcam", "Pylon Camera").

Flow3R uses ISourceType objects as factories. When a user adds a source in the UI, the application calls the relevant factory callables to create a default config, a config-editing widget, and ultimately a live source instance.

Use the concrete SourceType dataclass when registering a source type from a plugin; it satisfies this protocol automatically.

Source code in src/flow3r/core/source/abc/source_type.py
class ISourceType(Protocol[TConfig, TDesc, TData]):
    """Protocol that describes a *kind* of source (e.g. "Webcam", "Pylon Camera").

    Flow3R uses :class:`ISourceType` objects as factories.  When a user adds a
    source in the UI, the application calls the relevant factory callables to
    create a default config, a config-editing widget, and ultimately a live
    source instance.

    Use the concrete :class:`SourceType` dataclass when registering a source type
    from a plugin — it satisfies this protocol automatically.
    """

    @property
    def name(self) -> str:
        """Unique, human-readable name shown in the *Add Source* dialog (e.g. ``"Webcam"``)."""
        ...

    @property
    def category(self) -> Tuple[str, ...]:
        """Hierarchical category path used to group sources in menus (e.g. ``("Video", "Camera")``)."""
        ...

    @property
    def config_factory(self) -> Callable[[], TConfig]:
        """Zero-argument callable that returns a default source config instance."""
        ...

    @property
    def config_widget_factory(self) -> Callable[[TConfig, QWidget], IConfigWidget]:
        """Callable ``(config, parent) -> IConfigWidget`` that creates the source config editor widget."""
        ...

    @property
    def source_factory(self) -> Callable[[TConfig], ISource[TDesc, TData]]:
        """Callable ``(config) -> ISource`` that constructs a live source instance."""
        ...

name property

Unique, human-readable name shown in the Add Source dialog (e.g. "Webcam").

category property

Hierarchical category path used to group sources in menus (e.g. ("Video", "Camera")).

config_factory property

Zero-argument callable that returns a default source config instance.

config_widget_factory property

Callable (config, parent) -> IConfigWidget that creates the source config editor widget.

source_factory property

Callable (config) -> ISource that constructs a live source instance.

flow3r.core.source.abc.source_type.SourceType dataclass

Bases: Generic[TConfig, TDesc, TData]

Concrete dataclass implementation of ISourceType.

Create one of these and pass it to SourceTypeRegistry.register inside your plugin's IPlugin.initialize method.

Example:

MY_SOURCE_TYPE = SourceType(
    name="My Camera",
    category=("Video", "Camera"),
    config_factory=MyCameraConfig,
    config_widget_factory=MyCameraConfigWidget,
    source_factory=MyCameraSource,
)

# inside IPlugin.initialize:
api.source_types.register(MY_SOURCE_TYPE)

Source code in src/flow3r/core/source/abc/source_type.py
@dataclass
class SourceType(Generic[TConfig, TDesc, TData]):
    """Concrete dataclass implementation of :class:`ISourceType`.

    Create one of these and pass it to
    :meth:`~flow3r.app.api.plugins.source_type_registry.SourceTypeRegistry.register`
    inside your plugin's :meth:`~flow3r.core.plugin.plugin.IPlugin.initialize` method.

    Example::

        MY_SOURCE_TYPE = SourceType(
            name="My Camera",
            category=("Video", "Camera"),
            config_factory=MyCameraConfig,
            config_widget_factory=MyCameraConfigWidget,
            source_factory=MyCameraSource,
        )

        # inside IPlugin.initialize:
        api.source_types.register(MY_SOURCE_TYPE)
    """

    name: str
    """Unique, human-readable name shown in the *Add Source* dialog."""

    category: Tuple[str, ...]
    """Hierarchical category path (e.g. ``("Video", "Camera")``)."""

    config_factory: Callable[[], TConfig]
    """Zero-argument callable that returns a default source config instance."""

    config_widget_factory: Callable[[TConfig, QWidget], IConfigWidget]
    """Callable ``(config, parent) -> IConfigWidget`` for the config editor widget."""

    source_factory: Callable[[TConfig], ISource[TDesc, TData]]
    """Callable ``(config) -> ISource`` that constructs a live source instance."""

name instance-attribute

Unique, human-readable name shown in the Add Source dialog.

category instance-attribute

Hierarchical category path (e.g. ("Video", "Camera")).

config_factory instance-attribute

Zero-argument callable that returns a default source config instance.

config_widget_factory instance-attribute

Callable (config, parent) -> IConfigWidget for the config editor widget.

source_factory instance-attribute

Callable (config) -> ISource that constructs a live source instance.
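
The three factory fields are meant to be called in sequence: config first, then the widget and the source built from that config. The sketch below walks through that order using a local stand-in dataclass with the same field names as SourceType, so it runs without Flow3R or Qt; MyCameraConfig, MyCameraConfigWidget, and MyCameraSource are hypothetical placeholders, and None stands in for the QWidget parent.

```python
from dataclasses import dataclass
from typing import Any, Callable, Tuple


@dataclass
class SourceType:
    """Local stand-in with the same fields as the documented dataclass."""

    name: str
    category: Tuple[str, ...]
    config_factory: Callable[[], Any]
    config_widget_factory: Callable[[Any, Any], Any]
    source_factory: Callable[[Any], Any]


@dataclass
class MyCameraConfig:
    device_index: int = 0


class MyCameraConfigWidget:
    def __init__(self, config, parent):
        self.config = config
        self.parent = parent


class MyCameraSource:
    def __init__(self, config):
        self.config = config


# Classes are callables, so they can be passed directly as factories.
MY_SOURCE_TYPE = SourceType(
    name="My Camera",
    category=("Video", "Camera"),
    config_factory=MyCameraConfig,
    config_widget_factory=MyCameraConfigWidget,
    source_factory=MyCameraSource,
)

# The order in which an application would plausibly use the factories.
config = MY_SOURCE_TYPE.config_factory()
widget = MY_SOURCE_TYPE.config_widget_factory(config, None)
source = MY_SOURCE_TYPE.source_factory(config)
```

Passing a class as a factory only works when its constructor signature matches the expected callable, which is why the widget takes (config, parent) and the source takes (config).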

flow3r.app.api.plugins.source_type_registry.SourceTypeRegistry

Bases: ISourceTypeRegistry

Concrete registry that maps source type names to ISourceType objects.

Plugins call register inside IPlugin.initialize to make their source types available in the Add Source dialog.

Source code in src/flow3r/app/api/plugins/source_type_registry.py
class SourceTypeRegistry(ISourceTypeRegistry):
    """Concrete registry that maps source type names to :class:`ISourceType` objects.

    Plugins call :meth:`register` inside :meth:`~flow3r.core.plugin.plugin.IPlugin.initialize`
    to make their source types available in the *Add Source* dialog.
    """

    def __init__(self):
        self._source_types = {}

    def register(self, source_type: ISourceType) -> None:
        """Register a source type with the application.

        Args:
            source_type: A fully configured :class:`~flow3r.core.source.abc.source_type.SourceType`
                (or any object satisfying :class:`~flow3r.core.source.abc.source_type.ISourceType`)
                describing how to create config objects, config widgets, and source instances.
        """
        self._source_types[source_type.name] = source_type

    def get_source_types(self) -> Dict[str, ISourceType]:
        """Return all registered source types keyed by name."""
        return self._source_types

register(source_type)

Register a source type with the application.

Parameters:

Name | Type | Description | Default
source_type | ISourceType | A fully configured SourceType (or any object satisfying ISourceType) describing how to create config objects, config widgets, and source instances. | required

Source code in src/flow3r/app/api/plugins/source_type_registry.py
def register(self, source_type: ISourceType) -> None:
    """Register a source type with the application.

    Args:
        source_type: A fully configured :class:`~flow3r.core.source.abc.source_type.SourceType`
            (or any object satisfying :class:`~flow3r.core.source.abc.source_type.ISourceType`)
            describing how to create config objects, config widgets, and source instances.
    """
    self._source_types[source_type.name] = source_type

get_source_types()

Return all registered source types keyed by name.

Source code in src/flow3r/app/api/plugins/source_type_registry.py
def get_source_types(self) -> Dict[str, ISourceType]:
    """Return all registered source types keyed by name."""
    return self._source_types
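
Two consequences follow directly from the source shown above: registering a second source type under an existing name silently replaces the first, and get_source_types returns the registry's own dict rather than a copy. The sketch below reproduces the same two methods in a local stand-in class to demonstrate both; FakeSourceType is a hypothetical placeholder for a real source type.

```python
from dataclasses import dataclass


@dataclass
class FakeSourceType:
    """Hypothetical placeholder; only the name matters to the registry."""

    name: str
    label: str


class SourceTypeRegistry:
    """Local copy of the two documented methods, without Flow3R imports."""

    def __init__(self):
        self._source_types = {}

    def register(self, source_type) -> None:
        self._source_types[source_type.name] = source_type

    def get_source_types(self):
        return self._source_types


registry = SourceTypeRegistry()
registry.register(FakeSourceType("Webcam", "first"))
registry.register(FakeSourceType("Webcam", "second"))  # same name: overwrites

registered = registry.get_source_types()
same_object = registered is registry.get_source_types()
```

Plugins therefore need globally unique names, and callers that only want to read the mapping should avoid mutating the returned dict.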

Sources

flow3r.core.source.abc.source.ISource

Bases: Protocol[TDesc, TData]

Protocol for a single data source (camera, microphone, file, …).

A source encapsulates one device or file and exposes its data as a reactive stream. The application calls open before subscribing to the stream and close when the source is no longer needed.

Class Type Parameters:

Name | Bound or Constraints | Description | Default
TDesc | - | Descriptor type emitted by the stream (e.g. frame metadata). | required
TData | - | Data type emitted by the stream (e.g. numpy.ndarray for video). | required

Source code in src/flow3r/core/source/abc/source.py
class ISource(Protocol[TDesc, TData]):
    """Protocol for a single data source (camera, microphone, file, …).

    A source encapsulates one *device or file* and exposes its data as a
    reactive stream.  The application calls :meth:`open` before subscribing
    to the stream and :meth:`close` when the source is no longer needed.

    Type parameters:
        TDesc: Descriptor type emitted by the stream (e.g. frame metadata).
        TData: Data type emitted by the stream (e.g. ``numpy.ndarray`` for video).
    """

    @property
    def stream(self) -> IStream[TDesc, TData]:
        """The reactive stream that emits items from this source."""
        ...

    def open(self) -> None:
        """Open/initialize the underlying device or file and start producing items."""
        ...

    def close(self) -> None:
        """Stop producing items and release all resources held by this source."""
        ...

stream property

The reactive stream that emits items from this source.

open()

Open/initialize the underlying device or file and start producing items.

Source code in src/flow3r/core/source/abc/source.py
def open(self) -> None:
    """Open/initialize the underlying device or file and start producing items."""
    ...

close()

Stop producing items and release all resources held by this source.

Source code in src/flow3r/core/source/abc/source.py
def close(self) -> None:
    """Stop producing items and release all resources held by this source."""
    ...
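
The open, subscribe, close ordering described for ISource can be illustrated with a toy source. This is only a sketch under stated assumptions: the IStream interface is not shown in this reference, so CallbackStream is a hypothetical stand-in with an invented subscribe/emit API, and pump is a hypothetical helper that produces items synchronously so the example is deterministic.

```python
class CallbackStream:
    """Hypothetical stand-in for IStream: fans items out to subscribers."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self, on_item):
        self._subscribers.append(on_item)

    def emit(self, desc, data):
        for callback in list(self._subscribers):
            callback(desc, data)


class CounterSource:
    """Toy source with the three documented members: stream, open, close."""

    def __init__(self, count):
        self._count = count
        self._stream = CallbackStream()
        self.is_open = False

    @property
    def stream(self):
        return self._stream

    def open(self):
        self.is_open = True

    def close(self):
        self.is_open = False

    def pump(self):
        """Hypothetical helper: emit (descriptor, data) pairs while open."""
        if not self.is_open:
            raise RuntimeError("source is not open")
        for index in range(self._count):
            self._stream.emit({"index": index}, index * index)


source = CounterSource(3)
received = []

source.open()  # open first, then subscribe, as the description states
source.stream.subscribe(lambda desc, data: received.append((desc["index"], data)))
try:
    source.pump()
finally:
    source.close()  # always release the device or file
```

A real source would typically produce items from a device thread or callback after open rather than from an explicit pump call.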

Pipeline types

flow3r.core.pipeline.abc.pipeline_type.IPipelineType

Bases: Protocol[TConfig, TPipeline]

Protocol that describes a kind of pipeline (e.g. "Record Video").

Flow3R uses IPipelineType objects as factories. When a user adds a pipeline to a group, the application calls the factory callables to create a default config, a config-editing widget, and ultimately a pipeline instance.

Use the concrete PipelineType dataclass when registering a pipeline type from a plugin; it satisfies this protocol automatically.

Source code in src/flow3r/core/pipeline/abc/pipeline_type.py
class IPipelineType(Protocol[TConfig, TPipeline]):
    """Protocol that describes a *kind* of pipeline (e.g. "Record Video").

    Flow3R uses :class:`IPipelineType` objects as factories.  When a user adds a
    pipeline to a group, the application calls the factory callables to create a
    default config, a config-editing widget, and ultimately a pipeline instance.

    Use the concrete :class:`PipelineType` dataclass when registering a pipeline
    type from a plugin — it satisfies this protocol automatically.
    """

    @property
    def name(self) -> str:
        """Unique, human-readable name shown in the *Add Pipeline* dialog (e.g. ``"Record Video"``)."""
        ...

    @property
    def category(self) -> Tuple[str, ...]:
        """Hierarchical category path used to group pipelines in menus (e.g. ``("Video",)``)."""
        ...

    @property
    def config_factory(self) -> Callable[[], TConfig]:
        """Zero-argument callable that returns a default pipeline config instance."""
        ...

    @property
    def config_widget_factory(self) -> Callable[[IAppContext, TConfig, QWidget], IConfigWidget]:
        """Callable ``(app_context, config, parent) -> IConfigWidget`` for the config editor widget."""
        ...

    @property
    def pipeline_factory(self) -> Callable[[], IPipeline]:
        """Zero-argument callable that constructs a new pipeline instance."""
        ...

name property

Unique, human-readable name shown in the Add Pipeline dialog (e.g. "Record Video").

category property

Hierarchical category path used to group pipelines in menus (e.g. ("Video",)).

config_factory property

Zero-argument callable that returns a default pipeline config instance.

config_widget_factory property

Callable (app_context, config, parent) -> IConfigWidget for the config editor widget.

pipeline_factory property

Zero-argument callable that constructs a new pipeline instance.

flow3r.core.pipeline.abc.pipeline_type.PipelineType dataclass

Bases: Generic[TConfig, TPipeline]

Concrete dataclass implementation of IPipelineType.

Create one of these and pass it to PipelineTypeRegistry.register inside your plugin's IPlugin.initialize method.

Example:

MY_PIPELINE_TYPE = PipelineType(
    name="My Analysis",
    category=("Analysis",),
    config_factory=MyAnalysisConfig,
    config_widget_factory=MyAnalysisConfigWidget,
    pipeline_factory=MyAnalysisPipeline,
)

# inside IPlugin.initialize:
api.pipeline_types.register(MY_PIPELINE_TYPE)

Source code in src/flow3r/core/pipeline/abc/pipeline_type.py
@dataclass
class PipelineType(Generic[TConfig, TPipeline]):
    """Concrete dataclass implementation of :class:`IPipelineType`.

    Create one of these and pass it to
    :meth:`~flow3r.app.api.plugins.pipeline_type_registry.PipelineTypeRegistry.register`
    inside your plugin's :meth:`~flow3r.core.plugin.plugin.IPlugin.initialize` method.

    Example::

        MY_PIPELINE_TYPE = PipelineType(
            name="My Analysis",
            category=("Analysis",),
            config_factory=MyAnalysisConfig,
            config_widget_factory=MyAnalysisConfigWidget,
            pipeline_factory=MyAnalysisPipeline,
        )

        # inside IPlugin.initialize:
        api.pipeline_types.register(MY_PIPELINE_TYPE)
    """

    name: str
    """Unique, human-readable name shown in the *Add Pipeline* dialog."""

    category: Tuple[str, ...]
    """Hierarchical category path (e.g. ``("Video",)``)."""

    config_factory: Callable[[], TConfig]
    """Zero-argument callable that returns a default pipeline config instance."""

    config_widget_factory: Callable[[IAppContext, TConfig, QWidget], IConfigWidget]
    """Callable ``(app_context, config, parent) -> IConfigWidget`` for the config editor widget."""

    pipeline_factory: Callable[[], IPipeline]
    """Zero-argument callable that constructs a new pipeline instance."""

name instance-attribute

Unique, human-readable name shown in the Add Pipeline dialog.

category instance-attribute

Hierarchical category path (e.g. ("Video",)).

config_factory instance-attribute

Zero-argument callable that returns a default pipeline config instance.

config_widget_factory instance-attribute

Callable (app_context, config, parent) -> IConfigWidget for the config editor widget.

pipeline_factory instance-attribute

Zero-argument callable that constructs a new pipeline instance.
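
Unlike the source variant, the pipeline config_widget_factory receives three arguments (app_context, config, parent), and pipeline_factory takes none. The sketch below shows one way to adapt a widget class that does not need the application context, using a local stand-in dataclass with the documented field names; the My* classes are hypothetical, and None stands in for both IAppContext and the QWidget parent.

```python
from dataclasses import dataclass
from typing import Any, Callable, Tuple


@dataclass
class PipelineType:
    """Local stand-in with the same fields as the documented dataclass."""

    name: str
    category: Tuple[str, ...]
    config_factory: Callable[[], Any]
    config_widget_factory: Callable[[Any, Any, Any], Any]
    pipeline_factory: Callable[[], Any]


@dataclass
class MyAnalysisConfig:
    threshold: float = 0.5


class MyAnalysisConfigWidget:
    """Hypothetical widget that only needs the config and a parent."""

    def __init__(self, config, parent=None):
        self.config = config
        self.parent = parent


class MyAnalysisPipeline:
    """Hypothetical pipeline constructed without arguments."""


MY_PIPELINE_TYPE = PipelineType(
    name="My Analysis",
    category=("Analysis",),
    config_factory=MyAnalysisConfig,
    # Adapter: accept the three documented arguments, drop app_context.
    config_widget_factory=lambda app_context, config, parent: MyAnalysisConfigWidget(
        config, parent
    ),
    pipeline_factory=MyAnalysisPipeline,
)

config = MY_PIPELINE_TYPE.config_factory()
widget = MY_PIPELINE_TYPE.config_widget_factory(None, config, None)
pipeline = MY_PIPELINE_TYPE.pipeline_factory()
```

Since the pipeline is built without its config, the config presumably reaches it later through the contexts passed to configure and build.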

flow3r.app.api.plugins.pipeline_type_registry.PipelineTypeRegistry

Bases: IPipelineTypeRegistry

Concrete registry that maps pipeline type names to IPipelineType objects.

Plugins call register inside IPlugin.initialize to make their pipeline types available in the Add Pipeline dialog.

Source code in src/flow3r/app/api/plugins/pipeline_type_registry.py
class PipelineTypeRegistry(IPipelineTypeRegistry):
    """Concrete registry that maps pipeline type names to :class:`IPipelineType` objects.

    Plugins call :meth:`register` inside :meth:`~flow3r.core.plugin.plugin.IPlugin.initialize`
    to make their pipeline types available in the *Add Pipeline* dialog.
    """

    def __init__(self):
        self._pipeline_types = {}

    def register(self, pipeline_type: IPipelineType) -> None:
        """Register a pipeline type with the application.

        Args:
            pipeline_type: A fully configured :class:`~flow3r.core.pipeline.abc.pipeline_type.PipelineType`
                (or any object satisfying :class:`~flow3r.core.pipeline.abc.pipeline_type.IPipelineType`)
                describing how to create config objects, config widgets, and pipeline instances.
        """
        self._pipeline_types[pipeline_type.name] = pipeline_type

    def get_pipeline_types(self) -> Dict[str, IPipelineType]:
        """Return all registered pipeline types keyed by name."""
        return self._pipeline_types

register(pipeline_type)

Register a pipeline type with the application.

Parameters:

Name | Type | Description | Default
pipeline_type | IPipelineType | A fully configured PipelineType (or any object satisfying IPipelineType) describing how to create config objects, config widgets, and pipeline instances. | required

Source code in src/flow3r/app/api/plugins/pipeline_type_registry.py
def register(self, pipeline_type: IPipelineType) -> None:
    """Register a pipeline type with the application.

    Args:
        pipeline_type: A fully configured :class:`~flow3r.core.pipeline.abc.pipeline_type.PipelineType`
            (or any object satisfying :class:`~flow3r.core.pipeline.abc.pipeline_type.IPipelineType`)
            describing how to create config objects, config widgets, and pipeline instances.
    """
    self._pipeline_types[pipeline_type.name] = pipeline_type

get_pipeline_types()

Return all registered pipeline types keyed by name.

Source code in src/flow3r/app/api/plugins/pipeline_type_registry.py
def get_pipeline_types(self) -> Dict[str, IPipelineType]:
    """Return all registered pipeline types keyed by name."""
    return self._pipeline_types

Pipelines

flow3r.core.pipeline.abc.pipeline.PipelineContext

Bases: Generic[TConfig]

Passed to IPipeline.build for a recording run.

Carries the resolved config and services, and exposes registration methods for wiring the full recording pipeline. Call register_primary_done exactly once before build() returns. Optionally call register_secondary_done for post-processing (e.g. muxing).

Example:

def build(self, context: PipelineContext[MyConfig], sources):
    sub = my_sink.subscribe(sources["Video"])
    context.register_primary_done(sub.done)
    context.add_disposable(sub.disposable)

Attributes:

Name | Type | Description
config | TConfig | The resolved pipeline config for this session.
settings | ISettingsView | Read-only view of application settings.
widget_service | IWidgetService | Service for obtaining visualizer widget handles.
control | Observable | Observable that emits None exactly once when the recording gate opens and completes when stop is requested. Useful for pipelines without source inputs and for the IterativePipeline adapter.

Source code in src/flow3r/core/pipeline/abc/pipeline.py
class PipelineContext(Generic[TConfig]):
    """Passed to :meth:`IPipeline.build` for a recording run.

    Carries the resolved config and services, and exposes registration methods
    for wiring the full recording pipeline.  Call :meth:`register_primary_done`
    exactly once before ``build()`` returns.  Optionally call
    :meth:`register_secondary_done` for post-processing (e.g. muxing).

    .. code-block:: python

        def build(self, context: PipelineContext[MyConfig], sources):
            sub = my_sink.subscribe(sources["Video"])
            context.register_primary_done(sub.done)
            context.add_disposable(sub.disposable)

    Attributes:
        config: The resolved pipeline config for this session.
        settings: Read-only view of application settings.
        widget_service: Service for obtaining visualizer widget handles.
        control: Observable that emits ``None`` exactly once when the recording
            gate opens and completes when stop is requested.  Useful for
            pipelines without source inputs and for the
            :class:`~flow3r.core.pipeline.iterative_pipeline.IterativePipeline`
            adapter.
    """

    def __init__(
        self,
        config: TConfig,
        settings: ISettingsView,
        widget_service: IWidgetService,
        control: Observable,
    ):
        self.config = config
        self.settings = settings
        self.widget_service = widget_service
        self.control = control

        self._primary_done: Optional[Observable] = None
        self._secondary_done: Optional[Observable] = None
        self._progress: Optional[Observable[Tuple[int, int]]] = None
        self._disposables: List[DisposableBase] = []

    # ------------------------------------------------------------------
    # Registration API
    # ------------------------------------------------------------------

    def register_primary_done(self, obs: Observable) -> None:
        """Register the observable that signals primary-work completion.

        The observable should emit (or complete) once the core recording work
        is done — e.g. the video file has been flushed and closed.  Must be
        called exactly once before ``build()`` returns.
        """
        if self._primary_done is not None:
            raise RuntimeError("register_primary_done() called more than once")
        self._primary_done = obs

    def register_secondary_done(self, obs: Observable) -> None:
        """Register the observable for secondary (post-processing) completion.

        Optional.  Use for follow-up steps (e.g. muxing) that happen after the
        primary recording files are closed.
        """
        if self._secondary_done is not None:
            raise RuntimeError("register_secondary_done() called more than once")
        self._secondary_done = obs

    def register_progress(self, obs: "Observable[Tuple[int, int]]") -> None:
        """Register an observable that emits ``(completed, total)`` progress tuples."""
        if self._progress is not None:
            raise RuntimeError("register_progress() called more than once")
        self._progress = obs

    def add_disposable(self, disposable: DisposableBase) -> None:
        """Add a disposable that will be disposed when the pipeline is aborted."""
        self._disposables.append(disposable)

    # ------------------------------------------------------------------
    # Internal — called by RuntimeController after build() returns
    # ------------------------------------------------------------------

    def build_subscription(self) -> "PipelineSubscription":
        """Assemble a :class:`PipelineSubscription` from registered signals.

        Raises:
            RuntimeError: If :meth:`register_primary_done` was never called.
        """
        if self._primary_done is None:
            raise RuntimeError(
                "build() returned without calling context.register_primary_done()"
            )
        return PipelineSubscription(
            disposable=CompositeDisposable(self._disposables),
            primary_done=self._primary_done,
            secondary_done=self._secondary_done,
            progress=self._progress,
        )

register_primary_done(obs)

Register the observable that signals primary-work completion.

The observable should emit (or complete) once the core recording work is done — e.g. the video file has been flushed and closed. Must be called exactly once before build() returns.

Source code in src/flow3r/core/pipeline/abc/pipeline.py
def register_primary_done(self, obs: Observable) -> None:
    """Register the observable that signals primary-work completion.

    The observable should emit (or complete) once the core recording work
    is done — e.g. the video file has been flushed and closed.  Must be
    called exactly once before ``build()`` returns.
    """
    if self._primary_done is not None:
        raise RuntimeError("register_primary_done() called more than once")
    self._primary_done = obs

register_secondary_done(obs)

Register the observable for secondary (post-processing) completion.

Optional. Use for follow-up steps (e.g. muxing) that happen after the primary recording files are closed.

Source code in src/flow3r/core/pipeline/abc/pipeline.py
def register_secondary_done(self, obs: Observable) -> None:
    """Register the observable for secondary (post-processing) completion.

    Optional.  Use for follow-up steps (e.g. muxing) that happen after the
    primary recording files are closed.
    """
    if self._secondary_done is not None:
        raise RuntimeError("register_secondary_done() called more than once")
    self._secondary_done = obs

register_progress(obs)

Register an observable that emits (completed, total) progress tuples.

Source code in src/flow3r/core/pipeline/abc/pipeline.py
def register_progress(self, obs: "Observable[Tuple[int, int]]") -> None:
    """Register an observable that emits ``(completed, total)`` progress tuples."""
    if self._progress is not None:
        raise RuntimeError("register_progress() called more than once")
    self._progress = obs

add_disposable(disposable)

Add a disposable that will be disposed when the pipeline is aborted.

Source code in src/flow3r/core/pipeline/abc/pipeline.py
def add_disposable(self, disposable: DisposableBase) -> None:
    """Add a disposable that will be disposed when the pipeline is aborted."""
    self._disposables.append(disposable)

build_subscription()

Assemble a PipelineSubscription from registered signals.

Raises:

| Type | Description |
| --- | --- |
| RuntimeError | If register_primary_done() was never called. |

Source code in src/flow3r/core/pipeline/abc/pipeline.py
def build_subscription(self) -> "PipelineSubscription":
    """Assemble a :class:`PipelineSubscription` from registered signals.

    Raises:
        RuntimeError: If :meth:`register_primary_done` was never called.
    """
    if self._primary_done is None:
        raise RuntimeError(
            "build() returned without calling context.register_primary_done()"
        )
    return PipelineSubscription(
        disposable=CompositeDisposable(self._disposables),
        primary_done=self._primary_done,
        secondary_done=self._secondary_done,
        progress=self._progress,
    )
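The check above can be tried out with a small stand-in. `MiniContext`, `GoodPipeline`, `ForgetfulPipeline`, and `start` below are hypothetical names used only for illustration; a plain dict stands in for `PipelineSubscription`, and the order "call `build()`, then `build_subscription()`" is an assumption inferred from the error message rather than something this page states.

```python
from typing import Any, Dict, List, Optional


class MiniContext:
    """Hypothetical stand-in that keeps only the checks shown above."""

    def __init__(self) -> None:
        self._primary_done: Optional[Any] = None
        self._disposables: List[Any] = []

    def register_primary_done(self, obs: Any) -> None:
        if self._primary_done is not None:
            raise RuntimeError("register_primary_done() called more than once")
        self._primary_done = obs

    def build_subscription(self) -> Dict[str, Any]:
        if self._primary_done is None:
            raise RuntimeError(
                "build() returned without calling context.register_primary_done()"
            )
        # A plain dict stands in for PipelineSubscription.
        return {
            "primary_done": self._primary_done,
            "disposables": list(self._disposables),
        }


class GoodPipeline:
    def build(self, context: MiniContext, sources: Dict[str, Any]) -> None:
        # A string is used as a placeholder for an observable.
        context.register_primary_done("done-signal")


class ForgetfulPipeline:
    def build(self, context: MiniContext, sources: Dict[str, Any]) -> None:
        pass  # never registers a completion signal


def start(pipeline: Any) -> Dict[str, Any]:
    """Assumed framework-side sequence: build() first, then assemble."""
    context = MiniContext()
    pipeline.build(context, {})
    return context.build_subscription()
```

Calling `start(ForgetfulPipeline())` raises `RuntimeError`, which is how a missing `register_primary_done()` call would surface to the plugin author.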

flow3r.core.pipeline.abc.pipeline.IPipeline

Bases: Protocol[TConfig]

Protocol for a data processing pipeline attached to a recording group.

A pipeline receives streams from one or more sources and performs an operation on them (e.g. encoding video to disk, running pose estimation).

Lifecycle::

pipeline.configure(ConfigureContext(config, settings, widget_service))
pipeline.build(PipelineContext(config, settings, widget_service, control), sources)
# … recording runs …
pipeline.dispose()
Source code in src/flow3r/core/pipeline/abc/pipeline.py
class IPipeline(Protocol[TConfig]):
    """Protocol for a data processing pipeline attached to a recording group.

    A pipeline receives streams from one or more sources and performs an
    operation on them (e.g. encoding video to disk, running pose estimation).

    Lifecycle::

        pipeline.configure(ConfigureContext(config, settings, widget_service))
        pipeline.build(PipelineContext(config, settings, widget_service, control), sources)
        # … recording runs …
        pipeline.dispose()
    """

    @property
    def supports_preview(self) -> bool:
        """Return ``True`` if this pipeline supports a live preview mode.

        The framework checks this after :meth:`configure` is called, so the
        value may depend on the current config (e.g. a checkbox that enables
        an overlay).  Set the backing field in :meth:`configure`:

        .. code-block:: python

            def configure(self, context):
                self._supports_preview = context.config.show_overlay

            @property
            def supports_preview(self):
                return self._supports_preview

        Defaults to ``False`` — pipelines must explicitly opt in to preview.
        """
        ...

    @property
    def supports_recording(self) -> bool:
        """Return ``True`` if this pipeline supports a recording (build) mode.

        Defaults to ``True``.  Override to ``False`` for viewer-only pipelines
        that only display live data and never write files.
        """
        ...

    def configure(self, context: ConfigureContext[TConfig]) -> None:
        """Configure the pipeline for the upcoming session.

        Called whenever the pipeline config changes and before every
        :meth:`build` or :meth:`preview` call.  Use this to stash per-session
        state and pre-allocate resources such as ML models.  Optional — the
        default implementation is a no-op.

        Args:
            context: Carries the resolved config, settings, and widget service.
        """
        ...

    def preview(self, context: PreviewContext[TConfig], sources: Dict[str, IStream]) -> None:
        """Start a live preview (non-recording) run.

        Override to display a live feed or analysis result without writing
        persistent data.  Register the preview's lifecycle via *context*:

        .. code-block:: python

            def preview(self, context: PreviewContext[MyConfig], sources):
                sub = my_sink.subscribe(sources["Video"])
                context.register_done(sub.done)
                context.add_disposable(sub.disposable)

        The default implementation is a no-op that completes immediately.

        Args:
            context: Carries config, settings, widget service, and registration
                methods.
            sources: Mapping of source name → stream for all sources in the group.
        """
        ...

    def build(self, context: PipelineContext[TConfig], sources: Dict[str, IStream]) -> None:
        """Start the pipeline (recording or processing run).

        Register completion signals via *context* rather than returning a value:

        .. code-block:: python

            def build(self, context: PipelineContext[MyConfig], sources):
                sub = my_sink.subscribe(sources["Video"])
                context.register_primary_done(sub.done)
                context.add_disposable(sub.disposable)

        Args:
            context: Carries config, settings, widget service, ``control``
                observable, and registration methods for done/progress signals.
            sources: Mapping of source name → stream for all sources in the group.
        """
        ...

    def dispose(self) -> None:
        """Release any resources held by the pipeline (models, file handles, threads, …)."""
        ...
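The lifecycle listed in the docstring can be sketched without Flow3R installed. `RecordingPipeline` and `run_session` are hypothetical names, the contexts are replaced by `None`, and the `try`/`finally` around `build()` is an illustrative choice, not a documented framework behavior.

```python
from typing import Any, Dict, List


class RecordingPipeline:
    """Hypothetical pipeline that logs the order in which its hooks run."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def configure(self, context: Any) -> None:
        self.calls.append("configure")

    def build(self, context: Any, sources: Dict[str, Any]) -> None:
        self.calls.append("build")

    def dispose(self) -> None:
        self.calls.append("dispose")


def run_session(pipeline: RecordingPipeline) -> List[str]:
    """Drive one session in the order given in the Lifecycle section."""
    pipeline.configure(None)
    try:
        pipeline.build(None, {})
        # ... the recording would run here ...
    finally:
        # Release resources even if build() fails.
        pipeline.dispose()
    return pipeline.calls
```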

supports_preview property

Return True if this pipeline supports a live preview mode.

The framework checks this after configure() is called, so the value may depend on the current config (e.g. a checkbox that enables an overlay). Set the backing field in configure():

def configure(self, context):
    self._supports_preview = context.config.show_overlay

@property
def supports_preview(self):
    return self._supports_preview

Defaults to False — pipelines must explicitly opt in to preview.
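A runnable variant of the pattern, with the backing field initialised in `__init__` so the property is safe to read before `configure()` has run. `OverlayConfig`, `FakeConfigureContext`, and `OverlayPipeline` are hypothetical stand-ins; only the `config.show_overlay` attribute is taken from the snippet above.

```python
from dataclasses import dataclass


@dataclass
class OverlayConfig:
    """Hypothetical config with the checkbox value from the example."""

    show_overlay: bool = False


@dataclass
class FakeConfigureContext:
    """Stand-in for ConfigureContext; only ``config`` is modelled."""

    config: OverlayConfig


class OverlayPipeline:
    def __init__(self) -> None:
        # Default to False so the pipeline opts out until configured.
        self._supports_preview = False

    def configure(self, context: FakeConfigureContext) -> None:
        self._supports_preview = context.config.show_overlay

    @property
    def supports_preview(self) -> bool:
        return self._supports_preview
```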

supports_recording property

Return True if this pipeline supports a recording (build) mode.

Defaults to True. Override to False for viewer-only pipelines that only display live data and never write files.

configure(context)

Configure the pipeline for the upcoming session.

Called whenever the pipeline config changes and before every build() or preview() call. Use this to stash per-session state and pre-allocate resources such as ML models. Optional: the default implementation is a no-op.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context | ConfigureContext[TConfig] | Carries the resolved config, settings, and widget service. | required |

Source code in src/flow3r/core/pipeline/abc/pipeline.py
def configure(self, context: ConfigureContext[TConfig]) -> None:
    """Configure the pipeline for the upcoming session.

    Called whenever the pipeline config changes and before every
    :meth:`build` or :meth:`preview` call.  Use this to stash per-session
    state and pre-allocate resources such as ML models.  Optional — the
    default implementation is a no-op.

    Args:
        context: Carries the resolved config, settings, and widget service.
    """
    ...

preview(context, sources)

Start a live preview (non-recording) run.

Override to display a live feed or analysis result without writing persistent data. Register the preview's lifecycle via context:

def preview(self, context: PreviewContext[MyConfig], sources):
    sub = my_sink.subscribe(sources["Video"])
    context.register_done(sub.done)
    context.add_disposable(sub.disposable)

The default implementation is a no-op that completes immediately.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context | PreviewContext[TConfig] | Carries config, settings, widget service, and registration methods. | required |
| sources | Dict[str, IStream] | Mapping of source name → stream for all sources in the group. | required |

Source code in src/flow3r/core/pipeline/abc/pipeline.py
def preview(self, context: PreviewContext[TConfig], sources: Dict[str, IStream]) -> None:
    """Start a live preview (non-recording) run.

    Override to display a live feed or analysis result without writing
    persistent data.  Register the preview's lifecycle via *context*:

    .. code-block:: python

        def preview(self, context: PreviewContext[MyConfig], sources):
            sub = my_sink.subscribe(sources["Video"])
            context.register_done(sub.done)
            context.add_disposable(sub.disposable)

    The default implementation is a no-op that completes immediately.

    Args:
        context: Carries config, settings, widget service, and registration
            methods.
        sources: Mapping of source name → stream for all sources in the group.
    """
    ...

build(context, sources)

Start the pipeline (recording or processing run).

Register completion signals via context rather than returning a value:

def build(self, context: PipelineContext[MyConfig], sources):
    sub = my_sink.subscribe(sources["Video"])
    context.register_primary_done(sub.done)
    context.add_disposable(sub.disposable)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context | PipelineContext[TConfig] | Carries config, settings, widget service, control observable, and registration methods for done/progress signals. | required |
| sources | Dict[str, IStream] | Mapping of source name → stream for all sources in the group. | required |

Source code in src/flow3r/core/pipeline/abc/pipeline.py
def build(self, context: PipelineContext[TConfig], sources: Dict[str, IStream]) -> None:
    """Start the pipeline (recording or processing run).

    Register completion signals via *context* rather than returning a value:

    .. code-block:: python

        def build(self, context: PipelineContext[MyConfig], sources):
            sub = my_sink.subscribe(sources["Video"])
            context.register_primary_done(sub.done)
            context.add_disposable(sub.disposable)

    Args:
        context: Carries config, settings, widget service, ``control``
            observable, and registration methods for done/progress signals.
        sources: Mapping of source name → stream for all sources in the group.
    """
    ...

dispose()

Release any resources held by the pipeline (models, file handles, threads, …).

Source code in src/flow3r/core/pipeline/abc/pipeline.py
def dispose(self) -> None:
    """Release any resources held by the pipeline (models, file handles, threads, …)."""
    ...

flow3r.core.pipeline.abc.pipeline.PipelineBase

Bases: IPipeline[TConfig], ABC

Convenient base class for pipeline implementations.

Provides no-op default implementations of all IPipeline methods so that subclasses only need to override the methods they care about.

Default capability flags: supports_preview = False, supports_recording = True. Override either property (or set the backing field in configure()) to change per-instance.

Source code in src/flow3r/core/pipeline/abc/pipeline.py
class PipelineBase(IPipeline[TConfig], ABC):
    """Convenient base class for pipeline implementations.

    Provides no-op default implementations of all :class:`IPipeline` methods so
    that subclasses only need to override the methods they care about.

    Default capability flags: ``supports_preview = False``,
    ``supports_recording = True``.  Override either property (or set the
    backing field in :meth:`configure`) to change per-instance.
    """

    @property
    def supports_preview(self) -> bool:
        return False

    @property
    def supports_recording(self) -> bool:
        return True

    def configure(self, context: ConfigureContext[TConfig]) -> None:
        pass

    def preview(self, context: PreviewContext[TConfig], sources: Dict[str, IStream]) -> None:
        context.register_done(rx.from_list([None]))

    def build(self, context: PipelineContext[TConfig], sources: Dict[str, IStream]) -> None:
        context.register_primary_done(rx.from_list([None]))

    def dispose(self) -> None:
        pass

flow3r.core.pipeline.iterative_pipeline.IterativePipeline

Bases: PipelineBase[TConfig], Generic[TConfig]

Base class for imperative, single-threaded pipeline implementations.

Override run() to process frames one by one. The method receives each source as a regular Python Iterable (collections.abc.Iterable); just iterate over it until it is exhausted (which happens automatically when the recording stop is requested).

The default configure, preview, and dispose implementations from PipelineBase (flow3r.core.pipeline.abc.pipeline.PipelineBase) are inherited unchanged.

Source code in src/flow3r/core/pipeline/iterative_pipeline.py
class IterativePipeline(PipelineBase[TConfig], Generic[TConfig]):
    """Base class for imperative, single-threaded pipeline implementations.

    Override :meth:`run` to process frames one by one.  The method receives
    each source as a regular Python :class:`~collections.abc.Iterable`; just
    iterate over it until it is exhausted (which happens automatically when
    the recording stop is requested).

    The default ``configure``, ``preview``, and ``dispose`` implementations
    from :class:`~flow3r.core.pipeline.abc.pipeline.PipelineBase` are
    inherited unchanged.
    """

    def run(self, sources: Dict[str, Iterable]) -> None:
        """Process frames until all source iterables are exhausted.

        Args:
            sources: Mapping of input name → iterable of data items.
                Each iterable yields items until the recording stop is
                requested, then raises :class:`StopIteration`.

        Note:
            Any exception raised here is forwarded to ``primary_done``
            as an observable error.
        """
        raise NotImplementedError(
            f"{self.__class__.__name__}.run() is not implemented"
        )

    # ------------------------------------------------------------------
    # PipelineBase override — internal plumbing
    # ------------------------------------------------------------------

    def build(self, context: PipelineContext, sources: Dict[str, IStream]) -> None:
        done_subject: AsyncSubject = AsyncSubject()
        context.register_primary_done(done_subject)

        iterators = {name: _ObservableIterator(stream.data) for name, stream in sources.items()}

        def _dispose_iterators():
            for it in iterators.values():
                it.dispose()

        def _thread_target():
            try:
                # Pass a shallow copy so run() cannot alter the iterator map.
                self.run(dict(iterators))
                done_subject.on_next(None)
                done_subject.on_completed()
            except Exception as exc:
                done_subject.on_error(exc)
            finally:
                _dispose_iterators()

        def _on_start(_):
            thread = threading.Thread(target=_thread_target, daemon=True)
            thread.start()

        def _on_stop():
            # Completion of control signals stop — unblock the iterators so
            # the run() method exits naturally via StopIteration.
            _dispose_iterators()

        context.add_disposable(
            context.control.subscribe(on_next=_on_start, on_completed=_on_stop)
        )
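The `build()` override above hands `run()` one blocking iterator per source and unblocks them on stop. The source of `_ObservableIterator` is not shown on this page, so the following is only a sketch of how such a bridge could be built with the standard library; `BlockingIterator`, `push`, and the `_STOP` sentinel are hypothetical names, not Flow3R API.

```python
import queue
import threading
from typing import Any

_STOP = object()  # sentinel that marks the end of the stream


class BlockingIterator:
    """Hypothetical push-to-pull bridge backed by a thread-safe queue."""

    def __init__(self) -> None:
        self._queue: "queue.Queue[Any]" = queue.Queue()
        self._disposed = threading.Event()

    def push(self, item: Any) -> None:
        """Called by the producer for every new item."""
        if not self._disposed.is_set():
            self._queue.put(item)

    def dispose(self) -> None:
        """Unblock the consumer; repeated calls have no further effect."""
        if not self._disposed.is_set():
            self._disposed.set()
            self._queue.put(_STOP)

    def __iter__(self) -> "BlockingIterator":
        return self

    def __next__(self) -> Any:
        item = self._queue.get()  # blocks until an item or the sentinel arrives
        if item is _STOP:
            # Put the sentinel back so later next() calls do not block.
            self._queue.put(_STOP)
            raise StopIteration
        return item
```

Items queued before `dispose()` are still delivered, after which a `for` loop over the iterator ends normally. That matches the behavior described for `run()`: iteration stops once the recording stop is requested.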

run(sources)

Process frames until all source iterables are exhausted.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| sources | Dict[str, Iterable] | Mapping of input name → iterable of data items. Each iterable yields items until the recording stop is requested, then raises StopIteration. | required |

Source code in src/flow3r/core/pipeline/iterative_pipeline.py
def run(self, sources: Dict[str, Iterable]) -> None:
    """Process frames until all source iterables are exhausted.

    Args:
        sources: Mapping of input name → iterable of data items.
            Each iterable yields items until the recording stop is
            requested, then raises :class:`StopIteration`.

    Note:
        Any exception raised here is forwarded to ``primary_done``
        as an observable error.
    """
    raise NotImplementedError(
        f"{self.__class__.__name__}.run() is not implemented"
    )
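A minimal sketch of a `run()` override. In a real plugin the class would derive from `IterativePipeline`; the base class is left out here so the snippet runs without Flow3R installed. `FrameCounter` is a hypothetical name, and the `"Video"` input name is borrowed from the `build()` examples on this page.

```python
from typing import Dict, Iterable, List


class FrameCounter:
    """Hypothetical pipeline body that counts and logs incoming frames."""

    def __init__(self) -> None:
        self.frames_seen = 0
        self.log: List[str] = []

    def run(self, sources: Dict[str, Iterable]) -> None:
        # The loop ends on its own once the source iterable is exhausted.
        for index, frame in enumerate(sources["Video"]):
            self.frames_seen = index + 1
            self.log.append(f"frame {index}: {frame!r}")
```

Because `run()` only relies on the iterable protocol, it can be exercised in a unit test with plain lists, for example `FrameCounter().run({"Video": ["a", "b", "c"]})`.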

flow3r.core.pipeline.abc.pipeline.PipelineSubscription

Returned internally by PipelineContext.build_subscription().

The framework uses this to track and dispose an active recording run. Pipeline authors do not construct this directly; use the registration methods on PipelineContext instead.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| primary_done | | Observable that emits once the primary processing step completes. |
| secondary_done | | Observable that emits once any secondary step completes. |
| progress | | Observable of (completed, total) tuples for progress reporting. |

Source code in src/flow3r/core/pipeline/abc/pipeline.py
class PipelineSubscription:
    """Returned internally by :meth:`PipelineContext.build_subscription`.

    The framework uses this to track and dispose an active recording run.
    Pipeline authors do not construct this directly — use the registration
    methods on :class:`PipelineContext` instead.

    Attributes:
        primary_done: Observable that emits once the primary processing step completes.
        secondary_done: Observable that emits once any secondary step completes.
        progress: Observable of ``(completed, total)`` tuples for progress reporting.
    """

    def __init__(
        self,
        disposable: DisposableBase,
        primary_done: Observable[Any],
        secondary_done: Optional[Observable[Any]] = None,
        progress: Optional[Observable[Tuple[int, int]]] = None
    ):
        self.disposable = disposable
        self.primary_done = primary_done
        self.secondary_done = secondary_done or rx.from_list([None])
        self.progress = progress or rx.from_list([(0, 0)])

    def dispose(self) -> None:
        """Abort the pipeline run and release resources."""
        self.disposable.dispose()

dispose()

Abort the pipeline run and release resources.

Source code in src/flow3r/core/pipeline/abc/pipeline.py
def dispose(self) -> None:
    """Abort the pipeline run and release resources."""
    self.disposable.dispose()

flow3r.core.pipeline.abc.pipeline.PreviewSubscription

Returned internally by PreviewContext.build_subscription().

The framework uses this to track and dispose an active preview. Pipeline authors do not construct this directly; use PreviewContext.register_done() instead.

Source code in src/flow3r/core/pipeline/abc/pipeline.py
class PreviewSubscription:
    """Returned internally by :meth:`PreviewContext.build_subscription`.

    The framework uses this to track and dispose an active preview.
    Pipeline authors do not construct this directly — use
    :meth:`PreviewContext.register_done` instead.
    """

    def __init__(self, disposable: DisposableBase, preview_done: Observable[Any]):
        self.disposable = disposable
        self.done = preview_done

    def dispose(self) -> None:
        """Stop the preview and release resources."""
        self.disposable.dispose()

dispose()

Stop the preview and release resources.

Source code in src/flow3r/core/pipeline/abc/pipeline.py
def dispose(self) -> None:
    """Stop the preview and release resources."""
    self.disposable.dispose()

Config base classes

flow3r.core.config.abc.config.ConfigBase dataclass

Bases: ABC, IConfig

Source code in src/flow3r/core/config/abc/config.py
@dataclass
class ConfigBase(ABC, IConfig):
    VERSION: ClassVar[int] = 1

    def to_dict(self) -> Dict[str, Any]:
        return {
            "version": self.VERSION,
            "data": self._to_dict_data(),
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any], type_registry: Dict[str, Type[ITypedConfig]]) -> Self:
        version = data.get("version")
        inner_data = data.get("data")

        if not isinstance(version, int):
            raise ConfigVersionError(f"Invalid config version: {version!r}")

        if not isinstance(inner_data, dict):
            raise ConfigError("Invalid config data")

        if version != cls.VERSION:
            # version is already known to be an int from the check above
            inner_data = cls._migrate_data(inner_data, from_version=version)

        return cls._from_dict_data(inner_data, type_registry)

    def _to_dict_data(self) -> Dict[str, Any]:
        # Return a shallow copy so callers cannot mutate the instance state.
        return dict(self.__dict__)

    @classmethod
    def _from_dict_data(cls, data: Dict[str, Any], type_registry: Dict[str, Type[ITypedConfig]]) -> Self:
        return cls(**data)

    @classmethod
    def _migrate_data(cls, data: Dict[str, Any], from_version: int) -> Dict[str, Any]:
        raise ConfigVersionError(
            f"Unable to migrate config from version {from_version} to {cls.VERSION}"
        )
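The versioned envelope and the migration hook can be illustrated with a self-contained stand-in. `CameraConfig` and its `rate` to `fps` rename are invented for this example, `ValueError` replaces `ConfigError` and `ConfigVersionError`, and the `type_registry` argument is left out; none of this is the Flow3R implementation.

```python
from dataclasses import asdict, dataclass
from typing import Any, ClassVar, Dict


@dataclass
class CameraConfig:
    """Hypothetical config at version 2; version 1 called the field ``rate``."""

    VERSION: ClassVar[int] = 2
    fps: int = 30

    def to_dict(self) -> Dict[str, Any]:
        # Same envelope shape as ConfigBase.to_dict(): version plus data.
        return {"version": self.VERSION, "data": asdict(self)}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "CameraConfig":
        version = data.get("version")
        inner = data.get("data")
        if not isinstance(version, int):
            raise ValueError(f"Invalid config version: {version!r}")
        if not isinstance(inner, dict):
            raise ValueError("Invalid config data")
        if version != cls.VERSION:
            inner = cls._migrate_data(inner, from_version=version)
        return cls(**inner)

    @classmethod
    def _migrate_data(cls, data: Dict[str, Any], from_version: int) -> Dict[str, Any]:
        if from_version == 1:
            migrated = dict(data)  # leave the caller's dict untouched
            migrated["fps"] = migrated.pop("rate")
            return migrated
        raise ValueError(
            f"Unable to migrate config from version {from_version} to {cls.VERSION}"
        )
```

With this sketch, `CameraConfig.from_dict({"version": 1, "data": {"rate": 25}})` yields `CameraConfig(fps=25)`, while an unknown version falls through to the error, mirroring the default `_migrate_data` shown above.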

flow3r.core.source.abc.source_config.SourceConfigBase dataclass

Bases: ConfigBase, ISourceConfig

Source code in src/flow3r/core/source/abc/source_config.py
@dataclass
class SourceConfigBase(ConfigBase, ISourceConfig):
    TYPE_ID: ClassVar[str]

flow3r.core.pipeline.abc.pipeline_config.PipelineConfigBase dataclass

Bases: ConfigBase, IPipelineConfig, ABC

Source code in src/flow3r/core/pipeline/abc/pipeline_config.py
class PipelineConfigBase(ConfigBase, IPipelineConfig, ABC):
    TYPE_ID: ClassVar[str]

    @property
    def settings_dependencies(self) -> Set[Tuple[str, ...]]:
        return set()

    @property
    @abstractmethod
    def inputs(self) -> List[str]: ...

    @property
    def optional_inputs(self) -> List[str]:
        return []

    @property
    def files(self) -> List[str]:
        return []

    def resolve(self, placeholder_provider: IPlaceholderProvider) -> Self:
        return self
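A sketch of what a concrete subclass might declare. `MiniPipelineConfig` is a hypothetical, reduced stand-in for `PipelineConfigBase` (only `inputs`, `optional_inputs`, and `files` are modelled), and `VideoWriterConfig`, its `TYPE_ID` value, and the `"Audio"` input are invented for illustration.

```python
from abc import ABC, abstractmethod
from typing import ClassVar, List


class MiniPipelineConfig(ABC):
    """Hypothetical, reduced stand-in for PipelineConfigBase."""

    TYPE_ID: ClassVar[str]

    @property
    @abstractmethod
    def inputs(self) -> List[str]: ...

    @property
    def optional_inputs(self) -> List[str]:
        return []

    @property
    def files(self) -> List[str]:
        return []


class VideoWriterConfig(MiniPipelineConfig):
    TYPE_ID = "example.video_writer"  # invented identifier

    @property
    def inputs(self) -> List[str]:
        # The only abstract member, so every subclass must provide it.
        return ["Video"]

    @property
    def optional_inputs(self) -> List[str]:
        return ["Audio"]
```

Since `inputs` is abstract, instantiating a subclass that does not override it fails with `TypeError`; the other properties fall back to their empty defaults.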