StreamSets基本使用
官方文档示例:https://streamsets.com/documentation/datacollector/3.0.0.0/help/index.html#Tutorial/BasicTutorial.html#task_jmz_3dn_ls
登录
点击 StreamSets > Data Collector WebUI访问默认用户名:admin
密码:admin准备工作
从官网下载测试数据: https://www.streamsets.com/documentation/datacollector/sample_data/tutorial/nyc_taxi_data.csv
创建测试目录并赋予权限:
[root@node00 ~]# mkdir home/test_stream
[root@node00 ~]# mkdir home/test_stream/data
[root@node00 ~]# mkdir home/test_stream/error
[root@node00 ~]# mkdir home/test_stream/out
[root@node00 ~]# chmod -R 777 home/test_stream下载测试数据并将测试数据拷贝到 home/test_stream/data 目录下
创建第一个Pipelines
添加文件目录和修改属性
在侧边中选择Diretory或者Select Origin > Directory ,添加一个目录,选中目录后点击configuration> file,填写Files Directory 和File Name Pattern 还有选择Read Order 为最近更新时间
点击dataFormat 标签,修改选择如下选择
配置改Pipelines 的错误日志写入方式和目录
预览文件
将鼠标放到Directory1 上看到只剩一个Validation_0011的异常然后点击预览,Validation_0011仅表示目录原点尚未连接到任何内容,并且不会阻止数据预览
预览配置这里,我们选中预览来源,写入目的地和执行者,显示字段类型,记住配置,然后执行
添加流选择器
${record:value('/payment_type') == 'CRD'}
这里只查看卡类型为信用卡的记录
使用jython 脚本来筛选卡片
使用Jython脚本创建了一个额外的字段credit_card_type,并通过评估信用卡号码的前几位来生成信用卡类型。如果记录具有信用卡付款类型而没有相应的信用卡号码,则脚本将返回错误消息。(这里下载的完整的parcels包所以Jython插件无需再下载,否则需要在Package Manager 中联网下载)
以下脚本放在 Jython >configuration>Jython>Script 中
try:
for record in records:
cc = record.value['credit_card']
if cc == '':
error.write(record, "Payment type was CRD, but credit card was null")
continue
cc_type = ''
if cc.startswith('4'):
cc_type = 'Visa'
elif cc.startswith(('51','52','53','54','55')):
cc_type = 'MasterCard'
elif cc.startswith(('34','37')):
cc_type = 'AMEX'
elif cc.startswith(('300','301','302','303','304','305','36','38')):
cc_type = 'Diners Club'
elif cc.startswith(('6011','65')):
cc_type = 'Discover'
elif cc.startswith(('2131','1800','35')):
cc_type = 'JCB'
else:
cc_type = 'Other'
record.value['credit_card_type'] = cc_type
output.write(record)
except Exception as e:
error.write(record, e.message)
脚本在评估器中应当如下:使用Field Masker来屏蔽信用卡号码
现在让我们通过使用Field Masker来屏蔽信用卡号码,防止敏感信息到达内部数据库。
Field Masker提供固定和可变长度的掩码来屏蔽字段中的所有数据。要显示数据中的指定位置,您可以使用自定义掩码。要显示数据中的一组位置,可以使用正则表达式掩码来定义数据的结构,然后显示一个或多个组。
对于信用卡号码,我们将使用以下正则表达式来屏蔽除最后四位之外的所有数字:
信用卡号字段:/credit_card
(.*)([0-9]{4})
配置写入目的地
输出文件 tab 如下
Data Format 如下:
流程预览测试
可以查看到敏感信息被屏蔽
添加Expression Evaluator处理器
将一个Expression Evaluator处理器添加到画布并将Stream Selector的第二个默认流连 接到它。这会创建一个credit_card_type字段,指示信息不适用。
由于我们使用“n a”作为表达式的常量,因此我们不需要使用美元符号和括号来表达表达式。但是如果我们想要使用它们,我们可以将它们定义为$ {‘credit_card_type’}和$ {‘N A’}。
创建数据规则和告警信息
在我们运行基本管道之前,让我们添加一个数据规则和警报。数据规则是用户定义的规则,用于检查在两个阶段之间移动的数据。它们是查找异常值和异常数据的有效方法。
数据规则和警报需要详细了解通过管道的数据。对于更一般的管道监控信息,您可以使用度量标准规则和警报。
Jython Evaluator中的脚本为没有信用卡号码的信用卡交易创建错误记录。我们可以创建一个数据规则和警报,让我们知道记录数量何时达到指定阈值。
我们将使用带有record:value()函数的表达式来标识信用卡号码字段/credit_card为空的情况。该函数返回指定字段中的数据。
在流选择器和Jython评估器之间,选择链接或数据检查图标 > dataRules > 点击Add
校验执行
Jython评估器显示40条错误记录。单击错误记录编号以查看缓存的错误记录和相关错误消息的列表。
您也可以选择红色的数据检测图标来查看有关数据警报的信息并查看与数据警报相关的错误记录。