Note

This static document was automatically created from the output of a jupyter notebook.

Implementing Communication Waveforms

This Jupyter Notebook will highlight the development steps required to implement a new communication waveform within HermesPy.

Communication waveforms are the core configuration object of Modem operators. They can be assigned to the waveform_generator slot and control the actual electromagnetic waveform that the modem transmits and receives in order to exchange information in the form of binary bits.

Waveforms, by default, implement four complementary routines of the signal processing pipeline for communication, namely mapping, unmapping, modulation and demodulation. Mapping converts a sequence of communication bits to communication symbols on the complex plane during transmission; unmapping inverts this process by converting communication symbols back to a sequence of communication bits. Modulation converts communication symbols to base-band time-domain representations of the waveforms to be transmitted; demodulation converts received base-band time-domain waveforms back to complex communication symbols. Additionally, waveforms may implement routines for synchronization and equalization, but for the sake of simplicity these routines are skipped in this tutorial.
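The four routines can be previewed independently of HermesPy. The following is a minimal NumPy-only sketch, assuming a modulation order that is a power of two and a rectangular pulse (each symbol simply held for several samples). All variable names are illustrative and not part of the HermesPy API.

```python
import numpy as np

modulation_order = 4        # number of distinct ASK amplitudes (assumed power of two)
oversampling_factor = 8     # samples per symbol (illustrative value)
bits_per_symbol = int(np.log2(modulation_order))

bits = np.array([0, 1, 1, 0, 0, 0, 1, 1], dtype=np.uint8)

# Mapping: group the bits and read each group as an index, first bit least significant
bit_weights = 2 ** np.arange(bits_per_symbol)
indices = bits.reshape(-1, bits_per_symbol) @ bit_weights
symbols = (indices + 1) / modulation_order

# Modulation: hold each symbol for oversampling_factor samples
samples = np.repeat(symbols, oversampling_factor)

# Demodulation: keep one sample per symbol
recovered_symbols = samples[::oversampling_factor]

# Unmapping: undo the amplitude scaling, then expand each index back into bits
recovered_indices = np.round(recovered_symbols * modulation_order - 1).astype(int)
recovered_bits = ((recovered_indices[:, None] >> np.arange(bits_per_symbol)) & 1).flatten()

print(symbols)
print(recovered_bits)
```

In this noise-free round trip the recovered bits equal the transmitted bits, which is the property the pairs mapping/unmapping and modulation/demodulation are meant to provide.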

The waveform we will implement is an amplitude-shift keying (ASK) modulation with a variable modulation order (number of distinct modulation symbols). First, we import all required Hermes classes:

[2]:
import numpy as np

from hermespy.core import ConsoleMode, dB, Signal
from hermespy.modem import Symbols, WaveformGenerator

Signal is the Hermes base class describing sampled MIMO signals in time-domain. Symbols is the signal model’s equivalent on the demodulated symbol abstraction level, describing multiple streams of symbols in time-domain. The WaveformGenerator represents, as already mentioned, the abstract base class of all communication waveforms. It features several abstract methods and properties to be implemented by the specific waveforms:

[3]:
class AmplitudeShiftKeying(WaveformGenerator):

    @property
    def samples_in_frame(self) -> int:

        return self.symbols_per_frame * self.oversampling_factor

    @property
    def bits_per_frame(self) -> int:

        return int(self.symbols_per_frame * np.log2(self.modulation_order))

    @property
    def symbols_per_frame(self) -> int:

        return 10

    @property
    def bit_energy(self) -> float:

        return 1. / np.log2(self.modulation_order)

    @property
    def symbol_energy(self) -> float:

        return 1.

    @property
    def power(self) -> float:

        return 1.

    @property
    def bandwidth(self) -> float:

        return 1.

    @property
    def sampling_rate(self) -> float:

        return 1.

    def map(self, data_bits: np.ndarray) -> Symbols:

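        # Candidate amplitudes 1/M, 2/M, ..., 1, where M is the modulation order
        symbols_candidates = np.arange(1, 1 + self.modulation_order, dtype=complex) / (self.modulation_order)
        # Pack each group of log2(M) bits into one integer index, first bit least significant
        symbol_indices = np.packbits(data_bits.reshape((-1, int(np.log2(self.modulation_order)))), axis=1, bitorder='little').flatten()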

        return Symbols(symbols_candidates[symbol_indices])

    def unmap(self, symbols: Symbols) -> np.ndarray:

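        # Invert the amplitude scaling and round to the nearest candidate index
        symbol_index_estimates = np.round(abs(symbols.raw.flatten()) * self.modulation_order - 1).astype(np.uint8)
        # Expand each index into bits and keep only the log2(M) least significant ones
        bits = np.unpackbits(symbol_index_estimates[:, np.newaxis], axis=1, bitorder='little')[:, :int(np.log2(self.modulation_order))]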

        return bits.flatten()

    def modulate(self, data_symbols: Symbols) -> Signal:

        baseband_samples = np.repeat(data_symbols.raw.flatten(), self.oversampling_factor)
        return Signal(baseband_samples, self.sampling_rate)

    def demodulate(self, baseband_samples: np.ndarray) -> Symbols:

        symbols = Symbols(baseband_samples[::self.oversampling_factor])
        return symbols

This waveform implementation can already be assigned to a modem. Let’s go through the implemented abstract properties one by one:

samples_in_frame - indicates the number of required complex base-band samples at optimal sampling rate to demodulate a single frame generated by our waveform implementation.

bits_per_frame - indicates the number of data bits encoded within each communication frame.

symbols_per_frame - indicates the number of data symbols encoded within each communication frame. For convenience we have hard-coded \(10\) symbols here.

bit_energy - average energy of a single bit.

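symbol_energy - average energy of a single symbol. For simplicity, the implementation returns a constant value of one. Note that the ASK amplitudes generated by map range from \(1/M\) to \(1\) for modulation order \(M\) and do not lie on the complex unit circle, so this value is a convenient normalization rather than the exact constellation average.

power - average power of a single symbol. As with symbol_energy, the implementation returns a constant value of one for simplicity.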

bandwidth - bandwidth required to properly modulate and demodulate the waveform.

sampling_rate - sampling rate required to properly modulate and demodulate the waveform. For convenience we have chosen \(1\) Hz. Of course this value is variable in practice.
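As a rough cross-check of the quantities listed above, the following sketch evaluates the frame sizes for the configuration used later in this tutorial (modulation order 4, oversampling factor 8, 10 symbols per frame). It also computes the average energy of the amplitudes produced by map, assuming all symbols are equally likely. The variable names are illustrative and not part of HermesPy.

```python
import numpy as np

modulation_order = 4
oversampling_factor = 8
symbols_per_frame = 10

bits_per_symbol = int(np.log2(modulation_order))
bits_per_frame = symbols_per_frame * bits_per_symbol
samples_in_frame = symbols_per_frame * oversampling_factor

# Amplitudes used by map(): 1/M, 2/M, ..., 1
amplitudes = np.arange(1, modulation_order + 1) / modulation_order

# Average energy per symbol and per bit, assuming equiprobable symbols
mean_symbol_energy = float(np.mean(amplitudes ** 2))
mean_bit_energy = mean_symbol_energy / bits_per_symbol

print(bits_per_frame, samples_in_frame)
print(mean_symbol_energy, mean_bit_energy)
```

Under these assumptions the constellation average is below one, so the constant values returned by symbol_energy and bit_energy in the class are best read as simplifications for this tutorial.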

We can now instantiate our newly defined communication waveform and inspect the mapping and modulation routines:

[4]:
# Create pseudo-random number generator
rng = np.random.default_rng(42)

# Instantiate waveform
ask = AmplitudeShiftKeying(oversampling_factor=8, modulation_order=4)

# Generate a frame of data symbols from a set of random bits
data_bits = rng.integers(0, 2, ask.bits_per_frame)
mapped_symbols = ask.map(data_bits)
_ = mapped_symbols.plot_constellation()

# Generate the base-band representation of a single communication frame
baseband_transmission = ask.modulate(mapped_symbols)
_ = baseband_transmission.plot()
[Figure: constellation of the mapped ASK symbols]
[Figure: base-band representation of the transmitted communication frame]

We can now model the effects of hardware noise by adding a normally distributed noise signal to the baseband signal and observe the effects on the demodulated symbol constellation:

[5]:
# Generate additive white Gaussian noise (real-valued, standard deviation 0.02)
noise = rng.normal(0, .02, ask.samples_in_frame)

# Compute the baseband signal received over an AWGN channel
baseband_reception = baseband_transmission.samples[0, :] + noise
_ = Signal(baseband_reception, ask.sampling_rate).plot()

# Demodulate the received signal
received_symbols = ask.demodulate(baseband_reception)
_ = received_symbols.plot_constellation()

bits = ask.unmap(received_symbols)
print(f'Transmitted data bits: {data_bits}')
print(f'Received data bits: {bits}')
Transmitted data bits: [0 1 1 0 0 1 0 1 0 0 1 1 1 1 1 1 1 0 1 0]
Received data bits: [0 1 1 0 0 1 0 1 0 0 1 1 1 1 1 1 1 0 1 0]
[Figure: base-band representation of the received noisy communication frame]
[Figure: constellation of the demodulated symbols]
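The received bits match the transmitted ones because the noise is small compared to the spacing between neighbouring ASK amplitudes. The following sketch quantifies this margin, assuming real-valued Gaussian noise with standard deviation 0.02 and a decision threshold halfway between two neighbouring amplitudes. It uses only the Python standard library, and the variable names are illustrative.

```python
import math

modulation_order = 4
noise_std = 0.02

# Neighbouring amplitudes k/M are separated by 1/M
spacing = 1.0 / modulation_order

# A symbol is misread once the noise pushes it past half the spacing
decision_margin = spacing / 2
margin_in_sigmas = decision_margin / noise_std

# Gaussian tail probability Q(x) = 0.5 * erfc(x / sqrt(2)) of crossing one threshold
one_sided_error = 0.5 * math.erfc(margin_in_sigmas / math.sqrt(2))

print(f'Margin: {margin_in_sigmas:.2f} standard deviations')
print(f'Probability of crossing one threshold: {one_sided_error:.2e}')
```

With a margin of several standard deviations, a single frame of 10 symbols is very unlikely to contain a bit error, which is consistent with the output shown above.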

The newly added waveform can now be assigned to modems and used within the context of a simulation campaign:

[ ]:
from hermespy.modem import BitErrorEvaluator, DuplexModem
from hermespy.simulation import Simulation

simulation = Simulation(console_mode=ConsoleMode.SILENT)

modem = DuplexModem()
modem.waveform_generator = ask
modem.device = simulation.scenario.new_device()

simulation.new_dimension('snr', dB(0, 2, 4, 8, 16))
simulation.add_evaluator(BitErrorEvaluator(modem, modem))
simulation.num_samples = 1000
result = simulation.run()
_ = result.plot()
[Figure: bit error evaluation results over the simulated SNR values]
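To build intuition for what such a campaign estimates, the following is a NumPy-only Monte Carlo sketch of the bit error rate of the ASK mapping over a real-valued AWGN channel. It does not use HermesPy. The function simulate_ber is hypothetical, and the SNR is assumed to be the ratio of mean symbol energy to noise variance, which may differ from the definition HermesPy applies. Unlike the unmap routine above, the index estimates are clipped to the valid range.

```python
import numpy as np

def simulate_ber(snr_db, modulation_order=4, num_symbols=20000, seed=0):
    """Estimate the bit error rate of the tutorial's ASK mapping (illustrative helper)."""
    rng = np.random.default_rng(seed)
    bits_per_symbol = int(np.log2(modulation_order))
    bit_positions = np.arange(bits_per_symbol)

    # Map random bits to amplitudes 1/M, ..., 1 (first bit least significant)
    bits = rng.integers(0, 2, num_symbols * bits_per_symbol)
    indices = bits.reshape(-1, bits_per_symbol) @ (2 ** bit_positions)
    symbols = (indices + 1) / modulation_order

    # Assumed SNR definition: mean symbol energy over real noise variance
    noise_std = np.sqrt(np.mean(symbols ** 2) / 10 ** (snr_db / 10))
    received = symbols + rng.normal(0.0, noise_std, symbols.size)

    # Round to the nearest candidate index and clip to the valid range
    estimates = np.round(np.abs(received) * modulation_order - 1)
    estimates = np.clip(estimates, 0, modulation_order - 1).astype(int)
    estimated_bits = ((estimates[:, None] >> bit_positions) & 1).flatten()

    return float(np.mean(estimated_bits != bits))

# Same SNR grid as the simulation dimension configured above
ber_by_snr = {snr_db: simulate_ber(snr_db) for snr_db in (0, 2, 4, 8, 16)}
for snr_db, ber in ber_by_snr.items():
    print(f'{snr_db:>2} dB: BER = {ber:.4f}')
```

The estimated error rate is expected to fall as the SNR increases. A full HermesPy simulation additionally models devices and channels, so its results need not coincide with this simplified sketch.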