多線(xiàn)程下鎖的應(yīng)用


給女朋友上鎖

有一天夢(mèng)見(jiàn)女朋友跟一個(gè)陌生男人逛街,我很是著急,于是有很多志同道合的朋友開(kāi)始為我出謀劃策。有說(shuō),讓那個(gè)男的指向null,讓垃圾回收他。 也有的說(shuō)給個(gè)死循環(huán),讓他們逛到累,累死他們。。。沒(méi)錯(cuò),你們說(shuō)的都有道理,但是,如果換是我,我會(huì)給自己女朋友逛街這個(gè)行為上鎖,并且只有我才能獲取到鎖,也不會(huì)把鎖讓給別人。好了,扯完,開(kāi)始進(jìn)入正題,沒(méi)錯(cuò),就是鎖。

相信大家在開(kāi)發(fā)過(guò)程中都會(huì)遇到這樣一種場(chǎng)景:瀏覽器端發(fā)起一個(gè)請(qǐng)求,服務(wù)器接收到請(qǐng)求后要去數(shù)據(jù)庫(kù)中把數(shù)據(jù)加載回來(lái),對(duì)數(shù)據(jù)做處理,是不是很簡(jiǎn)單?但是當(dāng)請(qǐng)求過(guò)于頻繁或請(qǐng)求量比較大,并且數(shù)據(jù)庫(kù)表數(shù)據(jù)量又大的情況,這時(shí)候我們就不得不去關(guān)心它的響應(yīng)時(shí)間、性能怎么樣等等這些問(wèn)題了,因?yàn)樗赡軙?huì)影響到整個(gè)業(yè)務(wù)流程,甚至整個(gè)系統(tǒng)。為解決這個(gè)問(wèn)題,相信很多人會(huì)想到用多線(xiàn)程來(lái)解決。即在程序中開(kāi)多個(gè)線(xiàn)程并發(fā)去處理請(qǐng)求,也就是并發(fā)編程,從用戶(hù)角度看還是一個(gè)串行過(guò)程,實(shí)際上是并發(fā)在處理,很顯然這樣做可以提升響應(yīng)效率。但這又會(huì)引起另外一個(gè)問(wèn)題,那就是線(xiàn)程安全,并發(fā)編程有三要素:原子性、有序性、可見(jiàn)性。在多線(xiàn)程編程中要遵守好這三大要素,如果程序沒(méi)有處理好,可能會(huì)造成一些意想不到的后果……

針對(duì)這些問(wèn)題,jdk提供了一些線(xiàn)程安全的接口和類(lèi),例如我們熟悉的Vector、HashTable。java還提供了synchronized關(guān)鍵字和修飾符valitate來(lái)保證線(xiàn)程安全,這兩種都是利用鎖來(lái)實(shí)現(xiàn)的。
  • valitate
    被valitate修飾的變量,當(dāng)一個(gè)線(xiàn)程改變了它的值時(shí),在內(nèi)存中相對(duì)其他線(xiàn)程來(lái)說(shuō)是可見(jiàn)的。還可以防止重排,即程序按代碼的順序來(lái)執(zhí)行,防止順序被打亂。但是它并不能保證原子性。

  • synchronized關(guān)鍵字
    synchronized是jdk定義實(shí)現(xiàn)的鎖,確保程序能夠?qū)ν綁K或方法互斥訪(fǎng)問(wèn),即當(dāng)一個(gè)線(xiàn)程獲取到鎖后,別的線(xiàn)程只能等待,可以保證原子性。
    本文主要是對(duì)synchronized關(guān)鍵字來(lái)展開(kāi)說(shuō)明,以下為synchronized的一個(gè)例子:

案例一
大量請(qǐng)求同時(shí)讀取數(shù)據(jù)庫(kù)表load回?cái)?shù)據(jù)寫(xiě)到一個(gè)文件(比如excel)中,這個(gè)表是分布在不同的庫(kù)中,并且分表的。若單線(xiàn)程去處理這樣的請(qǐng)求,勢(shì)必會(huì)耗時(shí)比較久,甚至因?yàn)橐恍┞樵?xún)導(dǎo)致連接耗盡,造成嚴(yán)重后果。

思路:
使用ExecutorService接口來(lái)實(shí)現(xiàn)多線(xiàn)程,ExecutorService是在包java.util.concurrent下Java中對(duì)線(xiàn)程池定義的一個(gè)接口,實(shí)現(xiàn)異步執(zhí)行的機(jī)制,讓任務(wù)在后臺(tái)執(zhí)行。

ExcelUtils.java

// 使用poi的SXSSFWorkbook支持導(dǎo)出大數(shù)據(jù)
 private SXSSFWorkbook sxworkBook; 
 private OutputStream out;

 public ExcelBuilder(String path, String fileName) {
   sxworkBook = new SXSSFWorkbook(1000);
   try {
      File file = new File(path);
      if (!file.exists()) {
          file.mkdirs();
      }
      File savePath = new File(path + "/" + fileName);
      this.out = new FileOutputStream(savePath);
   } catch (FileNotFoundException e) {
  }
 }
 
public <T> void writeFile(String sheetname, List<T> dataList, Class<T> clazz) {
   …… 此處寫(xiě)文件,具體參考POI寫(xiě)文件API
}

public void create(){
 try {
        sxworkBook.write(out);
        out.flush();
    } catch (IOException e) {
         logger.error(e.getMessage(), e);
    } finally {
         // IOUtils.closeQuietly(out);
    }
}

ThreadTask.java

String dir = "C:/director";
String fileName = "xxxxx.xlsx";
ExcelUtils instance = new ExcelUtils(dir, fileName);
ExecutorService executor = Executors.newCachedThreadPool();

// 開(kāi)兩個(gè)線(xiàn)程處理
executor.execute(new InnerThread(queryParam1, instance));
executor.execute(new InnerThread(queryParam2, instance));

executor.shutdown();
while(!executor.awaitTermination(1, TimeUnit.SECONDS));

instance.create();




// 定義一個(gè)內(nèi)部線(xiàn)程類(lèi) 
class InnerThread implements Runnable{
  private Object queryParam;
  private ExcelUtils instance;
  Object obj = new Object();

  InnerThread(Object queryParam, ExcelUtils instance){
    this.queryParam = queryParam;
    this.instance = instance;
  }
  
  @Override
  public void run() {
     List<XXX> lists = service.query(queryParam); // service是查詢(xún)接口類(lèi)
     synchronized(obj){
        write(lists);
     }
  }
  
  public void write(List<Vo> lists){
     logger.info("當(dāng)前線(xiàn)程:" + Thread.currentThread().getName());
     instance.writeFile(System.currentTimeMillis() + "", lists, XXX.class);
  }
}

到這里就可以實(shí)現(xiàn)多線(xiàn)程讀DB加載數(shù)據(jù),多線(xiàn)程寫(xiě)文件了,但是,事實(shí)并不是那樣,報(bào)異常了?。。。。。?/p>

很明顯,write方法使用關(guān)鍵字synchronized加鎖無(wú)效, 多個(gè)線(xiàn)程同時(shí)進(jìn)入同時(shí)寫(xiě)文件,從而導(dǎo)致出現(xiàn)異常。那么為什么加了synchronized還出現(xiàn)并發(fā)寫(xiě)呢? 先看看synchronized的定義,如下:(本段摘自百度百科)
Java語(yǔ)言的關(guān)鍵字,可用來(lái)給對(duì)象和方法或者代碼塊加鎖,當(dāng)它鎖定一個(gè)方法或者一個(gè)代碼塊的時(shí)候,同一時(shí)刻最多只有一個(gè)線(xiàn)程執(zhí)行這段代碼。當(dāng)兩個(gè)并發(fā)線(xiàn)程訪(fǎng)問(wèn)同一個(gè)對(duì)象object中的這個(gè)加鎖同步代碼塊時(shí),一個(gè)時(shí)間內(nèi)只能有一個(gè)線(xiàn)程得到執(zhí)行。另一個(gè)線(xiàn)程必須等待當(dāng)前線(xiàn)程執(zhí)行完這個(gè)代碼塊以后才能執(zhí)行該代碼塊。
原來(lái)synchronized修飾的代碼塊,必須要實(shí)例相同才能鎖住代碼塊,也就是只有一個(gè)線(xiàn)程可以執(zhí)行該塊內(nèi)容。
于是做了以下修改:

class InnerThread implements Runnable{
  private Object queryParam;
  private ExcelUtils instance;
  private Lock lock;

  InnerThread(Object queryParam, ExcelUtils instance, Lock lock){
    this.queryParam = queryParam;
    this.instance = instance;
    this.lock = lock; // 傳入同一個(gè)實(shí)例
  }
  
  @Override
  public void run() {
     List<XXX> lists = service.query(queryParam); // service是查詢(xún)接口類(lèi)
     synchronized(lock){
        write(lists);
     }
  }
  
  public void write(List<Vo> lists){
     logger.info("當(dāng)前線(xiàn)程:" + Thread.currentThread().getName());
     instance.writeFile(System.currentTimeMillis() + "", lists, XXX.class);
  }
}
Lock lock = new ReentrantLock();
// 開(kāi)兩個(gè)線(xiàn)程處理
executor.execute(new InnerThread(queryParam1, instance, lock));
executor.execute(new InnerThread(queryParam2, instance, lock));

再次執(zhí)行,生成文件成功。


注意到一點(diǎn),這里只對(duì)寫(xiě)文件部分加了鎖,對(duì)于讀DB加載數(shù)據(jù)返回并沒(méi)有加鎖,load數(shù)據(jù)依然是多線(xiàn)程并行去請(qǐng)求DB,在響應(yīng)效率上得到了較高提升。

在上面的單機(jī)場(chǎng)景中,我們可以運(yùn)用ava中提供的很多并發(fā)處理相關(guān)的API,但是這些API在分布式場(chǎng)景中就無(wú)能為力了,由于分布式系統(tǒng)的分布性,即多線(xiàn)程和多進(jìn)程并且分布在不同機(jī)器中,synchronized這種鎖將失去原有鎖的效果,這時(shí)候就需要我們自己實(shí)現(xiàn)分布式鎖。

案例二
redis基于緩存,實(shí)現(xiàn)分布式鎖,利用redis的鎖機(jī)制來(lái)實(shí)現(xiàn)分布式鎖。

思路:
利用redis接口API對(duì)對(duì)象就行上鎖,并且設(shè)置過(guò)期時(shí)間,實(shí)現(xiàn)并發(fā)編程。

RedisClient.java

public class RedisClient{
    private Logger log = LoggerFactory.getLogger(RedisClient.class);
    private JedisPool jedisPool;
    private Jedis jedis;
    String lock;
    long expires = 5000;
    
    public RedisClient(String lock) {
       this.lock = lock;
       this.init();
    }
    private void init() {
        // 池基本配置
        JedisPoolConfig config = new JedisPoolConfig();
        // config.setMaxActive(20);
        config.setMaxIdle(5);
        // config.setMaxWait(1000l);
        config.setTestOnBorrow(false);

        jedisPool = new JedisPool(config, "XX.XXX.XXX.XX", 6400);
        jedis = jedisPool.getResource();
    }
    
    public boolean getLock() {
        while (true) {
            boolean lock = setlock();
            if (lock) {
                return lock;
            }
        }
    }
    
    public boolean setlock() {
        long currentTime = System.currentTimeMillis();
        String expire = String.valueOf(currentTime + expires);
        if (jedis.setnx(lock, expire) > 0) {
            log.info("當(dāng)前線(xiàn)程:" + Thread.currentThread().getName() + "獲取到鎖");
            jedis.expire(lock, 5);
            return true;
        } else {
            String oldTime = jedis.get(lock);
            if (oldTime != null && (currentTime - Long.parseLong(oldTime)) > 0) {
                String oldValue = jedis.getSet(lock, expire);
                if (oldValue != null && oldValue.equals(oldTime)) {
                    jedis.expire(lock, 5);
                    log.info("過(guò)期了,讓其它線(xiàn)程獲取鎖,當(dāng)前線(xiàn)程:" + Thread.currentThread().getName() + "獲取到鎖");
                    return true;
                }
            }
        }

        return false;
    }
}

ThreadTest.java

public class ThreadTest extends Thread {
    private Logger log = LoggerFactory.getLogger(Main.class);
    String lock;

    ThreadTest(String lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        RedisClient client = new RedisClient(lock);
        boolean hasLock = client.getLock();
        if (hasLock) {
            log.info(Thread.currentThread().getName() + "開(kāi)始執(zhí)行……");
        }
    }
}

Main.java

public class Main {
    public static void main(String[] args) {
        String lock = "key";
        new ThreadTest(lock).start();
        new ThreadTest(lock).start();
        new ThreadTest(lock).start();
    }

}

運(yùn)行效果:



總結(jié):
隨著日益增長(zhǎng)的業(yè)務(wù)量,數(shù)據(jù)越來(lái)越大,處理請(qǐng)求響應(yīng)速度也要隨著提升,并發(fā)編程就必不可少,在單機(jī)多線(xiàn)程下,我們可以使用jdk提供的并發(fā)處理相關(guān)的API來(lái)解決我們的問(wèn)題,例如上面提到的valitate,synchronized,還有CAS,也可以在業(yè)務(wù)上實(shí)現(xiàn)鎖的機(jī)制,比如說(shuō)在數(shù)據(jù)庫(kù)變層面來(lái)處理。 但在也正是因?yàn)闃I(yè)務(wù)量越來(lái)越大,需求更復(fù)雜的前提下,系統(tǒng)分布式部署就越來(lái)越重要,負(fù)載均衡,多節(jié)點(diǎn),多機(jī)器部署勢(shì)必帶來(lái)更多的問(wèn)題,因此分布式鎖就廣泛被使用,多種實(shí)現(xiàn)也隨之出現(xiàn),如前面提到的基于緩存redis,還有基于Zookeeper實(shí)現(xiàn)分布式鎖等等。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容