python beanstalk 使用

import greenstalk


class BeanStalk:
    def __init__(self, host, port, encoding='utf-8', use='default', watch='default'):
        self.conn = greenstalk.Client(host=host, port=port, encoding=encoding, use=use, watch=watch)

    def producer(self, body, priority=65536, delay=0, ttr=60, tube='default'):
        """
        生產(chǎn)者
        :param body: job 類型為bytes或者str
        :param priority: 優(yōu)先級 0-2^32 0優(yōu)先級最大
        :param delay: 延遲
        :param ttr: time-to-run 超時重發(fā)
        :param tube: 使用哪個管道 默認(rèn)default
        :return: job id
        """
        self.use(tube)
        return self.conn.put(body, priority, delay, ttr)

    def consumer(self, timeout=None):
        """
        消費者
        :param timeout: 一直處于阻塞狀態(tài) 將等待多長時間來接收這個job。如果這個reserve的timeout時間到了,它將返回TimedOutError
        :return: Job類型
        """
        return self.conn.reserve(timeout)

    def use(self, tube='default'):
        """
        使用哪個管道
        :param tube: 管道名稱 str
        :return: None
        """
        self.conn.use(tube)

    def delete(self, job):
        """
        刪除一個job
        :param job: job或者jobId
        :return: None
        """
        self.conn.delete(job)

    def relase(self, job, priority=65536, delay=0):
        """
        將一個被消費的任務(wù)重新加入隊列
        :param job: 任務(wù)
        :param priority: 優(yōu)先級
        :param delay: 延遲
        :return: None
        """
        self.conn.release(job, priority, delay)

    def bury(self, job, priority=65536):
        """
        將job放到一個特殊的FIFO隊列中,之后不能被reserve命令獲取,但可以用kick命令扔回工作隊列中,之后就能被消費了(相當(dāng)于“邏輯刪除”)
        :param job: 任務(wù)
        :param priority: 優(yōu)先級
        :return: None
        """
        self.conn.bury(job, priority)

    def kick(self, bound):
        """
        :param bound: 最大kick的任務(wù)數(shù)
        :return: 返回受影響任務(wù)的數(shù)量
        """
        return self.conn.kick(bound)

    def kick_job(self, job):
        """
        把delayed 或者 buried 的任務(wù)加到ready隊列
        :param job: job或者id
        :return: None
        """
        self.conn.kick_job(job)

    def touch(self, job):
        """
        讓任務(wù)重新計算任務(wù)超時重發(fā)ttr時間,相當(dāng)于給任務(wù)延長壽命
        :param job: 任務(wù)
        :return: None
        """
        self.conn.touch(job)

    def watch(self, tube):
        """
        watch一個管道,影響的是消費者操作 當(dāng)監(jiān)控多個tube時,只要有一個tube有數(shù)據(jù)到來,reserve會返回
        watch和use是兩個獨立的動作,use一個tube不代表watching它了,反之watch一個tube也不代表using它
        :param tube: 管道
        :return: 返回監(jiān)聽管道的數(shù)量
        """
        return self.conn.watch(tube)

    def ignore(self, tube):
        """
        取消對管道的監(jiān)聽
        :param tube: 管道
        :return: 返回監(jiān)聽管道的數(shù)量
        """
        return self.conn.ignore(tube)

    def peek(self, jobid):
        """
        查看job的信息
        :param jobid:
        :return: 返回job類型
        """
        return self.conn.peek(jobid)

    def peek_ready(self):
        """
        獲取當(dāng)前管道下一個處于ready狀態(tài)的任務(wù)
        :return: Job
        """
        return self.conn.peek_ready()

    def peek_delayed(self):
        """
        獲取當(dāng)前管道下一個處于delay狀態(tài)的任務(wù)
        :return: Job
        """
        return self.conn.peek_delayed()

    def peek_buried(self):
        """
        獲取當(dāng)前管道最早處于buried狀態(tài)的任務(wù)
        :return: Job
        """
        return self.conn.peek_buried()

    def stats_job(self, job):
        """
        查看任務(wù)的狀態(tài) job有四種狀態(tài)
        ready,需要立即處理的任務(wù),當(dāng)延時 (delayed) 任務(wù)到期后會自動成為當(dāng)前任務(wù);
        delayed,延遲執(zhí)行的任務(wù), 當(dāng)消費者處理任務(wù)后,可以用將消息再次放回 delayed 隊列延遲執(zhí)行;
        reserved,已經(jīng)被消費者獲取, 正在執(zhí)行的任務(wù),Beanstalkd 負(fù)責(zé)檢查任務(wù)是否在 TTR(time-to-run) 內(nèi)完成;
        buried,保留的任務(wù): 任務(wù)不會被執(zhí)行,也不會消失,除非有人把它 "踢" 回隊列;
        :param job: job id
        :return: dict
        """
        return self.conn.stats_job(job)

    def stats_tube(self, tube):
        """
        查看管道的狀態(tài)
        :param tube: 管道名稱
        :return: dict
        """
        return self.conn.stats_tube(tube)

    def stats(self):
        """
        返回系統(tǒng)的狀態(tài)
        :return: dict
        """
        return self.conn.stats()

    def tubes(self):
        """
        返回所有的管道名稱
        :return list
        """
        return self.conn.tubes()

    def using(self):
        """
        返回正在使用的管道名稱
        :return: str 管道名稱
        """
        return self.conn.using()

    def watching(self):
        """
        返回正在使用的管道名稱
        :return: list 管道名稱列表
        """
        return self.conn.watching()

    def pause_tube(self, tube, delay):
        """
        暫停一個管道,delay期間的job無法使用
        :param tube: str 管道名稱
        :param delay: 延遲
        :return: None
        """
        self.conn.pause_tube(tube, delay)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容