개발/Apache Flume

Flume Spooldir Source -> Avro Sink -> Avro Source -> Kafka Sink 설정

Rumine 2020. 10. 28. 11:25

- 개요

A, B 서버가 있고, A서버는 특정 디렉토리를 Polling하여 B서버에 전송하고, B서버는 A서버가 보낸 데이터를 Kafka로 전송

 

1. Spooldir Source -> Avro Sink

 

# Components

Agent.sources = spool_source
Agent.channels = memory_channel
Agent.sinks = avro_sink

 

# Spool Source

Agent.sources.spool_source.type = spooldir
Agent.sources.spool_source.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
Agent.sources.spool_source.maxBlobLength = 200000000
Agent.sources.spool_source.channels = memory_channel
Agent.sources.spool_source.spoolDir = /path/to/spool # spooling 할 디렉토리의 경로
Agent.sources.spool_source.fileHeader = true # 파일 헤더 추가
Agent.sources.spool_source.deletePolicy = immediate # 즉시 삭제
Agent.sources.spool_source.recursiveDirectorySearch = true # spooldir 내부 폴더의 파일도 검색
Agent.sources.spool_source.batchSize = 1 # 일괄 처리 개수

 

# Memory Channel

Agent.channels.memory_channel.type = memory # channel이 메모리를 사용함

 

# Avro Sink

Agent.sinks.avro_sink.type = avro
Agent.sinks.avro_sink.channel = memory_channel # memory_channel의 데이터를 가져와서 사용
Agent.sinks.avro_sink.hostname = 127.0.0.1 # memory channel의 데이터를 전송할 IP Address
Agent.sinks.avro_sink.port = 34100 # memory channel의 데이터를 전송할 Port

 

 

2. Avro Source -> Kafka Sink

 

# Components

Agent.sources = avro_source

Agent.channels = memory_channel

Agent.sinks = kafka_sink

 

# Avro Source

Agent.sources.avro_source.type = avro

Agent.sources.avro_source.channels = memory_channel

Agent.sources.avro_source.bind = 0.0.0.0 # 데이터를 수신할 IP Address

Agent.sources.avro_source.port = 34100 # 데이터를 수신할 Port

Agent.sources.avro_source.fileHeader = true

 

# Memory Channel

Agent.channels.memory_channel.type = memory # channel이 메모리를 사용함

 

# Kafka Sink

Agent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink

Agent.sinks.kafka_sink.kafka.topic = kafka_topic_name # Kafka에 송신하는데 쓰일 토픽명

Agent.sinks.kafka_sink.kafka.bootstrap.servers = 127.0.0.1:9092 # Kafka의 Listening IPAddr:Port

Agent.sinks.kafka_sink.channel = memory_channel

Agent.sinks.kafka_sink.producer_linger.ms = 10 # Kafka로의 전송 이전에 대기할 시간

Agent.sinks.kafka_sink.kafka.producer.acks = 1 # Kafka로부터의 응답 대기 여부(0:대기X, 1:대기O, all:팔로워까지 완료될때까지 대기)

Agent.sinks.kafka_sink.flumeBatchSize = 1

'개발 > Apache Flume' 카테고리의 다른 글

Apache Flume Custom Sink 구현  (0) 2021.01.22
Flume Spooldir 멈춤 현상  (0) 2020.10.28