Source code for hermespy.modem.precoding.single_carrier

# -*- coding: utf-8 -*-

from __future__ import annotations

import numpy as np

from hermespy.core import Serializable
from ..symbols import StatedSymbols
from .symbol_precoding import SymbolPrecoder

__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler", "André Noll Barreto"]
__license__ = "AGPLv3"
__version__ = "1.3.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


[docs] class SingleCarrier(SymbolPrecoder, Serializable): """Single Carrier data symbol precoding step. Takes a on-dimensional input stream and distributes the symbols to multiple output streams. """ yaml_tag = "SingleCarrier" def __init__(self) -> None: """Single Carrier object initialization.""" SymbolPrecoder.__init__(self)
[docs] def encode(self, symbols: StatedSymbols) -> StatedSymbols: if symbols.num_streams != 1: raise RuntimeError( "Single-Carrier spatial multiplexing only supports one-dimensional input streams during encoding" ) repeated_symbols = symbols.copy() repeated_symbols.raw = np.repeat(repeated_symbols.raw, self.num_output_streams, axis=0) repeated_symbols.states = np.repeat( repeated_symbols.states, self.num_output_streams, axis=0 ) return repeated_symbols
[docs] def decode(self, symbols: StatedSymbols) -> StatedSymbols: # Decode data using SC receive diversity with N_rx received antennas. # # Received signal with equal noise power is assumed, the decoded signal has same noise # level as input. It is assumed that all data points equal noise levels. # Essentially, over all symbol streams for each symbol the one with the strongest response will be selected symbols = symbols.copy() dense_states = symbols.dense_states() squeezed_channel_state = dense_states.sum(axis=1, keepdims=False) # Select proper antenna for each symbol timestamp antenna_selection = np.argmax(np.abs(squeezed_channel_state), axis=0) new_symbols = np.take_along_axis(symbols.raw, antenna_selection[np.newaxis, ::], axis=0) # stream_noises = np.take_along_axis(symbols, antenna_selection.T, axis=0) channel_state_selection = antenna_selection[np.newaxis, np.newaxis, ::].repeat(2, axis=1) new_states = np.take_along_axis(dense_states, channel_state_selection, axis=0) symbols.raw = new_symbols symbols.states = new_states return symbols
@property def num_input_streams(self) -> int: return 1 @property def num_output_streams(self) -> int: return self.required_num_output_streams