Kafka多分區下二分法查找指定時間戳的offset

Python消費Kafka的時候,不能指定時間戳開始消費,只能指定offset,因此需要先找到指定時間戳所在的offset再消費。百度找到的文章都是單分區下的查找方法,多分區時需要做一定的修改,記錄下代碼:

import time

from kafka import KafkaConsumer, TopicPartition


def from_timestamp(timestamp):
    """
    Format an epoch timestamp as a ``YYYY-MM-DD HH:MM:SS`` local-time string.

    :param timestamp: seconds since the epoch, as accepted by ``time.localtime``
    :return: the formatted date-time string
    """
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp))


# Target timestamp in epoch milliseconds.  NOTE(review): this value is never
# read by the search below, which uses ``base_timestamp``; it is kept only so
# the name stays defined -- confirm which of the two values is intended.
timestamp = 1635696000000

# Timestamp (epoch milliseconds) whose offset is located in every partition.
base_timestamp = 1635609600000

TOPIC = 'tpc_bd_hu_track'
PARTITION_IDS = (0, 1, 2)
BOOTSTRAP_SERVERS = ['xxx.xxx.xxx.xxx:9092']
POLL_TIMEOUT_MS = 10000


def find_offset_for_timestamp(read_record_at, start_offset, end_offset,
                              target_timestamp, report=None):
    """
    Binary-search one partition for the first offset at or after a timestamp.

    The search assumes record timestamps do not decrease as offsets grow.

    :param read_record_at: callable taking an offset and returning
        ``(record_offset, record_timestamp)`` for the first record stored at
        or after that offset, or ``None`` when no record could be read
    :param start_offset: first offset of the search window (inclusive)
    :param end_offset: end of the search window (exclusive)
    :param target_timestamp: timestamp to look for, in the same unit as the
        timestamps returned by ``read_record_at``
    :param report: optional callable invoked after every successful probe as
        ``report(low, mid, high, record_offset, record_timestamp)``
    :return: the smallest offset in ``[start_offset, end_offset]`` such that
        every record before it is older than ``target_timestamp``;
        ``end_offset`` when all records are older
    """
    low, high = start_offset, end_offset
    while low < high:
        # Integer floor division: no float round-trip, exact for big offsets.
        mid = (low + high) // 2
        record = read_record_at(mid)
        if record is None:
            # Nothing readable at or after ``mid``: the answer cannot lie
            # above it, so shrink the window instead of probing forever.
            high = mid
            continue
        record_offset, record_timestamp = record
        if record_offset >= high or record_timestamp >= target_timestamp:
            # Either offsets mid..high-1 hold no record (offset gap), or the
            # record is at/after the target (an exact match lands here too):
            # the answer is ``mid`` or something before it.
            high = mid
        else:
            # This record and everything before it are too old.
            low = record_offset + 1
        if report is not None:
            report(low, mid, high, record_offset, record_timestamp)
    return low


def _make_reader(consumer, partition):
    """Build the ``read_record_at`` callable for one assigned partition."""
    def read_record_at(offset):
        consumer.seek(partition, offset)
        batches = consumer.poll(timeout_ms=POLL_TIMEOUT_MS, max_records=1)
        records = batches.get(partition)
        if not records:
            return None
        record = records[0]
        return record.offset, record.timestamp
    return read_record_at


def _make_reporter(partition):
    """Build a progress printer that labels output with the partition id."""
    label = "Partition-%d: " % partition.partition

    def report(low, mid, high, record_offset, record_timestamp):
        # Record timestamps are in milliseconds; from_timestamp wants seconds.
        print(label, low, mid, high,
              from_timestamp(record_timestamp / 1000),
              'offset:',
              record_offset)
    return report


def main():
    """
    Locate, per partition of ``TOPIC``, the offset matching ``base_timestamp``.

    Each partition is searched on its own while the others are paused, so a
    poll can only ever return a record of the partition being probed.

    NOTE(review): kafka-python also exposes
    ``KafkaConsumer.offsets_for_times``, which asks the broker for this
    offset directly on brokers that support it -- consider using it instead.
    """
    consumer = KafkaConsumer(bootstrap_servers=BOOTSTRAP_SERVERS,
                             # 'oldest' is not an accepted reset policy;
                             # the valid spellings are 'earliest'/'latest'.
                             auto_offset_reset='earliest',
                             max_poll_records=10000,
                             max_poll_interval_ms=500
                             )
    try:
        partitions = [TopicPartition(TOPIC, pid) for pid in PARTITION_IDS]
        consumer.assign(partitions)

        # One round-trip each instead of one per partition.
        start_offsets = consumer.beginning_offsets(partitions)
        end_offsets = consumer.end_offsets(partitions)

        found_offsets = {}
        for partition in partitions:
            others = [p for p in partitions if p != partition]
            if others:
                consumer.pause(*others)
            try:
                found_offsets[partition] = find_offset_for_timestamp(
                    _make_reader(consumer, partition),
                    start_offsets[partition],
                    end_offsets[partition],
                    base_timestamp,
                    report=_make_reporter(partition),
                )
            finally:
                if others:
                    consumer.resume(*others)

        for partition in partitions:
            print('Partition-%d: ' % partition.partition,
                  start_offsets[partition],
                  found_offsets[partition],
                  end_offsets[partition])
    finally:
        consumer.close()


if __name__ == "__main__":
    main()

©著作權歸作者所有,轉載或內容合作請聯繫作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳並發布,文章內容僅代表作者本人觀點,簡書係信息發布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容