.NET 线程管理
最近在建一个电子商务网站。由于早期没有那么多时间和精力去寻找产品、拍摄产品图片,因此周末花了两天时间动手做了一个简单的 Spider(或称之为 HTML Parser,只抓取电子商务网站上想要的内容)。
在做 Spider 时,我们经常会遇到一些概念:任务、线程、线程池、队列等。
那么在做的过程中,我着重解决以下问题:
1.线程的使用
背景:启动一个线程时,线程运行开始时做些什么,运行过程中做些什么,线程完成时又做些什么。
2.线程池的使用
背景:多个线程的管理,包括线程如何进入线程池、如何调度和管理线程。因为大量启用线程很容易导致效率低下,甚至引起内存溢出。
源码:
Thread:
/// <summary>
/// A single unit of work, identified by <see cref="ID"/> and carrying a <see cref="Uri"/>.
/// </summary>
/// <remarks>
/// Despite the name, this type does not create an OS thread: <see cref="Start"/> raises
/// <see cref="OnStart"/> and then <see cref="OnComplete"/> synchronously on the calling thread.
/// NOTE(review): the name collides with System.Threading.Thread wherever that namespace is imported.
/// </remarks>
public partial class Thread
{
    /// <summary>When <c>true</c>, <see cref="Start"/> returns without raising any event.</summary>
    public bool IsShutDown { get; set; }

    /// <summary>Identifier of the work item; not interpreted by this class.</summary>
    public int ID { get; set; }

    /// <summary>Name of the work item; not interpreted by this class.</summary>
    public string Name { get; set; }

    /// <summary>Free-form state text; not read or written by this class.</summary>
    public string State { get; set; }

    /// <summary>Address associated with the work item; not interpreted by this class.</summary>
    public string Uri { get; set; }

    #region Observer mechanism

    /// <summary>Handler signature for <see cref="OnStart"/>.</summary>
    public delegate void StartEventHandler(object sender, StartThreadEventArgs e);

    /// <summary>Raised first by <see cref="Start"/>; subscribers perform the actual work.</summary>
    public event StartEventHandler OnStart;

    /// <summary>Handler signature for <see cref="OnComplete"/>.</summary>
    public delegate void CompleteEventHandler(object sender, CompleteThreadEventArgs e);

    /// <summary>Raised by <see cref="Start"/> after every <see cref="OnStart"/> handler has returned.</summary>
    public event CompleteEventHandler OnComplete;

    // Raises OnStart, passing EnumThreadHandlerState.Started.ToString() as the state text.
    // The delegate is copied to a local first so that an unsubscribe on another thread
    // between the null check and the call cannot cause a NullReferenceException.
    private void startThread()
    {
        StartEventHandler handler = OnStart;
        if (handler != null)
        {
            handler(this, new StartThreadEventArgs(EnumThreadHandlerState.Started.ToString(), string.Empty));
        }
    }

    // Raises OnComplete with EnumThreadHandlerState.Completed.ToString(), using the same
    // copy-then-invoke pattern as startThread.
    private void completeThread()
    {
        CompleteEventHandler handler = OnComplete;
        if (handler != null)
        {
            handler(this, new CompleteThreadEventArgs(EnumThreadHandlerState.Completed.ToString(), string.Empty));
        }
    }

    /// <summary>
    /// Runs the work item by raising <see cref="OnStart"/> and then <see cref="OnComplete"/>,
    /// unless <see cref="IsShutDown"/> is set. If an OnStart handler throws, OnComplete is not raised.
    /// </summary>
    public void Start()
    {
        if (IsShutDown)
        {
            return;
        }

        startThread();
        completeThread();
    }

    #endregion
}
// NOTE(review): this brace block has no class header (not valid C# on its own) and its members
// repeat those of the Thread class; it looks like a copy-paste duplicate from the original post.
{
    /// <summary>When <c>true</c>, <see cref="Start"/> returns without raising any event.</summary>
    public bool IsShutDown { get; set; }

    /// <summary>Identifier of the work item; not interpreted by this type.</summary>
    public int ID { get; set; }

    /// <summary>Name of the work item; not interpreted by this type.</summary>
    public string Name { get; set; }

    /// <summary>Free-form state text; not read or written by this type.</summary>
    public string State { get; set; }

    /// <summary>Address associated with the work item; not interpreted by this type.</summary>
    public string Uri { get; set; }

    #region Observer mechanism

    /// <summary>Handler signature for <see cref="OnStart"/>.</summary>
    public delegate void StartEventHandler(object sender, StartThreadEventArgs e);

    /// <summary>Raised first by <see cref="Start"/>; subscribers perform the actual work.</summary>
    public event StartEventHandler OnStart;

    /// <summary>Handler signature for <see cref="OnComplete"/>.</summary>
    public delegate void CompleteEventHandler(object sender, CompleteThreadEventArgs e);

    /// <summary>Raised by <see cref="Start"/> after every <see cref="OnStart"/> handler has returned.</summary>
    public event CompleteEventHandler OnComplete;

    // Raises OnStart, passing EnumThreadHandlerState.Started.ToString() as the state text.
    // The delegate is copied to a local first so that an unsubscribe on another thread
    // between the null check and the call cannot cause a NullReferenceException.
    private void startThread()
    {
        StartEventHandler handler = OnStart;
        if (handler != null)
        {
            handler(this, new StartThreadEventArgs(EnumThreadHandlerState.Started.ToString(), string.Empty));
        }
    }

    // Raises OnComplete with EnumThreadHandlerState.Completed.ToString(), using the same
    // copy-then-invoke pattern as startThread.
    private void completeThread()
    {
        CompleteEventHandler handler = OnComplete;
        if (handler != null)
        {
            handler(this, new CompleteThreadEventArgs(EnumThreadHandlerState.Completed.ToString(), string.Empty));
        }
    }

    /// <summary>
    /// Runs the work item by raising <see cref="OnStart"/> and then <see cref="OnComplete"/>,
    /// unless <see cref="IsShutDown"/> is set. If an OnStart handler throws, OnComplete is not raised.
    /// </summary>
    public void Start()
    {
        if (IsShutDown)
        {
            return;
        }

        startThread();
        completeThread();
    }

    #endregion
}
ThreadController:
/// <summary>
/// Starts loaded <see cref="Thread"/> work items in rounds of at most <c>queueLength</c> items and
/// signals either that the next round should be loaded or that the expected rounds have all run.
/// </summary>
/// <remarks>
/// The expected number of rounds is ceil(<see cref="TotalThreadCount"/> / queueLength), recomputed
/// on every <see cref="StartDownLoading"/> call. Items are started one after another on the calling
/// thread, and with the Thread type in this file each Start() call runs its handlers synchronously.
/// NOTE(review): the round counter is never reset, so an instance cannot be reused for a second run.
/// NOTE(review): if TotalThreadCount is 0 (e.g. never set), the expected round count is 0 and
/// OnCompletedAllDownloading is never raised.
/// NOTE(review): if an OnStartNextRound subscriber calls StartDownLoading again, rounds nest on
/// the call stack, so depth grows with the number of rounds.
/// </remarks>
public class ThreadController
{
    // Maximum number of work items started per round; set by the constructor.
    private readonly int _QueueLength;

    // Staging queue for the items of the current round.
    private readonly Queue<Thread> queueUse2Work;

    // Work items supplied by the last LoadThreads call.
    private Dictionary<int, Thread> _Threads;

    /// <summary>Total number of work items expected across all rounds.</summary>
    public int TotalThreadCount { get; set; }

    /// <summary>Raised after a round when more rounds are expected.</summary>
    public event Action OnStartNextRound;

    /// <summary>Raised after the round whose number equals the expected round count.</summary>
    public event Action OnCompletedAllDownloading;

    // Rounds run so far, and the expected total number of rounds.
    private int _SegmentCounter;
    private int _Segment;

    /// <param name="queueLength">Maximum number of items started per round; must be positive.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="queueLength"/> is zero or negative.</exception>
    public ThreadController(int queueLength)
    {
        // A non-positive length would otherwise surface later as a DivideByZeroException
        // (or an endless stream of empty rounds).
        if (queueLength <= 0)
        {
            throw new ArgumentOutOfRangeException("queueLength", queueLength, "Queue length must be positive.");
        }

        queueUse2Work = new Queue<Thread>();
        this._QueueLength = queueLength;
    }

    /// <summary>
    /// Runs one round: starts up to queueLength of the loaded items, then raises
    /// <see cref="OnCompletedAllDownloading"/> if this was the last expected round,
    /// otherwise <see cref="OnStartNextRound"/>. Missing subscribers are tolerated.
    /// </summary>
    /// <exception cref="InvalidOperationException"><see cref="LoadThreads"/> has not been called.</exception>
    public void StartDownLoading()
    {
        if (_Threads == null)
        {
            throw new InvalidOperationException("LoadThreads must be called before StartDownLoading.");
        }

        // Expected rounds = ceil(TotalThreadCount / _QueueLength).
        _Segment = TotalThreadCount / _QueueLength;
        if (TotalThreadCount % _QueueLength > 0)
        {
            _Segment++;
        }

        foreach (var pair in _Threads)
        {
            if (queueUse2Work.Count >= _QueueLength)
            {
                break;
            }
            queueUse2Work.Enqueue(pair.Value);
        }

        while (queueUse2Work.Count > 0)
        {
            queueUse2Work.Dequeue().Start();
        }

        _SegmentCounter++;
        if (_SegmentCounter == _Segment)
        {
            // Copy before invoking so a missing or concurrently removed subscriber cannot throw.
            Action completed = OnCompletedAllDownloading;
            if (completed != null)
            {
                completed();
            }
            return;
        }

        // The staging queue is always empty here (drained above), so the next round is
        // requested unconditionally.
        Action next = OnStartNextRound;
        if (next != null)
        {
            next();
        }
    }

    /// <summary>Replaces the set of work items used by the next <see cref="StartDownLoading"/> call.</summary>
    public void LoadThreads(Dictionary<int, Thread> threads)
    {
        _Threads = threads;
    }
}
// NOTE(review): this brace block has no class header (not valid C# on its own) and its members
// repeat those of ThreadController; it looks like a copy-paste duplicate from the original post.
{
    // Maximum number of work items started per round; set by the constructor.
    private readonly int _QueueLength;

    // Staging queue for the items of the current round.
    private readonly Queue<Thread> queueUse2Work;

    // Work items supplied by the last LoadThreads call.
    private Dictionary<int, Thread> _Threads;

    /// <summary>Total number of work items expected across all rounds.</summary>
    public int TotalThreadCount { get; set; }

    /// <summary>Raised after a round when more rounds are expected.</summary>
    public event Action OnStartNextRound;

    /// <summary>Raised after the round whose number equals the expected round count.</summary>
    public event Action OnCompletedAllDownloading;

    // Rounds run so far, and the expected total number of rounds.
    private int _SegmentCounter;
    private int _Segment;

    /// <param name="queueLength">Maximum number of items started per round; must be positive.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="queueLength"/> is zero or negative.</exception>
    public ThreadController(int queueLength)
    {
        // A non-positive length would otherwise surface later as a DivideByZeroException
        // (or an endless stream of empty rounds).
        if (queueLength <= 0)
        {
            throw new ArgumentOutOfRangeException("queueLength", queueLength, "Queue length must be positive.");
        }

        queueUse2Work = new Queue<Thread>();
        this._QueueLength = queueLength;
    }

    /// <summary>
    /// Runs one round: starts up to queueLength of the loaded items, then raises
    /// <see cref="OnCompletedAllDownloading"/> if this was the last expected round,
    /// otherwise <see cref="OnStartNextRound"/>. Missing subscribers are tolerated.
    /// </summary>
    /// <exception cref="InvalidOperationException"><see cref="LoadThreads"/> has not been called.</exception>
    public void StartDownLoading()
    {
        if (_Threads == null)
        {
            throw new InvalidOperationException("LoadThreads must be called before StartDownLoading.");
        }

        // Expected rounds = ceil(TotalThreadCount / _QueueLength).
        _Segment = TotalThreadCount / _QueueLength;
        if (TotalThreadCount % _QueueLength > 0)
        {
            _Segment++;
        }

        foreach (var pair in _Threads)
        {
            if (queueUse2Work.Count >= _QueueLength)
            {
                break;
            }
            queueUse2Work.Enqueue(pair.Value);
        }

        while (queueUse2Work.Count > 0)
        {
            queueUse2Work.Dequeue().Start();
        }

        _SegmentCounter++;
        if (_SegmentCounter == _Segment)
        {
            // Copy before invoking so a missing or concurrently removed subscriber cannot throw.
            Action completed = OnCompletedAllDownloading;
            if (completed != null)
            {
                completed();
            }
            return;
        }

        // The staging queue is always empty here (drained above), so the next round is
        // requested unconditionally.
        Action next = OnStartNextRound;
        if (next != null)
        {
            next();
        }
    }

    /// <summary>Replaces the set of work items used by the next <see cref="StartDownLoading"/> call.</summary>
    public void LoadThreads(Dictionary<int, Thread> threads)
    {
        _Threads = threads;
    }
}
如何使用?
/// <summary>
/// Downloads the pages of ContentItem rows whose ContentType starts with "PDetails" and whose
/// DetailsCatched flag is false, in rounds of <c>queueLength</c> items, and reports progress
/// on <see cref="LblProgress"/>.
/// </summary>
// NOTE(review): the closing brace of this class does not appear in the visible source.
public class DownloadPDetails
{
// Items loaded per round (also passed to the ThreadController as its queue length).
private const int queueLength = 25;
// Base directory for downloaded pages, read from the "CategoryIndexSavedPath" app setting.
private static readonly string categoryIndexSavedPath =
ConfigurationManager.AppSettings["CategoryIndexSavedPath"];
// Site root that each item's relative ContentUrl is appended to; read from the "WebSite" app setting.
private static readonly string webSite = ConfigurationManager.AppSettings["WebSite"];
// Entity context used for all queries and updates in this class.
private readonly AvanterSpiderEntities context = new AvanterSpiderEntities();
// Drives the download rounds; created in the constructor.
private readonly ThreadController controller;
// Number of items whose completion handler has run.
private int completedCount;
// Label the progress text is written to.
public Label LblProgress;
// Number of unprocessed items, counted once in Work().
private int totalCount;
/// <summary>Creates the downloader, backed by a controller that starts <c>queueLength</c> items per round.</summary>
public DownloadPDetails()
{
    this.controller = new ThreadController(DownloadPDetails.queueLength);
}
/// <summary>
/// Counts the unprocessed "PDetails" items, wires the controller events and starts the first round.
/// </summary>
public void Work()
{
    totalCount = context.ContentItem
        .Count(m => m.ContentType.Substring(0, 8) == "PDetails" && m.DetailsCatched == false);
    controller.TotalThreadCount = totalCount;

    // Remove before adding so that calling Work() more than once does not subscribe the
    // same handler twice (which would invoke it twice for every controller event).
    controller.OnStartNextRound -= controller_OnStartNextRound;
    controller.OnStartNextRound += controller_OnStartNextRound;
    controller.OnCompletedAllDownloading -= controller_OnCompletedDownloading;
    controller.OnCompletedAllDownloading += controller_OnCompletedDownloading;

    // Load and run the first batch; later batches are requested by the controller.
    controller_OnStartNextRound();
}
/// <summary>
/// Start handler for a work item: downloads <c>{webSite}/{item.Uri}</c> into
/// <c>{categoryIndexSavedPath}/PIndex/{item.ID}.html</c>.
/// </summary>
/// <remarks>
/// Failures (malformed URL, network or file errors) are logged through
/// <see cref="System.Diagnostics.Trace"/> and otherwise ignored, so one bad item does not abort
/// the rest of the batch.
/// NOTE(review): the completion handler marks the item as processed even when this download failed.
/// </remarks>
private static void thread_OnStartEvent(object sender, StartThreadEventArgs e)
{
    string root = string.Format("{0}/PIndex/", categoryIndexSavedPath);
    Directory.CreateDirectory(root); // no-op when the directory already exists
    var item = (Thread) sender;
    try
    {
        // Built inside the try: a malformed URL is a per-item failure, not a reason to stop the run.
        var uri = new Uri(string.Format("{0}/{1}", webSite, item.Uri));

        // WebClient is IDisposable; dispose it once the file has been written.
        using (var client = new WebClient())
        {
            // Encoding only affects WebClient's string APIs; DownloadFile writes the raw bytes.
            client.Encoding = Encoding.UTF8;
            client.DownloadFile(uri, Path.Combine(root, item.ID + ".html"));
        }
    }
    catch (Exception ex)
    {
        System.Diagnostics.Trace.TraceWarning("Download of item {0} failed: {1}", item.ID, ex.Message);
    }
}
/// <summary>
/// Completion handler for a work item: increments the completed count, refreshes
/// <see cref="LblProgress"/> and sets DetailsCatched on the matching ContentItem row.
/// </summary>
private void thread_OnCompleteEvent(object sender, CompleteThreadEventArgs e)
{
    // A lock on a freshly allocated object never excludes anyone; an interlocked
    // increment is atomic and also yields the value to display.
    int completed = System.Threading.Interlocked.Increment(ref completedCount);

    if (LblProgress != null)
    {
        if (LblProgress.InvokeRequired)
        {
            // Marshal onto the thread that owns the label.
            LblProgress.Invoke(new updateProgress(showProgress), completed, totalCount);
        }
        else
        {
            // Already on the label's thread: update it directly.
            showProgress(completed, totalCount);
        }
    }

    int itemId = ((Thread) sender).ID;
    ContentItem item = context.ContentItem.FirstOrDefault(m => m.ID == itemId);
    if (item == null)
    {
        return; // the row no longer exists, so there is nothing to mark
    }

    item.DetailsCatched = true;
    context.SaveChanges();
}
/// <summary>
/// Makes <see cref="LblProgress"/> visible and shows "completed/total" on it.
/// Expected to run on the thread that owns the label.
/// </summary>
private void showProgress(int nRecordcount, int nTotalRecordCount)
{
    Label target = LblProgress;
    target.Visible = true;
    target.Text = string.Format("完成了{0}/{1}条记录", nRecordcount, nTotalRecordCount);
}
/// <summary>
/// Loads the next batch (up to <c>queueLength</c>) of unprocessed "PDetails" items, wraps each
/// one in a work item wired to the start/complete handlers, and hands the batch to the
/// controller. Does nothing when no unprocessed items remain.
/// </summary>
private void controller_OnStartNextRound()
{
    // Materialize once so the emptiness check and the loop see the same rows and the
    // database is queried a single time.
    List<ContentItem> items = context.ContentItem
        .Where(m => m.ContentType.Substring(0, 8) == "PDetails" && m.DetailsCatched == false)
        .Take(queueLength)
        .ToList();
    if (items.Count == 0)
    {
        return;
    }

    // Keys are 1-based positions within the batch.
    var list = new Dictionary<int, Thread>(items.Count);
    int key = 0;
    foreach (ContentItem item in items)
    {
        var thread = new Thread
        {
            ID = item.ID,
            Name = item.ID.ToString(),
            State = "",
            Uri = item.ContentUrl
        };
        thread.OnStart += thread_OnStartEvent;
        thread.OnComplete += thread_OnCompleteEvent;
        list.Add(++key, thread);
    }

    controller.LoadThreads(list);
    controller.StartDownLoading();
}
/// <summary>Shows a message box announcing that all download rounds have finished.</summary>
private static void controller_OnCompletedDownloading()
{
    const string doneMessage = "任务完成";
    MessageBox.Show(doneMessage);
}
#region Nested type: updateProgress
// Same signature as showProgress; thread_OnCompleteEvent wraps showProgress in this delegate so
// it can be passed to LblProgress.Invoke when the label belongs to another thread.
private delegate void updateProgress(int nRecordcount, int nTotalRecordCount);
#endregion
// NOTE(review): this brace block has no class header and is not closed in the visible source;
// its members repeat those of DownloadPDetails, so it looks like a copy-paste duplicate from the post.
{
// Items loaded per round (also passed to the ThreadController as its queue length).
private const int queueLength = 25;
// Base directory for downloaded pages, read from the "CategoryIndexSavedPath" app setting.
private static readonly string categoryIndexSavedPath =
ConfigurationManager.AppSettings["CategoryIndexSavedPath"];
// Site root that each item's relative ContentUrl is appended to; read from the "WebSite" app setting.
private static readonly string webSite = ConfigurationManager.AppSettings["WebSite"];
// Entity context used for all queries and updates in this block.
private readonly AvanterSpiderEntities context = new AvanterSpiderEntities();
// Drives the download rounds; created in the constructor.
private readonly ThreadController controller;
// Number of items whose completion handler has run.
private int completedCount;
// Label the progress text is written to.
public Label LblProgress;
// Number of unprocessed items, counted once in Work().
private int totalCount;
/// <summary>Creates the downloader, backed by a controller that starts <c>queueLength</c> items per round.</summary>
public DownloadPDetails()
{
    this.controller = new ThreadController(DownloadPDetails.queueLength);
}
/// <summary>
/// Counts the unprocessed "PDetails" items, wires the controller events and starts the first round.
/// </summary>
public void Work()
{
    totalCount = context.ContentItem
        .Count(m => m.ContentType.Substring(0, 8) == "PDetails" && m.DetailsCatched == false);
    controller.TotalThreadCount = totalCount;

    // Remove before adding so that calling Work() more than once does not subscribe the
    // same handler twice (which would invoke it twice for every controller event).
    controller.OnStartNextRound -= controller_OnStartNextRound;
    controller.OnStartNextRound += controller_OnStartNextRound;
    controller.OnCompletedAllDownloading -= controller_OnCompletedDownloading;
    controller.OnCompletedAllDownloading += controller_OnCompletedDownloading;

    // Load and run the first batch; later batches are requested by the controller.
    controller_OnStartNextRound();
}
/// <summary>
/// Start handler for a work item: downloads <c>{webSite}/{item.Uri}</c> into
/// <c>{categoryIndexSavedPath}/PIndex/{item.ID}.html</c>.
/// </summary>
/// <remarks>
/// Failures (malformed URL, network or file errors) are logged through
/// <see cref="System.Diagnostics.Trace"/> and otherwise ignored, so one bad item does not abort
/// the rest of the batch.
/// NOTE(review): the completion handler marks the item as processed even when this download failed.
/// </remarks>
private static void thread_OnStartEvent(object sender, StartThreadEventArgs e)
{
    string root = string.Format("{0}/PIndex/", categoryIndexSavedPath);
    Directory.CreateDirectory(root); // no-op when the directory already exists
    var item = (Thread) sender;
    try
    {
        // Built inside the try: a malformed URL is a per-item failure, not a reason to stop the run.
        var uri = new Uri(string.Format("{0}/{1}", webSite, item.Uri));

        // WebClient is IDisposable; dispose it once the file has been written.
        using (var client = new WebClient())
        {
            // Encoding only affects WebClient's string APIs; DownloadFile writes the raw bytes.
            client.Encoding = Encoding.UTF8;
            client.DownloadFile(uri, Path.Combine(root, item.ID + ".html"));
        }
    }
    catch (Exception ex)
    {
        System.Diagnostics.Trace.TraceWarning("Download of item {0} failed: {1}", item.ID, ex.Message);
    }
}
/// <summary>
/// Completion handler for a work item: increments the completed count, refreshes
/// <see cref="LblProgress"/> and sets DetailsCatched on the matching ContentItem row.
/// </summary>
private void thread_OnCompleteEvent(object sender, CompleteThreadEventArgs e)
{
    // A lock on a freshly allocated object never excludes anyone; an interlocked
    // increment is atomic and also yields the value to display.
    int completed = System.Threading.Interlocked.Increment(ref completedCount);

    if (LblProgress != null)
    {
        if (LblProgress.InvokeRequired)
        {
            // Marshal onto the thread that owns the label.
            LblProgress.Invoke(new updateProgress(showProgress), completed, totalCount);
        }
        else
        {
            // Already on the label's thread: update it directly.
            showProgress(completed, totalCount);
        }
    }

    int itemId = ((Thread) sender).ID;
    ContentItem item = context.ContentItem.FirstOrDefault(m => m.ID == itemId);
    if (item == null)
    {
        return; // the row no longer exists, so there is nothing to mark
    }

    item.DetailsCatched = true;
    context.SaveChanges();
}
/// <summary>
/// Makes <see cref="LblProgress"/> visible and shows "completed/total" on it.
/// Expected to run on the thread that owns the label.
/// </summary>
private void showProgress(int nRecordcount, int nTotalRecordCount)
{
    Label target = LblProgress;
    target.Visible = true;
    target.Text = string.Format("完成了{0}/{1}条记录", nRecordcount, nTotalRecordCount);
}
/// <summary>
/// Loads the next batch (up to <c>queueLength</c>) of unprocessed "PDetails" items, wraps each
/// one in a work item wired to the start/complete handlers, and hands the batch to the
/// controller. Does nothing when no unprocessed items remain.
/// </summary>
private void controller_OnStartNextRound()
{
    // Materialize once so the emptiness check and the loop see the same rows and the
    // database is queried a single time.
    List<ContentItem> items = context.ContentItem
        .Where(m => m.ContentType.Substring(0, 8) == "PDetails" && m.DetailsCatched == false)
        .Take(queueLength)
        .ToList();
    if (items.Count == 0)
    {
        return;
    }

    // Keys are 1-based positions within the batch.
    var list = new Dictionary<int, Thread>(items.Count);
    int key = 0;
    foreach (ContentItem item in items)
    {
        var thread = new Thread
        {
            ID = item.ID,
            Name = item.ID.ToString(),
            State = "",
            Uri = item.ContentUrl
        };
        thread.OnStart += thread_OnStartEvent;
        thread.OnComplete += thread_OnCompleteEvent;
        list.Add(++key, thread);
    }

    controller.LoadThreads(list);
    controller.StartDownLoading();
}
/// <summary>Shows a message box announcing that all download rounds have finished.</summary>
private static void controller_OnCompletedDownloading()
{
    const string doneMessage = "任务完成";
    MessageBox.Show(doneMessage);
}
#region Nested type: updateProgress
// Same signature as showProgress; thread_OnCompleteEvent wraps showProgress in this delegate so
// it can be passed to LblProgress.Invoke when the label belongs to another thread.
private delegate void updateProgress(int nRecordcount, int nTotalRecordCount);
#endregion
代码
转载于:https://www.cnblogs.com/Breaker/archive/2010/03/24/1693698.html