前言
近日要用到flume+kafka进行数据采集。查阅资料后发现国内并没有自定义http source的相关文章,故此记录。
介绍
flume自带很多的source,如:exe、kafka...其中有一个非常简单的source——httpsource,使用httpSource,flume启动后会拉起一个web服务来监听指定的ip和port。常用的使用场景:对于有些应用环境中,不能部署Flume SDK及其依赖项,可以在代码中通过HTTP而不是Flume的PRC发送数据的情况,此时HTTP SOURCE可以用来将数据接收到Flume中。但是httpSource默认是用JSONHandler,说白了就是只能接收post提交的json。由于不能满足需要,就只能自己定义一个httpsource了.要自定义handler,必须实现HTTPSourceHandler接口。
自定义source
第一步,新建一个空的maven项目
添加如下依赖(记得pom里面修改打包方式为jar)
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
第二步,创建一个自定义的source类,需要实现HTTPSourceHandler接口(代码不贴了)
第三步,打包成jar包,丢到flume的lib目录下即可。
第四步,配置文件指定handler
flume配置
这里就不贴代码了(如下指定自定义的handler)
a1.sources=r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=http
a1.sources.r1.handler=org.example.GetHandler
a1.sources.r1.bind=10.10.255.199
a1.sources.r1.port=50000
a1.sources.r1.channels=c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.topic = flume_kafka
a1.sinks.k1.batchSize = 5
a1.sinks.k1.requiredAcks =1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.channels.c1.type=memory