Source code for hermespy.core.state

# -*- coding: utf-8 -*-

from __future__ import annotations
from typing import Sequence, TypeVar

import numpy as np

from .antennas import AntennaArrayState, AntennaPort, IdealAntenna
from .transformation import Transformation

__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.4.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


[docs] class State(object): """Base of all state data containers. They describe the information available to different stages of the signal processing pipelines and are used to pass information to the corresponding signal processing algorithms. """ __timestamp: float __pose: Transformation __velocity: np.ndarray __carrier_frequency: float __sampling_rate: float __antennas: AntennaArrayState def __init__( self, timestamp: float, pose: Transformation, velocity: np.ndarray, carrier_frequency: float, sampling_rate: float, antennas: AntennaArrayState, ) -> None: """ Args: timestamp (float): Global timestamp of the device state. pose (Transformation): Pose of the device with respect to the global coordinate system. velocity (numpy.ndarray): Velocity of the device in m/s. carrier_frequency (float): Carrier frequency of the device in Hz. sampling_rate (float): Sampling rate of the device in Hz. antennas (AntennaArrayState): State of the device's antenna array. """ # Initialize class attributes self.__timestamp = timestamp self.__pose = pose self.__velocity = velocity self.__carrier_frequency = carrier_frequency self.__sampling_rate = sampling_rate self.__antennas = antennas @property def timestamp(self) -> float: """Global timestamp of the device state.""" return self.__timestamp @property def pose(self) -> Transformation: """Global pose of the device.""" return self.__pose @property def position(self) -> np.ndarray: """Global position of the device in cartesian coordinates. Shorthand to :attr:`State.pose.translation`. """ return self.pose.translation @property def velocity(self) -> np.ndarray: """Velocity of the device in m/s.""" return self.__velocity @property def carrier_frequency(self) -> float: """Carrier frequency of the device in Hz.""" return self.__carrier_frequency @property def sampling_rate(self) -> float: """Sampling rate of the device in Hz.""" return self.__sampling_rate @property def antennas(self) -> AntennaArrayState: """State of the device's antenna array.""" return self.__antennas
[docs] class TransmitState(State): """State of a during transmit signal processing. Generated by calling the :meth:`transmit_state<DeviceState.transmit_state>` method of a :class:`DeviceState`. """ __num_digital_transmit_ports: int def __init__( self, timestamp: float, pose: Transformation, velocity: np.ndarray, carrier_frequency: float, sampling_rate: float, antennas: AntennaArrayState, num_digital_transmit_ports: int, ) -> None: """ Args: timestamp (float): Global timestamp of the device state. pose (Transformation): Pose of the device with respect to the global coordinate system. velocity (numpy.ndarray): Velocity of the device in m/s. carrier_frequency (float): Carrier frequency of the device in Hz. sampling_rate (float): Sampling rate of the device in Hz. antennas (AntennaArrayState): State of the device's antenna array. num_digital_transmit_ports (int): Number of digital transmit streams to be generated by the signal processing algorithm. """ # Initialize base class State.__init__(self, timestamp, pose, velocity, carrier_frequency, sampling_rate, antennas) # Initialize class attributes self.__num_digital_transmit_ports = num_digital_transmit_ports @property def num_digital_transmit_ports(self) -> int: """Number of digital transmit streams to be generated by the signal processing algorithm.""" return self.__num_digital_transmit_ports
[docs] class ReceiveState(State): """State of a device during receive signal processing. Generated by calling the :meth:`receive_state<DeviceState.receive_state>` method of a :class:`DeviceState`. """ __num_digital_receive_ports: int def __init__( self, timestamp: float, pose: Transformation, velocity, carrier_frequency: float, sampling_rate: float, antennas: AntennaArrayState, num_digital_receive_ports: int, ) -> None: """ Args: timestamp (float): Global timestamp of the device state. pose (Transformation): Pose of the device with respect to the global coordinate system. velocity (numpy.ndarray): Velocity of the device in m/s. carrier_frequency (float): Carrier frequency of the device in Hz. sampling_rate (float): Sampling rate of the device in Hz. antennas (AntennaArrayState): State of the device's antenna array. num_digital_receive_ports (int): Number of digital receive streams to be consumed by the signal processing algorithm. """ # Initialize base class State.__init__(self, timestamp, pose, velocity, carrier_frequency, sampling_rate, antennas) # Initialize class attributes self.__num_digital_receive_ports = num_digital_receive_ports @property def num_digital_receive_ports(self) -> int: """Number of digital receive streams to be consumed by the signal processing algorithm.""" return self.__num_digital_receive_ports
[docs] class DeviceState(State): """Data container representing the immutable physical state of a device. Generated by calling the :meth:`state<Device.state>` property of a :class:`Device`. In order to avoid confusion with the mutable state of the device, this class is immutable and only references the represented device by its memory address and time stamp. """ __device_id: int __num_digital_transmit_ports: int __num_digital_receive_ports: int def __init__( self, device_id: int, timestamp: float, pose: Transformation, velocity: np.ndarray, carrier_frequency: float, sampling_rate: float, num_digital_transmit_ports: int, num_digital_receive_ports: int, antennas: AntennaArrayState, ) -> None: """ Args: device_id (int): Unique identifier of the device this state represents. timestamp (float): Time stamp of the device state. pose (Transformation): Pose of the device with respect to the global coordinate system. velocity (numpy.ndarray): Velocity of the device in m/s. carrier_frequency (float): Carrier frequency of the device in Hz. sampling_rate (float): Sampling rate of the device in Hz. num_digital_transmit_ports (int): Number of digital transmit streams feeding into the device's signal encoding layer. This is the number of streams expected to be generated by transmit DSP algorithms by default. num_digital_receive_ports (int): Number of digital receive streams emerging from the device's signal decoding layer. This the number of streams expected to be feeding into receive DSP algorithms by default. antennas (AntennaArrayState): State of the device's antenna array. """ # Initialize base class State.__init__(self, timestamp, pose, velocity, carrier_frequency, sampling_rate, antennas) # Initialize class attributes self.__device_id = device_id self.__num_digital_receive_ports = num_digital_receive_ports self.__num_digital_transmit_ports = num_digital_transmit_ports @property def device_id(self) -> int: """Unique identifier of the device this state represents.""" return self.__device_id @property def position(self) -> np.ndarray: """Global position of the device in cartesian coordinates. Shorthand to :attr:`DeviceState.pose.translation`. """ return self.pose.translation @property def num_digital_transmit_ports(self) -> int: """Number of digital transmit streams feeding into the device's signal encoding layer. This is the number of streams expected to be generated by transmit DSP algorithms by default. """ return self.__num_digital_transmit_ports @property def num_digital_receive_ports(self) -> int: """Number of digital receive streams emerging from the device's signal decoding layer. This the number of streams expected to be feeding into receive DSP algorithms by default. """ return self.__num_digital_receive_ports
[docs] def transmit_state( self, selected_digital_transmit_ports: Sequence[int] | None = None ) -> TransmitState: """Generate a device state for transmission. Args: selected_digital_transmit_ports (Sequence[int], optional): Indices of the selected digital transmit ports. If not specified, all available transmit ports will be considered. """ _num_digital_transmit_ports = ( self.num_digital_transmit_ports if selected_digital_transmit_ports is None else len(selected_digital_transmit_ports) ) _selected_digital_transmit_ports = ( list(range(self.antennas.num_transmit_ports)) if selected_digital_transmit_ports is None else selected_digital_transmit_ports ) return TransmitState( self.timestamp, self.pose, self.velocity, self.carrier_frequency, self.sampling_rate, self.antennas[_selected_digital_transmit_ports], _num_digital_transmit_ports, )
[docs] def receive_state( self, selected_digital_receive_ports: Sequence[int] | None = None ) -> ReceiveState: """Generate a device state for reception. Args: selected_digital_receive_ports (Sequence[int], optional): Indices of the selected digital receive ports. If not specified, all available receive ports will be considered. """ _num_digital_receive_ports = ( self.num_digital_receive_ports if selected_digital_receive_ports is None else len(selected_digital_receive_ports) ) _selected_digital_receive_ports = ( list(range(self.antennas.num_receive_ports)) if selected_digital_receive_ports is None else selected_digital_receive_ports ) return ReceiveState( self.timestamp, self.pose, self.velocity, self.carrier_frequency, self.sampling_rate, self.antennas[_selected_digital_receive_ports], _num_digital_receive_ports, )
[docs] @staticmethod def Basic( num_digital_transmit_ports: int = 1, num_digital_receive_ports: int = 1, carrier_frequency: float = 0.0, sampling_rate: float = 1.0, pose: Transformation | None = None, ) -> DeviceState: """Create a basic state initilized with default values. Useful shorthand for debugging and testing. Args: num_digital_transmit_ports (int, optional): Number of digital transmit streams feeding into the device's signal encoding layer. This is the number of streams expected to be generated by transmit DSP algorithms by default. num_digital_receive_ports (int, optional): Number of digital receive streams emerging from the device's signal decoding layer. This the number of streams expected to be feeding into receive DSP algorithms by default. carrier_frequency (float, optional): Carrier frequency of the device in Hz. If not specified, base band is assumed. sampling_rate (float, optional): Sampling rate of the device in Hz. If not specified, 1 Hz is assumed. pose (Transformation, optional): Pose of the device with respect to the global coordinate system. If not specified, the identity transformation is assumed, so the device is located in the origin. Returns: The initialized state. """ _pose = Transformation.No() if pose is None else pose return DeviceState( 0, # ID 0.0, # Timestamp _pose, np.zeros(3), # Velocity carrier_frequency, sampling_rate, num_digital_transmit_ports, num_digital_receive_ports, AntennaArrayState( [ AntennaPort([IdealAntenna()]) for _ in range(max(num_digital_receive_ports, num_digital_transmit_ports)) ], _pose, ), )
DST = TypeVar("DST", bound=DeviceState) """Type variable for device states."""