专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

Flink 实时写入 Redis 实战教程,掌握 Bahir Connector 高效用法

在大数据实时处理场景中,Apache Flink 作为流式计算引擎的标杆,常需要与外部存储系统进行高效交互。Redis 作为高性能的内存键值数据库,常被用于实时缓存、状态存储和低延迟查询。本文将深入探讨如何在 Flink DataStream API 中通过 Redis Connector 实现数据的高效写入,覆盖核心原理、代码实践、调优技巧和常见问题解决方案。

1. 技术选型与架构设计

1.1 为何选择 Redis Connector?

  • 低延迟写入 :Redis 内存存储特性与 Flink 的实时处理能力完美契合。
  • 丰富数据结构 :支持 String、Hash、List、Set、Sorted Set 等结构,适应不同业务场景。
  • 生态兼容性 :通过 Bahir 等扩展库实现与 Flink 的无缝集成。

1.2 主流 Connector 对比

Connector 类型 优势 适用场景
Bahir Redis 官方推荐,轻量级,支持基础操作 简单 Key-Value 或 Hash 写入
自定义 SinkFunction 灵活性高,可深度定制 Redis 交互逻辑 复杂数据结构或需要事务支持的场景
Jedis/Pool 直接调用 性能可控,直接利用原生客户端 超高频写入或特殊连接池管理需求

在这使用 Apache Bahir 提供的 Redis Connector。


2. 环境准备与依赖配置

2.1 添加 Maven 依赖

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.0</version>
</dependency>

2.2 Redis 服务准备

确保Redis 实例已启动并开放访问权限,针对不同的部署模式选择对应的配置类:

部署模式 特点 适用场景 配置类
单机模式 单个 Redis 节点运行,无高可用性,适合开发测试或简单场景 本地开发、小规模数据存储 FlinkJedisPoolConfig
哨兵模式 通过 Sentinel 监控主从节点,主节点故障时自动切换从节点为新的主节点,实现高可用 对高可用性有要求的线上环境 FlinkJedisSentinelConfig
集群模式 数据分片存储在多个节点,支持水平扩展和高并发,客户端自动路由请求到正确的分片 大规模数据、高性能读写、分布式存储需求 FlinkJedisClusterConfig

单机模式默认端口 6379


3. 核心实现:从数据流到 Redis 写入

3.1 定义数据模型

假设处理用户行为事件 UserBehavior

public class SimpleUserBehavior {
    private Long userId;
    private Long timestamp;

    // 必须提供无参构造函数
    public SimpleUserBehavior() {
    }

    public SimpleUserBehavior(Long userId, Long timestamp) {
        this.userId = userId;
        this.timestamp = timestamp;
    }

    // Getter 和 Setter 方法
    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }
}

3.2 实现 RedisMapper

通过RedisMapper 接口定义数据到 Redis 命令的转换逻辑:

// 自定义 RedisMapper,定义如何将数据转换为 Redis 命令
public static class UserBehaviorRedisMapper implements RedisMapper<SimpleUserBehavior> {
    @Override
    public RedisCommandDescription getCommandDescription() {
        // 使用 SET 命令(Key-Value 直接存储)
        return new RedisCommandDescription(RedisCommand.SET);
    }

    @Override
    public String getKeyFromData(SimpleUserBehavior behavior) {
        // Redis Key: 用户ID
        return String.valueOf(behavior.getUserId());
    }

    @Override
    public String getValueFromData(SimpleUserBehavior behavior) {
        // Redis Value: 用户操作时间戳
        return String.valueOf(behavior.getTimestamp());
    }
}

3.3 配置连接池配置

单机模式下需要直接连接指定主机和端口的 Redis 实例:

// Redis 连接配置
FlinkJedisPoolConfig redisConfig = new FlinkJedisPoolConfig.Builder()
    .setHost("localhost")    // Redis 主机
    .setPort(6379)          // Redis 端口
    .setDatabase(0) // Redis 数据库
    .setTimeout(3000)
    .setMaxTotal(100) // 连接池最大连接数
    .build();

单机模式下,若主节点宕机,需手动恢复。

哨兵模式下通过 Sentinel 节点监控主从状态,需要指定 Sentinel 节点列表:

FlinkJedisSentinelConfig config = new FlinkJedisSentinelConfig.Builder()
    .setSentinels(new HashSet<>(Arrays.asList("sentinel1:26379", "sentinel2:26380"))) // Sentinel 节点列表
    .setMasterName("mymaster")      // Sentinel 监控的主节点名称
    .setDatabase(0)                 // 数据库索引
    .setSoTimeout(3000)             // Socket 超时时间(ms)
    .build();

集群模式下数据分片存储在多个节点,需要指定集群节点列表:

FlinkJedisClusterConfig config = new FlinkJedisClusterConfig.Builder()
    .setNodes(new HashSet<>(Arrays.asList("redis-node1:6379", "redis-node2:6380"))) // 集群节点列表
    .setMaxRedirects(3)            // 最大重定向次数(应对集群拓扑变化)
    .build();

3.4 构建 Redis Sink

// 创建 Sink
RedisSink<SimpleUserBehavior> redisSink = new RedisSink<>(
    redisConfig,
    new UserBehaviorRedisMapper()
);

// 将数据流接入 Sink
DataStream<UserBehavior> source = ... // 来自 Kafka 或其他 Source
source.addSink(redisSink); // 将数据写入 Redis

3.5 示例

下面示例中使用 DataGeneratorSource 随机构造用户ID以及时间戳生成随机用户写入 Redis 中存储每个用户的最新操作时间:

public class BahirRedisConnectorExample {
    private static final Logger LOG = LoggerFactory.getLogger(BahirRedisConnectorExample.class);

    public static void main(String[] args) throws Exception {
        // 1. 创建 Flink 流执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. DataGeneratorSource
        RandomGenerator<SimpleUserBehavior> randomGenerator = new RandomGenerator<SimpleUserBehavior>() {
            @Override
            public SimpleUserBehavior next() {
                SimpleUserBehavior userBehavior = new SimpleUserBehavior(
                        random.nextLong(10000001, 90000001),
                        System.currentTimeMillis()
                );
                LOG.info("Source UserId: " + userBehavior.getUserId() + ", Timestamp: " + userBehavior.getTimestamp());
                return userBehavior;
            }
        };
        DataGeneratorSource<SimpleUserBehavior> generatorSource = new DataGeneratorSource<>(randomGenerator, 1L, 10L);
        DataStream<SimpleUserBehavior> source = env.addSource(generatorSource, "DataGeneratorSource")
                .returns(Types.POJO(SimpleUserBehavior.class));

        // 3. 配置 Redis 连接
        FlinkJedisPoolConfig redisConfig = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")    // Redis 主机
                .setPort(6379)          // Redis 端口
                .setDatabase(0) // Redis 数据库
                .setTimeout(3000)
                .setMaxTotal(100) // 连接池最大连接数
                .build();

        // 4. 创建 RedisSink
        RedisSink<SimpleUserBehavior> redisSink = new RedisSink<>(
                redisConfig,
                new UserBehaviorRedisMapper()
        );

        // 5. 将数据写入 Redis
        source.addSink(redisSink);

        // 6. 执行任务
        env.execute("BahirRedisConnectorExample");
    }

    // 自定义 RedisMapper,定义如何将数据转换为 Redis 命令
    public static class UserBehaviorRedisMapper implements RedisMapper<SimpleUserBehavior> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            // 使用 SET 命令(Key-Value 直接存储)
            return new RedisCommandDescription(RedisCommand.SET);
        }

        @Override
        public String getKeyFromData(SimpleUserBehavior behavior) {
            // Redis Key: 用户ID
            return String.valueOf(behavior.getUserId());
        }

        @Override
        public String getValueFromData(SimpleUserBehavior behavior) {
            // Redis Value: 用户操作时间戳
            return String.valueOf(behavior.getTimestamp());
        }
    }
}

运行上述程序随机生成了如下 10 个用户:

16:37:52,215 INFO  BahirRedisConnectorExample [] - Source UserId: 39429175, Timestamp: 1746952672215
16:37:52,215 INFO  BahirRedisConnectorExample [] - Source UserId: 82172014, Timestamp: 1746952672215
16:37:52,215 INFO  BahirRedisConnectorExample [] - Source UserId: 16610178, Timestamp: 1746952672215
16:37:52,215 INFO  BahirRedisConnectorExample [] - Source UserId: 75186643, Timestamp: 1746952672215
16:37:53,210 INFO  BahirRedisConnectorExample [] - Source UserId: 39159513, Timestamp: 1746952673210
16:37:53,210 INFO  BahirRedisConnectorExample [] - Source UserId: 73432909, Timestamp: 1746952673210
16:37:53,210 INFO  BahirRedisConnectorExample [] - Source UserId: 16058372, Timestamp: 1746952673210
16:37:53,210 INFO  BahirRedisConnectorExample [] - Source UserId: 69252173, Timestamp: 1746952673210
16:37:54,212 INFO  BahirRedisConnectorExample [] - Source UserId: 86405133, Timestamp: 1746952674212
16:37:54,212 INFO  BahirRedisConnectorExample [] - Source UserId: 65315201, Timestamp: 1746952674212

可以通过 Redis Cli 来验证写入是否成功:

127.0.0.1:6379> keys *
 1) "16058372"
 2) "75186643"
 3) "69252173"
 4) "86405133"
 5) "73432909"
 6) "39429175"
 7) "65315201"
 8) "16610178"
 9) "39159513"
10) "82172014"
127.0.0.1:6379> get 16058372
"1746952673210"
127.0.0.1:6379>

通过上述可以看到通过 Flink DataStream 写入 Redis 成功。

未经允许不得转载:搜云库技术团队 » Flink 实时写入 Redis 实战教程,掌握 Bahir Connector 高效用法

相关推荐

  • 暂无文章

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们