当前位置: 首页 > news >正文

kafka + flink +mysql 案例

假设你有两个Kafka主题:user_activities_topicproduct_views_topic,并且你希望将user_activities_topic中的数据写入到user_activities表,而将product_views_topic中的数据写入到product_views表。

maven

<dependencies>
    <!-- Apache Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>

    <!-- MySQL JDBC Driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.26</version>
    </dependency>
</dependencies>

Flink Job 示例代码

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;

import java.util.Properties;

public class MultipleKafkaToFlinkToMysql {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 配置Kafka消费者属性
        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProperties.setProperty("group.id", "test");

        // 创建第一个Kafka消费者 (User Activities)
        FlinkKafkaConsumer<String> userActivitiesConsumer = new FlinkKafkaConsumer<>(
                "user_activities_topic",
                new SimpleStringSchema(),
                kafkaProperties);

        // 创建第二个Kafka消费者 (Product Views)
        FlinkKafkaConsumer<String> productViewsConsumer = new FlinkKafkaConsumer<>(
                "product_views_topic",
                new SimpleStringSchema(),
                kafkaProperties);

        // 从Kafka获取用户活动数据流
        env.addSource(userActivitiesConsumer)
           .map(value -> {
               String[] parts = value.split(",");
               return new UserActivity(parts[0], parts[1]);
           })
           .addSink(JdbcSink.sink(
                   "INSERT INTO user_activities (user_id, activity) VALUES (?, ?)",
                   (statement, userActivity) -> {
                       statement.setString(1, userActivity.userId);
                       statement.setString(2, userActivity.activity);
                   },
                   new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                           .withUrl("jdbc:mysql://localhost:3306/your_database")
                           .withDriverName("com.mysql.cj.jdbc.Driver")
                           .withUsername("your_username")
                           .withPassword("your_password")
                           .build()));

        // 从Kafka获取产品浏览数据流
        env.addSource(productViewsConsumer)
           .map(value -> {
               String[] parts = value.split(",");
               return new ProductView(parts[0], Integer.parseInt(parts[1]));
           })
           .addSink(JdbcSink.sink(
                   "INSERT INTO product_views (user_id, product_id) VALUES (?, ?)",
                   (statement, productView) -> {
                       statement.setString(1, productView.userId);
                       statement.setInt(2, productView.productId);
                   },
                   new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                           .withUrl("jdbc:mysql://localhost:3306/your_database")
                           .withDriverName("com.mysql.cj.jdbc.Driver")
                           .withUsername("your_username")
                           .withPassword("your_password")
                           .build()));

        env.execute("Multiple Kafka to Multiple MySQL Tables with Flink");
    }

    // 用户活动类
    public static class UserActivity {
        public String userId;
        public String activity;

        public UserActivity(String userId, String activity) {
            this.userId = userId;
            this.activity = activity;
        }
    }

    // 产品浏览类
    public static class ProductView {
        public String userId;
        public int productId;

        public ProductView(String userId, int productId) {
            this.userId = userId;
            this.productId = productId;
        }
    }
}

当处理多个消费者和表时,直接为每个消费者编写独立的代码会导致代码冗长且难以维护。为了提高代码的可维护性和扩展性,可以采用一些设计模式和抽象方法来简化代码结构。以下是一些改进策略:

### 1. 使用工厂模式和配置文件

通过使用工厂模式和配置文件,可以将不同Kafka主题和MySQL表的映射关系抽象出来,从而减少重复代码。

### 2. 示例代码重构

下面是一个示例,展示了如何通过配置文件和工厂模式来管理多个Kafka消费者和相应的MySQL输出。

#### 2.1 配置文件 (`application.yaml`)

首先,定义一个配置文件来描述每个消费者的配置信息,包括Kafka主题、目标MySQL表名以及字段映射等。

consumers:
  - name: user_activities_consumer
    kafka_topic: user_activities_topic
    mysql_table: user_activities
    fields:
      - { index: 0, column: user_id }
      - { index: 1, column: activity }

  - name: product_views_consumer
    kafka_topic: product_views_topic
    mysql_table: product_views
    fields:
      - { index: 0, column: user_id }
      - { index: 1, column: product_id }

#### 2.2 工厂类 (`ConsumerFactory.java`)

创建一个工厂类,根据配置文件中的信息动态生成消费者并设置其数据处理逻辑。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

import java.util.Properties;
import java.util.List;
import java.util.Map;

public class ConsumerFactory {

    public static void createAndRegisterConsumers(StreamExecutionEnvironment env, List<Map<String, Object>> consumers) {
        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProperties.setProperty("group.id", "test");

        for (Map<String, Object> consumerConfig : consumers) {
            String kafkaTopic = (String) consumerConfig.get("kafka_topic");
            String mysqlTable = (String) consumerConfig.get("mysql_table");
            List<Map<String, Object>> fields = (List<Map<String, Object>>) consumerConfig.get("fields");

            FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                    kafkaTopic,
                    new SimpleStringSchema(),
                    kafkaProperties);

            env.addSource(kafkaConsumer)
               .map(value -> parseMessage(value, fields))
               .addSink(JdbcSink.sink(
                       generateInsertSQL(mysqlTable, fields),
                       (statement, record) -> populateStatement(statement, record, fields),
                       new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                               .withUrl("jdbc:mysql://localhost:3306/your_database")
                               .withDriverName("com.mysql.cj.jdbc.Driver")
                               .withUsername("your_username")
                               .withPassword("your_password")
                               .build()));
        }
    }

    private static Map<String, Object> parseMessage(String value, List<Map<String, Object>> fields) {
        String[] parts = value.split(",");
        return fields.stream().collect(Collectors.toMap(
                field -> (String) field.get("column"),
                field -> parts[(Integer) field.get("index")]
        ));
    }

    private static String generateInsertSQL(String table, List<Map<String, Object>> fields) {
        StringBuilder columns = new StringBuilder();
        StringBuilder placeholders = new StringBuilder();

        for (int i = 0; i < fields.size(); i++) {
            if (i > 0) {
                columns.append(", ");
                placeholders.append(", ");
            }
            columns.append(fields.get(i).get("column"));
            placeholders.append("?");
        }

        return "INSERT INTO " + table + " (" + columns.toString() + ") VALUES (" + placeholders.toString() + ")";
    }

    private static void populateStatement(java.sql.PreparedStatement statement, Map<String, Object> record, List<Map<String, Object>> fields) throws Exception {
        for (int i = 0; i < fields.size(); i++) {
            String column = (String) fields.get(i).get("column");
            Object value = record.get(column);
            if (value instanceof Integer) {
                statement.setInt(i + 1, (Integer) value);
            } else if (value instanceof String) {
                statement.setString(i + 1, (String) value);
            }
            // 其他类型可以根据需要添加
        }
    }
}

#### 2.3 主程序 (`Main.java`)

在主程序中加载配置文件,并调用工厂类来注册所有消费者。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.yaml.snakeyaml.Yaml;

import java.io.InputStream;
import java.util.List;
import java.util.Map;

public class Main {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Yaml yaml = new Yaml();
        InputStream inputStream = Main.class.getClassLoader().getResourceAsStream("application.yaml");
        Map<String, Object> config = yaml.load(inputStream);

        List<Map<String, Object>> consumers = (List<Map<String, Object>>) config.get("consumers");

        ConsumerFactory.createAndRegisterConsumers(env, consumers);

        env.execute("Multiple Kafka to Multiple MySQL Tables with Flink");
    }
}

### 关键点解释

1. **配置文件**:通过配置文件定义每个消费者的信息,使得添加新的消费者变得简单,只需修改配置文件即可。
   
2. **工厂模式**:使用工厂类 `ConsumerFactory` 根据配置动态创建消费者,并为其设置数据处理逻辑和输出目标。

3. **通用的数据处理逻辑**:`parseMessage` 方法根据配置文件中的字段映射解析消息,`generateInsertSQL` 和 `populateStatement` 方法则用于生成插入SQL语句和填充PreparedStatement。

4. **扩展性**:这种设计方式非常灵活,易于扩展。如果需要增加新的消费者或修改现有消费者的配置,只需更新配置文件而无需更改代码逻辑。

这种方法不仅减少了代码量,还提高了代码的可维护性和扩展性,使得系统更容易管理和维护。

相关文章:

  • [Web]get请求和post请求
  • 机器学校的考试风波:误差分析、过拟合和欠拟合
  • Java多线程和锁_八股场景题
  • 2025年人工智能的发展前景将呈现多维度、深层次的变革,涵盖技术突破、行业应用、算力基础设施、政策法规等多个领域.结合工作情况,个人看法参考。
  • Cocos Creator Shader入门实战(三):CCEffect参数配置讲解
  • 捌拾贰- 贝尔不等式 (2)
  • 大白话JavaScript闭包实现原理与在实际开发中的应用场景
  • AF3 correct_msa_restypes函数解读
  • mac本地代理nginx,解决跨域问题
  • 【Java代码审计 | 第六篇】XSS防范
  • 【React】React + Tailwind CSS 快速入门指南
  • VBA高级应用30例Excel中ListObject对象:提取表内单元格的格式元素
  • WPF 之SizeToContent
  • 8.1linux竞争与并发知识讲解(尽可能详细)_csdn
  • pta L1-003 个位数统计
  • LeetCode 738. 单调递增的数字 java题解
  • 2.装饰器模式
  • 计算机安全 第四节:访问控制(上)
  • Qt常用控件之分组框QGroupBox
  • Express Router 全面教程与最佳实践
  • 西安市未央区建设局官方网站/应用关键词优化
  • 旅游网站开发注意点/青岛网站制作推广
  • 想找做拼接屏的公司去哪个网站/百度识图在线识别
  • 镇江seo公司/宣城网站seo
  • 阿里云Windows网站建设/怎么开发一个网站
  • 浙江建设监理协会网站/关键词排名优化系统