# Source code for hermespy.modem.waveform_chirp_fsk
# -*- coding: utf-8 -*-
from __future__ import annotations
from importlib.metadata import version
from typing import Tuple
from math import ceil
from functools import lru_cache
import numpy as np
from scipy import integrate
from hermespy.core.factory import Serializable
from hermespy.modem.waveform import (
PilotCommunicationWaveform,
StatedSymbols,
CommunicationWaveform,
Synchronization,
)
from hermespy.core.signal_model import Signal
from .symbols import Symbols
from .waveform_correlation_synchronization import CorrelationSynchronization
__author__ = "Andre Noll Barreto"
__copyright__ = "Copyright 2021, Barkhausen Institut gGmbH"
__credits__ = ["Andre Noll Barreto", "Tobias Kronauer", "Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.4.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
# Second dot-separated component of the installed scipy distribution version,
# e.g. 14 for "1.14.0".
# NOTE(review): the major component is ignored, so comparisons against this value
# are only meaningful within a single scipy major release.
scipy_minor_version = int(version("scipy").split(".", 2)[1])
# [docs]  (link residue from the HTML documentation page, not part of the module)
class ChirpFSKWaveform(PilotCommunicationWaveform, Serializable):
    """Chirp Frequency Shift Keying communication waveform description.

    A frame is assembled from :attr:`num_pilot_chirps` pilot chirps followed by
    :attr:`num_data_chirps` data chirps and :attr:`guard_interval` seconds of silence.
    Every data symbol selects one chirp prototype; the prototypes differ in their
    initial frequency by multiples of :attr:`freq_difference`.
    """

    # YAML tag
    yaml_tag = "ChirpFsk"

    # Modulation parameters
    symbol_type = np.int_
    synchronization: ChirpFSKSynchronization
    __chirp_duration: float  # Duration of a single chirp in seconds
    __chirp_bandwidth: float  # Frequency range over which a single chirp sweeps
    __freq_difference: float | None  # Frequency offset between two adjacent chirp symbols

    # Frame parameters
    __num_pilot_chirps: int
    __num_data_chirps: int
    __guard_interval: float

    def __init__(
        self,
        chirp_duration: float = 10e-9,
        chirp_bandwidth: float = 1e9,
        freq_difference: float | None = None,
        num_pilot_chirps: int = 14,
        num_data_chirps: int = 50,
        guard_interval: float = 0.0,
        **kwargs,
    ) -> None:
        """
        Args:

            chirp_duration (float, optional):
                Duration of a single chirp in seconds.

            chirp_bandwidth (float, optional):
                Bandwidth of a single chirp in Hz.

            freq_difference (float, optional):
                Frequency difference of two adjacent chirp symbols in Hz.
                If `None`, the chirp bandwidth divided by the modulation order is used.

            num_pilot_chirps (int, optional):
                Number of pilot symbols within a single frame.

            num_data_chirps (int, optional):
                Number of data symbols within a single frame.

            guard_interval (float, optional):
                Frame guard interval in seconds.

            **kwargs:
                Base waveform generator initialization arguments.

        Raises:
            ValueError: If one of the parameters is rejected by its property setter.
        """

        # Init base class
        PilotCommunicationWaveform.__init__(self, **kwargs)

        # Configure waveform parameters; every assignment runs the validating setter
        self.synchronization = ChirpFSKSynchronization()
        self.chirp_duration = chirp_duration
        self.chirp_bandwidth = chirp_bandwidth
        self.freq_difference = freq_difference
        self.num_pilot_chirps = num_pilot_chirps
        self.num_data_chirps = num_data_chirps
        self.guard_interval = guard_interval

    @property
    def frame_duration(self) -> float:
        """Length of one data frame in seconds.

        Returns:
            float: Duration of all pilot and data chirps plus the guard interval.
        """

        return self.chirp_duration * self.chirps_in_frame + self.guard_interval

    @property
    def chirp_duration(self) -> float:
        """Duration of a single chirp within a frame.

        Returns:
            float:
                Chirp duration in seconds.

        Raises:
            ValueError: If the duration is less or equal to zero.
        """

        return self.__chirp_duration

    @chirp_duration.setter
    def chirp_duration(self, value: float) -> None:
        # Zero must be rejected as well: the chirp slope divides by the duration
        if value <= 0.0:
            raise ValueError("Chirp duration must be greater than zero")

        self.__chirp_duration = value

    @property
    def chirp_bandwidth(self) -> float:
        """Frequency range swept by a single chirp.

        Returns:
            float:
                The chirp bandwidth in Hz.

        Raises:
            ValueError: If the bandwidth is less or equal to zero.
        """

        return self.__chirp_bandwidth

    @chirp_bandwidth.setter
    def chirp_bandwidth(self, bandwidth: float) -> None:
        if bandwidth <= 0.0:
            raise ValueError("Chirp bandwidth must be greater than zero")

        self.__chirp_bandwidth = bandwidth

    @property
    def freq_difference(self) -> float:
        """The frequency offset between neighbouring chirp symbols.

        If no explicit value has been configured, the chirp bandwidth divided
        by the modulation order is returned.

        Returns:
            float:
                The frequency difference in Hz.

        Raises:
            ValueError: If `freq_difference` is smaller or equal to zero.
        """

        if self.__freq_difference is None:
            return self.chirp_bandwidth / self.modulation_order

        return self.__freq_difference

    @freq_difference.setter
    def freq_difference(self, value: float | None) -> None:
        # None selects the bandwidth-derived default computed in the getter
        if value is not None and value <= 0.0:
            raise ValueError("Frequency difference must be greater than zero")

        self.__freq_difference = value

    @property
    def num_pilot_chirps(self) -> int:
        """Number of pilot chirps at the start of each frame.

        Returns:
            int:
                The number of pilot chirps.

        Raises:
            ValueError: If the number of pilot chirps is less than zero.
        """

        return self.__num_pilot_chirps

    @num_pilot_chirps.setter
    def num_pilot_chirps(self, num: int) -> None:
        if num < 0:
            raise ValueError("The number of pilot chirps must be greater or equal to zero.")

        self.__num_pilot_chirps = num

    @property
    def num_data_chirps(self) -> int:
        """Number of data chirps within each frame.

        Returns:
            int:
                The number of data chirps.

        Raises:
            ValueError: If the number of data chirps is less than zero.
        """

        return self.__num_data_chirps

    @num_data_chirps.setter
    def num_data_chirps(self, num: int) -> None:
        if num < 0:
            raise ValueError("The number of data chirps must be greater or equal to zero")

        self.__num_data_chirps = num

    @property
    def guard_interval(self) -> float:
        """Silent interval appended to each frame.

        Returns:
            float:
                The guard interval in seconds.

        Raises:
            ValueError: If the guard interval is less than zero.
        """

        return self.__guard_interval

    @guard_interval.setter
    def guard_interval(self, interval: float) -> None:
        if interval < 0.0:
            raise ValueError("The guard interval must be greater or equal to zero.")

        self.__guard_interval = interval

    @property
    def bits_per_symbol(self) -> int:
        """The number of bits per generated symbol.

        Returns:
            int:
                Base-two logarithm of the modulation order, truncated to an integer.
        """

        return int(np.log2(self.modulation_order))

    @property
    def num_data_symbols(self) -> int:
        # Each data chirp carries exactly one symbol
        return self.num_data_chirps

    @property
    def samples_in_chirp(self) -> int:
        """The number of discrete samples per generated chirp.

        Returns:
            int:
                Chirp duration times sampling rate, rounded up.
        """

        return int(ceil(self.chirp_duration * self.sampling_rate))

    @property
    def chirps_in_frame(self) -> int:
        """The number of chirps per generated frame.

        Returns:
            int:
                Sum of pilot and data chirps.
        """

        return self.num_pilot_chirps + self.num_data_chirps

    @property
    def samples_per_frame(self) -> int:
        # Chirp samples plus the guard interval rounded to the nearest sample count
        num_guard_samples = int(np.around(self.__guard_interval * self.sampling_rate))
        return self.samples_in_chirp * self.chirps_in_frame + num_guard_samples

    @property
    def symbol_duration(self) -> float:
        # One symbol is transmitted per chirp
        return self.chirp_duration

    @property
    def symbol_energy(self) -> float:
        _, energy = self._prototypes()
        return energy

    @property
    def bit_energy(self) -> float:
        """Theoretical average bit energy of the modulated signal.

        Returns:
            float: The symbol energy divided by the number of bits per symbol.
        """

        _, symbol_energy = self._prototypes()
        return symbol_energy / self.bits_per_symbol

    def map(self, data_bits: np.ndarray) -> Symbols:
        # Each group of bits_per_symbol bits becomes one integer prototype index
        offset = self._calculate_frequency_offsets(data_bits)
        return Symbols(offset[np.newaxis, np.newaxis, :])

    def unmap(self, symbols: Symbols) -> np.ndarray:
        bits_per_symbol = self.bits_per_symbol
        bits = np.empty(symbols.num_symbols * bits_per_symbol)

        # Expand every symbol index into its fixed-width binary representation
        for s, symbol in enumerate(symbols.raw[0, ::].flat):
            symbol_bits = [int(x) for x in np.binary_repr(int(symbol.real), width=bits_per_symbol)]
            bits[s * bits_per_symbol : (s + 1) * bits_per_symbol] = symbol_bits

        return bits

    def modulate(self, symbols: Symbols) -> np.ndarray:
        prototypes, _ = self._prototypes()
        samples_in_chirp = self.samples_in_chirp

        # Zero-initialize so that the guard interval (and any chirp slot without a
        # symbol) is silent instead of carrying uninitialized memory
        frame_samples = np.zeros(self.samples_per_frame, dtype=complex)

        # Add pilot samples
        pilot_samples = self.pilot_signal.getitem().flatten()
        num_pilot_samples = len(pilot_samples)
        frame_samples[:num_pilot_samples] = pilot_samples
        sample_idx = num_pilot_samples

        # Modulate data symbols: the symbol's real part indexes the chirp prototype
        for symbol in symbols.raw[0, ::].flat:
            frame_samples[sample_idx : sample_idx + samples_in_chirp] = prototypes[
                int(symbol.real), :
            ]
            sample_idx += samples_in_chirp

        return frame_samples

    def demodulate(self, baseband_signal: np.ndarray) -> Symbols:
        samples_in_chirp = self.samples_in_chirp
        prototypes, _ = self._prototypes()

        # Restrict the signal to the data chirps: skip the pilot section and do not
        # interpret trailing guard interval samples as additional symbols
        data_begin = samples_in_chirp * self.num_pilot_chirps
        data_end = data_begin + samples_in_chirp * self.num_data_chirps
        data_frame = baseband_signal[data_begin:data_end]

        # Correlate each received chirp with every prototype
        symbol_signals = data_frame.reshape(-1, samples_in_chirp)
        symbol_metrics = np.abs(symbol_signals @ prototypes.T.conj())

        # ToDo: Unfortunately the demodulation-scheme is non-linear. Is there a better way?
        symbols = np.argmax(symbol_metrics, axis=1)

        return Symbols(symbols[np.newaxis, np.newaxis, :])

    @property
    def bandwidth(self) -> float:
        # The bandwidth is identical to the chirp bandwidth
        return self.chirp_bandwidth

    def _calculate_frequency_offsets(self, data_bits: np.ndarray) -> np.ndarray:
        """Calculates the frequency offsets on frame creation.

        Args:
            data_bits (numpy.ndarray): Data bits to calculate the offsets for.

        Returns:
            numpy.ndarray:
                One integer offset per group of :attr:`bits_per_symbol` bits,
                the first bit of each group being the most significant one.
        """

        # Bit weights in descending order, e.g. [8, 4, 2, 1]
        power_of_2 = 2 ** np.arange(self.bits_per_symbol - 1, -1, -1)

        # Column-major reshape puts consecutive bits of one symbol into one column
        bits = np.reshape(data_bits, (self.bits_per_symbol, -1), order="F")

        # Weighted sum over each column yields the symbol's integer offset
        return np.matmul(power_of_2, bits)

    @property
    def power(self) -> float:
        # Average power of a chirp: symbol energy spread over its samples
        return self.symbol_energy / self.samples_in_chirp

    @property
    def symbol_precoding_support(self) -> bool:
        return False

    @staticmethod
    @lru_cache(maxsize=4)
    def _generate_prototypes(
        samples_in_chirp: int,
        sampling_rate: float,
        chirp_duration: float,
        chirp_bandwidth: float,
        freq_difference: float,
        modulation_order: int,
    ) -> Tuple[np.ndarray, float]:
        """Compute the chirp prototypes for one set of waveform parameters.

        The result is cached by parameter value and therefore shared between
        waveform instances; the returned array is flagged read-only to protect
        the cached data.

        Args:
            samples_in_chirp (int): Number of samples per chirp.
            sampling_rate (float): Sampling rate in Hz.
            chirp_duration (float): Duration of a single chirp in seconds.
            chirp_bandwidth (float): Bandwidth swept by a single chirp in Hz.
            freq_difference (float): Frequency offset between adjacent symbols in Hz.
            modulation_order (int): Number of prototypes to generate.

        Returns: Tuple[np.ndarray, float]
            One prototype per row and the energy of the first prototype.
        """

        # Chirp parameter inference
        slope = chirp_bandwidth / chirp_duration
        chirp_time = np.arange(samples_in_chirp) / sampling_rate
        f0 = -0.5 * chirp_bandwidth
        f1 = -f0

        # scipy renamed cumtrapz to cumulative_trapezoid and removed the old name
        # in 1.14; select by availability rather than by parsing version numbers
        cumulative_trapezoid = getattr(integrate, "cumulative_trapezoid", None)
        if cumulative_trapezoid is None:
            cumulative_trapezoid = integrate.cumtrapz

        # non-coherent detection
        # NOTE(review): the original allocated 2**bits_per_symbol rows while filling
        # modulation_order rows; both agree if the order is a power of two - confirm
        prototypes = np.zeros((modulation_order, samples_in_chirp), dtype=complex)
        for idx in range(modulation_order):
            initial_frequency = f0 + idx * freq_difference
            frequency = chirp_time * slope + initial_frequency

            # Wrap frequencies leaving the band at the top back to its lower edge
            frequency[frequency > f1] -= chirp_bandwidth

            # The phase is the integral of the instantaneous frequency
            phase = cumulative_trapezoid(frequency, dx=1 / sampling_rate, initial=0)
            prototypes[idx, :] = np.exp(2j * np.pi * phase)

        symbol_energy = float(np.sum(np.abs(prototypes[0, :]) ** 2))

        prototypes.setflags(write=False)
        return prototypes, symbol_energy

    def _prototypes(self) -> Tuple[np.ndarray, float]:
        """Generate chirp prototypes.

        This method generates the prototype chirps for all possible modulation symbols,
        that will be correlated with the received signal for detection.

        Since the computation is quite costly, recent outputs are cached. The cache
        is keyed by the parameters the prototypes depend on, so changing any of them
        (including the sampling rate) can never yield outdated prototypes.

        Returns: Tuple[np.ndarray, float]
            np.ndarray:
                Prototypes, one row per modulation symbol.
            float:
                Symbol energy.
        """

        # Coerce to plain Python scalars so the arguments are hashable cache keys
        return self._generate_prototypes(
            self.samples_in_chirp,
            float(self.sampling_rate),
            float(self.chirp_duration),
            float(self.chirp_bandwidth),
            float(self.freq_difference),
            int(self.modulation_order),
        )

    @property
    def sampling_rate(self) -> float:
        # Sampling rate scales with the chirp bandwidth
        return self.oversampling_factor * self.__chirp_bandwidth

    @property
    def pilot_signal(self) -> Signal:
        """Samples of the frame's pilot section.

        The pilot section cycles through the chirp prototypes in ascending order.

        Returns:
            Signal: Signal model holding the pilot samples at the waveform's sampling rate.
        """

        prototypes, _ = self._prototypes()

        # Pilot chirp k is prototype k modulo the number of prototypes; fancy
        # indexing copies the rows, flattening concatenates them in order
        pilot_indices = np.arange(self.num_pilot_chirps) % len(prototypes)
        pilot_samples = prototypes[pilot_indices].reshape(-1)

        return Signal.Create(pilot_samples, self.sampling_rate)

    def _clear_cache(self) -> None:
        """Discard all cached chirp prototypes.

        Kept as an explicit hook; the cache is keyed by parameter values,
        so clearing it is not required after changing a parameter.
        """

        self._generate_prototypes.cache_clear()
# [docs]  (link residue from the HTML documentation page, not part of the module)
class ChirpFSKSynchronization(Synchronization[ChirpFSKWaveform], Serializable):
    """Synchronization for chirp-based frequency shift keying communication waveforms.

    Binds the generic synchronization base class to :class:`ChirpFSKWaveform`
    and declares a YAML tag; the initializer only forwards to the base class.
    """

    # YAML tag
    yaml_tag = "ChirpFsk-Synchronization"

    def __init__(self, waveform: ChirpFSKWaveform | None = None) -> None:
        """
        Args:

            waveform (ChirpFSKWaveform, optional):
                The waveform generator this synchronization routine is attached to.
                Passed on unchanged to the base class initializer.
        """

        # Explicit base call (rather than super()) as written by the original authors
        Synchronization.__init__(self, waveform)
# [docs]  (link residue from the HTML documentation page, not part of the module)
class ChirpFSKCorrelationSynchronization(
    CorrelationSynchronization[ChirpFSKWaveform], Serializable
):
    """Correlation-based clock-synchronization for Chirp-FSK waveforms.

    Binds the generic correlation synchronization base class to
    :class:`ChirpFSKWaveform` and declares a YAML tag.
    """

    # YAML tag
    yaml_tag = "ChirpFsk-Correlation"