SpringBoot集成camel-ftp讀取ftp文件并解壓
1、在pom.xml文件中添加依賴
這里使用的是spring boot版本是1.5.9 JDk1.7
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-spring-boot-starter</artifactId>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-ftp</artifactId>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
2、在application.yml中添加配置
使用filter=#filterName添加過濾器; 配置ftp服務連接的用戶名和密碼,還有要消費的路徑
camel:
springboot:
main-run-controller: true #監(jiān)聽Ftp服務器的時候,為了讓java進程在后臺運行
ftp:
wifi-info:
url: 127.0.0.1
username: test
password: 123456
dir: /wifi_info
server-info: ftp://${ftp.wifi-info.url}:21${ftp.wifi-info.dir}?username=${ftp.wifi-info.username}&password=${ftp.wifi-info.password}&delay=2s&readLock=rename&include=.*zip&filter=#wifiDownloadFileFilter
local-save-dir: E:/ftpdata/mac/save
unzip-temp-dir: E:/ftpdata/mac/temp
local-files-prefix: /localFilesList- #已下載文件列表的前綴
local-files-suffix: .txt #已下載文件列表的后綴
3、創(chuàng)建過濾器 實現(xiàn)org.apache.camel.component.file.GenericFileFilter接口,實現(xiàn)accept方法;并聲明到Spring容器中;
import com.lilian.utils.FileUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.component.file.GenericFileFilter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
/**
* FTP文件路由過濾器,
* 將只下載當天,且下載目錄中不存在的文件
*/
@Slf4j
@Component
public class WifiDownloadFileFilter implements GenericFileFilter<Object> {
private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
@Value("${ftp.wifi-info.unzip-temp-dir}")
private String tempPath;
@Value("${ftp.wifi-info.local-files-prefix}")
private String filePrefix;
@Value("${ftp.wifi-info.local-files-suffix}")
private String fileSuffix;
/**
* 過濾下載文件
* @param genericFile
* @return true=下載,false=不下載
*/
@Override
public boolean accept(GenericFile<Object> genericFile) {
long lastModified = genericFile.getLastModified();
String fileName = genericFile.getFileName();
return isLatestFile(lastModified) && !isInLocalDir(fileName) ? true : false;
}
/**
* 文件是否已在本地目錄中
* @param fileName
* @return true=不存在 false=已存在了
*/
private boolean isInLocalDir(String fileName) {
try {
//獲取本地文件夾中已下載的文件名
File fileDir = new File(tempPath);
if (!fileDir.exists()) {
fileDir.mkdir();
}
String path = tempPath + filePrefix + simpleDateFormat.format(new Date()) + fileSuffix;
File file = new File(path);
if (!file.exists()) {//如果不存在就創(chuàng)建
file.createNewFile();
FileUtil.appendMethod(path, fileName + "\r\n");
return true;
}
List<String> localFileNames = FileUtil.readFileByLines(file);
if (localFileNames.contains(fileName)) {
return false;
} else {
FileUtil.appendMethod(path, fileName + "\r\n");
return true;
}
} catch (Exception e) {
log.error("獲取本地已下載文件列表出錯", e);
return false;
}
}
/**
* 文件是否為今天的數(shù)據(jù)
* @param lastModified
* @return true=是今天的文件 false=不是今天的文件
*/
public boolean isLatestFile(long lastModified) {
Date lastDate = new Date(lastModified);
String lastDateStr = simpleDateFormat.format(lastDate);
String todayStr = simpleDateFormat.format(new Date());
return todayStr.equals(lastDateStr);
}
}
4、創(chuàng)建下載路由
繼承 RouteBuilder,重寫configure方法,使用@Value 將配置文件中的路由地址和下載文件存放路徑注入。
import org.apache.camel.builder.RouteBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* wifi信息Zip文件下載
*
* @Author: 孫龍
* @Date: 2018/4/24.
*/
@Component
public class WifiFileDownLoadRoute extends RouteBuilder {
@Value("${ftp.wifi-info.server-info}")
private String wifiDataUrl;
@Value("${ftp.mobile-info.local-save-dir}")
private String localDir;
@Autowired
private WifiFileProcessor wifiFileProcessor;
@Override
public void configure() throws Exception {
from(wifiDataUrl).to("file:" + localDir).process(wifiFileProcessor);
}
}
5、創(chuàng)建本地文件路由解析類
import com.alibaba.fastjson.JSON;
import com.lilian.service.IEqMobileMacService;
import com.lilian.utils.FileHandlerMap;
import com.lilian.utils.FileUtil;
import com.lilian.utils.ZipUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.file.GenericFileMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
/**
* WiFi文件路由解析器
* 解析從ftp服務器路由下來的wifi數(shù)據(jù)壓縮文件
*
* @Author: 孫龍
* @Date: 2018/4/24.
*/
@Slf4j
@Component
public class WifiFileProcessor implements Processor {
public static final String SIGNALCHANNEL_MAC = "signalChannel_mac";
@Value("${ftp.wifi-info.unzip-temp-dir}")
private String unZipTempPath;
@Value("${ftp.mobile-info.local-save-dir}")
private String localDir;
@Autowired
private IEqMobileMacService eqMobileMacService;
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Override
public void process(Exchange exchange) throws Exception {
log.info("開始解析wifi下載的文件。。。");
GenericFileMessage<RandomAccessFile> inFileMessage = (GenericFileMessage<RandomAccessFile>) exchange.getIn();
// gf.getFile().
String fileName = inFileMessage.getGenericFile().getFileName();
File zipFile = new File(localDir + File.separator + fileName);
// Message message = exchange.getIn();
// GenericFile<?> gf = (GenericFile<?>) message.getBody();
// File zipFile = (File) gf.getFile();//兩種File對象獲取方式,這一種有可能會報異常,類型轉換異常
//解壓文件,
List<File> files = null;
String zipName = zipFile.getName();
if (Pattern.matches(ZipUtil.ZIP_PATTERN, zipName)) {
if (FileHandlerMap.readedFilesWithMobiles.get(zipFile.getName()) != null) {
return;
}
files = ZipUtil.unZip(zipFile, unZipTempPath, ZipUtil.BCP_PATTERN);
} else {
log.debug("下載到不是zip的壓縮文件:" + zipFile.getName());
return;
}
if (files == null) {
log.error("壓縮文件中沒有解析到bcp文件:" + zipFile.getName());
} else {
//讀取文件
List<String> textList = ZipUtil.batchReadFile(files, "GBK");
//批量解析文件中數(shù)據(jù)
List<EqMobileMac> eqMobileMacList = this.batchStrToEntity(textList);
//刪除臨時解壓文件
FileUtil.batchDeleteFile(files);
//批量存入數(shù)據(jù)庫
if (eqMobileMacList.size() > 0) {
eqMobileMacService.batchSave(eqMobileMacList);
for (EqMobileMac eqMobileMac : eqMobileMacList) {
redisTemplate.convertAndSend(SIGNALCHANNEL_MAC, JSON.toJSONString(eqMobileMac));
}
} else {
log.debug("文件中沒有解析到任何數(shù)據(jù)!" + unZipTempPath + zipFile.getName());
}
}
}
}
6、粘貼兩個文件處理工具類
ZipUtil.java只用到了兩個解壓方法和一個文件讀取方法;
package com.lilian.utils;
import lombok.extern.slf4j.Slf4j;
import net.lingala.zip4j.exception.ZipException;
import net.lingala.zip4j.model.FileHeader;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
/**
* 文件解壓縮工具類
*
* @Author: 孫龍
* @Date: 2018/4/24.
*/
@Slf4j
public class ZipUtil {
public static final String BCP_PATTERN = ".*?\\.bcp";
public static final String HP_PATTERN = ".*?\\.hp";
public static final String ZIP_PATTERN = ".*?\\.zip";
/**
* 此方法描述的是:ZIP壓縮
*
* @param source
* 源文件
* @param dest
* 壓縮文件
* @version: 2015年3月2日 上午9:17:26
*/
public static String zip(File source, File dest) {
ZipOutputStream out = null;
BufferedOutputStream bo = null;
try {
File zipParent = dest.getParentFile();
if (!zipParent.exists()) {
zipParent.mkdirs();
}
out = new ZipOutputStream(new FileOutputStream(dest));
bo = new BufferedOutputStream(out);
zip(out, source, source.getName(), bo);
return dest.getAbsolutePath();
} catch (Exception e) {
log.error("文件讀取錯誤:" + e.getMessage());
} finally {
if (bo != null) {
try {
bo.close();
} catch (IOException e) {
}
}
if (out != null) {
try {
out.close(); // 輸出流關閉
} catch (IOException e) {
}
}
}
return null;
}
public static void zip(ZipOutputStream out, File f, String base, BufferedOutputStream bo) throws Exception { // 方法重載
out.putNextEntry(new ZipEntry(base)); // 創(chuàng)建zip壓縮進入點base
FileInputStream in = new FileInputStream(f);
BufferedInputStream bi = new BufferedInputStream(in);
int b;
try {
while ((b = bi.read()) != -1) {
bo.write(b); // 將字節(jié)流寫入當前zip目錄
}
} finally {
bi.close();
in.close(); // 輸入流關閉
bo.close();
}
}
/**
* 解壓加密壓縮文件
*
* @param zipFile
* @param outPath
* @param passwd
* @return
* @throws ZipException
* @throws IOException
*/
public static List<File> unEncryptZip(File zipFile, String outPath, String fileNameRegexp, String passwd) {
List<File> extractedFileList = null;
try {
net.lingala.zip4j.core.ZipFile zFile = new net.lingala.zip4j.core.ZipFile(zipFile);
if (!zFile.isValidZipFile()) {
throw new ZipException("壓縮文件不合法,可能被損壞.文件名:" + zipFile.getName());
}
File destDir = new File(outPath);
if (destDir.isDirectory() && !destDir.exists()) {
destDir.mkdir();
}
if (zFile.isEncrypted()) {
zFile.setPassword(passwd);
}
zFile.extractAll(outPath);
List<FileHeader> headerList = zFile.getFileHeaders();
extractedFileList = new ArrayList<File>();
for (FileHeader fileHeader : headerList) {
if (!fileHeader.isDirectory()) {
if (!Pattern.matches(fileNameRegexp, fileHeader.getFileName())) {
//解壓到?jīng)]用的文件,直接刪除
new File(destDir, fileHeader.getFileName()).delete();
continue;
}
extractedFileList.add(new File(destDir, fileHeader.getFileName()));
}
}
FileHandlerMap.readedFilesWithMobiles.put(zipFile.getName(), 1);
} catch (ZipException e) {
log.error("解壓文件時出錯:" + e.getMessage());
e.printStackTrace();
}
return extractedFileList;
}
/**
* 解壓文件,并將匹配表達式的文件讀取到內(nèi)存
*
* @param zipFile
* @param outPath
* @return
* @throws ZipException
* @throws IOException
*/
public static List<File> unZip(File zipFile, String outPath, String fileNameRegexp) {
List<File> extractedFileList = null;
try {
net.lingala.zip4j.core.ZipFile zFile = new net.lingala.zip4j.core.ZipFile(zipFile);
zFile.setFileNameCharset("GBK");
if (!zFile.isValidZipFile()) {
throw new ZipException("壓縮文件不合法,可能被損壞.文件名:" + zipFile.getName());
}
File destDir = new File(outPath);
if (destDir.isDirectory() && !destDir.exists()) {
destDir.mkdir();
}
zFile.extractAll(outPath);
List<FileHeader> headerList = zFile.getFileHeaders();
extractedFileList = new ArrayList<>();
for (FileHeader fileHeader : headerList) {
if (!fileHeader.isDirectory()) {
if (!Pattern.matches(fileNameRegexp, fileHeader.getFileName())) {
new File(destDir, fileHeader.getFileName()).delete();//匹配錯誤直接刪除
continue;
}
extractedFileList.add(new File(destDir, fileHeader.getFileName()));
}
}
FileHandlerMap.readedFilesWithMobiles.put(zipFile.getName(), 1);
} catch (ZipException e) {
log.error("解壓文件時出錯:" + e.getMessage());
e.printStackTrace();
} finally {
}
return extractedFileList;
}
/**
* 此方法描述的是:獲得ZIP文件同名解壓目錄
*
* @version: 2015年3月5日 下午2:10:41
*/
public static String getUnpackForder(File zipFile) {
String filePath = zipFile.getAbsolutePath();
return filePath.substring(0, filePath.lastIndexOf("."));
}
/**
* 此方法描述的是:獲得ZIP文件指定解壓目錄
*
* @version: 2015年3月6日 下午10:28:36
*/
public static String getUnpackForder(File zipFile, String subDir) {
return zipFile.getParent() + File.separator + subDir;
}
/**
* 批量讀取文件,每一行解析為一個字符串;
*
* @param files
* @return
*/
public static List<String> batchReadFile(List<File> files, String charset) throws Exception {
List<String> textList = new ArrayList<>();
for (File file : files) {
List<String> list = readTextFile(file, charset);
textList.addAll(list);
}
return textList;
}
/**
* 將文件中內(nèi)容讀取到內(nèi)存中
*
* @param file
* @return
*/
public static List<String> readTextFile(File file, String charset) throws Exception {
List<String> list = new ArrayList<>();
InputStreamReader reader = new InputStreamReader(new FileInputStream(file), charset);
BufferedReader bufferedReader = new BufferedReader(reader);
String lineText;
while ((lineText = bufferedReader.readLine()) != null) {
list.add(lineText);
}
return list;
}
}
FileUtil.java
package com.lilian.utils;
import lombok.extern.slf4j.Slf4j;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class FileUtil {
/**
* 以行為單位讀取文件,常用于讀面向行的格式化文件
*/
public static List<String> readFileByLines(File file) {
BufferedReader reader = null;
List<String> strList = new ArrayList<>();
try {
// System.out.println("以行為單位讀取文件內(nèi)容,一次讀一整行:");
reader = new BufferedReader(new FileReader(file));
String tempString = null;
// 一次讀入一行,直到讀入null為文件結束
while ((tempString = reader.readLine()) != null) {
// 顯示行號
strList.add(tempString);
}
reader.close();
} catch (IOException e) {
log.error("文件讀取時出現(xiàn)錯誤!");
e.printStackTrace();
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e1) {
log.error("流關閉異常!");
}
}
}
return strList;
}
/**
* 方法追加文件:使用FileWriter
* @param fileName
* @param content
*/
public static void appendMethod(String fileName, String content) {
try {
//打開一個寫文件器,構造函數(shù)中的第二個參數(shù)true表示以追加形式寫文件
FileWriter writer = new FileWriter(fileName, true);
writer.write(content);
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 刪除單個文件
* @param file 要刪除的文件對象
* @return 單個文件刪除成功返回true,否則返回false
*/
public static boolean deleteFile(File file) {
if (file.exists() && file.isFile()) {
if (file.delete()) {
return true;
} else {
return false;
}
} else {
return false;
}
}
/**
* 批量刪除文件
* @param files
*/
public static void batchDeleteFile(List<File> files) {
for (File file : files) {
deleteFile(file);
}
}
}
隨后我會將我的一個完整Demo整理出來開源到github上面,希望能夠幫到你!
參考:https://blog.csdn.net/tiantian12234/article/details/77942598 https://blog.csdn.net/sunknew/article/details/79374781