夜已深,注意休息哦!

编程开发

Apache NiFi Processors 列表处理器中文介绍

2019年08月02日 15:01:45 · 本文共 39,040 字阅读时间约 141分钟 · 9,600 次浏览
Apache NiFi Processors 列表处理器中文介绍

Apache NiFi 的 Processors 实在太多了,不知道该用哪个,所以我就用机器翻译了一下,把全部的Apache NiFi Processors 处理器列出来,方面寻找应该用哪一个 Processors 处理器,文档针对的是 Apache NiFi Processors 1.9.0

AttributeRollingWindow

基于评估每个FlowFile上的表达式语言表达式跟踪滚动窗口,并将该值添加到处理器的状态。将使用FlowFiles的计数和当前时间窗口中处理的值的总聚合值来发出每个FlowFile。

AttributesToCSV

生成输入FlowFile属性的CSV表示形式。生成的CSV可以写入名为“CSVAttributes”的新生成的属性,也可以作为内容写入FlowFile。如果属性值包含逗号,换行符或双引号,则属性值将使用双引号进行转义。属性值中的任何双引号字符都使用另一个双引号进行转义。

AttributesToJSON

生成输入FlowFile属性的JSON表示。生成的JSON可以写入新的属性“JSONAttributes”或作为内容写入FlowFile。

Base64EncodeContent

对base64进行内容编码或解码

CalculateRecordStats

一种处理器,可以计算记录集中的项目数,并根据用户定义的记录集子集上的标准提供计数。

CaptureChangeMySQL

从MySQL数据库中检索更改数据捕获(CDC)事件。CDC事件包括INSERT,UPDATE,DELETE操作。事件作为单个流文件输出,这些文件按操作发生的时间排序。

CompareFuzzyHash

将包含模糊散列的属性与包含模糊散列列表的文件进行比较,在成功匹配的情况下将属性附加到FlowFile。

CompressContent

使用用户指定的压缩算法压缩或解压缩FlowFiles的内容,并根据需要更新mime.type属性

ConnectWebSocket

充当WebSocket客户端端点以与远程WebSocket服务器交互。FlowFiles根据收到的消息类型转移到下游关系,因为使用此处理器配置的WebSocket客户端从远程WebSocket服务器接收消息。

ConsumeAMQP

使用AMQP 0.9.1协议从AMQP Broker消耗AMQP消息。从AMQP Broker接收的每条消息都将作为其自己的FlowFile发布到“成功”关系。

ConsumeAzureEventHub

从Microsoft Azure Event Hub接收消息,将Azure消息的内容写入FlowFile的内容。

ConsumeEWS

使用Exchange Web服务从Microsoft Exchange使用消息。每个收到的电子邮件消息的原始字节都写为FlowFile的内容

ConsumeGCPubSub

使用已配置的Google Cloud PubSub订阅消息。如果设置了“批量大小”,则将在单个请求中提取配置的消息数,否则将仅提取一条消息。

ConsumeIMAP

使用IMAP协议从电子邮件服务器消耗消息。每个收到的电子邮件消息的原始字节都写为FlowFile的内容

ConsumeJMS

使用类型为BytesMessage或TextMessage的JMS消息将其内容转换为FlowFile并将其转换为“成功”关系。标头和属性等JMS属性将被复制为FlowFile属性。

ConsumeKafka

使用专门针对Kafka 0.9.x Consumer API构建的Apache Kafka消息。请注意,在某些情况下,发布商可能会陷入无限期停滞状态。我们正密切关注Kafka社区如何发展,并尽快利用这些修复措施。同时,可以进入只有重新启动JVM NiFi运行的分辨率的状态。用于发送消息的补充NiFi处理器是PublishKafka。

ConsumeKafka_0_10

使用专门针对Kafka 0.10.x Consumer API构建的Apache Kafka消息。用于发送消息的补充NiFi处理器是PublishKafka_0_10。

ConsumeKafka_0_11

使用专门针对Kafka 0.11.x Consumer API构建的Apache Kafka消息。用于发送消息的补充NiFi处理器是PublishKafka_0_11。

ConsumeKafka_1_0

使用专门针对Kafka 1.0 Consumer API构建的Apache Kafka消息。用于发送消息的补充NiFi处理器是PublishKafka_1_0。

ConsumeKafka_2_0

使用专门针对Kafka 2.0 Consumer API构建的Apache Kafka消息。用于发送消息的补充NiFi处理器是PublishKafka_2_0。

ConsumeKafkaRecord_0_10

使用专门针对Kafka 0.10.x Consumer API构建的Apache Kafka消息。用于发送消息的补充NiFi处理器是PublishKafkaRecord_0_10。请注意,此时,处理器假定从给定分区检索的所有记录具有相同的模式。如果拉出任何Kafka消息但无法使用配置的Record Reader或Record Writer解析或写入,则消息的内容将写入单独的FlowFile,并且FlowFile将被转移到'parse.failure'关系。否则,每个FlowFile都会被发送到“成功”关系,并且可能在单个FlowFile中包含许多单独的消息。添加'record.count'属性以指示FlowFile中包含的记录数。

ConsumeKafkaRecord_0_11

使用专门针对Kafka 0.11.x Consumer API构建的Apache Kafka消息。用于发送消息的补充NiFi处理器是PublishKafkaRecord_0_11。请注意,此时,处理器假定从给定分区检索的所有记录具有相同的模式。如果拉出任何Kafka消息但无法使用配置的Record Reader或Record Writer解析或写入,则消息的内容将写入单独的FlowFile,并且FlowFile将转移到'parse.failure'关系。否则,每个FlowFile都会被发送到“成功”关系,并且可能在单个FlowFile中包含许多单独的消息。添加'record.count'属性以指示FlowFile中包含的消息数。 属性。

ConsumeKafkaRecord_1_0

使用专门针对Kafka 1.0 Consumer API构建的Apache Kafka消息。用于发送消息的补充NiFi处理器是PublishKafkaRecord_1_0。请注意,此时,处理器假定从给定分区检索的所有记录具有相同的模式。如果拉出任何Kafka消息但无法使用配置的Record Reader或Record Writer解析或写入,则消息的内容将写入单独的FlowFile,并且FlowFile将被转移到'parse.failure'关系。否则,每个FlowFile都会被发送到“成功”关系,并且可能在单个FlowFile中包含许多单独的消息。添加'record.count'属性以指示FlowFile中包含的消息数。 属性。

ConsumeKafkaRecord_2_0

使用专门针对Kafka 2.0 Consumer API构建的Apache Kafka消息。用于发送消息的补充NiFi处理器是PublishKafkaRecord_2_0。请注意,此时,处理器假定从给定分区检索的所有记录具有相同的模式。如果拉出任何Kafka消息但无法使用配置的Record Reader或Record Writer解析或写入,则消息的内容将写入单独的FlowFile,并且FlowFile将转移到'parse.failure'关系。否则,每个FlowFile都会被发送到“成功”关系,并且可能在单个FlowFile中包含许多单独的消息。添加'record.count'属性以指示FlowFile中包含的消息数。 属性。

ConsumeMQTT

订阅主题并从MQTT代理接收消息

ConsumePOP3

使用POP3协议从电子邮件服务器消耗消息。每个收到的电子邮件消息的原始字节都写为FlowFile的内容

ConsumeWindowsEventLog

注册Windows事件日志订阅回调以从Windows上的事件接收FlowFiles。这些可以通过channel和XPath进行过滤。

ControlRate

控制将数据传输到后续处理器的速率。如果配置非常小的时间持续时间,则油门的准确性会变差。您可以通过减少屈服持续时间来提高此准确性,但代价是给予处理器更多的任务。

ConvertAvroSchema

将记录从一个Avro架构转换为另一个,包括支持展平和简单类型转换

ConvertAvroToJSON

将二进制Avro记录转换为JSON对象。此处理器提供Avro字段到JSON字段的直接映射,这样生成的JSON将具有与Avro文档相同的层次结构。请注意,Avro架构信息将丢失,因为这不是从二进制Avro到JSON格式Avro的转换。输出JSON采用UTF-8编码进行编码。如果传入的FlowFile包含多个Avro记录的流,则生成的FlowFile将包含一个包含所有Avro记录或一系列JSON对象的JSON数组。如果传入的FlowFile不包含任何记录,则输出空JSON对象。空/单Avro记录FlowFile输入可选地包装在容器中,如'Wrap Single Record'所示

ConvertAvroToORC

将Avro记录转换为ORC文件格式。此处理器提供Avro记录到ORC记录的直接映射,这样生成的ORC文件将具有与Avro文档相同的层次结构。如果传入的FlowFile包含多个Avro记录的流,则生成的FlowFile将包含一个包含所有Avro记录的ORC文件。如果传入的FlowFile不包含任何记录,则输出空的ORC文件。注意:许多Avro数据类型(集合,基元和基元的联合,例如)可以转换为ORC,但集合和其他复杂数据类型的联合可能无法转换为ORC。

ConvertAvroToParquet

将Avro记录转换为Parquet文件格式。传入的FlowFile应该是有效的avro文件。如果传入的FlowFile不包含任何记录,则输出空的镶木地板文件。注意:许多Avro数据类型(集合,基元和基元的联合,例如)可以转换为镶木地板,但集合和其他复杂数据类型的联合可能无法转换为Parquet。

ConvertCharacterSet

将FlowFile的内容从一个字符集转换为另一个字符集

ConvertCSVToAvro

根据Avro架构将CSV文件转换为Avro

ConvertExcelToCSVProcessor

使用Microsoft Excel文档并将每个工作表转换为csv。传入的Excel文档中的每个工作表都将生成一个将从此处理器输出的新Flowfile。每个输出Flowfile的内容将被格式化为csv文件,其中excel表中的每一行都作为csv文件中的换行输出。此处理器目前只能处理.xlsx(XSSF 2007 OOXML文件格式)Excel文档而不是较旧的.xls(HSSF '97(-2007)文件格式)文档。此处理器还期望格式良好的CSV内容,并且不会转义包含无效内容(如换行符或其他逗号)的单元格。

ConvertJSONToAvro

根据Avro架构将JSON文件转换为Avro

ConvertJSONToSQL

将JSON格式的FlowFile转换为UPDATE,INSERT或DELETE SQL语句。传入的FlowFile应该是“平坦的”JSON消息,这意味着它由单个JSON元素组成,每个字段映射到一个简单类型。如果字段映射到JSON对象,则该JSON对象将被解释为Text。如果输入是JSON元素的数组,则数组中的每个元素都作为单独的FlowFile输出到'sql'关系。转换成功后,原始FlowFile将路由到“原始”关系,SQL将路由到“sql”关系。

ConvertRecord

使用配置的Record Reader和Record Write Controller Services将记录从一种数据格式转换为另一种数据格式。必须使用“匹配”模式配置Reader and Writer。这样,我们的意思是模式必须具有相同的字段名称。如果字段值可以从一种类型强制转换为另一种类型,则字段的类型不必相同。例如,如果输入模式具有double类型的名为“balance”的字段,则输出模式可以具有名为“balance”的字段,其字符串为double,double或float。如果输入中存在输出中不存在的任何字段,则该字段将被排除在输出之外。如果在输出模式中指定了任何字段但输入数据/模式中不存在该字段,则该字段将不会出现在输出中,或者具有空值,具体取决于编写器。

CountText

计算传入文本的各种指标。请求的结果将记录为属性。生成的流文件不会修改其内容。

CreateHadoopSequenceFile

从传入的流文件创建Hadoop序列文件

CryptographicHashAttribute

使用给定算法计算每个指定属性的哈希值,并将其写入输出属性。请参阅https://csrc.nist.gov/Projects/Hash-Functions/NIST-Policy-on-Hash-Functions以获取帮助以确定要使用的算法。

CryptographicHashContent

使用给定算法计算流文件内容的加密哈希值,并将其写入输出属性。请参阅https://csrc.nist.gov/Projects/Hash-Functions/NIST-Policy-on-Hash-Functions以获取帮助以确定要使用的算法。

DebugFlow

DebugFlow处理器通过允许在没有FlowFile的情况下响应FlowFile或定时器事件的接收而显式触发各种响应(如果使用基于定时器或基于cron的调度)来帮助测试和调试FlowFile框架。它可以强制执行运行所需的响应或测试处理器运行时可能发生的各种故障模式。

DeleteAzureBlobStorage

从Azure存储中删除提供的blob

DeleteByQueryElasticsearch

使用查询从ElasticSearch索引中删除。可以从Flowfile主体或Query参数加载查询。

DeleteDynamoDB

根据哈希和范围键从DynamoDB中删除文档。密钥可以是字符串或数字。该请求需要操作的所有主键(散列或散列和范围键)

DeleteElasticsearch5

按文档ID从Elasticsearch 5.0中删除文档。如果群集已配置为授权和/或安全传输(SSL / TLS),并且X-Pack插件可用,则可以进行安全连接。

DeleteGCSObject

从Google Cloud Bucket中删除对象。如果尝试删除不存在的文件,FlowFile将路由到成功。

DeleteGridFS

使用文件名或查询从GridFS中删除文件。

DeleteHBaseCells

此处理器允许用户通过在流文件内容中指定一行或多行来删除单个HBase单元,这些行是由行ID,列族,列限定符和关联的可见性标签组成的序列(如果已启用并且正在使用可见性标签)。用户定义的分隔符用于分隔每行上的每个数据片段,其中::::是默认分隔符。

DeleteHBaseRow

单独或批量删除HBase记录。输入可以是流文件内容中的单行ID,每行一个ID,由可配置分隔符分隔的行ID(默认为逗号)。

DeleteHDFS

从HDFS中删除一个或多个文件或目录。路径可以作为传入FlowFile的属性提供,也可以作为定期删除的静态设置路径提供。如果此处理器具有传入连接,它将忽略定期运行,而是依赖传入的FlowFiles来触发删除。请注意,您可以使用通配符来匹配多个文件或目录。如果没有传入连接,则不会将流文件传输到任何输出关系。如果存在传入的流文件,则提供没有检测到的故障,它将被转移到成功,否则将被发送到false。如果需要删除已知的globbed文件,请先使用ListHDFS生成要删除的特定文件列表。

DeleteMongo

对MongoDB集合执行删除查询。查询在流文件的主体中提供,用户可以选择是否删除与之匹配的一个或多个文档。

DeleteRethinkDB

处理器使用文档ID从RethinkDB(https://www.rethinkdb.com/)中删除JSON文档。

DeleteS3Object

删除Amazon S3存储桶上的FlowFiles。如果尝试删除不存在的文件,FlowFile将路由到成功。

DeleteSQS

从Amazon Simple Queuing Service Queue中删除消息

DetectDuplicate

为每个传入的FlowFile缓存从FlowFile属性计算的值,并确定是否已经看到缓存的值。如果是这样,使用名为'original.identifier'的属性将FlowFile路由到'duplicate',该属性指定原始FlowFile的“描述”,该描述在属性。如果未确定FlowFile是重复的,则处理器将FlowFile路由为“非重复”

DistributeLoad

根据分发策略将FlowFiles分发到下游处理器。如果使用循环策略,则默认为每个目标分配权重1(均匀分布)。但是,可以将可选属性添加到更改中; 添加名为“5”且值为“10”的属性意味着名称为“5”的关系将在每次迭代中接收10个FlowFiles而不是1。

DuplicateFlowFile

用于负载测试,此处理器将为每个传入的FlowFile创建配置的副本数。原始FlowFile以及所有生成的副本都将发送到“成功”关系。此外,每个FlowFile都会将一个属性'copy.index'设置为副本号,其中原始FlowFile的值为零,所有副本都接收递增的整数值。

EncryptContent

使用密码和随机生成的盐的对称加密或使用公钥和密钥的非对称加密来加密或解密FlowFile。

EnforceOrder

强制执行属于同一数据组的FlowFiles的预期排序。虽然可以在连接上使用PriorityAttributePrioritizer来确保通过该连接的流文件按优先级顺序排列,但根据错误处理,分支和其他流程设计,FlowFiles可能会出现乱序。EnforceOrder可用于强制执行这些FlowFiles的原始排序。[重要]为了使EnforceOrder生效,FirstInFirstOutPrioritizer应该用于每个下游关系UNTIL,FlowFiles的顺序通过诸如MergeContent之类的操作物理地获得FIXED或者存储到最终目的地。

EvaluateJsonPath

根据FlowFile的内容评估一个或多个JsonPath表达式。这些表达式的结果将分配给FlowFile属性,或者写入FlowFile本身的内容,具体取决于处理器的配置。通过添加用户定义的属性输入JsonPaths; 属性的名称映射到将放置结果的Attribute Name(如果Destination是flowfile-attribute;否则,将忽略属性名称)。该属性的值必须是有效的JsonPath表达式。返回类型“自动检测”将根据配置的目的地进行确定。当'Destination'设置为'flowfile-attribute'时,将使用返回类型'scalar'。当'Destination'设置为'flowfile-content'时,将使用返回类型'JSON'。如果JsonPath评估为JSON数组或JSON对象,并且Return Type设置为'scalar',则FlowFile将不会被修改并将路由到失败。如果提供的JsonPath计算为指定值并且将作为匹配路由,则JSON的返回类型可以返回标量值。如果Destination是'flowfile-content'并且JsonPath没有计算到定义的路径,则FlowFile将被路由在没有修改内容的情况下“无与伦比”。如果Destination是flowfile-attribute并且表达式不匹配,则将使用空字符串作为值创建属性,并且FlowFile将始终路由到“匹配”。如果提供的JsonPath计算为指定值并且将作为匹配路由,则JSON的返回类型可以返回标量值。如果Destination为'flowfile-content'且JsonPath未计算到定义的路径,则将路由FlowFile在没有修改内容的情况下“无与伦比”。如果Destination是flowfile-attribute并且表达式不匹配,则将使用空字符串作为值创建属性,并且FlowFile将始终路由到“匹配”。如果提供的JsonPath计算为指定值并且将作为匹配路由,则JSON的返回类型可以返回标量值。如果Destination是'flowfile-content'并且JsonPath没有计算到定义的路径,则FlowFile将被路由在没有修改内容的情况下“无与伦比”。如果Destination是flowfile-attribute并且表达式不匹配,则将使用空字符串作为值创建属性,并且FlowFile将始终路由到“匹配”。

EvaluateXPath

根据FlowFile的内容评估一个或多个XPath。这些XPath的结果将分配给FlowFile属性,或者写入FlowFile本身的内容,具体取决于处理器的配置。通过添加用户定义的属性输入XPath; 属性的名称映射到将放置结果的Attribute Name(如果Destination是flowfile-attribute;否则,将忽略属性名称)。该属性的值必须是有效的XPath表达式。如果XPath评估到多个节点并且返回类型设置为'nodeset'(直接或通过'自动检测',目标为'flowfile-content'),则FlowFile将不会被修改并将被路由失败。如果XPath未评估节点,FlowFile将被路由到“不匹配”而不修改其内容。如果Destination是flowfile-attribute且表达式不匹配,则将使用空字符串作为值创建属性,并且FlowFile将始终路由到“匹配”

EvaluateXQuery

根据FlowFile的内容评估一个或多个XQueries。这些XQuery的结果将分配给FlowFile属性,或者写入FlowFile本身的内容,具体取决于处理器的配置。通过添加用户定义的属性输入XQueries; 属性的名称映射到将放置结果的Attribute Name(如果Destination是'flowfile-attribute';否则,将忽略属性名称)。该属性的值必须是有效的XQuery。如果XQuery返回多个结果,则将为每个结果创建新属性或FlowFiles(分别用于'flowfile-attribute'或'flowfile-content'的目标)(属性将附加'.n'一个一号的数字到指定的属性名称)。如果任何提供的XQuery返回结果,FlowFile将被路由到“匹配”。如果没有提供的XQuery返回结果,FlowFile将被路由到'unmatched'。如果Destination是'flowfile-attribute'并且XQueries没有匹配,则不会将任何属性应用于FlowFile。

ExecuteFlumeSink

执行Flume水槽。每个输入FlowFile都被转换为Flume事件以供接收器处理。

ExecuteFlumeSource

执行Flume源。每个Flume事件都作为FlowFile发送到成功关系

ExecuteGroovyScript

实验扩展的Groovy脚本处理器。该脚本负责处理传入的流文件(例如,传输到SUCCESS或删除)以及脚本创建的任何流文件。如果处理不完整或不正确,会话将回滚。

ExecuteInfluxDBQuery

处理器从FlowFile(首选)或计划查询的内容执行InfluxDB查询。请查看InfluxDB文档(https://www.influxdb.com/)中支持的查询的详细信息。

ExecuteProcess

运行用户指定的操作系统命令,并将该命令的输出写入FlowFile。如果预期命令长时间运行,则处理器可以按指定的时间间隔输出部分数据。使用此选项时,输出应采用文本格式,因为在任意时间间隔上拆分二进制数据通常没有意义。

ExecuteScript

实验 - 在给定流文件和进程会话的情况下执行脚本。该脚本负责处理传入的流文件(例如,传输到SUCCESS或删除)以及脚本创建的任何流文件。如果处理不完整或不正确,会话将回滚。实验:尚未验证持续使用的影响。

ExecuteSparkInteractive

通过Livy管理的HTTP会话执行Spark代码到实时Spark上下文。支持缓存的RDD共享。

的ExecuteSQL

执行提供的SQL select查询。查询结果将转换为Avro格式。使用流式传输,因此支持任意大的结果集。可以使用标准调度方法将此处理器调度为在计时器或cron表达式上运行,或者可以由传入的FlowFile触发。如果它是由传入的FlowFile触发的,那么在评估select查询时,该FlowFile的属性将可用,并且查询可以使用?逃避参数。在这种情况下,要使用的参数必须作为具有命名约定sql.args.N.type和sql.args.N.value的FlowFile属性存在,其中N是正整数。sql.args.N.type应该是一个表示JDBC Type的数字。FlowFile的内容预计为UTF-8格式。FlowFile属性'executionql.row。

ExecuteSQLRecord

执行提供的SQL select查询。查询结果将转换为Record Writer指定的格式。使用流式传输,因此支持任意大的结果集。可以使用标准调度方法将此处理器调度为在计时器或cron表达式上运行,或者可以由传入的FlowFile触发。如果它是由传入的FlowFile触发的,那么在评估select查询时,该FlowFile的属性将可用,并且查询可以使用?逃避参数。在这种情况下,要使用的参数必须作为具有命名约定sql.args.N.type和sql.args.N.value的FlowFile属性存在,其中N是正整数。sql.args.N.type应该是一个表示JDBC Type的数字。FlowFile的内容预计为UTF-8格式。FlowFile属性'executionql.row。

ExecuteStreamCommand

对流文件的内容执行外部命令,并使用命令结果创建新的流文件。

ExtractAvroMetadata

从Avro数据文件的标头中提取元数据。

ExtractCCDAAttributes

从Consolidated CDA格式的FlowFile中提取信息,并将各个属性作为FlowFile属性提供。属性命名为 如果Parent重复,则命名将是 例如,section.act_07.observation.name =原发性高血压

ExtractEmailAttachments

从mime格式的电子邮件文件中提取附件,将它们拆分为单独的流文件。

ExtractEmailHeaders

使用flowfile内容作为数据源,从符合RFC的电子邮件文件中提取标头,将相关属性添加到流文件中。此处理器不执行广泛的RFC验证,但仍然要求最低限度遵守RFC 2822

ExtractGrok

根据FlowFile的内容评估一个或多个Grok表达式,将结果作为属性添加或使用匹配内容的JSON表示法替换FlowFile的内容

ExtractHL7Attributes

从HL7(Health Level 7)格式的FlowFile中提取信息,并将信息添加为FlowFile属性。属性命名为 如果细分正在重复,则命名将是 例如,我们可能有一个名为“MHS.12”的属性,其值为“2.1”,属性名为“OBX_11.3”,其值为“93000 ^ CPT4”。

ExtractImageMetadata

从包含图像的流文件中提取图像元数据。此处理器依赖于此元数据提取程序库https://github.com/drewnoakes/metadata-extractor。它提取了一长串元数据类型,包括但不限于EXIF,IPTC,XMP和Photoshop字段。有关完整列表,请访问该库的网站。注意:正在使用的库会将图像加载到内存中,因此极大的图像可能会导致问题。

ExtractMediaMetadata

从包含音频,视频,图像和其他文件类型的流文件中提取内容元数据。该处理器依赖Apache Tika项目进行文件格式检测和解析。它为媒体文件(包括音频,视频和打印媒体格式)提取了一长串元数据类型。注意:提取的属性名称和内容可能因升级而异,因为解析是由外部Tika工具执行的,而外部Tika工具又依赖于其他项目元数据提取。有关更多详细信息和支持的文件类型列表,请访问该库的网站http://tika.apache.org/。

ExtractText

根据FlowFile的内容评估一个或多个正则表达式。这些正则表达式的结果将分配给FlowFile属性。通过添加用户定义的属性输入正则表达式; 属性的名称映射到将放置结果的Attribute Name。第一个捕获组(如果找到)将被放入该属性名称。但是所有捕获组(包括匹配的字符串序列本身)也将在该属性名称处提供,并提供索引值,但捕获组除外是可选的并且不匹配 - 例如,给定属性名称“regex”和表达式“abc(def)?(g)”如果“def”,我们将添加一个值为“def”的属性“regex.1” “匹配。如果“def”不匹配,则没有名为“regex”的属性。将添加1“,但将添加名为”regex.2“且值为”g“的属性。属性的值必须是具有一个或多个捕获组的有效正则表达式。如果正则表达式匹配多次,则仅使用第一个匹配,除非启用重复捕获组的属性设置为true。如果任何提供的正则表达式匹配,则FlowFile将被路由到“匹配”。如果未提供正则表达式匹配,则FlowFile将路由到“不匹配”,并且不会将任何属性应用于FlowFile。除非启用重复捕获组的属性设置为true,否则将仅使用第一个匹配项。如果任何提供的正则表达式匹配,则FlowFile将被路由到“匹配”。如果未提供正则表达式匹配,则FlowFile将路由到“不匹配”,并且不会将任何属性应用于FlowFile。除非启用重复捕获组的属性设置为true,否则将仅使用第一个匹配项。如果任何提供的正则表达式匹配,则FlowFile将被路由到“匹配”。如果未提供正则表达式匹配,则FlowFile将路由到“不匹配”,并且不会将任何属性应用于FlowFile。

ExtractTNEFAttachments

从mime格式的电子邮件文件中提取附件,将它们拆分为单独的流文件。

FetchAzureBlobStorage

检索Azure存储Blob的内容,将内容写入FlowFile的内容

FetchDistributedMapCache

为每个传入的FlowFile计算来自FlowFile属性的缓存键,并从与每个键关联的分布式映射缓存中获取值。如果配置时没有目标属性,则传入的FlowFile内容将替换为分布式映射缓存接收的二进制数据。如果该密钥下没有存储值,则流文件将路由到“未找到”。请注意,处理器将始终尝试将整个缓存值读入内存,然后再将其放入其目标位置。如果缓存的值非常大,这可能会有问题。

FetchElasticsearch

使用指定的连接属性和要检索的文档的标识符从Elasticsearch检索文档。如果群集已配置为授权和/或安全传输(SSL / TLS)并且Shield插件可用,则可以进行安全连接。此处理器支持Elasticsearch 2.x群集。

FetchElasticsearch5

使用指定的连接属性和要检索的文档的标识符从Elasticsearch检索文档。如果群集已配置为授权和/或安全传输(SSL / TLS),并且X-Pack插件可用,则可以进行安全连接。此处理器支持Elasticsearch 5.x群集。

FetchElasticsearchHttp

使用指定的连接属性和要检索的文档的标识符从Elasticsearch检索文档。请注意,在写入流文件进行传输之前,文档的整个主体将被读入内存。

FetchFile

从磁盘读取文件的内容并将其流式传输到传入的FlowFile的内容中。完成此操作后,可以选择将文件移动到其他位置或删除该文件以帮助保持文件系统的有序性。

FetchFTP

从远程SFTP服务器获取文件的内容,并使用远程文件的内容覆盖传入的FlowFile的内容。

FetchGCSObject

从Google Cloud Bucket获取文件。旨在与ListGCSBucket配合使用。

FetchGridFS

按文件名或用户定义的查询从GridFS存储桶中检索一个或多个文件。

FetchHBaseRow

从HBase表中获取一行。Destination属性控制是将单元格添加为流文件属性,还是将行作为JSON写入流文件内容。此处理器可用于通过直接在处理器中指定表和行ID来获取间隔上的固定行,或者它可用于通过引用传入流文件中的表和行ID来动态获取行。

FetchHDFS

从HDFS检索文件。传入的FlowFile的内容将替换为HDFS中文件的内容。HDFS中的文件保持不变,不对其进行任何更改。

FetchParquet

从给定的Parquet文件读取,并使用所选的记录编写器将记录写入流文件的内容。原始Parquet文件将保持不变,并且流文件的内容将替换为所选类型的记录。此处理器可与ListHDFS或ListFile一起使用,以获取要提取的文件列表。

FetchS3Object

检索S3对象的内容并将其写入FlowFile的内容

FetchSFTP

从远程SFTP服务器获取文件的内容,并使用远程文件的内容覆盖传入的FlowFile的内容。

FlattenJson

为用户提供获取嵌套JSON文档并将其展平为简单键/值对文档的功能。密钥在每个级别与用户定义的分隔符组合,默认为“。”。支持MongoDB查询的三种展平模式,普通,保持数组和点符号。默认的展平模式是'keep-arrays'。

ForkRecord

该处理器允许用户将记录分成多个记录。用户必须指定至少一个记录路径作为动态属性,指向包含RECORD对象的ARRAY类型的字段。处理器接受两种模式:'split'和'extract'。在两种模式中,指定数组中包含的每个元素都会生成一条记录。在“拆分”模式下,每个生成的记录将保留输入中给出的相同模式,但该数组将只包含一个元素。在'extract'模式下,数组的元素必须是记录类型,并且是生成的记录。此外,在“提取”模式下,可以指定每个生成的记录是否应包含从根级别到提取的记录的父记录的所有字段。这假定要在Record Writer控制器服务的模式中定义要在记录中添加的字段。请参阅此处理器的其他详细信息文档中的示例。

FuzzyHashContent

计算FlowFile内容的模糊/位置敏感哈希值,并将该哈希值作为名称由FlowFile确定的属性放在FlowFile上 property.Note:此处理器仅提供非加密哈希算法。并且它不应被视为HashContent处理器的替代。注意:底层库将整个流内容加载到内存中并在内存中执行结果评估。因此,重要的是要考虑该处理器评估的内容的预期配置文件以及支持它的硬件,尤其是在处理大文件时。

GenerateFlowFile

此处理器使用随机数据或自定义内容创建FlowFiles。GenerateFlowFile对负载测试,配置和模拟很有用。

GenerateTableFetch

生成从表中获取行“页面”的SQL选择查询。分区大小属性以及表的行数确定页面的大小和数量以及生成的FlowFiles。此外,可以通过设置最大值列来实现增量提取,这会使处理器跟踪列的最大值,从而仅获取列的值超过观察到的最大值的行。此处理器仅用于在主节点上运行。

该处理器可以接受传入连接; 无论是否提供传入连接,处理器的行为都是不同的:

  • 如果未指定传入连接,则处理器将根据指定的处理器计划生成SQL查询。许多字段都支持表达式语言,但没有可用的流文件属性。但是,将使用Variable Registry评估属性。
  • 如果指定了传入连接且没有可用于处理器任务的流文件,则不会执行任何工作。
  • 如果指定了传入连接并且流文件可用于处理器任务,则流文件的属性可以在表达式语言中用于诸如表名称等字段。但是,Max-Value Columns和Columns to Return字段必须为空或引用每个指定表中可用的列。

GeoEnrichIP

查找IP地址的地理定位信息,并将地理信息添加到FlowFile属性。地理数据作为MaxMind数据库提供。包含要查找的IP地址的属性由“IP地址属性”属性提供。如果提供的属性名称为“X”,则通过“浓缩”添加的属性将采用X.geo形式。

GetAzureEventHub

从Microsoft Azure Event Hub接收消息,将Azure消息的内容写入FlowFile的内容

GetAzureQueueStorage

从Azure队列存储中检索消息。默认情况下,检索到的消息将从队列中删除。如果要求使用消息而不删除消息,请将“自动删除消息”设置为“false”。注意:在收到邮件但由于某些意外情况而无法从队列中删除的情况下,可能会有可能收到重复项。

GetCouchbaseKey

通过密钥/值访问从Couchbase Server获取文档。可以通过设置来提供要获取的文档的ID属性。注意:如果未设置Document Id属性,将读取FlowFile的内容以确定Document Id,这意味着整个FlowFile的内容将缓冲在内存中。

GetDynamoDB

基于散列和范围键从DynamoDB检索文档。键可以是字符串或数字。对于任何获取请求,所有主键都是必需的(散列或散列和基于表键的范围).DynamoDB项的Json Document('Map')属性被读入内容FlowFile。

的GetFile

从目录中的文件创建FlowFiles。NiFi将忽略它至少没有读取权限的文件。

GetFTP

从FTP服务器获取文件并从中创建FlowFiles

GetHBase

此处理器轮询HBase以获取指定表中的任何记录。处理器跟踪它接收的单元的时间戳,以便当新记录被推送到HBase时,它们将被自动拉出。每个记录以JSON格式输出,如{“row”:““,”“cells”:{“<column 1 family>:<column 1 qualifier>”:“<cell 1 value>”,“<column 2 family>:<column 2 qualifier>”:“<cell 2 value>” ,...}}。对于收到的每条记录,将以hbase:// <table name> /格式发出一个Provenance RECEIVE事件,哪里 是行密钥的UTF-8编码值。

GetHDFS

将文件从Hadoop分布式文件系统(HDFS)提取到FlowFiles中。此处理器将在获取文件后从HDFS中删除该文件。

GetHDFSEvents

此处理器轮询HdfsAdmin API提供的通知事件。由于这使用HdfsAdmin API,因此需要以HDFS超级用户身份运行。目前有六种类型的事件(追加,关闭,创建,元数据,重命名和取消链接)。有关每个事件的完整说明,请参阅org.apache.hadoop.hdfs.inotify.Event文档。此处理器将根据定义的持续时间轮询新事件。对于收到的每个事件,将使用预期属性创建新的流文件,并将事件本身序列化为JSON并写入流文件的内容。例如,如果event.type是APPEND,那么流文件的内容将包含一个JSON文件,其中包含有关append事件的信息。如果成功,则将流文件发送到“成功”关系。注意生成的流文件的存储位置。如果流文件存储在处理器的一个监视目录中,则会有永无止境的事件流。同样重要的是要知道此处理器必须消耗所有事件。过滤必须在处理器内进行。这是因为HDFS管理员的事件通知API没有过滤。

GetHDFSFileInfo

从HDFS中检索文件和目录的列表。此处理器创建一个FlowFile,表示具有相关信息的HDFS文件/目录。此处理器的主要用途是提供类似于HDFS客户端的功能,即count,du,ls,test等。与ListHDFS不同,此处理器是无状态的,支持传入连接并提供dir级别的信息。

GetHDFSSequenceFile

将Hadoop分布式文件系统(HDFS)中的序列文件提取到FlowFiles中

GetHTMLElement

使用CSS选择器从传入流文件的内容中提取HTML元素值。传入的HTML首先转换为HTML文档对象模型,以便可以使用CSS选择器将样式应用于HTML的类似方式选择HTML元素。然后使用用户定义的CSS选择器字符串“查询”生成的HTML DOM。“查询”HTML DOM的结果可能会产生0-N结果。如果没有找到结果,则将流文件转移到“未找到元素”关系,以向最终用户指示。如果找到N个结果,将为每个结果创建并发出新的流文件。查询结果将放在新流文件的内容中,或者作为新流文件的属性。默认情况下,结果将写入属性。这可以通过“Destination”属性来控制。通过设置属性“Prepend Element Value”或“Append Element Value”的值,生成的查询值也可以在其前面或附加数据。前置和附加值被视为字符串值,并连接到从HTML DOM查询操作中检索的结果。可以在“http://jsoup.org/apidocs/org/jsoup/select/Selector.html”找到有关CSS选择器语法的更全面的参考。

GetHTTP

此处理器已弃用,可能会在将来的版本中删除。

GetIgniteCache

从Ignite Cache获取字节数组并将其添加为FlowFile的内容。处理器使用FlowFile属性的值(Ignite缓存条目键)作为缓存键查找。如果在缓存中找不到与密钥对应的条目,则会将错误消息与FlowFile相关联注意 - Ignite Kernel会定期向日志输出节点性能统计信息。可以通过在logback.xml配置文件中将logger'org.apache.ignite'的日志级别设置为WARN来关闭此消息。

GetJMSQueue

此处理器已弃用,可能会在将来的版本中删除。

GetJMSTopic

此处理器已弃用,可能会在将来的版本中删除。

GetKafka

从Apache Kafka获取消息,特别是0.8.x版本。用于发送消息的互补NiFi处理器是PutKafka。

GetMongo

从用户指定的查询加载的MongoDB中的文档创建FlowFiles。

GetMongoRecord

基于记录的GetMongo版本,它使用Record编写器编写MongoDB结果集。

GetRethinkDB

处理器使用文档ID从RethinkDB(https://www.rethinkdb.com/)获取JSON文档。FlowFile将包含检索到的文档

GetSFTP

从SFTP服务器获取文件并从中创建FlowFiles

GetSNMP

从SNMP代理检索信息,并输出包含属性信息且没有任何内容的FlowFile

GetSolr

查询Solr并以XML格式或使用Record Writer将结果输出为FlowFile

GetSplunk

从Splunk Enterprise检索数据。

GetSQS

从Amazon Simple Queuing Service Queue获取消息

GetTCP

通过TCP连接到提供的端点。收到的数据将作为内容写入FlowFile

GetTwitter

从Twitter的流API中提取状态更改。在以1.9.0开头的版本中,根据https://developer.twitter.com/en/docs/basics/authentication/guides/securing-keys-and-tokens将Consumer Key和Access Token标记为敏感。

HandleHttpRequest

启动HTTP服务器并侦听HTTP请求。对于每个请求,创建一个FlowFile并转移到“success”。此处理器旨在与HandleHttpResponse处理器一起使用,以便创建Web服务

HandleHttpResponse

将HTTP响应发送到生成FlowFile的请求者。此处理器旨在与HandleHttpRequest结合使用,以便创建Web服务。

HashAttribute

将多个flowfile属性的键/值对散列在一起,并将散列添加为新属性。将添加可选属性,以便属性的名称是要考虑的flowfile属性的名称,属性的值是正则表达式,如果与属性值匹配,将导致该属性用作部分哈希。如果正则表达式包含捕获组,则仅使用捕获组的值。对于接受各种属性并生成每个属性的加密哈希的处理器,请参阅“CryptographicHashAttribute”。

HashContent

此处理器已弃用,可能会在将来的版本中删除。

IdentifyMimeType

尝试识别用于FlowFile的MIME类型。如果可以标识MIME类型,则添加名为“mime.type”的属性,其值为MIME类型。如果无法确定MIME类型,则该值将设置为“application / octet-stream”。此外,如果已知MIME类型的公共文件扩展名,则将设置属性mime.extension。

InferAvroSchema

检查传入的FlowFile的内容以推断Avro架构。处理器将使用Kite SDK尝试从传入内容自动生成Avro架构。在从JSON数据推断架构时,将在生成的Avro架构定义中使用键名称。当从CSV数据推断时,必须存在“标题定义”作为输入数据的第一行,或者必须在属性“CSV标题定义”中明确设置“标题定义”。“标题定义”只是一个逗号分隔的行,用于定义每列的名称。需要“标头定义”以确定应在结果Avro定义中为每个字段指定的名称。在推断数据类型时,如果存在歧义,则始终使用更高阶数据类型。例如,当检查数值时,类型可以设置为“long”而不是“integer”,因为long可以安全地保持任何“整数”的值。目前仅支持CSV和JSON内容来自动推断Avro架构。传入FlowFile中存在的内容类型是使用属性“输入内容类型”设置的。该属性可以显式设置为CSV,JSON或“使用mime.type值”,它将检查传入FlowFile上的mime.type属性的值,以确定存在的内容类型。传入FlowFile中存在的内容类型是使用属性“输入内容类型”设置的。该属性可以显式设置为CSV,JSON或“使用mime.type值”,它将检查传入FlowFile上的mime.type属性的值,以确定存在的内容类型。传入FlowFile中存在的内容类型是使用属性“输入内容类型”设置的。该属性可以显式设置为CSV,JSON或“使用mime.type值”,它将检查传入FlowFile上的mime.type属性的值,以确定存在的内容类型。

InvokeAWSGatewayApi

AWS Gateway API端点的客户端

InvokeHTTP

可以与可配置的HTTP端点交互的HTTP客户端处理器。目标URL和HTTP方法是可配置的。FlowFile属性转换为HTTP头,FlowFile内容作为请求的主体包含(如果HTTP方法是PUT,POST或PATCH)。

InvokeScriptedProcessor

实验 - 为给定脚本中定义的处理器调用脚本引擎。该脚本必须定义一个实现Processor接口的有效类,并且必须将一个变量“processor”设置为该类的实例。处理器方法(如onTrigger())将委派给脚本化的Processor实例。此外,脚本处理器定义的任何Relationships或PropertyDescriptors都将添加到配置对话框中。实验:尚未验证持续使用的影响。

ISPEnrichIP

查找IP地址的ISP信息,并将信息添加到FlowFile属性。ISP数据作为MaxMind ISP数据库提供(请注意,这与某些地理浓缩工具使用的GeoLite数据库不同)。包含要查找的IP地址的属性由“IP地址属性”属性提供。如果提供的属性名称为“X”,则通过“浓缩”添加的属性将采用X.isp形式。

JoltTransformJSON

将Jolt规范列表应用于流文件JSON有效内容。使用已转换的内容创建新的FlowFile,并将其路由到“成功”关系。如果JSON转换失败,则原始FlowFile将路由到“失败”关系。

JoltTransformRecord

将Jolt规范列表应用于FlowFile有效负载。使用已转换的内容创建新的FlowFile,并将其路由到“成功”关系。如果转换失败,则原始FlowFile将路由到“失败”关系。

JsonQueryElasticsearch

允许用户运行使用ElasticSearch JSON DSL编写的查询(使用聚合)的处理器。它不会自动为用户分页查询。如果将传入关系添加到此处理器,它将使用该流文件的内容进行查询。应该注意查询的大小,因为ElasticSearch的整个响应将一次性加载到内存中并转换为生成的流文件。

ListAzureBlobStorage

列出Azure存储容器中的Blob。列表详细信息附加到空的FlowFile以与FetchAzureBlobStorage一起使用。此处理器旨在仅在群集中的主节点上运行。如果主节点发生更改,则新主节点将在上一节点停止的位置拾取,而不会复制所有数据。

ListDatabaseTables

生成一组流文件,每个文件包含与数据库连接中的表的元数据相对应的属性。一旦获取了有关表的元数据,在刷新间隔(如果已设置)已经过去或者手动清除状态之前,将不会再次获取它。

ListenBeats

使用Libbeat的'output.logstash'监听libbeat兼容客户端发送的消息(例如filebeats,metricbeats等),将其JSON格式的有效负载写入FlowFile的内容。该处理器取代现已弃用的ListenLumberjack

ListenHTTP

启动HTTP Server并侦听给定的基本路径,以将传入的请求转换为FlowFiles。服务的默认URI将是http:// {hostname}:{port} / contentListener。仅支持HEAD和POST请求。GET,PUT和DELETE将导致错误和HTTP响应状态代码405。

ListenLumberjack

此处理器已弃用,可能会在不久的将来删除。侦听通过TCP发送到给定端口的Lumberjack消息。成功将消息写入FlowFile后,将确认每条消息。每个FlowFile将包含一个或多个Lumberjack帧的数据部分。在Lumberjack帧包含系统日志消息的情况下,可以将该处理器的输出发送到ParseSyslog处理器以进行进一步处理。

ListenRELP

侦听通过TCP发送到给定端口的RELP消息。成功将消息写入FlowFile后,将确认每条消息。每个FlowFile将包含一个或多个RELP帧的数据部分。在RELP帧包含系统日志消息的情况下,可以将该处理器的输出发送到ParseSyslog处理器以进行进一步处理。

ListenSMTP

此处理器为任意端口实现轻量级SMTP服务器,允许nifi侦听传入的电子邮件。请注意,此服务器不执行任何电子邮件验证。如果寻求直接暴露于互联网,使用NiFi和工业规模MTA(例如Postfix)的组合可能是更好的主意。此处理器的线程由所使用的底层smtp服务器管理,因此处理器不需要支持多个线程。

ListenSyslog

侦听通过TCP或UDP发送到给定端口的Syslog消息。根据RFC5424和RFC3164格式的消息的正则表达式检查传入消息。每条消息的格式为:()(VERSION)(TIMESTAMP)(HOSTNAME)(BODY)其中​​版本是可选的。时间戳可以是RFC5424时间戳,格式为“yyyy-MM-dd'T'HH:mm:ss.SZ”或“yyyy-MM-dd'T'HH:mm:ss.S + hh:mm” ,或者它可以是RFC3164时间戳,格式为“MMM d HH:mm:ss”。如果传入消息与这些模式之一匹配,则将解析消息,并将各个片段放在FlowFile属性中,原始消息将包含在FlowFile的内容中。如果传入消息与这些模式之一不匹配,则不会对其进行解析,并且syslog.valid属性将设置为false,并在FlowFile的内容中显示原始消息。有效消息将在成功关系上传输,无效消息将在无效关系上传输。

ListenTCP

侦听传入的TCP连接,并使用行分隔符作为消息分隔符从每个连接读取数据。默认行为是每条消息生成一个FlowFile,但是可以通过将批量大小增加到更大的值来控制更高的吞吐量。接收缓冲区大小必须设置为与预期接收的最大消息一样大,这意味着如果每100kb有一个行分隔符,则接收缓冲区大小必须大于100kb。

ListenTCPRecord

侦听传入的TCP连接并使用配置的记录读取器从每个连接读取数据,并使用配置的记录编写器将记录写入流文件。选择的记录阅读器类型将决定客户端如何发送数据。例如,当使用Grok阅读器读取日志时,客户端可以保持打开连接并连续传输数据,但是当使用JSON阅读器时,客户端无法发送JSON文档数组,然后在同一连接上发送另一个数组,因为读者在那时会处于不良状态。将以阻塞模式从连接中读取记录,并将根据处理器中指定的读取超时进行超时。如果读取超时,或者在读取时遇到任何其他错误,则连接将被关闭,读取到该点的任何记录将根据配置的读取错误策略(丢弃或转移)进行处理。如果客户端保持连接打开,则应调整处理器的并发任务以匹配允许的最大TCP连接数,以便有一个任务处理每个连接。

ListenUDP

在给定端口上侦听Datagram Packets。默认行为为每个数据报生成FlowFile,但是为了获得更高的吞吐量,可以增加Max Batch Size属性以指定在单个FlowFile中批处理的数据报的数量。通过指定“发送主机”和“发送主机端口”属性,可以限制此处理器侦听来自特定远程主机和端口的数据报,否则它将侦听来自所有主机和端口的数据报。

ListenUDPRecord

在给定端口上侦听Datagram Packets,并使用配置的Record Reader读取每个数据报的内容。然后,使用配置的Record Writer将每条记录写入流文件。通过指定“发送主机”和“发送主机端口”属性,可以限制此处理器侦听来自特定远程主机和端口的数据报,否则它将侦听来自所有主机和端口的数据报。

ListenWebSocket

充当WebSocket服务器端点以接受客户端连接。FlowFiles根据收到的消息类型转移到下游关系,因为使用此处理器配置的WebSocket服务器接收客户端请求

ListFile中

从本地文件系统中检索文件列表。对于列出的每个文件,创建一个表示该文件的FlowFile,以便可以与FetchFile一起提取该文件。此处理器旨在仅在群集中的主节点上运行。如果主节点发生更改,则新主节点将在上一节点停止的位置拾取,而不会复制所有数据。与GetFile不同,此处理器不会从本地文件系统中删除任何数据。

ListFTP

执行驻留在FTP服务器上的文件列表。对于在远程服务器上找到的每个文件,将创建一个新的FlowFile,并将filename属性设置为远程服务器上的文件名。然后,可以将其与FetchFTP结合使用以获取这些文件。

ListGCSBucket

从GCS存储桶中检索对象列表。对于列出的每个对象,创建一个表示对象的FlowFile,以便可以与FetchGCSObject一起提取它。此处理器旨在仅在群集中的主节点上运行。如果主节点发生更改,则新主节点将在上一节点停止的位置拾取,而不会复制所有数据。

ListHDFS

从HDFS中检索文件列表。每次执行列表时,具有最新时间戳的文件将被排除并在下一次执行处理器期间被拾取。这样做是为了确保在单次执行处理器之前和之后立即写入具有相同时间戳的文件时,我们不会遗漏任何文件或产生重复文件。对于HDFS中列出的每个文件,此处理器创建一个FlowFile,表示要与FetchHDFS一起提取的HDFS文件。此处理器旨在仅在群集中的主节点上运行。如果主节点发生更改,则新主节点将在上一节点停止的位置拾取,而不会复制所有数据。与GetHDFS不同,此处理器不会从HDFS中删除任何数据。

ListS3

从S3存储桶中检索对象列表。对于列出的每个对象,创建一个表示对象的FlowFile,以便可以与FetchS3Object一起获取它。此处理器旨在仅在群集中的主节点上运行。如果主节点发生更改,则新主节点将在上一节点停止的位置拾取,而不会复制所有数据。

ListSFTP

执行驻留在SFTP服务器上的文件列表。对于在远程服务器上找到的每个文件,将创建一个新的FlowFile,并将filename属性设置为远程服务器上的文件名。然后,可以将其与FetchSFTP结合使用以获取这些文件。

LogAttribute

在指定的日志级别发出FlowFile的属性

的LogMessage

在指定的日志级别发出日志消息

LookupAttribute

查找服务中的查找属性

LookupRecord

从Record中提取一个或多个字段,并在LookupService中查找这些字段的值。如果LookupService返回结果,则可以选择将该结果添加到Record中。在这种情况下,处理器用作浓缩处理器。无论如何,记录然后被路由到“匹配”关系或“不匹配”关系(如果“路由策略”属性被配置为这样做),指示LookupService是否返回结果,允许处理器也可用作路由处理器。用于在查找服务中查找值的“坐标”是通过添加用户定义的属性来定义的。添加的每个属性都有一个添加到Map的条目,其中属性的名称变为Map Key,RecordPath返回的值将成为该键的值。如果RecordPath返回多个值,则Record将被路由到“不匹配”关系(或“成功”,具体取决于“路由策略”属性的配置)。如果一个或多个字段与Result RecordPath匹配,则将更新匹配的所有字段。如果配置的LookupService中没有匹配项,则不会更新任何字段。即,它不会使用空值覆盖Record中的现有值。但请注意,如果您的架构中没有考虑LookupService返回的结果(特别是为Record Writer配置的架构),那么这些字段将不会写入FlowFile。取决于“路由策略”属性的配置)。如果一个或多个字段与Result RecordPath匹配,则将更新匹配的所有字段。如果配置的LookupService中没有匹配项,则不会更新任何字段。即,它不会使用空值覆盖Record中的现有值。但请注意,如果您的架构中没有考虑LookupService返回的结果(特别是为Record Writer配置的架构),那么这些字段将不会写入FlowFile。取决于“路由策略”属性的配置)。如果一个或多个字段与Result RecordPath匹配,则将更新匹配的所有字段。如果配置的LookupService中没有匹配项,则不会更新任何字段。即,它不会使用空值覆盖Record中的现有值。但请注意,如果您的架构中没有考虑LookupService返回的结果(特别是为Record Writer配置的架构),那么这些字段将不会写入FlowFile。

MergeContent

根据用户定义的策略将一组FlowFiles合并在一起,并将它们打包到一个FlowFile中。建议只为单个传入连接配置处理器,因为不会从不同连接中的FlowFiles创建FlowFiles组。此处理器根据需要更新mime.type属性。

MergeRecord

此处理器将多个面向记录的FlowFiles合并到一个FlowFile中,该FlowFile包含输入FlowFiles的所有记录。此处理器的工作原理是创建“箱”,然后将FlowFiles添加到这些箱中,直到它们满了。一旦bin已满,所有FlowFiles将合并为一个输出FlowFile,FlowFile将被路由到'merged'关系。bin将包含许多'类似FlowFiles'。为了使两个FlowFiles被视为“像FlowFiles”,它们必须具有相同的Schema(由Record Reader标识),如果属性已设置,指定属性的值相同。有关更多信息,请参阅处理器使用和其他详

ModifyBytes

丢弃开头和结尾的字节范围或二进制文件的所有内容。

ModifyHTMLElement

修改现有HTML元素的值。通过使用CSS选择器语法来定位要修改的所需元素。传入的HTML首先转换为HTML文档对象模型,以便可以使用CSS选择器将样式应用于HTML的类似方式选择HTML元素。然后使用用户定义的CSS选择器字符串“查询”生成的HTML DOM,以找到用户希望修改的元素。如果找到HTML元素,则使用指定的“Modified Value”属性值在DOM中更新元素的值。将更新与CSS选择器匹配的所有DOM元素。一旦所有DOM元素都已更新,DOM将呈现为HTML,结果将使用更新的HTML替换流文件内容。可以在“http:// jsoup”中找到有关CSS选择器语法的更全面的参考。

MonitorActivity

监视活动流并在流量在指定时间内没有任何数据时发出指示,并在流量活动恢复时再次发出指示

MoveHDFS

在Hadoop分布式文件系统(HDFS)上重命名现有文件或文件目录(非递归)。

通知

在分布式缓存中缓存释放信号标识符,可选地与FlowFile的属性一起缓存。一旦发现缓存中的该信号,将释放在相应的等待处理器处保持的任何流文件。

ParseCEF

解析CEF格式的消息的内容,并为FlowFile添加属性以用于CEF消息的部分的头部和扩展。注意:此处理器期望CEF消息没有syslog标头(即从“CEF:0”开始)

ParseEvtx

解析Windows事件日志文件(evtx)的内容,并将生成的XML写入FlowFile

ParseNetflowv5

解析netflowv5字节摄取并添加到NiFi流文件作为属性或JSON内容。

ParseSyslog

尝试根据RFC5424和RFC3164格式解析Syslog消息的内容,并为Syslog消息的每个部分向FlowFile添加属性。注意:请注意RFC3164是信息性的,并且存在各种不同的实现。狂野的。如果消息无法解析,请考虑使用RFC5424或使用通用解析处理器(如ExtractGrok)。

ParseSyslog5424

尝试按照RFC5424格式解析格式正确的Syslog消息的内容,并为Syslog消息的每个部分添加属性到FlowFile,包括结构化数据。结构化数据将作为每个项目id的一个属性写入属性+参数参见https://tools.ietf.org/html/rfc5424.Note:ParseSyslog5424比ParseSyslog更接近规范。如果您的Syslog生成器没有严格遵循该规范,例如,对于丢失的头条目使用' - ',这些日志将失败,使用此解析器,ParseSyslog不会失败。

PartitionRecord

接收面向记录的数据(即,可以由配置的记录读取器读取的数据),并根据传入的FlowFile中的每个记录评估一个或多个RecordPath。然后将每个记录与其他“类似记录”分组,并为每组“类似记录”创建FlowFile。两条记录“像记录”的含义由用户定义的属性决定。用户需要输入至少一个用户定义的属性,其值为RecordPath。如果两个记录对于所有已配置的RecordPath具有相同的值,则它们被认为是相似的。因为我们知道给定输出FlowFile中的所有记录对于RecordPath指定的字段具有相同的值,所以为每个字段添加一个属性。有关更多信息和示例,请参阅“使用”页面上的“其他详

PostHTTP

此处理器已弃用,可能会在将来的版本中删除。

PublishAMQP

根据FlowFile的内容创建AMQP消息,并将消息发送到AMQP Exchange。在典型的AMQP交换模型中,发送到AMQP Exchange的消息将根据“路由密钥”路由到队列中的最终目的地(绑定)。如果由于某些配置错误导致Exchange,路由密钥和队列之间的绑定未设置,则该消息将没有最终目标并将返回(即,数据将不会进入队列)。如果发生这种情况,您将在app-log和bulletin中看到一个记录该效果的日志,并且FlowFile将被路由到“失败”关系。

PublishGCPubSub

将传入流文件的内容发布到配置的Google Cloud PubSub主题。处理器支持动态属性。如果存在任何动态属性,它们将以“属性”的形式与消息一起发送。

PublishJMS

从FlowFile的内容创建JMS消息,并将其作为JMS BytesMessage或TextMessage发送到JMS目标(队列或主题)。FlowFile属性将作为JMS标头和/或属性添加到传出JMS消息中。

PublishKafka

使用Kafka 0.9.x Producer将FlowFile的内容作为消息发送到Apache Kafka。要发送的消息可以是单独的FlowFiles,也可以使用用户指定的分隔符(例如换行符)进行分隔。请注意,在某些情况下,发布商可能会陷入无限期停滞状态。我们正密切关注Kafka社区如何发展,并尽快利用这些修复措施。同时,可以进入只有重新启动JVM NiFi运行的分辨率的状态。用于获取消息的补充NiFi处理器是ConsumeKafka。

PublishKafka_0_10

使用Kafka 0.10.x Producer API将FlowFile的内容作为消息发送到Apache Kafka。要发送的消息可以是单独的FlowFiles,也可以使用用户指定的分隔符(例如换行符)分隔。用于获取消息的补充NiFi处理器是ConsumeKafka_0_10。

PublishKafka_0_11

使用Kafka 0.11.x Producer API将FlowFile的内容作为消息发送到Apache Kafka。要发送的消息可以是单独的FlowFiles,也可以使用用户指定的分隔符(例如换行符)分隔。用于获取消息的补充NiFi处理器是ConsumeKafka_0_11。

PublishKafka_1_0

使用Kafka 1.0 Producer API将FlowFile的内容作为消息发送到Apache Kafka。要发送的消息可以是单独的FlowFiles,也可以使用用户指定的分隔符(例如换行符)进行分隔。用于获取消息的补充NiFi处理器是ConsumeKafka_1_0。

PublishKafka_2_0

使用Kafka 2.0 Producer API将FlowFile的内容作为消息发送到Apache Kafka。要发送的消息可以是单独的FlowFiles,也可以使用用户指定的分隔符(例如换行符)进行分隔。用于获取消息的补充NiFi处理器是ConsumeKafka_2_0。

PublishKafkaRecord_0_10

使用Kafka 0.10.x Producer API将FlowFile的内容作为单独的记录发送到Apache Kafka。FlowFile的内容应该是面向记录的数据,可以由配置的Record Reader读取。用于获取消息的补充NiFi处理器是ConsumeKafka_0_10_Record。

PublishKafkaRecord_0_11

使用Kafka 0.11.x Producer API将FlowFile的内容作为单独的记录发送到Apache Kafka。FlowFile的内容应该是面向记录的数据,可以由配置的Record Reader读取。用于获取消息的补充NiFi处理器是ConsumeKafka_0_11_Record。

PublishKafkaRecord_1_0

使用Kafka 1.0 Producer API将FlowFile的内容作为单独的记录发送到Apache Kafka。FlowFile的内容应该是面向记录的数据,可以由配置的Record Reader读取。用于获取消息的补充NiFi处理器是ConsumeKafkaRecord_1_0。

PublishKafkaRecord_2_0

使用Kafka 2.0 Producer API将FlowFile的内容作为单独的记录发送到Apache Kafka。FlowFile的内容应该是面向记录的数据,可以由配置的Record Reader读取。用于获取消息的补充NiFi处理器是ConsumeKafkaRecord_2_0。

PublishMQTT

将消息发布到MQTT主题

PutAzureBlobStorage

将内容放入Azure存储Blob中

PutAzureEventHub

将FlowFile的内容发送到Windows Azure事件中心。注意:FlowFile的内容在发送之前将缓冲到内存中,因此应注意避免向此处理器发送超过可用Java堆空间量的FlowFiles。

PutAzureQueueStorage

将传入的FlowFiles的内容写入配置的Azure队列存储。

PutBigQueryBatch

批量加载将文件内容流式传输到Google BigQuery表。

PutCassandraQL

在Cassandra 1.x,2.x或3.0.x群集上执行提供的Cassandra查询语言(CQL)语句。传入的FlowFile的内容应该是要执行的CQL命令。CQL命令可以使用?逃避参数。在这种情况下,要使用的参数必须作为具有命名约定cql.args.N.type和cql.args.N.value的FlowFile属性存在,其中N是正整数。cql.args.N.type应该是一个小写字符串,表示Cassandra类型。

PutCassandraRecord

这是一个记录感知处理器,使用配置的“记录读取器”将传入的FlowFile的内容作为单独的记录读取,并使用本机协议版本3或更高版本将它们写入Apache Cassandra。

PutCloudWatchMetric

将指标发布到Amazon CloudWatch。度量标准可以是单个值,也可以是由最小值,最大值,总和和样本计数组成的统计集。

PutCouchbaseKey

通过键/值访问将文档放入Couchbase Server。

PutDatabaseRecord

PutDatabaseRecord处理器使用指定的RecordReader从传入的流文件中输入(可能是多个)记录。这些记录被转换为SQL语句并作为单个批处理执行。如果发生任何错误,则将流文件路由到失败或重试,如果成功传输记录,则传入的流文件将路由到成功。处理器执行的语句类型是通过Statement Type属性指定的,该属性接受一些硬编码值,如INSERT,UPDATE和DELETE,以及'Use statement.type Attribute',这会导致处理器获取流文件属性中的语句类型。重要信息:如果语句类型为UPDATE,则传入记录不得更改主键(或用户指定的更新键)的值。如果遇到这样的记录,

PutDistributedMapCache

获取FlowFile的内容,并使用从FlowFile属性计算的缓存键将其放入分布式地图缓存中。如果缓存已包含该条目,并且缓存更新策略为“保持原始”,则不会替换该条目。

PutDruidRecord

将记录发送给德鲁伊进行索引。利用Druid Tranquility Controller服务。

PutDynamoDB

根据哈希和范围键从DynamoDB中提取文档。该表可以单独使用散列和范围或散列键。目前支持的键是字符串,数字和值可以是json文档。在散列和范围键的情况下,操作需要两个键。FlowFile内容必须是JSON。FlowFile内容映射到DynamoDB项目中指定的Json Document属性。

PutElasticsearch

使用指定的参数(例如要插入的索引和文档类型)将FlowFile的内容写入Elasticsearch。如果群集已配置为授权和/或安全传输(SSL / TLS)并且Shield插件可用,则可以进行安全连接。此处理器支持Elasticsearch 2.x群集。

PutElasticsearch5

使用指定的参数(例如要插入的索引和文档类型)将FlowFile的内容写入Elasticsearch。如果群集已配置为授权和/或安全传输(SSL / TLS),并且X-Pack插件可用,则可以进行安全连接。此处理器支持Elasticsearch 5.x群集。

PutElasticsearchHttp

使用指定的参数(例如要插入的索引和文档类型)将FlowFile的内容写入Elasticsearch。

PutElasticsearchHttpRecord

使用指定的参数(例如要插入的索引和文档类型)以及操作类型(索引,upsert,delete等)将FlowFile中的记录写入Elasticsearch。注意:Bulk API用于发送记录。这意味着传入流文件的全部内容将被读入内存,并且每条记录都将转换为JSON文档,该文档将添加到单个HTTP请求正文中。对于非常大的流文件(例如,具有大量记录的文件),这可能导致内存使用问题。

PutEmail

向每个传入的FlowFile发送配置收件人的电子邮件

PUTFILE

将FlowFile的内容写入本地文件系统

PutFTP

将FlowFiles发送到FTP服务器

PutGCSObject

将流文件放入Google Cloud Bucket。

PutGridFS

将文件写入GridFS存储桶。

PutHBaseCell

将FlowFile的内容添加到HBase作为单个单元格的值

PutHBaseJSON

根据传入的JSON文档的内容向HBase添加行。每个FlowFile必须包含一个UTF-8编码的JSON文档,并且根元素不是单个文档的任何FlowFiles都将路由到失败。每个JSON字段名称和值将成为列限定符和HBase行的值。将跳过具有空值的任何字段,并且将根据复杂字段策略处理具有复杂值的字段。行ID可以通过行标识符属性直接在处理器上指定,也可以通过指定行标识符字段名称属性从JSON文档中提取。该处理器将一次保存给定批次的所有FlowFiles的内容。

PutHBaseRecord

使用配置的记录阅读器根据流文件的内容将行添加到HBase。

PutHDFS

将FlowFile数据写入Hadoop分布式文件系统(HDFS)

PutHiveQL

执行HiveQL DDL / DML命令(例如,UPDATE,INSERT)。传入的FlowFile的内容应该是要执行的HiveQL命令。HiveQL命令可能会使用?逃避参数。在这种情况下,要使用的参数必须作为具有命名约定hiveql.args.N.type和hiveql.args.N.value的FlowFile属性存在,其中N是正整数。hiveql.args.N.type应该是一个表示JDBC Type的数字。FlowFile的内容预计为UTF-8格式。

PutHiveStreaming

此处理器使用Hive Streaming将流文件数据发送到Apache Hive表。传入的流文件应采用Avro格式,表必须存在于Hive中。有关Hive表(格式,分区等)的要求,请参阅Hive文档。基于处理器中指定的分区列的名称从Avro记录中提取分区值。注意:如果为此处理器配置了多个并发任务,则单个线程只能随时写入一个表。打算写入同一个表的其他任务将等待当前任务完成写入表。

PutHTMLElement

在现有HTML DOM中放置一个新的HTML元素。使用CSS选择器语法指定新HTML元素的所需位置。传入的HTML首先转换为HTML文档对象模型,以便HTML DOM位置的位置可以与CSS选择器用于将样式应用于HTML的方式类似。然后使用用户定义的CSS选择器字符串“查询”生成的HTML DOM,以找到用户希望添加新HTML元素的位置。将新的HTML元素添加到DOM后,它将呈现为HTML,结果将使用更新的HTML替换流文件内容。可以在“http://jsoup.org/apidocs/org/jsoup/select/Selector.html”找到有关CSS选择器语法的更全面的参考。

PutIgniteCache

使用DataStreamer将FlowFile的内容流式传输到Ignite Cache。处理器使用FlowFile属性(Ignite缓存条目键)的值作为缓存键,并使用FlowFile的字节数组作为缓存条目值的值。字符串键和非空字节数组值都是必需的,否则FlowFile将转移到失败关系。注 - Ignite Kernel会定期向日志输出节点性能统计信息。可以通过在logback.xml配置文件中将logger'org.apache.ignite'的日志级别设置为WARN来关闭此消息。

PutInfluxDB

处理器以“行协议”的形式写入FlowFile的内容。请查看InfluxDB文档(https://www.influxdb.com/)中“行协议”的详细信息。流文件可以包含由线分隔符分隔的单个测量点或多个测量点。时间戳(最后一个字段)应该是纳秒级分辨率。

PutJMS

此处理器已弃用,可能会在将来的版本中删除。

PutKafka

将FlowFile的内容作为消息发送到Apache Kafka,特别是0.8.x版本。要发送的消息可以是单独的FlowFiles,也可以使用用户指定的分隔符(例如换行符)进行分隔。用于获取消息的补充NiFi处理器是GetKafka。

PutKinesisFirehose

将内容发送到指定的Amazon Kinesis Firehose。为了将数据发送到firehose,必须指定firehose传输流名称。

PutKinesisStream

将内容发送到指定的Amazon Kinesis。为了将数据发送到Kinesis,必须指定流名称。

PutKudu

使用提供的Record Reader从传入的FlowFile读取记录,并将这些记录写入指定的Kudu表。必须在处理器属性或源中提供表的架构。如果从输入读取记录或将记录写入Kudu时发生任何错误,FlowFile将被路由到失败

PutLambda

将内容发送到指定的Amazon Lamba函数。用于身份验证的AWS凭据必须具有执行Lambda函数(lambda:InvokeFunction)的权限.FlowFile内容必须是JSON。

PutMongo

将FlowFile的内容写入MongoDB

PutMongoRecord

该处理器是一个记录感知处理器,用于将数据插入MongoDB。它使用已配置的记录阅读器和模式从流文件正文中读取传入记录集,然后将这些记录的批量插入到已配置的MongoDB集合中。此处理器不支持更新,删除或upsert。一次插入的文档数由“插入批处理大小”配置属性控制。应将此值设置为合理的大小,以确保MongoDB不会一次过多地插入过多的插件。

PutParquet

使用提供的Record Reader从传入的FlowFile读取记录,并将这些记录写入Parquet文件。必须在处理器属性中提供Parquet文件的架构。此处理器将首先编写一个临时点文件,并在成功将每个记录写入点文件后,它会将点文件重命名为它的最终名称。如果无法重命名点文件,则重命名操作最多会尝试10次,如果仍然不成功,则会删除点文件,并将流文件路由到失败状态。如果从输入读取记录或将记录写入输出时发生任何错误,则将删除整个点文件,并将流文件路由到失败或重试,具体取决于错误。

PutRethinkDB

处理器将FlowFile的JSON内容写入RethinkDB(https://www.rethinkdb.com/)。流文件应包含JSON Object和JSON文档数组

PutRiemann

当FlowFiles通过此处理器时,将事件发送给Riemann(http://riemann.io)。您可以使用事件通知Riemann FlowFile通过,或者您可以附加更有意义的指标,例如FlowFile到达此处理器所花费的时间。附加到事件的所有属性都支持NiFi表达式语言。

PutS3Object

将FlowFiles放入Amazon S3存储桶。上传使用PutS3Object方法或PutS3MultipartUpload方法。PutS3Object方法在单个同步调用中发送文件,但它具有5GB的大小限制。使用PutS3MultipartUpload方法发送更大的文件。此多部分进程在每个步骤之后保存状态,以便在停止并重新启动处理器或集群时,可以以最小的损失恢复大型上载。分段上传包括三个步骤:1)启动上传,2)上传部分,以及3)完成上传。对于分段上传,处理器在本地保存状态,跟踪上传ID和上传的部分,必须同时提供这些内容才能完成上传。AWS库根据AWS区域选择端点URL,但是,可以使用“端点覆盖URL”属性覆盖此属性,以便与其他与S3兼容的端点一起使用。S3 API指定PutS3Object上载的最大文件大小为5GB。它还要求分段上传中的部分大小必须至少为5MB,除了最后一部分。这些限制确定了“分段上传阈值”和“零件尺寸”属性的界限。

PutSFTP

将FlowFiles发送到SFTP服务器

PutSlack

在slack.com上向您的团队发送消息

PutSNS

将FlowFile的内容作为通知发送到Amazon Simple Notification Service

PutSolrContentStream

将FlowFile的内容作为ContentStream发送到Solr

PutSolrRecord

将记录从FlowFile索引到Solr

PutSplunk

通过TCP,TCP + TLS / SSL或UDP将日志发送到Splunk Enterprise。如果提供了消息分隔符,则此处理器将根据分隔符从传入的FlowFile读取消息,并将每条消息发送到Splunk。如果未提供消息分隔符,则FlowFile的内容将直接发送到Splunk,就好像它是单个消息一样。

PutSQL

执行SQL UPDATE或INSERT命令。传入的FlowFile的内容应该是要执行的SQL命令。SQL命令可能会使用?逃避参数。在这种情况下,要使用的参数必须作为具有命名约定sql.args.N.type和sql.args.N.value的FlowFile属性存在,其中N是正整数。sql.args.N.type应该是一个表示JDBC Type的数字。FlowFile的内容预计为UTF-8格式。

PutSQS

将消息发布到Amazon Simple Queuing Service Queue

PutSyslog

通过TCP或UDP将Syslog消息发送到给定主机和端口。消息是从处理器的“Message ___”属性构造的,它可以使用表达式语言从传入的FlowFiles生成消息。这些属性用于构造以下形式的消息:)(VERSION)(TIMESTAMP)(HOSTNAME)(BODY)其中​​版本是可选的。根据RFC5424和RFC3164格式的消息的正则表达式检查构造的消息。时间戳可以是RFC5424时间戳,格式为“yyyy-MM-dd'T'HH:mm:ss.SZ”或“yyyy-MM-dd'T'HH:mm:ss.S + hh:mm” ,或者它可以是RFC3164时间戳,格式为“MMM d HH:mm:ss”。如果根据以上描述构造了不形成有效Syslog消息的消息,则将其路由到无效关系。有效消息将发送到Syslog服务器,成功将路由到成功关系,故障路由到故障关系。

PutTCP

PutTCP处理器接收FlowFile并通过TCP连接将FlowFile内容发送到配置的TCP服务器。默认情况下,FlowFiles通过相同的TCP连接(或TCP连接池,如果配置了多个输入线程)传输。为了帮助TCP服务器确定消息边界,可以配置一个可选的“Outgoing Message Delimiter”字符串,当通过TCP连接传输时,该字符串附加到每个FlowFiles内容的末尾。可以指定可选的“每个FlowFile连接”参数来更改行为,以便通过单个TCP连接传输每个FlowFiles内容,该连接在收到FlowFile时打开,并在发送FlowFile后关闭。此选项仅应用于低消息量方案,

PutUDP

PutUDP处理器接收FlowFile并将FlowFile内容打包成单个UDP数据报包,然后将其发送到配置的UDP服务器。用户必须确保提供给此处理器的FlowFile内容不大于基础UDP传输的最大大小。最大传输大小将根据平台设置而有所不同,但通常不到64KB。如果FlowFiles的内容大于最大传输大小,则会将其标记为失败。

PutWebSocket

使用由ListenWebSocket或ConnectWebSocket建立的WebSocket会话将消息发送到WebSocket远程端点。

QueryCassandra

在Cassandra 1.x,2.x或3.0.x群集上执行提供的Cassandra查询语言(CQL)选择查询。查询结果可以转换为Avro或JSON格式。使用流式传输,因此支持任意大的结果集。可以使用标准调度方法将此处理器调度为在计时器或cron表达式上运行,或者可以由传入的FlowFile触发。如果它由传入的FlowFile触发,则在评估选择查询时,该FlowFile的属性将可用。FlowFile属性'executecql.row.count'表示选择了多少行。

QueryDatabaseTable

生成SQL select查询,或使用提供的语句,并执行它以获取指定的Maximum Value列中的值大于先前看到的最大值的所有行。查询结果将转换为Avro格式。多个属性支持表达式语言,但不允许传入连接。变量注册表可用于为包含表达式语言的任何属性提供值。如果希望利用流文件属性来执行这些查询,可以将GenerateTableFetch和/或ExecuteSQL处理器用于此目的。使用流式传输,因此支持任意大的结果集。可以使用标准调度方法将此处理器调度为在计时器或cron表达式上运行。此处理器仅用于在主节点上运行。

QueryDatabaseTableRecord

生成SQL select查询,或使用提供的语句,并执行它以获取指定的Maximum Value列中的值大于先前看到的最大值的所有行。查询结果将转换为记录编写者指定的格式。多个属性支持表达式语言,但不允许传入连接。变量注册表可用于为包含表达式语言的任何属性提供值。如果希望利用流文件属性来执行这些查询,可以将GenerateTableFetch和/或ExecuteSQL处理器用于此目的。使用流式传输,因此支持任意大的结果集。可以使用标准调度方法将此处理器调度为在计时器或cron表达式上运行。此处理器仅用于在主节点上运行。FlowFile属性'querydbtable.row.count'表示选择了多少行。

QueryDNS

强大的DNS查询处理器主要用于丰富DataFlow与基于DNS的API(例如RBL,ShadowServer的ASN查找),但也可用于执行常规DNS查找。

QueryElasticsearchHttp

使用指定的连接属性查询Elasticsearch。请注意,在写入Flow Files进行传输之前,每页文档的完整正文将被读入内存。另请注意,Elasticsearch max_result_window索引设置是可以使用此查询检索的记录数的上限。要检索更多记录,请使用ScrollElasticsearchHttp处理器。

QueryRecord

根据FlowFile的内容评估一个或多个SQL查询。然后,SQL查询的结果将成为输出FlowFile的内容。例如,这可用于特定于字段的过滤,转换和行级过滤。可以重命名列,执行简单的计算和聚合等。处理器配置有记录读取器控制器服务和记录编写器服务,以便允许传入和传出数据格式的灵活性。处理器必须至少配置一个用户定义的属性。Property的名称是将数据路由到的Relationship,而Property的值是一个SQL SELECT语句,用于指定如何转换/过滤输入数据。SQL语句必须是有效的ANSI SQL,并且由Apache Calcite提供支持。如果转换失败,原始FlowFile被路由到“失败”关系。否则,所选数据将被路由到关联的关系。如果Record Writer选择从Record继承模式,请务必注意,继承的模式将来自ResultSet,而不是输入Record。这允许QueryRecord处理器的单个实例具有多个查询,每个查询返回一组不同的列和聚合。因此,派生的模式将没有模式名称,因此,如果从Record继承Schema,则配置的Record Writer不会尝试将模式名称作为属性进行编写。有关更多信息,请参阅处理器使用文档。所选数据将被路由到相关关系。如果Record Writer选择从Record继承模式,请务必注意,继承的模式将来自ResultSet,而不是输入Record。这允许QueryRecord处理器的单个实例具有多个查询,每个查询返回一组不同的列和聚合。因此,派生的模式将没有模式名称,因此,如果从Record继承Schema,则配置的Record Writer不会尝试将模式名称作为属性进行编写。有关更多信息,请参阅处理器使用文档。所选数据将被路由到相关关系。如果Record Writer选择从Record继承模式,请务必注意,继承的模式将来自ResultSet,而不是输入Record。这允许QueryRecord处理器的单个实例具有多个查询,每个查询返回一组不同的列和聚合。因此,派生的模式将没有模式名称,因此,如果从Record继承Schema,则配置的Record Writer不会尝试将模式名称作为属性进行编写。有关更多信息,请参阅处理器使用文档。重要的是要注意,继承的模式将来自ResultSet,而不是输入Record。这允许QueryRecord处理器的单个实例具有多个查询,每个查询返回一组不同的列和聚合。因此,派生的模式将没有模式名称,因此,如果从Record继承Schema,则配置的Record Writer不会尝试将模式名称作为属性进行编写。有关更多信息,请参阅处理器使用文档。重要的是要注意,继承的模式将来自ResultSet,而不是输入Record。这允许QueryRecord处理器的单个实例具有多个查询,每个查询返回一组不同的列和聚合。因此,派生的模式将没有模式名称,因此,如果从Record继承Schema,则配置的Record Writer不会尝试将模式名称作为属性进行编写。有关更多信息,请参阅处理器使用文档。因此,如果从Record继承Schema,则配置的Record Writer不会尝试将Schema Name写为属性,这一点很重要。有关更多信息,请参阅处理器使用文档。因此,如果从Record继承Schema,则配置的Record Writer不会尝试将Schema Name写为属性,这一点很重要。有关更多信息,请参阅处理器使用文档。

QuerySolr

查询Solr并以XML格式或使用Record Writer将结果输出为FlowFile

QueryWhois

强大的whois查询处理器主要用于丰富DataFlow与基于whois的API(例如ShadowServer的ASN查找),但也可用于执行常规的whois查找。

ReplaceText

通过评估FlowFile的正则表达式(正则表达式)并将与正则表达式匹配的内容部分替换为某个备用值来更新FlowFile的内容。

ReplaceTextWithMapping

通过针对它评估正则表达式并将与正则表达式匹配的内容部分替换为映射文件中提供的某个备用值来更新FlowFile的内容。

ResizeImage

将图像大小调整为用户指定的尺寸。此处理器使用在运行NiFi的环境中注册的图像编解码器。默认情况下,这包括JPEG,PNG,BMP,WBMP和GIF图像。

RouteHL7

根据用户定义的查询路由传入的HL7数据。要添加查询,请向处理器添加新属性。属性的名称将成为处理器的新关系,值为HL7查询语言查询。如果FlowFile与查询匹配,则FlowFile的副本将路由到关联的关系。

RouteOnAttribute

使用属性表达式语言根据其属性路由FlowFiles

RouteOnContent

将正则表达式应用于FlowFile的内容,并将FlowFile的副本路由到其正则表达式匹配的每个目标。正则表达式作为用户定义的属性添加,其中属性的名称是关系的名称,值是与FlowFile内容匹配的正则表达式。用户定义的属性确实支持属性表达式语言,但结果被解释为文字值,而不是正则表达式

RouteText

根据一组用户定义的规则路由文本数据。传入的FlowFile中的每一行都与用户定义的Properties指定的值进行比较。将文本与这些用户定义的属性进行比较的机制由“匹配策略”定义。然后根据这些规则路由数据,分别路由文本的每一行。

RunMongoAggregation

每当收到流文件时运行聚合查询的处理器。

ScanAttribute

扫描FlowFiles的指定属性,检查指定的术语字典中是否存在任何值

ScanContent

扫描FlowFiles的内容以查找在用户提供的字典中找到的术语。如果匹配一个术语,则使用'matching.term'属性将该术语的UTF-8编码版本添加到FlowFile

ScanHBase

扫描并从HBase表中提取行。此处理器可用于通过指定一系列rowkey值(开始和/或结束),时间范围,过滤器表达式或它们的任意组合来从hbase表中获取行。记录顺序可以由属性控制Reversed处理器检索的行数可以是有限的。

ScrollElasticsearchHttp

使用指定的连接属性滚动Elasticsearch查询。此处理器旨在在主节点上运行,并且设计用于滚动大型结果集,如reindex的情况。必须先清除状态,然后才能运行另一个查询。返回每个结果页面,包含在JSON对象中,如下所示:{“hits”:[]}。请注意,每页文档的完整正文将在写入流文件进行传输之前读入内存。

SegmentContent

将FlowFile分段为字节边界上的多个较小段。每个段都具有以下属性:fragment.identifier,fragment.index,fragment.count,segment.original.filename; 然后,MergeContent处理器可以使用这些属性来重构原始FlowFile

SelectHiveQL

针对Hive数据库连接执行提供的HiveQL SELECT查询。查询结果将转换为Avro或CSV格式。使用流式传输,因此支持任意大的结果集。可以使用标准调度方法将此处理器调度为在计时器或cron表达式上运行,或者可以由传入的FlowFile触发。如果它由传入的FlowFile触发,则在评估选择查询时,该FlowFile的属性将可用。FlowFile属性'selecthiveql.row.count'表示选择了多少行。

执行setsnmp

根据传入的FlowFile属性,处理器将执行SNMP Set请求。建立名称如snmp $的属性时,处理器将尝试将属性值设置为属性名称中给定的相应OID

SplitAvro

根据配置的输出大小将二进制编码的Avro数据文件拆分为较小的文件。输出策略确定较小的文件是Avro数据文件,还是具有FlowFile属性中元数据的裸Avro记录。输出将始终是二进制编码。

SplitContent

按指定的字节序列拆分传入的FlowFiles

SplitJson

将JSON文件拆分为多个单独的FlowFiles,用于由JsonPath表达式指定的数组元素。每个生成的FlowFile都由指定数组的元素组成,并转移到关系“split”,原始文件转移到“原始”关系。如果未找到指定的JsonPath或未评估为数组元素,则原始文件将路由到“failure”并且不会生成任何文件。

SplitRecord

将面向记录的数据格式的输入FlowFile拆分为多个较小的FlowFiles

SplitText

将文本文件拆分为多个较小的文本文件,这些文本文件位于由最大行数或片段总大小限制的行边界上。每个输出拆分文件将包含不超过配置的行数或字节数。如果同时指定了“线分割计数”和“最大碎片大小”,则会在首先达到的限制时进行分割。如果片段的第一行超过最大片段大小,则该行将在单个分割文件中输出,该文件超出配置的最大大小限制。此组件还允许指定每个拆分应包含标题行。可以通过指定应构成标题的行数或使用标题标记与读取行匹配来计算标题行。如果发生这种匹配,则相应的行将被视为标题。请记住,在标头标记匹配的第一次失败时,将不再执行匹配,并且其余数据将被解析为给定拆分的常规行。如果在计算标题之后没有更多数据,则得到的分割将仅包括标题行。

SplitXml

将XML文件拆分为多个单独的FlowFiles,每个FlowFiles包含原始根元素的子级或后代

SpringContextProcessor

支持通过预定义的输入/输出MessageChannels从Spring Application Context中定义的应用程序发送和接收数据的处理器。

StoreInKiteDataset

将Avro记录存储在Kite数据集中

TagS3Object

在Amazon S3存储桶中的FlowFile上设置标记。如果尝试标记不存在的文件,FlowFile将路由到成功。

TailFile

“尾巴”文件或文件列表,在文件写入文件时从文件中摄取数据。该文件应该是文本的。仅在遇到新行(回车或换行符或组合)时才会摄取数据。如果文件到尾部定期“翻转”(通常是日志文件的情况),则可以使用可选的Rolling Filename Pattern从已翻转的文件中检索数据,即使在NiFi未运行时发生翻转(只要重新启动NiFi时数据仍然存在)。通常建议将运行计划设置为几秒钟,而不是使用默认值0秒运行,因为如果计划非常激进,此处理器将消耗大量资源。此时,此处理器不支持提取“翻转”时已压缩的文件。

的TransformXML

将提供的XSLT文件应用于流文件XML有效内容。使用已转换的内容创建新的FlowFile,并将其路由到“成功”关系。如果XSL转换失败,则原始FlowFile将路由到“失败”关系

UnpackContent

解压缩已使用多种不同包装格式之一打包的FlowFiles的内容,为每个输入FlowFile发送一个到多个FlowFile

UpdateAttribute

使用属性表达式语言更新FlowFile的属性和/或根据正则表达式删除属性

UpdateCounter

该处理器允许用户在其流程中设置特定的计数器和关键点。它对调试和基本计数功能很有用。

UpdateRecord

更新包含面向记录的数据的FlowFile的内容(即,可以通过RecordReader读取并由RecordWriter写入的数据)。此处理器要求至少添加一个用户定义的属性。Property的名称应指示RecordPath,该RecordPath确定应更新的字段。Property的值是替换值(可选地使用表达式语言)或者本身是从Record中提取值的RecordPath。Property属性值是确定为RecordPath还是文字值取决于的配置 属性。

ValidateCsv

根据用户指定的CSV架构验证FlowFiles的内容。有关一些模式示例,请查看此处理器的其他文档。

ValidateRecord

根据给定模式验证传入FlowFile的记录。遵循模式的所有记录都将路由到“有效”关系,而不遵循模式的记录将路由到“无效”关系。因此,如果某些记录根据模式有效而其他记录无效,则可以将单个传入的FlowFile拆分为两个单独的FlowFile。任何路由到“无效”关系的FlowFile都将发出ROUTE Provenance Event,并填充Details字段以解释记录无效的原因。此外,为了进一步解释记录无效的原因,可以为“org.apache.nifi.processors.standard.ValidateRecord”记录器启用DEBUG级别的日志记录。

ValidateXml

根据用户指定的XML Schema文件验证FlowFiles的内容

等待

将传入的FlowFiles路由到“等待”关系,直到匹配的释放信号从相应的Notify处理器存储在分布式缓存中。当识别出匹配的释放信号时,等待的FlowFile被路由到“成功”关系,其中从FlowFile复制的属性从Notify处理器产生释放信号。然后从缓存中移除释放信号条目。如果Waiting FlowFiles超过Expiration Duration,它将被路由到'expired'。如果需要等待多个信号,请通过“目标信号计数”属性指定所需的信号数。这对于将源FlowFile拆分为多个片段(如SplitText)的处理器尤其有用。为了等待处理所有片段,将“原始”关系连接到Wait处理器,以及与相应Notify处理器的“拆分”关系。配置Notify和Wait处理器使用'$ {fragment.identifier}'作为'Release Signal Identifier'的值,并在Wait处理器中指定'$ {fragment.count}'作为'Target Signal Count'的值。当使用'wait'关系作为循环时,建议使用优先级排序器(例如先进先出)。

YandexTranslate

将内容和属性从一种语言翻译成另一种语言

商业用途请联系作者获得授权。
版权声明:本文为博主「任霏」原创文章,遵循 CC BY-NC-SA 4.0 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.renfei.net/posts/1003284
评论与留言

以下内容均由网友提交发布,版权与真实性无法查证,请自行辨别。

微信搜一搜:任霏博客