Java多线程并发执行
Web开发中查询永远是效率要求最高的部分,查询耗时会影响系统的用户体验,计算机中无非是:时间换空间、空间换时间,当我们需要在较短时间内完成庞大的查询,就必须耗费空间即内存,多线程并发执行会使JVM的压力加大,所以多线程要注意防止内存溢出。
大数据的处理有很多方案,我们这里是使用将数据按日期分表存储,我们建立31张表,每张表代表一个日期,每条数据按其所在的日期存入表中,例如:1月1号的数据和2月1号的数据是存入1号表中,这样就将全年的数据分到31张表中存储,这样存储的数据就不便于查询,因此你我就需要进行多线程并发查询。
下面我就介绍如何开启31个线程,异步查询出每张表中用户的数据,并将查询结果同步填入集合当中,这里主要用到ExecutorService类与CountDownLatch类,ExecutorService是一个线程池类,用了管理线程的并发执行,CountDownLatch的作用类似于计数器,使主线程等待所有异步线程执行完毕再执行。
下面是一个完整的多线程并发查询的例子,我们可以通过这个例子来了解如何进行并发查询。
1 import java.util.ArrayList; 2 import java.util.Date; 3 import java.util.List; 4 import java.util.Set; 5 import java.util.concurrent.CountDownLatch; 6 import java.util.concurrent.ExecutorService; 7 import java.util.concurrent.Executors; 8 9 import org.apache.commons.logging.Log; 10 import org.apache.commons.logging.LogFactory; 11 import org.hibernate.Session; 12 import org.hibernate.Transaction; 13 14 15 import com.zldb.db.D05DataUser; 16 import com.zldb.util.HibernateUtil; 17 import com.zldb.util.MyDateFormat; 18 19 public class ThreadDemo { 20 private Log log = LogFactory.getLog(ThreadDemo.class) ; 21 22 public List<D05DataUser> getList(String userId, String startDate, String endDate) { 23 List<D05DataUser> list = new ArrayList<D05DataUser>() ; 24 Session session = HibernateUtil.getSession() ; 25 //多线程并发查询共用一个Session,虽然查询用不到事务,但是需要开始事务 26 Transaction transaction = session.beginTransaction() ; 27 try { 28 //时间格式化 29 Date start = MyDateFormat.parseDate(startDate) ; 30 //由于结束时间格式化结果为例如:2014-03-20 00:00:00 ,所以这天的数据是差不出来的,需要加上一天并减去一毫秒,即是:2014-03-20 23:59:59 31 Date end = new Date(MyDateFormat.parseDate(endDate).getTime() + MyDateFormat.DATEMILLIS -1) ; 32 //根据开始时间和结束时间,获取被查询表的集合 33 Set<String> tableSet = MyDateFormat.tableName(startDate, endDate, "D05_Data_User_") ; 34 //创建线程池 35 ExecutorService threadPool = Executors.newCachedThreadPool() ; 36 //线程池辅助类,控制正在运行的线程总数 37 CountDownLatch latch = new CountDownLatch(tableSet.size()) ; 38 //循环查询每张表 39 for(String tableName : tableSet) { 40 //创建线程 41 QueryTask task = new QueryTask(session, start, end, userId, tableName, latch, list) ; 42 //将线程放入线程池中执行 43 threadPool.execute(task) ; 44 } 45 //等待线程池中的线程全部执行完毕 46 latch.await() ; 47 //关闭线程池 48 threadPool.shutdown() ; 49 transaction.commit() ; 50 } catch (Exception e) { 51 if(transaction != null) { 52 transaction.rollback() ; 53 } 54 e.printStackTrace() ; 55 log.error(e.getMessage()) ; 56 } finally { 57 HibernateUtil.closeSession() ; 58 } 59 //查询结果的集合需要排序,此处省略掉了 60 return list ; 61 } 62 63 } 64 65 class QueryTask implements Runnable{ 66 private Log log = LogFactory.getLog(QueryTask.class) ; 67 68 private Session session ; 69 private Date start ; 70 private Date end ; 71 private String userId ; 72 private String tableName ; 73 private CountDownLatch latch ; 74 private List<D05DataUser> userList ; 75 76 public QueryTask() { 77 super() ; 78 } 79 80 public QueryTask(Session session, Date start, Date end, String userId, 81 String tableName, CountDownLatch latch, List<D05DataUser> userList) { 82 super(); 83 this.session = session; 84 this.start = start; 85 this.end = end; 86 this.userId = userId; 87 this.tableName = tableName; 88 this.latch = latch; 89 this.userList = userList; 90 } 91 92 @SuppressWarnings("unchecked") 93 @Override 94 public void run() { 95 String sql = 96 " SELECT ID, CreateTime, UsedSpan, OriginalHeatIncrement, Power, HeatIncrement, ChargeIncrement" + 97 " FROM "+tableName+ 98 " WHERE UserID = :userId AND CreateTime BETWEEN :start AND :end" ; 99 try { 100 //查询一张表的数据 101 List<Object[]> objsList = session.createSQLQuery(sql) 102 .setString("userId", userId) 103 .setDate("start", start) 104 .setDate("end", end) 105 .list() ; 106 //解析数据,这里并没有查询全部封装到类的属性中,因为我并不需表中每一个字段的数据 107 List<D05DataUser> list = new ArrayList<D05DataUser>() ; 108 for(Object[] objs : objsList) { 109 D05DataUser d05 = new D05DataUser() ; 110 d05.setId(Integer.parseInt(objs[0].toString())) ; 111 d05.setCreateDate((Date) objs[1]) ; 112 d05.setUsedSpan(Integer.parseInt(objs[2].toString())) ; 113 d05.setOriginalHeatIncrement(Double.parseDouble(objs[3].toString())) ; 114 d05.setPower(Double.parseDouble(objs[4].toString())) ; 115 d05.setHeatIncrement(Double.parseDouble(objs[5].toString())) ; 116 d05.setChargeIncrement(Double.parseDouble(objs[6].toString())) ; 117 list.add(d05) ; 118 } 119 //这里必须用同步代码块,不然userList中的数据会出现缺失 120 synchronized (userList) { 121 userList.addAll(list) ; 122 } 123 } catch (Exception e) { 124 e.printStackTrace() ; 125 log.error(e.getMessage()) ; 126 } finally { 127 //每结束一个线程,就要总数减一 128 latch.countDown() ; 129 } 130 } 131 132 }
注意:CountDownLatch类使主线程等待异步线程执行的方法时 await(),如果缺少这个方法,异步线程还没有执行完毕,那么主线程就会继续往下执行,Session就会关闭,而异步线程就无法执行查询报错:Session is closed , 还有就是在QueryThread类的 run方法中,最后一定要执行 countDown() 方法, 不然主线程就无法知道异步线程是否执行完毕。
转载于:https://www.cnblogs.com/dengjunwu/articles/3613761.html