Java 心跳包檢測
當前位置:點晴教程→知識管理交流
→『 技術文檔交流 』
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.DocumentHelper;

import cn.edu.zju.cst.mina.im.server.entity.User;
import cn.edu.zju.cst.mina.im.server.handler.ServerControler;
public class UserStateManage extends Thread { //在線用戶狀態列表 static HashMap<Integer, UserState> userStateList = new HashMap<Integer, UserState>(); Object hashLock = new Object(); //當前的連接數和工作線程數 static int workThreadNum = 0; static int socketConnect = 0; private ServerSocket serverSocket; //服務器IP private String host = "10.82.81.79"; //服務器端口 private int stateReportPort = 60001; //設置心跳包的結束標記 String endFlag = "</protocol>"; CharSequence csEndFlag = endFlag.subSequence(0, 10); //掃描間隔 private int scanTime = 1800; @Override public void run() { //綁定端口,并開始偵聽用戶的心跳包 serverSocket = startListenUserReport(stateReportPort); if(serverSocket == null){ System.out.println("【創建ServerSocket失??!】"); return; } //啟動掃描線程 Thread scanThread = new Thread(new scan()); scanThread.start(); //等待用戶心跳包請求 while(true){ Socket socket = null; try { socketConnect = socketConnect + 1; //接收客戶端的連接 socket = serverSocket.accept(); //為該連接創建一個工作線程 Thread workThread = new Thread(new Handler(socket)); //啟動工作線程 workThread.start(); } catch (IOException e) { e.printStackTrace(); } } } /** * 創建一個ServerSocket來偵聽用戶心跳包請求 * @param port 指定的服務器端的端口 * @return 返回ServerSocket * @author dream */ public ServerSocket startListenUserReport(int port){ try { ServerSocket serverSocket = new ServerSocket(); if(!serverSocket.getReuseAddress()){ serverSocket.setReuseAddress(true); } serverSocket.bind(new InetSocketAddress(host,port)); System.out.println("【開始在"+serverSocket.getLocalSocketAddress()+"上偵聽用戶的心跳包請求!】"); return serverSocket; } catch (IOException e) { System.out.println("【端口"+port+"已經被占用!】"); if (serverSocket != null) { if (!serverSocket.isClosed()) { try { serverSocket.close(); } catch (IOException e1) { e1.printStackTrace(); } } } } return serverSocket; } //工作線程類 class Handler implements Runnable{ private Socket socket; UserState us = null; User newUser = null; private int userId; private int userState; /** * 構造函數,從調用者那里取得socket * @param socket 指定的socket * @author dream */ public Handler(Socket socket){ this.socket = socket; } 
/** * 從指定的socket中得到輸入流 * @param socket 指定的socket * @return 返回BufferedReader * @author dream */ private BufferedReader getReader(Socket socket){ InputStream is = null; BufferedReader br = null;
try { is = socket.getInputStream(); br = new BufferedReader(new InputStreamReader(is)); } catch (IOException e) { e.printStackTrace(); } return br; } public void run() { try{ workThreadNum = workThreadNum +1; System.out.println("【第"+workThreadNum+"個的連接:"+socket.getInetAddress()+":"+socket.getPort()+"】"); BufferedReader br = getReader(socket); String meg = null; StringBuffer report = new StringBuffer(); while ((meg = br.readLine()) != null) { report.append(meg); if (meg.contains(csEndFlag)) { us = getReporterUserState(meg, socket); synchronized (hashLock) { userStateList.put(userId, us); } } } }catch(IOException e){ System.out.println("【客戶:"+newUser.getUser_id()+"已經斷開連接!】"); userStateList.remove( userId ); announceStateChange( userId , -1); }finally{ if(socket != null){ try { //斷開連接 socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } private UserState getReporterUserState(String meg , Socket socket){ UserState us = new UserState(); try { Document requestDoc = DocumentHelper.parseText(meg); newUser = ServerControler.parseXmlToUserState(requestDoc,socket); userId = newUser.getUser_id(); userState = newUser.getUser_state(); us.setFlag(2); us.setUser_state( userState ); us.setUser_id( userId ); us.setUser_ip(newUser.getUser_ip()); us.setUser_port(newUser.getUser_port()); } catch (DocumentException e) { System.out.println("【來自客戶端的信息不是一個合法的心跳包協議】"); } return us; } } //掃描線程 class scan implements Runnable{ public void run() { while (true) { System.out.println("*******"+new Date()+":掃描線程開始掃描"+"*******"); synchronized (hashLock) { if(!userStateList.isEmpty()){ //遍歷在線用戶列表 for (Map.Entry<Integer, UserState> entry : userStateList.entrySet()) { int flag = entry.getValue().getFlag(); if ( (flag - 1) < 0) { //在這里通知該用戶的好友其狀態發生改變 // announceStateChange(entry.getKey() , 0); }else{ entry.getValue().setFlag(flag - 1); userStateList.put(entry.getKey(), entry.getValue()); } System.out.println(entry.getKey() + "-->" + entry.getValue().toString()); } }else{ 
System.out.println("現在還沒有在線用戶!"); } } //實現定時掃描 try { sleep(scanTime); } catch (InterruptedException e) { e.printStackTrace(); } } } } private void announceStateChange(int userId , int state){ System.out.println("通知其好友!"); } /** * 查詢一個用戶是否在線 * @param userId 指定要查詢狀態的用戶的ID * @return true 在線; false 不在線; * @author dream */ public boolean isAlive(int userId){ synchronized (hashLock) { return userStateList.containsKey(userId); } } /** * 返回指定用戶ID的狀態 * @param userId 指定要查詢狀態的用戶的ID * @return >0 該用戶在線; -1 該用戶離線 * @author dream */ public int getUserState(int userId){ synchronized (hashLock) { if(userStateList.containsKey(userId)){ return userStateList.get(userId).getUser_state(); }else{ return -1; } } } public Object getHashLock() { return hashLock; }
public void setHashLock(Object hashLock) { this.hashLock = hashLock; }
public String getHost() { return host; }
public void setHost(String host) { this.host = host; }
public int getStateReportPort() { return stateReportPort; }
public void setStateReportPort(int stateReportPort) { this.stateReportPort = stateReportPort; }
public String getEndFlag() { return endFlag; }
public void setEndFlag(String endFlag) { this.endFlag = endFlag; }
public int getScanTime() { return scanTime; }
public void setScanTime(int scanTime) { this.scanTime = scanTime; }
public static HashMap<Integer, UserState> getUserStateList() { return userStateList; }
public static int getWorkThreadNum() { return workThreadNum; }
public static int getSocketConnect() { return socketConnect; } //測試本函數的main函數 public static void main(String arg[]){ UserStateManage usm = new UserStateManage(); usm.start(); } } 該文章在 2013/3/11 9:16:50 編輯過 |
關鍵字查詢
相關文章
正在查詢... |