3. Spark MLlib Deep Learning Convolution Neural Network (深度学习-卷积神经网络) 3.1

http://blog.csdn.net/sunbow0

The Spark MLlib Deep Learning toolbox implements, on Spark MLlib, the algorithms from the existing deep-learning tutorial, the UFLDL Tutorial. The toolbox is organized as follows:

Chapter 1: Neural Net (NN)

1. Source code

2. Source code walkthrough

3. Examples

Chapter 2: Deep Belief Nets (DBNs)

1. Source code

2. Source code walkthrough

3. Examples

Chapter 3: Convolution Neural Network (CNN)

1. Source code

2. Source code walkthrough

3. Examples

Chapter 4: Stacked Auto-Encoders (SAE)

Chapter 5: CAE



Chapter 3: Convolution Neural Network (CNN)

1 Source code

The Spark MLlib Deep Learning toolbox source code is currently hosted on GitHub at:

https://github.com/sunbow1/SparkMLlibDeepLearn

1.1 CNN code

package CNN

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.Logging
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import breeze.linalg.{
Matrix => BM,
CSCMatrix => BSM,
DenseMatrix => BDM,
Vector => BV,
DenseVector => BDV,
SparseVector => BSV,
axpy => brzAxpy,
svd => brzSvd,
accumulate => Accumulate,
rot90 => Rot90,
sum => Bsum
}
import breeze.numerics.{
exp => Bexp,
tanh => Btanh
}
import scala.collection.mutable.ArrayBuffer
import java.util.Random
import scala.math._

/**
* types: layer type ("i" input, "c" convolution, "s" subsampling)
* outputmaps: number of feature maps
* kernelsize: convolution kernel size
* k: convolution kernels
* b: biases
* dk: partial derivatives of the kernels
* db: partial derivatives of the biases
* scale: pooling size
*/
case class CNNLayers(
types: String,
outputmaps: Double,
kernelsize: Double,
scale: Double,
k: Array[Array[BDM[Double]]],
b: Array[Double],
dk: Array[Array[BDM[Double]]],
db: Array[Double]) extends Serializable

/**
* CNN (convolution neural network)
*/
class CNN(
private var mapsize: BDM[Double],
private var types: Array[String],
private var layer: Int,
private var onum: Int,
private var outputmaps: Array[Double],
private var kernelsize: Array[Double],
private var scale: Array[Double],
private var alpha: Double,
private var batchsize: Double,
private var numepochs: Double) extends Serializable with Logging {
// Defaults:
// mapsize = [28, 28]; types = ["i", "c", "s", "c", "s"]; layer = 5; onum = 10
// outputmaps = [0, 6, 0, 12, 0]; kernelsize = [0, 5, 0, 5, 0]; scale = [0, 0, 2, 0, 2]
// alpha = 1.0; batchsize = 50; numepochs = 1
def this() = this(new BDM(1, 2, Array(28.0, 28.0)),
Array("i", "c", "s", "c", "s"), 5, 10,
Array(0.0, 6.0, 0.0, 12.0, 0.0),
Array(0.0, 5.0, 0.0, 5.0, 0.0),
Array(0.0, 0.0, 2.0, 0.0, 2.0),
1.0, 50.0, 1.0)

/** Set the input map size. Default: [28, 28]. */
def setMapsize(mapsize: BDM[Double]): this.type = {
this.mapsize = mapsize
this
}

/** Set the layer types. Default: ["i", "c", "s", "c", "s"]. */
def setTypes(types: Array[String]): this.type = {
this.types = types
this
}

/** Set the number of layers. Default: 5. */
def setLayer(layer: Int): this.type = {
this.layer = layer
this
}

/** Set the output dimension. Default: 10. */
def setOnum(onum: Int): this.type = {
this.onum = onum
this
}

/** Set the number of feature maps per layer. Default: [0, 6, 0, 12, 0]. */
def setOutputmaps(outputmaps: Array[Double]): this.type = {
this.outputmaps = outputmaps
this
}

/** Set the convolution kernel size per layer. Default: [0, 5, 0, 5, 0]. */
def setKernelsize(kernelsize: Array[Double]): this.type = {
this.kernelsize = kernelsize
this
}

/** Set the pooling scale per layer. Default: [0, 0, 2, 0, 2]. */
def setScale(scale: Array[Double]): this.type = {
this.scale = scale
this
}

/** Set the learning rate. Default: 1. */
def setAlpha(alpha: Double): this.type = {
this.alpha = alpha
this
}

/** Set the mini-batch size. Default: 50. */
def setBatchsize(batchsize: Double): this.type = {
this.batchsize = batchsize
this
}

/** Set the number of epochs. Default: 1. */
def setNumepochs(numepochs: Double): this.type = {
this.numepochs = numepochs
this
}

/** Initialize the CNN layer parameters. */
def CnnSetup: (Array[CNNLayers], BDM[Double], BDM[Double], Double) = {
var inputmaps1 = 1.0
var mapsize1 = mapsize
var confinit = ArrayBuffer[CNNLayers]()
for (l <- 0 to layer - 1) { // layer
val type1 = types(l)
val outputmap1 = outputmaps(l)
val kernelsize1 = kernelsize(l)
val scale1 = scale(l)
val layersconf = if (type1 == "s") { // initialize this layer's parameters
mapsize1 = mapsize1 / scale1
val b1 = Array.fill(inputmaps1.toInt)(0.0)
val ki = Array(Array(BDM.zeros[Double](1, 1)))
new CNNLayers(type1, outputmap1, kernelsize1, scale1, ki, b1, ki, b1)
} else if (type1 == "c") {
mapsize1 = mapsize1 - kernelsize1 + 1.0
val fan_out = outputmap1 * math.pow(kernelsize1, 2)
val fan_in = inputmaps1 * math.pow(kernelsize1, 2)
val ki = ArrayBuffer[Array[BDM[Double]]]()
for (i <- 0 to inputmaps1.toInt - 1) { // input map
val kj = ArrayBuffer[BDM[Double]]()
for (j <- 0 to outputmap1.toInt - 1) { // output map
val kk = (BDM.rand[Double](kernelsize1.toInt, kernelsize1.toInt) - 0.5) * 2.0 * sqrt(6.0 / (fan_in + fan_out))
kj += kk
}
ki += kj.toArray
}
val b1 = Array.fill(outputmap1.toInt)(0.0)
inputmaps1 = outputmap1
new CNNLayers(type1, outputmap1, kernelsize1, scale1, ki.toArray, b1, ki.toArray, b1)
} else {
val ki = Array(Array(BDM.zeros[Double](1, 1)))
val b1 = Array(0.0)
new CNNLayers(type1, outputmap1, kernelsize1, scale1, ki, b1, ki, b1)
}
confinit += layersconf
}
val fvnum = mapsize1(0, 0) * mapsize1(0, 1) * inputmaps1
val ffb = BDM.zeros[Double](onum, 1)
val ffW = (BDM.rand[Double](onum, fvnum.toInt) - 0.5) * 2.0 * sqrt(6.0 / (onum + fvnum))
(confinit.toArray, ffb, ffW, alpha)
}

/**
* Train the convolution neural network.
*/
def CNNtrain(train_d: RDD[(BDM[Double], BDM[Double])], opts: Array[Double]): CNNModel = {
val sc = train_d.sparkContext
var initStartTime = System.currentTimeMillis()
var initEndTime = System.currentTimeMillis()
// initialize the network parameters
var (cnn_layers, cnn_ffb, cnn_ffW, cnn_alpha) = CnnSetup
// split the samples into training and validation sets
val validation = opts(2)
val splitW1 = Array(1.0 - validation, validation)
val train_split1 = train_d.randomSplit(splitW1, System.nanoTime())
val train_t = train_split1(0)
val train_v = train_split1(1)
// m: number of training samples
val m = train_t.count
// number of mini-batches per epoch
val batchsize = opts(0).toInt
val numepochs = opts(1).toInt
val numbatches = (m / batchsize).toInt
var rL = Array.fill(numepochs * numbatches.toInt)(0.0)
var n = 0
// numepochs: number of passes over the training data
for (i <- 1 to numepochs) {
initStartTime = System.currentTimeMillis()
val splitW2 = Array.fill(numbatches)(1.0 / numbatches)
// randomly split the samples into batches using the weights above
for (l <- 1 to numbatches) {
// broadcast the current weights
val bc_cnn_layers = sc.broadcast(cnn_layers)
val bc_cnn_ffb = sc.broadcast(cnn_ffb)
val bc_cnn_ffW = sc.broadcast(cnn_ffW)
// select this batch's samples
val train_split2 = train_t.randomSplit(splitW2, System.nanoTime())
val batch_xy1 = train_split2(l - 1)
// CNNff: feed-forward pass
// net = cnnff(net, batch_x);
val train_cnnff = CNN.CNNff(batch_xy1, bc_cnn_layers, bc_cnn_ffb, bc_cnn_ffW)
// CNNbp: back-propagation pass
// net = cnnbp(net, batch_y);
val train_cnnbp = CNN.CNNbp(train_cnnff, bc_cnn_layers, bc_cnn_ffb, bc_cnn_ffW)
// weight update
// net = cnnapplygrads(net, opts);
val train_nnapplygrads = CNN.CNNapplygrads(train_cnnbp, bc_cnn_ffb, bc_cnn_ffW, cnn_alpha)
cnn_ffW = train_nnapplygrads._1
cnn_ffb = train_nnapplygrads._2
cnn_layers = train_nnapplygrads._3
// error and loss
// net.L = 1/2 * sum(net.e(:) .^ 2) / size(net.e, 2);
val rdd_loss1 = train_cnnbp._1.map(f => f._5)
val (loss2, counte) = rdd_loss1.treeAggregate((0.0, 0L))(
seqOp = (c, v) => {
// c: (e, count), v: (m)
val e1 = c._1
val e2 = (v :* v).sum
val esum = e1 + e2
(esum, c._2 + 1)
},
combOp = (c1, c2) => {
// c: (e, count)
val e1 = c1._1
val e2 = c2._1
val esum = e1 + e2
(esum, c1._2 + c2._2)
})
val Loss = (loss2 / counte.toDouble) * 0.5
if (n == 0) {
rL(n) = Loss
} else {
// exponential moving average of the batch loss (0.99 / 0.01 smoothing)
rL(n) = 0.99 * rL(n - 1) + 0.01 * Loss
}
n = n + 1
}
initEndTime = System.currentTimeMillis()
// report progress for this epoch
printf("epoch %d of %d, took %d seconds; batch train mse = %f.\n", i, numepochs, scala.math.ceil((initEndTime - initStartTime).toDouble / 1000).toLong, rL(n - 1))
}
// compute the final training error and cross-validation error
// Full-batch train mse
var loss_train_e = 0.0
var loss_val_e = 0.0
loss_train_e = CNN.CNNeval(train_t, sc.broadcast(cnn_layers), sc.broadcast(cnn_ffb), sc.broadcast(cnn_ffW))
if (validation > 0) loss_val_e = CNN.CNNeval(train_v, sc.broadcast(cnn_layers), sc.broadcast(cnn_ffb), sc.broadcast(cnn_ffW))
printf("epoch: Full-batch train mse = %f, val mse = %f.\n", loss_train_e, loss_val_e)
new CNNModel(cnn_layers, cnn_ffW, cnn_ffb)
}
}

/**
* CNN helper functions.
*/
object CNN extends Serializable {

/**
* Sigmoid activation function:
* X = 1./(1+exp(-P));
*/
def sigm(matrix: BDM[Double]): BDM[Double] = {
val s1 = 1.0 / (Bexp(matrix * (-1.0)) + 1.0)
s1
}

/**
* Scaled tanh activation function:
* f = 1.7159 * tanh(2/3 .* A);
*/
def tanh_opt(matrix: BDM[Double]): BDM[Double] = {
val s1 = Btanh(matrix * (2.0 / 3.0)) * 1.7159
s1
}

/**
* Kronecker-product-style expansion (upsampling), as in MATLAB's expand.
*/
def expand(a: BDM[Double], s: Array[Int]): BDM[Double] = {
// val a = BDM((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
// val s = Array(3, 2)
val sa = Array(a.rows, a.cols)
var tt = new Array[Array[Int]](sa.length)
for (ii <- sa.length - 1 to 0 by -1) {
var h = BDV.zeros[Int](sa(ii) * s(ii))
h(0 to sa(ii) * s(ii) - 1 by s(ii)) := 1
tt(ii) = Accumulate(h).data
}
var b = BDM.zeros[Double](tt(0).length, tt(1).length)
for (j1 <- 0 to b.rows - 1) {
for (j2 <- 0 to b.cols - 1) {
b(j1, j2) = a(tt(0)(j1) - 1, tt(1)(j2) - 1)
}
}
b
}

/**
* 2-D convolution (convn) with "valid" or "full" shape.
*/
def convn(m0: BDM[Double], k0: BDM[Double], shape: String): BDM[Double] = {
//val m0 = BDM((1.0, 1.0, 1.0, 1.0), (0.0, 0.0, 1.0, 1.0), (0.0, 1.0, 1.0, 0.0), (0.0, 1.0, 1.0, 0.0))
//val k0 = BDM((1.0, 1.0), (0.0, 1.0))
//val m0 = BDM((1.0, 1.0, 1.0), (1.0, 1.0, 1.0), (1.0, 1.0, 1.0))
//val k0 = BDM((1.0, 2.0, 3.0), (4.0, 5.0, 6.0), (7.0, 8.0, 9.0))
val out1 = shape match {
case "valid" =>
val m1 = m0
val k1 = k0.t
val row1 = m1.rows - k1.rows + 1
val col1 = m1.cols - k1.cols + 1
var m2 = BDM.zeros[Double](row1, col1)
for (i <- 0 to row1 - 1) {
for (j <- 0 to col1 - 1) {
val r1 = i
val r2 = r1 + k1.rows - 1
val c1 = j
val c2 = c1 + k1.cols - 1
val mi = m1(r1 to r2, c1 to c2)
m2(i, j) = (mi :* k1).sum
}
}
m2
case "full" =>
var m1 = BDM.zeros[Double](m0.rows + 2 * (k0.rows - 1), m0.cols + 2 * (k0.cols - 1))
for (i <- 0 to m0.rows - 1) {
for (j <- 0 to m0.cols - 1) {
m1((k0.rows - 1) + i, (k0.cols - 1) + j) = m0(i, j)
}
}
val k1 = Rot90(Rot90(k0))
val row1 = m1.rows - k1.rows + 1
val col1 = m1.cols - k1.cols + 1
var m2 = BDM.zeros[Double](row1, col1)
for (i <- 0 to row1 - 1) {
for (j <- 0 to col1 - 1) {
val r1 = i
val r2 = r1 + k1.rows - 1
val c1 = j
val c2 = c1 + k1.cols - 1
val mi = m1(r1 to r2, c1 to c2)
m2(i, j) = (mi :* k1).sum
}
}
m2
}
out1
}

/**
* CNNff: feed-forward pass.
* Computes the output of every node in the network.
*/
def CNNff(
batch_xy1: RDD[(BDM[Double], BDM[Double])],
bc_cnn_layers: org.apache.spark.broadcast.Broadcast[Array[CNNLayers]],
bc_cnn_ffb: org.apache.spark.broadcast.Broadcast[BDM[Double]],
bc_cnn_ffW: org.apache.spark.broadcast.Broadcast[BDM[Double]]): RDD[(BDM[Double], Array[Array[BDM[Double]]], BDM[Double], BDM[Double])] = {
// layer 1: a(1) = [x]
val train_data1 = batch_xy1.map { f =>
val lable = f._1
val features = f._2
val nna1 = Array(features)
val nna = ArrayBuffer[Array[BDM[Double]]]()
nna += nna1
(lable, nna)
}
// layers 2 .. n (indices 1 to n-1)
val train_data2 = train_data1.map { f =>
val lable = f._1
val nn_a = f._2
var inputmaps1 = 1.0
val n = bc_cnn_layers.value.length
// for each layer
for (l <- 1 to n - 1) {
val type1 = bc_cnn_layers.value(l).types
val outputmap1 = bc_cnn_layers.value(l).outputmaps
val kernelsize1 = bc_cnn_layers.value(l).kernelsize
val scale1 = bc_cnn_layers.value(l).scale
val k1 = bc_cnn_layers.value(l).k
val b1 = bc_cnn_layers.value(l).b
val nna1 = ArrayBuffer[BDM[Double]]()
if (type1 == "c") {
for (j <- 0 to outputmap1.toInt - 1) { // output map
// create temp output map
var z = BDM.zeros[Double](nn_a(l - 1)(0).rows - kernelsize1.toInt + 1, nn_a(l - 1)(0).cols - kernelsize1.toInt + 1)
for (i <- 0 to inputmaps1.toInt - 1) { // input map
// convolve with corresponding kernel and add to temp output map
// z = z + convn(net.layers{l - 1}.a{i}, net.layers{l}.k{i}{j}, 'valid');
z = z + convn(nn_a(l - 1)(i), k1(i)(j), "valid")
}
// add bias, pass through nonlinearity
// net.layers{l}.a{j} = sigm(z + net.layers{l}.b{j})
val nna0 = sigm(z + b1(j))
nna1 += nna0
}
nn_a += nna1.toArray
inputmaps1 = outputmap1
} else if (type1 == "s") {
for (j <- 0 to inputmaps1.toInt - 1) {
// z = convn(net.layers{l - 1}.a{j}, ones(net.layers{l}.scale) / (net.layers{l}.scale ^ 2), 'valid'); replace with variable
// net.layers{l}.a{j} = z(1 : net.layers{l}.scale : end, 1 : net.layers{l}.scale : end, :);
val z = convn(nn_a(l - 1)(j), BDM.ones[Double](scale1.toInt, scale1.toInt) / (scale1 * scale1), "valid")
val zs1 = z(::, 0 to -1 by scale1.toInt).t + 0.0
val zs2 = zs1(::, 0 to -1 by scale1.toInt).t + 0.0
val nna0 = zs2
nna1 += nna0
}
nn_a += nna1.toArray
}
}
// concatenate all end layer feature maps into vector
val nn_fv1 = ArrayBuffer[Double]()
for (j <- 0 to nn_a(n - 1).length - 1) {
nn_fv1 ++= nn_a(n - 1)(j).data
}
val nn_fv = new BDM[Double](nn_fv1.length, 1, nn_fv1.toArray)
// feedforward into output perceptrons
// net.o = sigm(net.ffW * net.fv + repmat(net.ffb, 1, size(net.fv, 2)));
val nn_o = sigm(bc_cnn_ffW.value * nn_fv + bc_cnn_ffb.value)
(lable, nn_a.toArray, nn_fv, nn_o)
}
train_data2
}

/**
* CNNbp: back-propagation pass.
* Computes the average partial derivatives of the weights.
*/
def CNNbp(
train_cnnff: RDD[(BDM[Double], Array[Array[BDM[Double]]], BDM[Double], BDM[Double])],
bc_cnn_layers: org.apache.spark.broadcast.Broadcast[Array[CNNLayers]],
bc_cnn_ffb: org.apache.spark.broadcast.Broadcast[BDM[Double]],
bc_cnn_ffW: org.apache.spark.broadcast.Broadcast[BDM[Double]]): (RDD[(BDM[Double], Array[Array[BDM[Double]]], BDM[Double], BDM[Double], BDM[Double], BDM[Double], BDM[Double], Array[Array[BDM[Double]]])], BDM[Double], BDM[Double], Array[CNNLayers]) = {
// error : net.e = net.o - y
val n = bc_cnn_layers.value.length
val train_data3 = train_cnnff.map { f =>
val nn_e = f._4 - f._1
(f._1, f._2, f._3, f._4, nn_e)
}
// backprop deltas
// sensitivities (residuals) of the output layer
// net.od = net.e .* (net.o .* (1 - net.o))
// net.fvd = (net.ffW' * net.od)
val train_data4 = train_data3.map { f =>
val nn_e = f._5
val nn_o = f._4
val nn_fv = f._3
val nn_od = nn_e :* (nn_o :* (1.0 - nn_o))
val nn_fvd = if (bc_cnn_layers.value(n - 1).types == "c") {
// net.fvd = net.fvd .* (net.fv .* (1 - net.fv));
val nn_fvd1 = bc_cnn_ffW.value.t * nn_od
val nn_fvd2 = nn_fvd1 :* (nn_fv :* (1.0 - nn_fv))
nn_fvd2
} else {
val nn_fvd1 = bc_cnn_ffW.value.t * nn_od
nn_fvd1
}
(f._1, f._2, f._3, f._4, f._5, nn_od, nn_fvd)
}
// reshape feature vector deltas into output map style
val sa1 = train_data4.map(f => f._2(n - 1)(0)).take(1)(0).rows
val sa2 = train_data4.map(f => f._2(n - 1)(0)).take(1)(0).cols
val sa3 = 1
val fvnum = sa1 * sa2

val train_data5 = train_data4.map { f =>
val nn_a = f._2
val nn_fvd = f._7
val nn_od = f._6
val nn_fv = f._3
var nnd = new Array[Array[BDM[Double]]](n)
val nnd1 = ArrayBuffer[BDM[Double]]()
for (j <- 0 to nn_a(n - 1).length - 1) {
val tmp1 = nn_fvd((j * fvnum) to ((j + 1) * fvnum - 1), 0)
val tmp2 = new BDM(sa1, sa2, tmp1.data)
nnd1 += tmp2
}
nnd(n - 1) = nnd1.toArray
for (l <- (n - 2) to 0 by -1) {
val type1 = bc_cnn_layers.value(l).types
var nnd2 = ArrayBuffer[BDM[Double]]()
if (type1 == "c") {
for (j <- 0 to nn_a(l).length - 1) {
val tmp_a = nn_a(l)(j)
val tmp_d = nnd(l + 1)(j)
val tmp_scale = bc_cnn_layers.value(l + 1).scale.toInt
val tmp1 = tmp_a :* (1.0 - tmp_a)
val tmp2 = expand(tmp_d, Array(tmp_scale, tmp_scale)) / (tmp_scale.toDouble * tmp_scale)
nnd2 += (tmp1 :* tmp2)
}
} else if (type1 == "s") {
for (i <- 0 to nn_a(l).length - 1) {
var z = BDM.zeros[Double](nn_a(l)(0).rows, nn_a(l)(0).cols)
for (j <- 0 to nn_a(l + 1).length - 1) {
// z = z + convn(net.layers{l + 1}.d{j}, rot180(net.layers{l + 1}.k{i}{j}), 'full');
z = z + convn(nnd(l + 1)(j), Rot90(Rot90(bc_cnn_layers.value(l + 1).k(i)(j))), "full")
}
nnd2 += z
}
}
nnd(l) = nnd2.toArray
}
(f._1, f._2, f._3, f._4, f._5, f._6, f._7, nnd)
}
// dk db calc gradients
var cnn_layers = bc_cnn_layers.value
for (l <- 1 to n - 1) {
val type1 = bc_cnn_layers.value(l).types
val lena1 = train_data5.map(f => f._2(l).length).take(1)(0)
val lena2 = train_data5.map(f => f._2(l - 1).length).take(1)(0)
if (type1 == "c") {
for (j <- 0 to lena1 - 1) {
for (i <- 0 to lena2 - 1) {
val rdd_dk_ij = train_data5.map { f =>
val nn_a = f._2
val nn_d = f._8
val tmp_d = nn_d(l)(j)
val tmp_a = nn_a(l - 1)(i)
convn(Rot90(Rot90(tmp_a)), tmp_d, "valid")
}
val initdk = BDM.zeros[Double](rdd_dk_ij.take(1)(0).rows, rdd_dk_ij.take(1)(0).cols)
val (dk_ij, count_dk) = rdd_dk_ij.treeAggregate((initdk, 0L))(
seqOp = (c, v) => {
// c: (m, count), v: (m)
val m1 = c._1
val m2 = m1 + v
(m2, c._2 + 1)
},
combOp = (c1, c2) => {
// c: (m, count)
val m1 = c1._1
val m2 = c2._1
val m3 = m1 + m2
(m3, c1._2 + c2._2)
})
val dk = dk_ij / count_dk.toDouble
cnn_layers(l).dk(i)(j) = dk
}
val rdd_db_j = train_data5.map { f =>
val nn_d = f._8
val tmp_d = nn_d(l)(j)
Bsum(tmp_d)
}
val db_j = rdd_db_j.reduce(_ + _)
val count_db = rdd_db_j.count
val db = db_j / count_db.toDouble
cnn_layers(l).db(j) = db
}
}
}

// net.dffW = net.od * (net.fv)' / size(net.od, 2);
// net.dffb = mean(net.od, 2);
val train_data6 = train_data5.map { f =>
val nn_od = f._6
val nn_fv = f._3
nn_od * nn_fv.t
}
val train_data7 = train_data5.map { f =>
val nn_od = f._6
nn_od
}
val initffW = BDM.zeros[Double](bc_cnn_ffW.value.rows, bc_cnn_ffW.value.cols)
val (ffw2, countfffw2) = train_data6.treeAggregate((initffW, 0L))(
seqOp = (c, v) => {
// c: (m, count), v: (m)
val m1 = c._1
val m2 = m1 + v
(m2, c._2 + 1)
},
combOp = (c1, c2) => {
// c: (m, count)
val m1 = c1._1
val m2 = c2._1
val m3 = m1 + m2
(m3, c1._2 + c2._2)
})
val cnn_dffw = ffw2 / countfffw2.toDouble
val initffb = BDM.zeros[Double](bc_cnn_ffb.value.rows, bc_cnn_ffb.value.cols)
val (ffb2, countfffb2) = train_data7.treeAggregate((initffb, 0L))(
seqOp = (c, v) => {
// c: (m, count), v: (m)
val m1 = c._1
val m2 = m1 + v
(m2, c._2 + 1)
},
combOp = (c1, c2) => {
// c: (m, count)
val m1 = c1._1
val m2 = c2._1
val m3 = m1 + m2
(m3, c1._2 + c2._2)
})
val cnn_dffb = ffb2 / countfffb2.toDouble
(train_data5, cnn_dffw, cnn_dffb, cnn_layers)
}

/**
* CNNapplygrads: update the weights with the computed gradients.
*/
def CNNapplygrads(
train_cnnbp: (RDD[(BDM[Double], Array[Array[BDM[Double]]], BDM[Double], BDM[Double], BDM[Double], BDM[Double], BDM[Double], Array[Array[BDM[Double]]])], BDM[Double], BDM[Double], Array[CNNLayers]),
bc_cnn_ffb: org.apache.spark.broadcast.Broadcast[BDM[Double]],
bc_cnn_ffW: org.apache.spark.broadcast.Broadcast[BDM[Double]],
alpha: Double): (BDM[Double], BDM[Double], Array[CNNLayers]) = {
val train_data5 = train_cnnbp._1
val cnn_dffw = train_cnnbp._2
val cnn_dffb = train_cnnbp._3
var cnn_layers = train_cnnbp._4
var cnn_ffb = bc_cnn_ffb.value
var cnn_ffW = bc_cnn_ffW.value
val n = cnn_layers.length

for (l <- 1 to n - 1) {
val type1 = cnn_layers(l).types
val lena1 = train_data5.map(f => f._2(l).length).take(1)(0)
val lena2 = train_data5.map(f => f._2(l - 1).length).take(1)(0)
if (type1 == "c") {
for (j <- 0 to lena1 - 1) {
for (ii <- 0 to lena2 - 1) {
cnn_layers(l).k(ii)(j) = cnn_layers(l).k(ii)(j) - cnn_layers(l).dk(ii)(j) * alpha
}
cnn_layers(l).b(j) = cnn_layers(l).b(j) - cnn_layers(l).db(j) * alpha
}
}
}
// gradient descent: step against the gradient, scaled by the learning rate
cnn_ffW = cnn_ffW - cnn_dffw * alpha
cnn_ffb = cnn_ffb - cnn_dffb * alpha
(cnn_ffW, cnn_ffb, cnn_layers)
}

/**
* CNNeval: feed-forward pass plus error computation.
* Computes every node's output and the average output error.
*/
def CNNeval(
batch_xy1: RDD[(BDM[Double], BDM[Double])],
bc_cnn_layers: org.apache.spark.broadcast.Broadcast[Array[CNNLayers]],
bc_cnn_ffb: org.apache.spark.broadcast.Broadcast[BDM[Double]],
bc_cnn_ffW: org.apache.spark.broadcast.Broadcast[BDM[Double]]): Double = {
// CNNff: feed-forward pass
val train_cnnff = CNN.CNNff(batch_xy1, bc_cnn_layers, bc_cnn_ffb, bc_cnn_ffW)
// error and loss
// compute the output error
val rdd_loss1 = train_cnnff.map { f =>
val nn_e = f._4 - f._1
nn_e
}
val (loss2, counte) = rdd_loss1.treeAggregate((0.0, 0L))(
seqOp = (c, v) => {
// c: (e, count), v: (m)
val e1 = c._1
val e2 = (v :* v).sum
val esum = e1 + e2
(esum, c._2 + 1)
},
combOp = (c1, c2) => {
// c: (e, count)
val e1 = c1._1
val e2 = c2._1
val esum = e1 + e2
(esum, c1._2 + c2._2)
})
val Loss = (loss2 / counte.toDouble) * 0.5
Loss
}
}
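Two helpers in the listing above are easy to get wrong: `expand` (the Kronecker-style upsampling used when back-propagating through a pooling layer) and `convn`. As a sanity check, here is a minimal, dependency-free sketch of the same ideas on plain `Array[Array[Double]]` instead of Breeze matrices. The names `expand2` and `convValid` are illustrative only, and `convValid` performs plain "valid" correlation (the listing additionally transposes or rotates the kernel before sliding it):

```scala
object ConvHelpersSketch {
  // expand2: repeat each element of `a` into an s1 x s2 block,
  // mirroring expand(a, Array(s1, s2)) in the listing above.
  def expand2(a: Array[Array[Double]], s1: Int, s2: Int): Array[Array[Double]] =
    Array.tabulate(a.length * s1, a(0).length * s2)((i, j) => a(i / s1)(j / s2))

  // convValid: "valid" 2-D correlation of m with kernel k
  // (output shrinks by kernel size - 1 in each dimension).
  def convValid(m: Array[Array[Double]], k: Array[Array[Double]]): Array[Array[Double]] = {
    val rows = m.length - k.length + 1
    val cols = m(0).length - k(0).length + 1
    Array.tabulate(rows, cols) { (i, j) =>
      var acc = 0.0
      for (r <- k.indices; c <- k(0).indices) acc += m(i + r)(j + c) * k(r)(c)
      acc
    }
  }

  def main(args: Array[String]): Unit = {
    val a = Array(Array(1.0, 2.0), Array(3.0, 4.0))
    val up = expand2(a, 2, 2) // 4 x 4: every entry becomes a 2 x 2 block
    println(up.map(_.mkString(" ")).mkString("\n"))
    val k = Array(Array(1.0, 0.0), Array(0.0, 1.0))
    println(convValid(up, k)(0)(0)) // up(0)(0) + up(1)(1) = 2.0
  }
}
```

This is why `CNNbp` divides the expanded delta by `scale * scale`: each pooled value was the average of `scale * scale` inputs, so its gradient is shared evenly among them.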

1.2 CNNModel code

package CNN

import breeze.linalg.{
Matrix => BM,
CSCMatrix => BSM,
DenseMatrix => BDM,
Vector => BV,
DenseVector => BDV,
SparseVector => BSV
}
import org.apache.spark.rdd.RDD

/**
* label: target matrix
* features: feature matrix
* predict_label: prediction matrix
* error: error matrix
*/
case class PredictCNNLabel(label: BDM[Double], features: BDM[Double], predict_label: BDM[Double], error: BDM[Double]) extends Serializable

class CNNModel(
val cnn_layers: Array[CNNLayers],
val cnn_ffW: BDM[Double],
val cnn_ffb: BDM[Double]) extends Serializable {

/**
* Run prediction on the given data.
* Output format: (label, feature, predict_label, error).
*/
def predict(dataMatrix: RDD[(BDM[Double], BDM[Double])]): RDD[PredictCNNLabel] = {
val sc = dataMatrix.sparkContext
val bc_cnn_layers = sc.broadcast(cnn_layers)
val bc_cnn_ffW = sc.broadcast(cnn_ffW)
val bc_cnn_ffb = sc.broadcast(cnn_ffb)
// CNNff: feed-forward pass
val train_cnnff = CNN.CNNff(dataMatrix, bc_cnn_layers, bc_cnn_ffb, bc_cnn_ffW)
val rdd_predict = train_cnnff.map { f =>
val label = f._1
val nna1 = f._2(0)(0)
val nnan = f._4
val error = f._4 - f._1
PredictCNNLabel(label, nna1, nnan, error)
}
rdd_predict
}

/**
* Compute the average output error.
*/
def Loss(predict: RDD[PredictCNNLabel]): Double = {
val predict1 = predict.map(f => f.error)
// error and loss
// compute the output error
val loss1 = predict1
val (loss2, counte) = loss1.treeAggregate((0.0, 0L))(
seqOp = (c, v) => {
// c: (e, count), v: (m)
val e1 = c._1
val e2 = (v :* v).sum
val esum = e1 + e2
(esum, c._2 + 1)
},
combOp = (c1, c2) => {
// c: (e, count)
val e1 = c1._1
val e2 = c2._1
val esum = e1 + e2
(esum, c1._2 + c2._2)
})
val Loss = (loss2 / counte.toDouble) * 0.5
Loss
}
}
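The prediction path of CNNModel reduces to the output-layer formula `net.o = sigm(ffW * fv + ffb)` followed by the loss `0.5 * sum(e .^ 2) / count`. Here is a self-contained sketch of those two formulas without Spark or Breeze; the object and method names are illustrative, not part of the toolbox:

```scala
object OutputLayerSketch {
  def sigm(x: Double): Double = 1.0 / (1.0 + math.exp(-x))

  // o = sigm(W * fv + b) for one sample: W is onum x fvnum, fv has fvnum entries, b has onum
  def feedForward(w: Array[Array[Double]], fv: Array[Double], b: Array[Double]): Array[Double] =
    w.indices.map { i =>
      var z = b(i)
      for (j <- fv.indices) z += w(i)(j) * fv(j)
      sigm(z)
    }.toArray

  // Loss = 0.5 * sum of squared errors / sample count, matching CNNModel.Loss above
  def mseLoss(errors: Seq[Array[Double]]): Double = {
    val sq = errors.map(e => e.map(v => v * v).sum).sum
    0.5 * sq / errors.length
  }

  def main(args: Array[String]): Unit = {
    val w = Array(Array(0.0, 0.0)) // zero weights -> z = b
    val o = feedForward(w, Array(1.0, 2.0), Array(0.0))
    println(o(0)) // sigm(0) = 0.5
    println(mseLoss(Seq(Array(0.5), Array(0.5)))) // 0.5 * (0.25 + 0.25) / 2 = 0.125
  }
}
```

In the real model, `fv` is the flattened stack of final-layer feature maps produced by `CNNff`, and the loss is aggregated across the RDD with `treeAggregate` instead of a local `Seq`.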

Please cite the source when reprinting:

http://blog.csdn.net/sunbow0
