Demo - Running a Spark JAR from the Shell
Last updated: 2024-03-11 02:49:55
Applicable module
Client
Description
Spark supports writing code in Java, Scala, or Python and submitting it to the cluster for execution.
Usage examples
###### Simple Java example
```java
package pers.semon.demo;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

/**
 * @Description: simple Spark Java example
 * @Author: Semon
 * @Version: v1.0
 * @Date: 2021/9/14 21:00
 */
public class SparkJavaDemo {
    public static void main(String[] args) throws IOException {
        // demo.sql is a properties file shipped with --files and read
        // from the container's working directory.
        Properties prop = new Properties();
        String filename = System.getProperty("user.dir") + "/demo.sql";
        prop.load(new FileInputStream(filename));

        SparkConf sc = new SparkConf();
        SparkSession ss = SparkSession.builder()
                .appName("javaDemo")
                .config(sc)
                .enableHiveSupport()
                .getOrCreate();

        // Run the query stored under the "sql" key and print up to 10 rows.
        // Dataset#showString is package-private in Spark, so use show() instead.
        String sql = prop.getProperty("sql");
        ss.sql(sql).show(10, false);
        ss.close();
    }
}
```
The code must be packaged as a JAR and submitted with spark-submit; assume the JAR is named `spark-java-demo.jar`.
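Both the Java and Scala examples read the query from a `demo.sql` properties file in the working directory (shipped to the cluster with `--files demo.sql`, see the submission section below). A minimal sketch of that file; the query itself is an assumption, modeled on the `poc.demo` table used in the Python example:

```properties
# demo.sql -- loaded via java.util.Properties; the code reads the "sql" key
sql=select count(*) from poc.demo
```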
###### Simple Scala example
```scala
package pers.semon.demo

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import java.io.FileInputStream
import java.util.Properties

/**
 * @Description: simple Spark Scala example
 * @Author: Semon
 * @Version: v1.0
 * @Date: 2021/9/14 20:27
 */
object SparkScalaDemo {
  def main(args: Array[String]): Unit = {
    // demo.sql is a properties file shipped with --files and read
    // from the container's working directory.
    val prop = new Properties()
    prop.load(new FileInputStream(System.getProperty("user.dir") + "/demo.sql"))

    val sc = new SparkConf()
    val ss = SparkSession.builder()
      .appName("scalaDemo")
      .config(sc)
      .enableHiveSupport()
      .getOrCreate()

    // Run the query stored under the "sql" key and print the result.
    val sql = prop.getProperty("sql")
    val df = ss.sql(sql)
    df.show()
    ss.stop()
  }
}
```
The code must be packaged as a JAR and submitted with spark-submit; assume the JAR is named `spark-scala-demo.jar`.
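A minimal packaging sketch, assuming a standard Maven project; sbt users would run `sbt package` instead, and the artifact name and path depend on your build configuration:

```bash
# Build the JAR; Maven writes it under target/ by default
mvn clean package
```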
###### Simple Python example
```python
# demo.py
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Job settings for this demo environment (queue root.poc, table poc.demo).
conf = {
    "appname": "demo",
    "driver_memory": "4g",
    "executor_memory": "4g",
    "executor_cores": 2,
    "executor_num": 2,
    "master": "yarn",
    "deploy_mode": "client",
}

sc = SparkConf()
sc.setMaster(conf["master"]) \
    .setAppName(conf["appname"]) \
    .set("spark.driver.memory", conf["driver_memory"]) \
    .set("spark.executor.memory", conf["executor_memory"]) \
    .set("spark.executor.cores", str(conf["executor_cores"])) \
    .set("spark.executor.instances", str(conf["executor_num"])) \
    .set("spark.submit.deployMode", conf["deploy_mode"]) \
    .set("spark.yarn.queue", "root.poc") \
    .set("spark.executor.memoryOverhead", "2g") \
    .set("spark.driver.memoryOverhead", "2g")

spark = SparkSession.builder.config(conf=sc).enableHiveSupport().getOrCreate()
spark.sql("select count(*) from poc.demo").show()
spark.stop()
```
###### Job submission
Submit the JAR to the YARN cluster (for the Scala build, set `--class` to `pers.semon.demo.SparkScalaDemo` and submit `spark-scala-demo.jar` instead). `--files demo.sql` ships the properties file into each container's working directory, where the code reads it:

```bash
$SPARK_HOME/bin/spark-submit --class pers.semon.demo.SparkJavaDemo \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 2g \
  --executor-memory 4g \
  --executor-cores 1 \
  --num-executors 10 \
  --files demo.sql \
  ./spark-java-demo.jar
```
Submit the Python script to the YARN cluster. `--class` applies only to JVM jobs; the Python entry script is passed as the final argument, and `--py-files` is reserved for additional .py/.zip/.egg dependencies:

```bash
$SPARK_HOME/bin/spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 2g \
  --executor-memory 4g \
  --executor-cores 1 \
  --num-executors 10 \
  demo.py
```
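With `--deploy-mode cluster` the driver runs inside a YARN container, so `show()` output goes to the YARN logs rather than the local terminal. A minimal sketch for checking on a submitted job; the application ID below is a placeholder:

```bash
# List applications on the cluster, then pull the logs for one of them
# (the ID is a placeholder; use the one printed by spark-submit).
yarn application -list
yarn logs -applicationId application_1600000000000_0001
```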
Author: wangsong