多线程(callable+futureTask)去组装数据,并批量入库

      商城项目,收货地址会用到4级地址(省,市,县,镇),我们只用到了特定城市的。 但是我想通过京东的接口把全部的数据拿出来。于是就有 ------多线程(callable+futureTask)去组装数据。

---------------------------

    先贴下controller的代码:

package com.truelore.xunjia.wssc.test.controller;


import com.alibaba.fastjson.JSON;
import com.truelore.common.util.WsscHttpClientUtils;
import com.truelore.xunjia.wssc.dao.ProvinceDao;
import com.truelore.xunjia.wssc.entity.WsscArea;
import com.truelore.xunjia.wssc.entity.WsscCity;
import com.truelore.xunjia.wssc.entity.WsscProvince;
import com.truelore.xunjia.wssc.service.AreaService;
import com.truelore.xunjia.wssc.service.CityService;
import com.truelore.xunjia.wssc.service.ProvinceService;
import com.truelore.xunjia.wssc.service.TownService;
import com.truelore.xunjia.wssc.vo.JdaddressVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

/**
 * 拿到京东全部的4级地址
 *  fujf
 *  201853114:49:49
 */
@RequestMapping("/wssc/test")
@Controller("testGetallJdaddress")
public class TestGetallJdaddress extends SuperToken {

    @Autowired
    private ProvinceService provinceService;

    @Autowired
    private ProvinceDao provinceDao;

    @Autowired
    private CityService cityService;

    @Autowired
    private AreaService areaService;

    @Autowired
    private TownService townService;

    @RequestMapping("/saveAddress")
    public void saveAddress(String level, HttpServletResponse resp){

        //level ="2";
        Long start_time = System.currentTimeMillis();   //开始时间

        List<String> parentids = new ArrayList<String>();
         if("1".equals(level)){
        //获取省的时候,没有parentid.  直接保存好了
             List<JdaddressVo> jdaddressVos = getaddressList(level, null);
            //save(jdaddressVos,level);
             batsave(jdaddressVos,level);

        }else if("2".equals(level)){

           List<WsscProvince> ProvinceList = provinceService.getLocalProvince(99);
             for (WsscProvince wsscProvince : ProvinceList) {
                 parentids.add(wsscProvince.getLocalProvinceId());
             }

        }else if("3".equals(level)){

             List<WsscCity> cityList = cityService.queryCityByCondition(null,null,null,99);
             for (WsscCity wsscCity : cityList) {
                 parentids.add(wsscCity.getLocalCityId());
             }

        }else if("4".equals(level)){
             List<WsscArea> areaList = areaService.queryAreaByCondition(null,null,null,99);
             for (WsscArea wsscArea : areaList) {
                 parentids.add(wsscArea.getLocalAreaId());
             }
        }



        if(!("1".equals(level))){         //不是省级的地址获取,我们就用下面的多线程方式
      List<FutureTask<List<JdaddressVo>>> futureTasks = new ArrayList<FutureTask<List<JdaddressVo>>>();
            ExecutorService executorService = Executors.newFixedThreadPool(50);
            MycallableForaddress callable = null;


            System.out.println("****************");
            for (String parentid : parentids) {
                callable = new MycallableForaddress(level,parentid);
                FutureTask<List<JdaddressVo>> futureTask = new FutureTask<List<JdaddressVo>>(callable);
                futureTasks.add(futureTask);
                executorService.submit(futureTask);
              while(futureTasks.size()==500){
                  for (FutureTask<List<JdaddressVo>> task : futureTasks) {
                      try {
                          List<JdaddressVo> addrlist = task.get();
                          if(null!=addrlist){
                              //save(addrlist,level);
                              batsave(addrlist,level);   //换成批量保存
                          }
                      } catch (Exception e) {
                          e.printStackTrace();
                      }
                  }
                  futureTasks.clear();
              }

            }

            //循环结束,最后不满futureTasks.size()的也要保存起来
            while (futureTasks.size() > 0) {
                for (FutureTask<List<JdaddressVo>> task : futureTasks) {
                    try {
                        List<JdaddressVo> addrlist = task.get();
                        batsave(addrlist,level);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                futureTasks.clear();
            }

            executorService.shutdown();
        }


        try {
            Long end_time = System.currentTimeMillis();
            resp.setHeader("Content-type", "text/html;charset=UTF-8");
            resp.getWriter().write("ok");
            resp.getWriter().write("共用时:"+(end_time - start_time)+"毫秒");
        } catch (IOException e) {
            e.printStackTrace();
        }

    }




    //批量保存
    private void batsave(List<JdaddressVo> addrlist, String level) {
        if("1".equals(level)){
            for (JdaddressVo jdaddressVo : addrlist) {
                WsscProvince p = new WsscProvince();
                p.setGuid(UUID.randomUUID().toString());
                p.setLocalProvinceId(jdaddressVo.getAddressId());
                p.setProvinceName(jdaddressVo.getAddressName());
                p.setTarget(99);
                p.setTargetProvinceId(jdaddressVo.getAddressId());
                provinceService.save(p);
            }

        }else if("2".equals(level)){
                cityService.batsave(addrlist);
        }else if("3".equals(level)){
                areaService.batsave(addrlist);
        }else{
            townService.batsave(addrlist);
      }


    }



    //内部线程类    根据上级id返回下级的地址list<JdaddressVo>
    class MycallableForaddress implements Callable{

        private String level;

        private String parentId;

       public MycallableForaddress(String level, String parentId) {
           this.level = level;
           this.parentId = parentId;
       }

       @Override
        public Object call() throws Exception {
            List<JdaddressVo> jdaddressVos = getaddressList(level, parentId);
            return jdaddressVos;
        }
    }

    //返回map<地区名,编号>
      private List<JdaddressVo> getaddressList(String level,String parentId) {
          String url = null;
          Map maps = new HashMap<String, String>();
          maps.put("token", token);

          if ("1" .equals(level)) {                      //获取省
              url="https://bizapi.jd.com/api/area/getProvince";
          } else if ("2" .equals(level)) {               //获取市
              url="https://bizapi.jd.com/api/area/getCity";
              maps.put("id", parentId);
          } else if ("3" .equals(level)) {                //获取县
              url="https://bizapi.jd.com/api/area/getCounty";
              maps.put("id", parentId);
          } else if ("4".equals(level)) {                 //获取乡
              url="https://bizapi.jd.com/api/area/getTown";
              maps.put("id", parentId);
          } else {
              return null;
          }


          String rev = WsscHttpClientUtils.post(url, maps, null);

          if (null != rev) {
              Map resultmaps = (Map) JSON.parse(rev);
              System.out.println(resultmaps.get("success"));
              boolean isSuccess = (boolean) resultmaps.get("success");
              if (isSuccess) {
                  Map<String, Integer> resultmap = (Map) resultmaps.get("result");
                  //遍历map,方法1
                  List<JdaddressVo> JdaddressList = new ArrayList<>();
                  for (Object key : resultmap.keySet()) {
                      System.out.println(key + "---->" + resultmap.get(key));
                      JdaddressVo jdaddress = new JdaddressVo();
                      jdaddress.setAddressId(resultmap.get(key).toString());
                      jdaddress.setAddressName(key.toString());
                      jdaddress.setParentAddressId(parentId);
                      JdaddressList.add(jdaddress);
                  }

                  return JdaddressList;
              }
           }else{
              return null;
          }
          return null;

      }








      //-------------------以下是单元测试,不用理会-----------------------------------------
   // @Test
    public void getProvinceList(){
        String url = "https://bizapi.jd.com/api/area/getProvince";
        Map maps = new HashMap<String,String>();
        maps.put("token", token);
        String rev = WsscHttpClientUtils.post(url, maps, null);
        System.out.println(rev);
    }

   // @Test
    public void getCityList(){
        String url = "https://bizapi.jd.com/api/area/getCity";
        String parentId ="6";
        Map maps = new HashMap<String,String>();
        maps.put("id",parentId);
        maps.put("token", token);

        String rev = WsscHttpClientUtils.post(url, maps, null);
        System.out.println(rev);
    }


   // @Test
    public void getCountyList(){
        String url = "https://bizapi.jd.com/api/area/getCounty";
        String parentId ="318";
        Map maps = new HashMap<String,String>();
        maps.put("id",parentId);
        maps.put("token", token);

        String rev = WsscHttpClientUtils.post(url, maps, null);
        System.out.println(rev);
    }


   // @Test
    public void getTownList() {
        String url = "https://bizapi.jd.com/api/area/getTown";
        String parentId = "319";
        Map maps = new HashMap<String, String>();
        maps.put("id", parentId);
        maps.put("token", token);

        String rev = WsscHttpClientUtils.post(url, maps, null);
        System.out.println(rev);

        if (null != rev) {
            Map resultmaps = (Map) JSON.parse(rev);
            System.out.println(resultmaps.get("success"));
            boolean isSuccess = (boolean) resultmaps.get("success");
            if (isSuccess) {
                Map<String,Integer> resultmap = (Map) resultmaps.get("result");

               //遍历map,方法1
                for (Object key : resultmap.keySet()){
                    System.out.println(key+"---->"+resultmap.get(key));
                }




                //遍历map,方法2
               /* for (Map.Entry<String,Integer> entry : resultmap.entrySet()){
                    System.out.println(entry.getKey()+"---->"+entry.getValue());
                }*/

                //遍历map,方法3   迭代器
               /* Iterator keys = resultmap.keySet().iterator();
                while (keys.hasNext()){
                   String key = (String) keys.next();
                   System.out.println(key+"--->"+resultmap.get(key));
                }*/

            }


        }

    }
}

多线程(callable+futureTask)去组装数据,并批量入库

这个是内部线程类的定义。

多线程(callable+futureTask)去组装数据,并批量入库

***DaoImpl中的批量保存代码。

@Override
public void batsave(final List<JdaddressVo> addrs) {

   this.getSession().doWork(new Work() {
                         @Override
                         public void execute(Connection connection) throws SQLException {
                            String sql = "insert into WSSC_CITY(GUID,CITY_NAME,LOCAL_CITYID,LOCAL_PROVINCEID,TARGET_CITYID,TARGET_PROVINCEID,TARGET) values(?,?,?,?,?,?,?)";
                            PreparedStatement ps = connection.prepareStatement(sql);
                            for (JdaddressVo addr : addrs) {
                               ps.setString(1, UUID.randomUUID().toString());
                               ps.setString(2,addr.getAddressName());
                               ps.setString(3,addr.getAddressId());
                               ps.setString(4,addr.getParentAddressId());
                               ps.setString(5,addr.getAddressId());
                               ps.setString(6,addr.getParentAddressId());
                               ps.setInt(7,99);
                               ps.addBatch();
                            }
                            ps.executeBatch();
                         }
                      }

   );

}


-----------------

经过测试,这样处理,5万条数据导入需要几分钟,快了不少。

------

感受:

要培养一种“批量”,“缓存”的思想,比如上面代码中 的 futureTasks(满500再处理);保存数据时,jdbc去批处理等。

"满一定量再去做"