Flink的CEP编程之CEP案例(找到哪些用户名是恶意登录)

网友投稿 305 2022-11-17

Flink的CEP编程之CEP案例(找到哪些用户名是恶意登录)

需求:

从一堆的登录日志中,匹配一个恶意登录的模式(如果一个用户连续失败三次, 则是恶意登录),从而找到哪些用户名是恶意登录。

package cepimport org.apache.flink.cep.PatternSelectFunctionimport org.apache.flink.cep.scala.{CEP, PatternStream}import org.apache.flink.cep.scala.pattern.Patternimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.windowing.time.Timeimport java.util/** * @Author yqq * @Date 2021/12/28 23:14 * @Version 1.0 * 需求:从一堆的登录日志中,匹配一个恶意登录的模式(如果一个用户10秒内连续失败三次, 则是恶意登录),从而找到哪些用户名是恶意登录。 * @param id 登录日志id * @param name 用户名 * @param eventType 登录类型(成功或失败) * @param eventTime 登录时间,精确到秒 */case class LoginEvent(id:Long,name:String,eventType:String,eventTime:Long)object TestCEPByLogin { def main(args: Array[String]): Unit = { val streamEvn = StreamExecutionEnvironment.getExecutionEnvironment streamEvn.setParallelism(1) streamEvn.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//设置时间语义 import org.apache.flink.streaming.api.scala._ //1.输入事件流的创建 //读取登录日志 val stream: DataStream[LoginEvent] = streamEvn.fromCollection(List( new LoginEvent(1, "yqq", "fail", 1577080451),//这里单位秒 new LoginEvent(2, "yqq", "fail", 1577080452), new LoginEvent(3, "yqq", "fail", 1577080453), new LoginEvent(4, "zifan", "fail", 1577080459), new LoginEvent(4, "zifan", "success", 1577080460), new LoginEvent(5, "yqq", "fail", 1577080463) )).assignAscendingTimestamps(_.eventTime*1000) //指定EventTime的时候必须确保到时间戳(毫秒) //2.定义模式(Pattern) val pattern: Pattern[LoginEvent, LoginEvent] = Pattern.begin[LoginEvent]("start").where(_.eventType.equals("fail")) .next("fail2").where(_.eventType.equals("fail")) .next("fail3").where(_.eventType.equals("fail")) .within(Time.seconds(10)) //时间限制, //3.检测Pattern val patternStream: PatternStream[LoginEvent] = CEP.pattern(stream.keyBy(_.name), pattern) //根据用户名分组 //4.选择结果并输出 val result: DataStream[String] = patternStream.select(new PatternSelectFunction[LoginEvent, String] { override def select(map: util.Map[String, util.List[LoginEvent]]): String = { val keyIter: util.Iterator[String] = map.keySet().iterator() val e1: LoginEvent = map.get(keyIter.next()).iterator().next() val e2: LoginEvent = map.get(keyIter.next()).iterator().next() val e3: LoginEvent = map.get(keyIter.next()).iterator().next() "用户名:" + e1.name + "登录时间" + ":" + e1.eventTime + ":" + e2.eventTime + ":" + e3.eventTime } }) result.print() streamEvn.execute() }}

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:springboot 整合druid及配置依赖
下一篇:Flink的CEP编程之Pattern API
相关文章

 发表评论

暂时没有评论,来抢沙发吧~