(Azure) PostgreSQL 与 Redis 连通性测试 —— 代码备份
在 Azure Databricks 中测试 PostgreSQL 连接
!pip install psycopg2-binary
import psycopg2
from psycopg2 import OperationalError
def test_postgresql_connection(
    db_host="数据库host",
    db_port="5432",
    db_name="database",
    db_user="用户名",
    db_password="密码",
    ssl_mode="require",
):
    """Check that an Azure Database for PostgreSQL server is reachable.

    Opens a connection, prints the server version and the current database
    name, then always closes the connection again (also on failure).

    The defaults are the placeholder values from the original notebook;
    pass real values (or edit the defaults) before running.

    Args:
        db_host: Server host name.
        db_port: Server port.
        db_name: Name of an existing database to connect to.
        db_user: Login user name.
        db_password: Login password. It is masked in the printed DSN.
        ssl_mode: libpq ``sslmode``; "require" because Azure PostgreSQL
            requires SSL (per the original notebook's note).

    Returns:
        True if the connection and both queries succeeded, otherwise False.
    """
    # Pass the parameters as keywords and let psycopg2/libpq build the DSN.
    # A hand-built "key=value" string breaks when a value (typically the
    # password) contains spaces or quotes.
    conn_params = {
        "host": db_host,
        "port": db_port,
        "dbname": db_name,
        "user": db_user,
        "password": db_password,
        "sslmode": ssl_mode,
    }
    # Print the DSN for debugging, but never leak the password into the
    # notebook output / logs.
    masked_dsn = " ".join(
        f"{key}={'***' if key == 'password' else value}"
        for key, value in conn_params.items()
    )
    print(f"连接字符串 :{masked_dsn}")

    connection = None
    try:
        connection = psycopg2.connect(**conn_params)
        # The cursor context manager closes the cursor even if a query fails.
        with connection.cursor() as cursor:
            # Simple query to prove the session works.
            cursor.execute("SELECT version();")
            db_version = cursor.fetchone()
            # Confirm we landed in the expected database.
            cursor.execute("SELECT current_database();")
            current_db = cursor.fetchone()
        print("✅ PostgreSQL conect success!")
        print(f"version: {db_version[0]}")
        print(f"database: {current_db[0]}")
        return True
    except OperationalError as e:
        print(f"❌ connect fail: {e}")
        return False
    except Exception as e:
        print(f"❌ unkonw error: {e}")
        return False
    finally:
        # Previously the connection was only closed on the success path, so a
        # failing query leaked it. Note: psycopg2's `with connection:` would
        # only end the transaction, not close the connection.
        if connection is not None:
            connection.close()


test_postgresql_connection()
开启 SSL 的 Redis(集群模式)连通性测试
!pip install redis
import redis.cluster
import ssl
# Azure Redis (cluster mode) connection settings.
redis_host = "redis的host"
redis_port = 6380  # SSL/TLS port; required because the client below uses ssl=True
redis_password = "密码"

# TLS options are passed straight to RedisCluster below. The original cell also
# built an ssl.SSLContext here, but it was never used: the `ssl_context=` (and
# `skip_full_coverage_check=`) arguments sat inside a trailing comment.
# Certificate verification stays disabled exactly as before (ssl_cert_reqs=None).
# NOTE(review): disabled verification allows MITM; consider enabling it with a
# CA bundle if the cluster's node addresses match its certificate.

try:
    # Use RedisCluster rather than Redis because the target runs in cluster mode.
    # Constructed inside the try so that an unreachable or misconfigured cluster
    # is reported by the handler below instead of escaping the cell.
    rc = redis.cluster.RedisCluster(
        host=redis_host,
        port=redis_port,
        password=redis_password,
        ssl=True,
        ssl_cert_reqs=None,
        ssl_ca_certs=None,
        decode_responses=True,
    )

    # Basic connectivity check. The exact shape of ping()'s result in cluster
    # mode depends on the redis-py version, so it is only printed.
    print(rc.ping())
    print("集群连接成功!")

    # Round-trip check: write a hash, then read the same field back.
    # "ellen_test" is also read by RedisUtils.testReadFiledData on the Java side.
    test_key = "ellen_test"
    test_field = "test1"
    test_value = "hello ellen"
    rc.hset(test_key, test_field, test_value)
    rc.hset(test_key, "test2", "harry")
    # Previously this ran GET on an unrelated key ("org:c29f4a4ad:user"), so it
    # never verified the value that was just written.
    retrieved_value = rc.hget(test_key, test_field)
    print(f"从集群中读取的值: {retrieved_value}")
    if retrieved_value != test_value:
        raise RuntimeError(
            f"read-back mismatch for {test_key}/{test_field}: "
            f"expected {test_value!r}, got {retrieved_value!r}"
        )
except Exception as e:
    print(f"连接或测试失败: {e}")
package org.batch.utils;
import java.io.Closeable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.BinaryJedisCluster;
import redis.clients.jedis.Client;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisClusterConnectionHandler;
import redis.clients.jedis.JedisClusterInfoCache;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisSlotBasedConnectionHandler;
import redis.clients.jedis.PipelineBase;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.jedis.exceptions.JedisRedirectionException;
import redis.clients.jedis.util.JedisClusterCRC16;
import redis.clients.jedis.util.SafeEncoder;/*** 在集群模式下提供批量操作的功能。 <br/>* 由于集群模式存在节点的动态添加删除,且client不能实时感知(只有在执行命令时才可能知道集群发生变更),* 因此,该实现不保证一定成功,建议在批量操作之前调用 refreshCluster() 方法重新获取集群信息。<br />* 应用需要保证不论成功还是失败都会调用close() 方法,否则可能会造成泄露。<br/>* 如果失败需要应用自己去重试,因此每个批次执行的命令数量需要控制。防止失败后重试的数量过多。<br />* 基于以上说明,建议在集群环境较稳定(增减节点不会过于频繁)的情况下使用,且允许失败或有对应的重试策略。<br />* <p>* 该类非线程安全** @author ellen shen* @since Ver 1.1*/
public class JedisClusterPipeline extends PipelineBase implements Closeable {private static final Logger LOGGER = LoggerFactory.getLogger(JedisClusterPipeline.class);// 部分字段没有对应的获取方法,只能采用反射来做// 你也可以去继承JedisCluster和JedisSlotBasedConnectionHandler来提供访问接口private static final Field FIELD_CONNECTION_HANDLER;private static final Field FIELD_CACHE;static {FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler");FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache");}private JedisSlotBasedConnectionHandler connectionHandler;private JedisClusterInfoCache clusterInfoCache;private Queue<Client> clients = new LinkedList<Client>(); // 根据顺序存储每个命令对应的Clientprivate Map<JedisPool, Jedis> jedisMap = new HashMap<JedisPool, Jedis>(); // 用于缓存连接private boolean hasDataInBuf = false; // 是否有数据在缓存区/*** 根据jedisCluster实例生成对应的JedisClusterPipeline** @param* @return*/public static JedisClusterPipeline pipelined(JedisCluster jedisCluster) {JedisClusterPipeline pipeline = new JedisClusterPipeline();pipeline.setJedisCluster(jedisCluster);return pipeline;}/*** 构建JedisClusterPipeline*/public JedisClusterPipeline() {}/*** 设置JedisCluster* @param jedis*/public void setJedisCluster(JedisCluster jedis) {connectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER);clusterInfoCache = getValue(connectionHandler, FIELD_CACHE);}/*** 刷新集群信息,当集群信息发生变更时调用** @param* @return*/public void refreshCluster() {connectionHandler.renewSlotCache();}/*** 同步读取所有数据. 
与syncAndReturnAll()相比,sync()只是没有对数据做反序列化*/public void sync() {innerSync(null);}/*** 同步读取所有数据 并按命令顺序返回一个列表** @return 按照命令的顺序返回所有的数据*/public List<Object> syncAndReturnAll() {List<Object> responseList = new ArrayList<Object>();innerSync(responseList);return responseList;}/*** 内部数据同步* @param formatted*/private void innerSync(List<Object> formatted) {try {for (Client client : clients) {// 在sync()调用时其实是不需要解析结果数据的,但是如果不调用get方法,发生了JedisMovedDataException这样的错误应用是不知道的,因此需要调用get()来触发错误。// 其实如果Response的data属性可以直接获取,可以省掉解析数据的时间,然而它并没有提供对应方法,要获取data属性就得用反射,不想再反射了,所以就这样了Object data = generateResponse(client.getOne()).get();if (null != formatted) {formatted.add(data);}}} catch (JedisRedirectionException jre) {if (jre instanceof JedisMovedDataException) {// if MOVED redirection occurred, rebuilds cluster's slot cache,// recommended by Redis cluster specificationrefreshCluster();}throw jre;} finally {// 所有还没有执行过的client要保证执行(flush),防止放回连接池后后面的命令被污染for (Jedis jedis : jedisMap.values()) {flushCachedData(jedis);}hasDataInBuf = false;close();}}/*** 关闭链接*/@Overridepublic void close() {clean();clients.clear();for (Jedis jedis : jedisMap.values()) {if (hasDataInBuf) {flushCachedData(jedis);}jedis.close();}jedisMap.clear();hasDataInBuf = false;}/*** 清除数据* @param jedis*/private void flushCachedData(Jedis jedis) {try {//jedis.getClient().flushAll();} catch (RuntimeException ex) {// 其中一个client出问题,后面出问题的几率较大}}@Overrideprotected Client getClient(String key) {byte[] bKey = SafeEncoder.encode(key);return getClient(bKey);}@Overrideprotected Client getClient(byte[] key) {Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key));Client client = jedis.getClient();clients.add(client);return client;}/**** @param slot* @return*/private Jedis getJedis(int slot) {JedisPool pool = clusterInfoCache.getSlotPool(slot);// 添加空值检查if (pool == null) {// 刷新集群信息并重试refreshCluster();pool = clusterInfoCache.getSlotPool(slot);if (pool == null) {throw new JedisException("No available JedisPool for slot: " + slot +". 
Cluster nodes may be unavailable.");}}// 根据pool从缓存中获取JedisJedis jedis = jedisMap.get(pool);if (null == jedis) {jedis = pool.getResource();jedisMap.put(pool, jedis);}hasDataInBuf = true;return jedis;}/**** @param cls* @param fieldName* @return*/private static Field getField(Class<?> cls, String fieldName) {try {Field field = cls.getDeclaredField(fieldName);field.setAccessible(true);return field;} catch (NoSuchFieldException e) {throw new RuntimeException("cannot find or access field '" + fieldName + "' from " + cls.getName(), e);} catch (SecurityException e) {throw new RuntimeException("cannot find or access field '" + fieldName + "' from " + cls.getName(), e);}}/*** 获取value* @param obj* @param field* @param <T>* @return*/private static <T> T getValue(Object obj, Field field) {try {return (T) field.get(obj);} catch (IllegalArgumentException e) {LOGGER.error("get value fail", e);throw new RuntimeException(e);} catch (IllegalAccessException e) {LOGGER.error("get value fail", e);throw new RuntimeException(e);}}
}
package org.batch.utils;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.deloitte.cdp.batch.consts.SystemConst;
import org.deloitte.cdp.batch.io.writable.*;
import redis.clients.jedis.*;
import java.util.*;import static java.lang.System.exit;public class RedisUtils {private JedisCluster jedisCluster = null;private JedisPool pool = null;private PipelineBase pl = null;//自增ID生成序列前缀public static final String INCR_PRE = "incr:";private int i = 0;private boolean clusterEnable = true;public RedisUtils(boolean clusterEnable, String host, String port, String password,String confPath) {this.clusterEnable = clusterEnable;if (clusterEnable) {initCluster(host, port, password,confPath);} else {initPool(host, port, password,confPath);}}private void initCluster(String host, String port, String password,String confPath) {Set<HostAndPort> jedisClusterNodes = new HashSet<>();String[] hostStr = host.split(";");String[] portStr = port.split(";");for (int i = 0; i < hostStr.length; i++) {for (int k = 0; k < portStr.length; k++) {jedisClusterNodes.add(new HostAndPort(hostStr[i], Integer.parseInt(portStr[k])));}}setJedisCluster(password, jedisClusterNodes,confPath);}private void setJedisCluster(String password, Set<HostAndPort> jedisClusterNodes,String confPath) {JedisPoolConfig configRedis = new JedisPoolConfig();configRedis.setMaxTotal(50);//200configRedis.setMaxIdle(50);configRedis.setMinIdle(8);//设置最小空闲数configRedis.setMaxWaitMillis(10000);configRedis.setTestOnBorrow(true);configRedis.setTestOnReturn(true);//Idle时进行连接扫描configRedis.setTestWhileIdle(true);//表示idle object evitor两次扫描之间要sleep的毫秒数configRedis.setTimeBetweenEvictionRunsMillis(30000);//表示idle object evitor每次扫描的最多的对象数configRedis.setNumTestsPerEvictionRun(10);//表示一个对象至少停留在idle状态的最短时间,然后才能被idle object evitor扫描并驱逐;这一项只有在timeBetweenEvictionRunsMillis大于0时才有意义configRedis.setMinEvictableIdleTimeMillis(60000);if (StringUtils.isNotBlank(password)) {String sslStr = DBFSConfigReader.readValue(confPath,SystemConst.CONFIG_FILE, SystemConst.REDIS_SSL);SysPrint.goPrint("setJedisCluster sslStr",sslStr);if (StringUtils.isNotBlank(sslStr)) {boolean ssl = new Boolean(sslStr);jedisCluster = new JedisCluster(jedisClusterNodes, 
10000, 10000, 10, password, "redisUpdate", configRedis, ssl);} else {jedisCluster = new JedisCluster(jedisClusterNodes, 10000, 10000, 10, password, configRedis);}} else {jedisCluster = new JedisCluster(jedisClusterNodes, 10000, 10000, 10, configRedis);}}/*** @param host* @param port* @param pass*/private void initPool(String host, String port, String pass,String confPath) {if (pool == null) {JedisPoolConfig config = new JedisPoolConfig();config.setMaxTotal(200);config.setMaxIdle(80);config.setMinIdle(10);//设置最小空闲数config.setMaxWaitMillis(10000);config.setTestOnBorrow(false);config.setTestOnReturn(false);//Idle时进行连接扫描config.setTestWhileIdle(true);//表示idle object evitor两次扫描之间要sleep的毫秒数config.setTimeBetweenEvictionRunsMillis(30000);//表示idle object evitor每次扫描的最多的对象数config.setNumTestsPerEvictionRun(10);//表示一个对象至少停留在idle状态的最短时间,然后才能被idle object evitor扫描并驱逐;这一项只有在timeBetweenEvictionRunsMillis大于0时才有意义config.setMinEvictableIdleTimeMillis(60000);if (StringUtils.isNotBlank(pass)) {String sslStr = DBFSConfigReader.readValue(confPath,SystemConst.CONFIG_FILE, SystemConst.REDIS_SSL);if (StringUtils.isNotBlank(sslStr)) {boolean ssl = new Boolean(sslStr);pool = new JedisPool(config, host, NumberUtils.toInt(port), 5000, pass,ssl);} else {pool = new JedisPool(config, host, NumberUtils.toInt(port), 5000, pass);}} else {pool = new JedisPool(config, host, NumberUtils.toInt(port), 5000);}}}private void getPipeline() {if (pl == null) {if (clusterEnable) {JedisClusterPipeline jcp = JedisClusterPipeline.pipelined(jedisCluster);jcp.refreshCluster();pl = jcp;} else {Jedis jedis = pool.getResource();pl = jedis.pipelined();}}}public Map<String, String> hgetAll(String key) {return jedisCluster.hgetAll(key);}public String get(String key) {return jedisCluster.get(key);}public String set(String key, String value) {return jedisCluster.set(key, value);}/****/public void flush() {try {if (clusterEnable && pl != null) {((JedisClusterPipeline) pl).sync();} else if (!clusterEnable && pl != null) 
{((Pipeline) pl).sync();}} catch (Exception e) {SysPrint.goPrint("flush SQLException", e.getMessage());}}public void close() {try {if (clusterEnable) {jedisCluster.close();} else {pool.close();}} catch (Exception e) {SysPrint.goPrint("close SQLException", e.getMessage());}}public void writeTags(String user, String type, String oper, TagListWritable tw, long curTime) {String userKey = SystemConst.USER.equals(type) ? BATCH_USER_PREX + user : type + ":" + user;getPipeline();if (oper.contains("td")) { // del allpl.del(userKey);} else {if(oper.contains("stag_td")){pl.hdel(userKey, STAG_PREX);pl.hdel(userKey, STAG_PII_PREX);}...pl.hset(userKey, SystemConst.UT, String.valueOf(curTime));}if ((++i) % 200 == 0) {flush();}}public Map<String, String> testReadFiledData() {Response <Map<String, String>> mapResponse = pl.hgetAll("ellen_test");flush();Map<String, String> resultMap = mapResponse.get();return resultMap;}
}
// Write every record to Redis, one RedisUtils (and connection pool) per partition, created on
// the executor. foreachPartition is the action meant for side effects. The previous
// mapPartitions(...).take(3) only forced evaluation indirectly: every partition returned an
// empty iterator, so take() had to keep scanning partitions (possibly over several jobs) and
// printed nothing.
// NOTE(review): assumes `result` is an RDD; for a Dataset in Scala 2.12, give `ms` an explicit
// type to avoid the overload ambiguity of foreachPartition.
result.repartition(rparal).foreachPartition { ms =>
  val utils = new RedisUtils(cluster, redisHost, redisPort, redisPassword, confPath)
  try {
    ms.foreach(m => utils.writeTags(m._1, m._2, m._3, m._4, curTime))
    utils.flush()
  } finally {
    // Release the connections even when a write throws.
    utils.close()
  }
}
pom.xml 依赖(Jedis)
<!--
  Jedis 3.x: JedisClusterPipeline depends on 3.x-internal classes (BinaryJedisCluster,
  PipelineBase, JedisSlotBasedConnectionHandler), so keep the version on the 3.x line.
  slf4j-api is excluded here; the Java code still uses org.slf4j, so it must be supplied by
  another dependency or by the runtime classpath.
-->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.6.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
