Note

This static document was automatically created from the output of a jupyter notebook.

Implementing FEC codings

This Jupyter notebook outlines the step-by-step process of implementing a new forward error correction (FEC) coding scheme for communication modems. The selected algorithm is a repetition code, since it is arguably the most basic error-correcting code.

As an initial step, we will import all required modules from HermesPy:

[2]:
import numpy as np
from hermespy.fec import Encoder

Error-correcting coding steps are represented by the abstract Encoder interface. Each coding algorithm is a class inheriting from this interface, which requires it to implement the abstract properties bit_block_size and code_block_size as well as the abstract functions encode and decode.

Let’s assume our repetition coding takes blocks of \(K = 4\) data bits and repeats each block \(3\) times. The resulting code block size is \(N = 3K = 12\), which results in a rate of \begin{equation} R = \frac{K}{N} = \frac{1}{3} \end{equation} for the full encoding. During decoding, the repetition coding decides by majority voting over the three copies which value has been transmitted at each bit position. So, our coding implementation is:

[3]:
class RepetitionCoding(Encoder):

    @property
    def bit_block_size(self) -> int:
        return 4

    @property
    def code_block_size(self) -> int:
        return 12

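    def encode(self, data: np.ndarray) -> np.ndarray:
        # Concatenate three copies of the data block
        return np.tile(data, 3)

    def decode(self, code: np.ndarray) -> np.ndarray:
        # Stack the three copies as rows and take a majority vote per bit position
        copies = np.reshape(code, (3, self.bit_block_size))
        return (np.mean(copies, axis=0) > .5).astype(int)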
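
To make the majority vote in decode easier to follow, here is a minimal NumPy-only sketch of the same idea, broken into separate steps. The names K, REPETITIONS, copies and votes_for_one are illustrative and not part of HermesPy; the received block is hard-coded with a single flipped bit.

```python
import numpy as np

K = 4            # data bits per block, as in the text
REPETITIONS = 3  # number of copies of each block

# A received code block in which the first bit of the first copy was flipped
received = np.array([0, 0, 1, 0,
                     1, 0, 1, 0,
                     1, 0, 1, 0])

# One row per repetition, one column per data bit position
copies = received.reshape(REPETITIONS, K)

# Count how many copies vote for "1" at each bit position
votes_for_one = copies.sum(axis=0)

# Decide for "1" where more than half of the copies agree
decoded = (votes_for_one > REPETITIONS // 2).astype(int)

print(copies)
print(votes_for_one)
print(decoded)
```

For an odd number of repetitions, comparing the vote count against half the repetitions is equivalent to thresholding the mean at 0.5, which is what the class above does.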

Now we can inspect our coding implementation:

[4]:
coding = RepetitionCoding()
print(f"Our coding rate is {coding.rate} with an input block size of {coding.bit_block_size} and an output block size of {coding.code_block_size}")

data = np.random.randint(0, 2, coding.bit_block_size)
print(f"Let's assume we transmit the following data block: {data}")

code = coding.encode(data)
print(f"After encoding the respective code block is: {code}")

error_code = code.copy()
error_code[0] = 1 - error_code[0]  # flip the first bit
print(f"After channel propagation the first bit has flipped: {error_code}")

corrected_data = coding.decode(error_code)
print(f"But the coding can correct a single bit flip to: {corrected_data}")

Our coding rate is 0.3333333333333333 with an input block size of 4 and an output block size of 12
Let's assume we transmit the following data block: [1 0 1 0]
After encoding the respective code block is: [1 0 1 0 1 0 1 0 1 0 1 0]
After channel propagation the first bit has flipped: [0 0 1 0 1 0 1 0 1 0 1 0]
But the coding can correct a single bit flip to: [1 0 1 0]
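
The correction capability has a limit: majority voting over three copies only works as long as at most one copy of each bit position is corrupted. The following sketch illustrates this with plain NumPy; repeat_encode and majority_decode are hypothetical stand-alone helpers that mirror the class above and are not HermesPy functions.

```python
import numpy as np


def repeat_encode(data: np.ndarray, repetitions: int = 3) -> np.ndarray:
    """Concatenate `repetitions` copies of the data block."""
    return np.tile(data, repetitions)


def majority_decode(code: np.ndarray, repetitions: int = 3) -> np.ndarray:
    """Majority vote per bit position over all copies."""
    copies = np.reshape(code, (repetitions, -1))
    return (copies.mean(axis=0) > 0.5).astype(int)


data = np.array([1, 0, 1, 0])
code = repeat_encode(data)

# Two flips hitting different bit positions are still corrected
spread = code.copy()
spread[0] ^= 1   # first copy, bit position 0
spread[5] ^= 1   # second copy, bit position 1
print(majority_decode(spread))

# Two flips hitting the same bit position outvote the one intact copy
stacked = code.copy()
stacked[0] ^= 1  # first copy, bit position 0
stacked[4] ^= 1  # second copy, bit position 0
print(majority_decode(stacked))
```

In the second case the decoder returns a wrong first bit, so the code guarantees correction of one flip per bit position rather than of any two flips per code block.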

We may now investigate our newly created forward error correction coding within a full Hermes simulation campaign.

[5]:
from hermespy.core import ConsoleMode, dB
from hermespy.simulation import Simulation
from hermespy.modem import BitErrorEvaluator, DuplexModem, RootRaisedCosineWaveform

# Create a new simulation featuring a single device transmitting at 10 GHz
simulation = Simulation(console_mode=ConsoleMode.SILENT)
device = simulation.scenario.new_device(carrier_frequency=10e9)

# Configure a communication operation on the device, using our coding
modem = DuplexModem()
modem.waveform_generator = RootRaisedCosineWaveform(modulation_order=4, oversampling_factor=2, num_preamble_symbols=0, num_data_symbols=10, symbol_rate=1e6)
modem.encoder_manager.add_encoder(coding)
modem.device = device

# Run a lightweight simulation for demonstration purposes
simulation.new_dimension('snr', dB(0, 4, 8, 16, 18, 20))
simulation.add_evaluator(BitErrorEvaluator(modem, modem))
simulation.num_samples = 1000

result = simulation.run()
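
The sweep values above are specified in decibels. Assuming the dB helper converts them to linear power ratios in the usual way, the relationship can be sketched as follows; db_to_linear is a hypothetical stand-in written for illustration, not the HermesPy implementation.

```python
def db_to_linear(value_db: float) -> float:
    """Convert a power ratio from decibels to linear scale."""
    return 10.0 ** (value_db / 10.0)


# The SNR sweep values used in the simulation above
for snr_db in (0, 4, 8, 16, 18, 20):
    print(f"{snr_db:>2} dB -> {db_to_linear(snr_db):.2f}")
```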

This allows us to visualize the achieved bit error rates for the given linear signal-to-noise ratios:

[6]:
_ = result.plot()
[Figure: bit error rate over the signal-to-noise ratio]
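
As a rough sanity check for such a curve, the residual bit error rate of a repetition code can be computed analytically. The sketch below assumes that bits flip independently with probability p before decoding, which is an idealization; the function name repetition_error_rate is hypothetical. With three copies, a bit is decoded wrongly when two or three copies are flipped, giving \(3p^2(1-p) + p^3\).

```python
from math import comb


def repetition_error_rate(p: float, repetitions: int = 3) -> float:
    """Probability that a majority vote over an odd number of copies
    decides a bit wrongly, assuming independent flips with probability p."""
    majority = repetitions // 2 + 1
    return sum(
        comb(repetitions, k) * p**k * (1 - p) ** (repetitions - k)
        for k in range(majority, repetitions + 1)
    )


for p in (0.1, 0.01, 0.001):
    print(f"raw {p:g} -> decoded {repetition_error_rate(p):.3g}")
```

The raw bit error rate of the simulated modem depends on the waveform and the signal-to-noise ratio, so this formula only indicates the expected trend and is not a prediction of the simulated values.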