golang爬虫项目Pholcus源码分析(二)
运行界面显示
以windows的exec文件为例
// +build windows
package exec
import (
"os"
"os/exec"
"os/signal"
"github.com/henrylee2cn/pholcus/config"
"github.com/henrylee2cn/pholcus/cmd" // cmd版
"github.com/henrylee2cn/pholcus/gui" // gui版
"github.com/henrylee2cn/pholcus/web" // web版
)
func run(which string) {
exec.Command("cmd.exe", "/c", "title", config.FULL_NAME).Start()
// 选择运行界面
switch which {
case "gui":
gui.Run()
case "cmd":
cmd.Run()
case "web":
fallthrough
default:
ctrl := make(chan os.Signal, 1)
signal.Notify(ctrl, os.Interrupt, os.Kill)
go web.Run()
<-ctrl
}
}
os/exec包的用法
https://cloud.tencent.com/developer/section/1143928
switch语句,在参数等于web的时候,fallthrough到下一步,执行default。
os/signal包用法
https://cloud.tencent.com/developer/section/1143930
将os.Interrupt和os.Kill信号中继到ctrl,收到中断信号则中断当前进程。
1.cmd
调用了cmd中的Run方法
第一句,app.LogicApp.Init(cache.Task.Mode, cache.Task.Port, cache.Task.Master)
查找app.LogicApp.init
如图,结构体Logic实现了接口App的所有方法,初始化了接口实例LogicApp。其中的init方法如下
// 使用App前必须先进行Init初始化(SetLog()除外)
func (self *Logic) Init(mode int, port int, master string, w ...io.Writer) App {
self.canSocketLog = false
if len(w) > 0 {
self.SetLog(w[0])
}
// 打开log
self.LogGoOn()
// 使用参数初始化Logic
self.AppConf.Mode, self.AppConf.Port, self.AppConf.Master = mode, port, master
self.Teleport = teleport.New()
self.TaskJar = distribute.NewTaskJar()
self.SpiderQueue = crawler.NewSpiderQueue()
self.CrawlerPool = crawler.NewCrawlerPool()
// 选择模式,server、client或者离线模式。
switch self.AppConf.Mode {
case status.SERVER:
logs.Log.EnableStealOne(false)
if self.checkPort() {
logs.Log.Informational(" !!当前运行模式为:[ 服务器 ] 模式!!")
self.Teleport.SetAPI(distribute.MasterApi(self)).Server(":" + strconv.Itoa(self.AppConf.Port))
}
case status.CLIENT:
if self.checkAll() {
logs.Log.Informational(" !!当前运行模式为:[ 客户端 ] 模式!!")
self.Teleport.SetAPI(distribute.SlaveApi(self)).Client(self.AppConf.Master, ":"+strconv.Itoa(self.AppConf.Port))
// 开启节点间log打印
self.canSocketLog = true
logs.Log.EnableStealOne(true)
go self.socketLog()
}
case status.OFFLINE:
logs.Log.EnableStealOne(false)
logs.Log.Informational(" !!当前运行模式为:[ 单机 ] 模式!!")
return self
default:
logs.Log.Warning(" * ——请指定正确的运行模式!——")
return self
}
return self
}
其中用到了teleport框架,信息如下:
https://studygolang.com/articles/10784
接下来执行run函数,
// https://github.com/henrylee2cn/pholcus/blob/master/cmd/pholcus-cmd.go
// 运行
func run() {
// 创建蜘蛛队列
sps := []*spider.Spider{}
*spiderflag = strings.TrimSpace(*spiderflag)
// 根据spiderflag创建蜘蛛队列
if *spiderflag == "*" {
sps = app.LogicApp.GetSpiderLib()
} else {
for _, idx := range strings.Split(*spiderflag, ",") {
idx = strings.TrimSpace(idx)
if idx == "" {
continue
}
i, _ := strconv.Atoi(idx)
sps = append(sps, app.LogicApp.GetSpiderLib()[i])
}
}
app.LogicApp.SpiderPrepare(sps).Run()
}
sps的生成过程:
// https://github.com/henrylee2cn/pholcus/blob/master/app/app.go
// 获取全部蜘蛛种类
func (self *Logic) GetSpiderLib() []*spider.Spider {
return self.SpiderSpecies.Get()
}
// SpiderSpecies初始化为spider.Species
func newLogic() *Logic {
return &Logic{
AppConf: cache.Task,
SpiderSpecies: spider.Species,
status: status.STOPPED,
Teleport: teleport.New(),
TaskJar: distribute.NewTaskJar(),
SpiderQueue: crawler.NewSpiderQueue(),
CrawlerPool: crawler.NewCrawlerPool(),
}
}
// https://github.com/henrylee2cn/pholcus/blob/master/app/spider/species.go
// Species下的Get方法
// 获取全部蜘蛛种类
func (self *SpiderSpecies) Get() []*Spider {
if !self.sorted {
l := len(self.list)
initials := make([]string, l)
newlist := map[string]*Spider{}
for i := 0; i < l; i++ {
initials[i] = self.list[i].GetName()
newlist[initials[i]] = self.list[i]
}
pinyin.SortInitials(initials)
for i := 0; i < l; i++ {
self.list[i] = newlist[initials[i]]
}
self.sorted = true
}
return self.list
}
然后调用了app.LogicApp.SpiderPrepare(sps).Run()
// https://github.com/henrylee2cn/pholcus/blob/master/app/app.go
// SpiderPrepare()必须在设置全局运行参数之后,Run()的前一刻执行
// original为spider包中未有过赋值操作的原始蜘蛛种类
// 已被显式赋值过的spider将不再重新分配Keyin
// client模式下不调用该方法
func (self *Logic) SpiderPrepare(original []*spider.Spider) App {
self.SpiderQueue.Reset()
// 遍历任务
for _, sp := range original {
spcopy := sp.Copy()
spcopy.SetPausetime(self.AppConf.Pausetime)
if spcopy.GetLimit() == spider.LIMIT {
spcopy.SetLimit(self.AppConf.Limit)
} else {
spcopy.SetLimit(-1 * self.AppConf.Limit)
}
// 调用蜘蛛队列的方法
self.SpiderQueue.Add(spcopy)
}
// 遍历自定义配置
self.SpiderQueue.AddKeyins(self.AppConf.Keyins)
return self // 返回了self,所以self.Run调用的是Logic实现的Run方法。
}
// SpiderQueue:蜘蛛队列
func newLogic() *Logic {
return &Logic{
AppConf: cache.Task,
SpiderSpecies: spider.Species,
status: status.STOPPED,
Teleport: teleport.New(),
TaskJar: distribute.NewTaskJar(),
SpiderQueue: crawler.NewSpiderQueue(), // SpiderQueue的初始化
CrawlerPool: crawler.NewCrawlerPool(),
}
}
// https://github.com/henrylee2cn/pholcus/blob/master/app/crawler/spiderqueue.go
// 采集引擎中规则队列
type (
SpiderQueue interface {
Reset() //重置清空队列
Add(*Spider)
AddAll([]*Spider)
AddKeyins(string) //为队列成员遍历添加Keyin属性,但前提必须是队列成员未被添加过keyin
GetByIndex(int) *Spider
GetByName(string) *Spider
GetAll() []*Spider
Len() int // 返回队列长度
}
sq struct {
list []*Spider
}
)
func NewSpiderQueue() SpiderQueue {
return &sq{
list: []*Spider{},
}
}
// Add方法只是为sq设置id后append进list中。
func (self *sq) Add(sp *Spider) {
sp.SetId(self.Len())
self.list = append(self.list, sp)
}
// 添加keyin,遍历蜘蛛队列得到新的队列(已被显式赋值过的spider将不再重新分配Keyin)
func (self *sq) AddKeyins(keyins string) {
keyinSlice := util.KeyinsParse(keyins)
if len(keyinSlice) == 0 {
return
}
unit1 := []*Spider{} // 不可被添加自定义配置的蜘蛛
unit2 := []*Spider{} // 可被添加自定义配置的蜘蛛
for _, v := range self.GetAll() {
if v.GetKeyin() == KEYIN {
unit2 = append(unit2, v)
continue
}
unit1 = append(unit1, v)
}
if len(unit2) == 0 {
logs.Log.Warning("本批任务无需填写自定义配置!\n")
return
}
self.Reset()
for _, keyin := range keyinSlice {
for _, v := range unit2 {
v.Keyin = keyin
nv := *v
self.Add((&nv).Copy())
}
}
if self.Len() == 0 {
self.AddAll(append(unit1, unit2...))
}
self.AddAll(unit1)
}
// App中最终的Run方法
// 加载完任务配置后开始运行任务
func (self *Logic) Run() {
// 确保开启log
self.LogGoOn()
// 确保任务列表不为空
if self.AppConf.Mode != status.CLIENT && self.SpiderQueue.Len() == 0 {
logs.Log.Warning(" * —— 亲,任务列表不能为空哦~")
self.LogRest()
return
}
// 设置管道符finish
self.finish = make(chan bool)
// sync包用法 https://studygolang.com/articles/11038?fr=sidebar
self.finishOnce = sync.Once{}
// 重置计数
self.sum[0], self.sum[1] = 0, 0
// 重置计时
self.takeTime = 0
// 设置状态
self.setStatus(status.RUN)
// 确保执行完成后设置状态为stop
defer self.setStatus(status.STOPPED)
// 任务执行
switch self.AppConf.Mode {
case status.OFFLINE:
self.offline()
case status.SERVER:
self.server()
case status.CLIENT:
self.client()
default:
return
}
// 通过close控制self.finish来标记任务结束
<-self.finish
}
// 返回当前运行状态
func (self *Logic) setStatus(status int) {
// 加读写锁,防止冲突
self.RWMutex.Lock()
defer self.RWMutex.Unlock()
self.status = status
}
下一篇是self.server、self.client、self.offline的执行