golang爬虫项目Pholcus源码分析(二)

运行界面显示

以windows的exec文件为例

// +build windows
package exec

import (
	"os"
	"os/exec"
	"os/signal"

	"github.com/henrylee2cn/pholcus/config"

	"github.com/henrylee2cn/pholcus/cmd" // cmd版
	"github.com/henrylee2cn/pholcus/gui" // gui版
	"github.com/henrylee2cn/pholcus/web" // web版
)

func run(which string) {
	exec.Command("cmd.exe", "/c", "title", config.FULL_NAME).Start()

	// 选择运行界面
	switch which {
	case "gui":
		gui.Run()

	case "cmd":
		cmd.Run()

	case "web":
		fallthrough
	default:
		ctrl := make(chan os.Signal, 1)
		signal.Notify(ctrl, os.Interrupt, os.Kill)
		go web.Run()
		<-ctrl
	}
}

os/exec包的用法

https://cloud.tencent.com/developer/section/1143928

switch语句,在参数等于web的时候,fallthrough到下一步,执行default。

os/signal包用法

https://cloud.tencent.com/developer/section/1143930

将os.Interrupt和os.Kill信号中继到ctrl,收到中断信号则中断当前进程。

1.cmd

调用了cmd中的Run方法

golang爬虫项目Pholcus源码分析(二)

第一句,app.LogicApp.Init(cache.Task.Mode, cache.Task.Port, cache.Task.Master)

查找app.LogicApp.init

golang爬虫项目Pholcus源码分析(二)

如图,结构体Logic实现了接口App的所有方法,初始化了接口实例LogicApp。其中的init方法如下

// 使用App前必须先进行Init初始化(SetLog()除外)
func (self *Logic) Init(mode int, port int, master string, w ...io.Writer) App {
	self.canSocketLog = false
	if len(w) > 0 {
		self.SetLog(w[0])
	}
    // 打开log
	self.LogGoOn()

    // 使用参数初始化Logic
	self.AppConf.Mode, self.AppConf.Port, self.AppConf.Master = mode, port, master
	self.Teleport = teleport.New()
	self.TaskJar = distribute.NewTaskJar()
	self.SpiderQueue = crawler.NewSpiderQueue()
	self.CrawlerPool = crawler.NewCrawlerPool()

    // 选择模式,server、client或者离线模式。
	switch self.AppConf.Mode {
	case status.SERVER:
		logs.Log.EnableStealOne(false)
		if self.checkPort() {
			logs.Log.Informational("                                                                                               !!当前运行模式为:[ 服务器 ] 模式!!")
			self.Teleport.SetAPI(distribute.MasterApi(self)).Server(":" + strconv.Itoa(self.AppConf.Port))
		}

	case status.CLIENT:
		if self.checkAll() {
			logs.Log.Informational("                                                                                               !!当前运行模式为:[ 客户端 ] 模式!!")
			self.Teleport.SetAPI(distribute.SlaveApi(self)).Client(self.AppConf.Master, ":"+strconv.Itoa(self.AppConf.Port))
			// 开启节点间log打印
			self.canSocketLog = true
			logs.Log.EnableStealOne(true)
			go self.socketLog()
		}
	case status.OFFLINE:
		logs.Log.EnableStealOne(false)
		logs.Log.Informational("                                                                                               !!当前运行模式为:[ 单机 ] 模式!!")
		return self
	default:
		logs.Log.Warning(" *    ——请指定正确的运行模式!——")
		return self
	}
	return self
}

其中用到了teleport框架,信息如下:

https://studygolang.com/articles/10784

接下来执行run函数,

// https://github.com/henrylee2cn/pholcus/blob/master/cmd/pholcus-cmd.go
// 运行
func run() {
	// 创建蜘蛛队列
	sps := []*spider.Spider{}
	*spiderflag = strings.TrimSpace(*spiderflag)

    // 根据spiderflag创建蜘蛛队列
	if *spiderflag == "*" {
		sps = app.LogicApp.GetSpiderLib()

	} else {
		for _, idx := range strings.Split(*spiderflag, ",") {
			idx = strings.TrimSpace(idx)
			if idx == "" {
				continue
			}
			i, _ := strconv.Atoi(idx)
			sps = append(sps, app.LogicApp.GetSpiderLib()[i])
		}
	}

	app.LogicApp.SpiderPrepare(sps).Run()
}

sps的生成过程:

// https://github.com/henrylee2cn/pholcus/blob/master/app/app.go
// 获取全部蜘蛛种类
func (self *Logic) GetSpiderLib() []*spider.Spider {
	return self.SpiderSpecies.Get()
}


// SpiderSpecies初始化为spider.Species
func newLogic() *Logic {
	return &Logic{
		AppConf:       cache.Task,
		SpiderSpecies: spider.Species,
		status:        status.STOPPED,
		Teleport:      teleport.New(),
		TaskJar:       distribute.NewTaskJar(),
		SpiderQueue:   crawler.NewSpiderQueue(),
		CrawlerPool:   crawler.NewCrawlerPool(),
	}
}


// https://github.com/henrylee2cn/pholcus/blob/master/app/spider/species.go
// Species下的Get方法

// 获取全部蜘蛛种类
func (self *SpiderSpecies) Get() []*Spider {
	if !self.sorted {
		l := len(self.list)
		initials := make([]string, l)
		newlist := map[string]*Spider{}
		for i := 0; i < l; i++ {
			initials[i] = self.list[i].GetName()
			newlist[initials[i]] = self.list[i]
		}
		pinyin.SortInitials(initials)
		for i := 0; i < l; i++ {
			self.list[i] = newlist[initials[i]]
		}
		self.sorted = true
	}
	return self.list
}


然后调用了app.LogicApp.SpiderPrepare(sps).Run()

// https://github.com/henrylee2cn/pholcus/blob/master/app/app.go

// SpiderPrepare()必须在设置全局运行参数之后,Run()的前一刻执行
// original为spider包中未有过赋值操作的原始蜘蛛种类
// 已被显式赋值过的spider将不再重新分配Keyin
// client模式下不调用该方法
func (self *Logic) SpiderPrepare(original []*spider.Spider) App {
	self.SpiderQueue.Reset()
	// 遍历任务
	for _, sp := range original {
		spcopy := sp.Copy()
		spcopy.SetPausetime(self.AppConf.Pausetime)
		if spcopy.GetLimit() == spider.LIMIT {
			spcopy.SetLimit(self.AppConf.Limit)
		} else {
			spcopy.SetLimit(-1 * self.AppConf.Limit)
		}
        // 调用蜘蛛队列的方法
		self.SpiderQueue.Add(spcopy)
	}
	// 遍历自定义配置
	self.SpiderQueue.AddKeyins(self.AppConf.Keyins)
	return self // 返回了self,所以self.Run调用的是Logic实现的Run方法。
}

// SpiderQueue:蜘蛛队列
func newLogic() *Logic {
	return &Logic{
		AppConf:       cache.Task,
		SpiderSpecies: spider.Species,
		status:        status.STOPPED,
		Teleport:      teleport.New(),
		TaskJar:       distribute.NewTaskJar(),
		SpiderQueue:   crawler.NewSpiderQueue(), // SpiderQueue的初始化
		CrawlerPool:   crawler.NewCrawlerPool(),
	}
}

// https://github.com/henrylee2cn/pholcus/blob/master/app/crawler/spiderqueue.go

// 采集引擎中规则队列
type (
	SpiderQueue interface {
		Reset() //重置清空队列
		Add(*Spider)
		AddAll([]*Spider)
		AddKeyins(string) //为队列成员遍历添加Keyin属性,但前提必须是队列成员未被添加过keyin
		GetByIndex(int) *Spider
		GetByName(string) *Spider
		GetAll() []*Spider
		Len() int // 返回队列长度
	}
	sq struct {
		list []*Spider
	}
)

func NewSpiderQueue() SpiderQueue {
	return &sq{
		list: []*Spider{},
	}
}

// Add方法只是为sq设置id后append进list中。
func (self *sq) Add(sp *Spider) {
	sp.SetId(self.Len())
	self.list = append(self.list, sp)
}

// 添加keyin,遍历蜘蛛队列得到新的队列(已被显式赋值过的spider将不再重新分配Keyin)
func (self *sq) AddKeyins(keyins string) {
	keyinSlice := util.KeyinsParse(keyins)
	if len(keyinSlice) == 0 {
		return
	}

	unit1 := []*Spider{} // 不可被添加自定义配置的蜘蛛
	unit2 := []*Spider{} // 可被添加自定义配置的蜘蛛
	for _, v := range self.GetAll() {
		if v.GetKeyin() == KEYIN {
			unit2 = append(unit2, v)
			continue
		}
		unit1 = append(unit1, v)
	}

	if len(unit2) == 0 {
		logs.Log.Warning("本批任务无需填写自定义配置!\n")
		return
	}

	self.Reset()

	for _, keyin := range keyinSlice {
		for _, v := range unit2 {
			v.Keyin = keyin
			nv := *v
			self.Add((&nv).Copy())
		}
	}
	if self.Len() == 0 {
		self.AddAll(append(unit1, unit2...))
	}

	self.AddAll(unit1)
}


// App中最终的Run方法
// 加载完任务配置后开始运行任务
func (self *Logic) Run() {
	// 确保开启log
	self.LogGoOn()
    // 确保任务列表不为空
	if self.AppConf.Mode != status.CLIENT && self.SpiderQueue.Len() == 0 {
		logs.Log.Warning(" *     —— 亲,任务列表不能为空哦~")
		self.LogRest()
		return
	}

    // 设置管道符finish
	self.finish = make(chan bool)

    // sync包用法 https://studygolang.com/articles/11038?fr=sidebar
	self.finishOnce = sync.Once{}
	// 重置计数
	self.sum[0], self.sum[1] = 0, 0
	// 重置计时
	self.takeTime = 0
	// 设置状态
	self.setStatus(status.RUN)

    // 确保执行完成后设置状态为stop
	defer self.setStatus(status.STOPPED)
	// 任务执行
	switch self.AppConf.Mode {
	case status.OFFLINE:
		self.offline()
	case status.SERVER:
		self.server()
	case status.CLIENT:
		self.client()
	default:
		return
	}
    // 通过close控制self.finish来标记任务结束
	<-self.finish
}

// 返回当前运行状态
func (self *Logic) setStatus(status int) {
    // 加读写锁,防止冲突
	self.RWMutex.Lock()
	defer self.RWMutex.Unlock()
	self.status = status
}

下一篇是self.server、self.client、self.offline的执行