Spark Introduction
Why use Spark
Spark is a fast, general-purpose engine for large-scale data processing.
- Speed: Spark runs programs up to 100x faster than Hadoop MapReduce in memory, and more than 10x faster on disk.
- Ease of use: Spark provides over 80 high-level operators for building parallel applications, and can be used interactively from high-level languages such as Python, R, and Scala.
- Generality: Spark offers a stack of libraries including SQL and DataFrames, MLlib for machine learning, and Spark Streaming, all of which can be combined in the same application.
- Runs everywhere: Spark runs on Hadoop, on Mesos, standalone, or in the cloud, and can access a variety of data sources such as HDFS, Cassandra, HBase, and S3.
Tutorials
Download & Install
When developing in Python, PySpark can be installed with pip:
pip install pyspark
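As a quick post-install smoke test, the following minimal sketch starts a local SparkContext and runs a trivial job; the application name and `local[*]` master are illustrative choices, not required values.

```python
from pyspark import SparkConf, SparkContext

# Run Spark locally, using all available cores.
conf = SparkConf().setAppName("pip-install-check").setMaster("local[*]")
sc = SparkContext(conf=conf)

# A trivial parallel computation to confirm the installation works.
print(sc.parallelize(range(10)).map(lambda x: x * 2).sum())  # 90
sc.stop()
```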
Basic Examples
Word Count
This example reads a text file, produces (word, count) pairs, and saves the result to a file:
text_file = sc.textFile("hdfs://...")
counts = text_file.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")
Pi Estimation
Spark can also be used for compute-intensive numerical tasks; this example estimates π by sampling random points in the unit square:
import random

NUM_SAMPLES = 1000000  # example sample count; increase for a better estimate

def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

count = sc.parallelize(range(0, NUM_SAMPLES)) \
          .filter(inside).count()
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))
Text Search [DataFrame]
Search a log file for lines containing specific error messages:
from pyspark.sql import Row
from pyspark.sql.functions import col

textFile = sc.textFile("hdfs://...")
# Creates a DataFrame having a single column named "line"
df = textFile.map(lambda r: Row(r)).toDF(["line"])
errors = df.filter(col("line").like("%ERROR%"))
# Counts all the errors
errors.count()
# Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count()
# Fetches the MySQL errors as an array of strings
errors.filter(col("line").like("%MySQL%")).collect()
Numeric Operations [DataFrame]
Read the table "people" (columns name and age) from a MySQL database, count the number of people for each age, and save the result to S3 in JSON format:
# Creates a DataFrame based on a table named "people"
# stored in a MySQL database.
url = \
"jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword"
df = sqlContext \
.read \
.format("jdbc") \
.option("url", url) \
.option("dbtable", "people") \
.load()
# Looks at the schema of this DataFrame.
df.printSchema()
# Counts people by age
countsByAge = df.groupBy("age").count()
countsByAge.show()
# Saves countsByAge to S3 in the JSON format.
countsByAge.write.format("json").save("s3a://...")
Prediction with Logistic Regression [Machine Learning]
In this example, we take a dataset of labels and feature vectors, and learn to predict the labels from the feature vectors using logistic regression.
from pyspark.ml.classification import LogisticRegression

# Every record of this DataFrame contains the label and
# features represented by a vector.
df = sqlContext.createDataFrame(data, ["label", "features"])
# Set parameters for the algorithm.
# Here, we limit the number of iterations to 10.
lr = LogisticRegression(maxIter=10)
# Fit the model to the data.
model = lr.fit(df)
# Given a dataset, predict each point's label, and show the results.
model.transform(df).show()
Spark Usage in Detail
Install & Config
Install Python
# Configure environment variables
vim /etc/profile
export PYTHONHOME=/usr/local/python2.7
export PATH=$PATH:$PYTHONHOME/bin
Install and configure Spark
https://www.apache.org/dyn/closer.lua/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
# Configure environment variables
vim /etc/profile
export SPARK_HOME=/usr/python/spark-2.0.0-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin
# IP and hostname alias configuration
10.*.*.41 hostname1 namenode
10.*.*.42 hostname2 datanode
10.*.*.43 hostname3 datanode
# Passwordless SSH login
yum install -y openssh-server
ssh-keygen -t rsa -P "" # generates keys under /root/.ssh: id_rsa (private key) and id_rsa.pub (public key)
ssh-copy-id hostname1
ssh-copy-id hostname2
ssh-copy-id hostname3
Configure the Spark cluster
# Edit spark-env.sh and slaves
# Rename the templates & add the following
cp conf/spark-env.sh.template conf/spark-env.sh
PYSPARK_PYTHON="python2.7"
# Edit slaves
cp conf/slaves.template conf/slaves
hostname1
hostname2
hostname3
# Sync the configuration to the worker nodes
rsync -av /usr/local/spark-2.2.0/ hostname2:/usr/local/spark-2.2.0/
rsync -av /usr/local/spark-2.2.0/ hostname3:/usr/local/spark-2.2.0/
# Start / stop Spark
$SPARK_HOME/sbin/start-all.sh
$SPARK_HOME/sbin/stop-all.sh
# Verify: open the master web UI in a browser
http://SparkMaster_IP:8080
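Once the master and workers are up, the cluster can also be exercised directly from Python by pointing a SparkContext at the standalone master. A minimal sketch, assuming the master runs on hostname1 with the default standalone port 7077 (adjust both to your environment):

```python
from pyspark import SparkConf, SparkContext

# Connect to the standalone master started by start-all.sh
# (spark://hostname1:7077 is an assumption based on the hosts configured above).
conf = SparkConf().setAppName("cluster-smoke-test").setMaster("spark://hostname1:7077")
sc = SparkContext(conf=conf)

# A small distributed job; the running application should also appear in the web UI on port 8080.
print(sc.parallelize(range(1000), 4).sum())  # 499500
sc.stop()
```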
Configure Hadoop
# Configure these four files
/usr/local/hadoop/etc/hadoop/core-site.xml
/usr/local/hadoop/etc/hadoop/hdfs-site.xml
/usr/local/hadoop/etc/hadoop/mapred-site.xml
/usr/local/hadoop/etc/hadoop/yarn-site.xml
# Copy to the other machines
scp -r /usr/local/hadoop hostname2:/usr/local/
scp -r /usr/local/hadoop hostname3:/usr/local/
hdfs namenode -format # format HDFS
sbin/start-dfs.sh # start HDFS
sbin/start-yarn.sh # start YARN
Configuration file reference
spark-env.sh
SPARK_LOCAL_IP=10.*.*.41 #IP or hostname of this machine
SPARK_LOCAL_DIRS=/opt/data/spark/local #Spark local (scratch) directory
SPARK_MASTER_IP=10.*.*.41 #IP or hostname of the master node
SPARK_MASTER_WEBUI_PORT=8080 #web UI port
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4" #default number of cores for applications (e.g. spark-shell) that do not set one
SPARK_WORKER_CORES=2 #number of CPU cores per worker
SPARK_WORKER_MEMORY=8g #worker memory size
SPARK_WORKER_DIR=/opt/data/spark/work #worker directory
export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=604800" #enable automatic worker cleanup and set the application-data TTL
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://10.*.*.41:9000/tmp/spark/applicationHistory" #history server UI port, number of retained applications, and HDFS location of the event logs
SPARK_LOG_DIR=/opt/data/spark/log #Spark log directory
export JAVA_HOME=/usr/local/jdk1.8.0_91/ #Java installation path
export SCALA_HOME=/usr/local/scala-2.10.4/ #Scala installation path
export HADOOP_HOME=/home/lscm/hadoop/hadoop/lib/native #Hadoop lib path
export HADOOP_CONF_DIR=/home/lscm/hadoop/hadoop/etc/hadoop/ #Hadoop configuration directory
spark-defaults.conf
spark.eventLog.enabled true #enable the event log (recommended, so completed applications keep detailed logs)
spark.eventLog.compress true #compress the event log (recommended when CPU headroom allows, to reduce memory and disk usage)
spark.eventLog.dir hdfs://10.30.96.41:9000/tmp/spark/applicationHistory #event log location; must match the history server directory configured in spark-env.sh, and both directories must be created manually (hadoop fs -mkdir -p /tmp/spark/applicationHistory), otherwise Spark fails to start
spark.broadcast.blockSize 8m #broadcast block size
spark.executor.cores 1 #number of CPU cores per executor
spark.executor.memory 512m #executor memory size
spark.executor.heartbeatInterval 20s #executor heartbeat interval
spark.files.fetchTimeout 120s #file fetch timeout
spark.task.maxFailures 6 #maximum number of task failures before the job is aborted
spark.serializer org.apache.spark.serializer.KryoSerializer #serializer (Java serialization is the default but is slow; Kryo is recommended)
spark.kryoserializer.buffer.max 256m #maximum Kryo serialization buffer size
spark.akka.frameSize 128 #Akka frame size
spark.default.parallelism 20 #default parallelism
spark.network.timeout 300s #network timeout
spark.speculation true #speculative execution (recommended)
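The settings in spark-defaults.conf apply to every application submitted to the cluster, but an individual application can override them through SparkConf. A minimal sketch, assuming the defaults above are in place (the application name and the 1g value are illustrative):

```python
from pyspark import SparkConf, SparkContext

# Per-application overrides; anything not set here falls back to spark-defaults.conf.
conf = (SparkConf()
        .setAppName("defaults-override-example")
        .set("spark.executor.memory", "1g")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))
sc = SparkContext(conf=conf)

# Inspect the effective configuration for this application.
print(sc.getConf().get("spark.executor.memory"))
sc.stop()
```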
Resilient Distributed Datasets (RDDs)
RDDs are Spark's core abstraction for distributed parallel computation. PySpark can create distributed datasets from any storage source supported by Hadoop, including the local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
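For example, a minimal sketch that builds an RDD from an in-memory collection (the `local[*]` master and application name are illustrative):

```python
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("rdd-basics").setMaster("local[*]")
sc = SparkContext(conf=conf)

# Distribute an in-memory Python collection as an RDD and transform it in parallel.
nums = sc.parallelize([1, 2, 3, 4, 5])
print(nums.map(lambda x: x * x).collect())  # [1, 4, 9, 16, 25]
sc.stop()
```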
Reading data
Read text from the local file system or any Hadoop-supported URI:
textFile(name, minPartitions=None, use_unicode=True)
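Used as in the sketch below; the HDFS path is a hypothetical placeholder, and minPartitions only sets a lower bound on the number of input partitions:

```python
# Each element of the returned RDD is one line of the file.
lines = sc.textFile("hdfs://hostname1:9000/logs/app.log", minPartitions=4)
print(lines.count())
```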
Saving data
saveAsHadoopDataset(conf, keyConverter=None, valueConverter=None)
saveAsHadoopFile(path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, conf=None, compressionCodecClass=None)
saveAsNewAPIHadoopDataset(conf, keyConverter=None, valueConverter=None)
saveAsNewAPIHadoopFile(path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, conf=None)
saveAsTextFile(path, compressionCodecClass=None)
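As a simple illustration of the last of these methods, the sketch below writes one part file per partition; the output paths are placeholders, and the optional second call shows compressionCodecClass with Hadoop's standard GzipCodec:

```python
rdd = sc.parallelize(["spark", "hadoop", "hdfs"], 2)

# Plain-text output, one part-xxxxx file per partition.
rdd.saveAsTextFile("hdfs://hostname1:9000/tmp/words")

# The same data, gzip-compressed.
rdd.saveAsTextFile("hdfs://hostname1:9000/tmp/words_gz",
                   compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")
```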
Basic sample: word count
This example generates (word, count) pairs and saves them to a file:
```python
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

text_file = sc.textFile("hdfs://...")
counts = text_file.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")
```
## Spark SQL, DataFrames and Datasets
Spark SQL is Spark's module for working with structured data through SQL, DataFrames, and Datasets (the typed Dataset API is available in Scala and Java; Python works with DataFrames).
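A minimal sketch of mixing the DataFrame API with SQL from Python, assuming Spark 2.x with SparkSession (the rows, view name, and application name are illustrative):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sql-example").getOrCreate()

# Build a small DataFrame and expose it to SQL as a temporary view.
df = spark.createDataFrame([("Alice", 34), ("Bob", 45), ("Carol", 29)], ["name", "age"])
df.createOrReplaceTempView("people")

# The same query expressed in SQL and with the DataFrame API.
spark.sql("SELECT name, age FROM people WHERE age > 30").show()
df.filter(df.age > 30).select("name", "age").show()

spark.stop()
```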
## Machine Learning Library (MLlib)
##### Linear Regression
Loss function (squared error):
```math
L(\mathbf{w}; \mathbf{x}, y) := \frac{1}{2} (\mathbf{w}^T \mathbf{x} - y)^2
```
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel
# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.replace(',', ' ').split(' ')]
    return LabeledPoint(values[0], values[1:])
data = sc.textFile("data/mllib/ridge-data/lpsa.data")
parsedData = data.map(parsePoint)
# Build the model
model = LinearRegressionWithSGD.train(parsedData, iterations=100, step=0.00000001)
# Evaluate the model on training data
valuesAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
MSE = valuesAndPreds \
    .map(lambda vp: (vp[0] - vp[1])**2) \
    .reduce(lambda x, y: x + y) / valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))
# Save and load model
model.save(sc, "target/tmp/pythonLinearRegressionWithSGDModel")
sameModel = LinearRegressionModel.load(sc, "target/tmp/pythonLinearRegressionWithSGDModel")
##### Logistic Regression
Loss function (logistic loss):
```math
L(\mathbf{w}; \mathbf{x}, y) := \log(1 + \exp(-y \, \mathbf{w}^T \mathbf{x}))
```
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint
# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])
data = sc.textFile("data/mllib/sample_svm_data.txt")
parsedData = data.map(parsePoint)
# Build the model
model = LogisticRegressionWithLBFGS.train(parsedData)
# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))
# Save and load model
model.save(sc, "target/tmp/pythonLogisticRegressionWithLBFGSModel")
sameModel = LogisticRegressionModel.load(sc,
"target/tmp/pythonLogisticRegressionWithLBFGSModel")