class PhysicalScenario(Generic[PDT], Scenario[PDT]):
    """Scenario of physical device bindings.

    Managing physical devices by a scenario enables synchronized triggering
    and shared random seed configuration.
    """

    def __init__(
        self, seed: Optional[int] = None, devices: Optional[Sequence[PDT]] = None
    ) -> None:
        """
        Args:

            seed (int, optional):
                Random seed, forwarded unchanged to the :class:`Scenario` initializer.

            devices (Sequence[PDT], optional):
                Devices initially managed by this scenario,
                forwarded unchanged to the :class:`Scenario` initializer.
        """

        Scenario.__init__(self, seed, devices)

    @abstractmethod
    def _trigger(self) -> None:
        """Trigger synchronized transmission and reception for all managed devices."""
        ...  # pragma: no cover

    def receive_devices(
        self,
        impinging_signals: (
            Sequence[DeviceInput] | Sequence[Signal] | Sequence[Sequence[Signal]] | None
        ) = None,
        cache: bool = True,
    ) -> Sequence[DeviceReception]:
        """Receive over all scenario devices.

        Calls `process_input` on every managed device, forwards the resulting
        device inputs to :meth:`receive_operators` and combines both results
        into one :class:`DeviceReception` per device.

        Args:

            impinging_signals (Sequence[DeviceInput | Signal | Sequence[Signal]] | None, optional):
                List of signals impinging onto the devices, one entry per device.
                If not specified, `None` is handed to each device's `process_input`,
                in which case the device is expected to download the signal samples
                from its binding.

            cache (bool, optional):
                Cache the operator inputs at the registered receive operators for further processing.
                Enabled by default.

        Returns: List of the processed device input information.

        Raises:

            ValueError: If the number of `impinging_signals` does not match the number of registered devices.
        """

        if impinging_signals is None:
            impinging_signals = [None] * self.num_devices  # type: ignore

        elif len(impinging_signals) != self.num_devices:
            # Without this check zip() below would silently truncate to the shorter
            # sequence and some devices would never be processed.
            raise ValueError(
                f"Number of impinging signals ({len(impinging_signals)}) does not match "
                f"the number of registered devices ({self.num_devices})"
            )

        # Generate inputs
        device_inputs = [
            device.process_input(signal, cache)  # type: ignore
            for device, signal in zip(self.devices, impinging_signals)
        ]

        # Generate operator receptions
        receptions = self.receive_operators(device_inputs)

        # Generate device receptions
        return [
            DeviceReception.From_ProcessedDeviceInput(device_input, reception)
            for device_input, reception in zip(device_inputs, receptions)
        ]

    def _drop(self) -> Drop:
        """Generate a single drop by transmitting, triggering and receiving.

        Returns: The drop holding the trigger timestamp, device transmissions and device receptions.
        """

        # Generate device transmissions
        device_transmissions = self.transmit_devices()

        # Trigger the full scenario for physical transmission and reception.
        # The timestamp is taken immediately before the trigger is issued.
        timestamp = time()
        self._trigger()

        # Generate device receptions
        device_receptions = self.receive_devices()

        return Drop(timestamp, device_transmissions, device_receptions)
PhysicalScenarioType = TypeVar("PhysicalScenarioType", bound=PhysicalScenario)
"""Type of physical scenario"""


class SimulatedPhysicalScenario(SimulationScenario, PhysicalScenario):
    """Simulated physical scenario for testing purposes."""

    def _trigger(self) -> None:
        # Triggering does nothing
        pass  # pragma: no cover

    def receive_devices(
        self,
        impinging_signals: (
            Sequence[DeviceInput] | Sequence[Signal] | Sequence[Sequence[Signal]] | None
        ) = None,
        cache: bool = True,
        trigger_realizations: Sequence[TriggerRealization] | None = None,
    ) -> Sequence[SimulatedDeviceReception]:
        """Receive over all scenario devices.

        If no impinging signals are provided, the physical reception routine
        :meth:`PhysicalScenario.receive_devices` is executed first and the
        `impinging_signals` of each resulting reception are used instead.
        The signals are then handed to :meth:`SimulationScenario.receive_devices`
        together with `cache` and `trigger_realizations`.

        Args:

            impinging_signals (Sequence[DeviceInput | Signal | Sequence[Signal]] | None, optional):
                List of signals impinging onto the devices.

            cache (bool, optional):
                Forwarded to both underlying reception routines.
                Enabled by default.

            trigger_realizations (Sequence[TriggerRealization] | None, optional):
                Forwarded unchanged to :meth:`SimulationScenario.receive_devices`.

        Returns: The result of :meth:`SimulationScenario.receive_devices`.
        """

        signals = impinging_signals
        if signals is None:
            # Fall back to the physical reception path to obtain the impinging signals
            physical_receptions = PhysicalScenario.receive_devices(self, None, cache)
            signals = [reception.impinging_signals for reception in physical_receptions]

        return SimulationScenario.receive_devices(self, signals, cache, trigger_realizations)