489 lines
19 KiB
Python
489 lines
19 KiB
Python
import ast
|
|
|
|
import tensorflow as tf
|
|
import random
|
|
import os
|
|
import numpy as np
|
|
from scipy.fft import fft, ifft
|
|
import soundfile as sf
|
|
import math
|
|
import pandas as pd
|
|
import scipy as sp
|
|
import glob
|
|
from tqdm import tqdm
|
|
|
|
#generator function. It reads the csv file with pandas and loads the largest audio segments from each recording. If extend=False, it will only read the segments with length>length_seg, trim them and yield them with no further processing. Otherwise, if the segment length is inferior, it will extend the length using concatenative synthesis.
|
|
def __noise_sample_generator(info_file,fs, length_seq, split):
|
|
head=os.path.split(info_file)[0]
|
|
load_data=pd.read_csv(info_file)
|
|
#split= train, validation, test
|
|
load_data_split=load_data.loc[load_data["split"]==split]
|
|
load_data_split=load_data_split.reset_index(drop=True)
|
|
while True:
|
|
r = list(range(len(load_data_split)))
|
|
if split!="test":
|
|
random.shuffle(r)
|
|
for i in r:
|
|
segments=ast.literal_eval(load_data_split.loc[i,"segments"])
|
|
if split=="test":
|
|
loaded_data, Fs=sf.read(os.path.join(head,load_data_split["recording"].loc[i],load_data_split["largest_segment"].loc[i]))
|
|
else:
|
|
num=np.random.randint(0,len(segments))
|
|
loaded_data, Fs=sf.read(os.path.join(head,load_data_split["recording"].loc[i],segments[num]))
|
|
assert(fs==Fs, "wrong sampling rate")
|
|
|
|
yield __extend_sample_by_repeating(loaded_data,fs,length_seq)
|
|
|
|
def __extend_sample_by_repeating(data, fs,seq_len):
|
|
rpm=78
|
|
target_samp=seq_len
|
|
large_data=np.zeros(shape=(target_samp,2))
|
|
|
|
if len(data)>=target_samp:
|
|
large_data=data[0:target_samp]
|
|
return large_data
|
|
|
|
bls=(1000*44100)/1000 #hardcoded
|
|
|
|
window=np.stack((np.hanning(bls) ,np.hanning(bls)), axis=1)
|
|
window_left=window[0:int(bls/2),:]
|
|
window_right=window[int(bls/2)::,:]
|
|
bls=int(bls/2)
|
|
|
|
rps=rpm/60
|
|
period=1/rps
|
|
|
|
period_sam=int(period*fs)
|
|
|
|
overhead=len(data)%period_sam
|
|
|
|
if(overhead>bls):
|
|
complete_periods=(len(data)//period_sam)*period_sam
|
|
else:
|
|
complete_periods=(len(data)//period_sam -1)*period_sam
|
|
|
|
|
|
a=np.multiply(data[0:bls], window_left)
|
|
b=np.multiply(data[complete_periods:complete_periods+bls], window_right)
|
|
c_1=np.concatenate((data[0:complete_periods,:],b))
|
|
c_2=np.concatenate((a,data[bls:complete_periods,:],b))
|
|
c_3=np.concatenate((a,data[bls::,:]))
|
|
|
|
large_data[0:complete_periods+bls,:]=c_1
|
|
|
|
|
|
pointer=complete_periods
|
|
not_finished=True
|
|
while (not_finished):
|
|
if target_samp>pointer+complete_periods+bls:
|
|
large_data[pointer:pointer+complete_periods+bls] +=c_2
|
|
pointer+=complete_periods
|
|
else:
|
|
large_data[pointer::]+=c_3[0:(target_samp-pointer)]
|
|
#finish
|
|
not_finished=False
|
|
|
|
return large_data
|
|
|
|
|
|
def generate_real_recordings_data(path_recordings, fs=44100, seg_len_s=15, stereo=False):
|
|
|
|
records_info=os.path.join(path_recordings,"audio_files.txt")
|
|
num_lines = sum(1 for line in open(records_info))
|
|
f = open(records_info,"r")
|
|
#load data record files
|
|
print("Loading record files")
|
|
records=[]
|
|
seg_len=fs*seg_len_s
|
|
pointer=int(fs*5) #starting at second 5 by default
|
|
for i in tqdm(range(num_lines)):
|
|
audio=f.readline()
|
|
audio=audio[:-1]
|
|
data, fs=sf.read(os.path.join(path_recordings,audio))
|
|
if len(data.shape)>1 and not(stereo):
|
|
data=np.mean(data,axis=1)
|
|
#elif stereo and len(data.shape)==1:
|
|
# data=np.stack((data, data), axis=1)
|
|
|
|
#normalize
|
|
data=data/np.max(np.abs(data))
|
|
segment=data[pointer:pointer+seg_len]
|
|
records.append(segment.astype("float32"))
|
|
|
|
return records
|
|
|
|
def generate_paired_data_test_formal(path_pianos, path_noises, noise_amount="low_snr",num_samples=-1, fs=44100, seg_len_s=5 , extend=True, stereo=False, prenoise=False):
|
|
|
|
print(num_samples)
|
|
segments_clean=[]
|
|
segments_noisy=[]
|
|
seg_len=fs*seg_len_s
|
|
noises_info=os.path.join(path_noises,"info.csv")
|
|
np.random.seed(42)
|
|
if noise_amount=="low_snr":
|
|
SNRs=np.random.uniform(2,6,num_samples)
|
|
elif noise_amount=="mid_snr":
|
|
SNRs=np.random.uniform(6,12,num_samples)
|
|
|
|
scales=np.random.uniform(-4,0,num_samples)
|
|
#SNRs=[2,6,12] #HARDCODED!!!!
|
|
i=0
|
|
print(path_pianos[0])
|
|
print(seg_len)
|
|
train_samples=glob.glob(os.path.join(path_pianos[0],"*.wav"))
|
|
train_samples=sorted(train_samples)
|
|
|
|
if prenoise:
|
|
noise_generator=__noise_sample_generator(noises_info,fs, seg_len+fs, extend, "test") #Adds 1s of silence add the begiing, longer noise
|
|
else:
|
|
noise_generator=__noise_sample_generator(noises_info,fs, seg_len, extend, "test") #this will take care of everything
|
|
#load data clean files
|
|
for file in tqdm(train_samples): #add [1:5] for testing
|
|
data_clean, samplerate = sf.read(file)
|
|
if samplerate!=fs:
|
|
print("!!!!WRONG SAMPLE RATe!!!")
|
|
#Stereo to mono
|
|
if len(data_clean.shape)>1 and not(stereo):
|
|
data_clean=np.mean(data_clean,axis=1)
|
|
#elif stereo and len(data_clean.shape)==1:
|
|
# data_clean=np.stack((data_clean, data_clean), axis=1)
|
|
#normalize
|
|
data_clean=data_clean/np.max(np.abs(data_clean))
|
|
#data_clean_loaded.append(data_clean)
|
|
|
|
#framify data clean files
|
|
|
|
#framify arguments: seg_len, hop_size
|
|
hop_size=int(seg_len)# no overlap
|
|
|
|
num_frames=np.floor(len(data_clean)/hop_size - seg_len/hop_size +1)
|
|
print(num_frames)
|
|
if num_frames==0:
|
|
data_clean=np.concatenate((data_clean, np.zeros(shape=(int(2*seg_len-len(data_clean)),))), axis=0)
|
|
num_frames=1
|
|
|
|
data_not_finished=True
|
|
pointer=0
|
|
while(data_not_finished):
|
|
if i>=num_samples:
|
|
break
|
|
segment=data_clean[pointer:pointer+seg_len]
|
|
pointer=pointer+hop_size
|
|
if pointer+seg_len>len(data_clean):
|
|
data_not_finished=False
|
|
segment=segment.astype('float32')
|
|
|
|
#SNRs=np.random.uniform(2,20)
|
|
snr=SNRs[i]
|
|
scale=scales[i]
|
|
#load noise signal
|
|
data_noise= next(noise_generator)
|
|
data_noise=np.mean(data_noise,axis=1)
|
|
#normalize
|
|
data_noise=data_noise/np.max(np.abs(data_noise))
|
|
new_noise=data_noise #if more processing needed, add here
|
|
#load clean data
|
|
#configure sizes
|
|
power_clean=np.var(segment)
|
|
#estimate noise power
|
|
if prenoise:
|
|
power_noise=np.var(new_noise[fs::])
|
|
else:
|
|
power_noise=np.var(new_noise)
|
|
|
|
snr = 10.0**(snr/10.0)
|
|
|
|
#sum both signals according to snr
|
|
if prenoise:
|
|
segment=np.concatenate((np.zeros(shape=(fs,)),segment),axis=0) #add one second of silence
|
|
summed=segment+np.sqrt(power_clean/(snr*power_noise))*new_noise #not sure if this is correct, maybe revisit later!!
|
|
|
|
summed=summed.astype('float32')
|
|
#yield tf.convert_to_tensor(summed), tf.convert_to_tensor(segment)
|
|
|
|
|
|
summed=10.0**(scale/10.0) *summed
|
|
segment=10.0**(scale/10.0) *segment
|
|
segments_noisy.append(summed.astype('float32'))
|
|
segments_clean.append(segment.astype('float32'))
|
|
i=i+1
|
|
|
|
return segments_noisy, segments_clean
|
|
|
|
def generate_test_data(path_music, path_noises,num_samples=-1, fs=44100, seg_len_s=5):
|
|
|
|
segments_clean=[]
|
|
segments_noisy=[]
|
|
seg_len=fs*seg_len_s
|
|
noises_info=os.path.join(path_noises,"info.csv")
|
|
SNRs=[2,6,12] #HARDCODED!!!!
|
|
for path in path_music:
|
|
print(path)
|
|
train_samples=glob.glob(os.path.join(path,"*.wav"))
|
|
train_samples=sorted(train_samples)
|
|
|
|
noise_generator=__noise_sample_generator(noises_info,fs, seg_len, "test") #this will take care of everything
|
|
#load data clean files
|
|
jj=0
|
|
for file in tqdm(train_samples): #add [1:5] for testing
|
|
data_clean, samplerate = sf.read(file)
|
|
if samplerate!=fs:
|
|
print("!!!!WRONG SAMPLE RATe!!!")
|
|
#Stereo to mono
|
|
if len(data_clean.shape)>1:
|
|
data_clean=np.mean(data_clean,axis=1)
|
|
#normalize
|
|
data_clean=data_clean/np.max(np.abs(data_clean))
|
|
#data_clean_loaded.append(data_clean)
|
|
|
|
#framify data clean files
|
|
|
|
#framify arguments: seg_len, hop_size
|
|
hop_size=int(seg_len)# no overlap
|
|
|
|
num_frames=np.floor(len(data_clean)/hop_size - seg_len/hop_size +1)
|
|
if num_frames==0:
|
|
data_clean=np.concatenate((data_clean, np.zeros(shape=(int(2*seg_len-len(data_clean)),))), axis=0)
|
|
num_frames=1
|
|
|
|
pointer=0
|
|
segment=data_clean[pointer:pointer+(seg_len-2*fs)]
|
|
segment=segment.astype('float32')
|
|
segment=np.concatenate(( np.zeros(shape=(2*fs,)), segment), axis=0) #I hope its ok
|
|
#segments_clean.append(segment)
|
|
|
|
for snr in SNRs:
|
|
#load noise signal
|
|
data_noise= next(noise_generator)
|
|
data_noise=np.mean(data_noise,axis=1)
|
|
#normalize
|
|
data_noise=data_noise/np.max(np.abs(data_noise))
|
|
new_noise=data_noise #if more processing needed, add here
|
|
#load clean data
|
|
#configure sizes
|
|
#estimate clean signal power
|
|
power_clean=np.var(segment)
|
|
#estimate noise power
|
|
power_noise=np.var(new_noise)
|
|
|
|
snr = 10.0**(snr/10.0)
|
|
|
|
#sum both signals according to snr
|
|
summed=segment+np.sqrt(power_clean/(snr*power_noise))*new_noise #not sure if this is correct, maybe revisit later!!
|
|
summed=summed.astype('float32')
|
|
#yield tf.convert_to_tensor(summed), tf.convert_to_tensor(segment)
|
|
|
|
segments_noisy.append(summed.astype('float32'))
|
|
segments_clean.append(segment.astype('float32'))
|
|
|
|
return segments_noisy, segments_clean
|
|
|
|
def generate_val_data(path_music, path_noises,split,num_samples=-1, fs=44100, seg_len_s=5):
|
|
|
|
val_samples=[]
|
|
for path in path_music:
|
|
val_samples.extend(glob.glob(os.path.join(path,"*.wav")))
|
|
|
|
#load data clean files
|
|
print("Loading clean files")
|
|
data_clean_loaded=[]
|
|
for ff in tqdm(range(0,len(val_samples))): #add [1:5] for testing
|
|
data_clean, samplerate = sf.read(val_samples[ff])
|
|
if samplerate!=fs:
|
|
print("!!!!WRONG SAMPLE RATe!!!")
|
|
#Stereo to mono
|
|
if len(data_clean.shape)>1 :
|
|
data_clean=np.mean(data_clean,axis=1)
|
|
#normalize
|
|
data_clean=data_clean/np.max(np.abs(data_clean))
|
|
data_clean_loaded.append(data_clean)
|
|
del data_clean
|
|
|
|
#framify data clean files
|
|
print("Framifying clean files")
|
|
seg_len=fs*seg_len_s
|
|
segments_clean=[]
|
|
for file in tqdm(data_clean_loaded):
|
|
|
|
#framify arguments: seg_len, hop_size
|
|
hop_size=int(seg_len)# no overlap
|
|
|
|
num_frames=np.floor(len(file)/hop_size - seg_len/hop_size +1)
|
|
pointer=0
|
|
for i in range(0,int(num_frames)):
|
|
segment=file[pointer:pointer+seg_len]
|
|
pointer=pointer+hop_size
|
|
segment=segment.astype('float32')
|
|
segments_clean.append(segment)
|
|
|
|
del data_clean_loaded
|
|
|
|
SNRs=np.random.uniform(2,20,len(segments_clean))
|
|
scales=np.random.uniform(-6,4,len(segments_clean))
|
|
#noise_shapes=np.random.randint(0,len(noise_samples), len(segments_clean))
|
|
noises_info=os.path.join(path_noises,"info.csv")
|
|
|
|
noise_generator=__noise_sample_generator(noises_info,fs, seg_len, split) #this will take care of everything
|
|
|
|
|
|
#generate noisy segments
|
|
#load noise samples using pandas dataframe. Each split (train, val, test) should have its unique csv info file
|
|
|
|
#noise_samples=glob.glob(os.path.join(path_noises,"*.wav"))
|
|
segments_noisy=[]
|
|
print("Processing noisy segments")
|
|
|
|
for i in tqdm(range(0,len(segments_clean))):
|
|
#load noise signal
|
|
data_noise= next(noise_generator)
|
|
#Stereo to mono
|
|
data_noise=np.mean(data_noise,axis=1)
|
|
#normalize
|
|
data_noise=data_noise/np.max(np.abs(data_noise))
|
|
new_noise=data_noise #if more processing needed, add here
|
|
#load clean data
|
|
data_clean=segments_clean[i]
|
|
#configure sizes
|
|
|
|
|
|
#estimate clean signal power
|
|
power_clean=np.var(data_clean)
|
|
#estimate noise power
|
|
power_noise=np.var(new_noise)
|
|
|
|
snr = 10.0**(SNRs[i]/10.0)
|
|
|
|
#sum both signals according to snr
|
|
summed=data_clean+np.sqrt(power_clean/(snr*power_noise))*new_noise #not sure if this is correct, maybe revisit later!!
|
|
#the rest is normal
|
|
|
|
summed=10.0**(scales[i]/10.0) *summed
|
|
segments_clean[i]=10.0**(scales[i]/10.0) *segments_clean[i]
|
|
|
|
segments_noisy.append(summed.astype('float32'))
|
|
|
|
return segments_noisy, segments_clean
|
|
|
|
|
|
|
|
def generator_train(path_music, path_noises,split, fs=44100, seg_len_s=5, extend=True, stereo=False):
|
|
|
|
train_samples=[]
|
|
for path in path_music:
|
|
train_samples.extend(glob.glob(os.path.join(path.decode("utf-8") ,"*.wav")))
|
|
|
|
seg_len=fs*seg_len_s
|
|
noises_info=os.path.join(path_noises.decode("utf-8"),"info.csv")
|
|
noise_generator=__noise_sample_generator(noises_info,fs, seg_len, split.decode("utf-8")) #this will take care of everything
|
|
#load data clean files
|
|
while True:
|
|
random.shuffle(train_samples)
|
|
for file in train_samples:
|
|
data, samplerate = sf.read(file)
|
|
assert(samplerate==fs, "wrong sampling rate")
|
|
data_clean=data
|
|
#Stereo to mono
|
|
if len(data.shape)>1 :
|
|
data_clean=np.mean(data_clean,axis=1)
|
|
|
|
#normalize
|
|
data_clean=data_clean/np.max(np.abs(data_clean))
|
|
|
|
#framify data clean files
|
|
|
|
#framify arguments: seg_len, hop_size
|
|
hop_size=int(seg_len)
|
|
|
|
num_frames=np.floor(len(data_clean)/seg_len)
|
|
if num_frames==0:
|
|
data_clean=np.concatenate((data_clean, np.zeros(shape=(int(2*seg_len-len(data_clean)),))), axis=0)
|
|
num_frames=1
|
|
pointer=0
|
|
data_clean=np.roll(data_clean, np.random.randint(0,seg_len)) #if only one frame, roll it for augmentation
|
|
elif num_frames>1:
|
|
pointer=np.random.randint(0,hop_size) #initial shifting, graeat for augmentation, better than overlap as we get different frames at each "while" iteration
|
|
else:
|
|
pointer=0
|
|
|
|
data_not_finished=True
|
|
while(data_not_finished):
|
|
segment=data_clean[pointer:pointer+seg_len]
|
|
pointer=pointer+hop_size
|
|
if pointer+seg_len>len(data_clean):
|
|
data_not_finished=False
|
|
segment=segment.astype('float32')
|
|
|
|
SNRs=np.random.uniform(2,20)
|
|
scale=np.random.uniform(-6,4)
|
|
|
|
|
|
#load noise signal
|
|
data_noise= next(noise_generator)
|
|
data_noise=np.mean(data_noise,axis=1)
|
|
#normalize
|
|
data_noise=data_noise/np.max(np.abs(data_noise))
|
|
new_noise=data_noise #if more processing needed, add here
|
|
#load clean data
|
|
#configure sizes
|
|
if stereo:
|
|
#estimate clean signal power
|
|
power_clean=0.5*np.var(segment[:,0])+0.5*np.var(segment[:,1])
|
|
#estimate noise power
|
|
power_noise=0.5*np.var(new_noise[:,0])+0.5*np.var(new_noise[:,1])
|
|
else:
|
|
#estimate clean signal power
|
|
power_clean=np.var(segment)
|
|
#estimate noise power
|
|
power_noise=np.var(new_noise)
|
|
|
|
snr = 10.0**(SNRs/10.0)
|
|
|
|
|
|
#sum both signals according to snr
|
|
summed=segment+np.sqrt(power_clean/(snr*power_noise))*new_noise #not sure if this is correct, maybe revisit later!!
|
|
summed=10.0**(scale/10.0) *summed
|
|
segment=10.0**(scale/10.0) *segment
|
|
|
|
summed=summed.astype('float32')
|
|
yield tf.convert_to_tensor(summed), tf.convert_to_tensor(segment)
|
|
|
|
def load_data(buffer_size, path_music_train, path_music_val, path_noises, fs=44100, seg_len_s=5, extend=True, stereo=False) :
|
|
print("Generating train dataset")
|
|
trainshape=int(fs*seg_len_s)
|
|
|
|
dataset_train = tf.data.Dataset.from_generator(generator_train,args=(path_music_train, path_noises,"train", fs, seg_len_s, extend, stereo), output_shapes=(tf.TensorShape((trainshape,)),tf.TensorShape((trainshape,))), output_types=(tf.float32, tf.float32) )
|
|
|
|
|
|
print("Generating validation dataset")
|
|
segments_noisy, segments_clean=generate_val_data(path_music_val, path_noises,"validation",fs=fs, seg_len_s=seg_len_s)
|
|
|
|
dataset_val=tf.data.Dataset.from_tensor_slices((segments_noisy, segments_clean))
|
|
|
|
return dataset_train.shuffle(buffer_size), dataset_val
|
|
|
|
def load_data_test(buffer_size, path_pianos_test, path_noises, **kwargs):
|
|
print("Generating test dataset")
|
|
segments_noisy, segments_clean=generate_test_data(path_pianos_test, path_noises, extend=True, **kwargs)
|
|
dataset_test=tf.data.Dataset.from_tensor_slices((segments_noisy, segments_clean))
|
|
#dataset_test=tf.data.Dataset.from_tensor_slices((segments_noisy[1:3], segments_clean[1:3]))
|
|
#train_dataset = train.cache().shuffle(buffer_size).take(info.splits["train"].num_examples)
|
|
return dataset_test
|
|
def load_data_formal( path_pianos_test, path_noises, **kwargs) :
|
|
print("Generating test dataset")
|
|
segments_noisy, segments_clean=generate_paired_data_test_formal(path_pianos_test, path_noises, extend=True, **kwargs)
|
|
print("segments::")
|
|
print(len(segments_noisy))
|
|
dataset_test=tf.data.Dataset.from_tensor_slices((segments_noisy, segments_clean))
|
|
#dataset_test=tf.data.Dataset.from_tensor_slices((segments_noisy[1:3], segments_clean[1:3]))
|
|
#train_dataset = train.cache().shuffle(buffer_size).take(info.splits["train"].num_examples)
|
|
return dataset_test
|
|
|
|
def load_real_test_recordings(buffer_size, path_recordings, **kwargs):
|
|
print("Generating real test dataset")
|
|
|
|
segments_noisy=generate_real_recordings_data(path_recordings, **kwargs)
|
|
|
|
dataset_test=tf.data.Dataset.from_tensor_slices(segments_noisy)
|
|
#train_dataset = train.cache().shuffle(buffer_size).take(info.splits["train"].num_examples)
|
|
return dataset_test
|