長序列比對到基因組測試

提取出每條染色體著絲粒區(qū)域,比對到不同參考基因組,查看差異:
import os
import subprocess
import argparse
import shutil
import re
from collections import defaultdict


def check_dependency(tools):
    """檢查多個工具是否安裝(新增blastn依賴)"""
    missing = []
    for tool in tools:
        if not shutil.which(tool):
            missing.append(tool)
    if missing:
        print(f"? 缺少工具:{', '.join(missing)}")
        print("  安裝建議(conda):conda install -c bioconda blast samtools minimap2")
        exit(1)


def parse_query_detail(query_detail_path):
    """解析query_longest_chrom_detail.txt,獲取有效比對的query及其基因組區(qū)域信息"""
    print(f"?? 解析query比對詳情文件:{query_detail_path}")
    valid_queries = {}  # {query_id: {'chrom': '', 'start': int, 'end': int, 'strand': '', 'query_len': int}}
    if not os.path.exists(query_detail_path):
        print(f"? query詳情文件不存在:{query_detail_path}")
        exit(1)

    with open(query_detail_path, 'r') as f:
        next(f)  # 跳過表頭
        for line_num, line in enumerate(f, 2):
            line = line.strip()
            if not line:
                continue
            cols = line.split('\t')
            if len(cols) != 9:
                print(f"?? 跳過第{line_num}行:格式錯誤(需9列)")
                continue

            query_id = cols[0]
            chrom = cols[1]
            start = cols[2]
            end = cols[3]
            strand = cols[4]
            query_len = cols[5]

            # 篩選有效比對(chrom、start、end非'-')
            if chrom == '-' or start == '-' or end == '-':
                continue
            try:
                start = int(start)
                end = int(end)
                query_len = int(query_len)
            except ValueError:
                print(f"?? 跳過第{line_num}行:坐標(biāo)/長度非整數(shù)")
                continue

            valid_queries[query_id] = {
                'chrom': chrom,
                'start': start,
                'end': end,
                'strand': strand,
                'query_len': query_len
            }

    print(f"? 共解析到 {len(valid_queries)} 條有效比對的query")
    if not valid_queries:
        print("? 無有效比對的query,無法繼續(xù)")
        exit(1)
    return valid_queries


def extract_genome_seqs(ref_fasta, valid_queries, out_dir):
    """從基因組中提取有效比對區(qū)域的序列(負(fù)鏈反轉(zhuǎn)互補(bǔ))"""
    print(f"\n?? 提取基因組比對區(qū)域序列...")
    # 檢查基因組索引(samtools faidx需要)
    ref_index = f"{ref_fasta}.fai"
    if not os.path.exists(ref_index):
        print(f"?? 基因組索引不存在,自動生成:{ref_index}")
        subprocess.run(['samtools', 'faidx', ref_fasta], check=True, stderr=subprocess.PIPE, text=True)

    # 輸出基因組提取序列文件(用于blastn)
    subj_fasta = os.path.join(out_dir, "extracted_genome_seqs.fasta")
    subj_seqs = {}  # {query_id: 基因組提取序列}
    failed_queries = []

    with open(subj_fasta, 'w') as f:
        for query_id, info in valid_queries.items():
            chrom = info['chrom']
            start = info['start']
            end = info['end']
            strand = info['strand']

            # 提取基因組序列(samtools faidx)
            cmd = ['samtools', 'faidx', ref_fasta, f"{chrom}:{start}-{end}"]
            try:
                result = subprocess.run(
                    cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True
                )
            except subprocess.CalledProcessError as e:
                print(f"?? {query_id} 提取失?。簕e.stderr.strip()}")
                failed_queries.append(query_id)
                continue

            # 處理提取的序列(去除表頭,合并)
            seq_lines = []
            for line in result.stdout.splitlines():
                if not line.startswith('>'):
                    seq_lines.append(line.strip().upper())
            subj_seq = ''.join(seq_lines)
            if not subj_seq:
                print(f"?? {query_id} 提取序列為空")
                failed_queries.append(query_id)
                continue

            # 負(fù)鏈反轉(zhuǎn)互補(bǔ)
            if strand == '-':
                subj_seq = subj_seq[::-1].translate(str.maketrans('ATCG', 'TAGC'))

            # 寫入文件(ID格式:query_id_subj,確保與query對應(yīng))
            f.write(f">{query_id}_subj\n{subj_seq}\n")
            subj_seqs[query_id] = subj_seq

    # 移除提取失敗的query
    for qid in failed_queries:
        valid_queries.pop(qid, None)
    print(f"? 基因組序列提取完成:")
    print(f"   - 成功提?。簕len(subj_seqs)} 條序列({subj_fasta})")
    print(f"   - 提取失敗:{len(failed_queries)} 條query(已排除)")
    return subj_fasta, subj_seqs, valid_queries


def prepare_blast_input(query_fasta, valid_queries, out_dir):
    """準(zhǔn)備blastn的query輸入文件(僅包含有效比對的query)"""
    print(f"\n?? 準(zhǔn)備blastn query輸入文件...")
    blast_query_fasta = os.path.join(out_dir, "blastn_query_seqs.fasta")
    query_seqs = {}  # {query_id: query序列}
    missing_queries = []

    # 先讀取完整query序列
    with open(query_fasta, 'r') as f:
        current_id, current_seq = None, []
        for line in f:
            line = line.strip()
            if line.startswith('>'):
                if current_id:
                    query_seqs[current_id] = ''.join(current_seq).upper()
                current_id = line[1:].split()[0]
                current_seq = []
            else:
                current_seq.append(line)
        if current_id:
            query_seqs[current_id] = ''.join(current_seq).upper()

    # 篩選有效query并寫入blast輸入文件
    with open(blast_query_fasta, 'w') as f:
        for qid in valid_queries.keys():
            if qid not in query_seqs:
                missing_queries.append(qid)
                continue
            f.write(f">{qid}\n{query_seqs[qid]}\n")

    # 移除缺失序列的query
    for qid in missing_queries:
        valid_queries.pop(qid, None)
    print(f"? blastn query文件準(zhǔn)備完成:")
    print(f"   - 有效query:{len(valid_queries)} 條({blast_query_fasta})")
    print(f"   - 缺失序列:{len(missing_queries)} 條query(已排除)")
    return blast_query_fasta, query_seqs


def run_blastn(blast_query_fasta, subj_fasta, out_dir, threads=4):
    """運(yùn)行blastn比對(query vs 提取的基因組序列)"""
    print(f"\n?? 運(yùn)行blastn比對...")
    # 1. 構(gòu)建blast數(shù)據(jù)庫(subject:提取的基因組序列)
    db_prefix = os.path.join(out_dir, "genome_subj_blastdb")
    if not all(os.path.exists(f"{db_prefix}.{ext}") for ext in ['nhr', 'nin', 'nsq']):
        cmd = ['makeblastdb', '-in', subj_fasta, '-dbtype', 'nucl', '-out', db_prefix]
        subprocess.run(cmd, check=True, stderr=subprocess.PIPE, text=True)
        print(f"? blast數(shù)據(jù)庫構(gòu)建完成:{db_prefix}.*")
    else:
        print(f"? blast數(shù)據(jù)庫已存在:{db_prefix}.*")

    # 2. 運(yùn)行blastn
    blast_out = os.path.join(out_dir, "blastn_alignment_results.txt")
    # blastn參數(shù):高置信度比對(evalue=1e-20),輸出表格格式(含比對長度、相似度)
    cmd = [
        'blastn', '-query', blast_query_fasta, '-db', db_prefix,
        '-out', blast_out, '-outfmt', "6 qseqid sseqid length qlen pident evalue",
        '-evalue', '1e-20', '-num_threads', str(threads), '-max_target_seqs', '1'  # 每條query僅保留1條最佳比對
    ]
    subprocess.run(cmd, check=True, stderr=subprocess.PIPE, text=True)

    if os.path.exists(blast_out) and os.path.getsize(blast_out) > 0:
        print(f"? blastn比對完成:{blast_out}")
        return blast_out
    else:
        print(f"? blastn無結(jié)果輸出,可能無有效比對")
        exit(1)


def parse_blastn(blast_out, valid_queries):
    """解析blastn結(jié)果,統(tǒng)計每條query的比對長度、比對率"""
    print(f"\n?? 解析blastn結(jié)果...")
    blast_stats = {}  # {query_id: {'align_len': int, 'query_len': int, 'align_rate': float, 'pident': float, 'evalue': str}}
    no_blast_hit = []

    with open(blast_out, 'r') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            cols = line.split('\t')
            if len(cols) != 6:
                continue  # 跳過格式錯誤行

            qid = cols[0]
            align_len = int(cols[2])
            query_len = int(cols[3])
            pident = float(cols[4])
            evalue = cols[5]

            # 計算blastn比對率(比對長度 / query長度 ×100%)
            align_rate = (align_len / query_len) * 100 if query_len > 0 else 0.0
            blast_stats[qid] = {
                'align_len': align_len,
                'query_len': query_len,
                'align_rate': align_rate,
                'pident': pident,
                'evalue': evalue
            }

    # 檢查無blast結(jié)果的query
    for qid in valid_queries.keys():
        if qid not in blast_stats:
            no_blast_hit.append(qid)
    # 移除無blast結(jié)果的query
    for qid in no_blast_hit:
        valid_queries.pop(qid, None)

    print(f"? blastn結(jié)果解析完成:")
    print(f"   - 有有效比對:{len(blast_stats)} 條query")
    print(f"   - 無blast結(jié)果:{len(no_blast_hit)} 條query(已排除)")
    return blast_stats, valid_queries


def integrate_results(valid_queries, subj_seqs, query_seqs, blast_stats, out_dir):
    """整合所有結(jié)果:query信息+基因組信息+blastn統(tǒng)計"""
    print(f"\n?? 整合所有結(jié)果...")
    # 1. 詳細(xì)整合文件
    integrate_detail = os.path.join(out_dir, "blastn_integrated_details.txt")
    # 2. 染色體匯總統(tǒng)計文件
    chrom_summary = os.path.join(out_dir, "blastn_chrom_summary.txt")

    # 寫入詳細(xì)整合結(jié)果
    with open(integrate_detail, 'w') as f:
        f.write(
            "query_id\tgenome_chrom\tgenome_start\tgenome_end\tgenome_strand\t"
            "query_length\tgenome_seq_length\tblastn_align_length\tblastn_align_rate(%)\t"
            "blastn_pident(%)\tblastn_evalue\tstatus\n"
        )
        for qid, info in valid_queries.items():
            q_len = info['query_len']
            subj_seq = subj_seqs[qid]
            subj_len = len(subj_seq)
            blast = blast_stats[qid]
            f.write(
                f"{qid}\t{info['chrom']}\t{info['start']}\t{info['end']}\t{info['strand']}\t"
                f"{q_len}\t{subj_len}\t{blast['align_len']}\t{blast['align_rate']:.2f}\t"
                f"{blast['pident']:.2f}\t{blast['evalue']}\tsuccess\n"
            )

    # 按染色體分組統(tǒng)計blastn結(jié)果
    chrom_stats = defaultdict(lambda: {
        'query_count': 0,
        'total_align_len': 0,
        'total_align_rate': 0,
        'total_pident': 0
    })
    for qid, info in valid_queries.items():
        chrom = info['chrom']
        blast = blast_stats[qid]
        chrom_stats[chrom]['query_count'] += 1
        chrom_stats[chrom]['total_align_len'] += blast['align_len']
        chrom_stats[chrom]['total_align_rate'] += blast['align_rate']
        chrom_stats[chrom]['total_pident'] += blast['pident']

    # 寫入染色體匯總統(tǒng)計
    with open(chrom_summary, 'w') as f:
        f.write(
            "chromosome\tquery_count\tavg_blastn_align_length(bp)\t"
            "avg_blastn_align_rate(%)\tavg_blastn_pident(%)\n"
        )
        # 按query_count降序排序
        sorted_chroms = sorted(chrom_stats.items(), key=lambda x: x[1]['query_count'], reverse=True)
        for chrom, stats in sorted_chroms:
            count = stats['query_count']
            avg_align_len = stats['total_align_len'] / count if count > 0 else 0.0
            avg_align_rate = stats['total_align_rate'] / count if count > 0 else 0.0
            avg_pident = stats['total_pident'] / count if count > 0 else 0.0
            f.write(
                f"{chrom}\t{count}\t{avg_align_len:.1f}\t"
                f"{avg_align_rate:.2f}\t{avg_pident:.2f}\n"
            )

    # 全局匯總統(tǒng)計
    total_query = len(valid_queries)
    if total_query > 0:
        avg_all_align_len = sum(b['align_len'] for b in blast_stats.values()) / total_query
        avg_all_align_rate = sum(b['align_rate'] for b in blast_stats.values()) / total_query
        avg_all_pident = sum(b['pident'] for b in blast_stats.values()) / total_query
        print(f"\n?? 全局blastn統(tǒng)計匯總:")
        print(f"   - 有效比對query總數(shù):{total_query}")
        print(f"   - 平均blastn比對長度:{avg_all_align_len:.1f}bp")
        print(f"   - 平均blastn比對率:{avg_all_align_rate:.2f}%")
        print(f"   - 平均blastn相似度(pident):{avg_all_pident:.2f}%")
        print(f"   - 涉及染色體數(shù):{len(chrom_stats)}")

    print(f"\n? 整合結(jié)果保存完成:")
    print(f"   - 詳細(xì)結(jié)果:{integrate_detail}")
    print(f"   - 染色體匯總:{chrom_summary}")
    return integrate_detail, chrom_summary


def main():
    parser = argparse.ArgumentParser(description="提取基因組比對序列+blastn比對+統(tǒng)計整合")
    parser.add_argument("--query", default="extracted_sequences.txt", help="原query序列文件(FASTA)")
    parser.add_argument("--ref", default="H7L31.fixed.fa", help="基因組文件(FASTA)")
    parser.add_argument("--query_detail", default="chrom_integrated_results/query_longest_chrom_detail.txt", 
                        help="之前生成的query最長比對詳情文件")
    parser.add_argument("--outdir", default="blastn_integrated_results", help="blastn整合結(jié)果輸出目錄")
    parser.add_argument("--threads", type=int, default=4, help="blastn線程數(shù)")
    args = parser.parse_args()

    # 1. 檢查依賴(新增blastn、makeblastdb)
    check_dependency(['samtools', 'minimap2', 'blastn', 'makeblastdb'])

    # 2. 檢查輸入文件
    required_files = [args.query, args.ref, args.query_detail]
    for f in required_files:
        if not os.path.exists(f):
            print(f"? 輸入文件不存在:{f}")
            exit(1)

    # 3. 創(chuàng)建輸出目錄
    os.makedirs(args.outdir, exist_ok=True)
    print(f"? 輸出目錄已創(chuàng)建:{args.outdir}")

    # 4. 核心流程執(zhí)行
    print("\n===== 步驟1/5:解析query比對詳情 =====")
    valid_queries = parse_query_detail(args.query_detail)

    print("\n===== 步驟2/5:提取基因組比對區(qū)域序列 =====")
    subj_fasta, subj_seqs, valid_queries = extract_genome_seqs(args.ref, valid_queries, args.outdir)

    print("\n===== 步驟3/5:準(zhǔn)備blastn輸入文件 =====")
    blast_query_fasta, query_seqs = prepare_blast_input(args.query, valid_queries, args.outdir)

    print("\n===== 步驟4/5:運(yùn)行blastn比對 =====")
    blast_out = run_blastn(blast_query_fasta, subj_fasta, args.outdir, args.threads)

    print("\n===== 步驟5/5:解析并整合結(jié)果 =====")
    blast_stats, valid_queries = parse_blastn(blast_out, valid_queries)
    integrate_detail, chrom_summary = integrate_results(valid_queries, subj_seqs, query_seqs, blast_stats, args.outdir)

    print("\n?? 全流程完成!核心結(jié)果文件:")
    print(f"   1. 基因組提取序列:{subj_fasta}")
    print(f"   2. blastn比對結(jié)果:{blast_out}")
    print(f"   3. 整合詳細(xì)結(jié)果:{integrate_detail}")
    print(f"   4. 染色體匯總統(tǒng)計:{chrom_summary}")


if __name__ == "__main__":
    main()
結(jié)果如上

另具體信息:

import os
import subprocess
import argparse
import shutil
import re
from collections import defaultdict


def check_dependency(tools):
    """檢查必需工具是否安裝(minimap2、samtools)"""
    missing = [tool for tool in tools if not shutil.which(tool)]
    if missing:
        print(f"? 缺少必需工具:{', '.join(missing)}")
        print("  安裝建議(conda):conda install -c bioconda minimap2 samtools")
        exit(1)


def build_minimap2_index(ref_fasta):
    """構(gòu)建參考基因組minimap2索引(若不存在)"""
    index_file = f"{ref_fasta}.mmi"
    if os.path.exists(index_file):
        print(f"? 基因組索引已存在:{index_file}")
        return index_file
    print(f"?? 構(gòu)建基因組索引({ref_fasta})...")
    cmd = ["minimap2", "-d", index_file, ref_fasta]
    try:
        subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        print(f"? 索引構(gòu)建完成:{index_file}")
        return index_file
    except subprocess.CalledProcessError as e:
        err_msg = e.stderr.strip() if e.stderr else "未知錯誤"
        print(f"? 索引構(gòu)建失?。簕err_msg}")
        exit(1)


def run_minimap2_alignment(ref_index, query_file, output_bam, threads=4):
    """運(yùn)行minimap2比對,輸出排序BAM(保留高置信度比對信息)"""
    if os.path.exists(output_bam) and os.path.exists(f"{output_bam}.bai"):
        print(f"? 比對結(jié)果BAM已存在:{output_bam}")
        return
    temp_sam = output_bam.replace(".bam", ".temp.sam")
    try:
        print("?? 開始序列與基因組比對...")
        cmd = [
            "minimap2", "-a", "-x", "map-ont",
            "--end-bonus", "10", "--MD",  # 保留MD標(biāo)簽用于變異解析
            "-t", str(threads),
            ref_index, query_file
        ]
        with open(temp_sam, "w") as f:
            subprocess.run(cmd, stdout=f, check=True, stderr=subprocess.PIPE, text=True)
        
        print("?? 轉(zhuǎn)換為排序BAM并構(gòu)建索引...")
        subprocess.run(
            ["samtools", "sort", "-@", str(threads), "-o", output_bam, temp_sam],
            check=True, stderr=subprocess.PIPE, text=True
        )
        subprocess.run(["samtools", "index", output_bam], check=True)
        os.remove(temp_sam)
        print(f"? 比對完成:{output_bam}")
    except subprocess.CalledProcessError as e:
        err_msg = e.stderr.strip() if e.stderr else "未知錯誤"
        print(f"? 比對失?。簕err_msg}")
        if os.path.exists(temp_sam):
            os.remove(temp_sam)
        exit(1)


def parse_cigar_and_md(cigar, md_tag, query_seq, ref_start):
    """解析CIGAR和MD標(biāo)簽,提取變異信息(錯配、插入、缺失)"""
    variants = []
    query_pos = 0  # query序列當(dāng)前位置(0-based)
    ref_pos = ref_start  # 參考序列當(dāng)前位置(1-based)
    md_parts = re.findall(r"(\d+|[ATCGatcg]|\^[ATCGatcg]+)", md_tag)

    # 處理CIGAR,記錄插入/缺失
    for length, op in re.findall(r"(\d+)([MIDNSHPX=])", cigar):
        length = int(length)
        if op in ["M", "=", "X"]:
            query_pos += length
            ref_pos += length
        elif op == "I":  # 插入(query有,ref無)
            variants.append({
                "type": "insertion",
                "pos": ref_pos,
                "ref_base": "-",
                "query_base": query_seq[query_pos:query_pos+length],
                "length": length
            })
            query_pos += length
        elif op == "D":  # 缺失(ref有,query無)
            variants.append({
                "type": "deletion",
                "pos": ref_pos,
                "ref_base": "",  # 后續(xù)用MD標(biāo)簽補(bǔ)充
                "query_base": "-",
                "length": length
            })
            ref_pos += length
        elif op in ["S", "H"]:
            query_pos += length
        elif op in ["N", "P"]:
            ref_pos += length

    # 處理MD標(biāo)簽,補(bǔ)充錯配和缺失的堿基
    current_ref_pos = ref_start
    for part in md_parts:
        if part.isdigit():
            current_ref_pos += int(part)
        elif part.startswith("^"):  # 缺失的堿基
            del_bases = part[1:].upper()
            for var in variants:
                if var["type"] == "deletion" and var["pos"] == current_ref_pos:
                    var["ref_base"] = del_bases
                    break
            current_ref_pos += len(del_bases)
        else:  # 錯配的堿基
            ref_base = part.upper()
            query_mismatch_pos = current_ref_pos - ref_start
            if 0 <= query_mismatch_pos < len(query_seq):
                query_base = query_seq[query_mismatch_pos].upper()
                variants.append({
                    "type": "mismatch",
                    "pos": current_ref_pos,
                    "ref_base": ref_base,
                    "query_base": query_base,
                    "length": 1
                })
            current_ref_pos += 1

    return variants


def extract_and_integrate_by_chrom(bam_file, query_file, ref_fasta, output_dir):
    """提取比對信息,按染色體整合統(tǒng)計所有數(shù)據(jù)(query信息放最左側(cè))"""
    # 1. 讀取query序列(ID: (序列字符串, 長度))
    print("\n?? 讀取query序列...")
    query_dict = {}
    with open(query_file, "r") as f:
        current_id = None
        current_seq_lines = []
        for line in f:
            line = line.strip()
            if not line:
                continue
            if line.startswith(">"):
                if current_id is not None and current_seq_lines:
                    full_seq = "".join(current_seq_lines).upper()
                    query_dict[current_id] = (full_seq, len(full_seq))
                current_id = line[1:].split()[0]
                current_seq_lines = []
            else:
                current_seq_lines.append(line)
        if current_id is not None and current_seq_lines:
            full_seq = "".join(current_seq_lines).upper()
            query_dict[current_id] = (full_seq, len(full_seq))
    total_queries = len(query_dict)
    print(f"?? 共讀取 {total_queries} 條query序列")

    # 2. 初始化數(shù)據(jù)結(jié)構(gòu)(按染色體分組統(tǒng)計)
    chrom_stats = defaultdict(lambda: {
        "query_count": 0,           # 該染色體上的比對query數(shù)
        "total_query_length": 0,    # 總query長度
        "total_align_length": 0,    # 總比對長度
        "total_align_rate": 0.0,    # 總比對率
        "total_mismatch": 0,        # 總錯配數(shù)
        "total_insertion": 0,       # 總插入數(shù)
        "total_deletion": 0,        # 總?cè)笔?shù)
        "total_insertion_length": 0,# 插入總長度
        "total_deletion_length": 0, # 缺失總長度
        "queries": []               # 比對到該染色體的query ID列表
    })
    query_detail = []  # 單條query的詳細(xì)信息

    # 3. 提取BAM中的高置信度比對
    print("?? 提取比對信息并按染色體整合...")
    view_cmd = ["samtools", "view", "-q", "20", "-F", "4", bam_file]
    try:
        result = subprocess.run(view_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True)
    except subprocess.CalledProcessError as e:
        err_msg = e.stderr.strip() if e.stderr else "未知錯誤"
        print(f"? 讀取BAM失?。簕err_msg}")
        exit(1)

    # 4. 解析每條比對結(jié)果,更新統(tǒng)計
    for line in result.stdout.splitlines():
        fields = line.split("\t")
        qid = fields[0]
        if qid not in query_dict:
            continue
        query_seq, qlen = query_dict[qid]

        # 解析BAM基礎(chǔ)字段
        flag = int(fields[1])
        chrom = fields[2]
        ref_start = int(fields[3])
        cigar = fields[5]
        strand = "-" if (flag & 16) else "+"
        md_tag = next((f.split(":")[-1] for f in fields if f.startswith("MD:Z:")), "")

        # 計算比對指標(biāo)
        align_length = 0
        for length, op in re.findall(r"(\d+)([MIDNSHPX=])", cigar):
            if op in ["M", "D", "N", "X", "="]:
                align_length += int(length)
        align_rate = (align_length / qlen) * 100 if qlen > 0 else 0.0
        ref_end = ref_start + align_length - 1 if align_length > 0 else ref_start

        # 解析變異并統(tǒng)計
        variants = parse_cigar_and_md(cigar, md_tag, query_seq, ref_start)
        mismatch_count = len([v for v in variants if v["type"] == "mismatch"])
        insertion_count = len([v for v in variants if v["type"] == "insertion"])
        deletion_count = len([v for v in variants if v["type"] == "deletion"])
        insertion_length = sum(v["length"] for v in variants if v["type"] == "insertion")
        deletion_length = sum(v["length"] for v in variants if v["type"] == "deletion")

        # 記錄query明細(xì)(query信息優(yōu)先)
        query_detail.append({
            "query_id": qid,
            "query_length": qlen,  # query信息放前面
            "chrom": chrom,
            "ref_start": ref_start,
            "ref_end": ref_end,
            "strand": strand,
            "align_length": align_length,
            "align_rate": align_rate,
            "mismatch": mismatch_count,
            "insertion": insertion_count,
            "deletion": deletion_count,
            "insertion_length": insertion_length,
            "deletion_length": deletion_length,
            "variants": variants
        })

        # 更新染色體統(tǒng)計
        chrom_stats[chrom]["query_count"] += 1
        chrom_stats[chrom]["total_query_length"] += qlen
        chrom_stats[chrom]["total_align_length"] += align_length
        chrom_stats[chrom]["total_align_rate"] += align_rate
        chrom_stats[chrom]["total_mismatch"] += mismatch_count
        chrom_stats[chrom]["total_insertion"] += insertion_count
        chrom_stats[chrom]["total_deletion"] += deletion_count
        chrom_stats[chrom]["total_insertion_length"] += insertion_length
        chrom_stats[chrom]["total_deletion_length"] += deletion_length
        chrom_stats[chrom]["queries"].append(qid)

    # 5. 補(bǔ)充未比對的query
    aligned_qids = {d["query_id"] for d in query_detail}
    for qid in query_dict.keys():
        if qid not in aligned_qids:
            qlen = query_dict[qid][1]
            query_detail.append({
                "query_id": qid,
                "query_length": qlen,  # query信息放前面
                "chrom": "-",
                "ref_start": "-",
                "ref_end": "-",
                "strand": "-",
                "align_length": "-",
                "align_rate": "-",
                "mismatch": "-",
                "insertion": "-",
                "deletion": "-",
                "insertion_length": "-",
                "deletion_length": "-",
                "variants": []
            })

    # 6. 計算染色體平均指標(biāo)
    for chrom in chrom_stats:
        stats = chrom_stats[chrom]
        count = stats["query_count"]
        if count == 0:
            continue
        stats["avg_align_rate"] = stats["total_align_rate"] / count
        stats["avg_query_length"] = stats["total_query_length"] / count
        stats["avg_align_length"] = stats["total_align_length"] / count
        stats["avg_mismatch"] = stats["total_mismatch"] / count
        stats["avg_insertion"] = stats["total_insertion"] / count
        stats["avg_deletion"] = stats["total_deletion"] / count
        stats["avg_insertion_length"] = (stats["total_insertion_length"] / stats["total_insertion"]) if stats["total_insertion"] > 0 else 0
        stats["avg_deletion_length"] = (stats["total_deletion_length"] / stats["total_deletion"]) if stats["total_deletion"] > 0 else 0

    # 7. 輸出結(jié)果文件(query信息放最左側(cè))
    # 7.1 單條query詳細(xì)信息(query_id、query_length居左)
    detail_file = os.path.join(output_dir, "query_alignment_details.txt")
    with open(detail_file, "w") as f:
        # 表頭順序:query信息 → 染色體信息 → 比對/變異信息
        f.write(
            "query_id\tquery_length\tchromosome\tref_start\tref_end\tstrand\t"
            "align_length\talign_rate(%)\t"
            "mismatch_count\tinsertion_count\tdeletion_count\t"
            "insertion_total_length\tdeletion_total_length\tvariant_type\t"
            "variant_pos\tref_base\tquery_base\tvariant_length\n"
        )
        for q in query_detail:
            if not q["variants"]:
                f.write(
                    f"{q['query_id']}\t{q['query_length']}\t{q['chrom']}\t{q['ref_start']}\t{q['ref_end']}\t{q['strand']}\t"
                    f"{q['align_length']}\t{q['align_rate']}\t"
                    f"{q['mismatch']}\t{q['insertion']}\t{q['deletion']}\t"
                    f"{q['insertion_length']}\t{q['deletion_length']}\tno_variant\t-\t-\t-\t-\n"
                )
            else:
                for var in q["variants"]:
                    f.write(
                        f"{q['query_id']}\t{q['query_length']}\t{q['chrom']}\t{q['ref_start']}\t{q['ref_end']}\t{q['strand']}\t"
                        f"{q['align_length']}\t{q['align_rate']:.2f}\t"
                        f"{q['mismatch']}\t{q['insertion']}\t{q['deletion']}\t"
                        f"{q['insertion_length']}\t{q['deletion_length']}\t{var['type']}\t"
                        f"{var['pos']}\t{var['ref_base']}\t{var['query_base']}\t{var['length']}\n"
                    )

    # 7.2 按染色體整合統(tǒng)計(關(guān)聯(lián)的query列表放右側(cè),不影響左側(cè)染色體信息閱讀)
    chrom_file = os.path.join(output_dir, "chromosome_integrated_stats.txt")
    with open(chrom_file, "w") as f:
        f.write(
            "chromosome\tquery_count\tavg_query_length(bp)\t"  # query相關(guān)統(tǒng)計緊隨染色體
            "total_align_length(bp)\tavg_align_length(bp)\tavg_align_rate(%)\t"
            "total_mismatch\tavg_mismatch\t"
            "total_insertion\tavg_insertion\tavg_insertion_length(bp)\t"
            "total_deletion\tavg_deletion\tavg_deletion_length(bp)\t"
            "matched_queries\n"  # query列表放最右側(cè)
        )
        for chrom, stats in sorted(chrom_stats.items(), key=lambda x: x[1]["query_count"], reverse=True):
            query_list = stats["queries"]
            query_str = ", ".join(query_list) if len(query_list) <= 10 else \
                f"{', '.join(query_list[:10])}...(共{len(query_list)}條)"
            f.write(
                f"{chrom}\t{stats['query_count']}\t{stats['avg_query_length']:.1f}\t"
                f"{stats['total_align_length']}\t{stats['avg_align_length']:.1f}\t{stats['avg_align_rate']:.2f}\t"
                f"{stats['total_mismatch']}\t{stats['avg_mismatch']:.1f}\t"
                f"{stats['total_insertion']}\t{stats['avg_insertion']:.1f}\t{stats['avg_insertion_length']:.1f}\t"
                f"{stats['total_deletion']}\t{stats['avg_deletion']:.1f}\t{stats['avg_deletion_length']:.1f}\t"
                f"{query_str}\n"
            )

    # 7.3 全局匯總統(tǒng)計(保持原有邏輯)
    global_file = os.path.join(output_dir, "global_summary.txt")
    with open(global_file, "w") as f:
        aligned_count = len(aligned_qids)
        unaligned_count = total_queries - aligned_count
        f.write(f"總query序列數(shù): {total_queries}\n")
        f.write(f"成功比對的query數(shù): {aligned_count} ({aligned_count/total_queries*100:.1f}%)\n")
        f.write(f"未比對的query數(shù): {unaligned_count} ({unaligned_count/total_queries*100:.1f}%)\n")
        f.write(f"涉及的染色體數(shù): {len(chrom_stats)}\n\n")

        if chrom_stats:
            top_chrom_by_count = max(chrom_stats.items(), key=lambda x: x[1]["query_count"])[0]
            valid_chroms = [c for c in chrom_stats.items() if c[1]["query_count"] >= 3]
            top_chrom_by_rate = max(valid_chroms, key=lambda x: x[1]["avg_align_rate"])[0] if valid_chroms else "N/A"
            top_chrom_by_variant = max(chrom_stats.items(), key=lambda x: x[1]["total_mismatch"] + x[1]["total_insertion"] + x[1]["total_deletion"])[0]

            f.write(f"比對query數(shù)最多的染色體: {top_chrom_by_count}({chrom_stats[top_chrom_by_count]['query_count']}條)\n")
            f.write(f"平均比對率最高的染色體(≥3條query): {top_chrom_by_rate}({chrom_stats[top_chrom_by_rate]['avg_align_rate']:.2f}%)\n" if valid_chroms else "平均比對率最高的染色體: 無足夠數(shù)據(jù)(單染色體query數(shù)<3)\n")
            f.write(f"總變異數(shù)最多的染色體: {top_chrom_by_variant}({chrom_stats[top_chrom_by_variant]['total_mismatch'] + chrom_stats[top_chrom_by_variant]['total_insertion'] + chrom_stats[top_chrom_by_variant]['total_deletion']}個變異)\n")

    # 8. 輸出完成信息
    print(f"\n? 結(jié)果文件保存完成:")
    print(f"   - 單條query詳細(xì)信息(query信息居左):{detail_file}")
    print(f"   - 按染色體整合統(tǒng)計:{chrom_file}")
    print(f"   - 全局匯總統(tǒng)計:{global_file}")


def main():
    parser = argparse.ArgumentParser(description="序列比對+按染色體整合(query信息放最左側(cè))")
    parser.add_argument("--query", default="extracted_rice_sequences.txt", help="query序列文件(FASTA)")
    parser.add_argument("--ref", default="H7L31.fixed.fa", help="參考基因組文件(FASTA)")
    parser.add_argument("--outdir", default="chrom_integrated_final_results", help="結(jié)果輸出目錄")
    parser.add_argument("--threads", type=int, default=4, help="比對線程數(shù)")
    args = parser.parse_args()

    # 檢查依賴和輸入文件
    check_dependency(["minimap2", "samtools"])
    for file in [args.query, args.ref]:
        if not os.path.exists(file):
            print(f"? 輸入文件不存在:{file}")
            exit(1)

    # 創(chuàng)建輸出目錄
    os.makedirs(args.outdir, exist_ok=True)
    bam_file = os.path.join(args.outdir, "query_vs_genome.bam")

    # 執(zhí)行流程
    print("===== 步驟1/3:構(gòu)建基因組索引 =====")
    ref_index = build_minimap2_index(args.ref)

    print("\n===== 步驟2/3:運(yùn)行minimap2比對 =====")
    run_minimap2_alignment(ref_index, args.query, bam_file, args.threads)

    print("\n===== 步驟3/3:按染色體整合所有信息 =====")
    extract_and_integrate_by_chrom(bam_file, args.query, args.ref, args.outdir)

    print("\n?? 全流程完成!核心結(jié)果目錄:", args.outdir)


if __name__ == "__main__":
    main()
有具體的差異(SNP/INDEL)信息

all

import os
import subprocess
import argparse
import shutil
import re
from collections import defaultdict


def check_dependency(tools):
    """檢查必需工具是否安裝(minimap2、samtools)"""
    missing = [tool for tool in tools if not shutil.which(tool)]
    if missing:
        print(f"? 缺少必需工具:{', '.join(missing)}")
        print("  安裝建議(conda):conda install -c bioconda minimap2 samtools")
        exit(1)


def get_ref_samples(ref_dir):
    """獲取參考文件夾(fq)下的所有樣本(每個FA文件對應(yīng)一個樣本)"""
    if not os.path.exists(ref_dir):
        print(f"? 參考樣本文件夾不存在:{ref_dir}")
        exit(1)
    
    ref_samples = []  # 存儲格式:(樣本名, FA文件路徑, 樣本文件夾路徑)
    for filename in os.listdir(ref_dir):
        if filename.startswith('.'):
            continue  # 跳過隱藏文件
        if filename.endswith('.fa') or filename.endswith('.fasta'):
            # 樣本名:去掉后綴(如“ref1.fa”→“ref1”)
            sample_name = os.path.splitext(filename)[0]
            # FA文件絕對路徑
            fa_path = os.path.abspath(os.path.join(ref_dir, filename))
            # 樣本專屬文件夾路徑(輸出根目錄/樣本名)
            sample_dir = os.path.join(args.outdir, sample_name)
            ref_samples.append((sample_name, fa_path, sample_dir))
    
    if not ref_samples:
        print(f"? 在參考文件夾 {ref_dir} 中未找到任何.fa/.fasta樣本文件")
        exit(1)
    
    print(f"? 共識別 {len(ref_samples)} 個參考樣本:")
    for idx, (sample_name, fa_path, sample_dir) in enumerate(ref_samples, 1):
        print(f"   {idx}. 樣本名:{sample_name} → FA路徑:{fa_path} → 結(jié)果文件夾:{sample_dir}")
    return ref_samples


def init_sample_dir(sample_dir):
    """初始化樣本專屬文件夾(若不存在則創(chuàng)建)"""
    if not os.path.exists(sample_dir):
        os.makedirs(sample_dir, exist_ok=True)
        print(f"?? 創(chuàng)建樣本文件夾:{sample_dir}")
    else:
        print(f"?? 樣本文件夾已存在:{sample_dir}")
    return sample_dir


def build_sample_index(sample_name, fa_path, sample_dir):
    """為單個樣本構(gòu)建minimap2索引(存入樣本文件夾)"""
    # 索引文件名:樣本名_index.mmi(避免跨樣本沖突)
    index_file = os.path.join(sample_dir, f"{sample_name}_index.mmi")
    
    if os.path.exists(index_file):
        print(f"? 樣本 {sample_name} 的索引已存在:{os.path.basename(index_file)}")
        return index_file
    
    print(f"?? 為樣本 {sample_name} 構(gòu)建基因組索引...")
    cmd = ["minimap2", "-d", index_file, fa_path]
    try:
        subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        print(f"? 樣本 {sample_name} 的索引構(gòu)建完成:{os.path.basename(index_file)}")
        return index_file
    except subprocess.CalledProcessError as e:
        err_msg = e.stderr.strip() if e.stderr else "未知錯誤"
        print(f"? 樣本 {sample_name} 的索引構(gòu)建失?。簕err_msg}")
        exit(1)


def run_sample_alignment(sample_name, index_file, query_file, sample_dir, threads=4):
    """為單個樣本運(yùn)行minimap2比對(BAM存入樣本文件夾)"""
    # BAM文件名:樣本名_query_vs_ref.bam(明確標(biāo)識樣本)
    bam_file = os.path.join(sample_dir, f"{sample_name}_query_vs_ref.bam")
    
    if os.path.exists(bam_file) and os.path.exists(f"{bam_file}.bai"):
        print(f"? 樣本 {sample_name} 的比對BAM已存在:{os.path.basename(bam_file)}")
        return bam_file
    
    temp_sam = bam_file.replace('.bam', '.temp.sam')
    try:
        print(f"?? 開始query與樣本 {sample_name} 的比對...")
        cmd = [
            "minimap2", "-a", "-x", "map-ont",
            "--end-bonus", "10", "--MD",  # 保留MD標(biāo)簽用于變異解析
            "-t", str(threads),
            index_file, query_file
        ]
        with open(temp_sam, "w") as f:
            subprocess.run(cmd, stdout=f, check=True, stderr=subprocess.PIPE, text=True)
        
        print(f"?? 轉(zhuǎn)換樣本 {sample_name} 的比對結(jié)果為BAM...")
        subprocess.run(
            ["samtools", "sort", "-@", str(threads), "-o", bam_file, temp_sam],
            check=True, stderr=subprocess.PIPE, text=True
        )
        subprocess.run(["samtools", "index", bam_file], check=True)
        os.remove(temp_sam)
        print(f"? 樣本 {sample_name} 的比對完成:{os.path.basename(bam_file)}")
        return bam_file
    except subprocess.CalledProcessError as e:
        err_msg = e.stderr.strip() if e.stderr else "未知錯誤"
        print(f"? 樣本 {sample_name} 的比對失?。簕err_msg}")
        if os.path.exists(temp_sam):
            os.remove(temp_sam)
        exit(1)


def parse_cigar_and_md(cigar, md_tag, query_seq, ref_start):
    """解析CIGAR和MD標(biāo)簽,提取變異信息(錯配、插入、缺失)"""
    variants = []
    query_pos = 0  # query序列當(dāng)前位置(0-based)
    ref_pos = ref_start  # 參考序列當(dāng)前位置(1-based)
    md_parts = re.findall(r"(\d+|[ATCGatcg]|\^[ATCGatcg]+)", md_tag)

    for length, op in re.findall(r"(\d+)([MIDNSHPX=])", cigar):
        length = int(length)
        if op in ["M", "=", "X"]:
            query_pos += length
            ref_pos += length
        elif op == "I":  # 插入(query有,ref無)
            variants.append({
                "type": "insertion",
                "pos": ref_pos,
                "ref_base": "-",
                "query_base": query_seq[query_pos:query_pos+length],
                "length": length
            })
            query_pos += length
        elif op == "D":  # 缺失(ref有,query無)
            variants.append({
                "type": "deletion",
                "pos": ref_pos,
                "ref_base": "",  # 后續(xù)用MD標(biāo)簽補(bǔ)充
                "query_base": "-",
                "length": length
            })
            ref_pos += length
        elif op in ["S", "H"]:
            query_pos += length
        elif op in ["N", "P"]:
            ref_pos += length

    # 補(bǔ)充MD標(biāo)簽信息
    current_ref_pos = ref_start
    for part in md_parts:
        if part.isdigit():
            current_ref_pos += int(part)
        elif part.startswith("^"):  # 缺失的堿基
            del_bases = part[1:].upper()
            for var in variants:
                if var["type"] == "deletion" and var["pos"] == current_ref_pos:
                    var["ref_base"] = del_bases
                    break
            current_ref_pos += len(del_bases)
        else:  # 錯配的堿基
            ref_base = part.upper()
            query_mismatch_pos = current_ref_pos - ref_start
            if 0 <= query_mismatch_pos < len(query_seq):
                query_base = query_seq[query_mismatch_pos].upper()
                variants.append({
                    "type": "mismatch",
                    "pos": current_ref_pos,
                    "ref_base": ref_base,
                    "query_base": query_base,
                    "length": 1
                })
            current_ref_pos += 1

    return variants


def extract_sample_results(sample_name, bam_file, query_file, sample_dir):
    """提取單個樣本的比對結(jié)果(存入樣本文件夾),并返回核心統(tǒng)計用于全局匯總"""
    # 1. 讀取query序列
    query_dict = {}
    with open(query_file, "r") as f:
        current_id, current_seq_lines = None, []
        for line in f:
            line = line.strip()
            if line.startswith(">"):
                if current_id and current_seq_lines:
                    query_dict[current_id] = (''.join(current_seq_lines).upper(), len(current_seq_lines))
                current_id = line[1:].split()[0]
                current_seq_lines = []
            else:
                current_seq_lines.append(line)
        if current_id and current_seq_lines:
            query_dict[current_id] = (''.join(current_seq_lines).upper(), len(current_seq_lines))

    # 2. 提取BAM比對信息
    view_cmd = ["samtools", "view", "-q", "20", "-F", "4", bam_file]
    try:
        result = subprocess.run(view_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True)
    except subprocess.CalledProcessError as e:
        err_msg = e.stderr.strip() if e.stderr else "未知錯誤"
        print(f"?? 讀取樣本 {sample_name} 的BAM失?。簕err_msg},跳過該樣本統(tǒng)計")
        return None

    # 3. 解析比對結(jié)果并生成樣本內(nèi)統(tǒng)計文件
    sample_detail_file = os.path.join(sample_dir, f"{sample_name}_query_details.txt")  # 樣本內(nèi)query詳情
    sample_chrom_file = os.path.join(sample_dir, f"{sample_name}_chrom_stats.txt")    # 樣本內(nèi)染色體統(tǒng)計
    sample_stats = {
        "sample_name": sample_name,
        "total_query": len(query_dict),
        "aligned_query": 0,
        "total_align_length": 0,
        "total_align_rate": 0.0,
        "total_mismatch": 0
    }
    chrom_stats = defaultdict(lambda: {"query_count": 0, "total_align_length": 0})

    with open(sample_detail_file, "w") as detail_f, open(sample_chrom_file, "w") as chrom_f:
        # 樣本內(nèi)query詳情表頭
        detail_f.write(
            "query_id\tquery_length\tchromosome\tref_start\tref_end\tstrand\t"
            "align_length\talign_rate(%)\tmismatch_count\tinsertion_count\tdeletion_count\n"
        )
        # 樣本內(nèi)染色體統(tǒng)計表頭
        chrom_f.write(
            "chromosome\tquery_count\ttotal_align_length(bp)\tavg_align_length(bp)\n"
        )

        aligned_qids = set()
        for line in result.stdout.splitlines():
            fields = line.split("\t")
            qid = fields[0]
            if qid not in query_dict:
                continue
            aligned_qids.add(qid)
            query_seq, qlen = query_dict[qid]

            # 解析BAM字段
            flag = int(fields[1])
            chrom = fields[2]
            ref_start = int(fields[3])
            cigar = fields[5]
            strand = "-" if (flag & 16) else "+"
            md_tag = next((f.split(":")[-1] for f in fields if f.startswith("MD:Z:")), "")

            # 計算比對指標(biāo)
            align_length = sum(int(l) for l, op in re.findall(r"(\d+)([MIDNSHPX=])", cigar) if op in ["M", "D", "N", "X", "="])
            align_rate = (align_length / qlen) * 100 if qlen > 0 else 0.0
            ref_end = ref_start + align_length - 1 if align_length > 0 else ref_start

            # 解析變異
            variants = parse_cigar_and_md(cigar, md_tag, query_seq, ref_start)
            mismatch = len([v for v in variants if v["type"] == "mismatch"])
            insertion = len([v for v in variants if v["type"] == "insertion"])
            deletion = len([v for v in variants if v["type"] == "deletion"])

            # 寫入樣本內(nèi)query詳情
            detail_f.write(
                f"{qid}\t{qlen}\t{chrom}\t{ref_start}\t{ref_end}\t{strand}\t"
                f"{align_length}\t{align_rate:.2f}\t{mismatch}\t{insertion}\t{deletion}\n"
            )

            # 更新樣本內(nèi)染色體統(tǒng)計
            chrom_stats[chrom]["query_count"] += 1
            chrom_stats[chrom]["total_align_length"] += align_length

            # 更新樣本核心統(tǒng)計
            sample_stats["aligned_query"] += 1
            sample_stats["total_align_length"] += align_length
            sample_stats["total_align_rate"] += align_rate
            sample_stats["total_mismatch"] += mismatch

        # 補(bǔ)充未比對的query到樣本內(nèi)詳情
        for qid in query_dict.keys():
            if qid not in aligned_qids:
                qlen = query_dict[qid][1]
                detail_f.write(
                    f"{qid}\t{qlen}\t-\t-\t-\t-\t-\t-\t-\t-\t-\n"
                )

        # 寫入樣本內(nèi)染色體統(tǒng)計
        for chrom, stats in chrom_stats.items():
            avg_align_len = stats["total_align_length"] / stats["query_count"] if stats["query_count"] > 0 else 0.0
            chrom_f.write(
                f"{chrom}\t{stats['query_count']}\t{stats['total_align_length']}\t{avg_align_len:.1f}\n"
            )

    # 計算樣本平均比對率
    sample_stats["avg_align_rate"] = sample_stats["total_align_rate"] / sample_stats["aligned_query"] if sample_stats["aligned_query"] > 0 else 0.0
    print(f"? 樣本 {sample_name} 結(jié)果已保存至:")
    print(f"   - query詳情:{os.path.basename(sample_detail_file)}")
    print(f"   - 染色體統(tǒng)計:{os.path.basename(sample_chrom_file)}")
    return sample_stats


def generate_global_summary(all_sample_stats, outdir):
    """生成全局匯總文件(輸出根目錄),匯總所有樣本的核心指標(biāo)"""
    global_file = os.path.join(outdir, "global_sample_summary.txt")
    with open(global_file, "w") as f:
        f.write("樣本名\t總query數(shù)\t比對query數(shù)\t比對率(%)\t平均比對率(%)\t總錯配數(shù)\n")
        total_global_query = 0
        total_global_aligned = 0

        for stats in all_sample_stats:
            if not stats:
                continue
            # 計算樣本的“query比對率”(比對query數(shù)/總query數(shù))
            sample_query_rate = (stats["aligned_query"] / stats["total_query"]) * 100 if stats["total_query"] > 0 else 0.0
            f.write(
                f"{stats['sample_name']}\t{stats['total_query']}\t{stats['aligned_query']}\t"
                f"{sample_query_rate:.1f}\t{stats['avg_align_rate']:.2f}\t{stats['total_mismatch']}\n"
            )
            total_global_query = stats["total_query"]  # 所有樣本的總query數(shù)一致(同一query文件)
            total_global_aligned += stats["aligned_query"]

        # 全局總統(tǒng)計
        f.write(f"\n全局匯總:\n")
        f.write(f"總樣本數(shù):{len(all_sample_stats)}\n")
        f.write(f"總query數(shù):{total_global_query}\n")
        f.write(f"所有樣本合計比對query數(shù):{total_global_aligned}\n")
        f.write(f"全局平均query比對率:{(total_global_aligned / (total_global_query * len(all_sample_stats))) * 100:.1f}%\n")

    print(f"\n? 全局樣本匯總文件已保存:{os.path.basename(global_file)}")
    return global_file


def main():
    global args  # 全局參數(shù),方便子函數(shù)訪問輸出根目錄
    parser = argparse.ArgumentParser(description="按樣本生成獨(dú)立文件夾+多參考樣本比對與統(tǒng)計")
    parser.add_argument("--query", default="extracted_sequences.txt", help="query序列文件(FASTA)")
    parser.add_argument("--ref_dir", default="fq", help="參考樣本文件夾(默認(rèn):fq,每個FA文件對應(yīng)一個樣本)")
    parser.add_argument("--outdir", default="sample_based_results", help="結(jié)果輸出根目錄(默認(rèn):sample_based_results)")
    parser.add_argument("--threads", type=int, default=4, help="比對線程數(shù)")
    args = parser.parse_args()

    # 1. 基礎(chǔ)檢查
    check_dependency(["minimap2", "samtools"])
    if not os.path.exists(args.query):
        print(f"? query文件不存在:{args.query}")
        exit(1)
    os.makedirs(args.outdir, exist_ok=True)
    print(f"?? 結(jié)果輸出根目錄:{args.outdir}\n")

    # 2. 核心流程:按樣本處理
    print("===== 步驟1/5:識別參考樣本 =====")
    ref_samples = get_ref_samples(args.ref_dir)  # (樣本名, FA路徑, 樣本文件夾路徑)

    print("\n===== 步驟2/5:初始化樣本文件夾 =====")
    for sample_name, fa_path, sample_dir in ref_samples:
        init_sample_dir(sample_dir)

    print("\n===== 步驟3/5:為所有樣本構(gòu)建索引 =====")
    sample_index_list = []  # (樣本名, 索引路徑, 樣本文件夾路徑)
    for sample_name, fa_path, sample_dir in ref_samples:
        index_path = build_sample_index(sample_name, fa_path, sample_dir)
        sample_index_list.append((sample_name, index_path, sample_dir))

    print("\n===== 步驟4/5:為所有樣本運(yùn)行比對并提取結(jié)果 =====")
    all_sample_stats = []  # 存儲所有樣本的核心統(tǒng)計,用于全局匯總
    for sample_name, index_path, sample_dir in sample_index_list:
        bam_path = run_sample_alignment(sample_name, index_path, args.query, sample_dir, args.threads)
        sample_stats = extract_sample_results(sample_name, bam_path, args.query, sample_dir)
        all_sample_stats.append(sample_stats)

    print("\n===== 步驟5/5:生成全局樣本匯總 =====")
    generate_global_summary(all_sample_stats, args.outdir)

    print(f"\n?? 全流程完成!所有樣本結(jié)果已按文件夾隔離,根目錄:{args.outdir}")


if __name__ == "__main__":
    main()
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容