This commit is contained in:
François Boulogne 2024-07-23 10:56:29 +02:00
commit 49189c3f48
6 changed files with 353 additions and 0 deletions

1
mysignal/__init__.py Normal file
View file

@ -0,0 +1 @@
__version__ = '0.1.0'

143
mysignal/mysignal.py Normal file
View file

@ -0,0 +1,143 @@
import functools
import numpy as np
import pandas as pd
from scipy.fft import fft, fftfreq
from scipy.signal import find_peaks
from scipy.signal import correlate
import matplotlib.pyplot as plt
def normalize_signal(s):
"""
Set the mean at 0, and max at 1.
"""
s -= s.mean()
s /= s.max()
return s
def get_global_mode(modes):
"""
For a series of modes, compute the period of the resulting signal.
Ex (1, 2, 3) -> 6
"""
return functools.reduce(np.lcm, modes)
def sync_phase_signal_in(df_, modes, baseperiod):
df = df_.copy()
dt = np.mean(np.diff(df['time']))
N = len(df['time'])
t = np.linspace(0., N*dt, N, endpoint=False)
global_mode = get_global_mode(modes)
signal_ref = 0
for mode in modes:
signal_ref += np.sin(2 * np.pi * t / (mode * baseperiod))
correlation = correlate(df['RHset'], signal_ref, mode='full', method='direct')
corr_peaks = find_peaks(correlation, distance=.95 * baseperiod * global_mode)[0]
x_corr = np.arange(0, len(correlation), 1) - (N-4)
# Take the first positive peak
corr_idx = corr_peaks[corr_peaks>np.argwhere(x_corr>0)[0][0]][0]
idx = x_corr[corr_idx]
df = df[idx:]
df['time'] -= df['time'].iloc[0]
df = df.reset_index()
del df['index']
return df
def resample_df(df_, modes, baseperiod, id_t='time', id_in='RH', id_out='dm_m'):
t_origin = df_[id_t].to_numpy()
in_origin = df_[id_in].to_numpy()
out_origin = df_[id_out].to_numpy()
dt = np.mean(np.diff(t_origin))
signal_period = get_global_mode(modes) * baseperiod
alpha = np.floor(len(t_origin) * dt / signal_period)
print(alpha)
N = int(np.round(alpha * signal_period / dt))
print(f'N_max: {len(t_origin)}, N: {N}')
# reconstructed time (strictly evenly spaced)
t = np.linspace(0., N*dt, N, endpoint=False)
signal_in = in_origin[:N]
signal_out = out_origin[:N]
return t, dt, signal_in, signal_out
def find_prominent_frequencies(signal, sampling_rate, num_frequencies=1):
# Compute the FFT of the signal
fft_result = fft(signal)
# Compute the power spectrum
power_spectrum = np.abs(fft_result) ** 2
# Generate the frequency axis
freqs = np.fft.fftfreq(len(signal), d=1/sampling_rate)
# Only consider the positive frequencies
positive_freqs = freqs[:len(freqs) // 2]
positive_power_spectrum = power_spectrum[:len(power_spectrum) // 2]
# Find the indices of the peaks in the power spectrum
peaks, _ = find_peaks(positive_power_spectrum)
# Get the indices of the highest peaks
highest_peaks = np.argsort(positive_power_spectrum[peaks])[-num_frequencies:]
# Get the corresponding frequencies of the highest peaks
prominent_frequencies = positive_freqs[peaks][highest_peaks]
return prominent_frequencies
def get_IQ_coeff(signal, sampling_rate, frequencies, debug=False):
N = len(signal)
# if not hasattr(frequencies, '__iter__'):
# frequencies = (frequencies,)
# FFT
Yf = fft(signal)[:N//2]
xf = fftfreq(N, 1/sampling_rate)[:N//2]
# Get coefficients
Bcos_fft = []
Bsin_fft = []
if debug:
fig, ax = plt.subplots(nrows=2, figsize=(8, 6))
for f in frequencies:
idx = np.argmin(np.abs(xf - f))
print(f'distance xf f: {np.abs(xf[idx] - f)/f}')
#print('position freq:')
#print(idx, len(xf), idx / len(xf))
Y_f = Yf[idx]
Bcos_fft.append(2.0 * Y_f.real / N)
Bsin_fft.append(-2.0 * Y_f.imag / N)
if debug:
ax[0].plot(xf[idx-10:idx+10], Yf[idx-10:idx+10].real, '+-');
ax[0].plot(xf[idx], Yf[idx].real, 'o', label=f'f={f:.1e}');
ax[1].plot(xf[idx-10:idx+10], Yf[idx-10:idx+10].imag, 'x-');
ax[1].plot(xf[idx], Yf[idx].imag, 'o', label=f'f={f:.1e}');
if debug:
plt.tight_layout()
ax[0].set_title('cos')
ax[0].legend()
ax[1].set_title('sin')
ax[1].legend()
# Display
for i, f in enumerate(frequencies):
print(f"Freq {f:.2e} Hz, Period {1/f:.1f} s: A_I = {Bsin_fft[i]:.2e}, A_Q = {Bcos_fft[i]:.2e}, tan delta = {Bcos_fft[i]/Bsin_fft[i]:.2f}")
return np.array(Bcos_fft), np.array(Bsin_fft)