ERROR:upbmsAppid:upbmswl:20161208-16:24:52:jifenhptest:89128126:className:com.hp.upbms.slp.td.transfer.FTPTransfer functionName:doBusiness describe:Exception in FTPTransfer thread when connect to: 10.248.12.2121aiupbms
java.net.SocketException: Broken pipe (errno:32)
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:97)
at java.net.SocketOutputStream.write(SocketOutputStream.java:141)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123)
at org.apache.commons.net.telnet.TelnetClient._flushOutputStream(TelnetClient.java:77)
at org.apache.commons.net.telnet.TelnetOutputStream.flush(TelnetOutputStream.java:137)
at java.io.FilterOutputStream.flush(FilterOutputStream.java:123)
at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:278)
at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:122)
at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:212)
at java.io.BufferedWriter.flush(BufferedWriter.java:236)
at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:442)
at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:520)
at org.apache.commons.net.ftp.FTP.cwd(FTP.java:745)
at org.apache.commons.net.ftp.FTPClient.changeWorkingDirectory(FTPClient.java:725)
at com.hp.upbms.slp.td.transfer.FTPTransfer.doBusiness(FTPTransfer.java:111)
at com.hp.upbms.slp.td.transfer.FTPTransfer.run(FTPTransfer.java:55)
这是源码:
package com.hp.upbms.slp.td.transfer;
import java.io.File;
import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import lombok.Setter;
import lombok.extern.log4j.Log4j;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;
import org.springframework.beans.factory.InitializingBean;
import com.hp.upbms.slp.td.TransDownUtil;
import com.hp.upbms.slp.td.model.FTPConfig;
import com.hp.upbms.slp.td.model.SyncFile;
import com.hp.upbms.slp.td.transfer.dao.FtpTransferDaoImpl;
/**
FTPTransfer
is used transfer file to cooperator's FTP server.FTPTransfer
thread dealing with a business according tobusinessConfig
.@author JetHan
*/
@Log4j
public class FTPTransfer extends Thread implements InitializingBean {
@Setter private DataSource dataSource;
@Setter private String ftpType = "00";
@Setter private boolean passwordSecurity = false;
private static boolean isRunning = true;
private final static long ThreadSleepTime = 120000L;
private FtpTransferDaoImpl ftpDao = new FtpTransferDaoImpl();
private static List provinceList = new ArrayList();
private Map ftpClients = new HashMap(0);
public void afterPropertiesSet() throws Exception {
ftpDao.setDataSource(dataSource);
provinceList = ftpDao.getProvinceList();
start();
}
@Override public void run() {
log.info("The FTPTransfer Thread is Started.");
while (isRunning) {
try {
doBusiness();
try {
Thread.sleep(ThreadSleepTime);
} catch (Exception ex) {
log.error("SLEEP(LONG MILLIS)", ex);
}
} catch (Throwable e) {
log.error("The FTPTransfer Thread Throwable:" + e.toString(), e);
}
}
log.info("The FTPTransfer Thread is End.");
}
private void doBusiness() throws Throwable {
try {
List sfs = ftpDao.getNotSyncFile(ftpType);
log.info("The Ftp Transfer get data num is "+ sfs.size() + ".");
for (SyncFile sf : sfs) {
String partnerId = sf.getPartnerId();
if (provinceList.contains(partnerId)) {// 省公司
partnerId = "000";
}
FTPConfig fc = ftpDao.getFTPConfig(partnerId, ftpType);
if (fc == null || !"00".equals(fc.getFtpStatus())) {// 没有FTP配置及配置信息不可用
ftpDao.updateSyncFileStatus(sf.getFileId(), "03", sf.getSyncTimes() + 1);
continue;
}
String key = fc.getFtpIp() + fc.getFtpPort() + fc.getFtpUsername();
try {
FTPClient ftpClient = ftpClients.get(key);
if (ftpClient == null) {
ftpClient = new FTPClient();
ftpClients.put(key, ftpClient);
}
if (!ftpClient.isConnected()) {
ftpClient.connect(fc.getFtpIp(), Integer.parseInt(fc.getFtpPort()));
if(passwordSecurity) {
ftpClient.login(fc.getFtpUsername(), TransDownUtil.getDecoder(fc.getFtpPassword()));
} else {
ftpClient.login(fc.getFtpUsername(), fc.getFtpPassword());
}
ftpClient.setFileType(FTPClient.BINARY_FILE_TYPE);
int reply = ftpClient.getReplyCode();
// If connect failed, try other files.
if (!FTPReply.isPositiveCompletion(reply)) {
log.error("THE SERVER REPLY " + reply + " ,WHEN CONNECT TO " + key);
ftpDao.updateSyncFileStatus(sf.getFileId(), "02", sf.getSyncTimes() + 1);
disconnect(ftpClient);
continue;
}
}
ftpClient.changeWorkingDirectory(fc.getFilePath());
log.info("The File :" + sf.getFileName() + " transfer start.");
FileInputStream in = new FileInputStream(new String(sf.getFilePath().getBytes("GBK"), "ISO-8859-1"));
String fileName = new String(sf.getFileName().getBytes("GBK"), "ISO-8859-1");
long startTime = System.currentTimeMillis();
boolean transferResult = ftpClient.storeFile(fileName, in);
long stopTime = System.currentTimeMillis();
log.info("The File :" + sf.getFileName() + " transfer finish.Time = " + (stopTime - startTime) + " ms");
if (transferResult) {
ftpDao.updateSyncFileStatus(sf.getFileId(), "01", sf.getSyncTimes() + 1);
log.info(sf.getFilePath() + " has been transfered to " + key + " successfully");
} else {
ftpDao.updateSyncFileStatus(sf.getFileId(), "02", sf.getSyncTimes() + 1);
log.info(sf.getFilePath() + " has been transfered to " + key + " failed ");
}
closeFileStream(in, sf);
} catch (Exception ex) {
ftpDao.updateSyncFileStatus(sf.getFileId(), "02", sf.getSyncTimes() + 1);
log.error("Exception in FTPTransfer thread when connect to: " + key, ex);
throw ex;
} finally {}
}
try {
Iterator<FTPClient> ite = ftpClients.values().iterator();
while (ite.hasNext()) {
FTPClient ftpClient = ite.next();
disconnect(ftpClient);
ite.remove();
}
ftpClients.clear();
} catch (Exception e) {
log.error("Exception in FTPTransfer thread, clear FtpClient info:", e);
throw e;
}
} catch (Throwable ex) {
throw ex;
}
}
/**
client
./**
每上传完一次之后,记得调一次getReply()方法,把FTP连接的状态变成260.
每上传完一次之后,记得调一次getReply()方法,把FTP连接的状态变成默认初始状态.