JAVA多線程內(nèi)容比較多,今天寫完了第七篇,最后一篇了,歷時(shí)將近兩周,終于寫完了。
1. 多線程斷點(diǎn)續(xù)傳原理
(1)多線程斷點(diǎn)續(xù)傳,實(shí)際上就是傳遞文件過程中斷了,下次再重新傳遞的時(shí)候,記住中斷的點(diǎn),重新傳遞文件。
(2)但是如何記錄斷開的點(diǎn)呢?多線程的斷點(diǎn)續(xù)傳是通過判斷客戶端的文件長度來來判定傳輸?shù)奈恢玫?,然后服?wù)端設(shè)置到已經(jīng)傳輸完畢的下一個(gè)位置開始繼續(xù)傳輸
(3)然后拆分成了多份,多線程同時(shí)下載,下載過程中通過臨時(shí)文件(拆成多少份需要多少個(gè)臨時(shí)文件)記錄當(dāng)前下載完成的長度,多個(gè)文件都下載完成就完成了
(4)如果到了哪個(gè)位置即使斷了,多份文件中記錄著每份下載完成長度,重新執(zhí)行多線程的時(shí)候仍然分成多份文件,每份文件獨(dú)立判斷是否完成,完成多少,跳轉(zhuǎn)到未完成的位置開始繼續(xù)下載
2. 斷點(diǎn)續(xù)傳的實(shí)現(xiàn)
這個(gè)是我網(wǎng)上找了幾個(gè)例子,只有這個(gè)才是真正的斷點(diǎn)續(xù)傳,其他的帖子都是騙子,只有分份下載,壓根沒續(xù)傳。
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
public class MutilDownload {
private static String path = "https://mirrors.cnnic.cn/apache/tomcat/tomcat-8/v8.5.73/bin/apache-tomcat-8.5.73-deployer.tar.gz";
private static final int threadCount = 3;
private static int runningThread; //標(biāo)識 正在運(yùn)行的線程數(shù)量
public static void main(String[] args) {
try {
URL url = new URL(path);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
conn.setConnectTimeout(5000);
int responseCode = conn.getResponseCode();
if (responseCode == 200) {
int contentLength = conn.getContentLength();
runningThread = threadCount;
System.out.println("length" + contentLength);
RandomAccessFile rafAccessFile = new RandomAccessFile("D://WUDownloadCache//apache-tomcat-8.5.73-deployer.tar.gz", "rw");
rafAccessFile.setLength(contentLength);
int blockSize = contentLength / threadCount;
for (int i = 0; i < threadCount; i++) {
int startIndex = i * blockSize; //每個(gè)現(xiàn)成下載的開始位置
int endIndex = (i + 1) * blockSize - 1;// 每個(gè)線程的結(jié)束位置
if (i == threadCount - 1) {
//最后一個(gè)線程
endIndex = contentLength - 1;
}
new DownloadThread(startIndex, endIndex, i).start();
}
}
} catch (Exception e) {
}
}
private static class DownloadThread extends Thread {
private int startIndex;
private int endIndex;
private int threadId;
public DownloadThread(int startIndex, int endIndex, int threadId) {
this.startIndex = startIndex;
this.endIndex = endIndex;
this.threadId = threadId;
}
@Override
public void run() {
try {
URL url = new URL(path);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
conn.setConnectTimeout(5000);
File file = new File("D://WUDownloadCache//"+threadId + ".txt");
if(file.exists() && file.length() > 0) {
FileInputStream fis = new FileInputStream(file);
BufferedReader buff = new BufferedReader(new InputStreamReader(fis));
String lastPosition = buff.readLine();// 讀取出來的內(nèi)容就是上次下載的位置
int lastPos = Integer.parseInt(lastPosition);
System.out.println("線程id:" + threadId + "當(dāng)前線程下載的位置:-----"+ lastPos);
startIndex = lastPos;
fis.close();
buff.close();
}
conn.setRequestProperty("Range", "bytes=" + startIndex + "-" + endIndex); //固定寫法,請求部分資源
int responseCode = conn.getResponseCode(); // 206表示請求部分資源
if (responseCode == 206) {
RandomAccessFile rafAccessFile = new RandomAccessFile("D://WUDownloadCache//apache-tomcat-8.5.73-deployer.tar.gz", "rw");
rafAccessFile.seek(startIndex);
InputStream is = conn.getInputStream();
int len = -1;
byte[] buffer = new byte[1024];
int total = 0; // 代表當(dāng)前線程下載的大小
while ((len = is.read(buffer)) != -1) {
rafAccessFile.write(buffer, 0, len);
total += len;
//斷點(diǎn)續(xù)傳, 保存當(dāng)前線程下載的位置
int currentThreadPosition = startIndex + total; //當(dāng)前線程下載的位置
// 存儲當(dāng)線程的下載五位置
RandomAccessFile raff = new RandomAccessFile("D://WUDownloadCache//"+threadId +".txt", "rwd");
raff.write(String.valueOf(currentThreadPosition).getBytes());
raff.close();
}
rafAccessFile.close();
System.out.println("線程" + threadId + "下載完成");
//刪除臨時(shí)文件
// File tempFile = new File(threadId + ".txt");
// if(tempFile.exists()) {
// file.delete();
// }
synchronized (MutilDownload.class) {
runningThread--;
if(runningThread == 0) {
for (int i = 0; i < threadCount; i++) {
File deleteFile = new File(i + ".txt");
deleteFile.delete();
}
}
}
}
} catch (Exception e) {
}
}
}
}
3. 實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式
簡易版本,只有發(fā)送和消費(fèi),中間用queue實(shí)現(xiàn)。
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import static java.lang.Thread.*;
public class ProducerAndConsumer {
public static final ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(20);
public static void main(String[] args) {
new Thread(new Producer()).start();
try {
sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(new Consumer()).start();
}
static class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 25; i++) {
try {
queue.add(i + "");
}catch(Exception e){
e.printStackTrace();
if(e.getClass().equals(IllegalStateException.class)&&e.getMessage().equals("Queue full")){
waitTest();
}
}
}
}
}
static class Consumer implements Runnable {
@Override
public void run() {
while(true) {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
String result = queue.poll();
System.out.println(result);
if(Objects.isNull(result)){
notifyTest();
}
}
}
}
public static synchronized void notifyTest() {
ProducerAndConsumer.class.notify();
}
public static synchronized void waitTest() {
try {
ProducerAndConsumer.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
4. 用Java實(shí)現(xiàn)阻塞隊(duì)列
(1)什么是阻塞隊(duì)列:在隊(duì)列為空時(shí),獲取元素的線程會等待隊(duì)列變?yōu)榉强?。?dāng)隊(duì)列滿時(shí),存儲元素的線程會等待隊(duì)列可用。阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場景,生產(chǎn)者是往隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里拿元素的線程。阻塞隊(duì)列就是生產(chǎn)者存放元素的容器,而消費(fèi)者也只從容器里拿元素。
(2)主要就是存入和取出的處理,存入時(shí)候需要判斷是否隊(duì)列已經(jīng)滿了,滿了需要等待隊(duì)列有數(shù)據(jù)被取出才能放入,如果存入數(shù)據(jù)要通知等待取數(shù)據(jù)的隊(duì)列;取出時(shí)候,要判斷隊(duì)列是否為空,如果空了就要等待隊(duì)列有數(shù)據(jù)再取出,如果取出數(shù)據(jù)了要通知等待存入數(shù)據(jù)的隊(duì)列。
(3)另外注意一定要在存/取的時(shí)候加鎖,避免線程并發(fā)存儲或者取出數(shù)據(jù)導(dǎo)致的問題
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;
public class BlockingQueueTest<E> {
public static void main(String[] args) {
BlockingQueueTest<String> test = new BlockingQueueTest<String>(10);
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 15; i++) {
test.add(i+"");
System.out.println("add "+ i );
}
}
}).start();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("get started");
while (true) {
String get = test.get();
System.out.println("get "+ get );
}
}
}).start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private LinkedList<E> list = new LinkedList<>();
private static int MAX_SIZE;
private final static Object lock = new Object();
public BlockingQueueTest(int size) {
this.MAX_SIZE = size;
}
//原子性操作數(shù)字的加減
private AtomicInteger count = new AtomicInteger(0);
public synchronized void waitTest() {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void add(E e) {
synchronized (lock) {
//滿了
while(MAX_SIZE == count.get()){
waitTest();
}
list.add(e);
//count++
count.incrementAndGet();
lock.notify();
}
}
public E get(){
synchronized (lock) {
//滿了
while(0 == count.get()){
waitTest();
}
E e = list.removeFirst();
//count--
count.decrementAndGet();
lock.notify();
return e;
}
}
}
5. BlockingQueue介紹:
(1)上邊54代碼講的夠詳細(xì)了,下面主要講講他的方法吧
(2)offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false.(本方法不阻塞當(dāng)前執(zhí)行方法的線程)
(3)offer(E o, long timeout, TimeUnit unit):可以設(shè)定等待的時(shí)間,如果在指定的時(shí)間內(nèi),還不能往隊(duì)列中加入BlockingQueue,則返回失敗
(4)put(anObject):把a(bǔ)nObject加到BlockingQueue里,如果BlockQueue沒有空間,則調(diào)用此方法的線程被阻斷直到BlockingQueue里面有空間再繼續(xù)
(5)poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數(shù)規(guī)定的時(shí)間,取不到時(shí)返回null;
(6)poll(long timeout, TimeUnit unit):從BlockingQueue取出一個(gè)隊(duì)首的對象,如果在指定時(shí)間內(nèi),隊(duì)列一旦有數(shù)據(jù)可取,則立即返回隊(duì)列中的數(shù)據(jù)。否則知道時(shí)間超時(shí)還沒有數(shù)據(jù)可取,返回失敗。
(7)take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進(jìn)入等待狀態(tài)直到BlockingQueue有新的數(shù)據(jù)被加入;
(8)drainTo():一次性從BlockingQueue獲取所有可用的數(shù)據(jù)對象(還可以指定獲取數(shù)據(jù)的個(gè)數(shù)),通過該方法,可以提升獲取數(shù)據(jù)效率;不需要多次分批加鎖或釋放鎖。
6. Java中的同步集合與并發(fā)集合有什么區(qū)別?
(1)同步集合,就是集合的方法都有synchronized的集合,線程安全,效率不高。
(2)并發(fā)集合,Concurrent下面的集合,比如ConcurrentHashMap通過分段的方式控制多線程并發(fā)沖突,也是線程安全,效率較高。