电信采集之数据处理
该项目的实质就是就是将3A服务器记录下来的日志封装成对象保存在数据库中
#briup1660|037:wKgB1660A|7|1239110900|44.211.221.247 #briup4418|037:wKgB4418A|7|1239138480|251.196.223.191 #|037:wKgB1660A|8|1239203860|44.211.221.247 #briup1247|037:wKgB1247A|7|1239106770|22.7.202.75 #briup3288|037:wKgB3288A|7|1239127180|240.144.42.68 #|037:wKgB1247A|8|1239176602|22.7.202.75 #briup8258|037:wKgB8258A|7|1239176880|90.203.198.194 #briup2391|037:wKgB2391A|7|1239118210|161.43.86.232
package com.briup.woss.client; import java.io.*; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import com.briup.util.BIDR; import com.briup.util.BackUP; import com.briup.util.Configuration; import com.briup.woss.ConfigurationAWare; public class GatherImpl implements Gather,ConfigurationAWare{ String pathName; String pathName1="src/com/briup/woss/File/map.txt"; //list存入完整数据 static List<BIDR> list=new ArrayList<BIDR>(); //存储不完整数据Map<IP,BIDR> static Map<String,BIDR> map=new HashMap(); Configuration conf=null; @Override public void init(Properties p) { // TODO Auto-generated method stub pathName=(String) p.get("src-file"); } @SuppressWarnings("unchecked") @Override public Collection<BIDR> gather() throws Exception { // TODO Auto-generated method stub BackUP bi=conf.getBackup(); //加载备份文件 Map<String, BIDR> newMap=(Map<String,BIDR>)bi.load(pathName1, BackUP.LOAD_REMOVE); if(newMap!=null){ map.putAll(newMap); } //1.读temp.txt,成功的标志是打印到控制台 BufferedReader br=new BufferedReader(new FileReader(new File(pathName))); BIDR bidr=new BIDR(); String str=""; while((str=br.readLine())!=null){ //System.out.println(str); String[] line=str.split("[|]"); //System.out.println(line.length); if(line[2].equals("7")){ bidr=new BIDR(); bidr.setAAA_login_name(line[0].substring(1)); bidr.setNAS_ip(line[1]); Long login_date=Long.parseLong(line[3]); Timestamp login_time=new Timestamp(login_date*1000); bidr.setLogin_date(login_time); bidr.setLogin_ip(line[4]); //保存不完整信息 map.put(line[4],bidr); }else if(line[2].equals("8")){ BIDR bidr1 = map.get(line[4]); if(bidr1!=null){ //设置用户下线时长 Long logout_date=Long.parseLong(line[3]); Timestamp logout_time=new Timestamp(logout_date*1000); bidr1.setLogout_date(logout_time); //计算出用户在线时长 Integer time_deration=(int) (logout_date - (bidr1.getLogin_date().getTime())/1000); bidr1.setTime_deration(time_deration); //完整数据存入list list.add(bidr1); map.remove(line[4]); } } } bi.store(pathName1, map, bi.STORE_OVERRIDE); System.out.println("备份的不完整数据为:"+map.size()); br.close(); System.out.println("list="+list.size()); return list; } /*public static void main(String[] args) throws Exception { new GatherImpl().gather(); for(BIDR bidr:list){ System.out.println(bidr.getAAA_login_name()+","+bidr.getLogin_ip()+" "+bidr.getNAS_ip()+" "+bidr.getTime_deration()+","+bidr.getLogin_date()+","+bidr.getLogout_date()); } System.out.println("完整的信息为"+list.size()); System.out.println("不完整信息:"+map.size()); } */ @Override public void setConfiguration(Configuration co) { // TODO Auto-generated method stub this.conf=co; } }
负责备份一些没有处理完的数据,所谓没有处理完的数据,我的理解是没有下线记录的
不完全信息以及尚未保存到数据库中的完全信息。
进行数据的备份需要实现的方法:
Object load(String key, boolean flag)
store(String key, Object date, boolean flag)
在进行数据备份时所遇到的问题:
1.如何实现数据的备份
2.如何读取文件中的备份文件
3.在使用load()和store()这两个方法的时候,是追加还是覆盖之前的数据
因为数据是隔一段时间进会进行更新的,某一个用户当前没有下线,可能再过一段时间就下线了,因此是覆盖之前的数据
package com.briup.util; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.List; import java.util.Properties; public class BackUPImpl implements BackUP { String filePath; Object object = null; private static List<BIDR> list; @Override public void init(Properties p) { // TODO Auto-generated method stub } // 通过键名获取已经备份的数据 key备份数据的键 @Override public Object load(String key, boolean flag) throws Exception { // TODO Auto-generated method stub File file = new File(filePath, key); // file是否为空 if (file.exists() && file.length() != 0) { ObjectInputStream ois = new ObjectInputStream(new FileInputStream(file)); object = ois.readObject(); } return object; } // 通过键名存储数据,key-备份数据的键,date-需要备份的数据,flag如果键值已经存在数据,追加还是覆盖之前的数据 // 文件路径key @Override public void store(String key, Object date, boolean flag) throws Exception { // TODO Auto-generated method stub // 通过key找备份文件,键值就是文件名 // 接收到完整数据 File file = new File(key); if(file.exists()){ ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(file, flag)); oos.writeObject(date); oos.flush(); oos.close(); } } }
负责将采集好的数据发给服务器
需要实现的方法是send(Collection<BIDR> collection)
在此过程中遇到的问题是
1、如何建立连接
使用socket建立连接
2、如何保存数据
使用对象流的形式读写数据
3、所保存的数据是完全信息还是不完全信息
客户端向服务器端进行保存的数据是完全信息
package com.briup.woss.client; import java.io.*; import java.net.Socket; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Collection; import java.util.Properties; import com.briup.util.BIDR; import com.briup.util.Configuration; import com.briup.woss.ConfigurationAWare; public class ClientImpl implements Client{ private static String id; private static int port; private static ObjectOutputStream oo; @Override public void init(Properties p) { // TODO Auto-generated method stub id=p.getProperty("id");//通过将依赖注入的方式将配置信息注入到该模块中 port=Integer.parseInt(p.getProperty("port")); } @Override public void send(Collection<BIDR> collection) throws Exception { //创建Socket指定ip地址端口号 Socket socket=new Socket(id,port); //数据流 OutputStream os=socket.getOutputStream(); oo=new ObjectOutputStream(os);//将数据保存在对象流中,保存的是完全信息 oo.writeObject(collection); System.out.println(collection); System.out.println(collection.size()); oo.flush(); oo.close(); socket.close(); // TODO Auto-generated method stub //1.id+port //2.数据流 } }
2.4服务器端模块
作用:负责接收客户端传过来的数据
需要实现的方法:Collection<BIDR> revicer()
package com.briup.woss.Server; import java.io.*; import java.net.*; import java.util.Collection; import java.util.List; import java.util.Properties; import com.briup.util.BIDR; import com.briup.woss.server.Server; public class ServrImpl implements Server{ private ServerSocket ss; private int port; private ObjectInputStream oi; @Override public void init(Properties p) { // TODO Auto-generated method stub System.out.println(p.get("port")); port=Integer.parseInt((String) p.get("port")); System.out.println(port); } @SuppressWarnings("unchecked") @Override public Collection<BIDR> revicer() throws Exception { // TODO Auto-generated method stub //1.port //数据流 //1)创建ServerSocket ss=new ServerSocket(port); System.out.println("服务器已启动,请连接·····"); while(true){ Socket s=ss.accept(); oi=new ObjectInputStream(s.getInputStream()); List<BIDR> c=(List<BIDR>)oi.readObject(); System.out.println("连接成功"); return c; } } @Override public void shutdown() { // TODO Auto-generated method stub } }
package com.briup.woss.Server; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Properties; import com.briup.util.BIDR; import com.briup.util.BackUP; import com.briup.util.Configuration; import com.briup.util.Logger; import com.briup.woss.ConfigurationAWare; import com.briup.woss.server.DBStore; public class DBStoreImpl implements DBStore,ConfigurationAWare{ /* * driver=oracle.jdbc.driver.OracleDriver * url=jdbc:oracle:thin:@localhost:1521:XE username=oracle password=orcle */ private static String driver; private static String url; private static String username; private static String password; private static Connection connection; private String aaa_login_name; private String login_ip; private java.sql.Date login_date; private java.sql.Date logout_date; private String nas_ip; private String time_duration; private int i_date; private static PreparedStatement[] ps = new PreparedStatement[31];; String sql; static int i = 0; private static String pathName = "src/com/briup/woss/File/list.txt"; Configuration conf=null; // 存放完整数据 static List<BIDR> list = new ArrayList<>(); @Override public void init(Properties p) { driver=p.getProperty("driver"); url=p.getProperty("url"); username=p.getProperty("username"); password=p.getProperty("password"); } @Override public void saveToDB(Collection<BIDR> collction) throws Exception { BackUP bi=conf.getBackup(); Logger log=conf.getLogger(); try { // 注册驱动 Class.forName(driver); // 创建连接 connection = DriverManager.getConnection(url,username,password); System.out.println(connection); for (int i = 0; i < 31; i++) { sql = "insert into t_detail_" + i + "(aaa_login_name,login_ip,login_date,logout_date,nas_ip,time_duration) " + "values(?,?,?,?,?,?)"; ps[i] = connection.prepareStatement(sql); } //connection最多创建300个prepareStatement //将ps = connection.prepareStatement(sql);放在下面的for (BIDR bidr : collction)里面遍历,由于遍历次数过多,会出现超出游标最大值异常 //遍历31次,与下面遍历bidr对象时,存储信息时的ps相对应 for (BIDR bidr : collction) { Timestamp login_d = bidr.getLogin_date(); String s_date = login_d.toString(); String[] str1 = s_date.split(" "); String[] str2 = str1[0].split("-"); i_date = Integer.parseInt(str2[2]); aaa_login_name = bidr.getAAA_login_name(); login_ip = bidr.getLogin_ip(); login_date = new java.sql.Date(bidr.getLogin_date().getTime()); logout_date = new java.sql.Date(bidr.getLogout_date().getTime()); // 通过PreparedStatement将信息存储到数据库中 ps[i_date].setString(1, aaa_login_name); ps[i_date].setString(2, login_ip); ps[i_date].setDate(3, login_date); ps[i_date].setDate(4, logout_date); ps[i_date].setString(5, nas_ip); ps[i_date].setString(6, time_duration); // 执行sql ps[i_date].executeUpdate(); i++; if (i == 200) { i = 1 / 0; } list.add(bidr); System.out.println("插入数据成功!"); } log.info("入库数据的个数" + list.size()); collction.removeAll(list); log.info("未入库数据的个数" + collction.size()); } catch (Exception e) { connection.rollback(); bi.store(pathName, list, BackUP.STORE_OVERRIDE);//对象.字段会出现黄色波浪线,可以改成类名.字段 log.debug("备份数据为"+list.size()); log.debug("未入库数据的个数" + collction.size()); log.debug("插入个数为:"+i); } } /*public static void main(String[] args) throws Exception { GatherImpl ga = new GatherImpl(); new DBStoreImpl().saveToDB(ga.gather()); System.out.println("备份数据的个数为:" + list.size()); System.out.println(i); }*/ @Override public void setConfiguration(Configuration co) { // TODO Auto-generated method stub this.conf=co; } }
3.6公共配置模块
需要实现的方法
BackUP getBackup()
Client getClient()
DBStore getDBStore()
Gather getGather()
Logger getLogger()
Server getServer()
1、如何实现日志记录
2、了解日志级别
日志级别一共有五种,级别由高到低依次是:fatal、error、warn、info、debug
3、如何将日志输出到控制台和指定文件中去
在项目中,不能通过new来创建对象,要通过Configurtion来获取
package com.briup.util; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import org.dom4j.*; import org.dom4j.io.SAXReader; import com.briup.woss.ConfigurationAWare; import com.briup.woss.WossModule; import com.briup.woss.client.Client; import com.briup.woss.client.Gather; import com.briup.woss.server.DBStore; import com.briup.woss.server.Server; public class ConfigurationImpl implements Configuration{ String filePath="src/com/briup/woss/File/conf.xml"; //存入woss模块对象 Map<String,WossModule> wossMap=new HashMap<>(); //存放配置信息 Properties pro=new Properties(); @Override public BackUP getBackup() throws Exception { // TODO Auto-generated method stub return (BackUP) wossMap.get("backup"); } @Override public Client getClient() throws Exception { // TODO Auto-generated method stub return (Client) wossMap.get("client"); } @Override public DBStore getDBStore() throws Exception { // TODO Auto-generated method stub return (DBStore) wossMap.get("dbstore"); } public static void main(String[] args) throws Exception { new ConfigurationImpl().getDBStore(); } @Override public Gather getGather() throws Exception { // TODO Auto-generated method stub return (Gather) wossMap.get("gather"); } @Override public Logger getLogger() throws Exception { // TODO Auto-generated method stub return (Logger) wossMap.get("logger"); } @Override public Server getServer() throws Exception { return (Server) wossMap.get("server"); } public ConfigurationImpl() { try { //1.获取解析器。读取conf.xml //创建SAXReader读取器,专门用于读取xml SAXReader saxReader=new SAXReader(); //2.获取根节 Document document=saxReader.read(filePath); Element rootElement=document.getRootElement(); //3.获取子节点--属性值 List elements=rootElement.elements(); for(Object object:elements){ Element e=(Element)object; String name=e.getName(); //System.out.println("子节点"+name); //class String attValue=e.attributeValue("class"); // System.out.println(attValue); //通过反射获取对象 WossModule woss; try { woss = (WossModule)Class.forName(attValue).newInstance(); // System.out.println(woss); wossMap.put(name, woss); // System.out.println(name); for(String key:wossMap.keySet()){ // System.out.println(key+":"+wossMap.get(key)); //4.固定值-->Properties List ee=e.elements(); for(Object obj:ee){ Element el=(Element)obj; String key1=el.getName(); String value=el.getText(); // System.out.println(key1); // System.out.println(el.getName()+"*:*"+el.getText()); pro.put(key1, value); String po=(String)pro.get("po"); // System.out.println("po:**************"); // System.out.println(po); } //配置信息依赖注入 for(Object obj:wossMap.values()){ //调用init()方法,注入配置信息 if(obj instanceof WossModule){ ((WossModule) obj).init(pro); } if(obj instanceof ConfigurationAWare){ ((ConfigurationAWare)obj).setConfiguration(this); } } } } catch (ClassNotFoundException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } //class--->实例化对象放入集合 } } catch (Exception e2) { // TODO Auto-generated catch block e2.printStackTrace(); } //固定值 //class实例化对象放入集合 } }
详细的代码已上传至GitHub:https://github.com/shuilifang815/-815
如果您有更好的看法或理解,欢迎留言!