from pyspark.sql import SparkSession | SparkSession 가져오기 |
spark = SparkSession.builder.appName("App").getOrCreate() | 세션 생성 |
.master("local[*]") | 로컬 모드 (모든 코어) |
.config("spark.executor.memory", "4g") | 실행기 메모리 설정 |
.config("spark.driver.memory", "2g") | 드라이버 메모리 설정 |
spark.stop() | 세션 중지 |
sc = spark.sparkContext | SparkContext 얻기 |
df = spark.read.csv("file.csv", header=True, inferSchema=True) | CSV 읽기 |
df = spark.read.json("file.json") | JSON 읽기 |
df = spark.read.parquet("file.parquet") | Parquet 읽기 |
df = spark.createDataFrame(data, schema) | Python 데이터에서 |
df = spark.createDataFrame(pandas_df) | Pandas에서 |
df.write.csv("output.csv") | CSV 쓰기 |
df.write.parquet("output.parquet") | Parquet 쓰기 |
df.show() | 처음 20행 표시 |
df.show(n, truncate=False) | n행 표시 (잘림 없음) |
df.printSchema() | 스키마 출력 |
df.columns | 열 이름 |
df.dtypes | 열 타입 |
df.count() | 행 수 |
df.describe().show() | 통계 |
df.select("col1", "col2") | 열 선택 |
df.select(df.col1, df.col2) | 선택 (Column 객체) |
from pyspark.sql.functions import col | col 함수 가져오기 |
df.select(col("col1")) | col()로 선택 |
df.selectExpr("col1", "col2 * 2 as doubled") | 표현식으로 선택 |
df.drop("col1") | 열 삭제 |
df.filter(df.col > 10) | 행 필터링 |
df.filter("col > 10") | 필터 (문자열 표현식) |
df.where(df.col == "value") | where (별칭) |
df.filter((df.col1 > 10) & (df.col2 < 20)) | 다중 조건 |
df.filter(df.col.isin([1, 2, 3])) | isin으로 필터 |
df.filter(df.col.isNull()) | null 필터 |
df.filter(df.col.isNotNull()) | 널 아닌 필터 |
df.filter(df.col.like("%pattern%")) | like로 필터 |
df.withColumn("new", df.col * 2) | 열 추가/교체 |
df.withColumnRenamed("old", "new") | 열 이름 변경 |
df.withColumn("col", df.col.cast("integer")) | 타입 캐스트 |
df.withColumn("col", F.when(condition, value).otherwise(other)) | 조건부 열 |
df.withColumn("col", F.lit("constant")) | 상수 값 |
df.fillna(0) | null 값 채우기 |
df.na.drop() | null 행 삭제 |
df.distinct() | 중복 제거 |
df.dropDuplicates(["col1"]) | 열 기준 중복 삭제 |
df.orderBy("col") | 오름차순 정렬 |
df.orderBy(df.col.desc()) | 내림차순 정렬 |
df.limit(10) | 행 제한 |
df.sample(fraction=0.1) | 랜덤 샘플 |
from pyspark.sql import functions as F | 함수 가져오기 |
df.agg(F.sum("col")) | 합계 |
df.agg(F.avg("col")) | 평균 |
df.agg(F.count("col")) | 개수 |
df.agg(F.min("col"), F.max("col")) | 최소/최대 |
df.agg(F.countDistinct("col")) | 고유 개수 |
df.groupBy("col").count() | 그룹별 개수 |
df.groupBy("col").sum("value") | 그룹별 합계 |
df.groupBy("col").agg(F.avg("val"), F.max("val")) | 다중 집계 |
df.groupBy("col1", "col2").count() | 다중 기준 그룹화 |
df.groupBy("col").pivot("pivot_col").sum("val") | 피벗 테이블 |
df1.join(df2, "key") | 키로 내부 조인 |
df1.join(df2, df1.k1 == df2.k2) | 표현식으로 조인 |
df1.join(df2, "key", "left") | 왼쪽 조인 |
df1.join(df2, "key", "right") | 오른쪽 조인 |
df1.join(df2, "key", "outer") | 완전 외부 조인 |
df1.join(df2, "key", "left_anti") | 왼쪽 안티 조인 |
df1.crossJoin(df2) | 크로스 조인 |
df1.union(df2) | 유니온 (중복 유지) |
df1.unionByName(df2) | 열 이름으로 유니온 |
df1.intersect(df2) | 교집합 |
df1.subtract(df2) | 차집합 |
df.createOrReplaceTempView("table") | 임시 뷰 생성 |
spark.sql("SELECT * FROM table") | SQL 쿼리 실행 |
spark.sql("SELECT col, COUNT(*) FROM table GROUP BY col") | SQL 집계 |
df.createOrReplaceGlobalTempView("gtable") | 전역 임시 뷰 |
spark.sql("SELECT * FROM global_temp.gtable") | 전역 뷰 쿼리 |
F.upper(col) / F.lower(col) | 대문자/소문자 |
F.trim(col) / F.ltrim(col) / F.rtrim(col) | 공백 제거 |
F.length(col) | 문자열 길이 |
F.substring(col, start, len) | 부분 문자열 |
F.concat(col1, col2) | 연결 |
F.split(col, pattern) | 문자열 분할 |
F.regexp_replace(col, pattern, replacement) | 정규식 교체 |
F.current_date() | 현재 날짜 |
F.current_timestamp() | 현재 타임스탬프 |
F.year(col) / F.month(col) / F.day(col) | 날짜 부분 추출 |
F.datediff(end, start) | 날짜 차이 |
F.date_add(col, days) | 일 추가 |
F.to_date(col, format) | 문자열을 날짜로 |