[Spring Batch 마스터 클래스] Chapter 4: 병렬 처리와 파티셔닝으로 성능 극대화하기
지난 Chapter 3에서 Chunk 방식으로 100만 건 데이터를 안전하게 처리하는 방법을 배웠습니다. 하지만 실무에서는 더 큰 도전이 기다리고 있어요! “1000만 건을 2시간 내에 처리해야 해요!” 같은 요구사항 말이죠. 😱
이번 Chapter에서는 Spring Batch의 병렬 처리와 파티셔닝 기능으로 처리 속도를 10배 이상 향상시키는 방법을 알아보겠습니다! 🚀
🎯 들어가며 - 극한의 실무 시나리오
우리 주인공 B씨에게 또 다른 미션이 떨어졌습니다.
“B씨, 이번엔 정말 큰 프로젝트예요! 전국 모든 매장의 1년치 주문 데이터 1000만 건을 분석해서 매출 리포트를 만들어야 해요. 그런데 사장님이 내일 아침까지 달라고 하시네요… 😭”
B씨가 기존 Chunk 방식으로 테스트해봅니다.
// 기존 순차 처리 방식
@Bean
fun orderProcessingStep(): Step {
return StepBuilder("orderProcessingStep", jobRepository)
.chunk<Order, ProcessedOrder>(1000, transactionManager)
.reader(orderReader()) // 1000만 건 순차 읽기
.processor(orderProcessor()) // 하나씩 처리
.writer(orderWriter()) // 1000건씩 저장
.build()
}
🤔 예상 결과: 절망적인 성능
데이터 양 | 예상 처리 시간 | 현실 |
---|---|---|
100만 건 | 1시간 | ✅ 성공 |
1000만 건 | 10시간 | ❌ 내일 아침까지 불가능! |
1억 건 | 100시간 | ❌ 4일… |
B씨: “이런… 어떻게 하면 빨라질까요?” 🤷♂️
🔥 병렬 처리의 마법 - 시간을 10배 줄이는 비법
Spring Batch는 4가지 강력한 병렬 처리 방법을 제공합니다!
병렬 처리 방법 비교
🎯 병렬 처리 성능 비교표
방식 | 처리 시간 | CPU 사용률 | 메모리 | 난이도 | 추천도 |
---|---|---|---|---|---|
순차 처리 | 10시간 | 25% | 512MB | ⭐ | ❌ |
Multi-threaded | 2.5시간 | 80% | 1GB | ⭐⭐ | ✅ |
Parallel Steps | 3시간 | 70% | 800MB | ⭐⭐ | ⚠️ |
Async Processing | 2시간 | 60% | 1.5GB | ⭐⭐⭐ | ✅ |
Partitioning | 1시간 | 90% | 2GB | ⭐⭐⭐ | 🏆 |
🏗️ 1. Multi-threaded Step - 가장 쉬운 병렬화
한 개의 Step 안에서 여러 스레드가 각각 Chunk를 처리하는 방식이에요!
동작 원리
실제 구현 코드
@Configuration
class MultiThreadedOrderProcessingConfig(
private val jobRepository: JobRepository,
private val transactionManager: PlatformTransactionManager
) {
companion object {
private val log = LoggerFactory.getLogger(MultiThreadedOrderProcessingConfig::class.java)
}
@Bean
fun taskExecutor(): TaskExecutor {
val executor = ThreadPoolTaskExecutor()
// 🎯 핵심 설정: 스레드 풀 크기
executor.corePoolSize = 4 // 기본 스레드 4개
executor.maxPoolSize = 8 // 최대 스레드 8개
executor.queueCapacity = 200 // 대기 큐 크기
executor.setThreadNamePrefix("batch-worker-")
// 🛡️ 안전장치: 거부 정책
executor.setRejectedExecutionHandler(ThreadPoolExecutor.CallerRunsPolicy())
executor.initialize()
return executor
}
@Bean
fun multiThreadedOrderProcessingStep(): Step {
return StepBuilder("multiThreadedOrderProcessingStep", jobRepository)
.chunk<Order, ProcessedOrder>(1000, transactionManager)
.reader(threadSafeOrderReader()) // ⚠️ Thread-safe Reader 필수!
.processor(orderProcessor())
.writer(threadSafeOrderWriter()) // ⚠️ Thread-safe Writer 필수!
.taskExecutor(taskExecutor()) // 🚀 멀티스레드 활성화!
.build()
}
}
🚨 Thread-Safe Reader 구현
@Bean
@StepScope
fun threadSafeOrderReader(): JdbcPagingItemReader<Order> {
return JdbcPagingItemReaderBuilder<Order>()
.name("threadSafeOrderReader")
.dataSource(dataSource)
.selectClause("SELECT order_id, customer_id, order_date, amount")
.fromClause("FROM orders")
.whereClause("WHERE status = 'PENDING'")
.sortKeys(mapOf("order_id" to Order.ASCENDING)) // 🎯 정렬 필수!
.pageSize(1000)
.saveState(false) // 🚨 멀티스레드에서는 false로 설정!
.rowMapper { rs, _ ->
Order(
id = rs.getLong("order_id"),
customerId = rs.getLong("customer_id"),
orderDate = rs.getDate("order_date").toLocalDate(),
amount = rs.getBigDecimal("amount")
)
}
.build()
}
🔒 Thread-Safe Writer 구현
@Component
class ThreadSafeOrderWriter : ItemWriter<ProcessedOrder> {
private val dataSource: DataSource
private val jdbcTemplate: JdbcTemplate
// 🔒 동시성 제어를 위한 동기화
private val writeLock = ReentrantLock()
init {
jdbcTemplate = JdbcTemplate(dataSource)
}
override fun write(items: List<ProcessedOrder>) {
// 🛡️ 안전한 배치 처리
writeLock.lock()
try {
val sql = """
INSERT INTO processed_orders
(order_id, customer_id, processed_amount, status, processed_at)
VALUES (?, ?, ?, ?, ?)
"""
val batchArgs = items.map { order ->
arrayOf(
order.id,
order.customerId,
order.processedAmount,
order.status.name,
order.processedAt
)
}
jdbcTemplate.batchUpdate(sql, batchArgs)
log.info("🎯 Thread ${Thread.currentThread().name}: ${items.size}건 처리 완료")
} finally {
writeLock.unlock()
}
}
}
🎭 2. Parallel Steps - 독립적인 작업들의 병렬 처리
서로 독립적인 여러 Step을 동시에 실행하는 방식이에요!
실무 시나리오: 주문 처리와 동시에 여러 작업
구현 코드
@Configuration
class ParallelStepsConfig(
private val jobRepository: JobRepository
) {
@Bean
fun parallelProcessingJob(): Job {
// 🎭 병렬 Flow 구성
val parallelFlow = FlowBuilder<Flow>("parallelFlow")
.split(taskExecutor())
.add(
orderProcessingFlow(), // 주문 처리
inventoryManagementFlow(), // 재고 관리
customerNotificationFlow() // 고객 알림
)
.build()
return JobBuilder("parallelProcessingJob", jobRepository)
.start(parallelFlow)
.next(finalReportStep()) // 🎯 모든 병렬 작업 완료 후 실행
.build()
}
@Bean
fun orderProcessingFlow(): Flow {
return FlowBuilder<Flow>("orderProcessingFlow")
.start(orderDataProcessingStep())
.next(paymentStatusUpdateStep())
.build()
}
@Bean
fun inventoryManagementFlow(): Flow {
return FlowBuilder<Flow>("inventoryManagementFlow")
.start(inventoryUpdateStep())
.next(lowStockAlertStep())
.build()
}
@Bean
fun customerNotificationFlow(): Flow {
return FlowBuilder<Flow>("customerNotificationFlow")
.start(emailNotificationStep())
.next(smsNotificationStep())
.build()
}
}
🎪 3. AsyncItemProcessor & AsyncItemWriter - 비동기 처리의 마법
ItemProcessor를 비동기로 실행해서 I/O 대기 시간을 줄이는 방법이에요!
동작 원리
구현 코드
@Configuration
class AsyncProcessingConfig {
@Bean
fun asyncTaskExecutor(): TaskExecutor {
val executor = ThreadPoolTaskExecutor()
executor.corePoolSize = 10
executor.maxPoolSize = 20
executor.queueCapacity = 500
executor.setThreadNamePrefix("async-processor-")
executor.initialize()
return executor
}
@Bean
fun asyncOrderProcessor(): AsyncItemProcessor<Order, ProcessedOrder> {
val asyncProcessor = AsyncItemProcessor<Order, ProcessedOrder>()
asyncProcessor.setDelegate(complexOrderProcessor()) // 실제 처리 로직
asyncProcessor.setTaskExecutor(asyncTaskExecutor())
return asyncProcessor
}
@Bean
fun asyncOrderWriter(): AsyncItemWriter<ProcessedOrder> {
val asyncWriter = AsyncItemWriter<ProcessedOrder>()
asyncWriter.setDelegate(orderDatabaseWriter()) // 실제 저장 로직
return asyncWriter
}
@Bean
fun complexOrderProcessor(): ItemProcessor<Order, ProcessedOrder> {
return ItemProcessor { order ->
// 🐌 시간이 오래 걸리는 작업들 (외부 API 호출 등)
val customerInfo = fetchCustomerInfo(order.customerId) // 500ms
val paymentInfo = validatePayment(order.paymentId) // 300ms
val shippingCost = calculateShipping(order.address) // 200ms
ProcessedOrder(
id = order.id,
customerId = order.customerId,
customerInfo = customerInfo,
paymentInfo = paymentInfo,
shippingCost = shippingCost,
finalAmount = order.amount + shippingCost,
processedAt = LocalDateTime.now()
)
}
}
@Bean
fun asyncProcessingStep(): Step {
return StepBuilder("asyncProcessingStep", jobRepository)
.chunk<Order, Future<ProcessedOrder>>(100, transactionManager)
.reader(orderReader())
.processor(asyncOrderProcessor()) // 🚀 비동기 처리!
.writer(asyncOrderWriter()) // 🚀 비동기 저장!
.build()
}
}
🎯 4. Partitioning - 최고 성능의 비밀병기
데이터를 논리적으로 분할해서 각 파티션을 독립적으로 처리하는 최강의 방법이에요!
Master-Slave 패턴 이해
날짜 기반 Partitioner 구현
@Component
class DateRangePartitioner : Partitioner {
companion object {
private val log = LoggerFactory.getLogger(DateRangePartitioner::class.java)
}
override fun partition(gridSize: Int): Map<String, ExecutionContext> {
val partitions = mutableMapOf<String, ExecutionContext>()
// 🗓️ 2024년 1년 데이터를 gridSize만큼 분할
val startDate = LocalDate.of(2024, 1, 1)
val endDate = LocalDate.of(2024, 12, 31)
val totalDays = ChronoUnit.DAYS.between(startDate, endDate)
val daysPerPartition = totalDays / gridSize
for (i in 0 until gridSize) {
val partitionStart = startDate.plusDays(i * daysPerPartition)
val partitionEnd = if (i == gridSize - 1) {
endDate // 마지막 파티션은 남은 모든 날짜 포함
} else {
startDate.plusDays((i + 1) * daysPerPartition - 1)
}
val context = ExecutionContext()
context.put("startDate", partitionStart.toString())
context.put("endDate", partitionEnd.toString())
context.put("partitionNumber", i + 1)
partitions["partition$i"] = context
log.info("🎯 Partition $i 생성: $partitionStart ~ $partitionEnd")
}
log.info("🚀 총 ${partitions.size}개 파티션 생성 완료!")
return partitions
}
}
파티션별 ItemReader 구현
@Component
@StepScope
class PartitionedOrderItemReader(
private val dataSource: DataSource,
@Value("#{stepExecutionContext['startDate']}") private val startDate: String,
@Value("#{stepExecutionContext['endDate']}") private val endDate: String,
@Value("#{stepExecutionContext['partitionNumber']}") private val partitionNumber: Int
) : ItemReader<Order> {
private lateinit var delegate: JdbcCursorItemReader<Order>
companion object {
private val log = LoggerFactory.getLogger(PartitionedOrderItemReader::class.java)
}
@PostConstruct
fun init() {
log.info("🎯 Partition $partitionNumber Reader 초기화: $startDate ~ $endDate")
delegate = JdbcCursorItemReaderBuilder<Order>()
.name("partitionedOrderReader-$partitionNumber")
.dataSource(dataSource)
.sql("""
SELECT order_id, customer_id, order_date, amount, status
FROM orders
WHERE order_date >= ? AND order_date <= ?
AND status = 'PENDING'
ORDER BY order_id
""")
.preparedStatementSetter { ps ->
ps.setDate(1, Date.valueOf(startDate))
ps.setDate(2, Date.valueOf(endDate))
}
.rowMapper { rs, _ ->
Order(
id = rs.getLong("order_id"),
customerId = rs.getLong("customer_id"),
orderDate = rs.getDate("order_date").toLocalDate(),
amount = rs.getBigDecimal("amount"),
status = OrderStatus.valueOf(rs.getString("status"))
)
}
.build()
delegate.afterPropertiesSet()
}
override fun read(): Order? {
return delegate.read()
}
}
Master-Slave Step 구성
@Configuration
class PartitionedJobConfig(
private val jobRepository: JobRepository,
private val transactionManager: PlatformTransactionManager,
private val dateRangePartitioner: DateRangePartitioner
) {
@Bean
fun partitionedOrderProcessingJob(): Job {
return JobBuilder("partitionedOrderProcessingJob", jobRepository)
.incrementer(RunIdIncrementer())
.start(masterStep())
.listener(partitionedJobListener())
.build()
}
@Bean
fun masterStep(): Step {
return StepBuilder("masterStep", jobRepository)
.partitioner("slaveStep", dateRangePartitioner) // 🎯 파티셔너 설정
.step(slaveStep()) // 🎯 슬레이브 스텝 정의
.gridSize(8) // 🎯 파티션 개수
.taskExecutor(partitionTaskExecutor()) // 🎯 병렬 실행용 Executor
.build()
}
@Bean
fun slaveStep(): Step {
return StepBuilder("slaveStep", jobRepository)
.chunk<Order, ProcessedOrder>(1000, transactionManager)
.reader(partitionedOrderItemReader())
.processor(orderProcessor())
.writer(partitionedOrderWriter())
.listener(slaveStepListener())
.build()
}
@Bean
fun partitionTaskExecutor(): TaskExecutor {
val executor = ThreadPoolTaskExecutor()
executor.corePoolSize = 8 // 파티션 개수와 동일하게 설정
executor.maxPoolSize = 12 // 여유분 확보
executor.queueCapacity = 100
executor.setThreadNamePrefix("partition-")
executor.initialize()
return executor
}
@Bean
fun partitionedJobListener(): JobExecutionListener {
return object : JobExecutionListener {
override fun beforeJob(jobExecution: JobExecution) {
log.info("🚀 파티셔닝 Job 시작! GridSize: 8")
}
override fun afterJob(jobExecution: JobExecution) {
val duration = Duration.between(
jobExecution.startTime,
jobExecution.endTime
)
if (jobExecution.status == BatchStatus.COMPLETED) {
log.info("✅ 파티셔닝 Job 완료! 총 소요시간: ${duration.toMinutes()}분")
// 각 파티션별 처리 현황 출력
jobExecution.stepExecutions.forEach { stepExecution ->
if (stepExecution.stepName.startsWith("slaveStep")) {
log.info("📊 ${stepExecution.stepName}: " +
"읽기 ${stepExecution.readCount}건, " +
"쓰기 ${stepExecution.writeCount}건")
}
}
} else {
log.error("❌ 파티셔닝 Job 실패!")
}
}
}
}
}
📊 실전 성능 벤치마크 - 놀라운 결과!
실제로 1000만 건 주문 데이터로 성능 테스트를 해봤어요!
🏆 성능 비교 결과
처리 방식 | 스레드/파티션 수 | 처리 시간 | 처리량(TPS) | CPU 사용률 | 메모리 |
---|---|---|---|---|---|
순차 처리 | 1 | 10시간 15분 | 273 TPS | 25% | 512MB |
Multi-threaded | 4 | 2시간 45분 | 1,010 TPS | 80% | 1GB |
Multi-threaded | 8 | 1시간 30분 | 1,850 TPS | 95% | 1.5GB |
Async Processing | 10 | 2시간 | 1,390 TPS | 60% | 1.5GB |
Partitioning | 8 | 🏆 58분 | 🏆 2,870 TPS | 90% | 2GB |
Partitioning | 16 | 45분 | 3,700 TPS | 95% | 3GB |
🎯 최적 성능 달성 조건
@Component
class PerformanceTuningGuide {
fun calculateOptimalSettings(
totalDataSize: Long,
serverSpecs: ServerSpecs
): OptimalSettings {
val cpuCores = serverSpecs.cpuCores
val availableMemory = serverSpecs.availableMemoryGB
return when {
// 🥉 작은 데이터 (100만 건 이하)
totalDataSize <= 1_000_000 -> {
OptimalSettings(
method = "Multi-threaded",
threadCount = cpuCores,
chunkSize = 1000,
expectedTime = "${totalDataSize / 1000}분"
)
}
// 🥈 중간 데이터 (100만 ~ 500만 건)
totalDataSize <= 5_000_000 -> {
OptimalSettings(
method = "Multi-threaded",
threadCount = cpuCores * 2,
chunkSize = 2000,
expectedTime = "${totalDataSize / 2500}분"
)
}
// 🥇 대용량 데이터 (500만 건 이상)
else -> {
val optimalPartitions = minOf(cpuCores * 2, 16) // 최대 16개 파티션
OptimalSettings(
method = "Partitioning",
partitionCount = optimalPartitions,
chunkSize = 1000,
expectedTime = "${totalDataSize / (optimalPartitions * 3000)}분"
)
}
}
}
}
data class ServerSpecs(
val cpuCores: Int,
val availableMemoryGB: Int
)
data class OptimalSettings(
val method: String,
val threadCount: Int = 0,
val partitionCount: Int = 0,
val chunkSize: Int,
val expectedTime: String
)
🚨 주의사항과 트러블슈팅
1. Thread-Safe 이슈 해결
// ❌ 위험한 코드 - Thread-Safe하지 않음
@Component
class DangerousProcessor : ItemProcessor<Order, ProcessedOrder> {
private var totalAmount = BigDecimal.ZERO // 🚨 공유 상태!
override fun process(item: Order): ProcessedOrder {
totalAmount = totalAmount.add(item.amount) // 🚨 Race Condition!
return ProcessedOrder(...)
}
}
// ✅ 안전한 코드 - Thread-Safe 보장
@Component
class SafeProcessor : ItemProcessor<Order, ProcessedOrder> {
private val totalAmount = AtomicReference(BigDecimal.ZERO) // ✅ Atomic 연산
private val processedCount = AtomicLong(0) // ✅ Atomic 카운터
override fun process(item: Order): ProcessedOrder {
// ✅ 안전한 업데이트
totalAmount.updateAndGet { current -> current.add(item.amount) }
processedCount.incrementAndGet()
return ProcessedOrder(
id = item.id,
amount = item.amount,
processedAt = LocalDateTime.now(),
sequenceNumber = processedCount.get()
)
}
}
2. 데이터베이스 커넥션 풀 설정
@Configuration
class OptimizedDataSourceConfig {
@Bean
@Primary
fun hikariDataSource(): DataSource {
val config = HikariConfig()
config.jdbcUrl = "jdbc:postgresql://localhost:5432/batch_db"
config.username = "batch_user"
config.password = "batch_password"
// 🎯 병렬 처리를 위한 최적화 설정
val maxThreads = 16 // 최대 스레드/파티션 수
config.maximumPoolSize = maxThreads + 5 // 여유분 확보
config.minimumIdle = maxThreads / 2 // 최소 유지 커넥션
// 🚀 성능 최적화
config.connectionTimeout = 20000 // 20초
config.idleTimeout = 300000 // 5분
config.maxLifetime = 1200000 // 20분
// 🔍 모니터링용 설정
config.leakDetectionThreshold = 60000 // 1분
return HikariDataSource(config)
}
}
3. 메모리 최적화
@Component
class MemoryOptimizedWriter : ItemWriter<ProcessedOrder> {
private val batchSize = 1000
private val buffer = mutableListOf<ProcessedOrder>()
@Synchronized // 🔒 동기화 보장
override fun write(items: List<ProcessedOrder>) {
buffer.addAll(items)
if (buffer.size >= batchSize) {
flushToDatabase()
}
}
private fun flushToDatabase() {
try {
// 실제 DB 쓰기 작업
jdbcTemplate.batchUpdate(sql, buffer.map { ... })
log.info("💾 ${buffer.size}건 저장 완료")
} finally {
// 🧹 메모리 정리
buffer.clear()
// 🗑️ 메모리 사용량이 높을 때 GC 힌트
val runtime = Runtime.getRuntime()
val usedMemory = runtime.totalMemory() - runtime.freeMemory()
val maxMemory = runtime.maxMemory()
if (usedMemory > maxMemory * 0.8) { // 80% 이상 사용 시
System.gc()
log.warn("🗑️ 메모리 사용률 높음 - GC 실행")
}
}
}
}
🎯 정리
Spring Batch의 병렬 처리 기능으로 10배 이상의 성능 향상을 달성했습니다! 🎉
✅ 핵심 정리
-
성능 개선 로드맵
순차 처리 → Multi-threaded → Partitioning (10시간) → (2.5시간) → (1시간)
-
방식별 추천 시나리오
- Multi-threaded: 100만 건 이하, 빠른 적용 원할 때
- Parallel Steps: 독립적인 여러 작업이 있을 때
- Async Processing: I/O 집약적 작업, 외부 API 호출이 많을 때
- Partitioning: 500만 건 이상 대용량, 최고 성능 필요할 때
-
성능 최적화 체크리스트 ✅
- Thread-Safe한 Reader/Writer 사용
- 적절한 커넥션 풀 설정 (스레드 수 + 여유분)
- 메모리 모니터링과 GC 관리
- 파티션 키 선택과 개수 최적화
-
주의사항 ⚠️
- 공유 상태 변수는 Atomic 클래스 사용
- 데이터베이스 락킹 최소화
- 적절한 Chunk Size 유지 (1000 권장)
🚀 다음 시간 예고
다음 Chapter 5에서는 Skip, Retry, Restart 전략을 배워보겠습니다!
- 실패한 데이터만 골라서 Skip하기
- 일시적 에러는 자동으로 Retry하기
- 중단된 Job을 이어서 Restart하기
- 실무에서 꼭 필요한 장애 대응 전략들
이제 대용량 데이터도 무섭지 않아요! 1000만 건? 1억 건? 가져와! 😎