当前位置: 首页 > news >正文

【Flink银行反欺诈系统设计方案】1.短时间内多次大额交易场景的flink与cep的实现

【flink应用系列】1.Flink银行反欺诈系统设计方案

  • 1. 经典案例:短时间内多次大额交易
    • 1.1 场景描述
    • 1.2 风险判定逻辑
  • 2. 使用Flink实现
    • 2.1 实现思路
    • 2.2 代码实现
    • 2.3 使用Flink流处理
  • 3. 使用Flink CEP实现
    • 3.1 实现思路
    • 3.2 代码实现
  • 4. 总结

1. 经典案例:短时间内多次大额交易

1.1 场景描述

规则1:单笔交易金额超过10,000元。

规则2:同一用户在10分钟内进行了3次或更多次交易。

风险行为:同时满足规则1和规则2的交易行为。

1.2 风险判定逻辑

检测每笔交易是否满足“单笔交易金额超过10,000元”。

对同一用户,统计10分钟内的交易次数。

如果交易次数达到3次或更多,则判定为风险行为。

2. 使用Flink实现

2.1 实现思路

使用Flink的KeyedStream按用户分组。

使用ProcessFunction实现自定义窗口逻辑,统计10分钟内的交易次数。

结合规则1和规则2,判断是否为风险行为。

2.2 代码实现

// 定义交易数据POJO
public class Transaction {
    private String transactionId;
    private String userId;
    private Double amount;
    private Long timestamp;
    // getters and setters
}

// 定义风控结果POJO
public class RiskResult {
    private String userId;
    private String transactionId;
    private String riskLevel;
    private String actionTaken;
    private Long createTime;
    // getters and setters
}

// 实现风控逻辑
public class FraudDetectionProcessFunction extends KeyedProcessFunction<String, Transaction, RiskResult> {

    private transient ValueState<Integer> transactionCountState;
    private transient ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) {
        // 初始化状态
        ValueStateDescriptor<Integer> countDescriptor = new ValueStateDescriptor<>(
            "transactionCount", 
            Types.INT
        );
        transactionCountState = getRuntimeContext().getState(countDescriptor);

        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
            "timerState", 
            Types.LONG
        );
        timerState = getRuntimeContext().getState(timerDescriptor);
    }

    @Override
    public void processElement(
        Transaction transaction,
        Context ctx,
        Collector<RiskResult> out) throws Exception {

        // 规则1:单笔交易金额超过10,000元
        if (transaction.getAmount() > 10000) {
            // 更新交易次数
            Integer count = transactionCountState.value();
            if (count == null) {
                count = 0;
            }
            count += 1;
            transactionCountState.update(count);

            // 如果是第一次满足规则1,设置10分钟的定时器
            if (count == 1) {
                long timer = ctx.timestamp() + 10 * 60 * 1000; // 10分钟
                ctx.timerService().registerEventTimeTimer(timer);
                timerState.update(timer);
            }

            // 规则2:10分钟内交易次数达到3次
            if (count >= 3) {
                RiskResult result = new RiskResult();
                result.setUserId(transaction.getUserId());
                result.setTransactionId(transaction.getTransactionId());
                result.setRiskLevel("HIGH");
                result.setActionTaken("ALERT");
                result.setCreateTime(System.currentTimeMillis());
                out.collect(result);
            }
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<RiskResult> out) throws Exception {
        // 定时器触发时,重置状态
        transactionCountState.clear();
        timerState.clear();
    }
}

2.3 使用Flink流处理

java

DataStream<Transaction> transactionStream = env.addSource(transactionSource);

DataStream<RiskResult> riskResultStream = transactionStream
    .keyBy(Transaction::getUserId)
    .process(new FraudDetectionProcessFunction());

riskResultStream.addSink(new AlertSink());

3. 使用Flink CEP实现

Flink CEP(Complex Event Processing)是Flink提供的复杂事件处理库,适合处理基于时间序列的模式匹配。以下是使用Flink CEP实现上述风控规则的示例。

3.1 实现思路

定义模式:检测10分钟内3次或更多次大额交易。

使用Flink CEP的模式匹配功能,匹配符合条件的事件序列。

3.2 代码实现

java

// 定义交易数据POJO
public class Transaction {
    private String transactionId;
    private String userId;
    private Double amount;
    private Long timestamp;
    // getters and setters
}

// 定义风控结果POJO
public class RiskResult {
    private String userId;
    private List<String> transactionIds;
    private String riskLevel;
    private String actionTaken;
    private Long createTime;
    // getters and setters
}

// 实现风控逻辑
public class FraudDetectionCEP {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 交易数据流
        DataStream<Transaction> transactionStream = env.addSource(transactionSource)
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((event, timestamp) -> event.getTimestamp())
            );

        // 按用户分组
        KeyedStream<Transaction, String> keyedStream = transactionStream
            .keyBy(Transaction::getUserId);

        // 定义CEP模式:10分钟内3次或更多次大额交易
        Pattern<Transaction, ?> pattern = Pattern.<Transaction>begin("first")
            .where(new SimpleCondition<Transaction>() {
                @Override
                public boolean filter(Transaction transaction) {
                    return transaction.getAmount() > 10000;
                }
            })
            .next("second")
            .where(new SimpleCondition<Transaction>() {
                @Override
                public boolean filter(Transaction transaction) {
                    return transaction.getAmount() > 10000;
                }
            })
            .next("third")
            .where(new SimpleCondition<Transaction>() {
                @Override
                public boolean filter(Transaction transaction) {
                    return transaction.getAmount() > 10000;
                }
            })
            .within(Time.minutes(10));

        // 应用模式
        PatternStream<Transaction> patternStream = CEP.pattern(keyedStream, pattern);

        // 生成风控结果
        DataStream<RiskResult> riskResultStream = patternStream.process(
            new PatternProcessFunction<Transaction, RiskResult>() {
                @Override
                public void processMatch(
                    Map<String, List<Transaction>> match,
                    Context ctx,
                    Collector<RiskResult> out) throws Exception {

                    RiskResult result = new RiskResult();
                    result.setUserId(match.get("first").get(0).getUserId());
                    result.setTransactionIds(
                        match.values().stream()
                            .flatMap(List::stream)
                            .map(Transaction::getTransactionId)
                            .collect(Collectors.toList())
                    );
                    result.setRiskLevel("HIGH");
                    result.setActionTaken("ALERT");
                    result.setCreateTime(System.currentTimeMillis());
                    out.collect(result);
                }
            }
        );

        // 输出结果
        riskResultStream.addSink(new AlertSink());

        env.execute("Fraud Detection with Flink CEP");
    }
}

4. 总结

Flink实现:通过KeyedProcessFunction和状态管理实现多规则匹配。

Flink CEP实现:通过定义复杂事件模式,简化多规则匹配的逻辑。

适用场景:

Flink适合需要自定义逻辑的场景。

Flink CEP适合基于时间序列的模式匹配场景。

通过以上实现,可以高效检测银行交易中的风险行为,并根据需要扩展更多规则

相关文章:

  • FineReport 操作注意
  • 【AI Guide】AI面试攻略只用看这一篇就够了!力争做全网最全的AI面试攻略——大模型(三十一)BASE与CHAT模型
  • Ubuntu20.04双系统安装及软件安装(一):系统安装
  • 备考六级:词汇量积累(day2)
  • 【流行病学】Melodi-Presto因果关联工具
  • vim 调整字体
  • vue3中 组合式测试深入组件—事件 与 $emit()
  • 硬件学习笔记--47 LDO相关基础知识介绍
  • wpa_supplicant源码剖析-main.c解析
  • 策略模式的C++实现示例
  • Java基础关键_017_集合(一)
  • 3D手眼标定转换详细实施步骤及原理概述
  • 初始提示词(Prompting)
  • dify智能体之不知道有啥用系列之使用chatflow让selenium打开特定网址
  • 数据序列化协议 Protobuf 3 介绍(Go 语言)
  • e2studio开发RA4M2(17)----ADC扫描多通道采样
  • 基于Java+SpringCloud+Vue的前后端分离的房产销售平台
  • 从小米汽车召回看智驾“命门”:智能化时代 — 时间就是安全
  • 【零基础到精通Java合集】第二十四集:ZGC收集器详解
  • 工业巡检进入‘无人化+AI’时代:无人机智能系统的落地实践与未来
  • 湛江企业建站系统/百度售后服务电话人工
  • 微信商城小程序怎么做/厦门seo百度快照优化
  • 营销型公司网站有哪些/推广产品
  • 做牛仔裤的视频网站/最佳磁力搜索引擎
  • 怎样搭建一个企业网站/南昌seo全网营销
  • seo快速排名优化/手机优化大师下载2022