# -*- coding: utf-8 -*-
"""
===============
Coding Pipeline
===============
This module introduces the concept of bit :class:`.Encoder` steps,
which form a single chain link within a channel coding processing chain.
Considering an arbitrary coding scheme consisting of multiple steps,
the process of encoding bit streams during transmission and decoding them during
subsequent reception is modeled by a chain of :class:`.Encoder` instances:

.. mermaid::

   %%{init: {'theme': 'dark'}}%%
   flowchart LR

       input([Input Bits]) --> n_i[...]
       n_i --> n_a[Encoder N-1] --> n_b[Encoder N] --> n_c[Encoder N+1] --> n_o[...]
       n_o --> output([Coded Bits])

During transmission encoding the processing chain is sequentially executed from left to right,
during reception decoding in reverse order.
Within bit streams, :class:`.Encoder` instances sequentially encode block sections of :math:`K_n` bits into
code sections of :math:`L_n` bits.
Therefore, the rate of the :math:`n`-th :class:`.Encoder`

.. math::

   R_n = \\frac{K_n}{L_n}

is defined as the relation between input and output block length.
The pipeline configuration as well as the encoding step execution is managed by the :class:`.EncoderManager`.
Provided with a frame of :math:`K` input bits, the manager will generate a coded frame of :math:`L` bits by
sequentially executing all :math:`N` configured encoders.
Considering a frame of :math:`K_{\\mathrm{Frame}, n}` input bits to the :math:`n`-th encoder within the pipeline,
the manager will split the frame into

.. math::

   M_n(K_{\\mathrm{Frame}, n}) = \\left\\lceil \\frac{K_{\\mathrm{Frame}, n}}{K_n} \\right\\rceil

blocks to be encoded independently.
The last block will be padded with randomly drawn bits should it not contain sufficient bits.
While this may not be exactly standard-compliant behaviour, it is a necessary simplification to enable
arbitrary combinations of encoders.
Therefore, the coding rate of the whole pipeline

.. math::

   R = \\frac{K}{L} = \\frac{K}{M_N \\cdot L_N}

can only be defined recursively considering the number of input blocks :math:`M_N` and the code block size :math:`L_N`
of the last encoder within the pipeline, respectively.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from math import ceil
from typing import TYPE_CHECKING, Type, List, Optional
import numpy as np
from ruamel.yaml import SafeRepresenter, SafeConstructor, Node
from hermespy.core.factory import Serializable
from hermespy.core.random_node import RandomNode
# BaseModem is only referenced inside type annotations of this module,
# so it is imported for static type checkers only and never at runtime
# (presumably to avoid an import cycle with the modem module - confirm).
if TYPE_CHECKING:
    from hermespy.modem.modem import BaseModem  # pragma: no cover
# Module metadata: authorship, licensing, version and maintenance status
__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Tobias Kronauer", "Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.4.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
class Encoder(ABC, Serializable):
    """Base class of a single coding step within a channel coding pipeline.

    Instances of this class represent the :math:`n`-th coding step within an :class:`.EncoderManager` configuration,
    encoding blocks of :math:`K_n` bits into blocks of :math:`L_n` bits, respectively, therefore achieving a rate of

    .. math::

        R_n = \\frac{K_n}{L_n} \\mathrm{.}

    All inheriting classes represent implementations of coding steps and are required to implement the methods

        * :meth:`.encode` for encoding blocks during transmission
        * :meth:`.decode` for decoding blocks during reception
        * :meth:`.bit_block_size` reporting the input bit block length
        * :meth:`.code_block_size` reporting the output bit block length
    """

    yaml_tag: Optional[str] = "Encoder"
    """YAML serialization tag."""

    # Coding pipeline configuration this encoder is registered to
    __manager: Optional[EncoderManager]

    def __init__(self, manager: Optional[EncoderManager] = None) -> None:
        """
        Args:

            manager (EncoderManager, optional):
                The coding pipeline configuration this encoder is registered in.
                If omitted, the encoder starts without an assigned pipeline.
        """

        # Default settings
        self.__manager = None
        self.enabled = True

        # Route the assignment through the property so that overriding setters are honoured
        if manager is not None:
            self.manager = manager

    @property
    def manager(self) -> Optional[EncoderManager]:
        """Coding pipeline configuration this encoder is registered in.

        Returns: Handle to the coding pipeline, `None` if no pipeline has been assigned.
        """

        return self.__manager

    @manager.setter
    def manager(self, value: Optional[EncoderManager]) -> None:
        self.__manager = value

    @property
    def enabled(self) -> bool:
        """Is the encoding currently enabled within its chain?"""

        return self.__enabled

    @enabled.setter
    def enabled(self, value: bool) -> None:
        self.__enabled = value

    @abstractmethod
    def encode(self, bits: np.ndarray) -> np.ndarray:
        """Encodes a single block of bits.

        Bit encoding routine during data transmission,
        encoding a block of :math:`K_n` input bits into a block of :math:`L_n` code bits.

        Args:

            bits (numpy.ndarray):
                A numpy vector of :math:`K_n` bits, representing a single bit block to be encoded.

        Returns:

            np.ndarray:
                A numpy vector of :math:`L_n` bits, representing a single code block.

        Raises:

            ValueError:
                If the length of ``bits`` does not equal :meth:`bit_block_size`.
        """
        ...  # pragma: no cover

    @abstractmethod
    def decode(self, encoded_bits: np.ndarray) -> np.ndarray:
        """Decodes a single block of bits.

        Bit decoding routine during data reception,
        decoding a block of :math:`L_n` code bits into a block of :math:`K_n` data bits.

        Args:

            encoded_bits (numpy.ndarray):
                A numpy vector of :math:`L_n` code bits, representing a single code block to be decoded.

        Returns:

            np.ndarray:
                A numpy vector of :math:`K_n` bits, representing a single data block.

        Raises:

            ValueError:
                If the length of ``encoded_bits`` does not equal :meth:`code_block_size`.
        """
        ...  # pragma: no cover

    @property
    @abstractmethod
    def bit_block_size(self) -> int:
        """Data bit block size of a single coding operation.

        In other words, the number of input bits within a single code block during transmit encoding,
        or the number of output bits during receive decoding.
        Referred to as :math:`K_n` within the respective equations.

        Returns: Number of bits :math:`K_n`.
        """
        ...  # pragma: no cover

    @property
    @abstractmethod
    def code_block_size(self) -> int:
        """Code bit block size of a single coding operation.

        In other words, the number of input bits within a single code block during receive decoding,
        or the number of output bits during transmit encoding.
        Referred to as :math:`L_n` within the respective equations.

        Returns: Number of bits :math:`L_n`.
        """
        ...  # pragma: no cover

    @property
    def rate(self) -> float:
        """Code rate achieved by this coding step.

        Defined as the relation

        .. math::

            R_n = \\frac{K_n}{L_n}

        between the :meth:`.bit_block_size` :math:`K_n` and :meth:`code_block_size` :math:`L_n`.

        Returns:

            float:
                The code rate :math:`R_n`
        """

        return self.bit_block_size / self.code_block_size
class EncoderManager(RandomNode, Serializable):
    """Configuration managing a channel coding pipeline."""

    yaml_tag: str = "Encoding"

    allow_padding: bool
    """Tolerate padding of data bit blocks during encoding."""

    allow_truncating: bool
    """Tolerate truncating of data code blocks during decoding."""

    # Communication modem instance this coding pipeline configuration is attached to
    __modem: Optional[BaseModem]

    # List of encoding steps defining the internal pipeline configuration
    _encoders: List[Encoder]

    def __init__(
        self,
        modem: Optional[BaseModem] = None,
        allow_padding: bool = True,
        allow_truncating: bool = True,
    ) -> None:
        """
        Args:

            modem (BaseModem, optional):
                Communication modem instance this coding pipeline configuration is attached to.
                By default, the coding pipeline is considered to be floating.

            allow_padding(bool, optional):
                Tolerate padding of data bit blocks during encoding.
                Enabled by default.

            allow_truncating(bool, optional):
                Tolerate truncating of data code blocks during decoding.
                Enabled by default.
        """

        # Default parameters
        self.__modem = None
        self._encoders: List[Encoder] = []
        self.allow_padding = allow_padding
        self.allow_truncating = allow_truncating

        if modem is not None:
            self.modem = modem

        RandomNode.__init__(self)

    @classmethod
    def to_yaml(
        cls: Type[EncoderManager], representer: SafeRepresenter, node: EncoderManager
    ) -> Node:
        """Serialize an EncoderManager to YAML.

        The manager is represented as a plain sequence of its registered encoders.

        Args:

            representer (SafeRepresenter):
                A handle to a representer used to generate valid YAML code.
                The representer gets passed down the serialization tree to each node.

            node (EncoderManager):
                The EncoderManager instance to be serialized.

        Returns:

            Node:
                The serialized YAML node.

        :meta private:
        """

        return representer.represent_sequence(cls.yaml_tag, node.encoders)

    @classmethod
    def from_yaml(
        cls: Type[EncoderManager], constructor: SafeConstructor, node: Node
    ) -> EncoderManager:
        """Recall a new `EncoderManager` instance from YAML.

        The recalled encoders are adopted in their serialized order.

        Args:

            constructor (SafeConstructor):
                A handle to the constructor extracting the YAML information.

            node (Node):
                YAML node representing the `EncoderManager` serialization.

        Returns:

            EncoderManager:
                Newly created `EncoderManager` instance.

        :meta private:
        """

        manager = cls()
        manager._encoders = constructor.construct_sequence(node, deep=True)

        # Register the pipeline with each recalled encoder, mirroring add_encoder,
        # so that deserialized encoders can reach their manager as well
        for encoder in manager._encoders:
            if hasattr(encoder, "manager"):
                encoder.manager = manager

        return manager

    @property
    def modem(self) -> BaseModem:
        """Communication modem instance this coding pipeline configuration is attached to.

        Returns:

            Modem:
                Handle to the modem instance.

        Raises:

            RuntimeError:
                If the encoding configuration is floating, i.e. not attached to a modem.
        """

        if self.__modem is None:
            raise RuntimeError("Trying to access the modem of a floating encoding configuration")

        return self.__modem

    @modem.setter
    def modem(self, modem: BaseModem) -> None:
        if self.__modem is not modem:
            self.__modem = modem

    def add_encoder(self, encoder: Encoder) -> None:
        """Register a new encoder instance to this pipeline configuration.

        After registration the pipeline is re-sorted by ascending :meth:`.Encoder.bit_block_size`.

        Args:

            encoder (Encoder):
                The new encoder to be added.
        """

        # Register this encoding configuration to the encoder
        if hasattr(encoder, "manager"):
            encoder.manager = self

        # Add new encoder to the queue of configured encoders
        self._encoders.append(encoder)
        self._encoders = self.__execution_order()

    @property
    def encoders(self) -> List[Encoder]:
        """List of encoders registered within this pipeline.

        Returns:

            List[Encoder]:
                List of :math:`N` :class:`Encoder` instances where the :math:`n`-th entry represents
                the :math:`n`-th coding operation during transmit encoding, or, inversely,
                the :math:`1 + N - n`-th coding operation during receive decoding.
        """

        return self._encoders

    def encode(self, data_bits: np.ndarray, num_code_bits: Optional[int] = None) -> np.ndarray:
        """Encode a stream of data bits to a stream of code bits.

        By default, the input `data_bits` will be padded with randomly drawn bits
        to match the next integer multiple of the expected :meth:`.Encoder.bit_block_size`.
        The resulting code will be padded with randomly drawn bits to match the requested `num_code_bits`.
        Disabled encoders are skipped.

        Args:

            data_bits (numpy.ndarray):
                Numpy vector of data bits to be encoded.

            num_code_bits (int, optional):
                The expected resulting number of code bits.
                If `None` or zero, the length of the resulting code is not checked.

        Returns:

            np.ndarray:
                Numpy vector of encoded bits.

        Raises:

            RuntimeError:
                If `num_code_bits` is smaller than the resulting code bits after encoding.

            RuntimeError:
                If padding is required but disabled by :attr:`.allow_padding`.

            ValueError:
                If a full code block or more would have to be padded to reach `num_code_bits`.
        """

        code_state = data_bits.copy()

        # Loop through the encoders and encode the data, using the output of the last encoder as input to the next
        for encoder in self._encoders:
            # Skip if the respective encoder is disabled
            if not encoder.enabled:
                continue

            data_block_size = encoder.bit_block_size
            code_block_size = encoder.code_block_size

            # Compute the number of blocks within the code for this coding step
            num_blocks = ceil(len(code_state) / data_block_size)

            # The previous step's output becomes this step's input
            data_state = code_state
            code_state = np.empty(num_blocks * code_block_size, dtype=bool)

            # Pad the last block with random bits if allowed
            required_num_data_bits = num_blocks * data_block_size
            if len(data_state) < required_num_data_bits:
                if not self.allow_padding:
                    raise RuntimeError("Encoding would require padding, but padding is not allowed")

                num_padding_bits = required_num_data_bits - len(data_state)
                data_state = np.append(
                    data_state, self._rng.integers(0, 2, num_padding_bits, dtype=bool)
                )

            # Encode all blocks sequentially
            for block_idx in range(num_blocks):
                encoded_block = encoder.encode(
                    data_state[block_idx * data_block_size : (1 + block_idx) * data_block_size]
                )
                code_state[block_idx * code_block_size : (1 + block_idx) * code_block_size] = (
                    encoded_block
                )

        if num_code_bits and len(code_state) > num_code_bits:
            raise RuntimeError(
                "Too many input bits provided for encoding, truncating would destroy information"
            )

        if num_code_bits and len(code_state) < num_code_bits:
            if not self.allow_padding:
                raise RuntimeError("Encoding would require padding, but padding is not allowed")

            num_padding_bits = num_code_bits - len(code_state)

            # Padding a whole code block indicates that more data could have been transmitted
            if num_padding_bits >= self.code_block_size:
                raise ValueError("Insufficient number of input blocks provided for encoding")

            code_state = np.append(
                code_state, self._rng.integers(0, 2, num_padding_bits, dtype=bool)
            )

        # Return resulting overall code
        return code_state

    def decode(self, encoded_bits: np.ndarray, num_data_bits: Optional[int] = None) -> np.ndarray:
        """Decode a stream of code bits to a stream of plain data bits.

        By default, decoding `encoded_bits` may ignore bits in order
        to match the next integer multiple of the expected `code_block_size`.
        The resulting data might be cut to match the requested `num_data_bits`.
        Disabled encoders are skipped.

        Args:

            encoded_bits (numpy.ndarray):
                Numpy vector of code bits to be decoded to data bits.

            num_data_bits (int, optional):
                The expected number of resulting data bits.
                If not specified, all bits of the complete pipeline blocks are returned.

        Returns:

            np.ndarray:
                Numpy vector of the resulting data bit stream after decoding.

        Raises:

            RuntimeError:
                If `num_data_bits` is bigger than the resulting data bits after decoding.

            RuntimeError:
                If truncating is required but disabled by :attr:`.allow_truncating`.
        """

        bit_block_size = self.bit_block_size
        code_block_size = self.code_block_size

        # Only complete code blocks of the overall pipeline are considered
        num_blocks = int(encoded_bits.shape[0] // code_block_size)

        if num_data_bits is not None:
            num_data_bits_full = num_blocks * bit_block_size

            if num_data_bits > num_data_bits_full:
                raise RuntimeError(
                    "The requested number of data bits is larger than number "
                    "of bits recovered by decoding"
                )

            if not self.allow_truncating and num_data_bits != num_data_bits_full:
                raise RuntimeError("Data truncating is required but not allowed")

        else:
            num_data_bits = num_blocks * bit_block_size

        data_state = encoded_bits.copy()

        # Loop through the encoders decode the code using the output of the last encoder as input to the next
        for encoder in reversed(self._encoders):
            # Skip if the respective encoder is disabled
            if not encoder.enabled:
                continue

            code_block_size = encoder.code_block_size
            data_block_size = encoder.bit_block_size

            # Compute the number of complete blocks within the code for this coding step
            num_blocks = int(len(data_state) // code_block_size)

            # Drop trailing bits not filling a complete code block of this step
            code_state = data_state[: num_blocks * code_block_size]
            data_state = np.empty(num_blocks * data_block_size, dtype=bool)

            # Decode all blocks sequentially
            for block_idx in range(num_blocks):
                data_state[block_idx * data_block_size : (1 + block_idx) * data_block_size] = (
                    encoder.decode(
                        code_state[block_idx * code_block_size : (1 + block_idx) * code_block_size]
                    )
                )

        # Return resulting data
        return data_state[:num_data_bits]

    @property
    def bit_block_size(self) -> int:
        """Data bit block size of a single coding operation.

        In other words, the number of input bits within a single code block during transmit encoding,
        or the number of output bits during receive decoding.
        Referred to as :math:`K` within the respective equations.

        Returns:

            int:
                Number of bits :math:`K`.
                One if no encoder is registered or enabled.
        """

        if len(self._encoders) < 1:
            return 1

        encoder_index = 0
        block_size = 1
        num_bits = 1

        # Start from the first enabled encoder within the pipeline
        for encoder_index, encoder in enumerate(self._encoders):
            if encoder.enabled:
                block_size = encoder.bit_block_size
                num_bits = encoder.code_block_size
                break

        # Scale the block size by the number of preceding code blocks
        # fitting into a data block of each subsequent enabled encoder
        for encoder in self._encoders[encoder_index + 1 :]:
            if not encoder.enabled:
                continue

            repetitions = int(encoder.bit_block_size / num_bits)
            block_size *= repetitions
            num_bits *= int(repetitions / encoder.rate)

        return block_size

    @property
    def code_block_size(self) -> int:
        """Code bit block size of a single coding operation.

        In other words, the number of input bits within a single code block during receive decoding,
        or the number of output bits during transmit encoding.
        Referred to as :math:`L` within the respective equations.

        Returns:

            int:
                Number of bits :math:`L`, i.e. the code block size of the last enabled encoder.
                One if no encoder is registered or enabled.
        """

        for encoder in reversed(self.encoders):
            if encoder.enabled:
                return encoder.code_block_size

        return 1

    def __execution_order(self) -> List[Encoder]:
        """Sort the encoders into an order of execution.

        Encoders are ordered by ascending :meth:`.Encoder.bit_block_size`.

        Returns:
            List[Encoder]: A list of encoders in order of transmit execution (reversed receive execution).
        """

        return sorted(self._encoders, key=lambda encoder: encoder.bit_block_size)

    @property
    def rate(self) -> float:
        """Code rate achieved by this coding pipeline configuration.

        Defined as the relation

        .. math::

            R = \\frac{K}{L}

        between the :meth:`.bit_block_size` :math:`K` and :meth:`.code_block_size` :math:`L`.
        Computed as the product of the rates of all enabled encoders.

        Returns:

            float:
                The code rate :math:`R`.
        """

        code_rate = 1.0
        for encoder in self._encoders:
            if encoder.enabled:
                code_rate *= encoder.rate

        return code_rate

    def required_num_data_bits(self, num_code_bits: int) -> int:
        """Compute the number of input bits required to produce a certain number of output bits.

        Only complete code blocks are considered, excess code bits are ignored.

        Args:

            num_code_bits (int): The expected number of output bits.

        Returns:

            int: The required number of input bits.
        """

        num_blocks = int(num_code_bits // self.code_block_size)
        return self.bit_block_size * num_blocks

    def __getitem__(self, item: int) -> Encoder:
        """Select an encoder from the current configuration chain.

        Args:

            item (int):
                Index of the encoder within the chain.

        Returns:

            Encoder:
                The selected encoder.
        """

        return self._encoders[item]