<aside> 💡
Resilient Distributed Dataset (RDD) = Spark's fundamental data abstraction.
</aside>
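Before the log example below, a minimal sketch (values and names here are illustrative, not from the original) of the other common way to build an RDD, directly from a Python collection with parallelize; transformations stay lazy until an action such as collect() runs:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("rdd intro sketch").getOrCreate()
sc = spark.sparkContext

# Build an RDD from an in-memory list (hypothetical sample values)
nums = sc.parallelize([1, 2, 3, 4, 5])

# map() is a lazy transformation; collect() is the action that triggers execution
squares = nums.map(lambda x: x * x).collect()
print(squares)  # [1, 4, 9, 16, 25]

spark.stop()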
data/log.txt
192.217.41.39 | [25/Feb/2023:05:16:25] | "PATCH /lists HTTP/1.1" | 300
199.211.153.193 | [25/Feb/2023:05:16:25] | "PATCH /events HTTP/1.1" | 404
44.123.5.41 | [25/Feb/2023:05:16:25] | "PATCH /playbooks HTTP/1.1" | 301
187.116.192.22 | [25/Feb/2023:05:16:25] | "PUT /customers HTTP/1.1" | 503
95.233.74.58 | [25/Feb/2023:05:16:26] | "POST /events HTTP/1.1" | 503
163.182.179.76 | [25/Feb/2023:05:16:26] | "PATCH /customers HTTP/1.1" | 500
136.214.175.83 | [25/Feb/2023:05:16:27] | "POST /alerts HTTP/1.1" | 200
148.87.176.94 | [25/Feb/2023:05:16:27] | "PATCH /fieldsets HTTP/1.1" | 500
10.93.154.10 | [25/Feb/2023:05:16:27] | "GET /alerts HTTP/1.1" | 204
38.32.236.30 | [25/Feb/2023:05:16:27] | "PUT /lists HTTP/1.1" | 204
220.154.119.151 | [25/Feb/2023:05:16:28] | "PATCH /collectors HTTP/1.1" | 400
...
log_rdd_ex.py
import os, sys

# Core PySpark imports
from pyspark import SparkContext, RDD
from pyspark.sql import SparkSession

# sys.executable: full path of the Python interpreter running this script
# e.g. C:\workspace\spark-main\.venv\Scripts\python.exe
#
# PYSPARK_PYTHON: Python executable used by Spark worker (task execution) processes
# PYSPARK_DRIVER_PYTHON: Python executable used by the Spark driver (job coordination) process
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

# Main entry point
if __name__ == "__main__":
    # Create a SparkSession
    # - run in local mode
    # - set the application name
    ss: SparkSession = SparkSession.builder \
        .master("local") \
        .appName("rdd examples ver") \
        .getOrCreate()

    # SparkContext is the lower-level engine responsible for RDD operations
    sc: SparkContext = ss.sparkContext

    # -------------------------------------------------------
    # [1] Load the data
    # -------------------------------------------------------
    # Read the text file as an RDD
    # -> each line becomes one element of the RDD
    log_rdd: RDD[str] = sc.textFile("data/log.txt")

    # -------------------------------------------------------
    # [2] Check the count
    # -------------------------------------------------------
    # Count the number of log lines -> count() is an action
    print(f"count of RDD ==> {log_rdd.count()}")

    # -------------------------------------------------------
    # [3] Print each row
    # -------------------------------------------------------
    # Call print once per line (also an action); in local mode the output
    # appears in this console, on a cluster it goes to the executor logs
    log_rdd.foreach(lambda v: print(v))
C:\workspace\spark-main\.venv\Scripts\python.exe C:\workspace\spark-main\part02\ch02_batch\log_rdd_ex2.py
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/06 17:03:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
count of RDD ==> 2060
192.217.41.39 | [25/Feb/2023:05:16:25] | "PATCH /lists HTTP/1.1" | 300
199.211.153.193 | [25/Feb/2023:05:16:25] | "PATCH /events HTTP/1.1" | 404
44.123.5.41 | [25/Feb/2023:05:16:25] | "PATCH /playbooks HTTP/1.1" | 301
187.116.192.22 | [25/Feb/2023:05:16:25] | "PUT /customers HTTP/1.1" | 503
95.233.74.58 | [25/Feb/2023:05:16:26] | "POST /events HTTP/1.1" | 503
163.182.179.76 | [25/Feb/2023:05:16:26] | "PATCH /customers HTTP/1.1" | 500
...
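As a possible next step (not part of the original script), a hedged sketch of how each pipe-separated log line could be split into fields and filtered with lazy transformations before an action runs; the helper name parse_line and the 5xx-status filter are illustrative assumptions:

# Illustrative follow-up: parse each pipe-separated line, then filter server errors
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("rdd parse sketch").getOrCreate()
sc = spark.sparkContext

def parse_line(line: str):
    # Split: ip | [timestamp] | "request" | status  ->  4 trimmed fields
    ip, timestamp, request, status = [part.strip() for part in line.split("|")]
    return ip, timestamp, request, int(status)

parsed_rdd = sc.textFile("data/log.txt").map(parse_line)       # transformation (lazy)
server_errors = parsed_rdd.filter(lambda row: row[3] >= 500)   # transformation (lazy)
print(f"5xx responses ==> {server_errors.count()}")            # action (runs the job)

spark.stop()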