Spark RDD란

<aside> 💡

Resilient Distributed Dataset = 스파크의 기본 추상화 객체.

</aside>

주요 구성 요소

1. 의존성 정보

2. 파티션 (지역성 정보 포함)

3. 연산 함수 : Partition ⇒ Iterator[T]

RDD 실습 - 로그 집계 파이프라인 만들기 (map, filter, reduce, group by)

1. log.txt 파일 가지고 count 및 라인 별 호출 (action)

data/log.txt

192.217.41.39 | [25/Feb/2023:05:16:25] | "PATCH /lists HTTP/1.1" | 300
199.211.153.193 | [25/Feb/2023:05:16:25] | "PATCH /events HTTP/1.1" | 404
44.123.5.41 | [25/Feb/2023:05:16:25] | "PATCH /playbooks HTTP/1.1" | 301
187.116.192.22 | [25/Feb/2023:05:16:25] | "PUT /customers HTTP/1.1" | 503
95.233.74.58 | [25/Feb/2023:05:16:26] | "POST /events HTTP/1.1" | 503
163.182.179.76 | [25/Feb/2023:05:16:26] | "PATCH /customers HTTP/1.1" | 500
136.214.175.83 | [25/Feb/2023:05:16:27] | "POST /alerts HTTP/1.1" | 200
148.87.176.94 | [25/Feb/2023:05:16:27] | "PATCH /fieldsets HTTP/1.1" | 500
10.93.154.10 | [25/Feb/2023:05:16:27] | "GET /alerts HTTP/1.1" | 204
38.32.236.30 | [25/Feb/2023:05:16:27] | "PUT /lists HTTP/1.1" | 204
220.154.119.151 | [25/Feb/2023:05:16:28] | "PATCH /collectors HTTP/1.1" | 400
...

log_rdd_ex.py

import os, sys
# PySpark의 핵심 모듈 임포트
from pyspark import SparkContext, RDD
from pyspark.sql import SparkSession

# sys.executable: 현재 코드를 실행 중인 Python 인터프리터의 전체 경로
#   예) C:\\workspace\\spark-main\\.venv\\Scripts\\python.exe
#
# PYSPARK_PYTHON: Spark Worker(작업 실행) 프로세스가 사용할 Python 경로
# PYSPARK_DRIVER_PYTHON: Spark Driver(작업 조율) 프로세스가 사용할 Python 경로
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

# 메인 실행 블록
if __name__=="__main__":

    # SparkSession 생성
    # 로컬 모드로 실행
    # 애플리케이션 이름 설정
    ss: SparkSession = SparkSession.builder\\
        .master("local")\\
        .appName("rdd examples ver")\\
        .getOrCreate()

    # SparkContext는 RDD 연산을 담당하는 하위 레벨 엔진
    sc: SparkContext = ss.sparkContext

    # -------------------------------------------------------
    # [1] 데이터 로드
    # -------------------------------------------------------
    # 텍스트 파일을 읽어 RDD 형태로 로드
    # -> 각 줄(line)이 RDD의 하나의 요소(element)가 됨
    log_rdd: RDD[str] = sc.textFile("data/log.txt")

    # -------------------------------------------------------
    # [2] check count
    # -------------------------------------------------------
    # 텍스트 파일을 읽어 로그를 count 집계 (라인) //action
    print(f"count of RDD ==> {log_rdd.count()}")

    # -------------------------------------------------------
    # [3] pirnt each row
    # -------------------------------------------------------
    # 라인 별 호출
    log_rdd.foreach(lambda v:print(v))
C:\\workspace\\spark-main\\.venv\\Scripts\\python.exe C:\\workspace\\spark-main\\part02\\ch02_batch\\log_rdd_ex2.py 
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/06 17:03:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
count of RDD ==> 2060
192.217.41.39 | [25/Feb/2023:05:16:25] | "PATCH /lists HTTP/1.1" | 300
199.211.153.193 | [25/Feb/2023:05:16:25] | "PATCH /events HTTP/1.1" | 404
44.123.5.41 | [25/Feb/2023:05:16:25] | "PATCH /playbooks HTTP/1.1" | 301
187.116.192.22 | [25/Feb/2023:05:16:25] | "PUT /customers HTTP/1.1" | 503
95.233.74.58 | [25/Feb/2023:05:16:26] | "POST /events HTTP/1.1" | 503
163.182.179.76 | [25/Feb/2023:05:16:26] | "PATCH /customers HTTP/1.1" | 500
...

2. log.txt 파일 가지고 map 함수 사용 (Transformation)

log_rdd_ex.py