Source code for hermespy.modem.waveforms.orthogonal.ofdm
# -*- coding: utf-8 -*-
from __future__ import annotations
from math import ceil
from typing import List, Any, Set, Sequence
import numpy as np
from scipy.fft import fft, fftfreq, fftshift, ifft, ifftshift
from scipy.signal import find_peaks
from hermespy.core import Serializable
from ...waveform import Synchronization
from ...waveform_correlation_synchronization import CorrelationSynchronization
from .waveform import (
GridResource,
GridSection,
OrthogonalWaveform,
PilotSection,
PilotSymbolSequence,
)
__author__ = "André Noll Barreto"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["André Noll Barreto", "Tobias Kronauer", "Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.3.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
[docs]
class OFDMWaveform(OrthogonalWaveform, Serializable):
    """Generic Orthogonal Frequency Division Multiplexing waveform description.

    Symbols are mapped onto ``num_subcarriers`` frequency bins which are centered
    within an FFT of size ``oversampling_factor * num_subcarriers``; the remaining
    bins are zero-padding.
    """

    yaml_tag: str = "OFDM"
    __subcarrier_spacing: float
    dc_suppression: bool

    @staticmethod
    def _arg_signature() -> Set[str]:
        return {"modulation_order"}  # pragma: no cover

    def __init__(
        self,
        grid_resources: Sequence[GridResource],
        grid_structure: Sequence[GridSection],
        num_subcarriers: int = 1024,
        subcarrier_spacing: float = 1e3,
        dc_suppression: bool = True,
        pilot_section: PilotSection | None = None,
        pilot_sequence: PilotSymbolSequence | None = None,
        repeat_pilot_sequence: bool = True,
        **kwargs: Any,
    ) -> None:
        """
        Args:

            grid_resources (Sequence[GridResource]):
                Frequency-domain resource section configurations.

            grid_structure (Sequence[GridSection]):
                Time-domain frame configuration.

            num_subcarriers (int, optional):
                Maximum number of assignable subcarriers.
                Unassigned subcarriers will be assumed to be zero.
                :math:`1024` by default.

            subcarrier_spacing (float, optional):
                Spacing between individual subcarriers in Hz.
                :math:`1~\\mathrm{kHz}` by default.

            dc_suppression (bool, optional):
                Suppress the direct current component during waveform generation.
                Enabled by default.

            pilot_section (PilotSection, optional):
                Pilot section preceding the frame's payload.
                If not specified, no dedicated pilot section will be generated.

            pilot_sequence (PilotSymbolSequence, optional):
                Sequence of symbols used for the pilot section and reference symbols
                within the frame. If not specified, pseudo-random sequences will be generated
                from the set of data symbols.

            repeat_pilot_sequence (bool, optional):
                Forwarded unchanged to the :class:`OrthogonalWaveform` base class.
                Presumably controls whether `pilot_sequence` is repeated when it is
                shorter than required; confirm against the base class.
                Enabled by default.

            **kwargs (Any):
                Waveform generator base class initialization parameters.
                Refer to :class:`CommunicationWaveform` for details.

        Raises:
            ValueError: If `subcarrier_spacing` is smaller or equal to zero.
        """

        # Initialize the base class
        OrthogonalWaveform.__init__(
            self,
            num_subcarriers,
            grid_resources,
            grid_structure,
            pilot_section,
            pilot_sequence,
            repeat_pilot_sequence,
            **kwargs,
        )

        # Initialize the OFDM specific attributes
        self.subcarrier_spacing = subcarrier_spacing
        self.dc_suppression = dc_suppression

    def _forward_transformation(self, symbol_grid: np.ndarray) -> np.ndarray:
        """Transform a frequency-domain symbol grid into time-domain samples.

        Args:
            symbol_grid: Two-dimensional array whose last axis holds
                ``num_subcarriers`` frequency-domain symbols.

        Returns:
            Array of shape ``(symbol_grid.shape[0], oversampling_factor * num_subcarriers)``
            holding the inverse Fourier transform of each zero-padded row.
        """

        num_bins = self.oversampling_factor * self.num_subcarriers

        # Normalize the frequency-domain data symbols for unit power transmission
        normalized_symbols = symbol_grid / np.sqrt(self.num_subcarriers)

        # Zero-pad the grid to account for oversampling.
        # np.complex128 is used instead of the np.complex_ alias, which was removed in NumPy 2.0.
        padded_symbol_grid = np.zeros((normalized_symbols.shape[0], num_bins), dtype=np.complex128)

        # The occupied subcarriers are centered within the padded spectrum
        padding_start_idx = num_bins // 2 - self.num_subcarriers // 2
        padded_symbol_grid[:, padding_start_idx : self.num_subcarriers + padding_start_idx] = (
            normalized_symbols
        )

        # Shift in order to suppress the dc component
        # Note that for configurations without any oversampling the DC component will not be suppressed
        if self.dc_suppression:
            dc_index = int(0.5 * self.num_subcarriers * self.oversampling_factor)
            padded_symbol_grid[:, dc_index:] = np.roll(padded_symbol_grid[:, dc_index:], 1, axis=-1)

        # By convention, the length of each time slot is the inverse of the sub-carrier spacing.
        # norm="forward" leaves the inverse transform unscaled.
        sample_grid = ifft(
            ifftshift(padded_symbol_grid, axes=-1), n=num_bins, axis=-1, norm="forward"
        )

        return sample_grid

    def _backward_transformation(
        self, sample_sections: np.ndarray, normalize: bool = True
    ) -> np.ndarray:
        """Transform time-domain sample sections back into frequency-domain symbols.

        Args:
            sample_sections: Array whose last axis holds the time-domain samples
                of a single symbol slot.
            normalize: If enabled, scale the result by
                ``num_subcarriers ** -0.5 / oversampling_factor``.

        Returns:
            Array whose last axis holds the ``num_subcarriers`` symbols cut from
            the center of the Fourier-transformed spectrum.
        """

        num_bins = self.oversampling_factor * self.num_subcarriers

        # Transform the time-domain resource signals to frequency-domain data symbols
        symbol_grid = fft(sample_sections, n=num_bins, axis=-1, norm="backward")

        # Shift fft bins to the center
        symbol_grid = fftshift(symbol_grid, axes=-1)

        # Account for the DC suppression by reverting the shift applied during transmission
        if self.dc_suppression:
            dc_index = int(0.5 * self.num_subcarriers * self.oversampling_factor)
            symbol_grid[..., dc_index:] = np.roll(symbol_grid[..., dc_index:], -1, axis=-1)

        # Remove the zero padding due to the oversampling from the symbol grid
        padding_start_idx = num_bins // 2 - self.num_subcarriers // 2
        original_symbol_grid = symbol_grid[
            ..., padding_start_idx : self.num_subcarriers + padding_start_idx
        ]

        # Normalize the frequency-domain data symbols for unit power reception
        if normalize:
            original_symbol_grid *= (self.num_subcarriers) ** -0.5 / self.oversampling_factor

        return original_symbol_grid

    def _correct_sample_offset(self, symbol_subgrid: np.ndarray, sample_offset: int) -> np.ndarray:
        """Apply a per-subcarrier phase rotation compensating a time-domain sample offset.

        Args:
            symbol_subgrid: Frequency-domain symbols; indexed along axis 1 to
                locate the DC bin and multiplied with a vector of
                ``num_subcarriers`` phase factors.
            sample_offset: Offset in samples to be compensated.

        Returns:
            The symbols multiplied by ``exp(2j * pi * f * sample_offset)``, with
            ``f`` being the normalized frequency of the respective subcarrier.
        """

        num_bins = self.oversampling_factor * self.num_subcarriers
        frame_start_idx = num_bins // 2 - self.num_subcarriers // 2

        # Normalized frequencies of the occupied, centered subcarriers
        freqs = fftshift(fftfreq(num_bins))[
            frame_start_idx : frame_start_idx + self.num_subcarriers
        ]

        if self.dc_suppression:
            dc_index = int(0.5 * symbol_subgrid.shape[1])
            freqs[dc_index:] = np.roll(freqs[dc_index:], -1)

        return symbol_subgrid * np.exp(2j * np.pi * freqs * sample_offset)

    @property
    def subcarrier_spacing(self) -> float:
        """Spacing between individual subcarriers.

        Returns:
            float: Spacing in Hz.

        Raises:
            ValueError: If `spacing` is smaller or equal to zero.
        """

        return self.__subcarrier_spacing

    @subcarrier_spacing.setter
    def subcarrier_spacing(self, spacing: float) -> None:
        """Modify the spacing between individual subcarriers.

        Args:
            spacing (float): New spacing in Hz.

        Raises:
            ValueError: If `spacing` is smaller or equal to zero.
        """

        if spacing <= 0.0:
            raise ValueError("Subcarrier spacing must be greater than zero")

        self.__subcarrier_spacing = spacing

    @property
    def samples_per_frame(self) -> int:
        """Total number of samples of all grid sections plus the optional pilot section."""

        num = sum(section.num_samples for section in self.grid_structure)

        if self.pilot_section:
            num += self.pilot_section.num_samples

        return num

    @property
    def bandwidth(self) -> float:
        """Occupied bandwidth in Hz."""

        # OFDM bandwidth currently is identical to the number of subcarriers times the subcarrier spacing
        return self.num_subcarriers * self.subcarrier_spacing

    @property
    def sampling_rate(self) -> float:
        """Sampling rate in Hz, i.e. the bandwidth scaled by the oversampling factor."""

        return self.oversampling_factor * self.subcarrier_spacing * self.num_subcarriers
[docs]
class SchmidlCoxPilotSection(PilotSection[OFDMWaveform]):
    """Pilot Symbol Section of the Schmidl Cox Algorithm.

    Refer to :footcite:t:`1997:schmidl` for a detailed description.
    """

    yaml_tag = "SchmidlCoxPilot"
    """YAML serialization tag"""

    def _pilot_sequence(self, num_symbols: int | None = None) -> np.ndarray:
        """Generate the zero-stuffed frequency-domain Schmidl-Cox pilot sequence.

        Args:
            num_symbols: Ignored. The sequence length is always the number of
                subcarriers of the assigned waveform.

        Returns:
            Complex array of length ``num_subcarriers`` in which every second
            entry carries a pilot symbol and the remaining entries are zero.
        """

        num_subcarriers = self.wave.num_subcarriers

        # The schmidl-cox pilot sequence is zero-stuffed in frequency domain
        stuffed_pilot_sequence = np.zeros(num_subcarriers, dtype=complex)
        stuffed_pilot_sequence[::2] = PilotSection._pilot_sequence(
            self, ceil(0.5 * num_subcarriers)
        )

        # With DC suppression enabled, the lower half of the sequence is
        # circularly shifted by a single subcarrier
        if self.wave.dc_suppression:
            dc_index = int(0.5 * num_subcarriers)
            stuffed_pilot_sequence[:dc_index] = np.roll(stuffed_pilot_sequence[:dc_index], 1)

        return stuffed_pilot_sequence
class OFDMSynchronization(Synchronization[OFDMWaveform]):
    """Common base of synchronization routines operating on :class:`OFDMWaveform`.

    Adds no behavior of its own on top of the generic synchronization base.
    """

    ...  # pragma: no cover
class OFDMCorrelationSynchronization(CorrelationSynchronization[OFDMWaveform]):
    """Pilot detection and synchronization for OFDM waveforms based on correlation.

    Binds the generic correlation synchronization to :class:`OFDMWaveform`
    and registers it under its own serialization tag.
    """

    yaml_tag = "OFDM-PilotCorrelation"
[docs]
class SchmidlCoxSynchronization(OFDMSynchronization):
    """Schmidl-Cox Algorithm for OFDM Waveform Time Synchronization and Carrier Frequency Offset Equalization.

    Applying the synchronization routine requires the respective waveform to have a :class:`.SchmidlCoxPilotSection` pilot
    symbol section configured.

    Refer to :footcite:t:`1997:schmidl` for a detailed description.
    """

    yaml_tag = "SchmidlCox"
    """YAML serialization tag"""

    def synchronize(self, signal: np.ndarray) -> List[int]:
        """Detect frame starts by correlating the two halves of a sliding symbol window.

        Args:
            signal: Two-dimensional sample array with time along the last axis.
                The correlation magnitudes of all rows are summed; presumably
                each row is one receive stream (confirm against the caller).

        Returns:
            List of sample indices at which a frame is assumed to start.
            Empty if the signal is shorter than a single symbol.
        """

        symbol_length = self.waveform.oversampling_factor * self.waveform.num_subcarriers

        # Abort if the supplied signal is shorter than one symbol length
        if signal.shape[-1] < symbol_length:
            return []

        half_symbol_length = int(0.5 * symbol_length)

        num_delay_candidates = 2 + signal.shape[-1] - symbol_length
        delay_powers = np.empty(num_delay_candidates, dtype=float)
        delay_powers[0] = 0.0  # In order to be able to detect a peak on the first sample

        # Correlate the first with the second half of the window for each delay
        # candidate and accumulate the correlation magnitudes over all rows
        for d in range(0, num_delay_candidates - 1):
            first_half = signal[:, d : d + half_symbol_length]
            second_half = signal[:, d + half_symbol_length : d + 2 * half_symbol_length]
            delay_powers[1 + d] = np.sum(abs(np.sum(first_half.conj() * second_half, axis=1)))

        num_samples = self.waveform.samples_per_frame
        min_height = 0.75 * np.max(delay_powers)

        # find_peaks rejects distances below one, which would occur for degenerate frame lengths
        min_distance = max(1, int(0.9 * num_samples))
        peaks, _ = find_peaks(delay_powers, distance=min_distance, height=min_height)

        # Correct for the first delay bin being prepended
        frame_indices = peaks - 1

        # Return plain Python integers, matching the annotated return type and the early-exit path
        return frame_indices.tolist()