flume
    多种适配,多样化的数据收集
    核心概念
        event:一条消息
        client:访问者
        agent:
            重要组件Sources、Channels、Sinks。Interspactor、Selecter
        
        
kafka
    吞吐量大,高并发场景下使用

注意:flume的agent配置文件不允许有空格。

一、flume打印内容到控制台
        1、创建一个agent(使用avroSource接收网络流在flume的控制台打印)配置文件agent1.conf
            cd /usr/local/flume/
            vi /conf/agent1.conf
                agent1.sources=as1
                agent1.channels=c1
                agent1.sinks=s1

agent1.sources.as1.type=avro
                agent1.sources.as1.bind=0.0.0.0            ##接收任意ip发送的数据
                agent1.sources.as1.port=21111            ##在21111端口上监听
                agent1.sources.as1.channels=c1
                agent1.channels.c1.type=memory

agent1.sinks.s1.type=logger
                agent1.sinks.s1.channel=c1
        2、启动agent1(每30秒检查agent1.conf文件一次,检查该文件是否有变化,有变化则马上生效),将输出打印在控制台上
            bin/flume-ng agent --conf conf/ -Dflume.root.logger=DEBUG,console -n agent1 -f conf/agent1.conf
        3、使用java代码生产log4j日志输出到flume
            
        3、验证agent,一种是flume控制台测试,一种是java代码通过log4j写日志
            1)bin/flume-ng avro-client --conf conf/ -H localhost -p 21111 -F ~/a        ##将~目录下的a文件内容写入到flume
            2)使用java类将log4j的日志写入到flume的agent中
                log4j.properties配置文件
                    log4j.rootLogger=INFO,flume
                    log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
                    log4j.appender.flume.Hostname = 192.168.1.33                                    ##flume启动agent所在的节点ip
                    log4j.appender.flume.Port = 21111                                                ##flume启动agent监听的端口号
                    log4j.appender.flume.UnsafeMode = true
                
                java代码    
                    public class FlumeProducer {
                        public static void main(String[] args) throws Exception {
                            final Logger logger = Logger.getLogger(FlumeProducer.class);
                            while (true) {
                                logger.info("logger datetime :" + System.currentTimeMillis());
                                Thread.sleep(1000);
                            }
                        }
                    }

二、flume生成avroLog文件写入到hdfs中,存放到不同的/IP/日期/文件夹中
        1、创建一个agent(使用avroSource接收网络流写入到hdfs)配置文件agent2.conf
            cd /usr/local/flume/
            vi /conf/agent2.conf
                agent2.sources=source1
                agent2.channels=channel1
                agent2.sinks=sink1

agent2.sources.source1.type=avro
                agent2.sources.source1.bind=0.0.0.0
                agent2.sources.source1.port=44444
                agent2.sources.source1.channels=channel1
                
                agent2.sources.source1.interceptors = i1 i2
                agent2.sources.source1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
                agent2.sources.source1.interceptors.i1.preserveExisting = true
                agent2.sources.source1.interceptors.i1.useIP = true
                agent2.sources.source1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

agent2.channels.channel1.type=memory
                agent2.channels.channel1.capacity=10000
                agent2.channels.channel1.transactionCapacity=1000
                agent2.channels.channel1.keep-alive=30

agent2.sinks.sink1.type=hdfs
                agent2.sinks.sink1.channel=channel1
                agent2.sinks.sink1.hdfs.path=hdfs://ns1/flume/events/%{host}/%Y-%m-%d            ##flume将文件写入到hdfs的路径
                agent2.sinks.sink1.hdfs.filePrefix=avroLog-                                        ##flume生成文件的前缀
                agent2.sinks.sink1.hdfs.fileSuffix=.log                                            ##flume生成文件的后缀
                agent2.sinks.sink1.hdfs.fileType=DataStream                                        ##flume生成文件的类型,DataStream或SequenceFile
                agent2.sinks.sink1.hdfs.writeFormat=Text
                agent2.sinks.sink1.hdfs.rollInterval=0
                agent2.sinks.sink1.hdfs.rollSize=10000
                agent2.sinks.sink1.hdfs.rollCount=0
                agent2.sinks.sink1.hdfs.idleTimeout=5
        2、启动agent2(每30秒检查agent1.conf文件一次,检查该文件是否有变化,有变化则马上生效),将内容写入到hdfs的/flume/events/中
            bin/flume-ng agent --conf conf/ -Dflume.monitoring.type=http -Dflume.monitoring.port=34343 -n agent2 -f conf/agent2.conf
        3、使用java代码生产log4j日志输出到flume
            log4j.properties配置文件
                log4j.rootLogger=INFO,flume
                log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
                log4j.appender.flume.Hostname = 192.168.1.33                                    ##flume启动agent所在的节点ip
                log4j.appender.flume.Port = 21111                                                ##flume启动agent监听的端口号
                log4j.appender.flume.UnsafeMode = true
                
            java代码    
                public class FlumeProducer {
                    public static void main(String[] args) throws Exception {
                        final Logger logger = Logger.getLogger(FlumeProducer.class);
                        while (true) {
                            logger.info("logger datetime :" + System.currentTimeMillis());
                            Thread.sleep(1000);
                        }
                    }
                }            
        4、验证agent2是否成功写入到hdfs的/flume/events/文件夹下
            hdfs dfs -ls -h -R /flume/events/IP/yyyy-MM-dd/                                        ##如果存在一个或多个avroLog.timestamp.log文件表示成功
            
  三、使用Socket客户端写入到flume中,flume保存文件到本地

    1、创建agent_tcp.conf(接收socket客户端发送的数据然后写入到Linux本地)

      cd /usr/local/flume

      vi conf/agent_tcp.conf

        agent_tcp.sources=as1
        agent_tcp.channels=c1
        agent_tcp.sinks=s1

        agent_tcp.sources.as1.type=syslogtcp
        agent_tcp.sources.as1.bind=0.0.0.0
        agent_tcp.sources.as1.port=21111
        agent_tcp.sources.as1.channels=c1

        agent_tcp.channels.c1.type=memory
        agent_tcp.channels.c1.capacity=10000
        agent_tcp.channels.c1.transactionCapacity=10000
        agent_tcp.channels.c1.keep-alive=120
        agent_tcp.channels.c1.byteCapacityBufferPercentage=20
        agent_tcp.channels.c1.byteCapacity=800000
  
        agent_tcp.sinks.s1.type=file_roll
        agent_tcp.sinks.s1.rollSize=10000
        agent_tcp.sinks.s1.sink.directory =/home/lefuBigDataDev/clouds/flume/logs
        agent_tcp.sinks.s1.channel=c1
    2、启动flume的agent_tcp.conf

      bin/flume-ng agent -n agent_tcp -c conf/ -f conf/agent_tcp.conf -Dflume.root.logger=DEBUG,console

    3、java代码socket客户端

      package com.left.clouds.cluster.flume.test;

  import java.io.InputStream;
              import java.io.OutputStream;
              import java.net.Socket;

              import org.junit.Before;
              import org.junit.Test;

        public class TestFlume {

            private Socket client = null;
                  InputStream in = null;
                  OutputStream out = null;
                
                  @Before
                  public void before(){
                      try {
                          client = new Socket("192.168.0.218", 21111);
                      } catch (Exception e) {
                          e.printStackTrace();
                      }
                  }
                
                  @Test
                  public void sender() {
                      try {
                          out = client.getOutputStream();
                          int i = 0;
                          while(true){
                              out.write(("device-"+(i++)+("\n")).getBytes());
                              Thread.sleep(4000);
                              System.out.println("第:"+i+"次发送...");
                          }
                      } catch (Exception e) {
                          e.printStackTrace();
                      }                
                  }

        }

Flume-1.6.0中包含了kafka的source,agent配置文件实例如下
front_agent_kafka.sources=as1
front_agent_kafka.channels=c1
front_agent_kafka.sinks=s1

front_agent_kafka.sources.as1.type=org.apache.flume.source.kafka.KafkaSource
front_agent_kafka.sources.as1.zookeeperConnect=192.168.0.20:2181
front_agent_kafka.sources.as1.topic=test
front_agent_kafka.sources.as1.groupId=flume
front_agent_kafka.sources.as1.batchSize=100
front_agent_kafka.sources.as1.channels=c1
                              
front_agent_kafka.channels.c1.type=memory
front_agent_kafka.channels.c1.capacity=10000
front_agent_kafka.channels.c1.transactionCapacity=10000
front_agent_kafka.channels.c1.keep-alive=120
front_agent_kafka.channels.c1.byteCapacityBufferPercentage=20
front_agent_kafka.channels.c1.byteCapacity=800000

front_agent_kafka.sinks.s1.type=com.lefukj.flume.sinks.JdbcSink
front_agent_kafka.sinks.s1.channel=c1

flume-agent实例的更多相关文章

  1. Flume Source 实例

    Flume Source 实例 Avro Source 监听avro端口,接收外部avro客户端数据流.跟前面的agent的Avro Sink可以组成多层拓扑结构. 1 2 3 4 5 6 7 8 9 ...

  2. 《OD大数据实战》Flume入门实例

    一.netcat source + memory channel + logger sink 1. 修改配置 1)修改$FLUME_HOME/conf下的flume-env.sh文件,修改内容如下 e ...

  3. 一个flume agent异常的解决过程记录

    今天在使用flume agent的时候,遇到了一个异常,  现把解决的过程记录如下: 问题的背景: 我使用flume agent 来接收从storm topology发送下来的accesslog , ...

  4. flume+sparkStreaming实例 实时监控文件demo

    1,flume所在的节点不和spark同一个集群  v50和 10-15节点 flume在v50里面 flume-agent.conf spark是开的work节点,就是单点计算节点,不涉及到mast ...

  5. 使用Flume消费Kafka数据到HDFS

    1.概述 对于数据的转发,Kafka是一个不错的选择.Kafka能够装载数据到消息队列,然后等待其他业务场景去消费这些数据,Kafka的应用接口API非常的丰富,支持各种存储介质,例如HDFS.HBa ...

  6. flume安装及入门实例

    1. 如何安装? 1)将下载的flume包,解压到/home/hadoop目录中 2)修改 flume-env.sh 配置文件,主要是JAVA_HOME变量设置 root@m1:/home/hadoo ...

  7. Flume 多个agent串联

    多个agent串联 采集需求:比如业务系统使用log4j生成的日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到hdfs,使用agent串联 根据需求,首先定义以下3大要素 第一台flum ...

  8. Flume官方文档翻译——Flume 1.7.0 User Guide (unreleased version)中一些知识点

    Flume官方文档翻译--Flume 1.7.0 User Guide (unreleased version)(一) Flume官方文档翻译--Flume 1.7.0 User Guide (unr ...

  9. Flume官方文档翻译——Flume 1.7.0 User Guide (unreleased version)(二)

    Flume官方文档翻译--Flume 1.7.0 User Guide (unreleased version)(一) Logging raw data(记录原始数据) Logging the raw ...

  10. Flume日志采集系统——初体验(Logstash对比版)

    这两天看了一下Flume的开发文档,并且体验了下Flume的使用. 本文就从如下的几个方面讲述下我的使用心得: 初体验--与Logstash的对比 安装部署 启动教程 参数与实例分析 Flume初体验 ...

随机推荐

  1. 【poj3422】 Kaka's Matrix Travels

    http://poj.org/problem?id=3422 (题目链接) 题意 N*N的方格,每个格子中有一个数,寻找从(1,1)走到(N,N)的K条路径,使得取到的数的和最大. Solution ...

  2. Unity3D脚本调用Objective C代码实现游戏内购买

    0.开篇吐槽: 一年之内从WP转到iOS,又从iOS转到U3D,真心伤不起. 1.Unity3D脚本调用OC代码的原理: 其实也没啥神秘的,因为OC是和C互通的 ,C#又可以通过DllImport的形 ...

  3. jq实现搜索引擎的提示效果

    (function ($) { $.fn.Search = function (options) { var defaults = { inputid: "search", div ...

  4. 原 Linux搭建SVN 服务器2

    原 Linux搭建SVN 服务器 发表于1年前(2014-08-05 17:55)   阅读(12257) | 评论(3) 31人收藏此文章, 我要收藏 赞3 摘要 Linux搭建SVN 服务器 目录 ...

  5. [JavaScript]'this'详解

    http://blog.csdn.net/sodino/article/details/51318565

  6. nginx根据域名做http,https分发

    omcat端口:8080 做好虚拟主机 参照我的另一篇文章nginx端口:80 根据域名分派 在conf/nginx.conf中的http中增加 include www.huozhe.com.conf ...

  7. ORACLE 11G没有备份文件參数文件在异机通过rman备份恢复找回被误删的数据

    背景:          同事误删除线上数据.所以须要从备份中找回数据恢复. 真实屋漏偏逢连夜雨.船迟又遇打头风.前两天备份的磁盘坏块,如今仅仅有rman全备的.bak文件,没有控制文件和參数文件,所 ...

  8. android 去除标题

    //去除标题,必须在setContentView之前设置 requestWindowFeature(Window.FEATURE_NO_TITLE);

  9. nova boot from volume代码分析

    首先要创建一个bootable volume curl -i http://16.158.166.197:8776/v1/c24c59846a7f44538d958e7548cc74a3/volume ...

  10. 【Bcftools】合并不同sample的vcf文件,通过bcftools

    通过GATK calling出来的SNP如果使用UnifiedGenotype获得的SNP文件是分sample的,但是如果使用vcftools或者ANGSD则需要Vcf文件是multi-sample的 ...