`
jinnianshilongnian
  • 浏览: 21430716 次
  • 性别: Icon_minigender_1
博客专栏
5c8dac6a-21dc-3466-8abb-057664ab39c7
跟我学spring3
浏览量:2404348
D659df3e-4ad7-3b12-8b9a-1e94abd75ac3
Spring杂谈
浏览量:2997101
43989fe4-8b6b-3109-aaec-379d27dd4090
跟开涛学SpringMVC...
浏览量:5631001
1df97887-a9e1-3328-b6da-091f51f886a1
Servlet3.1规范翻...
浏览量:257411
4f347843-a078-36c1-977f-797c7fc123fc
springmvc杂谈
浏览量:1592961
22722232-95c1-34f2-b8e1-d059493d3d98
hibernate杂谈
浏览量:248900
45b32b6f-7468-3077-be40-00a5853c9a48
跟我学Shiro
浏览量:5846927
Group-logo
跟我学Nginx+Lua开...
浏览量:697914
5041f67a-12b2-30ba-814d-b55f466529d5
亿级流量网站架构核心技术
浏览量:780235
社区版块
存档分类
最新评论

Flume架构与源码分析-核心组件分析-2

阅读更多

 

4、整体流程

从以上部分我们可以看出,不管是Source还是Sink都依赖Channel,那么启动时应该先启动Channel然后再启动SourceSink即可。

 

Flume有两种启动方式:使用EmbeddedAgent内嵌在Java应用中或使用Application单独启动一个进程,此处我们已Application分析为主。

 

首先进入org.apache.flume.node.Applicationmain方法启动:

//1、设置默认值启动参数、参数是否必须的
Options options = new Options();
Option option = new Option("n", "name", true, "the name of this agent");
option.setRequired(true);
options.addOption(option);

option = new Option("f", "conf-file", true,
"specify a config file (required if -z missing)");
option.setRequired(false);
options.addOption(option);

//2、接着解析命令行参数
CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(options, args);

String agentName = commandLine.getOptionValue('n');
boolean reload = !commandLine.hasOption("no-reload-conf");

if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
  isZkConfigured = true;
}

if (isZkConfigured) {
    //3、如果是通过ZooKeeper配置,则使用ZooKeeper参数启动,此处忽略,我们以配置文件讲解
} else {
  //4、打开配置文件,如果不存在则快速失败
  File configurationFile = new File(commandLine.getOptionValue('f'));
  if (!configurationFile.exists()) {
         throw new ParseException(
        "The specified configuration file does not exist: " + path);
  }
  List<LifecycleAware> components = Lists.newArrayList();

  if (reload) { //5、如果需要定期reload配置文件,则走如下方式
    //5.1、此处使用Guava提供的事件总线
    EventBus eventBus = new EventBus(agentName + "-event-bus");
    //5.2、读取配置文件,使用定期轮训拉起策略,默认30s拉取一次
    PollingPropertiesFileConfigurationProvider configurationProvider =
        new PollingPropertiesFileConfigurationProvider(
          agentName, configurationFile, eventBus, 30);
    components.add(configurationProvider);
    application = new Application(components); //5.3、向Application注册组件
    //5.4、向事件总线注册本应用,EventBus会自动注册Application中使用@Subscribe声明的方法
    eventBus.register(application);

  } else { //5、配置文件不支持定期reload
    PropertiesFileConfigurationProvider configurationProvider =
        new PropertiesFileConfigurationProvider(
          agentName, configurationFile);
    application = new Application();
    //6.2、直接使用配置文件初始化Flume组件
    application.handleConfigurationEvent(configurationProvider
      .getConfiguration());
  }
}
//7、启动Flume应用
application.start();

//8、注册虚拟机关闭钩子,当虚拟机关闭时调用Application的stop方法进行终止
final Application appReference = application;
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
  @Override
  public void run() {
    appReference.stop();
  }
});

 

以上流程只提取了核心代码中的一部分,比如ZK的实现直接忽略了,而Flume启动大体流程如下:

1、读取命令行参数;

2、读取配置文件;

3、根据是否需要reload使用不同的策略初始化Flume;如果需要reload,则使用Guava的事件总线实现,ApplicationhandleConfigurationEvent是事件订阅者,PollingPropertiesFileConfigurationProvider是事件发布者,其会定期轮训检查文件是否变更,如果变更则重新读取配置文件,发布配置文件事件变更,而handleConfigurationEvent会收到该配置变更重新进行初始化;

4、启动Application,并注册虚拟机关闭钩子。

 

handleConfigurationEvent方法比较简单,首先调用了stopAllComponents停止所有组件,接着调用startAllComponents使用配置文件初始化所有组件: 

@Subscribe
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
  stopAllComponents();
  startAllComponents(conf);
} 

MaterializedConfiguration存储Flume运行时需要的组件:SourceChannelSinkSourceRunnerSinkRunner等,其是通过ConfigurationProvider进行初始化获取,比如PollingPropertiesFileConfigurationProvider会读取配置文件然后进行组件的初始化。

 

对于startAllComponents实现大体如下: 

//1、首先启动Channel
supervisor.supervise(Channels,
      new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
//2、确保所有Channel是否都已启动
for(Channel ch: materializedConfiguration.getChannels().values()){
  while(ch.getLifecycleState() != LifecycleState.START
      && !supervisor.isComponentInErrorState(ch)){
    try {
      Thread.sleep(500);
    } catch (InterruptedException e) {
        Throwables.propagate(e);
    }
  }
}
//3、启动SinkRunner
supervisor.supervise(SinkRunners,  
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
//4、启动SourceRunner
supervisor.supervise(SourceRunner,
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
//5、初始化监控服务
this.loadMonitoring(); 

从如下代码中可以看到,首先要准备好Channel,因为SourceSink会操作它,对于Channel如果初始化失败则整个流程是失败的;然后启动SinkRunner,先准备好消费者;接着启动SourceRunner开始进行采集日志。此处我们发现有两个单独的组件LifecycleSupervisorMonitorService,一个是组件守护哨兵,一个是监控服务。守护哨兵对这些组件进行守护,假设出问题了默认策略是自动重启这些组件。

 

对于stopAllComponents实现大体如下:

//1、首先停止SourceRunner
supervisor.unsupervise(SourceRunners);
//2、接着停止SinkRunner
supervisor.unsupervise(SinkRunners);
//3、然后停止Channel
supervisor.unsupervise(Channels);
//4、最后停止MonitorService
monitorServer.stop(); 

此处可以看出,停止的顺序是SourceSinkChannel,即先停止生产,再停止消费,最后停止管道。

 

Application中的start方法代码实现如下:

public synchronized void start() {
  for(LifecycleAware component : components) {
    supervisor.supervise(component,
        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
  }
} 

其循环Application注册的组件,然后守护哨兵对它进行守护,默认策略是出现问题会自动重启组件,假设我们支持reload配置文件,则之前启动Application时注册过PollingPropertiesFileConfigurationProvider组件,即该组件会被守护哨兵守护着,出现问题默认策略自动重启。

 

Application关闭执行了如下动作:    

public synchronized void stop() {
  supervisor.stop();
  if(monitorServer != null) {
    monitorServer.stop();
  }
} 

即关闭守护哨兵和监控服务。

 

到此基本的Application分析结束了,我们还有很多疑问,守护哨兵怎么实现的。 

 

整体流程可以总结为:

1、首先初始化命令行配置;

2、接着读取配置文件;

3、根据是否需要reload初始化配置文件中的组件;如果需要reload会使用Guava事件总线进行发布订阅变化;

4、接着创建Application,创建守护哨兵,并先停止所有组件,接着启动所有组件;启动顺序:ChannelSinkRunnerSourceRunner,并把这些组件注册给守护哨兵、初始化监控服务;停止顺序:SourceRunnerSinkRunnerChannel

5、如果配置文件需要定期reload,则需要注册Polling***ConfigurationProvider到守护哨兵;

6、最后注册虚拟机关闭钩子,停止守护哨兵和监控服务。

 

 

轮训实现的SourceRunner SinkRunner会创建一个线程进行工作,之前已经介绍了其工作方式。接下来我们看下守护哨兵的实现。

 

首先创建LifecycleSupervisor

  //1、用于存放被守护的组件
  supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>();
  //2、用于存放正在被监控的组件
  monitorFutures = new HashMap<LifecycleAware, ScheduledFuture<?>>();
  //3、创建监控服务线程池
  monitorService = new ScheduledThreadPoolExecutor(10,
      new ThreadFactoryBuilder().setNameFormat(
          "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")
          .build());
  monitorService.setMaximumPoolSize(20);
  monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);
  //4、定期清理被取消的组件
  purger = new Purger();
  //4.1、默认不进行清理
  needToPurge = false; 

LifecycleSupervisor启动时会进行如下操作:

public synchronized void start() {
  monitorService.scheduleWithFixedDelay(purger, 2, 2, TimeUnit.HOURS);
  lifecycleState = LifecycleState.START;
} 

首先每隔两个小时执行清理组件,然后改变状态为启动。而LifecycleSupervisor停止时直接停止了监控服务,然后更新守护组件状态为STOP

  //1、首先停止守护监控服务
  if (monitorService != null) {
    monitorService.shutdown();
    try {
      monitorService.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      logger.error("Interrupted while waiting for monitor service to stop");
    }
  }
  //2、更新所有守护组件状态为STOP,并调用组件的stop方法进行停止
  for (final Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses.entrySet()) {
    if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) {
      entry.getValue().status.desiredState = LifecycleState.STOP;
      entry.getKey().stop();
    }
  }
  //3、更新本组件状态
  if (lifecycleState.equals(LifecycleState.START)) {
    lifecycleState = LifecycleState.STOP;
  }
  //4、最后的清理
  supervisedProcesses.clear();
  monitorFutures.clear(); 

 

接下来就是调用supervise进行组件守护了:

  if(this.monitorService.isShutdown() || this.monitorService.isTerminated()
  || this.monitorService.isTerminating()){
    //1、如果哨兵已停止则抛出异常,不再接收任何组件进行守护
  }
  //2、初始化守护组件
  Supervisoree process = new Supervisoree();
  process.status = new Status();
  //2.1、默认策略是失败重启
  process.policy = policy;
  //2.2、初始化组件默认状态,大多数组件默认为START
  process.status.desiredState = desiredState;
  process.status.error = false;
  //3、组件监控器,用于定时获取组件的最新状态,或者重新启动组件
  MonitorRunnable monitorRunnable = new MonitorRunnable();
  monitorRunnable.lifecycleAware = lifecycleAware;
  monitorRunnable.supervisoree = process;
  monitorRunnable.monitorService = monitorService;

  supervisedProcesses.put(lifecycleAware, process);
  //4、定期的去执行组件监控器,获取组件最新状态,或者重新启动组件
  ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
      monitorRunnable, 0, 3, TimeUnit.SECONDS);
  monitorFutures.put(lifecycleAware, future);
}

 

如果不需要守护了,则需要调用unsupervise

public synchronized void unsupervise(LifecycleAware lifecycleAware) {
  synchronized (lifecycleAware) {
    Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware);
    //1.1、设置守护组件的状态为被丢弃
    supervisoree.status.discard = true;
    //1.2、设置组件盼望的最新生命周期状态为STOP
    this.setDesiredState(lifecycleAware, LifecycleState.STOP);
    //1.3、停止组件
    lifecycleAware.stop();
  }
  //2、从守护组件中移除
  supervisedProcesses.remove(lifecycleAware);
  //3、取消定时监控组件服务
  monitorFutures.get(lifecycleAware).cancel(false);
  //3.1、通知Purger需要进行清理,Purger会定期的移除cancel的组件
  needToPurge = true;
  monitorFutures.remove(lifecycleAware);
}

 

接下来我们再看下MonitorRunnable的实现,其负责进行组件状态迁移或组件故障恢复:

public void run() {
  long now = System.currentTimeMillis();
  try {
    if (supervisoree.status.firstSeen == null) {
        supervisoree.status.firstSeen = now; //1、记录第一次状态查看时间
    }
    supervisoree.status.lastSeen = now; //2、记录最后一次状态查看时间
    synchronized (lifecycleAware) {
        //3、如果守护组件被丢弃或出错了,则直接返回
        if (supervisoree.status.discard || supervisoree.status.error) {
          return;
        }
        //4、更新最后一次查看到的状态
        supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();
        //5、如果组件的状态和守护组件看到的状态不一致,则以守护组件的状态为准,然后进行初始化
        if (!lifecycleAware.getLifecycleState().equals(
            supervisoree.status.desiredState)) {
          switch (supervisoree.status.desiredState) { 
            case START: //6、如果是启动状态,则启动组件
             try {
                lifecycleAware.start();
              } catch (Throwable e) {
                if (e instanceof Error) {
                  supervisoree.status.desiredState = LifecycleState.STOP;
                  try {
                    lifecycleAware.stop();
                  } catch (Throwable e1) {
                    supervisoree.status.error = true;
                    if (e1 instanceof Error) {
                      throw (Error) e1;
                    }
                  }
                }
                supervisoree.status.failures++;
              }
              break;
            case STOP: //7、如果是停止状态,则停止组件
              try {
                lifecycleAware.stop();
              } catch (Throwable e) {
                if (e instanceof Error) {
                  throw (Error) e;
                }
                supervisoree.status.failures++;
              }
              break;
            default:
          }
    } catch(Throwable t) {
    }
  }
}

 

如上代码进行了一些简化,整体逻辑即定时去采集组件的状态,如果发现守护组件和组件的状态不一致,则可能需要进行启动或停止。即守护监视器可以用来保证组件如能失败后自动启动。默认策略是总是失败后重启,还有一种策略是只启动一次。 

  

 

 

2
1
分享到:
评论

相关推荐

    word源码java-sparkstreaming:SparkStreaming实时流处理项目实战

    二、Flume架构及核心组件 三、Flume&JDK环境部署 1.前置条件 Java Runtime Environment - Java 1.8 or later Memory - Sufficient memory for configurations used by sources, channels or sinks Disk Space - ...

    新版Hadoop视频教程 段海涛老师Hadoop八天完全攻克Hadoop视频教程 Hadoop开发

    10-hdfs下载数据源码分析-getFileSystem2.avi 第三天 mapreduce的原理和编程 01-hdfs源码跟踪之打开输入流.avi 02-hdfs源码跟踪之打开输入流总结.avi 03-mapreduce介绍及wordcount.avi 04-wordcount的编写和...

    JAVA上百实例源码以及开源项目源代码

    Java 源码包 Applet钢琴模拟程序java源码 2个目标文件,提供基本的音乐编辑功能。编辑音乐软件的朋友,这款实例会对你有所帮助。 Calendar万年历 1个目标文件 EJB 模拟银行ATM流程及操作源代码 6个目标文件,EJB来...

    大数据学习计划.pdf

    本模块通过学习 Hive、Impala 等⼤数据 SQL 分析组件,让⽤户将隐匿在泥沙之下的数据价值挖掘出来。 所以在第三部分的学习中我们需要达到以下⽬标: 1、 安装部署 Hive; 理解 Hive 架构及执⾏原理 ; Hive 的优化...

    JAVA上百实例源码以及开源项目

    笔者当初为了学习JAVA,收集了很多经典源码,源码难易程度分为初级、中级、高级等,详情看源码列表,需要的可以直接下载! 这些源码反映了那时那景笔者对未来的盲目,对代码的热情、执着,对IT的憧憬、向往!此时此...

    java开源包8

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

    java开源包1

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

    尚gg大数据项目实战电商数仓系统开发教程.txt

    35_数仓采集_日志采集Flume启动停止脚本.avi2 ~/ r- J: h$ U, q/ e# e7 k% M 36_数仓采集_Kafka集群安装.avi3 L6 `7 F& o/ U6 F" U5 U 37_数仓采集_Kafka集群启动停止脚本.avi 38_数仓采集_Kafka Manager安装及脚本....

    java开源包10

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

    java开源包2

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

    java开源包11

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

    java开源包3

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

    java开源包6

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

    java开源包5

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

    java开源包4

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

    java开源包7

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

    java开源包9

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

    java开源包101

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

    Java资源包01

    R-OSGi 是一套适用于任意满足 OSGi 架构的分布式通讯组件。它以 jar 的形式发布,部署容易,使用也较为便捷。 Java邮箱地址验证 jaev jaev 是一个用来验证电子邮箱地址是否有效的 Java 项目。 Java的FastCGI网关 ...

Global site tag (gtag.js) - Google Analytics