- 개요
A, B 서버가 있고, A서버는 특정 디렉토리를 Polling하여 B서버에 전송하고, B서버는 A서버가 보낸 데이터를 Kafka로 전송
1. Spooldir Source -> Avro Sink
# Components
Agent.sources = spool_source
Agent.channels = memory_channel
Agent.sinks = avro_sink
# Spool Source
Agent.sources.spool_source.type = spooldir
Agent.sources.spool_source.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
Agent.sources.spool_source.maxBlobLength = 200000000
Agent.sources.spool_source.channels = memory_channel
Agent.sources.spool_source.spoolDir = /path/to/spool # spooling 할 디렉토리의 경로
Agent.sources.spool_source.fileHeader = true # 파일 헤더 추가
Agent.sources.spool_source.deletePolicy = immediate # 즉시 삭제
Agent.sources.spool_source.recursiveDirectorySearch = true # spooldir 내부 폴더의 파일도 검색
Agent.sources.spool_source.batchSize = 1 # 일괄 처리 개수
# Memory Channel
Agent.channels.memory_channel.type = memory # channel이 메모리를 사용함
# Avro Sink
Agent.sinks.avro_sink.type = avro
Agent.sinks.avro_sink.channel = memory_channel # memory_channel의 데이터를 가져와서 사용
Agent.sinks.avro_sink.hostname = 127.0.0.1 # memory channel의 데이터를 전송할 IP Address
Agent.sinks.avro_sink.port = 34100 # memory channel의 데이터를 전송할 Port
2. Avro Source -> Kafka Sink
# Components
Agent.sources = avro_source
Agent.channels = memory_channel
Agent.sinks = kafka_sink
# Avro Source
Agent.sources.avro_source.type = avro
Agent.sources.avro_source.channels = memory_channel
Agent.sources.avro_source.bind = 0.0.0.0 # 데이터를 수신할 IP Address
Agent.sources.avro_source.port = 34100 # 데이터를 수신할 Port
Agent.sources.avro_source.fileHeader = true
# Memory Channel
Agent.channels.memory_channel.type = memory # channel이 메모리를 사용함
# Kafka Sink
Agent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
Agent.sinks.kafka_sink.kafka.topic = kafka_topic_name # Kafka에 송신하는데 쓰일 토픽명
Agent.sinks.kafka_sink.kafka.bootstrap.servers = 127.0.0.1:9092 # Kafka의 Listening IPAddr:Port
Agent.sinks.kafka_sink.channel = memory_channel
Agent.sinks.kafka_sink.producer_linger.ms = 10 # Kafka로의 전송 이전에 대기할 시간
Agent.sinks.kafka_sink.kafka.producer.acks = 1 # Kafka로부터의 응답 대기 여부(0:대기X, 1:대기O, all:팔로워까지 완료될때까지 대기)
Agent.sinks.kafka_sink.flumeBatchSize = 1
'개발 > Apache Flume' 카테고리의 다른 글
Apache Flume Custom Sink 구현 (0) | 2021.01.22 |
---|---|
Flume Spooldir 멈춤 현상 (0) | 2020.10.28 |