#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import re
import shutil
import subprocess
import tempfile
import unittest
import zipfile


class SparkSubmitTests(unittest.TestCase):

    def setUp(self):
        self.programDir = tempfile.mkdtemp()
        tmp_dir = tempfile.gettempdir()
        self.sparkSubmit = [
            os.path.join(os.environ.get("SPARK_HOME"), "bin", "spark-submit"),
            "--conf", "spark.driver.extraJavaOptions=-Djava.io.tmpdir={0}".format(tmp_dir),
            "--conf", "spark.executor.extraJavaOptions=-Djava.io.tmpdir={0}".format(tmp_dir),
        ]

    def tearDown(self):
        shutil.rmtree(self.programDir)
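    # These tests shell out to $SPARK_HOME/bin/spark-submit, so SPARK_HOME must
    # point at a Spark distribution before the suite runs; with it unset,
    # os.path.join(None, ...) in setUp raises a TypeError.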
""" Create a temp file with the given name and content and return its path. Strips leading spaces from content up to the first '|' in each line. """ else:
""" Create a zip archive containing a file with the given content and return its path. Strips leading spaces from content up to the first '|' in each line. """ else:
    def create_spark_package(self, artifact_name):
        group_id, artifact_id, version = artifact_name.split(":")
        self.createTempFile(
            "%s-%s.pom" % (artifact_id, version),
            ("""
             |<?xml version="1.0" encoding="UTF-8"?>
             |<project xmlns="http://maven.apache.org/POM/4.0.0"
             |        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             |        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
             |        http://maven.apache.org/xsd/maven-4.0.0.xsd">
             |  <modelVersion>4.0.0</modelVersion>
             |  <groupId>%s</groupId>
             |  <artifactId>%s</artifactId>
             |  <version>%s</version>
             |</project>
             """ % (group_id, artifact_id, version)).lstrip(),
            os.path.join(group_id, artifact_id, version))
        self.createFileInZip(
            "%s.py" % artifact_id, """
            |def myfunc(x):
            |    return x + 1
            """, ".jar",
            os.path.join(group_id, artifact_id, version),
            "%s-%s" % (artifact_id, version))
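    # The layout written by create_spark_package mirrors a local Maven-style
    # repository: <groupId>/<artifactId>/<version>/ containing a .pom and a
    # .jar whose single entry is a Python module. The package tests below
    # point --repositories at it with a "file:" URL so that --packages can
    # resolve the artifact without network access.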
"""Submit and test a single script file""" |from pyspark import SparkContext | |sc = SparkContext() |print(sc.parallelize([1, 2, 3]).map(lambda x: x * 2).collect()) """)
"""Submit and test a single script file calling a global function""" |from pyspark import SparkContext | |def foo(x): | return x * 3 | |sc = SparkContext() |print(sc.parallelize([1, 2, 3]).map(foo).collect()) """)
"""Submit and test a script with a dependency on another module""" |from pyspark import SparkContext |from mylib import myfunc | |sc = SparkContext() |print(sc.parallelize([1, 2, 3]).map(myfunc).collect()) """) |def myfunc(x): | return x + 1 """) stdout=subprocess.PIPE)
"""Submit and test a script with a dependency on another module on a cluster""" |from pyspark import SparkContext |from mylib import myfunc | |sc = SparkContext() |print(sc.parallelize([1, 2, 3]).map(myfunc).collect()) """) |def myfunc(x): | return x + 1 """) "local-cluster[1,1,1024]", script], stdout=subprocess.PIPE)
"""Submit and test a script with a dependency on a Spark Package""" |from pyspark import SparkContext |from mylib import myfunc | |sc = SparkContext() |print(sc.parallelize([1, 2, 3]).map(myfunc).collect()) """) self.sparkSubmit + ["--packages", "a:mylib:0.1", "--repositories", "file:" + self.programDir, script], stdout=subprocess.PIPE)
"""Submit and test a script with a dependency on a Spark Package on a cluster""" |from pyspark import SparkContext |from mylib import myfunc | |sc = SparkContext() |print(sc.parallelize([1, 2, 3]).map(myfunc).collect()) """) self.sparkSubmit + ["--packages", "a:mylib:0.1", "--repositories", "file:" + self.programDir, "--master", "local-cluster[1,1,1024]", script], stdout=subprocess.PIPE)
"""Submit and test a single script on a cluster""" |from pyspark import SparkContext | |def foo(x): | return x * 2 | |sc = SparkContext() |print(sc.parallelize([1, 2, 3]).map(foo).collect()) """) # this will fail if you have different spark.executor.memory # in conf/spark-defaults.conf self.sparkSubmit + ["--master", "local-cluster[1,1,1024]", script], stdout=subprocess.PIPE)
"""Make sure user configuration is respected (SPARK-19307)""" |from pyspark import SparkConf, SparkContext | |conf = SparkConf().set("spark.test_config", "1") |sc = SparkContext(conf = conf) |try: | if sc._conf.get("spark.test_config") != "1": | raise RuntimeError("Cannot find spark.test_config in SparkContext's conf.") |finally: | sc.stop() """) self.sparkSubmit + ["--master", "local", script], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)