1.创建一个监听
[code="java"]import java.io.IOException;
import java.net.*;
import java.util.concurrent.Future;
public class Listener extends ServerSocket implements Runnable {
public Listener() throws IOException {
super(Server.AppConfig.getLocalhost().getListenport());
}
@Override
public void run() {
while (true) {
try {
Socket socket = accept();
CreateServer server = new CreateServer(socket, Server.pool);
Future<Integer> result = Server.pool.submit(server);
Server.Results.add(result);
} catch (Exception e) {
Server.createMessage("Listener:"+e.getMessage());
} finally {
}
}
}
}[/code]
[code="java"]import java.net.Socket;
import java.util.Date;
import java.util.Scanner;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.io.*;
import com.szcust.einfo.receiveEntity.serverEntity.DataText;
public class CreateServer implements Callable {
private Socket client;
private Scanner in;
private PrintWriter out;
private Resolve resolve;
private int timeOut = 30;
private Date lastTime;
public CreateServer(Socket s, ExecutorService pool) {
client = s;
lastTime = new Date();
resolve = new Resolve();
try {
client.setSoTimeout(30 * 60 * 1000);
Server.ClientCount = Server.ClientCount + 1;
in = new Scanner(client.getInputStream(), "GB2312");
// in = new BufferedReader(new
// InputStreamReader(client.getInputStream(), "GB2312"));
out = new PrintWriter(client.getOutputStream(), true);
out
.println("--- Welcome To Universtar Science & Technology Softwear System ---");
} catch (Exception ex) {
Server.createMessage("Ex " + ex.getMessage());
}
}
@Override
public Integer call() {
String line = "";
while ((line = in.next()) != null) {
try {
if (check(line)) {
DataText dataText = resolve.getDataTextBySoketString(line);
if (dataText != null) {
// Server.Data_Array.add(dataText);
resolve.saveDataRun(dataText);//业务代码
} else {
Server.createMessage("Resolve error " + line);
}
} else {
Server.createMessage("Check error " + line);
}
} catch (Exception ex) {
Server.createMessage("Ex " + ex.getMessage());
closeSocket(this.client );
}
}
Server.ClientCount--;
return Server.ClientCount;
}[/code]
[code="java"]import java.util.Calendar;
import java.util.Vector;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.szcust.einfo.receiveBiz.FactorBiz;
import com.szcust.einfo.receiveBiz.StationBiz;
import com.szcust.einfo.receiveEntity.einfoEntity.Station;
import com.szcust.einfo.receiveEntity.serverEntity.DataText;
import com.szcust.einfo.receiveEntity.configEntity.Config;
import com.szcust.einfo.receiveEntity.configEntity.ExLog;
public class Server{
public static ConcurrentLinkedQueue<DataText> Data_Array = null;
public static Vector<Station> Client_Stations = null;
public static Vector<Station> Server_Stations = null;
private static ConcurrentLinkedQueue<String> Message = null;
public static ConcurrentLinkedQueue<String> ErrorText = null;
public static ConcurrentLinkedQueue<Future<Integer>> Results = null;
public static ExecutorService pool = Executors.newCachedThreadPool();
public static Config AppConfig = null;
public static int ClientCount = 0;
public static boolean IsClear = true;
public static void init() {
Data_Array =new ConcurrentLinkedQueue<DataText>();
Client_Stations = new Vector<Station>();
Server_Stations = new Vector<Station>();
Message = new ConcurrentLinkedQueue<String>();
ErrorText = new ConcurrentLinkedQueue<String>();
Results = new ConcurrentLinkedQueue<Future<Integer>>();
AppConfig = new Config();
ClientCount = 0;
IsClear = true;
loadData();
}[/code]
哦,我不是很清楚 MINA的实现,Netty里,是用线程池模型的,可以控制生成线程的上限。
Mina里如果找不到控制线程数的地方,我建议还是用Netty吧。
至少Netty算是公司在维护 :)
Blocking Socket不适合用线程池处理通讯。
单个线程会被挂起的。
用NIO才合适。
个人觉得没必要在accept后submit一个Callable
直接execute一个Runnable
同步好ClientCount就可以了吧
你的项目可以用外部库吗?
你可以看看Netty这个NIO库。(或者apache mina,netty是mina的创始者写的,我用起来蛮方便的。)
NIO是基于SELECT模型的。就是注册一批关注对象socketChannel,
然后重复调用Select方法,当SocketChannel处于指定状态,比如 有数据,可写
等状态,再拿出来处理。
这样,可以分成主线程持续select,select出来需要处理的socketchannel放到一个队列,由threadpool里分线程取出来 处理。
因为socketchannel操作不会阻塞,所以可以充分利用多线程。
而你的代码用的是普通socket,普通socket的特性就是在read时,如果没有数据,则会阻塞住。而如果想利用线程池,应当是read没有数据时,继续往下走,把当前线程资源还回线程池。
不用你主动结束那些线程,那些线程被应该会被设为 daemon=true,即主线程结束时自动结束。平时,等待 主监听线程传需要处理的任务进来,如果没有,则自己wait。
那些就是线程池里的线程。