# -*- coding: utf-8 -*-
from __future__ import annotations
import numpy as np
from sparse import SparseArray # type: ignore
from hermespy.core import Serializable
from ..symbols import StatedSymbols
from .symbol_precoding import TransmitSymbolEncoder, ReceiveSymbolDecoder
# Module metadata: authorship, licensing and release information.
__author__ = "Tobias Kronauer"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Tobias Kronauer", "Jan Adler", "Egor Achkasov"]
__license__ = "AGPLv3"
__version__ = "1.4.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
class Alamouti(TransmitSymbolEncoder, ReceiveSymbolDecoder, Serializable):
    """Alamouti precoder distributing symbols in space and time.

    Support for 2 transmit antennas only.
    Refer to :footcite:t:`1998:alamouti` for further information.
    """

    yaml_tag = "ALAMOUTI"

    def __init__(self) -> None:
        # Initialize base classes
        TransmitSymbolEncoder.__init__(self)
        ReceiveSymbolDecoder.__init__(self)
        Serializable.__init__(self)

    def encode_symbols(self, symbols: StatedSymbols, num_output_streams: int) -> StatedSymbols:
        """Encode data into multiple antennas with space-time/frequency block codes.

        Consecutive symbol blocks are processed in pairs :math:`(s_0, s_1)`.
        The first output stream transmits the pair unaltered, the second output
        stream transmits :math:`(-s_1^*, s_0^*)` during the same two blocks.

        Args:

            symbols (StatedSymbols):
                Input signal featuring :math:`K` blocks.

            num_output_streams (int):
                Number of required streams resulting from the encoding process.
                Should always be 2 for Alamouti encoding.

        Returns: Encoded data with size :math:`2 \\times K` symbols

        Raises:

            ValueError: If more than a single symbol stream is provided.
            ValueError: If the number of transmit antennas is not two.
            ValueError: If the number of data symbols is not even.
        """

        if symbols.num_streams != 1:
            raise ValueError("Space-Time block codings require a single symbol input stream")

        input_data = symbols.raw[0, :, :]

        # 2x2 MIMO Alamouti code
        if num_output_streams != 2:
            raise ValueError(
                f"Alamouti encoding requires two transmit antennas ({num_output_streams} requested)"
            )

        if symbols.num_blocks % 2 != 0:
            raise ValueError("Alamouti encoding must contain an even amount of data symbols blocks")

        first_of_pair = input_data[0::2, :]
        second_of_pair = input_data[1::2, :]

        output = np.empty((2, symbols.num_blocks, symbols.num_symbols), dtype=np.complex128)

        # First antenna: the unaltered input sequence
        output[0, :, :] = input_data

        # Second antenna: swapped, conjugated pair with a sign flip on the first entry
        output[1, 0::2, :] = -second_of_pair.conj()
        output[1, 1::2, :] = first_of_pair.conj()

        # Replicate the single-stream state once for each output stream
        state = np.repeat(symbols.states, num_output_streams, axis=0)
        return StatedSymbols(output, state)

    def decode_symbols(self, symbols: StatedSymbols, num_output_streams: int) -> StatedSymbols:
        """Decode data for STBC with 2 antenna streams

        Received signal with equal noise power is assumed, the decoded signal has same noise level as input.

        Only the channel state of the first block of each block pair is read;
        it is applied to both blocks of the pair.

        Args:

            symbols (StatedSymbols):
                Input signal with :math:`N \\times K` symbol blocks.

            num_output_streams (int):
                Number of required streams resulting from the decoding process.
                Will be ignored by this decoder.

        Returns: Decoded data with size :math:`N \\times K`

        Raises:

            ValueError: If the number of symbol blocks is not even.
            ValueError: If the channel state contains less than two transmit streams.
        """

        if symbols.num_blocks % 2 != 0:
            raise ValueError("Alamouti decoding must contain an even amount of data symbols blocks")

        # Channel state of the first two transmit streams at every even block index
        channel_state = symbols.states[:, :2, 0::2, :]

        # Without this check a single-stream state fails further below with an opaque IndexError
        if channel_state.shape[1] < 2:
            raise ValueError(
                "Alamouti decoding requires the channel state of two transmit antennas "
                f"({channel_state.shape[1]} provided)"
            )

        if isinstance(channel_state, SparseArray):
            channel_state = channel_state.todense()

        h0 = channel_state[:, 0, :, :]
        h1 = channel_state[:, 1, :, :]

        # Combined channel power |h0|^2 + |h1|^2 used to normalize the combiner output
        weight_norms = np.sum(np.abs(channel_state) ** 2, axis=1, keepdims=False)

        received_even = symbols.raw[:, 0::2, :]
        received_odd = symbols.raw[:, 1::2, :]

        decoded_symbols = np.empty(
            (symbols.num_streams, symbols.num_blocks, symbols.num_symbols), dtype=complex
        )
        decoded_symbols[:, 0::2, :] = (h0.conj() * received_even + h1 * received_odd.conj()) / weight_norms
        decoded_symbols[:, 1::2, :] = (h0.conj() * received_odd - h1 * received_even.conj()) / weight_norms

        # The channel has been compensated, so the decoded symbols carry an all-ones state
        ideal_states = np.ones(
            (symbols.num_streams, 1, symbols.num_blocks, symbols.num_symbols), dtype=complex
        )
        return StatedSymbols(decoded_symbols, ideal_states)

    def _num_transmit_input_streams(self, num_output_streams: int) -> int:
        # A single input stream is required for two output streams;
        # -1 is returned for every other configuration
        return 1 if num_output_streams == 2 else -1

    def num_receive_output_streams(self, num_input_streams: int) -> int:
        # Decoding preserves the number of streams
        return num_input_streams

    # The following properties all report 2, matching the pair-wise block
    # processing of the encoder and decoder above.
    # NOTE(review): their exact contract is defined by the precoding base classes - confirm there.

    @property
    def num_transmit_input_symbols(self) -> int:
        return 2

    @property
    def num_transmit_output_symbols(self) -> int:
        return 2

    @property
    def num_receive_input_symbols(self) -> int:
        return 2

    @property
    def num_receive_output_symbols(self) -> int:
        return 2
class Ganesan(TransmitSymbolEncoder, ReceiveSymbolDecoder, Serializable):
    """Girish Ganesan and Petre Stoica general precoder distributing symbols in space and time.

    Supports 4 transmit antennas. Features a :math:`\\frac{3}{4}` symbol rate.
    Refer to :footcite:t:`2001:ganesan` for further information.
    """

    yaml_tag = "GANESAN"

    def encode_symbols(self, symbols: StatedSymbols, num_output_streams: int) -> StatedSymbols:
        """Encode data into multiple antennas with space-time/frequency block codes.

        Note that Ganesan schema's symbol rate is :math:`\\frac{3}{4}` so the encoding process increases the number of blocks by :math:`\\frac{4}{3}`.

        Args:

            symbols (StatedSymbols):
                Input signal featuring :math:`K` blocks.

            num_output_streams (int):
                Number of required streams resulting from the encoding process.
                Should always be 4 for Ganesan encoding.

        Returns:
            Encoded data with size :math:`\\frac{4}{3} \\times K` symbol blocks. Thus num_blocks is changed to num_blocks / 3 * 4.
            Returned channel states are initialized with ones (np.ones is used).

        Raises:

            ValueError: If more than a single symbol stream is provided.
            RuntimeError: If the number of transmit antennas is not four.
            ValueError: If the number of data symbols blocks is not divisible by three.
        """

        if symbols.num_streams != 1:
            raise ValueError("Space-Time block codings require a single symbol input stream")

        input_data = symbols.raw[0, :, :]

        if num_output_streams != 4:
            raise RuntimeError(
                f"Ganesan encoding requires 4 transmit antennas ({num_output_streams} requested)"
            )

        if symbols.num_blocks % 3 != 0:
            raise ValueError("Number of blocks must be divisable by 3.")

        # Every group of three input blocks forms one code word (s0, s1, s2)
        s0 = input_data[0::3, :]
        s1 = input_data[1::3, :]
        s2 = input_data[2::3, :]

        # Change symbol block amount because of the 3/4 symbol rate.
        # The output starts out as zeros, so the single silent slot of each
        # antenna per code word does not need to be written explicitly.
        output = np.zeros(
            (4, symbols.num_blocks // 3 * 4, symbols.num_symbols), dtype=np.complex128
        )

        # Encode data explicitly element-wise.
        # Notice that matrix Z (Eq. 41) is m by N in the paper,
        # where m = num Tx, and N = num symbol periods,
        # so each column is a symbol period and each row is TX

        # Tx 1 (silent in slot 1)
        output[0, 0::4, :] = s0
        output[0, 2::4, :] = s1
        output[0, 3::4, :] = -s2

        # Tx 2 (silent in slot 0)
        output[1, 1::4, :] = s0
        output[1, 2::4, :] = s2.conj()
        output[1, 3::4, :] = s1.conj()

        # Tx 3 (silent in slot 3)
        output[2, 0::4, :] = -s1.conj()
        output[2, 1::4, :] = -s2
        output[2, 2::4, :] = s0.conj()

        # Tx 4 (silent in slot 2)
        output[3, 0::4, :] = s2.conj()
        output[3, 1::4, :] = -s1
        output[3, 3::4, :] = s0.conj()

        # Cast the result to StatedSymbols, the states are all ones with the block axis scaled by 4/3
        in_states = symbols.states
        out_states = np.ones(
            (4, in_states.shape[1], in_states.shape[2] // 3 * 4, in_states.shape[3])
        )
        return StatedSymbols(output, out_states)

    def decode_symbols(self, symbols: StatedSymbols, num_output_streams: int) -> StatedSymbols:
        """Decode data for STBC with 4 antenna streams

        Note that Ganesan schema's symbol rate is :math:`\\frac{3}{4}` so the decoding process decreases the number of blocks by :math:`\\frac{3}{4}`.

        Args:

            symbols (StatedSymbols):
                Input signal with :math:`4 \\times N` symbol blocks.

            num_output_streams (int):
                Number of required streams resulting from the decoding process.
                Will be ignored by this decoder.

        Returns:
            Decoded data with size :math:`3 \\times N`
            Returned channel states are initialized with ones (np.ones is used).

        Raises:

            ValueError: If the number of symbol blocks is not divisible by four.
            ValueError: If the channel state does not feature four transmit streams.
        """

        # check the number of blocks (we expect them to be divisible by 4)
        if symbols.num_blocks % 4 != 0:
            raise ValueError(
                "Ganesan decoding must be given an amount of data symbols blocks that is divisable by 4"
            )

        # check number of Tx (must be 4)
        if symbols.num_transmit_streams != 4:
            raise ValueError(
                f"Ganesan decoding must be given 4 transmit antennas ({symbols.num_transmit_streams} were given)"
            )

        states = symbols.dense_states()
        states_real = states.real
        states_imag = states.imag
        raw = symbols.raw

        num_rx = symbols.num_streams
        num_symbols = symbols.num_symbols
        num_periods = symbols.num_blocks // 4

        # Let each Rx antenna receive 4 signals over 4 time moments (=1 symbol period).
        # Let R be a vector of 4 received signals by a Rx antenna
        # Let A be a matrix of channel states of size of 4x4 (4 Tx to 4 symbols in a symbol period):
        # R = A@s, where
        #   R = {r1, r2, r3, r4}
        #   s = {s1, s2, s3, 0}
        # Split each variable into real and imag parts, expanding the system
        # R' = A' @ s', where
        #   R' = {r1.real, r2.real, r3.real, r4.real, r1.imag, r2.imag, r3.imag, r4.imag}, where
        #   s' = {s1.real, s2.real, s3.real, s1.imag, s2.imag, s3.imag}
        # Then matrix A can be constructed with the following signs and index matrices
        signs_matrix_real = np.array(
            [[1, -1, 1, -1, -1, 1], [1, -1, -1, -1, 1, 1], [1, 1, 1, 1, -1, 1], [1, 1, -1, 1, 1, 1]]
        )
        signs_matrix_imag = signs_matrix_real.copy()
        signs_matrix_imag[:, 3:] *= -1
        signs_matrix = np.concatenate((signs_matrix_real, signs_matrix_imag), axis=0)

        # Row n_ lists the transmit streams whose channel coefficients multiply (s1, s2, s3) in time slot n_
        index_matrix = np.array([[0, 2, 3], [1, 3, 2], [2, 0, 1], [3, 1, 0]])

        # Init result(decoded_symbols), notice that num_blocks is reduced because of the 3/4 symbol rate
        decoded_symbols = np.empty((num_rx, num_periods * 3, num_symbols), dtype=np.complex128)

        # Unsigned matrix A', refilled for every symbol period
        an = np.empty((num_rx, 6, 8, num_symbols), dtype=np.float64)

        # Init einsum paths to optimize einsum in the future.
        # The two placeholders only provide operand shapes, their content is never read.
        estimator_template = np.empty((num_symbols, num_rx, 6, 8), dtype=np.float64)
        received_template = np.empty((num_rx, 8, num_symbols), dtype=np.float64)
        an_path = np.einsum_path("ikjl,jk->lijk", an, signs_matrix, optimize="optimal")[0]
        estimation_path = np.einsum_path(
            "ijkl,jli->jki", estimator_template, received_template, optimize="optimal"
        )[0]

        # For each symbol period (which is 4 blocks) decode 3 encoded symbol blocks
        for n in range(num_periods):
            first_block = n * 4

            # Assemble matrix A'
            for n_ in range(4):
                tx_selection = index_matrix[n_]
                h_real = states_real[:, tx_selection, first_block + n_, :]
                h_imag = states_imag[:, tx_selection, first_block + n_, :]

                # Rows of the real received parts: [Re(h) | Im(h)]
                an[:, :3, n_, :] = h_real
                an[:, 3:, n_, :] = h_imag

                # Rows of the imaginary received parts: [Im(h) | Re(h)]
                an[:, :3, n_ + 4, :] = h_imag
                an[:, 3:, n_ + 4, :] = h_real

            # Calculate estimator such that estimator @ R' = s'
            # this einsum applies the signs matrix to A' and transposes it
            estimator = np.linalg.pinv(
                np.einsum("ikjl,jk->lijk", an, signs_matrix, optimize=an_path)
            )

            # Init R' for this symbol period
            received_symbols_blocks = raw[:, first_block : first_block + 4, :]
            b = np.concatenate((received_symbols_blocks.real, received_symbols_blocks.imag), axis=1)

            # Solve the system and assemble extended results from 6 floats back to 3 complex
            estimated_split_symbols = np.einsum(
                "ijkl,jli->jki", estimator, b, optimize=estimation_path
            )
            decoded_symbols[:, n * 3 : n * 3 + 3, :] = (
                estimated_split_symbols[:, :3, :] + 1j * estimated_split_symbols[:, 3:, :]
            )

        # Construct ideal channel states to cast result to StatedSymbols
        ideal_states = np.ones((num_rx, 1, decoded_symbols.shape[1], decoded_symbols.shape[2]))
        return StatedSymbols(decoded_symbols, ideal_states)

    def _num_transmit_input_streams(self, num_output_streams: int) -> int:
        # A single input stream is required for four output streams;
        # -1 is returned for every other configuration
        return 1 if num_output_streams == 4 else -1

    def num_receive_output_streams(self, num_input_streams: int) -> int:
        # Decoding preserves the number of streams
        return num_input_streams

    # The following properties mirror the 3/4 symbol rate: the encoder maps
    # three blocks to four, the decoder maps four blocks back to three.
    # NOTE(review): their exact contract is defined by the precoding base classes - confirm there.

    @property
    def num_transmit_input_symbols(self) -> int:
        return 3

    @property
    def num_transmit_output_symbols(self) -> int:
        return 4

    @property
    def num_receive_input_symbols(self) -> int:
        return 4

    @property
    def num_receive_output_symbols(self) -> int:
        return 3