import numpy as np
import optuna
import pandas as pd
import scipy.linalg
from sklearn.base import BaseEstimator, TransformerMixin, clone
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.linear_model import BayesianRidge
from sklearn.model_selection import LeaveOneOut, cross_val_score
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import LabelEncoder, StandardScaler
class EmscScaler(object):
    """Extended multiplicative scatter correction against a mean spectrum.

    ``fit`` stores the column-wise mean of the reference spectra.  ``transform``
    regresses every spectrum on that mean plus a polynomial baseline, then
    returns ``residual / b[0] + mean`` where ``b[0]`` is the coefficient fitted
    for the mean spectrum.

    Parameters
    ----------
    order : int, default 1
        Degree of the baseline polynomial (``1, t, ..., t**order`` with ``t``
        running from 0 to 1 across the spectrum).  ``0`` or less disables the
        baseline entirely, leaving a purely multiplicative fit.
    """

    def __init__(self, order=1):
        self.order = order
        self._mx = None

    def _design_matrix(self, x):
        """Build the regressor matrix: the reference ``x`` plus baseline terms."""
        reference = np.asarray(x, dtype=float)
        if self.order <= 0:
            # No baseline requested.  A 1-D reference must still become a
            # column, otherwise the pseudo-inverse receives a non-matrix.
            if reference.ndim == 1:
                reference = reference.reshape(-1, 1)
            return reference
        reference = reference.reshape(-1, 1)
        # linspace always yields exactly one abscissa per point; stepping with
        # a float increment can overshoot by one element.
        abscissa = np.linspace(0.0, 1.0, reference.shape[0])
        # Columns t**0 .. t**order.  Looping the exponent over range(order)
        # would repeat the constant column and stop one degree short.
        baseline = np.vander(abscissa, self.order + 1, increasing=True)
        return np.concatenate((reference, baseline), axis=1)

    def mlr(self, x, y):
        """Least-squares fit of ``y`` on ``x`` and the polynomial baseline.

        Parameters
        ----------
        x : array-like
            Reference spectrum used as the first regressor.
        y : array-like
            Spectrum to be explained; same length as ``x``.

        Returns
        -------
        b : ndarray
            Fit coefficients; ``b[0]`` belongs to ``x``, the remaining entries
            to the baseline terms in increasing degree.
        f : ndarray
            Fitted values.
        r : ndarray
            Residual ``y - f``.
        """
        design = self._design_matrix(x)
        y = np.asarray(y, dtype=float)
        # pinv(X) is the same solution as pinv(X'X) X' without squaring the
        # condition number of the problem.
        b = np.dot(scipy.linalg.pinv(design), y)
        f = np.dot(design, b)
        r = y - f
        return b, f, r

    def fit(self, X, y=None):
        """Store the mean spectrum of ``X``; ``y`` is accepted and ignored.

        Returns
        -------
        self
        """
        self._mx = np.mean(np.asarray(X, dtype=float), axis=0)
        return self

    def transform(self, X, y=None, copy=None):
        """Correct every row of ``X`` against the fitted mean spectrum.

        ``y`` and ``copy`` are accepted for signature compatibility and are
        not used.

        Returns
        -------
        ndarray
            Corrected spectra with the same shape as ``X``.

        Raises
        ------
        RuntimeError
            If ``fit`` has not been called yet.
        """
        if self._mx is None:
            raise RuntimeError("EMSC not fit yet. run .fit method on reference spectra")
        spectra = np.asarray(X, dtype=float)
        # The regressors depend only on the fitted mean, so the design matrix
        # and its pseudo-inverse are computed once for all rows.
        design = self._design_matrix(self._mx)
        coeffs = np.dot(scipy.linalg.pinv(design), spectra.T)
        residuals = spectra.T - np.dot(design, coeffs)
        # coeffs[0] holds one multiplicative coefficient per spectrum.
        return (residuals / coeffs[0]).T + self._mx

    def fit_transform(self, X, y=None):
        """Fit on ``X`` and return the corrected ``X``."""
        return self.fit(X).transform(X)
class SpectraPreprocessor(BaseEstimator, TransformerMixin):
    """Apply an independent EMSC correction to fixed column segments.

    Each segment gets its own ``EmscScaler``; the corrected segments are
    concatenated column-wise, so the output only contains the columns covered
    by the segments.

    Parameters
    ----------
    emsc_order : int, default 3
        ``order`` passed to every ``EmscScaler``.
    X_ref : array-like or None, default None
        Reference spectra used to fit the scalers.  When ``None`` the data
        passed to ``fit`` is used instead.
    segment_ranges : sequence of (start, end) pairs or None, default None
        Half-open column ranges, one per segment.  ``None`` selects
        ``DEFAULT_SEGMENT_RANGES``.
    """

    # NOTE(review): columns 251-280 and every column from 854 onwards are not
    # covered by any segment and are therefore dropped from the output --
    # confirm that this is intentional.
    DEFAULT_SEGMENT_RANGES = ((0, 251), (281, 482), (482, 683), (683, 854))

    def __init__(self, emsc_order=3, X_ref=None, segment_ranges=None):
        self.emsc_order = emsc_order
        self.X_ref = X_ref
        self.segment_ranges = segment_ranges
        self.emsc_scalers = self._make_scalers()

    def _ranges(self):
        """Return the active (start, end) column ranges."""
        if self.segment_ranges is None:
            return self.DEFAULT_SEGMENT_RANGES
        return tuple((int(start), int(end)) for start, end in self.segment_ranges)

    def _make_scalers(self):
        """Create one unfitted scaler per segment using the current order."""
        return [EmscScaler(order=self.emsc_order) for _ in self._ranges()]

    def _as_matrix(self, X):
        """Convert ``X`` to a 2-D array wide enough for every segment."""
        data = np.asarray(X)
        needed = max(end for _, end in self._ranges())
        if data.ndim != 2 or data.shape[1] < needed:
            raise ValueError(
                "expected a 2-D array with at least %d columns, got shape %r"
                % (needed, data.shape)
            )
        return data

    def fit(self, X, y=None):
        """Fit one scaler per segment on ``X_ref`` (or on ``X`` if unset).

        Returns
        -------
        self
        """
        reference = self._as_matrix(X if self.X_ref is None else self.X_ref)
        # Rebuild the scalers here so that set_params(emsc_order=...) or
        # set_params(segment_ranges=...) issued after construction is honoured.
        self.emsc_scalers = self._make_scalers()
        for scaler, (start, end) in zip(self.emsc_scalers, self._ranges()):
            scaler.fit(reference[:, start:end])
        return self

    def transform(self, X, y=None):
        """Correct every segment of ``X`` and concatenate the results."""
        data = self._as_matrix(X)
        corrected = [
            scaler.transform(data[:, start:end])
            for scaler, (start, end) in zip(self.emsc_scalers, self._ranges())
        ]
        return np.concatenate(corrected, axis=1)

    def fit_transform(self, X, y=None):
        """Fit on ``X`` (or ``X_ref``) and return the corrected ``X``."""
        return self.fit(X, y).transform(X)


def bayesian_ridge_optuna_for_emsc_data(x_train, y_train, pipeline_, n_trials=500, cv=10):
    """Tune the four BayesianRidge hyper-priors of ``pipeline_`` with Optuna.

    Every trial samples ``alpha_1``, ``alpha_2``, ``lambda_1`` and ``lambda_2``
    log-uniformly from [0.001, 1] and is scored by the mean cross-validated
    R^2 on the training data.

    Parameters
    ----------
    x_train, y_train : array-like
        Training features and targets.
    pipeline_ : estimator
        Pipeline containing a step named ``bayesian_ridge``.  It is cloned for
        every trial and left untouched.
    n_trials : int, default 500
        Number of Optuna trials.
    cv : int or cross-validation splitter, default 10
        Passed to ``cross_val_score``.

    Returns
    -------
    dict
        Best parameters, keyed by ``alpha_1``, ``alpha_2``, ``lambda_1`` and
        ``lambda_2``.
    """
    param_names = ("alpha_1", "alpha_2", "lambda_1", "lambda_2")

    def objective(trial):
        sampled = {
            "bayesian_ridge__" + key: trial.suggest_float(key, 0.001, 1, log=True)
            for key in param_names
        }
        # Work on a clone so the caller's pipeline is not modified.  A wrong
        # step name fails here, loudly, instead of scoring -inf on every trial.
        model = clone(pipeline_).set_params(**sampled)
        try:
            # cross_val_score refits a fresh clone on each fold, so no
            # separate fit on the full training set is needed beforehand.
            scores = cross_val_score(
                model, x_train, y_train, cv=cv, n_jobs=-1, scoring="r2"
            )
        except ValueError:
            return -np.inf
        return np.mean(scores)

    optuna.logging.set_verbosity(optuna.logging.WARNING)
    # The objective never reports intermediate values, so the pruner has
    # nothing to act on; it is kept to leave the study configuration as is.
    pruner = optuna.pruners.MedianPruner()
    study = optuna.create_study(direction="maximize", pruner=pruner)
    study.optimize(objective, n_trials=n_trials, show_progress_bar=True, n_jobs=1)
    return study.best_params
def getdata(filenamex, filenamey):
    """Load two CSV files and join them column-wise into one DataFrame.

    Parameters
    ----------
    filenamex : path or file-like
        CSV read without a header row, so its columns are numbered.
    filenamey : path or file-like
        CSV read with the pandas default, i.e. the first row names the columns.

    Returns
    -------
    pandas.DataFrame
        Columns of ``filenamex`` followed by those of ``filenamey``.
    """
    features = pd.read_csv(filenamex, header=None)
    targets = pd.read_csv(filenamey)
    return pd.concat([features, targets], axis=1)
# Demo run on synthetic data: tune, refit and predict with the EMSC pipeline.
name = 'test'

# Random stand-ins for the spectra (100 x 884), the targets (100) and the
# EMSC reference spectra (30 x 884).
x = np.random.rand(100, 884)
y = np.random.rand(100)
x_ref = np.random.rand(30, 884)

pipeline = Pipeline(
    [
        ('preprocessor', SpectraPreprocessor(emsc_order=3, X_ref=None)),
        ('scaler', StandardScaler()),
        ('bayesian_ridge', BayesianRidge()),
    ]
)
# The reference spectra are injected after the pipeline has been built.
pipeline.set_params(preprocessor__X_ref=x_ref)

# Hyper-parameter search for the BayesianRidge step.
best_params = bayesian_ridge_optuna_for_emsc_data(x, y, pipeline)

# Apply the best hyper-parameters, refit on all data and predict in-sample.
pipeline.set_params(
    **{
        'bayesian_ridge__' + key: best_params[key]
        for key in ('alpha_1', 'alpha_2', 'lambda_1', 'lambda_2')
    }
)
y_pred = pipeline.fit(x, y).predict(x)
print(y_pred)