[docs]classCorrelationSynchronization(Generic[PGT],Synchronization[PGT],Serializable):"""Correlation-based clock synchronization for arbitrary communication waveforms. The implemented algorithm is equivalent to :cite:p:`1976:knapp` without pre-filtering. """yaml_tag="PilotCorrelation""""YAML serialization tag."""__threshold:float# Correlation threshold at which a pilot signal is detected__guard_ratio:float# Guard ratio of frame duration__peak_prominence:float# Minimum peak prominence for peak detectiondef__init__(self,threshold:float=0.9,guard_ratio:float=0.8,peak_prominence:float=0.2,*args:Any,**kwargs:Any,)->None:""" Args: threshold (float, optional): Correlation threshold at which a pilot signal is detected. guard_ratio (float, optional): Guard ratio of frame duration. peak_prominence (float, optional): Minimum peak prominence for peak detection in the interval (0, 1]. :math:`0.2` is a good default value for most applications. *args: Synchronization base class initialization parameters. """self.threshold=thresholdself.guard_ratio=guard_ratioSynchronization.__init__(self,*args,**kwargs)@propertydefthreshold(self)->float:"""Correlation threshold at which a pilot signal is detected. Returns: float: Threshold between zero and one. Raises: ValueError: If threshold is smaller than zero or greater than one. """returnself.__threshold@threshold.setterdefthreshold(self,value:float):"""Set correlation threshold at which a pilot signal is detected."""ifvalue<0.0orvalue>1.0:raiseValueError("Synchronization threshold must be between zero and one.")self.__threshold=value@propertydefguard_ratio(self)->float:"""Correlation guard ratio at which a pilot signal is detected. After the detection of a pilot section, `guard_ratio` prevents the detection of another pilot in the following samples for a span relative to the configured frame duration. Returns: float: Guard Ratio between zero and one. Raises: ValueError: If guard ratio is smaller than zero or greater than one. """returnself.__guard_ratio@guard_ratio.setterdefguard_ratio(self,value:float):"""Set correlation guard ratio at which a pilot signal is detected."""ifvalue<0.0orvalue>1.0:raiseValueError("Synchronization guard ratio must be between zero and one.")self.__guard_ratio=value
[docs]defsynchronize(self,signal:np.ndarray)->List[int]:# Expand the dimensionality for flat signal streamsifsignal.ndim==1:signal=signal[np.newaxis,:]# Query the pilot signal from the waveform generatorpilot_sequence=self.waveform.pilot_signal.getitem().flatten()# Raise a runtime error if pilot sequence is emptyiflen(pilot_sequence)<1:raiseRuntimeError("No pilot sequence configured, time-domain correlation synchronization impossible")# Compute the correlation between each signal stream and the pilot sequence, sum up as a resultcorrelation=np.zeros(len(pilot_sequence)+signal.shape[1]-1,dtype=float)forstreaminsignal:correlation+=abs(correlate(stream,pilot_sequence,mode="full",method="fft"))correlation/=correlation.max()# Normalize correlation# Determine the pilot sequence locations by performing a peak search over the correlation profileframe_length=self.waveform.samples_per_framepilot_indices,_=find_peaks(abs(correlation),height=0.9,distance=int(0.8*frame_length))# Abort if no pilot section has been detectediflen(pilot_indices)<1:return[]# Correct pilot indices by the convolution lengthpilot_length=len(pilot_sequence)pilot_indices-=pilot_length-1# Correct infeasible pilot index choicespilot_indices=np.where(pilot_indices<0,0,pilot_indices)pilot_indices=np.where(pilot_indices>(signal.shape[1]-frame_length),abs(signal.shape[1]-frame_length),pilot_indices,)returnpilot_indices.tolist()