from pyspark.sql import SparkSession | Import SparkSession |
spark = SparkSession.builder.appName("App") | Create session (builder pattern) |
    .master("local[*]") | Local mode (all cores) |
    .config("spark.executor.memory", "4g") | Set executor memory |
    .config("spark.driver.memory", "2g") | Set driver memory |
    .getOrCreate() | Finalize builder and get session (must come last) |
spark.stop() | Stop session (call only when all work is finished) |
sc = spark.sparkContext | Get SparkContext |
df = spark.read.csv("file.csv", header=True, inferSchema=True) | Read CSV |
df = spark.read.json("file.json") | Read JSON |
df = spark.read.parquet("file.parquet") | Read Parquet |
df = spark.createDataFrame(data, schema) | From Python data |
df = spark.createDataFrame(pandas_df) | From Pandas |
df.write.csv("output.csv") | Write CSV (creates a directory of part files) |
df.write.parquet("output.parquet") | Write Parquet |
df.show() | Show first 20 rows |
df.show(n, truncate=False) | Show n rows (no truncate) |
df.printSchema() | Print schema |
df.columns | Column names |
df.dtypes | Column types |
df.count() | Row count |
df.describe().show() | Statistics |
df.select("col1", "col2") | Select columns |
df.select(df.col1, df.col2) | Select (Column objects) |
from pyspark.sql.functions import col | Import col function |
df.select(col("col1")) | Select with col() |
df.selectExpr("col1", "col2 * 2 as doubled") | Select with expressions |
df.drop("col1") | Drop column |
df.filter(df.col > 10) | Filter rows |
df.filter("col > 10") | Filter (string expr) |
df.where(df.col == "value") | where() — alias of filter() |
df.filter((df.col1 > 10) & (df.col2 < 20)) | Multiple conditions |
df.filter(df.col.isin([1, 2, 3])) | Filter with isin |
df.filter(df.col.isNull()) | Filter null |
df.filter(df.col.isNotNull()) | Filter not null |
df.filter(df.col.like("%pattern%")) | Filter with like |
df.withColumn("new", df.col * 2) | Add/replace column |
df.withColumnRenamed("old", "new") | Rename column |
df.withColumn("col", df.col.cast("integer")) | Cast type |
df.withColumn("col", F.when(condition, value).otherwise(other)) | Conditional column |
df.withColumn("col", F.lit("constant")) | Constant value |
df.fillna(0) | Fill null values |
df.na.drop() | Drop null rows |
df.distinct() | Remove duplicates |
df.dropDuplicates(["col1"]) | Drop duplicates by column |
df.orderBy("col") | Sort ascending |
df.orderBy(df.col.desc()) | Sort descending |
df.limit(10) | Limit rows |
df.sample(fraction=0.1) | Random sample |
from pyspark.sql import functions as F | Import functions |
df.agg(F.sum("col")) | Sum |
df.agg(F.avg("col")) | Average |
df.agg(F.count("col")) | Count |
df.agg(F.min("col"), F.max("col")) | Min/Max |
df.agg(F.countDistinct("col")) | Count distinct |
df.groupBy("col").count() | Count per group |
df.groupBy("col").sum("value") | Sum per group |
df.groupBy("col").agg(F.avg("val"), F.max("val")) | Multiple aggregations |
df.groupBy("col1", "col2").count() | Group by multiple |
df.groupBy("col").pivot("pivot_col").sum("val") | Pivot table |
df1.join(df2, "key") | Inner join on key |
df1.join(df2, df1.k1 == df2.k2) | Join on expression |
df1.join(df2, "key", "left") | Left join |
df1.join(df2, "key", "right") | Right join |
df1.join(df2, "key", "outer") | Full outer join |
df1.join(df2, "key", "left_anti") | Left anti join |
df1.crossJoin(df2) | Cross join |
df1.union(df2) | Union (keep duplicates) |
df1.unionByName(df2) | Union by column names |
df1.intersect(df2) | Intersection |
df1.subtract(df2) | Subtract |
df.createOrReplaceTempView("table") | Create temp view |
spark.sql("SELECT * FROM table") | Run SQL query |
spark.sql("SELECT col, COUNT(*) FROM table GROUP BY col") | SQL aggregation |
df.createOrReplaceGlobalTempView("gtable") | Global temp view |
spark.sql("SELECT * FROM global_temp.gtable") | Query global view |
F.upper(col) / F.lower(col) | Upper/lower case |
F.trim(col) / F.ltrim(col) / F.rtrim(col) | Trim whitespace |
F.length(col) | String length |
F.substring(col, start, len) | Substring |
F.concat(col1, col2) | Concatenate |
F.split(col, pattern) | Split string |
F.regexp_replace(col, pattern, replacement) | Regex replace |
F.current_date() | Current date |
F.current_timestamp() | Current timestamp |
F.year(col) / F.month(col) / F.dayofmonth(col) | Extract date parts (F.day only in Spark 3.5+) |
F.datediff(end, start) | Date difference |
F.date_add(col, days) | Add days |
F.to_date(col, format) | String to date |