개발/Apache Flume

Apache Flume Custom Sink 구현

Rumine 2021. 1. 22. 10:49

참고 페이지 : www.programmersought.com/article/4050422971/

 

Flume Custom Sink—implements splitting files by day or hour and compressing them - Programmer Sought

Writing is not easy, please indicate: http://shihlei.iteye.com/blog/2306151   Project needs: Flume collects logs, expects to drop the file system, split by hour, and compress and save. Flume's own File Roll Sink can only split files by time, not to store

www.programmersought.com

 

1. 이클립스/STS에서 New Project > Maven > Maven Project

 

2. Create a simple project 체크 후 next

3. Group Id, Artifact Id, Name ... 입력 후 Finish

 

4. pom.xml 열어서 다음 코드 추가

1
2
3
4
5
6
7
<dependencies>
    <!-- Flume core API: supplies the org.apache.flume.* types (AbstractSink, Channel,
         Context, Event, Transaction) that the custom sink below is built on. -->
    <!-- NOTE(review): RollingFileLogger imports org.apache.logging.log4j.core.*;
         confirm that log4j-core 2.x is on the classpath, or declare it here explicitly. -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
    </dependency>
</dependencies>
cs

 

5. RollingFileFlumeSink.java 생성

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package light.flume;
 
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
public class RollingFileFlumeSink extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory.getLogger(RollingFileFlumeSink.class);
     
    //flume 설정값
    //Agent.sources.source_name.설정값
    //예시) Agent.sources.source_name.sink.id=1
    private static final String SINK_ID = "sink.id";
    private static final String SINK_FILENAME = "sink.filename";
    private static final String SINK_FILEPATTERN = "sink.filepattern";
 
    private RollingFileLogger rollingFileLogger;
 
    //설정값을 configure 객체를 통해 가져와서 적용
    //context.getString("설정값 이름", "미 설정 시 디폴트값");
    @Override
    public void configure(Context context) {
        String sinkId = context.getString(SINK_ID, "1");
        String sinkFileName = context.getString(SINK_FILENAME, "/tmp/flume_rollingfile_sink/rolling_file.log");
        String sinkFilePattern = context.getString(SINK_FILEPATTERN, "/tmp/flume_rollingfile_sink/${date:yyyy-MM}/rolling_file-%d{yyyy-MM-dd}.log.gz");
 
        logger.info("{} : {} ", SINK_ID, sinkId);
        logger.info("{} : {} ", SINK_FILENAME, sinkFileName);
        logger.info("{} : {} ", SINK_FILEPATTERN, sinkFilePattern);
 
        rollingFileLogger = new RollingFileLogger(sinkId, sinkFileName, sinkFilePattern);
    }
 
    //실제 sink가 작업을 처리하는 부분
    //본 소스 코드에서는 process() -> handleEvent()에서 작업을 진행
    //이벤트 데이터가 필요할 경우, event 객체의 getHeaders(), getBody() 메소드 사용
    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        // Start transaction
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            // This try clause includes whatever Channel operations you want to
            // do
            Event event = ch.take();
            // Send the Event to the external repository.
            // storeSomeData(e);
             
            handleEvent(event.getBody());
 
            txn.commit();
            status = Status.READY;
        } catch (Throwable t) {
            txn.rollback();
            // Log exception, handle individual exceptions as needed
            status = Status.BACKOFF;
 
            // re-throw all Errors
            if (t instanceof Error) {
                throw (Error) t;
            }
        }
        // you must add this line of code in order to close the Transaction.
        txn.close();
        return status;
    }
 
    public void handleEvent(byte[] msg)  {
        try {
            //body부분을 utf-8로 변환, byte[] 그대로 필요할 경우(예- BlobDeserializer를 거쳐온 byte[] 바이너리 파일)에는 아래 라인 삭제
            String msgStr = new String(msg, "utf-8");
            rollingFileLogger.write(msgStr);
        } catch (Exception e) {
            logger.error("Cookie inject error : ", e.getMessage(), e);
        }
    }
}
 
cs

 

6. RollingFileLogger.java 생성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
 
package light.flume;
 
import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;
 
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.appender.RollingFileAppender;
import org.apache.logging.log4j.core.appender.rolling.TimeBasedTriggeringPolicy;
import org.apache.logging.log4j.core.appender.rolling.TriggeringPolicy;
import org.apache.logging.log4j.core.config.AppenderRef;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.logging.log4j.core.layout.PatternLayout;
 
public class RollingFileLogger {
 
    private static final Logger logger = LogManager.getLogger(RollingFileLogger.class);
 
    private Logger fileWriter;
 
    private String fileName;
    private String filePattern;
    private String appenderName;
    private String loggerName;
 
    //생성자
    public RollingFileLogger(String loggerId, String fileName, String filePattern) {
        this.fileName = fileName;
        this.filePattern = filePattern;
 
        appenderName = loggerId + "_appender";
        loggerName = loggerId + "_logger";
 
        logger.info("fileName : " + fileName);
        logger.info("filePattern : " + filePattern);
        logger.info("appenderName : " + appenderName);
        logger.info("loggerName : " + loggerName);
 
        updateLoggerConfig();
        fileWriter = LogManager.getLogger(loggerName);
    }
 
    //로그 설정값 적용
    private void updateLoggerConfig() {
        final LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
        final Configuration config = ctx.getConfiguration();
 
        // add RollingFileAppender
        TriggeringPolicy policy = TimeBasedTriggeringPolicy.createPolicy("1""true");
        Layout<?> layout = PatternLayout.createLayout("%m%n"null, config, null, Charset.forName("utf-8"), truefalsenullnull);
        Appender appender = RollingFileAppender.createAppender(fileName, filePattern, "true", appenderName, "true""""true", policy, null, layout, null"true""false"null, config);
        appender.start();
        config.addAppender(appender);
 
        // add AsyncLogger
        AppenderRef ref = AppenderRef.createAppenderRef(appenderName, nullnull);
        AppenderRef[] refs = new AppenderRef[] { ref };
 
        LoggerConfig loggerConfig = LoggerConfig.createLogger(false, Level.INFO, loggerName, "true", refs, null, config, null);
 
        loggerConfig.addAppender(appender, nullnull);
        config.addLogger(loggerName, loggerConfig);
        ctx.updateLoggers();
    }
 
    //실제 로그 쓰는 부분(RollingFileFlumeSink에서 호출)
    public void write(String msg) {
        fileWriter.info("{}", msg);
    }
 
}
cs

 

7. 코드 작동 순서

1
2
3
4
5
6
7
8
01. RollingFileFlumeSink    - configure()                : 플럼 실행 시 설정값 받아오기
02. RollingFileLogger       - RollingFileLogger()        : 생성자
03. RollingFileLogger       - updateLoggerConfig()       : log4j 설정값 적용
04. Flume 이벤트 수신
05. RollingFileFlumeSink    - process()                  : 이벤트 body를 handleEvent() 메소드에 전달
06. RollingFileFlumeSink    - handleEvent()              : byte[] 이벤트body를 utf-8로 변환하고 write() 메소드 호출
07. RollingFileLogger       - write()                    : 이벤트를 log 파일에 작성
08. RollingFileFlumeSink    - process()                  : 채널이 비어 있거나 process() 내부에서 예외가 발생하면 BACKOFF 반환, 정상 처리 시 트랜잭션 커밋 (handleEvent() 내부에서 발생한 예외는 로그만 남기고 전파되지 않으므로 커밋됨)
cs

8. Flume 적용 방법

 

- eclipse/sts에서 JAR파일로 export(의존 라이브러리 포함)

 

- (플럼 경로)/lib에 jar 업로드

 

- flume 설정값 적용

1
2
3
4
5
6
7
# Agent "a1": channel c1 feeds the custom rolling-file sink k1.
a1.channels = c1
a1.sinks = k1
# Fully-qualified class name of the custom sink.
a1.sinks.k1.type = light.flume.RollingFileFlumeSink
a1.sinks.k1.channel = c1
# The keys below are read by RollingFileFlumeSink.configure() as sink.id, sink.filename and sink.filepattern.
a1.sinks.k1.sink.id = test
a1.sinks.k1.sink.filename = /tmp/flume_rollingfile_sink/rolling_file.log
# HH is the 24-hour clock; with hh (12-hour) an AM archive and a PM archive would get the same file name.
a1.sinks.k1.sink.filepattern = /tmp/flume_rollingfile_sink/${date:yyyy-MM}/rolling_file-%d{yyyy-MM-dd_HH_mm_ss}.log.gz
cs

 

- jar 적용을 위해 flume 프로세스/서비스 재시작(필수)