A Linear Regression Example with TensorFlow (Part 6)


Boston Housing Prices

import matplotlib.pyplot as plt
%matplotlib inline
import tensorflow as tf
import numpy as np
from sklearn.datasets import load_boston
import sklearn.linear_model as sk

boston = load_boston()
features = np.array(boston.data)
labels = np.array(boston.target)
print(boston["DESCR"])
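Note: load_boston was deprecated in scikit-learn 1.0 and removed in 1.2. If your scikit-learn no longer ships it, a commonly used fallback is to fetch the raw data directly; a minimal sketch (it assumes network access, and the URL is the StatLib mirror that scikit-learn's deprecation notice pointed to):

import pandas as pd
# Fallback loader: the raw file interleaves each record across two lines
raw = pd.read_csv("http://lib.stat.cmu.edu/datasets/boston",
                  sep=r"\s+", skiprows=22, header=None)
features = np.hstack([raw.values[::2, :], raw.values[1::2, :2]])  # the 13 feature columns
labels = raw.values[1::2, 2]                                      # MEDV, the target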

def normalize(dataset):
    mu = np.mean(dataset, axis=0)
    sigma = np.std(dataset, axis=0)
    return (dataset - mu) / sigma
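As a quick illustrative check (not part of the original notebook), every normalized column should end up with mean ≈ 0 and standard deviation ≈ 1:

print(np.allclose(np.mean(normalize(features), axis=0), 0.0, atol=1e-8))  # expect True
print(np.allclose(np.std(normalize(features), axis=0), 1.0))              # expect True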

n_training_samples = features.shape[0]
n_dim = features.shape[1]
print('The dataset has', n_training_samples, 'training samples.')
print('The dataset has', n_dim, 'features.')

features_norm = normalize(features)
print(features_norm.shape)
print(labels.shape)

np.random.seed(42)
rnd = np.random.rand(len(features_norm)) < 0.8  # boolean mask: roughly 80% of rows go to training

train_x = tf.Variable(features_norm[rnd], dtype=tf.float32)
train_y = tf.Variable(labels[rnd], dtype=tf.float32)
test_x = tf.Variable(features_norm[~rnd], dtype=tf.float32)
test_y = tf.Variable(labels[~rnd], dtype=tf.float32)

print(train_x.shape)
print(train_y.shape)
print(test_x.shape)
print(test_y.shape)
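The mask gives a random but only approximately 80/20 split. For an exact split, scikit-learn's train_test_split is a drop-in alternative (a sketch only; the notebook itself keeps the mask):

from sklearn.model_selection import train_test_split
X_tr, X_te, y_tr, y_te = train_test_split(
    features_norm, labels, train_size=0.8, random_state=42)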

# Inspect the tensors (notebook cells)
train_x.numpy()
train_y.numpy()
test_x.numpy()
test_x.numpy()[1]  # a single test row
test_y.numpy()


# Hyperparameters
learn_rate = 0.01   # learning rate
iter = 2000         # number of iterations
display_step = 200  # print interval

# Initial values for the model variables
np.random.seed(612)
W = tf.Variable(np.random.randn(13, 1), dtype=tf.float32)
b = tf.Variable(tf.zeros(1), dtype=tf.float32)

mse_train = []  # training loss history
mse_test = []   # test loss history

for i in range(0, iter + 1):
    with tf.GradientTape() as tape:
        # reshape the (N,1) prediction to (N,) so the subtraction against the
        # (N,) labels does not broadcast to an (N,N) matrix
        PRED_train = tf.reshape(tf.matmul(train_x, W) + b, [-1])
        Loss_train = 0.5 * tf.reduce_mean(tf.square(train_y - PRED_train))

        PRED_test = tf.reshape(tf.matmul(test_x, W) + b, [-1])
        Loss_test = 0.5 * tf.reduce_mean(tf.square(test_y - PRED_test))

    mse_train.append(Loss_train.numpy())  # store plain floats, not tensors
    mse_test.append(Loss_test.numpy())

    dL_dW, dL_db = tape.gradient(Loss_train, [W, b])
    W.assign_sub(learn_rate * dL_dW)
    b.assign_sub(learn_rate * dL_db)

    # Print the training and test loss
    if i % display_step == 0:
        print("i: %i, Train Loss: %f, Test Loss: %f" % (i, Loss_train, Loss_test))

train_x

# Train for train_epochs epochs with mini-batch SGD (stochastic gradient descent)
np.random.seed(612)
W = tf.Variable(np.random.randn(13, 1), dtype=tf.float32)
b = tf.Variable(tf.zeros(1), dtype=tf.float32)

train_epochs = 200
learn_rate = 0.01   # learning rate
display_step = 200  # print interval (in epochs)
batch_size = 10
mse_train = []      # training loss history
total_step = int(train_x.shape[0] / batch_size)  # mini-batches per epoch

for epoch in range(train_epochs):
    for step in range(total_step):
        xs = train_x.numpy()[step * batch_size:(step + 1) * batch_size, :]
        ys = train_y.numpy()[step * batch_size:(step + 1) * batch_size]
        # Gradient of the mini-batch loss with respect to [W, b]
        with tf.GradientTape() as tape:
            PRED_train = tf.reshape(tf.matmul(xs, W) + b, [-1])
            Loss_train = 0.5 * tf.reduce_mean(tf.square(ys - PRED_train))
        mse_train.append(Loss_train.numpy())
        dL_dW, dL_db = tape.gradient(Loss_train, [W, b])
        W.assign_sub(learn_rate * dL_dW)
        b.assign_sub(learn_rate * dL_db)
    # After each epoch, report the loss of the last mini-batch
    if (epoch + 1) % display_step == 0:
        print('train epoch: %02d' % (epoch + 1), 'loss= %.9f' % Loss_train)
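The manual assign_sub updates above are exactly plain SGD. The same step can be expressed through tf.keras.optimizers.SGD and apply_gradients, which the notebook's commented-out lines hint at; a minimal sketch for a single mini-batch (same xs, ys, W, b as above):

optimizer = tf.keras.optimizers.SGD(learning_rate=learn_rate)
with tf.GradientTape() as tape:
    pred = tf.reshape(tf.matmul(xs, W) + b, [-1])
    loss = 0.5 * tf.reduce_mean(tf.square(ys - pred))
grads = tape.gradient(loss, [W, b])
optimizer.apply_gradients(zip(grads, [W, b]))  # identical effect to the assign_sub updates for plain SGD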

W

plt.rc('font', family='arial')
plt.rc('xtick', labelsize='x-small')
plt.rc('ytick', labelsize='x-small')

fig = plt.figure(figsize=(10, 7))
ax = fig.add_subplot(1, 1, 1)
ax.plot(mse_train, ls='solid', color='black')
ax.set_xlabel('mini-batch steps', fontsize=16)  # one loss value was recorded per mini-batch
ax.set_ylabel('Cost function $J$ (MSE)', fontsize=16)
plt.xlim(0, 10000)
plt.tick_params(labelsize=16)
plt.tight_layout()  # call after plotting, so it can act on this figure

b

# Predictions on the test set from the trained W and b
pred_y = tf.reshape(tf.matmul(test_x, W) + b, [-1])
pred_y
test_y
mse = tf.reduce_mean(tf.square(pred_y - test_y))
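Since the target MEDV is the median house price in units of $1000s, the square root of this MSE reads directly in those units (an illustrative addition):

print('Test MSE :', mse.numpy())
print('Test RMSE:', np.sqrt(mse.numpy()), '(thousand dollars)')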

plt.rc('font', family='arial')
plt.rc('xtick', labelsize='x-small')
plt.rc('ytick', labelsize='x-small')

fig = plt.figure(figsize=(10, 7))
ax = fig.add_subplot(1, 1, 1)
ax.scatter(test_y, pred_y, lw=5)
# dashed diagonal: points on this line would be perfect predictions
ax.plot([test_y.numpy().min(), test_y.numpy().max()],
        [test_y.numpy().min(), test_y.numpy().max()], 'k--', lw=5)
ax.set_xlabel('Measured Target Value', fontsize=16)
ax.set_ylabel('Predicted Target Value', fontsize=16)
plt.tick_params(labelsize=16)
plt.tight_layout()

print('Starting first model')
# First model: mini-batch SGD, learning rate 0.1
np.random.seed(612)  # same seed for all three models, so they start from identical weights
W = tf.Variable(np.random.randn(13, 1), dtype=tf.float32)
b = tf.Variable(tf.zeros(1), dtype=tf.float32)

train_epochs = 1000
learn_rate = 0.1    # learning rate
display_step = 200  # print interval (in epochs)
batch_size = 10
mse_train1 = []     # training loss history for this model
total_step = int(train_x.shape[0] / batch_size)

for epoch in range(train_epochs):
    for step in range(total_step):
        xs = train_x.numpy()[step * batch_size:(step + 1) * batch_size, :]
        ys = train_y.numpy()[step * batch_size:(step + 1) * batch_size]
        # Gradient of the mini-batch loss with respect to [W, b]
        with tf.GradientTape() as tape:
            PRED_train = tf.reshape(tf.matmul(xs, W) + b, [-1])
            Loss_train = 0.5 * tf.reduce_mean(tf.square(ys - PRED_train))
        mse_train1.append(Loss_train.numpy())
        dL_dW, dL_db = tape.gradient(Loss_train, [W, b])
        W.assign_sub(learn_rate * dL_dW)
        b.assign_sub(learn_rate * dL_db)
    if (epoch + 1) % display_step == 0:
        print('train epoch: %04d' % (epoch + 1), 'loss= %.9f' % Loss_train)

  

print('Starting second model')
# Second model: mini-batch SGD, learning rate 0.01
np.random.seed(612)
W = tf.Variable(np.random.randn(13, 1), dtype=tf.float32)
b = tf.Variable(tf.zeros(1), dtype=tf.float32)

train_epochs = 1000
learn_rate = 0.01   # learning rate
display_step = 200  # print interval (in epochs)
batch_size = 10
mse_train2 = []     # training loss history for this model
total_step = int(train_x.shape[0] / batch_size)

for epoch in range(train_epochs):
    for step in range(total_step):
        xs = train_x.numpy()[step * batch_size:(step + 1) * batch_size, :]
        ys = train_y.numpy()[step * batch_size:(step + 1) * batch_size]
        with tf.GradientTape() as tape:
            PRED_train = tf.reshape(tf.matmul(xs, W) + b, [-1])
            Loss_train = 0.5 * tf.reduce_mean(tf.square(ys - PRED_train))
        mse_train2.append(Loss_train.numpy())
        dL_dW, dL_db = tape.gradient(Loss_train, [W, b])
        W.assign_sub(learn_rate * dL_dW)
        b.assign_sub(learn_rate * dL_db)
    if (epoch + 1) % display_step == 0:
        print('train epoch: %04d' % (epoch + 1), 'loss= %.9f' % Loss_train)

                                

print('Starting third model')
# Third model: mini-batch SGD, learning rate 0.001
np.random.seed(612)
W = tf.Variable(np.random.randn(13, 1), dtype=tf.float32)
b = tf.Variable(tf.zeros(1), dtype=tf.float32)

train_epochs = 1000
learn_rate = 0.001  # learning rate
display_step = 200  # print interval (in epochs)
batch_size = 10
mse_train3 = []     # training loss history for this model
total_step = int(train_x.shape[0] / batch_size)

for epoch in range(train_epochs):
    for step in range(total_step):
        xs = train_x.numpy()[step * batch_size:(step + 1) * batch_size, :]
        ys = train_y.numpy()[step * batch_size:(step + 1) * batch_size]
        with tf.GradientTape() as tape:
            PRED_train = tf.reshape(tf.matmul(xs, W) + b, [-1])
            Loss_train = 0.5 * tf.reduce_mean(tf.square(ys - PRED_train))
        mse_train3.append(Loss_train.numpy())
        dL_dW, dL_db = tape.gradient(Loss_train, [W, b])
        W.assign_sub(learn_rate * dL_dW)
        b.assign_sub(learn_rate * dL_db)
    if (epoch + 1) % display_step == 0:
        print('train epoch: %04d' % (epoch + 1), 'loss= %.9f' % Loss_train)

  

plt.rc('font', family='arial')
plt.rc('xtick', labelsize='x-small')
plt.rc('ytick', labelsize='x-small')

fig = plt.figure(figsize=(10, 7))
ax = fig.add_subplot(1, 1, 1)
ax.plot(mse_train1, ls='solid', color='black', label=r'$\gamma=0.1$')
ax.plot(mse_train2, ls='dashed', color='black', label=r'$\gamma=0.01$')
ax.plot(mse_train3, ls='dotted', color='black', label=r'$\gamma=0.001$')
ax.set_xlabel('mini-batch steps', fontsize=16)
ax.set_ylabel('Cost function $J$ (MSE)', fontsize=16)
plt.xlim(0, 300)
plt.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0., fontsize=16)
plt.tick_params(labelsize=16)
plt.tight_layout()

# Closed-form least-squares baseline with scikit-learn
lm = sk.LinearRegression()
lm.fit(train_x.numpy(), train_y.numpy())
msetest = np.mean((test_y.numpy() - lm.predict(test_x.numpy())) ** 2)
msetrain = np.mean((train_y.numpy() - lm.predict(train_x.numpy())) ** 2)
print('Train MSE=', msetrain)
print('Test MSE=', msetest)
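Keep in mind that the gradient-descent loops minimized half the MSE, and Loss_train is only the loss of the last mini-batch. For a like-for-like comparison with the scikit-learn numbers, evaluate the final W and b on the full training set (an illustrative check):

gd_train_mse = np.mean(
    (train_y.numpy() - tf.reshape(tf.matmul(train_x, W) + b, [-1]).numpy()) ** 2)
print('GD train MSE =', gd_train_mse)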

model = tf.keras.Sequential()
# Add a layer: this is just wx + b
model.add(tf.keras.layers.Dense(1, input_shape=(13,)))  # 13-dimensional input, 1-dimensional output (14 parameters: 13 weights + 1 bias)
# Inspect the network structure
model.summary()
# Compile / configure
model.compile(optimizer='adam',
              loss='mse',
              metrics=['mae', 'mse'])
# Train
history = model.fit(train_x.numpy(), train_y.numpy(), epochs=500)
predict = model.predict(test_x.numpy())
predict
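To score this Keras model on the held-out rows, model.evaluate returns the compiled loss followed by the metrics (a standard call, shown as a small addition):

test_scores = model.evaluate(test_x.numpy(), test_y.numpy(), verbose=0)
print('Keras test [loss, mae, mse]:', test_scores)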