Linear Regressoin 线性回归
Theory
Given Data 已有数据
- 用于训练的数据
matrix indicate 使用矩阵表示
Hypothesis Function 期望函数
vector indicate
Cost Function 损失函数
Minimize Cost 使损失最小化
Minimize cost function J(w,b) ,deravetive the J(w,b)
Gradient Descet 梯度下降
Solution
开发流程
收集数据: 采用任意方法收集数据
准备数据: 回归需要数值型数据,标称型数据将被转换成二值型数据
分析数据: 绘出数据的可视化二维图将有助于对数据做出理解和分析,在采用缩减法求得新回归系数之后,可以将新拟合线绘在图上作为对比
训练算法: 找到回归系数
测试算法: 使用 R^2 或者预测值和数据的拟合度,来分析模型的效果
使用算法: 使用回归,可以在给定输入的时候预测出一个数值,这是对分类方法的提升,因为这样可以预测连续型数据而不仅仅是离散的类别标签
算法特点
优点:结果易于理解,计算上不复杂。
缺点:对非线性的数据拟合不好。
适用于数据类型:数值型和标称型数据。
拟合直线
找出该数据的最佳拟合直线。
- 数据格式为:
x0 x1 y
1.000000 0.067732 3.176513
1.000000 0.427810 3.816464
1.000000 0.995731 4.550095
1.000000 0.738336 4.256571
- 线性回归 编写代码
def loadDataSet(fileName):
""" 加载数据
解析以tab键分隔的文件中的浮点数
Returns:
dataMat : feature 对应的数据集
labelMat : feature 对应的分类标签,即类别标签
"""
# 获取样本特征的总数,不算最后的目标变量
numFeat = len(open(fileName).readline().split('\t')) - 1
dataMat = []
labelMat = []
fr = open(fileName)
for line in fr.readlines():
# 读取每一行
lineArr =[]
# 删除一行中以tab分隔的数据前后的空白符号
curLine = line.strip().split('\t')
# i 从0到2,不包括2
for i in range(numFeat):
# 将数据添加到lineArr List中,每一行数据测试数据组成一个行向量
lineArr.append(float(curLine[i]))
# 将测试数据的输入数据部分存储到dataMat 的List中
dataMat.append(lineArr)
# 将每一行的最后一个数据,即类别,或者叫目标变量存储到labelMat List中
labelMat.append(float(curLine[-1]))
return dataMat,labelMat
def standRegres(xArr,yArr):
'''
Description:
线性回归
Args:
xArr :输入的样本数据,包含每个样本数据的 feature
yArr :对应于输入数据的类别标签,也就是每个样本对应的目标变量
Returns:
ws:回归系数
'''
# mat()函数将xArr,yArr转换为矩阵 mat().T 代表的是对矩阵进行转置操作
xMat = mat(xArr)
yMat = mat(yArr).T
# 矩阵乘法的条件是左矩阵的列数等于右矩阵的行数
xTx = xMat.T*xMat
# 因为要用到xTx的逆矩阵,所以事先需要确定计算得到的xTx是否可逆,条件是矩阵的行列式不为0
# linalg.det() 函数是用来求得矩阵的行列式的,如果矩阵的行列式为0,则这个矩阵是不可逆的,就无法进行接下来的运算
if linalg.det(xTx) == 0.0:
print "This matrix is singular, cannot do inverse"
return
# 最小二乘法
# http://cwiki.apachecn.org/pages/viewpage.action?pageId=5505133
# 书中的公式,求得w的最优解
ws = xTx.I * (xMat.T*yMat)
return ws
def regression1():
xArr, yArr = loadDataSet("input/8.Regression/data.txt")
xMat = mat(xArr)
yMat = mat(yArr)
ws = standRegres(xArr, yArr)
fig = plt.figure()
ax = fig.add_subplot(111) #add_subplot(349)函数的参数的意思是,将画布分成3行4列图像画在从左到右从上到下第9块
ax.scatter(xMat[:, 1].flatten(), yMat.T[:, 0].flatten().A[0]) #scatter 的x是xMat中的第二列,y是yMat的第一列
xCopy = xMat.copy()
xCopy.sort(0)
yHat = xCopy * ws
ax.plot(xCopy[:, 1], yHat)
plt.show()
预测乐高玩具套装的价格
(1) 收集数据:用 Google Shopping 的API收集数据。
(2) 准备数据:从返回的JSON数据中抽取价格。
(3) 分析数据:可视化并观察数据。
(4) 训练算法:构建不同的模型,采用逐步线性回归和直接的线性回归模型。
(5) 测试算法:使用交叉验证来测试不同的模型,分析哪个效果最好。
(6) 使用算法:这次练习的目标就是生成数据模型。
- 收集数据: 使用 Google 购物的 API
由于 Google 提供的 api 失效,我们只能自己下载咯,将数据存储在了 input 文件夹下的 setHtml 文件夹下
- 准备数据: 从返回的 JSON 数据中抽取价格
因为我们这里不是在线的,就不再是 JSON 了,我们直接解析线下的网页,得到我们想要的数据。
- 分析数据: 可视化并观察数据
这里我们将解析得到的数据打印出来,然后观察数据。
- 训练算法: 构建不同的模型
from numpy import *
from bs4 import BeautifulSoup
# 从页面读取数据,生成retX和retY列表
def scrapePage(retX, retY, inFile, yr, numPce, origPrc):
# 打开并读取HTML文件
fr = open(inFile)
soup = BeautifulSoup(fr.read())
i=1
# 根据HTML页面结构进行解析
currentRow = soup.findAll('table', r="%d" % i)
while(len(currentRow)!=0):
currentRow = soup.findAll('table', r="%d" % i)
title = currentRow[0].findAll('a')[1].text
lwrTitle = title.lower()
# 查找是否有全新标签
if (lwrTitle.find('new') > -1) or (lwrTitle.find('nisb') > -1):
newFlag = 1.0
else:
newFlag = 0.0
# 查找是否已经标志出售,我们只收集已出售的数据
soldUnicde = currentRow[0].findAll('td')[3].findAll('span')
if len(soldUnicde)==0:
print "item #%d did not sell" % i
else:
# 解析页面获取当前价格
soldPrice = currentRow[0].findAll('td')[4]
priceStr = soldPrice.text
priceStr = priceStr.replace('$','') #strips out $
priceStr = priceStr.replace(',','') #strips out ,
if len(soldPrice)>1:
priceStr = priceStr.replace('Free shipping', '')
sellingPrice = float(priceStr)
# 去掉不完整的套装价格
if sellingPrice > origPrc * 0.5:
print "%d\t%d\t%d\t%f\t%f" % (yr,numPce,newFlag,origPrc, sellingPrice)
retX.append([yr, numPce, newFlag, origPrc])
retY.append(sellingPrice)
i += 1
currentRow = soup.findAll('table', r="%d" % i)
# 依次读取六种乐高套装的数据,并生成数据矩阵
def setDataCollect(retX, retY):
scrapePage(retX, retY, 'input/8.Regression/setHtml/lego8288.html', 2006, 800, 49.99)
scrapePage(retX, retY, 'input/8.Regression/setHtml/lego10030.html', 2002, 3096, 269.99)
scrapePage(retX, retY, 'input/8.Regression/setHtml/lego10179.html', 2007, 5195, 499.99)
scrapePage(retX, retY, 'input/8.Regression/setHtml/lego10181.html', 2007, 3428, 199.99)
scrapePage(retX, retY, 'input/8.Regression/setHtml/lego10189.html', 2008, 5922, 299.99)
scrapePage(retX, retY, 'input/8.Regression/setHtml/lego10196.html', 2009, 3263, 249.99)
- 测试算法:使用交叉验证来测试不同的模型,分析哪个效果最好
# 交叉验证测试岭回归
def crossValidation(xArr,yArr,numVal=10):
# 获得数据点个数,xArr和yArr具有相同长度
m = len(yArr)
indexList = range(m)
errorMat = zeros((numVal,30))
# 主循环 交叉验证循环
for i in range(numVal):
# 随机拆分数据,将数据分为训练集(90%)和测试集(10%)
trainX=[]; trainY=[]
testX = []; testY = []
# 对数据进行混洗操作
random.shuffle(indexList)
# 切分训练集和测试集
for j in range(m):
if j < m*0.9:
trainX.append(xArr[indexList[j]])
trainY.append(yArr[indexList[j]])
else:
testX.append(xArr[indexList[j]])
testY.append(yArr[indexList[j]])
# 获得回归系数矩阵
wMat = ridgeTest(trainX,trainY)
# 循环遍历矩阵中的30组回归系数
for k in range(30):
# 读取训练集和数据集
matTestX = mat(testX); matTrainX=mat(trainX)
# 对数据进行标准化
meanTrain = mean(matTrainX,0)
varTrain = var(matTrainX,0)
matTestX = (matTestX-meanTrain)/varTrain
# 测试回归效果并存储
yEst = matTestX * mat(wMat[k,:]).T + mean(trainY)
# 计算误差
errorMat[i,k] = ((yEst.T.A-array(testY))**2).sum()
# 计算误差估计值的均值
meanErrors = mean(errorMat,0)
minMean = float(min(meanErrors))
bestWeights = wMat[nonzero(meanErrors==minMean)]
# 不要使用标准化的数据,需要对数据进行还原来得到输出结果
xMat = mat(xArr); yMat=mat(yArr).T
meanX = mean(xMat,0); varX = var(xMat,0)
unReg = bestWeights/varX
# 输出构建的模型
print "the best model from Ridge Regression is:\n",unReg
print "with constant term: ",-1*sum(multiply(meanX,unReg)) + mean(yMat)
# predict for lego's price
def regression5():
lgX = []
lgY = []
setDataCollect(lgX, lgY)
crossValidation(lgX, lgY, 10)
- 使用算法:这次练习的目标就是生成数据模型
Programing Sample
TensorFlow
"""Simple tutorial for using TensorFlow to compute a linear regression.
Parag K. Mital, Jan. 2016"""
# %% imports
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
# %% Let's create some toy data
plt.ion()
n_observations = 100
fig, ax = plt.subplots(1, 1)
xs = np.linspace(-3, 3, n_observations)
ys = np.sin(xs) + np.random.uniform(-0.5, 0.5, n_observations)
ax.scatter(xs, ys)
fig.show()
plt.draw()
# %% tf.placeholders for the input and output of the network. Placeholders are
# variables which we need to fill in when we are ready to compute the graph.
X = tf.placeholder(tf.float32)
Y = tf.placeholder(tf.float32)
# %% We will try to optimize min_(W,b) ||(X*w + b) - y||^2
# The `Variable()` constructor requires an initial value for the variable,
# which can be a `Tensor` of any type and shape. The initial value defines the
# type and shape of the variable. After construction, the type and shape of
# the variable are fixed. The value can be changed using one of the assign
# methods.
W = tf.Variable(tf.random_normal([1]), name='weight')
b = tf.Variable(tf.random_normal([1]), name='bias')
Y_pred = tf.add(tf.multiply(X, W), b)
# %% Loss function will measure the distance between our observations
# and predictions and average over them.
cost = tf.reduce_sum(tf.pow(Y_pred - Y, 2)) / (n_observations - 1)
# %% if we wanted to add regularization, we could add other terms to the cost,
# e.g. ridge regression has a parameter controlling the amount of shrinkage
# over the norm of activations. the larger the shrinkage, the more robust
# to collinearity.
# cost = tf.add(cost, tf.mul(1e-6, tf.global_norm([W])))
# %% Use gradient descent to optimize W,b
# Performs a single step in the negative gradient
learning_rate = 0.01
optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(cost)
# %% We create a session to use the graph
n_epochs = 1000
with tf.Session() as sess:
# Here we tell tensorflow that we want to initialize all
# the variables in the graph so we can use them
sess.run(tf.global_variables_initializer())
# Fit all training data
prev_training_cost = 0.0
for epoch_i in range(n_epochs):
for (x, y) in zip(xs, ys):
sess.run(optimizer, feed_dict={X: x, Y: y})
training_cost = sess.run(
cost, feed_dict={X: xs, Y: ys})
print(training_cost)
if epoch_i % 20 == 0:
ax.plot(xs, Y_pred.eval(
feed_dict={X: xs}, session=sess),
'k', alpha=epoch_i / n_epochs)
fig.show()
plt.draw()
# Allow the training to quit if we've reached a minimum
if np.abs(prev_training_cost - training_cost) < 0.000001:
break
prev_training_cost = training_cost
fig.show()
plt.waitforbuttonpress()
Sklearn
#Import Library
#Import other necessary libraries like pandas, numpy...
from sklearn import linear_model
#Load Train and Test datasets
#Identify feature and response variable(s) and values must be numeric and numpy arrays
x_train=input_variables_values_training_datasets
y_train=target_variables_values_training_datasets
x_test=input_variables_values_test_datasets
# Create linear regression object
linear = linear_model.LinearRegression()
# Train the model using the training sets and check score
linear.fit(x_train, y_train)
linear.score(x_train, y_train)
#Equation coefficient and Intercept
print('Coefficient: n', linear.coef_)
print('Intercept: n', linear.intercept_)
#Predict Output
predicted= linear.predict(x_test)
pytorch
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
# Hyper-parameters
input_size = 1
output_size = 1
num_epochs = 60
learning_rate = 0.001
# Toy dataset
x_train = np.array([[3.3], [4.4], [5.5], [6.71], [6.93], [4.168],
[9.779], [6.182], [7.59], [2.167], [7.042],
[10.791], [5.313], [7.997], [3.1]], dtype=np.float32)
y_train = np.array([[1.7], [2.76], [2.09], [3.19], [1.694], [1.573],
[3.366], [2.596], [2.53], [1.221], [2.827],
[3.465], [1.65], [2.904], [1.3]], dtype=np.float32)
# Linear regression model
model = nn.Linear(input_size, output_size)
# Loss and optimizer
criterion = nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)
# Train the model
for epoch in range(num_epochs):
# Convert numpy arrays to torch tensors
inputs = torch.from_numpy(x_train)
targets = torch.from_numpy(y_train)
# Forward pass
outputs = model(inputs)
loss = criterion(outputs, targets)
# Backward and optimize
optimizer.zero_grad()
loss.backward()
optimizer.step()
if (epoch+1) % 5 == 0:
print ('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, loss.item()))
# Plot the graph
predicted = model(torch.from_numpy(x_train)).detach().numpy()
plt.plot(x_train, y_train, 'ro', label='Original data')
plt.plot(x_train, predicted, label='Fitted line')
plt.legend()
plt.show()
# Save the model checkpoint
torch.save(model.state_dict(), 'model.ckpt')