
Breaking Through Performance Bottlenecks: A Scala Crawler Approach to Large-Scale Data Processing

Today we'll use Scala to build an efficient, practical web crawler. Scala is particularly good at concurrent workloads, and paired with the Akka toolkit it's like bolting a multi-threaded engine onto your crawler, letting it fetch large numbers of pages at the same time. Below I'll walk you through the full flow step by step: sending page requests, parsing content, and pacing the crawl. You'll find that a crawler written in Scala is not only fast, but also remarkably clear in structure.


What follows is a complete Scala crawler tutorial, focused on using Scala's concurrency features (in particular the Akka Actor model) to build a high-performance web crawler.

Project Overview

We will build a crawler system that can fetch multiple web pages concurrently, with the following features:

  • Send HTTP requests concurrently
  • Parse HTML content to extract links and data
  • Throttle the request rate to avoid getting banned
  • Simple data storage

Environment Setup

1. Create an SBT Project

First create a new SBT project and add the following dependencies to build.sbt:

name := "scala-web-crawler"
version := "1.0"
scalaVersion := "2.13.8"

val akkaVersion = "2.6.19"
val akkaHttpVersion = "10.2.9"

libraryDependencies ++= Seq(
  // Akka Actor
  "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion,
  "com.typesafe.akka" %% "akka-stream" % akkaVersion,
  // Akka HTTP
  "com.typesafe.akka" %% "akka-http" % akkaHttpVersion,
  "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion,
  // HTML parsing
  "org.jsoup" % "jsoup" % "1.14.3",
  // Logging
  "ch.qos.logback" % "logback-classic" % "1.2.11",
  // Database storage (SQLite)
  "org.xerial" % "sqlite-jdbc" % "3.36.0.3",
  "com.typesafe.slick" %% "slick" % "3.3.3",
  "com.typesafe.slick" %% "slick-hikaricp" % "3.3.3"
)

Implementation

1. Define the Message Protocol (Actor Communication)

// src/main/scala/crawler/Messages.scala
package crawler

sealed trait CrawlerMessage
case class StartCrawling(urls: List[String]) extends CrawlerMessage
case class CrawlUrl(url: String, depth: Int) extends CrawlerMessage
case class PageFetched(url: String, content: String, depth: Int) extends CrawlerMessage
case class ParsePage(url: String, content: String, depth: Int) extends CrawlerMessage
case class LinksFound(url: String, links: List[String], depth: Int) extends CrawlerMessage
case object CrawlingCompleted extends CrawlerMessage

2. Implement the Page Downloader

// src/main/scala/crawler/Downloader.scala
package crawler

import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

object Downloader {

  sealed trait Command
  case class Download(url: String, depth: Int, replyTo: ActorRef[PageFetched]) extends Command

  def apply(): Behavior[Command] = Behaviors.setup { context =>
    implicit val system: ActorSystem[Nothing] = context.system
    implicit val ec: ExecutionContext = context.executionContext
    // The ActorContext must not be touched from Future callbacks, so capture the logger here
    val log = context.log

    Behaviors.receiveMessage {
      case Download(url, depth, replyTo) =>
        log.info(s"Downloading: $url (depth: $depth)")

        // Send the request with Akka HTTP; on failure we still reply with an empty
        // body so the manager's bookkeeping stays consistent.
        Http(system.classicSystem).singleRequest(HttpRequest(uri = url)).onComplete {
          case Success(response) if response.status == StatusCodes.OK =>
            Unmarshal(response.entity).to[String].onComplete {
              case Success(content) =>
                replyTo ! PageFetched(url, content, depth)
              case Failure(ex) =>
                log.error(s"Failed to read content from $url: ${ex.getMessage}")
                replyTo ! PageFetched(url, "", depth)
            }
          case Success(response) =>
            response.entity.discardBytes()
            log.warn(s"Request to $url failed with status: ${response.status}")
            replyTo ! PageFetched(url, "", depth)
          case Failure(ex) =>
            log.error(s"Request to $url failed: ${ex.getMessage}")
            replyTo ! PageFetched(url, "", depth)
        }
        Behaviors.same
    }
  }
}

3. Implement the Page Parser

// src/main/scala/crawler/Parser.scala
package crawler

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import org.jsoup.Jsoup
import org.jsoup.nodes.Document

import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

object Parser {

  sealed trait Command
  case class Parse(html: String, url: String, depth: Int, replyTo: ActorRef[LinksFound]) extends Command

  def apply(): Behavior[Command] = Behaviors.receive { (context, message) =>
    message match {
      case Parse(html, url, depth, replyTo) =>
        context.log.info(s"Parsing page: $url")

        Try {
          val doc: Document = Jsoup.parse(html, url)

          // Extract the page title
          val title = doc.title()

          // Extract all absolute links
          val links = doc.select("a[href]").asScala
            .map(_.attr("abs:href"))
            .filter(link => link.startsWith("http") && !link.contains("#"))
            .toList

          // Extract the body text (simple example)
          val text = doc.body().text()

          (title, links, text)
        } match {
          case Success((title, links, text)) =>
            // This is where the extracted data could be written to a database
            context.log.info(s"Found ${links.size} links on $url")
            replyTo ! LinksFound(url, links, depth)
          case Failure(exception) =>
            context.log.error(s"Failed to parse $url: ${exception.getMessage}")
            // Reply with an empty link list so the manager still counts this page as done
            replyTo ! LinksFound(url, Nil, depth)
        }
        Behaviors.same
    }
  }
}

4. Implement the Crawler Manager (Core Actor)

The manager receives the shared protocol from Messages.scala: it hands queued URLs to a pool of downloaders, forwards fetched pages to a pool of parsers, and feeds newly discovered links back into its own queue.

// src/main/scala/crawler/CrawlerManager.scala
package crawler

import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
import akka.actor.typed.scaladsl.{Behaviors, Routers}

import scala.collection.mutable

object CrawlerManager {

  // StartCrawling / CrawlUrl drive the manager, PageFetched / LinksFound come back from the workers
  def apply(maxDepth: Int = 3): Behavior[CrawlerMessage] = Behaviors.setup { context =>

    // Pools of downloader and parser workers, restarted if they fail
    val downloaderPool: ActorRef[Downloader.Command] = context.spawn(
      Routers.pool(5)(
        Behaviors.supervise(Downloader()).onFailure[Exception](SupervisorStrategy.restart)
      ).withRoundRobinRouting(),
      "downloader-pool")

    val parserPool: ActorRef[Parser.Command] = context.spawn(
      Routers.pool(3)(
        Behaviors.supervise(Parser()).onFailure[Exception](SupervisorStrategy.restart)
      ).withRoundRobinRouting(),
      "parser-pool")

    // Crawl state, confined to this single actor
    val visitedUrls = mutable.Set.empty[String]
    val pendingUrls = mutable.Queue.empty[(String, Int)]
    var activeTasks = 0

    // Queue a URL if it has not been seen yet and is within the depth limit
    def enqueue(url: String, depth: Int): Unit =
      if (!visitedUrls.contains(url) && depth <= maxDepth) {
        visitedUrls.add(url)
        pendingUrls.enqueue((url, depth))
      }

    // Hand everything queued to the downloader pool; report completion when idle
    def dispatchPending(): Unit = {
      while (pendingUrls.nonEmpty) {
        val (url, depth) = pendingUrls.dequeue()
        activeTasks += 1
        downloaderPool ! Downloader.Download(url, depth, context.self)
      }
      if (activeTasks == 0) context.log.info("Crawling completed!")
    }

    Behaviors.receiveMessage {
      case StartCrawling(urls) =>
        urls.foreach(enqueue(_, 0))
        dispatchPending()
        Behaviors.same

      case CrawlUrl(url, depth) =>
        enqueue(url, depth)
        dispatchPending()
        Behaviors.same

      case PageFetched(url, content, depth) =>
        // Hand the fetched page over to a parser
        parserPool ! Parser.Parse(content, url, depth, context.self)
        Behaviors.same

      case LinksFound(url, links, depth) =>
        context.log.info(s"Found ${links.size} links on $url")
        activeTasks -= 1
        links.foreach(enqueue(_, depth + 1))
        dispatchPending()
        Behaviors.same

      case _ =>
        Behaviors.unhandled
    }
  }
}

5. Implement Rate-Limiting Middleware

// src/main/scala/crawler/RateLimiter.scala
package crawler

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

import scala.concurrent.duration._

object RateLimiter {

  sealed trait Command
  case class Request(url: String, replyTo: ActorRef[PageFetched]) extends Command
  case object Tick extends Command

  def apply(delay: FiniteDuration): Behavior[Command] = Behaviors.withTimers { timers =>
    timers.startTimerWithFixedDelay(Tick, delay)

    Behaviors.setup { context =>
      val queue = scala.collection.mutable.Queue.empty[(String, ActorRef[PageFetched])]

      Behaviors.receiveMessage {
        case Request(url, replyTo) =>
          queue.enqueue((url, replyTo))
          Behaviors.same

        case Tick =>
          if (queue.nonEmpty) {
            val (url, replyTo) = queue.dequeue()
            context.log.debug(s"Processing: $url")
            // In a real pipeline the request would be forwarded here; to keep the
            // example small we reply with dummy content instead
            replyTo ! PageFetched(url, s"Content of $url", 0)
          }
          Behaviors.same
      }
    }
  }
}
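As written, the RateLimiter only echoes dummy content and is not wired into the pipeline. Below is a minimal sketch, under my own assumptions rather than the original article's design, of how a throttling actor could sit in front of the downloader pool so that at most one download is released per delay interval. The names ThrottledDownloader and Fetch are hypothetical.

// Hedged sketch: a throttle in front of the downloader pool (names are illustrative only)
package crawler

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

import scala.concurrent.duration._

object ThrottledDownloader {

  sealed trait Command
  case class Fetch(url: String, depth: Int, replyTo: ActorRef[PageFetched]) extends Command
  private case object Tick extends Command

  // Queues incoming fetches and forwards at most one per `delay` to the real downloaders
  def apply(downloaders: ActorRef[Downloader.Command], delay: FiniteDuration): Behavior[Command] =
    Behaviors.withTimers { timers =>
      timers.startTimerWithFixedDelay(Tick, delay)
      val queue = scala.collection.mutable.Queue.empty[Fetch]

      Behaviors.receiveMessage {
        case fetch: Fetch =>
          queue.enqueue(fetch)
          Behaviors.same
        case Tick =>
          if (queue.nonEmpty) {
            val Fetch(url, depth, replyTo) = queue.dequeue()
            downloaders ! Downloader.Download(url, depth, replyTo)
          }
          Behaviors.same
      }
    }
}

The CrawlerManager would then spawn this actor with the downloader pool and a delay (for example 500.millis) and send it Fetch messages instead of talking to the pool directly.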

6. Main Application

// src/main/scala/crawler/Main.scala
package crawler

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors

import scala.concurrent.duration._

object Main extends App {

  // Root behavior: spawn the crawler manager and kick off the crawl
  val rootBehavior = Behaviors.setup[Nothing] { context =>
    val crawlerManager = context.spawn(CrawlerManager(maxDepth = 2), "crawler-manager")

    val startUrls = List(
      "https://httpbin.org/html",
      "https://httpbin.org/links/10/0",
      "https://httpbin.org/links/10/1"
    )
    crawlerManager ! StartCrawling(startUrls)

    Behaviors.empty
  }

  val system = ActorSystem[Nothing](rootBehavior, "WebCrawlerSystem")

  // Shut the system down after 10 minutes
  import system.executionContext
  system.scheduler.scheduleOnce(10.minutes, () => system.terminate())
}

7. Configure application.conf

akka {
  loglevel = INFO

  http {
    host-connection-pool {
      max-connections = 20
      max-open-requests = 256
    }
  }
}

# Database configuration (only needed if you persist crawled data)
db {
  config = "crawler.db"
  driver = "org.sqlite.JDBC"
  url = "jdbc:sqlite:crawler.db"
  connectionPool = disabled
  keepAliveConnection = true
}
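The build adds Slick and the SQLite JDBC driver, but the tutorial code above never actually persists anything. As a rough sketch of the "simple data storage" feature, and assuming the db block above, something like the following could store one row per parsed page; the table and column names are my own, not part of the original article.

// Hedged sketch of simple storage with Slick + SQLite (table/column names are illustrative)
package crawler

import slick.jdbc.SQLiteProfile.api._

import scala.concurrent.Future

object PageStore {

  // One row per crawled page: url, title, extracted text
  class Pages(tag: Tag) extends Table[(String, String, String)](tag, "pages") {
    def url   = column[String]("url")
    def title = column[String]("title")
    def text  = column[String]("text")
    def * = (url, title, text)
  }

  private val pages = TableQuery[Pages]

  // Reads the "db" section of application.conf
  private val db = Database.forConfig("db")

  def init(): Future[Unit] = db.run(pages.schema.createIfNotExists)

  def save(url: String, title: String, text: String): Future[Int] =
    db.run(pages += ((url, title, text)))
}

The natural place to call PageStore.save is the Success branch of the Parser, where title, links and text are already available.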

Running the Crawler

1. Compile the project:

sbt compile

2. Run the crawler:

sbt run

Extensions

This basic crawler can be extended further:

1. Data storage: add database support to persist crawled content (a minimal Slick sketch appears after the application.conf section above)

2. Proxy support: rotate proxies to avoid IP bans (see the sketch after this list)

3. Distributed crawling: use Akka Cluster to run the crawler across multiple nodes

4. JS rendering: integrate Selenium or HtmlUnit to handle JavaScript-rendered pages

5. Task persistence: add a checkpointing mechanism so the crawl can resume after an interruption
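For extension 2, Akka HTTP already supports routing client connections through a proxy via ClientTransport, so proxy rotation can be layered on top of singleRequest. A minimal sketch, with a placeholder proxy address and a hypothetical helper name:

// Hedged sketch for proxy support (extension 2); proxy.example.com:8080 is a placeholder
package crawler

import java.net.InetSocketAddress

import akka.actor.typed.ActorSystem
import akka.http.scaladsl.{ClientTransport, Http}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ConnectionPoolSettings

import scala.concurrent.Future

object ProxiedRequest {

  // Sends a single request through an HTTP(S) proxy instead of connecting directly
  def fetchViaProxy(url: String)(implicit system: ActorSystem[_]): Future[HttpResponse] = {
    val proxy = ClientTransport.httpsProxy(
      InetSocketAddress.createUnresolved("proxy.example.com", 8080))
    val settings = ConnectionPoolSettings(system.classicSystem).withTransport(proxy)
    Http()(system).singleRequest(HttpRequest(uri = url), settings = settings)
  }
}

Rotation would then be a matter of keeping a list of proxies and picking a different transport per request or per worker.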

Summary

This tutorial has shown how to build a high-performance, concurrent web crawler with Scala and Akka. With the Actor model we get:

1. High concurrency: actor pools process many requests in parallel

2. Fault tolerance: isolation between actors ensures that a single failed page parse does not bring down the whole system

3. Flow control: queues and rate limiting keep the crawler from hammering the target site

4. Extensibility: new functional components are easy to add

Scala's functional features and strong type system, combined with Akka's Actor model, make it much easier to build robust, high-performance crawler systems.

See? A crawler written in Scala is not just feature-complete, it comes with a built-in concurrency boost. The Actor model keeps fetch tasks from interfering with each other, and even if one page fails to parse, the rest of the system keeps running. With a few tweaks this architecture can handle more complex scenarios such as distributed crawling or anti-bot countermeasures. I hope this example gives you a feel for Scala's strengths in highly concurrent workloads. Next time you need to scrape data at scale, consider letting it show what it can do!

