diff --git a/mysignal/__init__.py b/mysignal/__init__.py index b794fd4..df9144c 100644 --- a/mysignal/__init__.py +++ b/mysignal/__init__.py @@ -1 +1 @@ -__version__ = '0.1.0' +__version__ = '0.1.1' diff --git a/mysignal/phasefreq.py b/mysignal/phasefreq.py new file mode 100644 index 0000000..b9f8869 --- /dev/null +++ b/mysignal/phasefreq.py @@ -0,0 +1,504 @@ +import warnings +import numpy as np +import pandas as pd +import pywt + +from scipy.fft import fft, fftfreq +from scipy.signal import find_peaks, hilbert, butter, filtfilt, correlate +from scipy.stats import linregress +from scipy.optimize import curve_fit + + +def filter_signal_by_modes(time, signal, num_modes=1, bandwidth_factor=0.1): + """ + Filtre un signal pour extraire ses composantes fréquentielles dominantes. + + Parameters: + time (array-like): Array contenant les données temporelles. + signal (array-like): Array contenant les données du signal. + num_modes (int): Nombre de modes fréquentiels à extraire. + + Returns: + tuple: (filtered_signals, frequencies, time_filtered) + filtered_signals: Liste des signaux filtrés pour chaque mode + frequencies: Fréquences correspondantes à chaque mode + time_filtered: Temps après filtrage (même que l'entrée) + """ + # Convertir en arrays numpy + time = np.array(time) + signal = np.array(signal) + + # Calculer la période d'échantillonnage et la fréquence + dt = np.mean(np.diff(time)) + fs = 1 / dt + + # Supprimer la composante DC + signal_clean = signal - np.mean(signal) + + # Calculer la FFT + n = len(time) + fft_signal = fft(signal_clean) + freqs = fftfreq(n, dt) + + # Prendre seulement les fréquences positives + positive_mask = freqs > 0 + freqs_pos = freqs[positive_mask] + fft_pos = fft_signal[positive_mask] + + # Trouver les pics dans le spectre + magnitude = np.abs(fft_pos) + peaks, _ = find_peaks(magnitude, height=0.1*np.max(magnitude)) + peak_freqs = freqs_pos[peaks] + peak_mags = magnitude[peaks] + + # Trier les pics par magnitude et sélectionner les num_modes plus importants + idx = np.argsort(peak_mags)[-num_modes:] + dominant_freqs = peak_freqs[idx] + + # Trier les fréquences par ordre croissant + dominant_freqs.sort() + + # Initialiser la liste des signaux filtrés + filtered_signals = [] + + # Pour chaque fréquence dominante, appliquer un filtre passe-bande + for freq in dominant_freqs: + # Définir la bande passante (10% de la fréquence centrale de chaque côté) + bandwidth = bandwidth_factor * freq + lowcut = freq - bandwidth + highcut = freq + bandwidth + + # Éviter les fréquences négatives ou supérieures à la fréquence de Nyquist + nyq = 0.5 * fs + if lowcut <= 0: + lowcut = 0.01 + if highcut >= nyq: + highcut = nyq - 0.01 + + # Créer le filtre Butterworth + low = lowcut / nyq + high = highcut / nyq + b, a = butter(4, [low, high], btype='band') + + # Appliquer le filtre (filtfilt pour éviter le déphasage) + filtered_signal = filtfilt(b, a, signal_clean) + filtered_signals.append(filtered_signal) + + # Le temps reste le même après filtrage + time_filtered = time + + return filtered_signals, dominant_freqs, time_filtered + + +def analyze_signal_Hilbert(time, e_signal, s_signal): + """ + Analyzes the dominant frequencies, phase shift, time shift, and periods between two signals e_signal(t) and s_signal(t) from a DataFrame. + Uses Hilbert transform method for phase shift calculation (single mode only). + """ + # Extract signals and time + #time = df[time_column].values + #e_signal = df[e_signal_column].values + #s_signal = df[s_signal_column].values + + # Calculate sampling period and rate + dt = np.mean(np.diff(time)) + fs = 1 / dt + + # Remove mean to eliminate DC component + e_signal_clean = e_signal - np.mean(e_signal) + s_signal_clean = s_signal - np.mean(s_signal) + + # Compute FFT + n = len(time) + e_fft = fft(e_signal_clean) + s_fft = fft(s_signal_clean) + freqs = fftfreq(n, dt) + + # Get positive frequencies only + positive_mask = freqs > 0 + freqs_pos = freqs[positive_mask] + e_fft_pos = e_fft[positive_mask] + s_fft_pos = s_fft[positive_mask] + + # Find dominant frequencies using peak detection + e_magnitude = np.abs(e_fft_pos) + s_magnitude = np.abs(s_fft_pos) + + # Find peaks for e signal + e_peaks, _ = find_peaks(e_magnitude, height=0.1*np.max(e_magnitude)) + e_peak_freqs = freqs_pos[e_peaks] + e_peak_mags = e_magnitude[e_peaks] + + # Find peaks for s signal + s_peaks, _ = find_peaks(s_magnitude, height=0.1*np.max(s_magnitude)) + s_peak_freqs = freqs_pos[s_peaks] + s_peak_mags = s_magnitude[s_peaks] + + # Sort peaks by magnitude and select the top one + e_idx = np.argsort(e_peak_mags)[-1] + s_idx = np.argsort(s_peak_mags)[-1] + + freq_e = e_peak_freqs[e_idx] + freq_s = s_peak_freqs[s_idx] + + # Check if frequencies match within 1% + if np.abs(freq_e - freq_s) / ((freq_e + freq_s)/2) > 0.01: + raise ValueError("Frequency difference exceeds 1%. Phase shift cannot be determined.") + + # Calculate mean frequency for phase shift calculation + mean_freq = (freq_e + freq_s) / 2 + + # Create a bandpass filter around the mean frequency + nyq = 0.5 * fs + low = (mean_freq - 0.1 * mean_freq) / nyq + high = (mean_freq + 0.1 * mean_freq) / nyq + + if low <= 0: + low = 0.01 + if high >= 1: + high = 0.99 + + b, a = butter(4, [low, high], btype='band') + + # Filter both signals + e_filtered = filtfilt(b, a, e_signal_clean) + s_filtered = filtfilt(b, a, s_signal_clean) + + # Apply Hilbert transform to filtered signals + e_analytic_filtered = hilbert(e_filtered) + s_analytic_filtered = hilbert(s_filtered) + + # Extract phase + e_phase_filtered = np.unwrap(np.angle(e_analytic_filtered)) + s_phase_filtered = np.unwrap(np.angle(s_analytic_filtered)) + + # Calculate phase difference + phase_diff = s_phase_filtered - e_phase_filtered + + # Average phase difference over the stable part of the signal + n_stable = len(phase_diff) // 4 + stable_phase_diff = phase_diff[n_stable:-n_stable] + + # Calculate mean phase difference + mean_phase_diff = np.mean(stable_phase_diff) + + # Normalize to [-π, π] + mean_phase_diff = np.arctan2(np.sin(mean_phase_diff), np.cos(mean_phase_diff)) + + # Convert to degrees + phase_diff_deg = np.degrees(mean_phase_diff) + + # Calculate time shift + time_shift = mean_phase_diff / (2 * np.pi * mean_freq) + + # Calculate periods + period_e = 1 / freq_e + period_s = 1 / freq_s + + print(f'phase = {mean_phase_diff}') + return (period_e, period_s, freq_e, freq_s, + mean_phase_diff, phase_diff_deg, time_shift) + +def analyze_signal_sinfit(time, e_signal, s_signal): + """ + Analyzes the dominant frequencies, phase shift, time shift, and periods between two signals e_signal(t) and s_signal(t) from a DataFrame. + Uses curve fitting method for phase shift calculation (single mode only). + """ + # Extract signals and time + #time = df[time_column].values + #e_signal = df[e_signal_column].values + #s_signal = df[s_signal_column].values + + # Calculate sampling period and rate + dt = np.mean(np.diff(time)) + fs = 1 / dt + + # Remove mean to eliminate DC component + e_signal_clean = e_signal - np.mean(e_signal) + s_signal_clean = s_signal - np.mean(s_signal) + + # Compute FFT + n = len(time) + e_fft = fft(e_signal_clean) + s_fft = fft(s_signal_clean) + freqs = fftfreq(n, dt) + + # Get positive frequencies only + positive_mask = freqs > 0 + freqs_pos = freqs[positive_mask] + e_fft_pos = e_fft[positive_mask] + s_fft_pos = s_fft[positive_mask] + + # Find dominant frequencies using peak detection + e_magnitude = np.abs(e_fft_pos) + s_magnitude = np.abs(s_fft_pos) + + # Find peaks for e signal + e_peaks, _ = find_peaks(e_magnitude, height=0.1*np.max(e_magnitude)) + e_peak_freqs = freqs_pos[e_peaks] + e_peak_mags = e_magnitude[e_peaks] + + # Find peaks for s signal + s_peaks, _ = find_peaks(s_magnitude, height=0.1*np.max(s_magnitude)) + s_peak_freqs = freqs_pos[s_peaks] + s_peak_mags = s_magnitude[s_peaks] + + # Sort peaks by magnitude and select the top one + e_idx = np.argsort(e_peak_mags)[-1] + s_idx = np.argsort(s_peak_mags)[-1] + + freq_e = e_peak_freqs[e_idx] + freq_s = s_peak_freqs[s_idx] + + # Check if frequencies match within 1% + if np.abs(freq_e - freq_s) / ((freq_e + freq_s)/2) > 0.01: + raise ValueError("Frequency difference exceeds 1%. Phase shift cannot be determined.") + + # Calculate mean frequency for phase shift calculation + mean_freq = (freq_e + freq_s) / 2 + + # Define sine function for fitting + def sine_func(t, A, phi, offset): + return A * np.sin(2 * np.pi * mean_freq * t + phi) + offset + + # Fit sine wave to e signal + try: + popt_e, _ = curve_fit(sine_func, time, e_signal, + p0=[np.std(e_signal), 0, np.mean(e_signal)], + bounds=([0, -np.pi, -np.inf], [np.inf, np.pi, np.inf])) + except: + popt_e = [np.std(e_signal), 0, np.mean(e_signal)] + + # Fit sine wave to s signal + try: + popt_s, _ = curve_fit(sine_func, time, s_signal, + p0=[np.std(s_signal), 0, np.mean(s_signal)], + bounds=([0, -np.pi, -np.inf], [np.inf, np.pi, np.inf])) + except: + popt_s = [np.std(s_signal), 0, np.mean(s_signal)] + + # Calculate phase difference + phase_diff = popt_s[1] - popt_e[1] + + # Normalize to [-π, π] + phase_diff = np.arctan2(np.sin(phase_diff), np.cos(phase_diff)) + + # Convert to degrees + phase_diff_deg = np.degrees(phase_diff) + + # Calculate time shift + time_shift = phase_diff / (2 * np.pi * mean_freq) + + # Calculate periods + period_e = 1 / freq_e + period_s = 1 / freq_s + + print(f'phase = {phase_diff}') + return (period_e, period_s, freq_e, freq_s, + phase_diff, phase_diff_deg, time_shift) + + + +def analyze_signal_cross_correlation(time, e_signal, s_signal): + """ + Analyzes signals using cross-correlation method for phase shift calculation (single mode only). + """ + # Extract signals and time + #time = df[time_column].values + #e_signal = df[e_signal_column].values + #s_signal = df[s_signal_column].values + + # Calculate sampling period and rate + dt = np.mean(np.diff(time)) + fs = 1 / dt + + # Remove mean to eliminate DC component + e_signal_clean = e_signal - np.mean(e_signal) + s_signal_clean = s_signal - np.mean(s_signal) + + # Compute FFT + n = len(time) + e_fft = fft(e_signal_clean) + s_fft = fft(s_signal_clean) + freqs = fftfreq(n, dt) + + # Get positive frequencies only + positive_mask = freqs > 0 + freqs_pos = freqs[positive_mask] + e_fft_pos = e_fft[positive_mask] + s_fft_pos = s_fft[positive_mask] + + # Find dominant frequencies using peak detection + e_magnitude = np.abs(e_fft_pos) + s_magnitude = np.abs(s_fft_pos) + + # Find peaks for e signal + e_peaks, _ = find_peaks(e_magnitude, height=0.1*np.max(e_magnitude)) + e_peak_freqs = freqs_pos[e_peaks] + e_peak_mags = e_magnitude[e_peaks] + + # Find peaks for s signal + s_peaks, _ = find_peaks(s_magnitude, height=0.1*np.max(s_magnitude)) + s_peak_freqs = freqs_pos[s_peaks] + s_peak_mags = s_magnitude[s_peaks] + + # Sort peaks by magnitude and select the top one + e_idx = np.argsort(e_peak_mags)[-1] + s_idx = np.argsort(s_peak_mags)[-1] + + freq_e = e_peak_freqs[e_idx] + freq_s = s_peak_freqs[s_idx] + + # Check if frequencies match within 1% + if np.abs(freq_e - freq_s) / ((freq_e + freq_s)/2) > 0.01: + raise ValueError("Frequency difference exceeds 1%. Phase shift cannot be determined.") + + # Calculate mean frequency for phase shift calculation + mean_freq = (freq_e + freq_s) / 2 + + # Create a bandpass filter around the mean frequency + nyq = 0.5 * fs + low = (mean_freq - 0.1 * mean_freq) / nyq + high = (mean_freq + 0.1 * mean_freq) / nyq + + if low <= 0: + low = 0.01 + if high >= 1: + high = 0.99 + + b, a = butter(4, [low, high], btype='band') + + # Filter both signals + e_filtered = filtfilt(b, a, e_signal_clean) + s_filtered = filtfilt(b, a, s_signal_clean) + + # Compute cross-correlation + correlation = correlate(s_filtered, e_filtered, mode='full') + lags = np.arange(-len(e_filtered) + 1, len(e_filtered)) + + # Find the lag with maximum correlation + max_lag = lags[np.argmax(correlation)] + + # Calculate time shift + time_shift = max_lag * dt + + # Calculate phase shift + phase_diff = 2 * np.pi * mean_freq * time_shift + + # Normalize to [-π, π] + phase_diff = np.arctan2(np.sin(phase_diff), np.cos(phase_diff)) + + # Convert to degrees + phase_diff_deg = np.degrees(phase_diff) + + # Calculate periods + period_e = 1 / freq_e + period_s = 1 / freq_s + + print(f'phase = {phase_diff}') + return (period_e, period_s, freq_e, freq_s, + phase_diff, phase_diff_deg, time_shift) + + +def analyze_signal_wavelet(time, e_signal, s_signal): + """ + Analyzes signals using wavelet transform for phase shift calculation (single mode only). + """ + # Extract signals and time + #time = df[time_column].values + #e_signal = df[e_signal_column].values + #s_signal = df[s_signal_column].values + + # Calculate sampling period and rate + dt = np.mean(np.diff(time)) + fs = 1 / dt + + # Remove mean to eliminate DC component + e_signal_clean = e_signal - np.mean(e_signal) + s_signal_clean = s_signal - np.mean(s_signal) + + # Compute FFT + n = len(time) + e_fft = fft(e_signal_clean) + s_fft = fft(s_signal_clean) + freqs = fftfreq(n, dt) + + # Get positive frequencies only + positive_mask = freqs > 0 + freqs_pos = freqs[positive_mask] + e_fft_pos = e_fft[positive_mask] + s_fft_pos = s_fft[positive_mask] + + # Find dominant frequencies using peak detection + e_magnitude = np.abs(e_fft_pos) + s_magnitude = np.abs(s_fft_pos) + + # Find peaks for e signal + e_peaks, _ = find_peaks(e_magnitude, height=0.1*np.max(e_magnitude)) + e_peak_freqs = freqs_pos[e_peaks] + e_peak_mags = e_magnitude[e_peaks] + + # Find peaks for s signal + s_peaks, _ = find_peaks(s_magnitude, height=0.1*np.max(s_magnitude)) + s_peak_freqs = freqs_pos[s_peaks] + s_peak_mags = s_magnitude[s_peaks] + + # Sort peaks by magnitude and select the top one + e_idx = np.argsort(e_peak_mags)[-1] + s_idx = np.argsort(s_peak_mags)[-1] + + freq_e = e_peak_freqs[e_idx] + freq_s = s_peak_freqs[s_idx] + + # Check if frequencies match within 1% + if np.abs(freq_e - freq_s) / ((freq_e + freq_s)/2) > 0.01: + raise ValueError("Frequency difference exceeds 1%. Phase shift cannot be determined.") + + # Calculate mean frequency for phase shift calculation + mean_freq = (freq_e + freq_s) / 2 + + # Perform continuous wavelet transform + scales = np.arange(1, 128) + coefficients_e, frequencies_e = pywt.cwt(e_signal_clean, scales, 'morl', sampling_period=dt) + coefficients_s, frequencies_s = pywt.cwt(s_signal_clean, scales, 'morl', sampling_period=dt) + + # Find the scale index closest to the mean frequency + scale_idx = np.argmin(np.abs(frequencies_e - mean_freq)) + + # Extract the wavelet coefficients at this scale + coeffs_e = coefficients_e[scale_idx, :] + coeffs_s = coefficients_s[scale_idx, :] + + # Calculate the instantaneous phase using the Hilbert transform + analytic_e = hilbert(coeffs_e) + analytic_s = hilbert(coeffs_s) + + phase_e = np.unwrap(np.angle(analytic_e)) + phase_s = np.unwrap(np.angle(analytic_s)) + + # Calculate phase difference + phase_diff = phase_s - phase_e + + # Average phase difference over the stable part of the signal + n_stable = len(phase_diff) // 4 + stable_phase_diff = phase_diff[n_stable:-n_stable] + + # Calculate mean phase difference + mean_phase_diff = np.mean(stable_phase_diff) + + # Normalize to [-π, π] + mean_phase_diff = np.arctan2(np.sin(mean_phase_diff), np.cos(mean_phase_diff)) + + # Convert to degrees + phase_diff_deg = np.degrees(mean_phase_diff) + + # Calculate time shift + time_shift = mean_phase_diff / (2 * np.pi * mean_freq) + + # Calculate periods + period_e = 1 / freq_e + period_s = 1 / freq_s + + print(f'phase = {mean_phase_diff}') + return (period_e, period_s, freq_e, freq_s, + mean_phase_diff, phase_diff_deg, time_shift) diff --git a/requirements.txt b/requirements.txt index 475fbd5..fc49bb6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ numpy scipy pandas +PyWavelets