143 lines
4.2 KiB
Python
143 lines
4.2 KiB
Python
import functools
|
|
import numpy as np
|
|
import pandas as pd
|
|
from scipy.fft import fft, fftfreq
|
|
from scipy.signal import find_peaks
|
|
from scipy.signal import correlate
|
|
import matplotlib.pyplot as plt
|
|
|
|
|
|
def normalize_signal(s):
|
|
"""
|
|
Set the mean at 0, and max at 1.
|
|
"""
|
|
s -= s.mean()
|
|
s /= s.max()
|
|
return s
|
|
|
|
|
|
def get_global_mode(modes):
|
|
"""
|
|
For a series of modes, compute the period of the resulting signal.
|
|
|
|
Ex (1, 2, 3) -> 6
|
|
"""
|
|
return functools.reduce(np.lcm, modes)
|
|
|
|
|
|
def sync_phase_signal_in(df_, modes, baseperiod):
|
|
df = df_.copy()
|
|
dt = np.mean(np.diff(df['time']))
|
|
N = len(df['time'])
|
|
t = np.linspace(0., N*dt, N, endpoint=False)
|
|
global_mode = get_global_mode(modes)
|
|
|
|
signal_ref = 0
|
|
for mode in modes:
|
|
signal_ref += np.sin(2 * np.pi * t / (mode * baseperiod))
|
|
correlation = correlate(df['RHset'], signal_ref, mode='full', method='direct')
|
|
corr_peaks = find_peaks(correlation, distance=.95 * baseperiod * global_mode)[0]
|
|
x_corr = np.arange(0, len(correlation), 1) - (N-4)
|
|
# Take the first positive peak
|
|
corr_idx = corr_peaks[corr_peaks>np.argwhere(x_corr>0)[0][0]][0]
|
|
idx = x_corr[corr_idx]
|
|
|
|
df = df[idx:]
|
|
df['time'] -= df['time'].iloc[0]
|
|
df = df.reset_index()
|
|
del df['index']
|
|
return df
|
|
|
|
|
|
def resample_df(df_, modes, baseperiod, id_t='time', id_in='RH', id_out='dm_m'):
|
|
|
|
t_origin = df_[id_t].to_numpy()
|
|
in_origin = df_[id_in].to_numpy()
|
|
out_origin = df_[id_out].to_numpy()
|
|
|
|
|
|
dt = np.mean(np.diff(t_origin))
|
|
|
|
|
|
signal_period = get_global_mode(modes) * baseperiod
|
|
alpha = np.floor(len(t_origin) * dt / signal_period)
|
|
print(alpha)
|
|
|
|
N = int(np.round(alpha * signal_period / dt))
|
|
print(f'N_max: {len(t_origin)}, N: {N}')
|
|
# reconstructed time (strictly evenly spaced)
|
|
t = np.linspace(0., N*dt, N, endpoint=False)
|
|
signal_in = in_origin[:N]
|
|
signal_out = out_origin[:N]
|
|
return t, dt, signal_in, signal_out
|
|
|
|
|
|
def find_prominent_frequencies(signal, sampling_rate, num_frequencies=1):
|
|
# Compute the FFT of the signal
|
|
fft_result = fft(signal)
|
|
|
|
# Compute the power spectrum
|
|
power_spectrum = np.abs(fft_result) ** 2
|
|
|
|
# Generate the frequency axis
|
|
freqs = np.fft.fftfreq(len(signal), d=1/sampling_rate)
|
|
|
|
# Only consider the positive frequencies
|
|
positive_freqs = freqs[:len(freqs) // 2]
|
|
positive_power_spectrum = power_spectrum[:len(power_spectrum) // 2]
|
|
|
|
# Find the indices of the peaks in the power spectrum
|
|
peaks, _ = find_peaks(positive_power_spectrum)
|
|
|
|
# Get the indices of the highest peaks
|
|
highest_peaks = np.argsort(positive_power_spectrum[peaks])[-num_frequencies:]
|
|
|
|
# Get the corresponding frequencies of the highest peaks
|
|
prominent_frequencies = positive_freqs[peaks][highest_peaks]
|
|
|
|
return prominent_frequencies
|
|
|
|
|
|
def get_IQ_coeff(signal, sampling_rate, frequencies, debug=False):
|
|
N = len(signal)
|
|
|
|
# if not hasattr(frequencies, '__iter__'):
|
|
# frequencies = (frequencies,)
|
|
|
|
# FFT
|
|
Yf = fft(signal)[:N//2]
|
|
xf = fftfreq(N, 1/sampling_rate)[:N//2]
|
|
|
|
# Get coefficients
|
|
Bcos_fft = []
|
|
Bsin_fft = []
|
|
|
|
if debug:
|
|
fig, ax = plt.subplots(nrows=2, figsize=(8, 6))
|
|
|
|
for f in frequencies:
|
|
idx = np.argmin(np.abs(xf - f))
|
|
print(f'distance xf f: {np.abs(xf[idx] - f)/f}')
|
|
#print('position freq:')
|
|
#print(idx, len(xf), idx / len(xf))
|
|
Y_f = Yf[idx]
|
|
Bcos_fft.append(2.0 * Y_f.real / N)
|
|
Bsin_fft.append(-2.0 * Y_f.imag / N)
|
|
|
|
if debug:
|
|
ax[0].plot(xf[idx-10:idx+10], Yf[idx-10:idx+10].real, '+-');
|
|
ax[0].plot(xf[idx], Yf[idx].real, 'o', label=f'f={f:.1e}');
|
|
ax[1].plot(xf[idx-10:idx+10], Yf[idx-10:idx+10].imag, 'x-');
|
|
ax[1].plot(xf[idx], Yf[idx].imag, 'o', label=f'f={f:.1e}');
|
|
|
|
if debug:
|
|
plt.tight_layout()
|
|
ax[0].set_title('cos')
|
|
ax[0].legend()
|
|
ax[1].set_title('sin')
|
|
ax[1].legend()
|
|
|
|
# Display
|
|
for i, f in enumerate(frequencies):
|
|
print(f"Freq {f:.2e} Hz, Period {1/f:.1f} s: A_I = {Bsin_fft[i]:.2e}, A_Q = {Bcos_fft[i]:.2e}, tan delta = {Bcos_fft[i]/Bsin_fft[i]:.2f}")
|
|
return np.array(Bcos_fft), np.array(Bsin_fft)
|