# -*- coding: utf-8 -*-
"""
==========
UHD Device
==========
"""
from __future__ import annotations
from collections.abc import Sequence
from functools import cached_property
from typing import Any, List, Callable
import numpy as np
from zerorpc.exceptions import LostRemote, RemoteError
from usrp_client import UsrpClient, MimoSignal, TxStreamingConfig, RxStreamingConfig, RfConfig
from hermespy.core import Antenna, AntennaArray, AntennaPort, Serializable, Signal, Transformation
from ..physical_device import PhysicalDevice
__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.3.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
class UsrpAntennas(AntennaArray[AntennaPort, Antenna]):
"""Antenna and port configuration of a USRP device.
:class:`UsrpAntennas`' port configuration is linked to the attached
:class`UsrpDevice` and cannot be changed manually.
"""
def __init__(self, device: UsrpDevice, pose: Transformation | None = None) -> None:
"""
Args:
device (UsrpDevice):
USRP device this antenna array models.
pose (Transformation, optional):
Pose of the antenna array with respect to the `device`.
"""
# Initialize base class
AntennaArray.__init__(self, pose)
# Initialize attributes
self.__device = device
self.__transmit_ports = [self._new_port() for _ in range(device.num_transmit_ports)]
self.__receive_ports = [self._new_port() for _ in range(device.num_receive_ports)]
# Configure kinematic chain
self.set_base(device)
@property
def device(self) -> UsrpDevice:
"""USRP device the antenna array is attached to."""
return self.__device
@property
def transmit_ports(self) -> Sequence[AntennaPort]:
return self.__transmit_ports
@property
def receive_ports(self) -> Sequence[AntennaPort]:
return self.__receive_ports
@property
def ports(self) -> Sequence[AntennaPort]:
return self.__transmit_ports + self.__receive_ports
@property
def num_transmit_ports(self) -> int:
return self.__device.num_transmit_ports
@property
def num_receive_ports(self) -> int:
return self.__device.num_receive_ports
@property
def num_ports(self) -> int:
return self.num_transmit_ports + self.num_receive_ports
def _new_port(self) -> AntennaPort:
return AntennaPort(array=self)
[docs]
class UsrpDevice(PhysicalDevice, Serializable):
"""Bindung to a USRP device via the UHD library."""
yaml_tag = "USRP"
"""YAML serialization tag"""
property_blacklist = {"topology", "wavelength", "velocity"}
__usrp_client: UsrpClient
__num_rpc_retries = 10
__collection_enabled: bool
__scale_transmission: bool
__sampling_rate: float | None
__num_prepended_zeros: int
__num_appended_zeros: int
__selected_transmit_ports: Sequence[int]
__selected_receive_ports: Sequence[int]
__max_selected_receive_port: int
__max_selected_transmit_port: int
__current_configuration: RfConfig
def __init__(
self,
ip: str,
port: int = 5555,
carrier_frequency: float = 7e8,
sampling_rate: float | None = None,
tx_gain: float = 0.0,
rx_gain: float = 0.0,
scale_transmission: bool = True,
num_prepended_zeros: int = 200,
num_appended_zeros: int = 200,
selected_transmit_ports: Sequence[int] | None = None,
selected_receive_ports: Sequence[int] | None = None,
**kwargs,
) -> None:
"""
Args:
ip (str):
The IP address of the USRP device.
port (int, optional):
The port of the USRP device.
carrier_frequency (float, optional):
Carrier frequency of the USRP device.
:math:`700~\\mathrm{MHz}` by default.
sampling_rate (float, optional):
Sampling rate of the USRP device.
If not provided, the sampling rate is determined from the configured operators.
tx_gain (float, optional):
The transmission gain of the USRP device.
Zero by default.
rx_gain (float, optional):
The reception gain of the USRP device.
Zero by default.
scale_transmission (bool, optional):
If `True`, the transmission signal is scaled to the maximum floating point value of the USRP device.
This ensures a proper digital to analog conversion.
num_prepended_zeros (int, optional):
The number of zeros prepended to the transmission signal.
:math:`200` by default.
num_appended_zeros (int, optional):
The number of zeros appended to the transmission signal.
:math:`200` by default.
selected_transmit_ports (Sequence[int], optional):
Indices of the selected transmit antenna ports.
If not specified, i.e. :py:obj:`None`, only the first antenna port is selected.
selected_receive_ports (Sequence[int], optional):
Indices of the selected receive antenna ports.
If not specified, i.e. :py:obj:`None`, only the first antenna port is selected.
\**kwargs:
Additional arguments passed to the :class:`.PhysicalDevice` parent class.
"""
# Initialize base class
PhysicalDevice.__init__(self, **kwargs)
# Initialize attributes and configure RF frontend
self.__usrp_client = UsrpClient.create(ip, port)
# Query the available number of antennas ports
max_num_ports = self.__usrp_client.getNumAntennas()
self.__selected_transmit_ports = selected_transmit_ports or [0] if max_num_ports > 0 else []
self.__max_selected_transmit_port = max(self.__selected_transmit_ports)
if 1 + self.__max_selected_transmit_port > max_num_ports:
raise ValueError(
f"Selected transmit ports exceed the maximum number of ports ({1 + self.__max_selected_transmit_port} > {max_num_ports})"
)
self.__selected_receive_ports = selected_receive_ports or [0] if max_num_ports > 0 else []
self.__max_selected_receive_port = max(self.__selected_receive_ports)
if 1 + self.__max_selected_receive_port > max_num_ports:
raise ValueError(
f"Selected receive ports exceed the maximum number of ports ({1 + self.__max_selected_receive_port} > {max_num_ports})"
)
self.__antennas = UsrpAntennas(self)
self.carrier_frequency = carrier_frequency
self.tx_gain = tx_gain
self.rx_gain = rx_gain
self.sampling_rate = sampling_rate
self.num_prepeneded_zeros = num_prepended_zeros
self.num_appended_zeros = num_appended_zeros
self.__current_configuration = self.__rpc_call_wrapper(self.__usrp_client.getRfConfig)
self._configure_device(force=True)
self.__collection_enabled = False
self.__scale_transmission = scale_transmission
def __rpc_call_wrapper(self, call: Callable, *args, **kwargs) -> Any:
"""Wrapper to RPC client calls to perform multiple calls to hack the timeout bug.
Returns: The call return.
Raises:
RuntimeError: If the call didn't succeed within the configured amount of attempts.
"""
for _ in range(self.__num_rpc_retries):
try:
return call(*args, **kwargs)
except LostRemote:
continue
except RemoteError as e:
raise RuntimeError(f"Remote exception occured at '{self.ip}:{self.port}': {e.msg}")
raise RuntimeError(f"Lost connection to the remote '{self.ip}:{self.port}'")
def _configure_device(self, force: bool = False) -> None:
tx_filter_bandwidth = 4e8
rx_filter_bandwidth = 4e8
tx_sampling_rate = self.sampling_rate
rx_sampling_rate = self.sampling_rate
tx_carrier_frequency = self.carrier_frequency
rx_carrier_frequency = self.carrier_frequency
tx_gain = self.tx_gain
rx_gain = self.rx_gain
# Check if a frontend reconfiguration is required
if (
any(
[
tx_filter_bandwidth != self.__current_configuration.txAnalogFilterBw,
rx_filter_bandwidth != self.__current_configuration.rxAnalogFilterBw,
tx_sampling_rate != self.__current_configuration.txSamplingRate,
rx_sampling_rate != self.__current_configuration.rxSamplingRate,
tx_carrier_frequency != self.__current_configuration.txCarrierFrequency,
rx_carrier_frequency != self.__current_configuration.rxCarrierFrequency,
tx_gain != self.__current_configuration.txGain,
rx_gain != self.__current_configuration.rxGain,
self.selected_transmit_ports != self.__current_configuration.txAntennaMapping,
self.selected_receive_ports != self.__current_configuration.rxAntennaMapping,
]
)
or force
):
config = RfConfig(
txAnalogFilterBw=tx_filter_bandwidth,
rxAnalogFilterBw=rx_filter_bandwidth,
txSamplingRate=tx_sampling_rate,
rxSamplingRate=rx_sampling_rate,
txCarrierFrequency=tx_carrier_frequency,
rxCarrierFrequency=rx_carrier_frequency,
txGain=tx_gain,
rxGain=rx_gain,
noTxStreams=self.num_transmit_ports,
noRxStreams=self.num_receive_ports,
txAntennaMapping=self.selected_transmit_ports,
rxAntennaMapping=self.selected_receive_ports,
)
self.__rpc_call_wrapper(self.__usrp_client.configureRfConfig, config)
self.__current_configuration = config
def _upload(self, baseband_signal: Signal) -> Signal:
baseband_signal = baseband_signal.copy()
# Configure device
self._configure_device()
# Reset the streaming config
self.__rpc_call_wrapper(self.__usrp_client.resetStreamingConfigs)
# Scale signal to a maximum absolute vlaue of zero to full exploit the DAC range
if baseband_signal.num_samples > 0 and self.scale_transmission:
maxAmp = np.abs(baseband_signal[:, :]).max()
if maxAmp != 0:
for block in baseband_signal:
block /= maxAmp
uploaded_samples = baseband_signal.copy()
# Hack: Prepend some zeros to account for the premature transmission stop
baseband_signal.set_samples(
np.concatenate(
(
np.zeros(
(baseband_signal.num_streams, self.num_prepeneded_zeros), dtype=np.complex_
),
baseband_signal[:, :],
np.zeros(
(baseband_signal.num_streams, self.num_appended_zeros), dtype=np.complex_
),
),
axis=1,
)
)
if baseband_signal.num_samples % 4 != 0:
baseband_signal.set_samples(
np.append(
baseband_signal[:, :],
np.zeros(
(baseband_signal.num_streams, 4 - baseband_signal.num_samples % 4),
dtype=complex,
),
axis=1,
)
)
# Append a zero vector for unselected transmit ports
# Workaround for the USRP wrapper missing dedicated port selections
tx_config = TxStreamingConfig(
max(0.0, -self.delay_calibration.delay), MimoSignal(baseband_signal[:, :])
)
self.__rpc_call_wrapper(self.__usrp_client.configureTx, tx_config)
# Configure reception
duration = self.receivers.min_frame_duration
if duration > 0.0:
num_receive_samples = int((duration + self.max_receive_delay) * self.sampling_rate)
# Hack
num_receive_samples += self.num_prepeneded_zeros + self.num_appended_zeros
# Workaround for the uneven sample bug
num_receive_samples += 4 - num_receive_samples % 4
rx_config = RxStreamingConfig(
max(0.0, self.delay_calibration.delay), num_receive_samples
)
self.__rpc_call_wrapper(self.__usrp_client.configureRx, rx_config)
self.__collection_enabled = True
else:
num_receive_samples = baseband_signal.num_samples + 4 - baseband_signal.num_samples % 4
rx_config = RxStreamingConfig(
max(0.0, self.delay_calibration.delay), num_receive_samples
)
self.__rpc_call_wrapper(self.__usrp_client.configureRx, rx_config)
self.__collection_enabled = True
return uploaded_samples
[docs]
def trigger(self) -> None:
# Queue execution command
self.__usrp_client.executeImmediately()
def _download(self) -> Signal:
# Abort if no samples are to be expcted during collection
if not self.__collection_enabled:
return Signal.Empty(self.sampling_rate, self.num_receive_ports)
mimo_signals = self.__usrp_client.collect()
signal_model = Signal.Empty(
self.sampling_rate, self.num_receive_ports, carrier_frequency=self.carrier_frequency
)
for mimo_signal in mimo_signals:
streams = np.array([mimo_signal.signals[i] for i in self.__selected_receive_ports])
signal_model.set_samples(np.append(signal_model[:, :], streams, axis=1))
# Remove the zero padding hack
signal_model.set_samples(
signal_model[
:, self.num_prepeneded_zeros : signal_model.num_samples - self.num_appended_zeros
]
)
return signal_model
@property
def _client(self) -> UsrpClient:
"""Access to the UHD client.
Returns: Handle to the client.
"""
return self.__usrp_client
@property
def ip(self) -> str:
"""Internet protocol address of the remote host.
Returns:
IP adress.
"""
return self.__usrp_client.ip
@property
def port(self) -> int:
"""Internet protocol port of the remote host.
Returns:
Port.
"""
return self.__usrp_client.port
@property
def tx_gain(self) -> float:
"""Gain of the transmitting front-end in dB."""
return self.__tx_gain
@tx_gain.setter
def tx_gain(self, value: float) -> None:
self.__tx_gain = value
@property
def rx_gain(self) -> float:
"""Gain of the receiving front-end in dB."""
return self.__rx_gain
@rx_gain.setter
def rx_gain(self, value: float) -> None:
self.__rx_gain = value
@property
def num_transmit_ports(self) -> int:
"""Number of transmit ports controlled on the USRP device."""
return len(self.__selected_transmit_ports)
@property
def num_receive_ports(self) -> int:
"""Number of receive ports controlled on the USRP device."""
return len(self.__selected_receive_ports)
@property
def selected_transmit_ports(self) -> Sequence[int]:
"""Indices of the selected transmit ports."""
return self.__selected_transmit_ports
@property
def selected_receive_ports(self) -> Sequence[int]:
"""Indices of the selected receive ports."""
return self.__selected_receive_ports
@property
def antennas(self) -> UsrpAntennas:
"""Antenna array model of the USRP device.
Allows for the further configuration of the antenna elements,
however, the number of :attr:`transmit ports<num_transmit_ports>`
and :attr:`receive ports<num_receive_ports>` is fixed.
"""
return self.__antennas
@property
def sampling_rate(self) -> float:
if self.__sampling_rate is not None:
return self.__sampling_rate
ideal_sampling_rate = (
self.transmitters.max_sampling_rate
if self.transmitters.num_operators > 0
else self.receivers.max_sampling_rate
)
selected_sampling_rate = min(
self.__supported_sampling_rates, key=lambda x: abs(x - ideal_sampling_rate)
)
return selected_sampling_rate
@sampling_rate.setter
def sampling_rate(self, value: float) -> None:
self.__sampling_rate = value
@property
def max_sampling_rate(self) -> float:
return max(self.__supported_sampling_rates)
@property
def carrier_frequency(self) -> float:
return self.__carrier_frequency
@carrier_frequency.setter
def carrier_frequency(self, value: float) -> None:
self.__carrier_frequency = value
@property
def num_prepeneded_zeros(self) -> int:
"""Number of zero padding samples prepended to the transmitted signal.
Returns: Number of prepended samples.
Raises:
ValueError: For negative values.
"""
return self.__num_prepended_zeros
@num_prepeneded_zeros.setter
def num_prepeneded_zeros(self, value: int) -> None:
if value < 0:
raise ValueError(f"Number of prepended zeros must be non-negative ({value} < 0)")
self.__num_prepended_zeros = value
@property
def num_appended_zeros(self) -> int:
"""Number of zero padding samples appended to the transmitted signal.
Returns: Number of appended samples.
Raises:
ValueError: For negative values.
"""
return self.__num_appended_zeros
@num_appended_zeros.setter
def num_appended_zeros(self, value: int) -> None:
if value < 0:
raise ValueError(f"Number of appended zeros must be non-negative ({value} < 0)")
self.__num_appended_zeros = value
@cached_property
def __supported_sampling_rates(self) -> List[float]:
return self.__usrp_client.getSupportedSamplingRates()
@property
def scale_transmission(self) -> bool:
"""Indicates whether the transmission is scaled to the full DAC range.
Returns: Boolean enabled flag.
"""
return self.__scale_transmission
@scale_transmission.setter
def scale_transmission(self, value: bool) -> None:
self.__scale_transmission = value