SpringBoot集成camel-ftp讀取ftp文件并解壓

SpringBoot集成camel-ftp讀取ftp文件并解壓

1、在pom.xml文件中添加依賴

這里使用的是spring boot版本是1.5.9 JDk1.7

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-spring-boot-starter</artifactId>
    <version>2.17.1</version>
</dependency>

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-ftp</artifactId>
    <version>2.17.1</version>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

2、在application.yml中添加配置

使用filter=#filterName添加過濾器; 配置ftp服務連接的用戶名和密碼,還有要消費的路徑

camel:
  springboot:
    main-run-controller: true #監(jiān)聽Ftp服務器的時候,為了讓java進程在后臺運行
ftp:
  wifi-info:
    url: 127.0.0.1
    username: test
    password: 123456
    dir: /wifi_info
    server-info: ftp://${ftp.wifi-info.url}:21${ftp.wifi-info.dir}?username=${ftp.wifi-info.username}&password=${ftp.wifi-info.password}&delay=2s&readLock=rename&include=.*zip&filter=#wifiDownloadFileFilter
    local-save-dir: E:/ftpdata/mac/save
    unzip-temp-dir: E:/ftpdata/mac/temp
    local-files-prefix: /localFilesList-  #已下載文件列表的前綴
    local-files-suffix: .txt   #已下載文件列表的后綴

3、創(chuàng)建過濾器 實現(xiàn)org.apache.camel.component.file.GenericFileFilter接口,實現(xiàn)accept方法;并聲明到Spring容器中;

import com.lilian.utils.FileUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.component.file.GenericFileFilter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

/**
 * FTP文件路由過濾器,
 * 將只下載當天,且下載目錄中不存在的文件
 */
@Slf4j
@Component
public class WifiDownloadFileFilter implements GenericFileFilter<Object> {

    private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");

    @Value("${ftp.wifi-info.unzip-temp-dir}")
    private String tempPath;

    @Value("${ftp.wifi-info.local-files-prefix}")
    private String filePrefix;

    @Value("${ftp.wifi-info.local-files-suffix}")
    private String fileSuffix;

    /**
     * 過濾下載文件
     * @param genericFile
     * @return true=下載,false=不下載
     */
    @Override
    public boolean accept(GenericFile<Object> genericFile) {
        long lastModified = genericFile.getLastModified();
        String fileName = genericFile.getFileName();
        return isLatestFile(lastModified) && !isInLocalDir(fileName) ? true : false;
    }

    /**
     * 文件是否已在本地目錄中
     * @param fileName
     * @return  true=不存在   false=已存在了
     */
    private boolean isInLocalDir(String fileName) {
        try {

            //獲取本地文件夾中已下載的文件名
            File fileDir = new File(tempPath);
            if (!fileDir.exists()) {
                fileDir.mkdir();
            }
            String path = tempPath + filePrefix + simpleDateFormat.format(new Date()) + fileSuffix;
            File file = new File(path);

            if (!file.exists()) {//如果不存在就創(chuàng)建
                file.createNewFile();
                FileUtil.appendMethod(path, fileName + "\r\n");
                return true;
            }

            List<String> localFileNames = FileUtil.readFileByLines(file);
            if (localFileNames.contains(fileName)) {
                return false;
            } else {
                FileUtil.appendMethod(path, fileName + "\r\n");
                return true;
            }
        } catch (Exception e) {
            log.error("獲取本地已下載文件列表出錯", e);
            return false;
        }
    }
    /**
     * 文件是否為今天的數(shù)據(jù)
     * @param lastModified
     * @return true=是今天的文件 false=不是今天的文件
     */
    public boolean isLatestFile(long lastModified) {
        Date lastDate = new Date(lastModified);
        String lastDateStr = simpleDateFormat.format(lastDate);

        String todayStr = simpleDateFormat.format(new Date());
        return todayStr.equals(lastDateStr);
    }

}

4、創(chuàng)建下載路由

繼承 RouteBuilder,重寫configure方法,使用@Value 將配置文件中的路由地址和下載文件存放路徑注入。

import org.apache.camel.builder.RouteBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * wifi信息Zip文件下載
 *
 * @Author: 孫龍
 * @Date: 2018/4/24.
 */
@Component
public class WifiFileDownLoadRoute extends RouteBuilder {

    @Value("${ftp.wifi-info.server-info}")
    private String wifiDataUrl;

    @Value("${ftp.mobile-info.local-save-dir}")
    private String localDir;

    @Autowired
    private WifiFileProcessor wifiFileProcessor;

    @Override
    public void configure() throws Exception {
        from(wifiDataUrl).to("file:" + localDir).process(wifiFileProcessor);
    }
}

5、創(chuàng)建本地文件路由解析類

import com.alibaba.fastjson.JSON;
import com.lilian.service.IEqMobileMacService;
import com.lilian.utils.FileHandlerMap;
import com.lilian.utils.FileUtil;
import com.lilian.utils.ZipUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.file.GenericFileMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/**
 * WiFi文件路由解析器
 * 解析從ftp服務器路由下來的wifi數(shù)據(jù)壓縮文件
 *
 * @Author: 孫龍
 * @Date: 2018/4/24.
 */
@Slf4j
@Component
public class WifiFileProcessor implements Processor {

    public static final String SIGNALCHANNEL_MAC = "signalChannel_mac";

    @Value("${ftp.wifi-info.unzip-temp-dir}")
    private String unZipTempPath;

    @Value("${ftp.mobile-info.local-save-dir}")
    private String localDir;

    @Autowired
    private IEqMobileMacService eqMobileMacService;
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Override
    public void process(Exchange exchange) throws Exception {
        log.info("開始解析wifi下載的文件。。。");
        GenericFileMessage<RandomAccessFile> inFileMessage = (GenericFileMessage<RandomAccessFile>) exchange.getIn();
//        gf.getFile().
        String fileName = inFileMessage.getGenericFile().getFileName();
        File zipFile = new File(localDir + File.separator + fileName);
//        Message message = exchange.getIn();
//        GenericFile<?> gf = (GenericFile<?>) message.getBody();
//        File zipFile = (File) gf.getFile();//兩種File對象獲取方式,這一種有可能會報異常,類型轉換異常
        //解壓文件,
        List<File> files = null;
        String zipName = zipFile.getName();
        if (Pattern.matches(ZipUtil.ZIP_PATTERN, zipName)) {
            if (FileHandlerMap.readedFilesWithMobiles.get(zipFile.getName()) != null) {
                return;
            }
            files = ZipUtil.unZip(zipFile, unZipTempPath, ZipUtil.BCP_PATTERN);
        } else {
            log.debug("下載到不是zip的壓縮文件:" + zipFile.getName());
            return;
        }
        if (files == null) {
            log.error("壓縮文件中沒有解析到bcp文件:" + zipFile.getName());
        } else {
            //讀取文件
            List<String> textList = ZipUtil.batchReadFile(files, "GBK");
            //批量解析文件中數(shù)據(jù)
            List<EqMobileMac> eqMobileMacList = this.batchStrToEntity(textList);
            //刪除臨時解壓文件
            FileUtil.batchDeleteFile(files);
            //批量存入數(shù)據(jù)庫
            if (eqMobileMacList.size() > 0) {
                eqMobileMacService.batchSave(eqMobileMacList);
                for (EqMobileMac eqMobileMac : eqMobileMacList) {
                    redisTemplate.convertAndSend(SIGNALCHANNEL_MAC, JSON.toJSONString(eqMobileMac));
                }
            } else {
                log.debug("文件中沒有解析到任何數(shù)據(jù)!" + unZipTempPath + zipFile.getName());
            }
        }

    }
}

6、粘貼兩個文件處理工具類

ZipUtil.java只用到了兩個解壓方法和一個文件讀取方法;

package com.lilian.utils;

import lombok.extern.slf4j.Slf4j;
import net.lingala.zip4j.exception.ZipException;
import net.lingala.zip4j.model.FileHeader;

import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

/**
 * 文件解壓縮工具類
 * 
 * @Author: 孫龍
 * @Date: 2018/4/24.
 */
@Slf4j
public class ZipUtil {
    public static final String BCP_PATTERN = ".*?\\.bcp";
    public static final String HP_PATTERN = ".*?\\.hp";
    public static final String ZIP_PATTERN = ".*?\\.zip";

    /**
     * 此方法描述的是:ZIP壓縮
     * 
     * @param source
     *            源文件
     * @param dest
     *            壓縮文件
     * @version: 2015年3月2日 上午9:17:26
     */
    public static String zip(File source, File dest) {
        ZipOutputStream out = null;
        BufferedOutputStream bo = null;
        try {
            File zipParent = dest.getParentFile();
            if (!zipParent.exists()) {
                zipParent.mkdirs();
            }
            out = new ZipOutputStream(new FileOutputStream(dest));
            bo = new BufferedOutputStream(out);
            zip(out, source, source.getName(), bo);
            return dest.getAbsolutePath();
        } catch (Exception e) {
            log.error("文件讀取錯誤:" + e.getMessage());
        } finally {
            if (bo != null) {
                try {
                    bo.close();
                } catch (IOException e) {
                }
            }
            if (out != null) {
                try {
                    out.close(); // 輸出流關閉
                } catch (IOException e) {
                }
            }
        }
        return null;
    }

    public static void zip(ZipOutputStream out, File f, String base, BufferedOutputStream bo) throws Exception { // 方法重載
        out.putNextEntry(new ZipEntry(base)); // 創(chuàng)建zip壓縮進入點base
        FileInputStream in = new FileInputStream(f);
        BufferedInputStream bi = new BufferedInputStream(in);
        int b;
        try {
            while ((b = bi.read()) != -1) {
                bo.write(b); // 將字節(jié)流寫入當前zip目錄
            }
        } finally {
            bi.close();
            in.close(); // 輸入流關閉
            bo.close();
        }
    }

    /**
     * 解壓加密壓縮文件
     * 
     * @param zipFile
     * @param outPath
     * @param passwd
     * @return
     * @throws ZipException
     * @throws IOException
     */
    public static List<File> unEncryptZip(File zipFile, String outPath, String fileNameRegexp, String passwd) {
        List<File> extractedFileList = null;
        try {
            net.lingala.zip4j.core.ZipFile zFile = new net.lingala.zip4j.core.ZipFile(zipFile);

            if (!zFile.isValidZipFile()) {
                throw new ZipException("壓縮文件不合法,可能被損壞.文件名:" + zipFile.getName());
            }
            File destDir = new File(outPath);
            if (destDir.isDirectory() && !destDir.exists()) {
                destDir.mkdir();
            }
            if (zFile.isEncrypted()) {
                zFile.setPassword(passwd);
            }
            zFile.extractAll(outPath);
            List<FileHeader> headerList = zFile.getFileHeaders();
            extractedFileList = new ArrayList<File>();
            for (FileHeader fileHeader : headerList) {
                if (!fileHeader.isDirectory()) {
                    if (!Pattern.matches(fileNameRegexp, fileHeader.getFileName())) {
                        //解壓到?jīng)]用的文件,直接刪除
                        new File(destDir, fileHeader.getFileName()).delete();
                        continue;
                    }
                    extractedFileList.add(new File(destDir, fileHeader.getFileName()));
                }
            }
            FileHandlerMap.readedFilesWithMobiles.put(zipFile.getName(), 1);
        } catch (ZipException e) {
            log.error("解壓文件時出錯:" + e.getMessage());
            e.printStackTrace();
        }

        return extractedFileList;
    }

    /**
     * 解壓文件,并將匹配表達式的文件讀取到內(nèi)存
     * 
     * @param zipFile
     * @param outPath
     * @return
     * @throws ZipException
     * @throws IOException
     */
    public static List<File> unZip(File zipFile, String outPath, String fileNameRegexp) {
        List<File> extractedFileList = null;
        try {
            net.lingala.zip4j.core.ZipFile zFile = new net.lingala.zip4j.core.ZipFile(zipFile);
            zFile.setFileNameCharset("GBK");
            if (!zFile.isValidZipFile()) {
                throw new ZipException("壓縮文件不合法,可能被損壞.文件名:" + zipFile.getName());
            }
            File destDir = new File(outPath);
            if (destDir.isDirectory() && !destDir.exists()) {
                destDir.mkdir();
            }

            zFile.extractAll(outPath);

            List<FileHeader> headerList = zFile.getFileHeaders();
            extractedFileList = new ArrayList<>();
            for (FileHeader fileHeader : headerList) {
                if (!fileHeader.isDirectory()) {
                    if (!Pattern.matches(fileNameRegexp, fileHeader.getFileName())) {
                        new File(destDir, fileHeader.getFileName()).delete();//匹配錯誤直接刪除
                        continue;
                    }
                    extractedFileList.add(new File(destDir, fileHeader.getFileName()));
                }
            }
            FileHandlerMap.readedFilesWithMobiles.put(zipFile.getName(), 1);
        } catch (ZipException e) {
            log.error("解壓文件時出錯:" + e.getMessage());
            e.printStackTrace();
        } finally {

        }
        return extractedFileList;
    }

    /**
     * 此方法描述的是:獲得ZIP文件同名解壓目錄
     * 
     * @version: 2015年3月5日 下午2:10:41
     */
    public static String getUnpackForder(File zipFile) {
        String filePath = zipFile.getAbsolutePath();
        return filePath.substring(0, filePath.lastIndexOf("."));
    }

    /**
     * 此方法描述的是:獲得ZIP文件指定解壓目錄
     * 
     * @version: 2015年3月6日 下午10:28:36
     */
    public static String getUnpackForder(File zipFile, String subDir) {
        return zipFile.getParent() + File.separator + subDir;
    }

    /**
     * 批量讀取文件,每一行解析為一個字符串;
     * 
     * @param files
     * @return
     */
    public static List<String> batchReadFile(List<File> files, String charset) throws Exception {
        List<String> textList = new ArrayList<>();
        for (File file : files) {
            List<String> list = readTextFile(file, charset);
            textList.addAll(list);

        }
        return textList;
    }

    /**
     * 將文件中內(nèi)容讀取到內(nèi)存中
     * 
     * @param file
     * @return
     */
    public static List<String> readTextFile(File file, String charset) throws Exception {
        List<String> list = new ArrayList<>();
        InputStreamReader reader = new InputStreamReader(new FileInputStream(file), charset);
        BufferedReader bufferedReader = new BufferedReader(reader);
        String lineText;
        while ((lineText = bufferedReader.readLine()) != null) {
            list.add(lineText);
        }
        return list;

    }

}

FileUtil.java

package com.lilian.utils;

import lombok.extern.slf4j.Slf4j;

import java.io.*;
import java.util.ArrayList;
import java.util.List;

@Slf4j
public class FileUtil {

    /**
     * 以行為單位讀取文件,常用于讀面向行的格式化文件
     */
    public static List<String> readFileByLines(File file) {
        BufferedReader reader = null;
        List<String> strList = new ArrayList<>();
        try {
//            System.out.println("以行為單位讀取文件內(nèi)容,一次讀一整行:");
            reader = new BufferedReader(new FileReader(file));
            String tempString = null;
            // 一次讀入一行,直到讀入null為文件結束
            while ((tempString = reader.readLine()) != null) {
                // 顯示行號
                strList.add(tempString);
            }
            reader.close();

        } catch (IOException e) {
            log.error("文件讀取時出現(xiàn)錯誤!");
            e.printStackTrace();
        } finally {
            if (reader != null) {
                try {
                    reader.close();
                } catch (IOException e1) {
                    log.error("流關閉異常!");
                }
            }
        }
        return strList;
    }

    /**
     * 方法追加文件:使用FileWriter
     * @param fileName
     * @param content
     */
    public static void appendMethod(String fileName, String content) {
        try {
            //打開一個寫文件器,構造函數(shù)中的第二個參數(shù)true表示以追加形式寫文件
            FileWriter writer = new FileWriter(fileName, true);
            writer.write(content);
            writer.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 刪除單個文件
     * @param file 要刪除的文件對象
     * @return 單個文件刪除成功返回true,否則返回false
     */
    public static boolean deleteFile(File file) {
        if (file.exists() && file.isFile()) {
            if (file.delete()) {
                return true;
            } else {
                return false;
            }
        } else {
            return false;
        }
    }

    /**
     * 批量刪除文件
     * @param files
     */
    public static void batchDeleteFile(List<File> files) {
        for (File file : files) {
            deleteFile(file);
        }
    }
}

隨后我會將我的一個完整Demo整理出來開源到github上面,希望能夠幫到你!
參考:https://blog.csdn.net/tiantian12234/article/details/77942598 https://blog.csdn.net/sunknew/article/details/79374781

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,569評論 19 139
  • Spring Boot 參考指南 介紹 轉載自:https://www.gitbook.com/book/qbgb...
    毛宇鵬閱讀 47,275評論 6 342
  • 曹德旺,一個白手起家的傳奇億萬富豪,一位值得尊敬的佛教徒企業(yè)家。 最開始了解曹德旺,是看了他寫的一篇關于他的糟糠之...
    堅冰至_Monsol閱讀 29,447評論 15 31
  • 人與人交往是有節(jié)奏的,產(chǎn)生共振才是相匹配的氣場。 在一起舒服才是一個人頂級的人格魅力。 時間久了,才知道一個人的人...
    每日愛圖閱讀 320評論 0 0
  • 我總是花費著寶貴的時間在別人看來很沒有價值的事物上。 它們是有價值的。只是我沒能使它們充分發(fā)揮出來。 我是一個慢吞...
    沿海公路邊上的阿簡閱讀 434評論 0 0

友情鏈接更多精彩內(nèi)容