Flume自定义Source

Source 是负责接收数据到 Flume Agent 的组件

Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy

官方提供的 Source 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 Source

Flume自定义Source

Source 的目的是从外部客户端接收数据并将其存储到配置的 Channel 中。Source 可以获取它自己的 ChannelProcessor 以串行方式处理在 Channel 本地事务中提交的事件的实例。若出现异常, 所需的 Channel 将传递异常, 所有 Channel 都会事务回滚, 但以前在其他 Channel 上处理的事件将保持已提交。

类似于 SinkRunner.PolingRunner Runnable, 当 Flume 框架调用 PollableSourceRunner.start() 方法时, PolingRunner Runnable 就会在一个线程上启动。每个配置过的 PollableSource 都与运行着自己 PolingRunner 的线程相关联。此线程管理 PollableSource 的生命周期。一个 PollableSource 实现类必须实现在 LifecycleAware 接口中声明的 start()stop() 方法。运行 PollableSource 会自动调用 process() 方法。在 process() 方法中需要提取新数据, 并将其存储为 Flume Events

请注意, 实际上有两种类型的 Source。除了 PollableSource , 还有一个是 EventDrivenSource。与 PollableSource不同, EventDrivenSource 必须有自己的回调机制来捕获新数据并将其存储到 Channel 中。EventDrivenSource 并不像 PollableSources 由自己的线程驱动

示例:

public class MySource extends AbstractSource implements Configurable, PollableSource {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // 处理 myProp 值(验证,或类型转换......)
    // 将 myProp 放在一个变量中 后续 process() 方法需要用
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // 初始化与外部客户端的连接
  }

  @Override
  public void stop () {
    // 释放资源或清空字段等
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    try {
      // 自定义处理

      // 接收新数据
      Event e = getSomeData();

      // 将 Event  存储到此 Source 相关Channel(s) 中
      getChannelProcessor().processEvent(e);

      status = Status.READY;
    } catch (Throwable t) {
      // 异常记录, 处理

      status = Status.BACKOFF;

      // 异常抛出
      if (t instanceof Error) {
        throw (Error)t;
      }
    } finally {
      txn.close();
    }
    return status;
  }
}

测试

1.将写好的代码打包,并放到 flume 的 lib 目录下。
2.编写配置
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.RayfunC.MySource
a1.sources.r1.delayTime = 1000
a1.sources.r1.preText = shenzhen

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3.开启任务
flume-ng agent -c conf -f job/mysource-test.conf -n a1 -Dflume.root.logger=INFO,console