Source code for hermespy.hardware_loop.uhd.usrp

# -*- coding: utf-8 -*-
"""
==========
UHD Device
==========
"""

from __future__ import annotations
from collections.abc import Sequence
from datetime import datetime
from functools import cached_property
from typing import Any, List, Callable

import numpy as np
from zerorpc.exceptions import LostRemote, RemoteError
from usrp_client import UsrpClient, MimoSignal, TxStreamingConfig, RxStreamingConfig, RfConfig

from hermespy.core import (
    Antenna,
    AntennaArray,
    AntennaPort,
    IdealAntenna,
    Serializable,
    Signal,
    Transformation,
)
from ..physical_device import PhysicalDevice, PhysicalDeviceState

__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.4.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


class UsrpAntennas(AntennaArray[AntennaPort, Antenna]):
    """Antenna and port configuration of a USRP device.

    :class:`UsrpAntennas`' port configuration is linked to the attached
    :class`UsrpDevice` and cannot be changed manually.
    """

    def __init__(self, device: UsrpDevice, pose: Transformation | None = None) -> None:
        """
        Args:

            device (UsrpDevice):
                USRP device this antenna array models.

            pose (Transformation, optional):
                Pose of the antenna array with respect to the `device`.
        """

        # Initialize base class
        AntennaArray.__init__(self, pose)

        # Initialize attributes
        self.__device = device
        self.__transmit_ports = [self._new_port() for _ in range(device.num_digital_transmit_ports)]
        self.__receive_ports = [self._new_port() for _ in range(device.num_digital_receive_ports)]

        # Configure kinematic chain
        self.set_base(device)

    @property
    def device(self) -> UsrpDevice:
        """USRP device the antenna array is attached to."""

        return self.__device

    @property
    def transmit_ports(self) -> Sequence[AntennaPort]:
        return self.__transmit_ports

    @property
    def receive_ports(self) -> Sequence[AntennaPort]:
        return self.__receive_ports

    @property
    def ports(self) -> Sequence[AntennaPort]:
        return self.__transmit_ports + self.__receive_ports

    @property
    def num_transmit_ports(self) -> int:
        return self.__device.num_digital_transmit_ports

    @property
    def num_receive_ports(self) -> int:
        return self.__device.num_digital_receive_ports

    @property
    def num_ports(self) -> int:
        return self.num_transmit_ports + self.num_receive_ports

    def _new_port(self) -> AntennaPort:
        return AntennaPort(array=self)

    @property
    def transmit_antennas(self) -> Sequence[Antenna]:
        return [IdealAntenna(pose=port.pose) for port in self.transmit_ports]

    @property
    def receive_antennas(self) -> Sequence[Antenna]:
        return [IdealAntenna(pose=port.pose) for port in self.receive_ports]

    @property
    def num_transmit_antennas(self) -> int:
        return self.num_transmit_ports

    @property
    def num_receive_antennas(self) -> int:
        return self.num_receive_ports


[docs] class UsrpDevice(PhysicalDevice[PhysicalDeviceState], Serializable): """Bindung to a USRP device via the UHD library.""" yaml_tag = "USRP" """YAML serialization tag""" property_blacklist = {"topology", "wavelength", "velocity"} __usrp_client: UsrpClient __num_rpc_retries = 10 __collection_enabled: bool __scale_transmission: bool __sampling_rate: float | None __num_prepended_zeros: int __num_appended_zeros: int __selected_transmit_ports: Sequence[int] __selected_receive_ports: Sequence[int] __max_selected_receive_port: int __max_selected_transmit_port: int __current_configuration: RfConfig __receive_delay: float # Configured front-end delay during reception def __init__( self, ip: str, port: int = 5555, carrier_frequency: float = 7e8, sampling_rate: float | None = None, tx_gain: float = 0.0, rx_gain: float = 0.0, scale_transmission: bool = True, num_prepended_zeros: int = 200, num_appended_zeros: int = 200, selected_transmit_ports: Sequence[int] | None = None, selected_receive_ports: Sequence[int] | None = None, **kwargs, ) -> None: """ Args: ip (str): The IP address of the USRP device. port (int, optional): The port of the USRP device. carrier_frequency (float, optional): Carrier frequency of the USRP device. :math:`700~\\mathrm{MHz}` by default. sampling_rate (float, optional): Sampling rate of the USRP device. If not provided, the sampling rate is determined from the configured operators. tx_gain (float, optional): The transmission gain of the USRP device. Zero by default. rx_gain (float, optional): The reception gain of the USRP device. Zero by default. scale_transmission (bool, optional): If `True`, the transmission signal is scaled to the maximum floating point value of the USRP device. This ensures a proper digital to analog conversion. num_prepended_zeros (int, optional): The number of zeros prepended to the transmission signal. :math:`200` by default. num_appended_zeros (int, optional): The number of zeros appended to the transmission signal. :math:`200` by default. selected_transmit_ports (Sequence[int], optional): Indices of the selected transmit antenna ports. If not specified, i.e. :py:obj:`None`, only the first antenna port is selected. selected_receive_ports (Sequence[int], optional): Indices of the selected receive antenna ports. If not specified, i.e. :py:obj:`None`, only the first antenna port is selected. \**kwargs: Additional arguments passed to the :class:`.PhysicalDevice` parent class. """ # Initialize base class PhysicalDevice.__init__(self, **kwargs) # Initialize attributes and configure RF frontend self.__usrp_client = UsrpClient.create(ip, port) # Query the available number of antennas ports max_num_ports = self.__usrp_client.getNumAntennas() # Configure transmit port selection if selected_transmit_ports is not None: self.__selected_transmit_ports = selected_transmit_ports elif max_num_ports > 0: self.__selected_transmit_ports = [0] else: self.__selected_transmit_ports = [] self.__max_selected_transmit_port = max(self.__selected_transmit_ports, default=0) if 1 + self.__max_selected_transmit_port > max_num_ports: raise ValueError( f"Selected transmit ports exceed the maximum number of ports ({1 + self.__max_selected_transmit_port} > {max_num_ports})" ) # Configure receive port selection if selected_receive_ports is not None: self.__selected_receive_ports = selected_receive_ports elif max_num_ports > 0: self.__selected_receive_ports = [0] else: self.__selected_receive_ports = [] self.__max_selected_receive_port = max(self.__selected_receive_ports, default=0) if 1 + self.__max_selected_receive_port > max_num_ports: raise ValueError( f"Selected receive ports exceed the maximum number of ports ({1 + self.__max_selected_receive_port} > {max_num_ports})" ) self.__antennas = UsrpAntennas(self) self.carrier_frequency = carrier_frequency self.tx_gain = tx_gain self.rx_gain = rx_gain self.sampling_rate = sampling_rate self.num_prepeneded_zeros = num_prepended_zeros self.num_appended_zeros = num_appended_zeros self.__current_configuration = self.__rpc_call_wrapper(self.__usrp_client.getRfConfig) self._configure_device(force=True) self.__collection_enabled = False self.__scale_transmission = scale_transmission self.__receive_delay = 0.0
[docs] def state(self) -> PhysicalDeviceState: return PhysicalDeviceState( id(self), datetime.now().timestamp(), self.pose, self.velocity, self.carrier_frequency, self.sampling_rate, self.num_digital_transmit_ports, self.num_digital_receive_ports, self.antennas.state(self.pose), )
def __rpc_call_wrapper(self, call: Callable, *args, **kwargs) -> Any: """Wrapper to RPC client calls to perform multiple calls to hack the timeout bug. Returns: The call return. Raises: RuntimeError: If the call didn't succeed within the configured amount of attempts. """ for _ in range(self.__num_rpc_retries): try: return call(*args, **kwargs) except LostRemote: continue except RemoteError as e: raise RuntimeError(f"Remote exception occured at '{self.ip}:{self.port}': {e.msg}") raise RuntimeError(f"Lost connection to the remote '{self.ip}:{self.port}'") def _configure_device(self, force: bool = False) -> None: tx_filter_bandwidth = 4e8 rx_filter_bandwidth = 4e8 tx_sampling_rate = self.sampling_rate rx_sampling_rate = self.sampling_rate tx_carrier_frequency = self.carrier_frequency rx_carrier_frequency = self.carrier_frequency tx_gain = self.tx_gain rx_gain = self.rx_gain # Check if a frontend reconfiguration is required if ( any( [ tx_filter_bandwidth != self.__current_configuration.txAnalogFilterBw, rx_filter_bandwidth != self.__current_configuration.rxAnalogFilterBw, tx_sampling_rate != self.__current_configuration.txSamplingRate, rx_sampling_rate != self.__current_configuration.rxSamplingRate, tx_carrier_frequency != self.__current_configuration.txCarrierFrequency, rx_carrier_frequency != self.__current_configuration.rxCarrierFrequency, tx_gain != self.__current_configuration.txGain, rx_gain != self.__current_configuration.rxGain, self.selected_transmit_ports != self.__current_configuration.txAntennaMapping, self.selected_receive_ports != self.__current_configuration.rxAntennaMapping, ] ) or force ): config = RfConfig( txAnalogFilterBw=tx_filter_bandwidth, rxAnalogFilterBw=rx_filter_bandwidth, txSamplingRate=tx_sampling_rate, rxSamplingRate=rx_sampling_rate, txCarrierFrequency=tx_carrier_frequency, rxCarrierFrequency=rx_carrier_frequency, txGain=tx_gain, rxGain=rx_gain, noTxStreams=self.num_digital_transmit_ports, noRxStreams=self.num_digital_receive_ports, txAntennaMapping=self.selected_transmit_ports, rxAntennaMapping=self.selected_receive_ports, ) self.__rpc_call_wrapper(self.__usrp_client.configureRfConfig, config) self.__current_configuration = config def _upload(self, baseband_signal: Signal) -> Signal: baseband_signal = baseband_signal.copy() # Configure device self._configure_device() # Reset the streaming config self.__rpc_call_wrapper(self.__usrp_client.resetStreamingConfigs) # Make a copy in order to avoid changing baseband_signal uploaded_samples = baseband_signal.copy() # Apply the antenna array calibration for block in uploaded_samples: self.antenna_calibration.correct_transmission(block) # Scale signal to a maximum absolute vlaue of zero to full exploit the DAC range if uploaded_samples.num_samples > 0 and self.scale_transmission: maxAmp = np.abs([b for b in uploaded_samples]).max() if maxAmp != 0: for block in uploaded_samples: block /= maxAmp # Make a copy here corrected_uploaded_samples = uploaded_samples.copy() # Hack: Prepend some zeros to account for the premature transmission stop uploaded_samples.set_samples( np.concatenate( ( np.zeros( (uploaded_samples.num_streams, self.num_prepeneded_zeros), dtype=np.complex128 ), uploaded_samples.getitem(), np.zeros( (uploaded_samples.num_streams, self.num_appended_zeros), dtype=np.complex128 ), ), axis=1, ) ) if uploaded_samples.num_samples % 4 != 0: uploaded_samples.set_samples( np.append( uploaded_samples.getitem(), np.zeros( (uploaded_samples.num_streams, 4 - uploaded_samples.num_samples % 4), dtype=complex, ), axis=1, ) ) # Append a zero vector for unselected transmit ports # Workaround for the USRP wrapper missing dedicated port selections transmit_delay = max(0.0, -self.delay_calibration.delay) self.__receive_delay = max(0.0, self.delay_calibration.delay) uploaded_samples.delay = transmit_delay tx_config = TxStreamingConfig(transmit_delay, MimoSignal(uploaded_samples.getitem())) self.__rpc_call_wrapper(self.__usrp_client.configureTx, tx_config) # Configure reception duration = max(self.receivers.min_frame_duration, uploaded_samples.duration) if duration > 0.0: num_receive_samples = int((duration + self.max_receive_delay) * self.sampling_rate) # Hack num_receive_samples += self.num_prepeneded_zeros + self.num_appended_zeros # Workaround for the uneven sample bug num_receive_samples += 4 - num_receive_samples % 4 rx_config = RxStreamingConfig(self.__receive_delay, num_receive_samples) self.__rpc_call_wrapper(self.__usrp_client.configureRx, rx_config) self.__collection_enabled = True else: num_receive_samples = ( uploaded_samples.num_samples + 4 - uploaded_samples.num_samples % 4 ) rx_config = RxStreamingConfig(self.__receive_delay, num_receive_samples) self.__rpc_call_wrapper(self.__usrp_client.configureRx, rx_config) self.__collection_enabled = True return corrected_uploaded_samples
[docs] def trigger(self) -> None: # Queue execution command self.__usrp_client.executeImmediately()
def _download(self) -> Signal: # Abort if no samples are to be expcted during collection if not self.__collection_enabled: return Signal.Empty(self.sampling_rate, self.num_receive_antenna_ports) mimo_signals = self.__usrp_client.collect() signal_model = Signal.Empty( self.sampling_rate, self.num_receive_antenna_ports, carrier_frequency=self.carrier_frequency, delay=self.__receive_delay, ) for mimo_signal in mimo_signals: streams = np.array( [mimo_signal.signals[i] for i in range(self.num_receive_antenna_ports)] ) signal_model.append_samples(streams) # Remove the zero padding hack signal_model.set_samples( signal_model.getitem( ( slice(None, None), slice( self.num_prepeneded_zeros, signal_model.num_samples - self.num_appended_zeros, ), ) ) ) # Apply the antenna array calibration for block in signal_model: self.antenna_calibration.correct_reception(block) return signal_model @property def _client(self) -> UsrpClient: """Access to the UHD client. Returns: Handle to the client. """ return self.__usrp_client @property def ip(self) -> str: """Internet protocol address of the remote host. Returns: IP adress. """ return self.__usrp_client.ip @property def port(self) -> int: """Internet protocol port of the remote host. Returns: Port. """ return self.__usrp_client.port @property def tx_gain(self) -> float: """Gain of the transmitting front-end in dB.""" return self.__tx_gain @tx_gain.setter def tx_gain(self, value: float) -> None: self.__tx_gain = value @property def rx_gain(self) -> float: """Gain of the receiving front-end in dB.""" return self.__rx_gain @rx_gain.setter def rx_gain(self, value: float) -> None: self.__rx_gain = value @property def num_transmit_antenna_ports(self) -> int: """Number of transmit ports controlled on the USRP device.""" return len(self.__selected_transmit_ports) @property def num_receive_antenna_ports(self) -> int: """Number of receive ports controlled on the USRP device.""" return len(self.__selected_receive_ports) @property def selected_transmit_ports(self) -> Sequence[int]: """Indices of the selected transmit ports.""" return self.__selected_transmit_ports @property def selected_receive_ports(self) -> Sequence[int]: """Indices of the selected receive ports.""" return self.__selected_receive_ports @property def antennas(self) -> UsrpAntennas: """Antenna array model of the USRP device. Allows for the further configuration of the antenna elements, however, the number of :attr:`transmit ports<num_transmit_ports>` and :attr:`receive ports<num_receive_ports>` is fixed. """ return self.__antennas @property def sampling_rate(self) -> float: if self.__sampling_rate is not None: return self.__sampling_rate ideal_sampling_rate = ( self.transmitters.max_sampling_rate if self.transmitters.num_operators > 0 else self.receivers.max_sampling_rate ) selected_sampling_rate = min( self.__supported_sampling_rates, key=lambda x: abs(x - ideal_sampling_rate) ) return selected_sampling_rate @sampling_rate.setter def sampling_rate(self, value: float) -> None: self.__sampling_rate = value @property def max_sampling_rate(self) -> float: return max(self.__supported_sampling_rates) @property def carrier_frequency(self) -> float: return self.__carrier_frequency @carrier_frequency.setter def carrier_frequency(self, value: float) -> None: self.__carrier_frequency = value @property def num_prepeneded_zeros(self) -> int: """Number of zero padding samples prepended to the transmitted signal. Returns: Number of prepended samples. Raises: ValueError: For negative values. """ return self.__num_prepended_zeros @num_prepeneded_zeros.setter def num_prepeneded_zeros(self, value: int) -> None: if value < 0: raise ValueError(f"Number of prepended zeros must be non-negative ({value} < 0)") self.__num_prepended_zeros = value @property def num_appended_zeros(self) -> int: """Number of zero padding samples appended to the transmitted signal. Returns: Number of appended samples. Raises: ValueError: For negative values. """ return self.__num_appended_zeros @num_appended_zeros.setter def num_appended_zeros(self, value: int) -> None: if value < 0: raise ValueError(f"Number of appended zeros must be non-negative ({value} < 0)") self.__num_appended_zeros = value @cached_property def __supported_sampling_rates(self) -> List[float]: return self.__usrp_client.getSupportedSamplingRates() @property def scale_transmission(self) -> bool: """Indicates whether the transmission is scaled to the full DAC range. Returns: Boolean enabled flag. """ return self.__scale_transmission @scale_transmission.setter def scale_transmission(self, value: bool) -> None: self.__scale_transmission = value