JAVA面試匯總(二)多線程(七)

JAVA多線程內(nèi)容比較多,今天寫完了第七篇,最后一篇了,歷時(shí)將近兩周,終于寫完了。

1. 多線程斷點(diǎn)續(xù)傳原理

(1)多線程斷點(diǎn)續(xù)傳,實(shí)際上就是傳遞文件過程中斷了,下次再重新傳遞的時(shí)候,記住中斷的點(diǎn),重新傳遞文件。
(2)但是如何記錄斷開的點(diǎn)呢?多線程的斷點(diǎn)續(xù)傳是通過判斷客戶端的文件長度來來判定傳輸?shù)奈恢玫?,然后服?wù)端設(shè)置到已經(jīng)傳輸完畢的下一個(gè)位置開始繼續(xù)傳輸
(3)然后拆分成了多份,多線程同時(shí)下載,下載過程中通過臨時(shí)文件(拆成多少份需要多少個(gè)臨時(shí)文件)記錄當(dāng)前下載完成的長度,多個(gè)文件都下載完成就完成了
(4)如果到了哪個(gè)位置即使斷了,多份文件中記錄著每份下載完成長度,重新執(zhí)行多線程的時(shí)候仍然分成多份文件,每份文件獨(dú)立判斷是否完成,完成多少,跳轉(zhuǎn)到未完成的位置開始繼續(xù)下載

2. 斷點(diǎn)續(xù)傳的實(shí)現(xiàn)

這個(gè)是我網(wǎng)上找了幾個(gè)例子,只有這個(gè)才是真正的斷點(diǎn)續(xù)傳,其他的帖子都是騙子,只有分份下載,壓根沒續(xù)傳。

import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;

public class MutilDownload {
    private static String path = "https://mirrors.cnnic.cn/apache/tomcat/tomcat-8/v8.5.73/bin/apache-tomcat-8.5.73-deployer.tar.gz";
    private static final int threadCount = 3;
    private static int runningThread; //標(biāo)識 正在運(yùn)行的線程數(shù)量
    public static void main(String[] args) {
        try {
            URL url = new URL(path);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");
            conn.setConnectTimeout(5000);
            int responseCode = conn.getResponseCode();
            if (responseCode == 200) {
                int contentLength = conn.getContentLength();
                runningThread = threadCount;
                System.out.println("length" + contentLength);
                RandomAccessFile rafAccessFile = new RandomAccessFile("D://WUDownloadCache//apache-tomcat-8.5.73-deployer.tar.gz", "rw");
                rafAccessFile.setLength(contentLength);
                int blockSize = contentLength / threadCount;
                for (int i = 0; i < threadCount; i++) {
                    int startIndex = i * blockSize; //每個(gè)現(xiàn)成下載的開始位置
                    int endIndex = (i + 1) * blockSize - 1;// 每個(gè)線程的結(jié)束位置
                    if (i == threadCount - 1) {
                        //最后一個(gè)線程
                        endIndex = contentLength - 1;
                    }
                    new DownloadThread(startIndex, endIndex, i).start();
                }
            }
        } catch (Exception e) {
        }
    }

    private static class DownloadThread extends Thread {
        private int startIndex;
        private int endIndex;
        private int threadId;
        public DownloadThread(int startIndex, int endIndex, int threadId) {
            this.startIndex = startIndex;
            this.endIndex = endIndex;
            this.threadId = threadId;
        }
        @Override
        public void run() {
            try {
                URL url = new URL(path);
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                conn.setRequestMethod("GET");
                conn.setConnectTimeout(5000);
                File file = new File("D://WUDownloadCache//"+threadId + ".txt");
                if(file.exists() && file.length() > 0) {
                    FileInputStream fis = new FileInputStream(file);
                    BufferedReader buff = new BufferedReader(new InputStreamReader(fis));
                    String lastPosition = buff.readLine();// 讀取出來的內(nèi)容就是上次下載的位置
                    int lastPos = Integer.parseInt(lastPosition);
                    System.out.println("線程id:" + threadId + "當(dāng)前線程下載的位置:-----"+ lastPos);
                    startIndex = lastPos;
                    fis.close();
                    buff.close();
                }
                conn.setRequestProperty("Range", "bytes=" + startIndex + "-" + endIndex); //固定寫法,請求部分資源
                int responseCode = conn.getResponseCode();  // 206表示請求部分資源
                if (responseCode == 206) {
                    RandomAccessFile rafAccessFile = new RandomAccessFile("D://WUDownloadCache//apache-tomcat-8.5.73-deployer.tar.gz", "rw");
                    rafAccessFile.seek(startIndex);
                    InputStream is = conn.getInputStream();
                    int len = -1;
                    byte[] buffer = new byte[1024];
                    int total = 0; // 代表當(dāng)前線程下載的大小
                    while ((len = is.read(buffer)) != -1) {
                        rafAccessFile.write(buffer, 0, len);
                        total += len;
                        //斷點(diǎn)續(xù)傳, 保存當(dāng)前線程下載的位置
                        int currentThreadPosition = startIndex + total; //當(dāng)前線程下載的位置
                        // 存儲當(dāng)線程的下載五位置
                        RandomAccessFile raff = new RandomAccessFile("D://WUDownloadCache//"+threadId +".txt", "rwd");
                        raff.write(String.valueOf(currentThreadPosition).getBytes());
                        raff.close();
                    }
                    rafAccessFile.close();
                    System.out.println("線程" + threadId + "下載完成");
                    //刪除臨時(shí)文件
//                    File tempFile = new File(threadId + ".txt");
//                    if(tempFile.exists()) {
//                        file.delete();
//                    }
                    synchronized (MutilDownload.class) {
                        runningThread--;
                        if(runningThread == 0) {
                            for (int i = 0; i < threadCount; i++) {
                                File deleteFile = new File(i + ".txt");
                                deleteFile.delete();
                            }
                        }
                    }
                }
            } catch (Exception e) {
            }
        }
    }
}

3. 實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式

簡易版本,只有發(fā)送和消費(fèi),中間用queue實(shí)現(xiàn)。

import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;

import static java.lang.Thread.*;

public class ProducerAndConsumer {
    public static final ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(20);
    public static void main(String[] args) {
        new Thread(new Producer()).start();
        try {
            sleep(2000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(new Consumer()).start();
    }
    static class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 25; i++) {
                try {
                    queue.add(i + "");
                }catch(Exception e){
                    e.printStackTrace();
                    if(e.getClass().equals(IllegalStateException.class)&&e.getMessage().equals("Queue full")){
                        waitTest();
                    }
                }
            }
        }
    }
    static class Consumer implements Runnable {
        @Override
        public void run() {
            while(true) {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String result = queue.poll();
                System.out.println(result);
                if(Objects.isNull(result)){
                    notifyTest();
                }
            }
        }
    }
    public static synchronized void notifyTest() {
        ProducerAndConsumer.class.notify();
    }
    public static synchronized void waitTest() {
        try {
            ProducerAndConsumer.class.wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

4. 用Java實(shí)現(xiàn)阻塞隊(duì)列

(1)什么是阻塞隊(duì)列:在隊(duì)列為空時(shí),獲取元素的線程會等待隊(duì)列變?yōu)榉强?。?dāng)隊(duì)列滿時(shí),存儲元素的線程會等待隊(duì)列可用。阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場景,生產(chǎn)者是往隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里拿元素的線程。阻塞隊(duì)列就是生產(chǎn)者存放元素的容器,而消費(fèi)者也只從容器里拿元素。
(2)主要就是存入和取出的處理,存入時(shí)候需要判斷是否隊(duì)列已經(jīng)滿了,滿了需要等待隊(duì)列有數(shù)據(jù)被取出才能放入,如果存入數(shù)據(jù)要通知等待取數(shù)據(jù)的隊(duì)列;取出時(shí)候,要判斷隊(duì)列是否為空,如果空了就要等待隊(duì)列有數(shù)據(jù)再取出,如果取出數(shù)據(jù)了要通知等待存入數(shù)據(jù)的隊(duì)列。
(3)另外注意一定要在存/取的時(shí)候加鎖,避免線程并發(fā)存儲或者取出數(shù)據(jù)導(dǎo)致的問題

import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingQueueTest<E> {
    public static void main(String[] args) {
        BlockingQueueTest<String> test = new BlockingQueueTest<String>(10);
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 15; i++) {
                    test.add(i+"");
                    System.out.println("add "+ i );
                }
            }
        }).start();
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("get started");
                while (true) {
                    String get = test.get();
                    System.out.println("get "+ get );
                }
            }
        }).start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    private LinkedList<E> list = new LinkedList<>();
    private static int MAX_SIZE;
    private final static Object lock = new Object();
    public BlockingQueueTest(int size) {
        this.MAX_SIZE = size;
    }
    //原子性操作數(shù)字的加減
    private AtomicInteger count = new AtomicInteger(0);
    public synchronized void waitTest() {
        try {
            lock.wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public void add(E e) {
        synchronized (lock) {
            //滿了
            while(MAX_SIZE == count.get()){
                waitTest();
            }
            list.add(e);
            //count++
            count.incrementAndGet();
            lock.notify();
        }
    }
    public E get(){
        synchronized (lock) {
            //滿了
            while(0 == count.get()){
                waitTest();
            }
            E e = list.removeFirst();
            //count--
            count.decrementAndGet();
            lock.notify();
            return e;
        }
    }
}

5. BlockingQueue介紹:

(1)上邊54代碼講的夠詳細(xì)了,下面主要講講他的方法吧
(2)offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false.(本方法不阻塞當(dāng)前執(zhí)行方法的線程)      
(3)offer(E o, long timeout, TimeUnit unit):可以設(shè)定等待的時(shí)間,如果在指定的時(shí)間內(nèi),還不能往隊(duì)列中加入BlockingQueue,則返回失敗
(4)put(anObject):把a(bǔ)nObject加到BlockingQueue里,如果BlockQueue沒有空間,則調(diào)用此方法的線程被阻斷直到BlockingQueue里面有空間再繼續(xù)
(5)poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數(shù)規(guī)定的時(shí)間,取不到時(shí)返回null;
(6)poll(long timeout, TimeUnit unit):從BlockingQueue取出一個(gè)隊(duì)首的對象,如果在指定時(shí)間內(nèi),隊(duì)列一旦有數(shù)據(jù)可取,則立即返回隊(duì)列中的數(shù)據(jù)。否則知道時(shí)間超時(shí)還沒有數(shù)據(jù)可取,返回失敗。
(7)take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進(jìn)入等待狀態(tài)直到BlockingQueue有新的數(shù)據(jù)被加入;
(8)drainTo():一次性從BlockingQueue獲取所有可用的數(shù)據(jù)對象(還可以指定獲取數(shù)據(jù)的個(gè)數(shù)),通過該方法,可以提升獲取數(shù)據(jù)效率;不需要多次分批加鎖或釋放鎖。

6. Java中的同步集合與并發(fā)集合有什么區(qū)別?

(1)同步集合,就是集合的方法都有synchronized的集合,線程安全,效率不高。
(2)并發(fā)集合,Concurrent下面的集合,比如ConcurrentHashMap通過分段的方式控制多線程并發(fā)沖突,也是線程安全,效率較高。

感謝各位的閱讀,幫忙點(diǎn)贊,感謝各位。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容