class MatchedFilterJcas(DuplexJCASOperator[CommunicationWaveform], Serializable):
    """Joint Communication and Sensing Operator.

    A combination of communication and sensing operations.
    Senses the environment via a correlation-based time of flight estimation
    of transmitted waveforms.
    """

    yaml_tag = "MatchedFilterJcas"
    property_blacklist = {"slot"}

    # Most recent transmission, cached as the correlation reference for reception
    __last_transmission: JCASTransmission | None = None
    # The specific required sampling rate
    __sampling_rate: float | None
    # Maximally detectable range
    __max_range: float

    def __init__(
        self,
        max_range: float,
        waveform: CommunicationWaveform | None = None,
        selected_transmit_ports: Sequence[int] | None = None,
        selected_receive_ports: Sequence[int] | None = None,
        carrier_frequency: float | None = None,
        seed: int | None = None,
    ) -> None:
        """
        Args:

            max_range (float):
                Maximally detectable range in m.

            waveform (CommunicationWaveform | None):
                Communication waveform forwarded to the base operator.
                Its power, sampling rate and frame duration are exposed by the
                respective properties of this operator.
                :py:obj:`None` by default.

            selected_transmit_ports (Sequence[int] | None):
                Indices of antenna ports selected for transmission from the operated
                :class:`Device's<Device>` antenna array.
                If not specified, all available ports will be considered.

            selected_receive_ports (Sequence[int] | None):
                Indices of antenna ports selected for reception from the operated
                :class:`Device's<Device>` antenna array.
                If not specified, all available antenna ports will be considered.

            carrier_frequency (float, optional):
                Central frequency of the mixed signal in radio-frequency transmission band.
                If not specified, the operated device's default carrier frequency
                will be assumed during signal processing.

            seed (int, optional):
                Random seed used to initialize the pseudo-random number generator.

        Raises:

            ValueError: If `max_range` is smaller or equal to zero.
        """

        # Initialize base class
        DuplexJCASOperator.__init__(
            self,
            waveform,
            selected_transmit_ports,
            selected_receive_ports,
            carrier_frequency,
            seed,
        )

        # Initialize class attributes
        self.__last_transmission = None
        self.__sampling_rate = None
        self.max_range = max_range  # Validated by the property setter

    @property
    def power(self) -> float:
        """Power of the configured waveform, zero if no waveform is configured."""

        return 0.0 if self.waveform is None else self.waveform.power

    def _transmit(self, device: TransmitState, duration: float) -> JCASTransmission:
        """Generate a transmission and cache it as reference for the next reception.

        Args:

            device (TransmitState): Forwarded to the modem base implementation.
            duration (float): Forwarded to the modem base implementation.

        Returns: The generated transmission, which is also cached internally.
        """

        communication_transmission = TransmittingModemBase._transmit(self, device, duration)

        # The matched filter in _receive correlates against this cached transmission
        self.__last_transmission = JCASTransmission(communication_transmission)
        return self.__last_transmission

    def _receive(self, signal: Signal, device: ReceiveState) -> JCASReception:
        """Demodulate the received signal and estimate a radar cube by matched filtering.

        Args:

            signal (Signal): The received signal.
            device (ReceiveState): State of the receiving device.

        Returns: Reception combining the communication and the radar reception.

        Raises:

            RuntimeError: If no transmission has been cached by a preceding transmit call.
        """

        # There must be a recent transmission being cached in order to correlate
        last_transmission = self.__last_transmission
        if last_transmission is None:
            raise RuntimeError(
                "Receiving from a matched filter joint must be preceeded by a transmission"
            )

        # Receive information
        communication_reception = ReceivingModemBase._receive(self, signal, device)

        # Re-sample communication waveform
        # NOTE(review): the cached transmission is correlated as generated;
        # confirm it is sampled at self.sampling_rate whenever a custom
        # sampling rate exceeding the waveform's rate has been configured.
        sampling_rate = self.sampling_rate
        signal = signal.resample(sampling_rate)

        transmitted_signal = last_transmission.signal
        num_transmitted_samples = transmitted_signal.num_samples

        # Factor two: max_range is a one-way distance, the echo travels it twice
        resolution = self.range_resolution
        num_propagated_samples = int(2 * self.max_range / resolution)

        # Append additional samples if the signal is too short
        required_num_received_samples = num_transmitted_samples + num_propagated_samples
        num_missing_samples = required_num_received_samples - signal.num_samples
        if num_missing_samples > 0:
            signal.append_samples(
                Signal.Create(
                    np.zeros((signal.num_streams, num_missing_samples), dtype=complex),
                    sampling_rate,
                    signal.carrier_frequency,
                    signal.noise_power,
                    signal.delay,
                )
            )

        # Digital receive beamformer
        angle_bins, beamformed_samples = self._receive_beamform(signal, device)

        # Predict the signal transmitted towards the angles of interest.
        # Carrier frequency and stream samples do not depend on the angle,
        # so they are fetched once instead of once per angle bin.
        transmit_carrier_frequency = transmitted_signal.carrier_frequency
        transmitted_streams = transmitted_signal.getitem(unflatten=True)
        transmitted_samples = np.empty(
            (angle_bins.shape[0], num_transmitted_samples), dtype=np.complex128
        )
        for t, angle in enumerate(angle_bins):
            phase_response = device.antennas.horizontal_phase_response(
                transmit_carrier_frequency, angle[0], angle[1], AntennaMode.TX
            )
            transmitted_samples[t, :] = phase_response.conj() @ transmitted_streams

        # Transmit-receive correlation for range estimation
        # NOTE(review): correlate operates over both array axes; with "valid"
        # mode and equally many rows in both inputs the angle axis of the
        # result has length one - confirm this is intended for beamformers
        # returning more than a single angle bin.
        correlation = (
            abs(correlate(beamformed_samples, transmitted_samples, mode="valid", method="fft"))
            / num_transmitted_samples
        )
        lags = correlation_lags(
            beamformed_samples.shape[1], transmitted_samples.shape[1], mode="valid"
        )

        # Create the cube object
        velocity_bins = np.array([0.0])  # Single velocity bin
        # Lags are scaled by half the resolution, mirroring the factor two above
        range_bins = 0.5 * lags[:num_propagated_samples] * resolution
        cube_data = correlation[:, None, :num_propagated_samples]
        cube = RadarCube(cube_data, angle_bins, velocity_bins, range_bins, device.carrier_frequency)

        # Infer the point cloud, if a detector has been configured
        cloud = None if self.detector is None else self.detector.detect(cube)

        radar_reception = RadarReception(signal, cube, cloud)
        return JCASReception(communication_reception, radar_reception)

    @property
    def sampling_rate(self) -> float:
        """Sampling rate required by this operator.

        The waveform's sampling rate, or the explicitly configured sampling
        rate if that one is larger.
        Assigning :py:obj:`None` resets the explicit configuration.

        Raises:

            ValueError: If assigned a value smaller or equal to zero.
        """

        modem_sampling_rate = self.waveform.sampling_rate

        if self.__sampling_rate is None:
            return modem_sampling_rate

        return max(modem_sampling_rate, self.__sampling_rate)

    @sampling_rate.setter
    def sampling_rate(self, value: float | None) -> None:
        if value is None:
            self.__sampling_rate = None
            return

        if value <= 0.0:
            raise ValueError("Sampling rate must be greater than zero")

        self.__sampling_rate = value

    @property
    def range_resolution(self) -> float:
        """Resolution of the Range Estimation.

        Computed as the speed of light divided by :attr:`sampling_rate`.
        Assigning a resolution configures the sampling rate accordingly; since
        the effective sampling rate never drops below the waveform's, the
        value read back may be smaller than the one assigned.

        Returns:
            float:
                Resolution in m.

        Raises:

            ValueError:
                If the assigned range resolution is smaller or equal to zero.
        """

        return speed_of_light / self.sampling_rate

    @range_resolution.setter
    def range_resolution(self, value: float) -> None:
        if value <= 0.0:
            raise ValueError("Range resolution must be greater than zero")

        self.sampling_rate = speed_of_light / value

    @property
    def frame_duration(self) -> float:
        """Frame duration of the configured waveform."""

        return self.waveform.frame_duration

    @property
    def max_range(self) -> float:
        """Maximally Estimated Range.

        Returns:
            The maximum range in m.

        Raises:

            ValueError:
                If the assigned `max_range` is smaller or equal to zero.
        """

        return self.__max_range

    @max_range.setter
    def max_range(self, value: float) -> None:
        if value <= 0.0:
            raise ValueError("Maximum range must be greater than zero")

        self.__max_range = value