class Drop(Generic[DTT, DRT], HDFSerializable):
    """Snapshot of everything the devices of a scenario transmitted and received.

    A drop bundles a generation timestamp with one transmission record and
    one reception record per device. It can be written to and recalled from
    an HDF group.
    """

    __timestamp: float  # Time at which the drop was generated
    __device_transmissions: Sequence[DTT]  # Transmitted device information
    __device_receptions: Sequence[DRT]  # Received device information

    def __init__(
        self,
        timestamp: float,
        device_transmissions: Sequence[DTT],
        device_receptions: Sequence[DRT],
    ) -> None:
        """
        Args:

            timestamp (float):
                Time at which the drop was generated.

            device_transmissions (Sequence[DTT]):
                Transmitted device information.

            device_receptions (Sequence[DRT]):
                Received device information.
        """

        # The sequences are stored by reference, they are not copied
        self.__device_receptions = device_receptions
        self.__device_transmissions = device_transmissions
        self.__timestamp = timestamp

    @property
    def timestamp(self) -> float:
        """Time at which the drop was generated."""

        return self.__timestamp

    @property
    def device_transmissions(self) -> Sequence[DTT]:
        """Transmitted device information within this drop."""

        return self.__device_transmissions

    @property
    def device_receptions(self) -> Sequence[DRT]:
        """Received device information within this drop."""

        return self.__device_receptions

    @property
    def num_device_transmissions(self) -> int:
        """Number of device transmissions within this drop."""

        return len(self.__device_transmissions)

    @property
    def num_device_receptions(self) -> int:
        """Number of device receptions within this drop."""

        return len(self.__device_receptions)

    @property
    def operator_inputs(self) -> Sequence[Sequence[Signal]]:
        """Signals feeding into device's operators during reception.

        Returns: A list holding the `operator_inputs` of each device reception,
        in the order of :attr:`device_receptions`.
        """

        collected_inputs = []
        for device_reception in self.device_receptions:
            collected_inputs.append(device_reception.operator_inputs)

        return collected_inputs

    @classmethod
    def from_HDF(
        cls: Type[Drop], group: Group, devices: Sequence[Device] | None = None
    ) -> Drop:
        """Recall a drop from an HDF group.

        Args:

            group (Group):
                HDF group the drop was serialized to.
                Missing attributes fall back to a timestamp of `0.0` and
                zero transmissions / receptions.

            devices (Sequence[Device] | None):
                Devices used to recall the stored records.
                If provided, the i-th device recalls the i-th transmission and
                the i-th reception; recalling stops at the shorter of the
                stored record count and the number of devices.
                If `None`, the records are recalled by
                `DeviceTransmission.from_HDF` and `DeviceReception.from_HDF`.

        Returns: The recalled drop, an instance of `cls`.
        """

        # Recall attributes
        timestamp = group.attrs.get("timestamp", 0.0)
        num_transmissions: int = group.attrs.get("num_transmissions", 0)
        num_receptions: int = group.attrs.get("num_receptions", 0)

        transmissions = []
        receptions = []

        if devices is not None:
            # Let each device recall its own records; zip pairs the record
            # index with the device and ends at the shorter of both
            for t, device in zip(range(num_transmissions), devices):
                transmissions.append(device.recall_transmission(group[f"transmission_{t:02d}"]))

            for r, device in zip(range(num_receptions), devices):
                receptions.append(device.recall_reception(group[f"reception_{r:02d}"]))

        else:
            # Without devices, fall back to the generic record types
            for t in range(num_transmissions):
                transmissions.append(DeviceTransmission.from_HDF(group[f"transmission_{t:02d}"]))

            for r in range(num_receptions):
                receptions.append(DeviceReception.from_HDF(group[f"reception_{r:02d}"]))

        return cls(
            timestamp=timestamp,
            device_transmissions=transmissions,
            device_receptions=receptions,
        )

    def to_HDF(self, group: Group) -> None:
        """Serialize this drop to an HDF group.

        Each transmission and reception is written to its own subgroup named
        `transmission_XX` / `reception_XX`; the timestamp and both record
        counts are stored as attributes of `group`.

        Args:

            group (Group):
                HDF group to write the drop to.
        """

        # Serialize groups
        for t, transmission in enumerate(self.device_transmissions):
            transmission.to_HDF(group.create_group(f"transmission_{t:02d}"))

        for r, reception in enumerate(self.device_receptions):
            reception.to_HDF(group.create_group(f"reception_{r:02d}"))

        # Serialize attributes
        group.attrs["timestamp"] = self.timestamp
        group.attrs["num_transmissions"] = self.num_device_transmissions
        group.attrs["num_receptions"] = self.num_device_receptions
# Generic placeholder restricted to Drop and its subclasses
DropType = TypeVar(
    "DropType",
    bound=Drop,
)
"""Type of a drop."""
class EvaluatedDrop(Drop):
    """Drop that additionally carries the evaluation artifacts generated for it.

    Extends :class:`Drop` by a sequence of artifacts next to the device
    transmissions and receptions.
    """

    # NOTE(review): the inherited Drop.from_HDF constructs `cls` without an
    # `artifacts` argument, which this initializer requires - confirm whether
    # recalling an EvaluatedDrop from HDF is handled elsewhere.

    # Evaluation artifacts generated for this drop.
    __artifacts: Sequence[Artifact]

    def __init__(
        self,
        timestamp: float,
        device_transmissions: Sequence[DeviceTransmission],
        device_receptions: Sequence[DeviceReception],
        artifacts: Sequence[Artifact],
    ) -> None:
        """
        Args:

            timestamp (float):
                Time at which the drop was generated.

            device_transmissions (Sequence[DeviceTransmission]):
                Transmitted device information.

            device_receptions (Sequence[DeviceReception]):
                Received device information.

            artifacts (Sequence[Artifact]):
                Evaluation artifacts generated for this scenario drop.
        """

        # Initialize the base drop first, then attach the artifacts
        Drop.__init__(
            self,
            timestamp,
            device_transmissions,
            device_receptions,
        )
        self.__artifacts = artifacts

    @property
    def artifacts(self) -> Sequence[Artifact]:
        """Evaluation artifacts generated from the drop's data.

        Returns: The artifact sequence passed at initialization.
        """

        return self.__artifacts

    @property
    def num_artifacts(self) -> int:
        """Number of evaluation artifacts.

        Returns: The number of artifacts.
        """

        return len(self.__artifacts)