Introduction to Spark & Applications

Spark Summary

Posted by YangLong on April 3, 2017

Spark Introduction

Why use Spark

Spark is a fast, general-purpose engine for large-scale data processing.

  • Speed: Spark runs more than 100x faster than Hadoop in memory and more than 10x faster on disk
  • Ease of use: Spark offers more than 80 high-level operators for building parallel applications, and can be used interactively from high-level languages such as Python, R, and Scala
  • Generality: Spark provides a stack of libraries, including SQL and DataFrames, MLlib for machine learning, and Spark Streaming, and these libraries can be combined in the same application
  • Runs everywhere: Spark runs on Hadoop, on Mesos, standalone, or in the cloud, and can access diverse data sources such as HDFS, Cassandra, HBase, and S3

Tutorials

Download & Install

When developing in Python, you can install PySpark with pip:

pip install pyspark
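After installing, a quick sanity check is to start a local session and run a trivial job. A minimal sketch (the app name and the local[*] master are arbitrary choices, not required values):

from pyspark.sql import SparkSession

# Start a local Spark session that uses all local cores.
spark = SparkSession.builder \
    .appName("install-check") \
    .master("local[*]") \
    .getOrCreate()

# Run a trivial distributed count to confirm the installation works.
print(spark.range(100).count())

spark.stop()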

Basic Examples

Word Count Example

This example generates (word, count) pairs and saves the result to a file.

text_file = sc.textFile("hdfs://...")
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")
Pi Estimation

Spark can also be used for compute-intensive parallel numerical work.

import random

NUM_SAMPLES = 1000000  # number of random points to sample (any value works)

def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

# Estimate pi by sampling points in the unit square and counting
# how many fall inside the quarter circle.
count = sc.parallelize(range(0, NUM_SAMPLES)) \
             .filter(inside).count()
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))
Text Search [DataFrame]

Search a log file for specific error messages.

from pyspark.sql import Row
from pyspark.sql.functions import col

textFile = sc.textFile("hdfs://...")

# Creates a DataFrame having a single column named "line"
df = textFile.map(lambda r: Row(r)).toDF(["line"])
errors = df.filter(col("line").like("%ERROR%"))
# Counts all the errors
errors.count()
# Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count()
# Fetches the MySQL errors as an array of strings
errors.filter(col("line").like("%MySQL%")).collect()
Simple Data Operations [DataFrame]

Read a table from a MySQL database, count the number of people for every age, and save the result to S3 in JSON format. The database has a table named people with two columns, name and age.

# Creates a DataFrame based on a table named "people"
# stored in a MySQL database.
url = \
  "jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword"
df = sqlContext \
  .read \
  .format("jdbc") \
  .option("url", url) \
  .option("dbtable", "people") \
  .load()

# Looks the schema of this DataFrame.
df.printSchema()

# Counts people by age
countsByAge = df.groupBy("age").count()
countsByAge.show()

# Saves countsByAge to S3 in the JSON format.
countsByAge.write.format("json").save("s3a://...")
Prediction with Logistic Regression [Machine Learning]

In this example, we take a dataset of labels and feature vectors and learn to predict the labels from the feature vectors using logistic regression.

from pyspark.ml.classification import LogisticRegression

# "data" is assumed to be an existing list or RDD of (label, feature-vector)
# pairs, and sqlContext an existing SQLContext.
# Every record of this DataFrame contains the label and
# features represented by a vector.
df = sqlContext.createDataFrame(data, ["label", "features"])

# Set parameters for the algorithm.
# Here, we limit the number of iterations to 10.
lr = LogisticRegression(maxIter=10)

# Fit the model to the data.
model = lr.fit(df)

# Given a dataset, predict each point's label, and show the results.
model.transform(df).show()

Spark Usage in Detail

Install & Config

Install Python

# Configure environment variables
vim /etc/profile
export PYTHONHOME=/usr/local/python2.7
export PATH=$PATH:$PYTHONHOME/bin

Install & Configure Spark

https://www.apache.org/dyn/closer.lua/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz

# Configure environment variables
vim  /etc/profile
export SPARK_HOME=/usr/python/spark-2.0.0-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin

# IP / hostname alias configuration (/etc/hosts)
10.*.*.41       hostname1   namenode
10.*.*.42       hostname2   datanode
10.*.*.43       hostname3   datanode

# Passwordless SSH login
yum install -y openssh-server
ssh-keygen -t rsa -P ""   # generates a key pair under /root/.ssh: id_rsa (private key), id_rsa.pub (public key)

ssh-copy-id hostname1
ssh-copy-id hostname2
ssh-copy-id hostname3

Configure the Spark Cluster

# Edit spark-env.sh and slaves
# Rename the templates & add the following
cp conf/spark-env.sh.template conf/spark-env.sh
PYSPARK_PYTHON="python2.7"


# Edit slaves
cp conf/slaves.template conf/slaves
hostname1
hostname2
hostname3

# Sync the configuration to the other nodes
rsync -av /usr/local/spark-2.2.0/ hostname1:/usr/local/spark-2.2.0/
rsync -av /usr/local/spark-2.2.0/ hostname2:/usr/local/spark-2.2.0/

# Start / stop Spark
$SPARK_HOME/sbin/start-all.sh
$SPARK_HOME/sbin/stop-all.sh

# Verify: open the master web UI
SparkMaster_IP:8080
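With the standalone cluster started, an application can connect to the master directly. A minimal sketch, assuming hostname1 is the master and the default standalone port 7077 is used:

from pyspark import SparkConf, SparkContext

# Point the application at the standalone master (default port 7077 assumed).
conf = SparkConf().setAppName("cluster-check").setMaster("spark://hostname1:7077")
sc = SparkContext(conf=conf)

# A trivial distributed job to confirm the workers respond.
print(sc.parallelize(range(1000)).sum())

sc.stop()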

Configure Hadoop

# Configure these four files
/usr/local/hadoop/etc/hadoop/core-site.xml
/usr/local/hadoop/etc/hadoop/hdfs-site.xml
/usr/local/hadoop/etc/hadoop/mapred-site.xml
/usr/local/hadoop/etc/hadoop/yarn-site.xml

# Copy to the other machines
scp -r /usr/local/hadoop hostname2:/usr/local/
scp -r /usr/local/hadoop hostname3:/usr/local/
hdfs namenode -format   # format HDFS
sbin/start-dfs.sh       # start HDFS
sbin/start-yarn.sh      # start YARN

Configuration File Notes
spark-env.sh

SPARK_LOCAL_IP=10.*.*.41                         # IP or hostname of this machine
SPARK_LOCAL_DIRS=/opt/data/spark/local           # Spark scratch (local) directory
SPARK_MASTER_IP=10.*.*.41                        # IP or hostname of the master node
SPARK_MASTER_WEBUI_PORT=8080                     # master web UI port
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4"    # default number of cores used by spark-shell applications
SPARK_WORKER_CORES=2                              # CPU cores per worker
SPARK_WORKER_MEMORY=8g                            # memory per worker
SPARK_WORKER_DIR=/opt/data/spark/work             # worker working directory
export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=604800"  # enable automatic worker cleanup and set its TTL
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://10.*.*.41:9000/tmp/spark/applicationHistory"    # history server UI port, number of retained applications, and HDFS location of the event logs
SPARK_LOG_DIR=/opt/data/spark/log                  # Spark log directory
export JAVA_HOME=/usr/local/jdk1.8.0_91/           # Java path
export SCALA_HOME=/usr/local/scala-2.10.4/         # Scala path
export SPARK_MASTER_IP=10.*.*.41
export SPARK_WORKER_MEMORY=10240m
export HADOOP_HOME=/home/lscm/hadoop/hadoop                  # Hadoop installation path (native libraries live under its lib/native)
export HADOOP_CONF_DIR=/home/lscm/hadoop/hadoop/etc/hadoop/  # Hadoop configuration directory
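To check which of these settings an application actually picked up, the runtime configuration can be inspected from PySpark; a minimal sketch (sc is an existing SparkContext):

# Print the effective configuration of the running application.
for key, value in sc.getConf().getAll():
    print("%s=%s" % (key, value))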

spark-defaults.conf

 spark.eventLog.enabled          true    # enable the event log (recommended; keeps a detailed record of completed applications)
 spark.eventLog.compress         true    # compress the event log (recommended when CPU is not a bottleneck, to reduce memory/storage usage)
 spark.eventLog.dir              hdfs://10.30.96.41:9000/tmp/spark/applicationHistory    # event log location; must match the history server location in spark-env.sh, and both directories must be created manually (hadoop fs -mkdir -p /tmp/spark/applicationHistory) or Spark fails to start
 spark.broadcast.blockSize       8m               # broadcast block size
 spark.executor.cores            1                # CPU cores per executor
 spark.executor.memory           512m             # memory per executor
 spark.executor.heartbeatInterval        20s      # executor heartbeat interval

 spark.files.fetchTimeout        120s             # timeout for fetching files
 spark.task.maxFailures          6                # maximum task failures before the job is aborted
 spark.serializer                org.apache.spark.serializer.KryoSerializer    # serializer (the default Java serializer is slow; Kryo is recommended)
 spark.kryoserializer.buffer.max         256m    # maximum serialization buffer size
 spark.akka.frameSize            128             # Akka frame size
 spark.default.parallelism       20               # default parallelism
 spark.network.timeout           300s             # maximum network timeout
 spark.speculation               true             # speculative execution (recommended)
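The same properties can also be set per application in code instead of in spark-defaults.conf; a minimal sketch using a few of the values from the table above:

from pyspark import SparkConf, SparkContext

# Per-application settings, mirroring the spark-defaults.conf sample above.
conf = (SparkConf()
        .setAppName("configured-app")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.executor.memory", "512m")
        .set("spark.default.parallelism", "20"))
sc = SparkContext(conf=conf)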

Resilient Distributed Datasets (RDDs)

RDDs support distributed parallel computation over data such as Hadoop files. PySpark can create distributed datasets from any storage source supported by Hadoop, including the local file system, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
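Besides external storage, an RDD can also be created by parallelizing an in-memory Python collection; a minimal sketch (sc is an existing SparkContext):

# Distribute a local list across the cluster as an RDD.
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

# Transformations are lazy; an action such as reduce() triggers the computation.
print(rdd.map(lambda x: x * 2).reduce(lambda a, b: a + b))  # 30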

Reading Data

Read text content from the local file system or a Hadoop URI:

textFile(name, minPartitions=None, use_unicode=True)
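For example (the file path and partition count below are placeholders, not required values):

# Read a local file as an RDD of lines, requesting at least 4 partitions.
lines = sc.textFile("file:///tmp/input.txt", minPartitions=4)
print(lines.count())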

Saving Data

saveAsHadoopDataset(conf, keyConverter=None, valueConverter=None)

saveAsHadoopFile(path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, conf=None, compressionCodecClass=None)

saveAsNewAPIHadoopDataset(conf, keyConverter=None, valueConverter=None)

saveAsNewAPIHadoopFile(path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, conf=None)

saveAsTextFile(path, compressionCodecClass=None)
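For example, saveAsTextFile can compress its output with a Hadoop codec; a minimal sketch (the output path is a placeholder):

# Save an RDD as gzip-compressed text part files.
words = sc.parallelize(["spark", "hadoop", "hdfs"])
words.saveAsTextFile("file:///tmp/words_out",
                     compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")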

Basic Sample: Word Count

This example generates (word, count) pairs and saves the result to a file.

from pyspark import SparkContext, SparkConf

# appName and master are placeholders for your application name and master URL.
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

text_file = sc.textFile("hdfs://...")
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")


Spark SQL, DataFrames and Datasets

Spark SQL lets you query structured data inside Spark programs through the DataFrame and Dataset APIs or with standard SQL.
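A minimal sketch of querying a DataFrame with SQL (the JSON path and column names below are placeholder assumptions):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sql-example").getOrCreate()

# Load a JSON file into a DataFrame and register it as a temporary view.
people = spark.read.json("examples/people.json")
people.createOrReplaceTempView("people")

# Query the view with ordinary SQL; the result is again a DataFrame.
adults = spark.sql("SELECT name, age FROM people WHERE age >= 18")
adults.show()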

Machine Learning Library (MLlib)

Linear Regression

Loss function:

L(\mathbf{w}; \mathbf{x}, y) := \frac{1}{2} (\mathbf{w}^T \mathbf{x} - y)^2
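For reference, the SGD optimizer used by LinearRegressionWithSGD below follows the gradient of this squared loss, which for a single example is

\nabla_{\mathbf{w}} L = (\mathbf{w}^T \mathbf{x} - y)\,\mathbf{x}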
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.replace(',', ' ').split(' ')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile("data/mllib/ridge-data/lpsa.data")
parsedData = data.map(parsePoint)

# Build the model
model = LinearRegressionWithSGD.train(parsedData, iterations=100, step=0.00000001)

# Evaluate the model on training data
valuesAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
MSE = valuesAndPreds \
    .map(lambda vp: (vp[0] - vp[1])**2) \
    .reduce(lambda x, y: x + y) / valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))

# Save and load model
model.save(sc, "target/tmp/pythonLinearRegressionWithSGDModel")
sameModel = LinearRegressionModel.load(sc, "target/tmp/pythonLinearRegressionWithSGDModel")
Logistic Regression

Loss function:

L(\mathbf{w}; \mathbf{x}, y) := \log\bigl(1 + \exp(-y\, \mathbf{w}^T \mathbf{x})\bigr)
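Its gradient for a single example, which the optimizer below minimizes over, is

\nabla_{\mathbf{w}} L = -y \left( 1 - \frac{1}{1 + \exp(-y\, \mathbf{w}^T \mathbf{x})} \right) \mathbf{x}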
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile("data/mllib/sample_svm_data.txt")
parsedData = data.map(parsePoint)

# Build the model
model = LogisticRegressionWithLBFGS.train(parsedData)

# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))

# Save and load model
model.save(sc, "target/tmp/pythonLogisticRegressionWithLBFGSModel")
sameModel = LogisticRegressionModel.load(sc,
                                         "target/tmp/pythonLogisticRegressionWithLBFGSModel")