今天在做flume+kerberos写入hdfs时遇到的问题。

测试的配置文件:

agent-server1.sources= testtailagent-server1.sinks = hdfs-sinkagent-server1.channels= hdfs-channelagent-server1.sources.testtail.type = netcatagent-server1.sources.testtail.bind = localhostagent-server1.sources.testtail.port = 9999agent-server1.sinks.hdfs-sink.hdfs.kerberosPrincipal = hdfs/_HOST@KERBEROS_HADOOPagent-server1.sinks.hdfs-sink.hdfs.kerberosKeytab = /home/vipshop/conf/hdfs.keytabagent-server1.channels.hdfs-channel.type = memoryagent-server1.channels.hdfs-channel.capacity = 200000000agent-server1.channels.hdfs-channel.transactionCapacity = 10000agent-server1.sinks.hdfs-sink.type = hdfsagent-server1.sinks.hdfs-sink.hdfs.path = hdfs://bipcluster/tmp/flume/%Y%m%dagent-server1.sinks.hdfs-sink.hdfs.rollInterval = 60agent-server1.sinks.hdfs-sink.hdfs.rollSize = 0agent-server1.sinks.hdfs-sink.hdfs.rollCount = 0agent-server1.sinks.hdfs-sink.hdfs.threadsPoolSize = 10agent-server1.sinks.hdfs-sink.hdfs.round = falseagent-server1.sinks.hdfs-sink.hdfs.roundValue = 30agent-server1.sinks.hdfs-sink.hdfs.roundUnit = minuteagent-server1.sinks.hdfs-sink.hdfs.batchSize = 100agent-server1.sinks.hdfs-sink.hdfs.fileType = DataStreamagent-server1.sinks.hdfs-sink.hdfs.writeFormat = Textagent-server1.sinks.hdfs-sink.hdfs.callTimeout = 60000agent-server1.sinks.hdfs-sink.hdfs.idleTimeout = 100agent-server1.sinks.hdfs-sink.hdfs.filePrefix = ipagent-server1.sinks.hdfs-sink.channel = hdfs-channelagent-server1.sources.testtail.channels = hdfs-channel

在启动服务后,使用telnet进行测试,发现如下报错:

14/03/24 18:03:07 ERROR hdfs.HDFSEventSink: process failedjava.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)        at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)        at java.lang.Thread.run(Thread.java:662)Caused by: java.lang.NumberFormatException: null        at java.lang.Long.parseLong(Long.java:375)        at java.lang.Long.valueOf(Long.java:525)        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)        ... 5 more14/03/24 18:03:07 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.org.apache.flume.EventDeliveryException: java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event toresolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:461)        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)        at java.lang.Thread.run(Thread.java:662)Caused by: java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)        at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)        ... 3 moreCaused by: java.lang.NumberFormatException: null        at java.lang.Long.parseLong(Long.java:375)        at java.lang.Long.valueOf(Long.java:525)        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)        ... 5 more

从调用栈的信息来看,错误出在org.apache.flume.formatter.output.BucketPath类的replaceShorthand方法。

在org.apache.flume.sink.hdfs.HDFSEventSink类中,使用process方法来生成hdfs的url,其中主要是调用了BucketPath类的escapeString方法来进行字符的转换,并最终调用了replaceShorthand方法。
其中replaceShorthand方法的相关代码如下:

public static String replaceShorthand(char c, Map
headers, TimeZone timeZone, boolean needRounding, int unit, int roundDown) { String timestampHeader = headers.get("timestamp"); long ts; try { ts = Long.valueOf(timestampHeader); } catch (NumberFormatException e) { throw new RuntimeException("Flume wasn't able to parse timestamp header" + " in the event to resolve time based bucketing. Please check that" + " you're correctly populating timestamp header (for example using" + " TimestampInterceptor source interceptor).", e); } if(needRounding){ ts = roundDown(roundDown, unit, ts); }........

从代码中可以看到,timestampHeader 的值如果取不到,在向ts赋值时就会报错。。

这其实是flume的一个bug,bug id:
解决方法有3个:
1.更改配置,更新hdfs文件的路径格式

agent-server1.sinks.hdfs-sink.hdfs.path = hdfs://bipcluster/tmp/flume

但是这样就不能按天来存放日志了

2.通过更改相关的代码
(patch:https://issues.apache.org/jira/secure/p_w_upload/12538891/FLUME-1419.patch)
如果在headers中获取不到timestamp的值,就给它一个当前timestamp的值。
相关代码:

String timestampHeader = headers.get("timestamp");     long ts;     try {      if (timestampHeader == null) {        ts = System.currentTimeMillis();      } else {        ts = Long.valueOf(timestampHeader);      }     } catch (NumberFormatException e) {       throw new RuntimeException("Flume wasn't able to parse timestamp header"         + " in the event to resolve time based bucketing. Please check that"         + " you're correctly populating timestamp header (for example using"                  + " TimestampInterceptor source interceptor).", e);}

3.为source定义基于timestamp的interceptors

在配置中增加两行即可:

agent-server1.sources.testtail.interceptors = i1agent-server1.sources.testtail.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

一个技巧:

在debug flume的问题时,可以在flume的启动参数中设置把debug日志打到console中。

-Dflume.root.logger=DEBUG,console,LOGFILE