Source code for swc.aeon.schema.core
"""Schema definition for core Harp data streams."""
import swc.aeon.io.reader as _reader
from swc.aeon.schema import Stream, StreamGroup
[docs]
class Heartbeat(Stream):
    """Heartbeat event for Harp devices."""
    def __init__(self, pattern):
        """Initializes the Heartbeat stream."""
        super().__init__(_reader.Heartbeat(f"{pattern}_8_*")) 
[docs]
class Video(Stream):
    """Video frame metadata."""
    def __init__(self, pattern):
        """Initializes the Video stream."""
        super().__init__(_reader.Video(f"{pattern}_*")) 
[docs]
class Position(Stream):
    """Position tracking data for the specified camera."""
    def __init__(self, pattern):
        """Initializes the Position stream."""
        super().__init__(_reader.Position(f"{pattern}_200_*")) 
[docs]
class Encoder(Stream):
    """Wheel magnetic encoder data."""
    def __init__(self, pattern):
        """Initializes the Encoder stream."""
        super().__init__(_reader.Encoder(f"{pattern}_90_*")) 
[docs]
class Environment(StreamGroup):
    """Metadata for environment mode and subjects."""
    def __init__(self, pattern):
        """Initializes the Environment stream group."""
        super().__init__(pattern, EnvironmentState, SubjectState) 
[docs]
class EnvironmentState(Stream):
    """Environment state log."""
    def __init__(self, pattern):
        """Initializes the EnvironmentState stream."""
        super().__init__(_reader.Csv(f"{pattern}_EnvironmentState_*", ["state"])) 
[docs]
class SubjectState(Stream):
    """Subject state log."""
    def __init__(self, pattern):
        """Initialises the SubjectState stream."""
        super().__init__(_reader.Subject(f"{pattern}_SubjectState_*")) 
[docs]
class MessageLog(Stream):
    """Message log data."""
    def __init__(self, pattern):
        """Initializes the MessageLog stream."""
        super().__init__(_reader.Log(f"{pattern}_MessageLog_*"))