Flink Keyed State 详解之七
Flink Queryable State 详解
1. 基本概念
Queryable State(可查询状态)是 Flink 提供的一种机制,允许在应用程序运行时从外部查询算子的状态。这使得用户可以在不中断流处理作业的情况下实时获取状态信息,对于监控、调试和实时数据服务非常有用。
1.1 核心特性
- 实时查询:在作业运行时实时查询状态
- 低延迟:提供毫秒级的查询响应
- 无侵入性:不需要修改现有流处理逻辑
- 安全性:根据 Flink 官方文档,Queryable State 端点目前不提供内置的认证与加密,生产环境需依靠网络隔离等外部手段进行访问控制
1.2 工作原理
Queryable State 通过以下方式工作:
- 在算子中启用状态查询功能
- 启动查询服务监听指定端口
- 外部客户端通过网络连接查询状态
- 查询服务返回状态数据给客户端
2. 适用场景
2.1 实时监控和仪表板
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** 实时监控示例* 统计用户访问次数并支持外部查询*/
public class RealTimeMonitoringExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 创建用户访问数据流DataStream<String> userVisits = env.fromElements("user1", "user2", "user1", "user3", "user2", "user1");// 处理用户访问并维护访问计数状态DataStream<String> visitCounts = userVisits.keyBy(user -> user).map(new QueryableVisitCounter());visitCounts.print();env.execute("Real-time Monitoring Example");}/*** 支持查询的访问计数器*/public static class QueryableVisitCounter extends RichMapFunction<String, String> {private ValueState<Integer> countState;@Overridepublic void open(Configuration parameters) {ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("visit-count",Integer.class,0);// 启用可查询状态descriptor.setQueryable("user-visit-count");countState = getRuntimeContext().getState(descriptor);}@Overridepublic String map(String user) throws Exception {Integer currentCount = countState.value();currentCount++;countState.update(currentCount);return "User " + user + " has visited " + currentCount + " times";}}
}
2.2 实时数据服务
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** 实时数据服务示例* 维护用户配置信息并支持外部查询*/
public class RealTimeDataServiceExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 创建用户配置更新数据流DataStream<UserConfigUpdate> configUpdates = env.fromElements(new UserConfigUpdate("user1", "theme", "dark"),new UserConfigUpdate("user2", "language", "en"),new UserConfigUpdate("user1", "language", "zh"),new UserConfigUpdate("user3", "theme", "light"));// 处理配置更新并维护用户配置状态DataStream<String> configResults = configUpdates.keyBy(update -> update.userId).map(new QueryableUserConfigManager());configResults.print();env.execute("Real-time Data Service Example");}/*** 支持查询的用户配置管理器*/public static class QueryableUserConfigManager extends RichMapFunction<UserConfigUpdate, String> {private MapState<String, String> userConfigState;@Overridepublic void open(Configuration parameters) {MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<>("user-config",String.class,String.class);// 启用可查询状态descriptor.setQueryable("user-config-store");userConfigState = getRuntimeContext().getMapState(descriptor);}@Overridepublic String map(UserConfigUpdate update) throws Exception {// 更新用户配置userConfigState.put(update.configKey, update.configValue);return "Updated user " + update.userId + " config: " + update.configKey + " = " + update.configValue;}}/*** 用户配置更新*/public static class UserConfigUpdate {public String userId;public String configKey;public String configValue;public UserConfigUpdate() {}public UserConfigUpdate(String userId, String configKey, String configValue) {this.userId = userId;this.configKey = configKey;this.configValue = configValue;}}
}
3. Queryable State 配置
3.1 服务端配置
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** Queryable State 服务端配置示例*/
public class QueryableStateServerConfiguration {/*** 配置 Queryable State 服务端*/public static StreamExecutionEnvironment configureQueryableStateServer() {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置 Queryable State 服务端参数Configuration config = new Configuration();// 启用 Queryable State 服务端config.setBoolean(ConfigConstants.QUERYABLE_STATE_SERVER_ENABLE, true);// 设置 Queryable State 服务端端口范围config.setString(ConfigConstants.QUERYABLE_STATE_SERVER_PORT_RANGE, "9067-9077");// 设置 Queryable State 代理端口范围config.setString(ConfigConstants.QUERYABLE_STATE_PROXY_PORT_RANGE, "9069-9079");// 设置 Queryable State 代理网络线程数config.setInteger(ConfigConstants.QUERYABLE_STATE_PROXY_THREADS, 8);// 设置 Queryable State 服务端网络线程数config.setInteger(ConfigConstants.QUERYABLE_STATE_SERVER_THREADS, 8);// 应用配置env.configure(config, QueryableStateServerConfiguration.class.getClassLoader());return env;}/*** 启动 MiniCluster 并启用 Queryable State*/public static MiniCluster startMiniClusterWithQueryableState() throws Exception {MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder().setNumTaskManagers(2).setNumSlotsPerTaskManager(2).build();MiniCluster miniCluster = new MiniCluster(cfg);miniCluster.start();return miniCluster;}
}
3.2 客户端配置
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.util.FlinkException;import java.net.InetAddress;
import java.util.concurrent.CompletableFuture;/*** Queryable State 客户端配置示例*/
public class QueryableStateClientConfiguration {/*** 创建 Queryable State 客户端*/public static QueryableStateClient createQueryableStateClient() throws Exception {// 创建客户端,连接到 Queryable State 代理QueryableStateClient client = new QueryableStateClient(InetAddress.getByName("localhost"),  // 代理主机9069  // 代理端口);return client;}/*** 查询状态数据*/public static CompletableFuture<String> queryState(QueryableStateClient client,JobID jobId,String key,String stateName) throws FlinkException {// 创建状态描述符ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>(stateName,BasicTypeInfo.STRING_TYPE_INFO);// 异步查询状态return client.getKvState(jobId,           // 作业IDstateName,       // 状态名称key,             // 查询键BasicTypeInfo.STRING_TYPE_INFO,  // 键类型信息stateDescriptor  // 状态描述符);}
}
4. 完整使用示例
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.net.InetAddress;
import java.util.concurrent.CompletableFuture;/*** Queryable State 完整使用示例*/
public class CompleteQueryableStateExample {public static void main(String[] args) throws Exception {// 启动 Flink 作业StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 启用 Queryable StateConfiguration config = new Configuration();config.setBoolean("queryable-state.server.enable", true);config.setString("queryable-state.server.ports", "9067-9077");config.setString("queryable-state.proxy.ports", "9069-9079");env.configure(config, CompleteQueryableStateExample.class.getClassLoader());// 创建用户事件数据流DataStream<UserEvent> userEvents = env.fromElements(new UserEvent("user1", "login", System.currentTimeMillis()),new UserEvent("user2", "login", System.currentTimeMillis() + 1000),new UserEvent("user1", "purchase", System.currentTimeMillis() + 2000),new UserEvent("user3", "login", System.currentTimeMillis() + 3000),new UserEvent("user2", "purchase", System.currentTimeMillis() + 4000));// 处理用户事件并维护状态DataStream<String> results = userEvents.keyBy(event -> event.userId).map(new QueryableUserActivityTracker());// 添加查询结果输出results.addSink(new QueryResultSink());// 获取作业ID并启动查询客户端JobID jobId = env.executeAsync("Queryable State Example").getJobID();// 启动查询客户端(在实际应用中,这可能是一个独立的服务)startQueryClient(jobId);// 等待作业完成Thread.sleep(10000);}/*** 启动查询客户端*/public static void startQueryClient(JobID jobId) {new Thread(() -> {try {// 创建查询客户端QueryableStateClient client = new QueryableStateClient(InetAddress.getByName("localhost"),9069);// 定期查询用户状态while (true) {try {// 查询 user1 的登录计数ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("login-count",Integer.class);CompletableFuture<Integer> future = client.getKvState(jobId,"user-login-count","user1",org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO,descriptor);Integer loginCount = future.get();System.out.println("User1 login count: " + loginCount);Thread.sleep(3000); // 每3秒查询一次} catch (Exception e) {System.err.println("Query failed: " + 
e.getMessage());Thread.sleep(3000);}}} catch (Exception e) {System.err.println("Client error: " + e.getMessage());}}).start();}/*** 支持查询的用户活动跟踪器*/public static class QueryableUserActivityTracker extends RichMapFunction<UserEvent, String> {private ValueState<Integer> loginCountState;private ValueState<Integer> purchaseCountState;@Overridepublic void open(Configuration parameters) {// 登录计数状态ValueStateDescriptor<Integer> loginDescriptor = new ValueStateDescriptor<>("login-count",Integer.class,0);loginDescriptor.setQueryable("user-login-count");loginCountState = getRuntimeContext().getState(loginDescriptor);// 购买计数状态ValueStateDescriptor<Integer> purchaseDescriptor = new ValueStateDescriptor<>("purchase-count",Integer.class,0);purchaseDescriptor.setQueryable("user-purchase-count");purchaseCountState = getRuntimeContext().getState(purchaseDescriptor);}@Overridepublic String map(UserEvent event) throws Exception {if ("login".equals(event.eventType)) {Integer loginCount = loginCountState.value();loginCount++;loginCountState.update(loginCount);return "User " + event.userId + " logged in (total: " + loginCount + ")";} else if ("purchase".equals(event.eventType)) {Integer purchaseCount = purchaseCountState.value();purchaseCount++;purchaseCountState.update(purchaseCount);return "User " + event.userId + " made a purchase (total: " + purchaseCount + ")";}return "User " + event.userId + " performed " + event.eventType;}}/*** 查询结果输出*/public static class QueryResultSink implements SinkFunction<String> {@Overridepublic void invoke(String value, Context context) {System.out.println("Activity: " + value);}}/*** 用户事件*/public static class UserEvent {public String userId;public String eventType;public long timestamp;public UserEvent() {}public UserEvent(String userId, String eventType, long timestamp) {this.userId = userId;this.eventType = eventType;this.timestamp = timestamp;}}
}
5. 安全配置
5.1 TLS/SSL 配置
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** Queryable State 安全配置示例*/
public class QueryableStateSecurityConfiguration {/*** 配置 TLS/SSL 安全连接*/public static StreamExecutionEnvironment configureSecureQueryableState() {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Configuration config = new Configuration();// 启用 Queryable Stateconfig.setBoolean("queryable-state.server.enable", true);// 启用 TLS/SSLconfig.setBoolean("security.ssl.enabled", true);// 配置 SSL 证书config.setString("security.ssl.keystore", "/path/to/keystore.jks");config.setString("security.ssl.keystore-password", "keystore-password");config.setString("security.ssl.key-password", "key-password");config.setString("security.ssl.truststore", "/path/to/truststore.jks");config.setString("security.ssl.truststore-password", "truststore-password");// 配置 Queryable State 端口config.setString("queryable-state.server.ports", "9067-9077");config.setString("queryable-state.proxy.ports", "9069-9079");env.configure(config, QueryableStateSecurityConfiguration.class.getClassLoader());return env;}
}
5.2 认证配置
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** Queryable State 认证配置示例*/
public class QueryableStateAuthenticationConfiguration {/*** 配置认证*/public static StreamExecutionEnvironment configureAuthenticatedQueryableState() {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Configuration config = new Configuration();// 启用 Queryable Stateconfig.setBoolean("queryable-state.server.enable", true);// 启用认证config.setBoolean("security.enabled", true);// 配置认证提供者config.setString("security.provider", "JAAS");config.setString("security.jaas.login-context-name", "QueryableStateLoginContext");config.setString("security.jaas.login-module", "com.example.QueryableStateLoginModule");// 配置 Queryable State 端口config.setString("queryable-state.server.ports", "9067-9077");config.setString("queryable-state.proxy.ports", "9069-9079");env.configure(config, QueryableStateAuthenticationConfiguration.class.getClassLoader());return env;}
}
6. 性能优化
6.1 客户端优化
import org.apache.flink.queryablestate.client.QueryableStateClient;/*** Queryable State 客户端性能优化示例*/
public class QueryableStateClientOptimization {/*** 配置高性能客户端*/public static QueryableStateClient configureHighPerformanceClient() throws Exception {QueryableStateClient client = new QueryableStateClient(java.net.InetAddress.getByName("localhost"),9069);// 配置客户端参数client.setMaxRetries(3);  // 最大重试次数client.setConnectTimeout(5000);  // 连接超时时间(毫秒)client.setReadTimeout(10000);  // 读取超时时间(毫秒)client.setRequestTimeout(15000);  // 请求超时时间(毫秒)return client;}
}
6.2 缓存优化
import org.apache.flink.queryablestate.client.QueryableStateClient;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;/*** 带缓存的 Queryable State 客户端*/
public class CachedQueryableStateClient {private final QueryableStateClient client;private final ConcurrentHashMap<String, CacheEntry> cache;private final long cacheTimeoutMs;public CachedQueryableStateClient(QueryableStateClient client, long cacheTimeoutMs) {this.client = client;this.cache = new ConcurrentHashMap<>();this.cacheTimeoutMs = cacheTimeoutMs;}/*** 查询状态(带缓存)*/public <T> T queryState(String cacheKey, java.util.function.Supplier<T> querySupplier) {CacheEntry entry = cache.get(cacheKey);// 检查缓存是否有效if (entry != null && (System.currentTimeMillis() - entry.timestamp) < cacheTimeoutMs) {return (T) entry.value;}// 缓存未命中或过期,执行查询T result = querySupplier.get();// 更新缓存cache.put(cacheKey, new CacheEntry(result, System.currentTimeMillis()));return result;}/*** 缓存条目*/private static class CacheEntry {final Object value;final long timestamp;CacheEntry(Object value, long timestamp) {this.value = value;this.timestamp = timestamp;}}
}
7. 最佳实践建议
7.1 使用建议
- 合理启用:
  - 只对需要查询的状态启用 Queryable State
  - 避免对所有状态都启用查询功能
 
- 性能考虑:
  - 查询操作会带来网络开销
  - 避免频繁查询大状态对象
  - 考虑使用缓存减少重复查询
 
- 安全性:
  - 在生产环境中启用认证和加密
  - 限制对 Queryable State 的访问权限
  - 监控查询活动
 
7.2 监控和维护
- 监控指标:
  - 监控查询延迟和吞吐量
  - 监控查询错误率
  - 监控客户端连接数
 
- 故障处理:
  - 实现查询重试机制
  - 处理网络分区情况
  - 准备降级方案
 
- 容量规划:
  - 根据查询负载规划代理资源
  - 监控内存使用情况
  - 调整线程池配置
 
通过合理使用 Queryable State,可以在不中断流处理作业的情况下实时获取状态信息,为监控、调试和实时数据服务提供强大的支持。
