🚀 Java 병렬 스트림(Parallel Stream) 사용 시 예상되는 문제와 해결 방법
Java에서는 Stream.parallel()을 사용하면 멀티스레드를 활용하여 데이터 처리를 병렬로 수행할 수 있습니다.
하지만 병렬 스트림을 사용할 때 몇 가지 문제가 발생할 수 있으며, 적절한 해결 방법을 적용해야 합니다.
✅ 1. 공유된 상태(Shared State)로 인한 동기화 문제
📌 문제점
병렬 스트림은 여러 스레드에서 동시에 실행되므로, 공유 변수(Shared State)를 변경할 경우 데이터 충돌 및 경쟁 조건(Race Condition)이 발생할 수 있음.
❌ 잘못된 예제
import java.util.stream.IntStream;
public class ParallelStreamSharedState {
private static int sum = 0;
public static void main(String[] args) {
IntStream.rangeClosed(1, 1000)
.parallel()
.forEach(i -> sum += i); // 공유 변수 sum을 여러 스레드가 동시에 변경
System.out.println("Sum: " + sum); // 예상 결과(500500)와 다를 수 있음 (경쟁 상태 발생)
}
}
🚨 문제 발생:
- sum += i 연산은 원자적(Atomic)이지 않음 → 여러 스레드가 동시에 sum을 수정하면서 값이 틀어질 가능성이 있음.
✅ 해결 방법
1. reduce() 사용 (불변 데이터 활용)
int sum = IntStream.rangeClosed(1, 1000)
.parallel()
.reduce(0, Integer::sum); // 안전한 방식
System.out.println("Sum: " + sum); // 항상 500500
📌 reduce()는 공유 변수를 사용하지 않고 안전한 병렬 연산을 수행함.
2. AtomicInteger 사용 (동기화 활용)
import java.util.concurrent.atomic.AtomicInteger;
AtomicInteger sum = new AtomicInteger(0);
IntStream.rangeClosed(1, 1000)
.parallel()
.forEach(i -> sum.addAndGet(i)); // 원자적 연산
System.out.println("Sum: " + sum.get()); // 항상 500500
📌 AtomicInteger는 동기화 없이도 원자적 연산을 보장함.
✅ 2. 순서 보장이 되지 않는 문제
📌 문제점
병렬 스트림은 여러 스레드에서 동시에 실행되므로 출력 순서가 보장되지 않음.
❌ 잘못된 예제
IntStream.rangeClosed(1, 10)
.parallel()
.forEach(System.out::println); // 출력 순서가 뒤섞일 수 있음
🚨 예상 문제:
- 실행할 때마다 출력 순서가 달라짐.
✅ 해결 방법
- 정렬된 순서 유지가 필요하면 forEachOrdered() 사용:
IntStream.rangeClosed(1, 10)
.parallel()
.forEachOrdered(System.out::println); // 항상 1~10 순서대로 출력됨
📌 forEachOrdered()를 사용하면 병렬 스트림에서도 순서가 유지됨.
✅ 3. 병렬 스트림이 더 느려질 수 있는 문제
📌 문제점
- 작은 데이터셋이나 비싼 컨텍스트 스위칭(Context Switching) 이 발생하는 경우, 병렬 스트림이 오히려 성능이 나빠질 수 있음.
❌ 잘못된 예제 (작은 데이터셋에서 병렬 스트림 사용)
List<String> smallList = List.of("A", "B", "C");
smallList.parallelStream()
.map(String::toLowerCase)
.forEach(System.out::println); // 병렬 실행할 필요 없음
🚨 문제 발생:
- 작은 데이터셋에서는 병렬 처리로 인한 오버헤드(스레드 생성 및 스케줄링 비용)가 더 커질 수 있음.
✅ 해결 방법
1. 데이터 크기 확인 후 병렬 스트림 사용
List<String> list = List.of("A", "B", "C");
Stream<String> stream = list.size() > 1000 ? list.parallelStream() : list.stream();
stream.map(String::toLowerCase)
.forEach(System.out::println);
2. CPU 개수 고려 후 병렬 여부 결정
if (list.size() > Runtime.getRuntime().availableProcessors() * 10) {
list.parallelStream().forEach(System.out::println);
} else {
list.stream().forEach(System.out::println);
}
📌 작은 데이터셋에서는 일반 스트림을 사용하고, 충분한 크기일 때만 병렬 스트림을 사용.
✅ 4. 병렬 스트림에서 I/O 작업 시 성능 저하 문제
📌 문제점
- 병렬 스트림은 CPU 연산에 최적화되어 있음.
- 파일 I/O, DB 쿼리, 네트워크 요청 등 입출력(IO) 작업이 포함된 경우, 병렬 처리가 오히려 성능을 저하시킬 수 있음.
❌ 잘못된 예제 (병렬 스트림에서 파일 I/O 실행)
List<String> lines = Files.readAllLines(Path.of("data.txt"));
lines.parallelStream()
.forEach(line -> {
try {
Files.writeString(Path.of("output.txt"), line, StandardOpenOption.APPEND);
} catch (IOException e) {
e.printStackTrace();
}
});
🚨 문제 발생:
- 여러 스레드가 동시에 파일에 쓰기를 시도하면서 경쟁 상태(Race Condition) 발생.
- 성능 저하 + 데이터 손상 가능성.
✅ 해결 방법
1. 병렬 스트림 대신 일반 스트림 사용
lines.stream()
.forEach(line -> {
try {
Files.writeString(Path.of("output.txt"), line, StandardOpenOption.APPEND);
} catch (IOException e) {
e.printStackTrace();
}
});
2. 파일 쓰기 작업을 병렬이 아닌 ExecutorService로 처리
ExecutorService executor = Executors.newFixedThreadPool(4);
lines.forEach(line -> executor.submit(() -> {
try {
Files.writeString(Path.of("output.txt"), line, StandardOpenOption.APPEND);
} catch (IOException e) {
e.printStackTrace();
}
}));
executor.shutdown();
📌 병렬 스트림이 아닌, ExecutorService를 사용하여 안전한 병렬 I/O 처리를 수행.
🎯 정리: 병렬 스트림의 문제와 해결 방법
문제 | 원인 | 해결 방법 |
공유 상태 문제 | 여러 스레드가 같은 변수를 수정 | reduce(), AtomicInteger 사용 |
순서 보장 안됨 | 병렬 스트림은 순서를 유지하지 않음 | forEachOrdered() 사용 |
작은 데이터셋에서 오버헤드 발생 | 병렬 스트림 실행 비용이 큼 | 데이터 크기 확인 후 parallelStream() 사용 |
I/O 작업에서 성능 저하 | 병렬 처리는 CPU 연산에 적합함 | 일반 스트림 사용 또는 ExecutorService 활용 |
📌 병렬 스트림은 적절하게 사용하면 성능을 높일 수 있지만, 무조건 사용하면 오히려 성능이 저하될 수 있음.
➡ 데이터 크기, 연산 유형, 공유 상태 여부를 고려하여 신중하게 사용해야 함!
'서버&백엔드 > 🔥 JAVA' 카테고리의 다른 글
JAVA | 뮤텍스(Mutex) vs 세마포어(Semaphore) (0) | 2025.02.03 |
---|---|
Java | 힙(Heap)과 스택(Stack) 메모리 (0) | 2025.02.03 |
JAVA | var 이란? (0) | 2024.12.22 |
JAVA | TDD API 테스트 & 리팩토링 (2) | 2024.12.20 |
Java | TDD시 사용되는 Assert (0) | 2024.12.19 |