news 2026/7/2 8:29:32

Flume日志采集简介

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flume日志采集简介

一、flume分布

flume的核心组件分为数据源source、管道channel和目的地sink。

  • source:对接数据源
  • channel:用于中间缓存数据
  • sink:对接目的地

在整个数据传输的过程中,流动的是 event,它是 Flume 内部数据传输的最基本单元。

event 将传输的数据进行封装。如果是文本文件,通常是一行记录,event 也是事务的基本单位。event 从 source,流向 channel,再到 sink,本身为一个字节数组,并可携带 headers(头信息)信息。event 代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。

一个完整的 event 包括:event headers、event body、event 信息,其中event 信息就是 flume 收集到的日记记录。

二、flume的结构

flume的内部结构分为标准和串联

  • 标准结构:单台flume传输,单管道,单目的地。
  • 串联:多台flume传输给一台flume的数据源,然后这台进行汇总传输到最终目的地(hdfs)。

flume的原理:

  • ​ flume本身是一个Java进程,启动以后会生成一个叫agent的进程。
  • ​ agent进程中包含了source,sink和channel。
  • ​ 在flume中,数据被包装为event。真实的数据存放在event body中
    • event是flume中最小的数据单元

三、flume的安装与部署

flume官网:(https://flume.apache.org/)

Flume 1.11.0 用户指南 — Apache Flume

flume部署步骤:

  • 解压
  • 配置环境变量
  • 改名flume-env.sh、在flume-env.sh输入Java路径
  • 删除lib下guava可能导致的版本冲突问题
  • 编辑conf文件
  • 启动flume

flume监视端口案例(官网案例)

# example.conf: A single-node Flume configuration# 定义agent组件的名称a1.sources=r1 a1.sinks=k1 a1.channels=c1# 定义sources组件属性:r1# 设定type为netcat,用处为监听端口的数据,转换log为英文文本a1.sources.r1.type=netcat a1.sources.r1.bind=localhost a1.sources.r1.port=44444# 定义sinks组件属性:k1# 定义type打印方式:这里为logger,即屏幕上(还可为hdfs等类型)a1.sinks.k1.type=logger# 定义channels组件属性:c1a1.channels.c1.type=memory# 设定最多存放1000个、设定事务容量为100个(一起成功或失败)a1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100# 设置sources和sinks的管道(即channels)a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1

flume启动案例

flume-ng agent-nagent-c$FLUME_HOME/conf-f$FLUME_HOME/conf/flume-properties.conf-Dflume.root.logger=INFO,console

flume监视文件目录案例

a1.sources=r1a1.sinks=k1a1.channels=c1
  • sources
# 设定sources属性:r1# 设定sources的type为spooldir可以监视文件目录# 注意不能有同名的文件在hdfs下产生,否则报错罢工,所以一般使用时间戳来命名进行采集数据a1.sources.r1.type=spooldir a1.sources.r1.spoolDir=/root/dirlogs a1.sources.r1.fileHeader=true
  • sinks
# 定义sinks组件属性:k1# 定义type打印方式:这里为hdfsa1.sinks.k1.type=hdfs# 设置动态获取当前时间a1.sinks.k1.hdfs.useLocalTimeStamp=truea1.sinks.k1.hdfs.path=/tmp/flume/%y-%m-%d/%H:%M/%S# filePrefix设定文件前缀a1.sinks.k1.hdfs.filePrefix=-events# fileSuffix设置文件后缀a1.sinks.k1.hdfs.fileSuffix=.log# 设置文件类型为datastream普通文件,默认为序列化文件a1.sinks.k1.hdfs.fileType=Datastream# round 控制文件夹以多少时间滚动、是否开启时间舍弃# 这里设置为10分钟,则10分钟内采集的文件都会在一个文件夹下面a1.sinks.k1.hdfs.round=truea1.sinks.k1.hdfs.roundValue=10a1.sinks.k1.hdfs.roundUnit=minute# roll 控制写入hdfs文件,以何种方式滚动a1.sinks.k1.hdfs.rollInterval=3# 以时间间隔、默认30秒a1.sinks.k1.hdfs.rollSize=20# 以文件大小、默认1024a1.sinks.k1.hdfs.rollCount=5# 以event数量、默认10个# 如果三个都设置,谁先满足谁触发# 如果不想以某种属性为滚动,设置为0即可
  • channels
# 定义channels组件属性:c1# 设定使用内存来缓存a1.channels.c1.type=memory# 设定最多存放1000个、设定事务容量为100个event(一起成功或失败)a1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100
  • 设定连接
# 设置sources和sinks的管道(即channels)a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1

flume监视文件日志(实时监视、实时更新)

a1.sources=r1a1.sinks=k1a1.channels=c1
  • sources
# 设定sources属性:r1# 设置exec可以运行cat或者tail -F命令的结果作为数据进行收集a1.sources.r1.type=execa1.sources.r1.command=cat/root/logs/tests.log
  • sinks
# 定义sinks组件属性:k1a1.sinks.k1.type=hdfs a1.sinks.k1.hdfs.useLocalTimeStamp=truea1.sinks.k1.hdfs.path=/tmp/flume/%y-%m-%d/%H:%M/%S a1.sinks.k1.hdfs.filePrefix=-eventsa1.sinks.k1.hdfs.fileSuffix=.log a1.sinks.k1.hdfs.fileType=Datastream a1.sinks.k1.hdfs.round=truea1.sinks.k1.hdfs.roundValue=10a1.sinks.k1.hdfs.roundUnit=minute a1.sinks.k1.hdfs.rollInterval=3a1.sinks.k1.hdfs.rollSize=20a1.sinks.k1.hdfs.rollCount=5
  • channels
# 定义channels组件属性:c1a1.channels.c1.type=memory a1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100
  • 设定连接
# 设置sources和sinks的管道(即channels)a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1

spooldir监视文件

a1.sources=s1a1.channels=c1a1.sinks=s2a1.sources.s1.type=spooldira1.sources.s1.spoolDir=/root/csv/a1.sources.s1.fileHeader=truea1.channels.c1.type=memorya1.channels.c1.capacity=100000000a1.channels.c1.transactionCapacity=50000000a1.sinks.s2.type=hdfsa1.sinks.s2.hdfs.path=hdfs://master:9000/inputa1.sinks.s2.hdfs.rollSize=100000000a1.sinks.s2.hdfs.rollCount=0a1.sinks.s2.hdfs.rollInterval=0a1.sinks.s2.hdfs.batchSize=50000000a1.sources.s1.channels=c1a1.sinks.s2.channel=c1

四、flume的load-balance、failover

flume的负载均衡

flume在串联的过程中,如果前面的agent能够一次处理100个event、后面一个则只能一次处理10个event、那么就会出现堆积情况。

这时候可以多个进程进行处理前面的agent。

同一个请求只能交给一个进行处理,避免数据重复。

如何分配请求就涉及到了负载均衡的算法:轮询(round_robin) 随机(random) 权重

负载均衡是用于解决一台机器(一个进程)无法解决所有请求而产生的一种算法。Load balancing Sink Processor能够实现load balance功能,如下图Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent 上,示例配置,如下所示:

a1.sinkgroups=g1 a1.sinkgroups.g1.sinks=k1 k2 k3 a1.sinkgroups.g1.processortype=load_balance a1.sinkgroups.g1.processor.backoff=true#如果开启,则将失败的sink放入黑名单a1.sinkgroups.g1.processor.selector=round_robin#另外还支持randoma1.sinkgroups.g1.processor.selector.maxTimeOut=10000#在黑名单放置的超时时间,超时结束时,若仍然无法接收,则超时时间呈指数增长

flume串联跨网络传输数据

  • avro sink

  • avro source

    使用上述两个组件指定绑定的端口ip 就可以满足数据跨网络传递 通常用于flume串联架构中。

    agent1.sinks.k1.channel=c1 agent1.sinks.k1.type=avro agent1.sinks.k1.hostname=node-2 agent1.sinks.k1.port=52020# set sink2agent1.sinks.k2.channel=c1 agent1.sinks.k2.type=avro agent1.sinks.k2.hostname=node-3 agent1.sinks.k2.port=52020#set sink groupagent1.sinkgroups.gl.sinks=k1 k2#set failoveragent1.sinkgroups.gl.processor.type=load_balance agent1.sinkgroups.gl.processor.backoff=trueagent1.sinkgroups.gl.processor.selector=round robin|agent1.sinkgroups.gl.processor.selector.maxTimeOut=10000
    al.sources=rl al.sinks=kl al.channels=cl# Describe/configure the sourceal.sources.rl.type=avro al.sources.rl.channels=cl al.sources.rl.bind=node-3 al.sources.rl.port=52020# Describe the sinkal.sinks.kl.type=logger# Use a channel which buffers events in memoryal.channels.cl.type=memory al.channels.cl.capacity=1000al.channels.cl.transactionCapacity=100# Bind the source and sink to the channelal.sources.rl.channels=cl al.sinks.kl.channel=cl
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/7/2 8:27:49

哔咔漫画下载器:5分钟打造个人离线漫画图书馆的终极指南

哔咔漫画下载器:5分钟打造个人离线漫画图书馆的终极指南 【免费下载链接】picacomic-downloader 哔咔漫画 picacomic pica漫画 bika漫画 PicACG 多线程下载器,带图形界面 带收藏夹,已打包exe 下载速度飞快 项目地址: https://gitcode.com/g…

作者头像 李华
网站建设 2026/7/2 8:25:12

Ai驱动结合蛋白设计:Bindcraft全流程教学

Bindcraft:用户友好的binder从头设计流程 蛋白质主要依赖蛋白质-蛋白质相互作用(PPI)来执行复杂的生物学功能。因此,设计能够特异性靶向PPI的蛋白质结合剂具有极高的治疗与生物技术价值。然而,传统的实验方法以及早期…

作者头像 李华
网站建设 2026/7/2 8:16:20

HTTP/2快速重置攻击漏洞修复实战:从原理到Nginx、F5 BIG-IP修复方案

1. 项目概述:直面HGVE-2024-E003漏洞的挑战 最近在排查线上系统安全基线时,一个编号为HGVE-2024-E003的漏洞引起了我的高度警觉。这个漏洞与CVE-2023-44487(HTTP/2协议中的“快速重置”攻击漏洞)紧密相关,影响范围极广…

作者头像 李华
网站建设 2026/7/2 8:14:38

DownKyi:B站视频批量下载的终极解决方案

DownKyi:B站视频批量下载的终极解决方案 【免费下载链接】downkyi 哔哩下载姬downkyi,哔哩哔哩网站视频下载工具,支持批量下载,支持8K、HDR、杜比视界,提供工具箱(音视频提取、去水印等)。 项…

作者头像 李华