Processor模块¶
本节介绍Processor子模块的详细安装步骤。 WeEvent服务的快速安装请参见快速安装 。在一台机器上详细安装,和通过快速安装然后把目标路径中的Processor子目录打包拷贝到这台机器,效果是一样的。
Processor为用户提供时序流分析和时间联动等。如果是第一次安装WeEvent,参见这里的系统要求 。以下安装以CentOS 7.2为例。
前置条件¶
Broker模块
必选配置,通过
Broker访问区块链。具体安装步骤,请参见Broker模块安装。
Governance模块
必选配置,通过
Governance从Web端调用Processor。修改配置文件./governance/conf/application-prod.properties,增加processor对应的ip和端口配置weevent.processor.url=http://127.0.0.1:7008。具体安装步骤,请参见Governance模块安装。
Mysql数据库
必选配置。
Processor通过Mysql存储数据。推荐安装
Mysql 5.6+版本。具体安装步骤,安装请参见Mysql安装 。
获取安装包¶
从github下载安装包weevent-processor-1.1.0.tar.gz,并且解压到/usr/local/weevent/下。
$ cd /usr/local/weevent/
$ wget https://github.com/WeBankFinTech/WeEvent/releases/download/v1.1.0/weevent-processor-1.1.0.tar.gz
$ tar -xvf weevent-processor-1.1.0.tar.gz
如果github下载速度慢,可以尝试国内下载链接。
解压后的目录结构如下
$ cd ./weevent-processor-1.1.0
$ tree -L 2
$ cd ./weevent-processor-1.1.0
$ tree -L 2
|-- apps
| `-- weevent-processor-1.1.0.jar
|-- check-service.sh
|-- conf
| |-- application-prod.properties
| |-- application.properties
| |-- log4j2.xml
| |-- mappers
| |-- processor.properties
|-- cep_rule.sql
|-- init-processor.sh
|-- processor.sh
|-- lib
修改配置文件¶
配置端口
在配置文件
./conf/application-prod.properties中,Processor的服务端口server.port,默认7008。server.port=7008
配置Mysql数据库 修改
datasource中的url配置、username、passwordspring.datasource.url=jdbc:mysql://127.0.0.1:3306/WeEvent_processor spring.datasource.driverClassName=org.mariadb.jdbc.Driver spring.jpa.database=mysql spring.datasource.username=****** spring.datasource.password=******
在配置文件processor.properties配置Mysql数据库,修改
datasource中的url配置、username、passwordorg.quartz.scheduler.instanceName=test org.quartz.jobStore.dataSource=WeEvent_processor org.quartz.threadPool.threadCount=20 org.quartz.threadPool.threadPriority=5
org.quartz.scheduler.instanceName当前Schedule name,用户可以修改org.quartz.dataSource数据库名称,默认为WeEvent_processor
注意:数据库要赋予配置账号创建库表的权限。
mysql
>> grant all privileges on . to 'test'@'%' identified by '123456';
>> flush privileges;
初始化系统,执行脚本init-processor.sh ,成功输出如下。否则,用户需要检查配置项是否正常。
$ ./init-processor.sh
init processor db success
服务启停¶
服务启动
通过
./processor.sh start命令启动服务,正常启动如下:
$ ./processor.sh start
start weevent-processor success (PID=53927)
add the crontab job success
通过./processor.sh stop命令停止服务。
进程启动后,会自动添加crontab监控任务./processor.sh monitor。
验证服务
通过
./check-service.sh命令检查服务功能是否正常。$ ./check-service.sh check processor service processor service is ok
界面展示¶
- 创建规则
{
"ruleName":"alarm",
"type":"json",
"payload":{
"temperate":30,
"humidity":0.5
}
}
- ruleName: 支持英文字母、数字、下划线、连字符
- type:改规则处理数据的格式,目前只支持JSON格式。
- 规则的详细描述。

设置触发
JSON数据可以映射为虚拟的表,其中Key对应表的列,Value对应列值,这样就可以使用SQL处理。为便于理解,我们将数据流转的一条规则抽象为一条SQL表达(类试MySQL语法):

例如某环境传感器用于火灾预警,可以采集温度、湿度及气压数据,上报数据内容如下:
{ "temperature":25.1, "humidity":65, "type":"warning", "range":"higher", }
假定温度大于38,湿度小于40时,需要触发报警,可以编写如下的SQL语句:
SELECT temperature, deviceName FROM ProductA WHERE temperature > 38 and humidity < 40
当上报的数据中,温度大于38且湿度小于40时,会触发该规则,并且解析数据中的温度、设备名称,用于进一步处理。
触发条件
(temperature > 38 and humidity < 40)Topic:自定义和通配符
MySQL 说明: JSON数据格式 SELECT语句中的字段,可以使用上报消息的payload解析结果,即JSON中的键值,也可以使用SQL内置的函数,比如deviceName。 支持*,不支持子SQL查询。
FROM FROM 可以填写Topic。Topic中的设备名(deviceName),用于匹配需要处理的设备消息Topic。当有符合Topic规则的消息到达时,消息的payload数据以JSON格式解析,并根据SQL语句进行处理(如果消息格式不合法,将忽略此消息)。 WHERE 规则触发条件,条件表达式。不支持子SQL查询。WHERE中可以使用的字段和SELECT语句一致,当接收到对应Topic的消息时,WHERE语句的结果会作为是否触发规则的判断条件。`WHERE temperature > 38 and humidity < 40` 表示温度大于38且湿度小于40时,才会触发该规则,执行配置。
- 可以进行单条件查询
>、<、>=、<=、<>、!=,具体详情见本章最后章节。
- 可以进行单条件查询

规则详情展示
用户可以继续编辑规则的规则描述、
SELECT、FROM、WHERE。
规则列表展示
用户可以查询规则、创建规则、编辑规则、启动规则、停止规则、删除规则。

命中逻辑说明¶
- 不支持嵌套查询、连表查询、自带函数查询、ORDER BY(ASC|DESC)
- 文本字段 vs. 数值字段
SELECT * FROM Websites WHERE country='CN';
- 支持的类型 运算符
| = | 等于 |
|---|---|
| <> | 不等于。注释:在 SQL 的一些版本中,该操作符可被写成 != |
| > | 大于 |
| < | 小于 |
| >= | 大于等于 |
| <= | 小于等于 |
数字类型
temperature=29;
temperature>29;
temperature>=29;
temperature<29;
temperature<=29;
temperature<>29;
文本类型
SELECT * FROM Websites WHERE facilicty-charater="warning";
- AND & OR 运算符
如果第一个条件和第二个条件都成立,则 and 运算符显示一条记录。
如果第一个条件和第二个条件中只要有一个成立,则 or 运算符显示一条记录。
SELECT * FROM Websites WHERE facilicty-charater="warning" and temperature > 50;
SELECT * FROM Websites WHERE temperature > 35 or facilicty-charater=="warning" ;
非聚合类的内置函数
- 数字计算abs(绝对值),ceil,floor ,round
SELECT * FROM Websites WHERE abs(temperature) > 50; SELECT * FROM Websites WHERE ceil(temperature)> 50; SELECT * FROM Websites WHERE floor(temperature) > 50; SELECT * FROM Websites WHERE round(temperature) > 50;
- 字符串拼接substring,concat,trim,lcase
SELECT * FROM Websites WHERE range.substring(6)=="warning-001"; SELECT * FROM Websites WHERE range.substring(5,10)=="test"; SELECT * FROM Websites WHERE range.concat(type)=="higherwarning"; SELECT * FROM Websites WHERE range.trim()=="higher"; SELECT * FROM Websites WHERE lcase(range)=="higher";
内置时间
时间选取now, currentDate,currentTime
SELECT now,currentDate,currentTime FROM Websites WHERE range.substring(type,6)=="warning-001";