Luigi

Luigi是一款管道作业流程配置方案,可以帮助用户快速构建自动化流程管道,将复杂的任务建立联系,为任务配置执行方式,设置调度方案,让任务按照您配置的工作流程自动运行,方便以后快速检测哪个环节出现故障,对于需要建立工作流,需要可视化工作任务的朋友很有帮助,在处理数据的时候就可以通过这款软件建立自动化工作方式,可以使用内置的任务模板快速创建工作流管理方案,可以设置任务依存关系,可以设置任务数据输出方式,可以设置作业相关范围,通过流程化的方式让你的工作可视化!

Luigi软件功能

Luigi是一个Python(已测试3.6、3.7、3.8、3.9)软件包,可帮助您构建批处理作业的复杂管道。它处理依赖关系解析,工作流管理,可视化,处理故障,命令行集成等。

Luigi的目的是解决通常与长时间运行的批处理相关的所有管道问题。您希望将许多任务链接起来,使其自动化,否则将发生故障。这些任务可以是任何事情,但通常都是长期运行的任务,例如 Hadoop作业,向数据库转储数据或从数据库转储数据,运行机器学习算法或其他任何事情。

还有其他一些软件包专注于较低级别的数据处理,例如Hive, Pig或 Cascading。Luigi并不是替代这些框架。相反,它可以帮助您将许多任务组合在一起,其中每个任务可以是Hive查询,Java中的Hadoop作业,Scala或Python中的 Spark作业,Python代码段, 从数据库中转储表或其他任何内容。建立包含数千个任务且需要几天或几周才能完成的长期运行的管道很容易。Luigi负责许多工作流管理,因此您可以专注于任务本身及其依赖。

您几乎可以构建任何所需的任务,但是Luigi还附带了一个包含多个常用任务模板的工具箱。它包括对在Hadoop中运行Python mapreduce作业以及 Hive和Pig作业的支持。它还带有 HDFS的文件系统象和本地文件,以确保所有文件系统作都是原子的。这很重要,因为这意味着您的数据管道在包含部分数据的状态下不会崩溃。

Luigi软件特色

可视化器页面

Luigi也带有Web界面,因此您可以在所有任务中进行搜索和过滤。

依赖图示例

只是为了让您了解Luigi的功能,这是我们在生产中正在运行的内容的屏幕截图。使用Luigi的可视化工具,我们可以很好地直观地看到工作流程的依赖关系图。每个节点代表一个必须运行的任务。绿色任务已经完成,而任务尚未运行。这些任务大多数是Hadoop作业,但也有些事情在本地运行并建立数据文件。

从概念上讲,Luigi与U Make相似,其中您有某些任务,而这些任务又可能依赖于其他任务。与Oozie 和Azkaban也有一些相似之处。一个主要的区别是Luigi不仅是专门为Hadoop构建的,而且很容易通过其他类型的任务对其进行扩展。

Luigi中的所有内容都在Python中。代替XML配置或类似的外部数据文件中,依赖图被指定内的Python。这使构建任务的复杂依赖关系图变得容易,其中依赖关系可能涉及期代数或对同一任务其他版本的递归引用。但是,工作流可能会触发Python以外的事件,例如运行 Pig脚本 或scp’ing文件。

Luigi教程

执行模型

Luigi有一个非常简单的执行和触发模型。

工人和任务执行最重要的方面是没有执行被转移。当您运行Luigi工作流时,将计划所有任务,并在流程中执行这些任务。

这种方案的好处是,由于所有执行都在流程中进行,因此它非常易于调试。这也使部署成为非事件。在开发过程中,通常可以从命令行运行Luigi工作流,而在部署它时,可以使用cntab或任何其他调度程序来触发它。

缺点是Luigi不会免费提供可伸缩。实际上,在开始运行数千个任务之前,这不是问题。

Luigi自动化和安排这些工作流程的目的不是吗?在某种程度上。Luigi可帮助您对任务的依赖进行编码并建立链。此外,Luigi的调度程序可确保对依赖关系图有一个集中的视图,并且不会由多个同时执行同一作业。

排程器客户端仅在run()单线程调度程序允许的情况下启动任务的方法。由于任务数量通常很少(与一个任务正在处理的PB数据相比),我们可以负担得起简单的集中式的便利。

触发任务

Luigi不包括其自身的触发,因此您必须依外部调度程序(例如cntab)来实际触发工作流程。

在实践中,这并不是一个大障碍,因为Luigi避免了通常由它引起的所有混乱。使用例如,安排复杂的工作流程非常简单。cntab。

将来,Luigi可能会实现自己的触发。对cntab的依赖(或任何外部触发机制)有点尴尬,可以避免。

触发范例例如,如果您每天都有一个外部数据转储,并且您的工作流依赖于此,则编写一个依赖于此数据转储的工作流。,Cntab可以每分钟触发一次此工作流程,以数据是否到达。如果有,它将运行完整的依赖关系图。

在您的cnline中,您会遇到类似

30 0 * * * my-user luigi RunAll –module my_tasks

您可以从cntab甚至在多台计算机之间触发任意次数的触发,因为调度程序将确保每个AggregationTask任务最多同时运行。请注意,这实际上可能意味着可以运行多个任务,因为存在具有不同参数的实例,并且这可以为您提供某种形式的并行化(例如,AggregationTask(2013-01-09)可以与并行运行AggregationTask(2013-01-08))。

当然,某些Task类型(例如HadoopJobTask)可以将执行转移到其他地方,但这取决于每个Task的定义。

Luigi图案

代码重用

Luigi的一个优点是,依赖于其他存储库中定义的任务非常容易。在执行路径中包含“ forks”也很简单,在该路径中,一项任务的输出可能成为许多其他任务的输入。

当前,不支持“中间”输出的语义,这意味着所有输出将无限期保留。这样做的好处是,如果尝试运行X-> Y,并且Y崩溃,则可以使用以前构建的X恢复。缺点是,文件系统上会有很多中间结果。一种有用的模式是将这些文件放在一个特殊的目录中,并进行某种定期的垃圾收集清理。

触发许多任务

一种方便的模式是在几个依赖关系链的末尾有一个虚拟Task,因此您可以通过在命令行中仅指定一个任务来触发多个管道,类似于make的 工作方式。

这个简单的任务本身不会做任何事情,但是会调用许多其他任务。每次调用时,Luigi将执行尽可能多的起作业(具有所有依赖项的作业)。

您将需要使用WrapperTask它而不是通常的Task类,因为此作业不会产生自己的任何输出,因此需要一种方式指示完成的时间。此类用于仅包装其他任务的任务,并且根据定义,如果所有任务均存在,则完成该任务。

触发周期任务

一个常见的要求是每晚产生一份每报告(或其他报告)。有时,由于各种原因,任务将持续崩溃或缺少所需依赖项的时间超过一天,这将导致某个期缺少可交付成果。哎呀。

为了确保上述AllReports任务最终每天完成一次(date参数的值),可以例如在require。方法中添加一个循环,以产生对self.date之前的几天的依赖关系。,只要Luigi不断被调用,解决间歇问题后,积压的工作量就会很好地追上来。

Luigi实际上带有一个可重用的工具来实现这一目标,称为RangeDailyBase(resp。RangeHourlyBase)。简单地说

luigi –module all_reports RangeDailyBase –of AllReports –start 2015-01-01

从2015年1月1起,您的cntab中的代码将很容易避免出现差距。注意:从2015年1月1到当前时间,它不会一直循环播放,但默认情况下最多为3个月前-请参阅RangeDailyBase此文档以及更多调整行为的旋钮。另请参见下面的监控。

有效触发重复任务

如上所述,RangeDailyBase之所以这样命名,是因为存在一个更有效的子类RangeDaily(resp。RangeHourly),该子类是针对数百个任务类量身定制的,这些任务类是在连续多年的连续需求下同时进行调度的(这将导致使用天真的循环方法进行冗余完整和调度程序过载)。 ) 用法:

luigi –module all_reports RangeDaily –of AllReports –start 2015-01-01

它具有与RangeDailyBase相同的旋钮,但有一些附加要求。也就是说,该任务必须实现有效的bulk_complete方法,或者必须将输出写入文件系统Target中,且期参数值始终在文件路径中表示。

回填任务

这也是一个常见的用例,有时您已经调整了现有的重复任务代码,并且出于某种原因或其他原因,希望将其重新计算的时间间隔定为一个期。最方便的是使用上述范围工具,只需指定start(包括)和stop(排除)参数即可:

luigi –module all_reports RangeDaily –of AllReportsV2 –start 2014-10-31 –stop 2014-12-25

将多个参数值批处理为单次运行

有时,将多个作业作为一处理一起运行要比单独运行每个作业更快。在这种情况下,您可以在构造函数中使用batch_method标记某些参数,以告诉如何组合多个值。一种常见的实现方法是简单地运行最大值。这对于在运行较新的数据时覆盖较旧的数据的任务很有用。您可以通过将batch_method设置为max来完成此作,如下所示:

令人兴奋的是,如果您向调度程序发送多个As,它可以将它们组合并返回一个。所以,如果A(date=2016-07-28),A(date=2016-07-29)并且 A(date=2016-07-30)都准备运行,你将开始运行A(date=2016-07-30)。虽然这是正在运行,调度程序将显示A(date=2016-07-28),A(date=2016-07-29)批量运行,同时 A(date=2016-07-30)运行。当A(date=2016-07-30)完成运行,成为失败或完成,另外两个任务将更新为相同的状态。

如果要限制批生产的数量,只需设置max_batch_size。所以如果你有

那么调度程序最多将一起批处理10个作业。您可能不希望使用max batch方法执行此作,但是如果您使用其他方法,则可能会有所帮助。您可以使用任何采用参数值列表并返回单个参数值的方法。

如果您有两个最大批处理参数,则将获得两个参数的最大值。如果您的参数没有批处理方法,则会将它们分别汇总。所以如果你有一个像

你创建任务,, ,你会得到他们分批为和。A(p1=1, p2=2, p3=0)A(p1=2, p2=3, p3=0)A(p1=3, p2=4, p3=1)A(p1=2, p2=3, p3=0)A(p1=3, p2=4, p3=1)

请注意,批处理任务不会占用[resources],只有最终运行的任务才会使用资源。调度程序仅在将每个任务一起批处理之前,才分别每个任务是否有足够的资源。

定期覆盖相同数据源的任务

如果每次运行都覆盖相同的数据源,则需要确保不能同时运行两处理。您可以通过将batch_method设置为max并设置唯一的资源来轻松地做到这一点:

现在,如果你有多个任务,如A(date=2016-06-01), A(date=2016-06-02),A(date=2016-06-03),调度只会让你运行的最高可用一个并标记较下层为batch_running。如果有新任务可用,而其他任务正在运行,则使用唯一资源将阻止多个任务同时写入同一位置。

避免同时写入单个文件

从多个任务更新单个文件几乎总是一个坏主意,并且在执行此作之前,您必须非常确信没有其他好的解决方案。但是,如果没有其他选择,则可能至少需要确保没有两个任务试图_simultaneously_写入文件。

通过将“资源”转换为Python属,它可以返回取决于任务参数或其他动态属的值:

由于默认情况下,资源的使用限制为1,如果它们具有相同的Important_file_name属,则将不会运行任务A的两个实例。

减少正在运行的任务的资源

在调度时,luigi调度程序需要知道任务运行后可能具有的最大资源消耗。但是,对于某些任务,在其运行方法内的两个步骤之间减少消耗的资源量(例如在进行大量计算之后)可能是有益的。在这种情况下,已经可以调度等待该特定资源的其他任务。

监视任务管道

Luigi带有一些现有的方法,luigi.notifications可以在任务崩溃时接收。电子邮件是最常见的方式。

上面提到的用于重复任务的范围工具不仅可以为您实现可的调度,而且还可以发出可用于设置延迟监视的事件。这样,您就可以在作业因长时间缺少输入数据或需要其他注意而卡住时实施警报。

原子写问题

luigi水管工经常犯的一个普遍错误是将数据部分写入最终目的地,而不是原子地。出现问题是因为luigi中的完成与运行一样幼稚 luigi.target.Target.exists()。在许多情况下,这仅意味着磁盘上是否存在文件夹。在我们部分写入数据的时间内,根据该输出执行的任务将认为其输入已完成。可能会产生的影响,例如在感恩节错误中。

可以通过想象我们处理本地磁盘上存储的数据并通过运行命令来说明该概念:

如前所述,问题在于在一段时间内仅存在部分数据,但是我们认为该数据是complete()因为输出文件夹已经存在。这是一个强大的版本:

确实,好的方法并不那么琐碎。它涉及到提供一个唯一的目录名和一个相当复杂的mv行,之所以mv需要这些都是因为我们不想mv将目录移到潜在的现有目录中。在特殊情况下,例如,当锁定失败并且同一任务将以某种方式同时运行两次时,目录可能已经存在。最后,在一种从未移动过文件的特殊情况下,可能要删除从未使用过的临时目录。

请注意,这是一个示例,其中存储位于本地磁盘上。但是,对于每个存储(硬盘文件,hdfs文件,数据库表等),此过程将有所不同。但是每个luigi用户都需要实现这种复杂吗?不,幸运的是,luigi开发人员已经意识到了这些,并且luigi附带了许多内置解决方案。如果您正在处理文件系统(FileTarget),则应考虑使用 temporary_path()。对于其他目标,应确保编写最终输出目录的方式是原子的。

向任务发送消息

调度程序能够将消息发送到特定任务。当正在运行的任务接受消息时,它可以访问 存储传入消息的multipcessing.Queue对象。您可以实现自定义行为来响应和响应消息:

可以直接从调度程序UI发送消息,该UI还显示响应(如果有)。请注意,仅当将调度程序配置为发送消息(请参阅[scheduler]配置),并且将任务配置为接受消息时,此功能才可用。

上一篇 2022-11-28

相关推荐

  • Luigi

    Luigi是一款管道作业流程配置方案,可以帮助用户快速构建自动化流程管道,将复杂的任务建立联系,为任务配置执行方式,设置调度方案,让任务按照您配置的工作流程自动运行,方便以后快...
正在提交中,请稍等片刻...

发表回复

请登录后评论...
登录后才能评论

评论列表 (0条)