Python备忘
1. 自定义多线程程序:
import concurrent.futures
import threadingclass CustomThreadPool:def __init__(self, max_workers):self.max_workers = max_workersself.pool = concurrent.futures.ThreadPoolExecutor(max_workers)self.running_num = 0self.semaphore = threading.Semaphore(max_workers)def submit(self, fn, *args, **kwargs):self.semaphore.acquire() # 获取信号量(如果已满则阻塞)self.running_num += 1future = self.pool.submit(fn, *args, **kwargs)future.add_done_callback(self._callback)return futuredef _callback(self, future):self.running_num -= 1self.semaphore.release() # 释放信号量,允许新任务提交def shutdown(self):self.pool.shutdown()
以上程序实现:使用这个自定义线程池时,当并发任务数达到上限后,新提交的任务会被阻塞,直到有任务完成。
使用程序:
import time
import random
from custom_threadpool import CustomThreadPool # 假设上面的类保存在 custom_threadpool.py 中# 模拟下载图片的函数(实际应用中可替换为 requests.get 等真实下载逻辑)
def download_image(url):# 模拟网络请求延迟delay = random.uniform(0.5, 2.0)time.sleep(delay)# 模拟下载结果size = random.randint(100, 1000)print(f"下载完成: {url} (大小: {size}KB, 耗时: {delay:.2f}s)")return {"url": url, "size": size, "status": "success"}# 主程序
def main():# 创建一个最大并发数为3的线程池with CustomThreadPool(max_workers=3) as pool:# 模拟10个图片下载任务image_urls = [f"https://example.com/image_{i}.jpg" for i in range(1, 11)]# 提交所有下载任务futures = []for url in image_urls:future = pool.submit(download_image, url)futures.append(future)# 获取所有任务的结果(按完成顺序输出)for future in concurrent.futures.as_completed(futures):try:result = future.result()print(f"处理结果: {result['url']} ({result['size']}KB)")except Exception as e:print(f"下载失败: {e}")if __name__ == "__main__":start_time = time.time()main()print(f"总耗时: {time.time() - start_time:.2f}秒")
2.* ** 参数:
def example(a, b, *args, **kwargs):print(f"固定参数:a={a}, b={b}")print(f"位置参数:{args}")print(f"关键字参数:{kwargs}")example(1, 2, 3, 4, x=5, y=6)
# 输出:
# 固定参数:a=1, b=2
# 位置参数:(3, 4)
# 关键字参数:{'x': 5, 'y': 6}