提取出每條染色體著絲粒區(qū)域,比對到不同參考基因組,查看差異:
import os
import subprocess
import argparse
import shutil
import re
from collections import defaultdict
def check_dependency(tools):
"""檢查多個工具是否安裝(新增blastn依賴)"""
missing = []
for tool in tools:
if not shutil.which(tool):
missing.append(tool)
if missing:
print(f"? 缺少工具:{', '.join(missing)}")
print(" 安裝建議(conda):conda install -c bioconda blast samtools minimap2")
exit(1)
def parse_query_detail(query_detail_path):
"""解析query_longest_chrom_detail.txt,獲取有效比對的query及其基因組區(qū)域信息"""
print(f"?? 解析query比對詳情文件:{query_detail_path}")
valid_queries = {} # {query_id: {'chrom': '', 'start': int, 'end': int, 'strand': '', 'query_len': int}}
if not os.path.exists(query_detail_path):
print(f"? query詳情文件不存在:{query_detail_path}")
exit(1)
with open(query_detail_path, 'r') as f:
next(f) # 跳過表頭
for line_num, line in enumerate(f, 2):
line = line.strip()
if not line:
continue
cols = line.split('\t')
if len(cols) != 9:
print(f"?? 跳過第{line_num}行:格式錯誤(需9列)")
continue
query_id = cols[0]
chrom = cols[1]
start = cols[2]
end = cols[3]
strand = cols[4]
query_len = cols[5]
# 篩選有效比對(chrom、start、end非'-')
if chrom == '-' or start == '-' or end == '-':
continue
try:
start = int(start)
end = int(end)
query_len = int(query_len)
except ValueError:
print(f"?? 跳過第{line_num}行:坐標(biāo)/長度非整數(shù)")
continue
valid_queries[query_id] = {
'chrom': chrom,
'start': start,
'end': end,
'strand': strand,
'query_len': query_len
}
print(f"? 共解析到 {len(valid_queries)} 條有效比對的query")
if not valid_queries:
print("? 無有效比對的query,無法繼續(xù)")
exit(1)
return valid_queries
def extract_genome_seqs(ref_fasta, valid_queries, out_dir):
"""從基因組中提取有效比對區(qū)域的序列(負(fù)鏈反轉(zhuǎn)互補(bǔ))"""
print(f"\n?? 提取基因組比對區(qū)域序列...")
# 檢查基因組索引(samtools faidx需要)
ref_index = f"{ref_fasta}.fai"
if not os.path.exists(ref_index):
print(f"?? 基因組索引不存在,自動生成:{ref_index}")
subprocess.run(['samtools', 'faidx', ref_fasta], check=True, stderr=subprocess.PIPE, text=True)
# 輸出基因組提取序列文件(用于blastn)
subj_fasta = os.path.join(out_dir, "extracted_genome_seqs.fasta")
subj_seqs = {} # {query_id: 基因組提取序列}
failed_queries = []
with open(subj_fasta, 'w') as f:
for query_id, info in valid_queries.items():
chrom = info['chrom']
start = info['start']
end = info['end']
strand = info['strand']
# 提取基因組序列(samtools faidx)
cmd = ['samtools', 'faidx', ref_fasta, f"{chrom}:{start}-{end}"]
try:
result = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True
)
except subprocess.CalledProcessError as e:
print(f"?? {query_id} 提取失?。簕e.stderr.strip()}")
failed_queries.append(query_id)
continue
# 處理提取的序列(去除表頭,合并)
seq_lines = []
for line in result.stdout.splitlines():
if not line.startswith('>'):
seq_lines.append(line.strip().upper())
subj_seq = ''.join(seq_lines)
if not subj_seq:
print(f"?? {query_id} 提取序列為空")
failed_queries.append(query_id)
continue
# 負(fù)鏈反轉(zhuǎn)互補(bǔ)
if strand == '-':
subj_seq = subj_seq[::-1].translate(str.maketrans('ATCG', 'TAGC'))
# 寫入文件(ID格式:query_id_subj,確保與query對應(yīng))
f.write(f">{query_id}_subj\n{subj_seq}\n")
subj_seqs[query_id] = subj_seq
# 移除提取失敗的query
for qid in failed_queries:
valid_queries.pop(qid, None)
print(f"? 基因組序列提取完成:")
print(f" - 成功提?。簕len(subj_seqs)} 條序列({subj_fasta})")
print(f" - 提取失敗:{len(failed_queries)} 條query(已排除)")
return subj_fasta, subj_seqs, valid_queries
def prepare_blast_input(query_fasta, valid_queries, out_dir):
"""準(zhǔn)備blastn的query輸入文件(僅包含有效比對的query)"""
print(f"\n?? 準(zhǔn)備blastn query輸入文件...")
blast_query_fasta = os.path.join(out_dir, "blastn_query_seqs.fasta")
query_seqs = {} # {query_id: query序列}
missing_queries = []
# 先讀取完整query序列
with open(query_fasta, 'r') as f:
current_id, current_seq = None, []
for line in f:
line = line.strip()
if line.startswith('>'):
if current_id:
query_seqs[current_id] = ''.join(current_seq).upper()
current_id = line[1:].split()[0]
current_seq = []
else:
current_seq.append(line)
if current_id:
query_seqs[current_id] = ''.join(current_seq).upper()
# 篩選有效query并寫入blast輸入文件
with open(blast_query_fasta, 'w') as f:
for qid in valid_queries.keys():
if qid not in query_seqs:
missing_queries.append(qid)
continue
f.write(f">{qid}\n{query_seqs[qid]}\n")
# 移除缺失序列的query
for qid in missing_queries:
valid_queries.pop(qid, None)
print(f"? blastn query文件準(zhǔn)備完成:")
print(f" - 有效query:{len(valid_queries)} 條({blast_query_fasta})")
print(f" - 缺失序列:{len(missing_queries)} 條query(已排除)")
return blast_query_fasta, query_seqs
def run_blastn(blast_query_fasta, subj_fasta, out_dir, threads=4):
"""運(yùn)行blastn比對(query vs 提取的基因組序列)"""
print(f"\n?? 運(yùn)行blastn比對...")
# 1. 構(gòu)建blast數(shù)據(jù)庫(subject:提取的基因組序列)
db_prefix = os.path.join(out_dir, "genome_subj_blastdb")
if not all(os.path.exists(f"{db_prefix}.{ext}") for ext in ['nhr', 'nin', 'nsq']):
cmd = ['makeblastdb', '-in', subj_fasta, '-dbtype', 'nucl', '-out', db_prefix]
subprocess.run(cmd, check=True, stderr=subprocess.PIPE, text=True)
print(f"? blast數(shù)據(jù)庫構(gòu)建完成:{db_prefix}.*")
else:
print(f"? blast數(shù)據(jù)庫已存在:{db_prefix}.*")
# 2. 運(yùn)行blastn
blast_out = os.path.join(out_dir, "blastn_alignment_results.txt")
# blastn參數(shù):高置信度比對(evalue=1e-20),輸出表格格式(含比對長度、相似度)
cmd = [
'blastn', '-query', blast_query_fasta, '-db', db_prefix,
'-out', blast_out, '-outfmt', "6 qseqid sseqid length qlen pident evalue",
'-evalue', '1e-20', '-num_threads', str(threads), '-max_target_seqs', '1' # 每條query僅保留1條最佳比對
]
subprocess.run(cmd, check=True, stderr=subprocess.PIPE, text=True)
if os.path.exists(blast_out) and os.path.getsize(blast_out) > 0:
print(f"? blastn比對完成:{blast_out}")
return blast_out
else:
print(f"? blastn無結(jié)果輸出,可能無有效比對")
exit(1)
def parse_blastn(blast_out, valid_queries):
"""解析blastn結(jié)果,統(tǒng)計每條query的比對長度、比對率"""
print(f"\n?? 解析blastn結(jié)果...")
blast_stats = {} # {query_id: {'align_len': int, 'query_len': int, 'align_rate': float, 'pident': float, 'evalue': str}}
no_blast_hit = []
with open(blast_out, 'r') as f:
for line in f:
line = line.strip()
if not line:
continue
cols = line.split('\t')
if len(cols) != 6:
continue # 跳過格式錯誤行
qid = cols[0]
align_len = int(cols[2])
query_len = int(cols[3])
pident = float(cols[4])
evalue = cols[5]
# 計算blastn比對率(比對長度 / query長度 ×100%)
align_rate = (align_len / query_len) * 100 if query_len > 0 else 0.0
blast_stats[qid] = {
'align_len': align_len,
'query_len': query_len,
'align_rate': align_rate,
'pident': pident,
'evalue': evalue
}
# 檢查無blast結(jié)果的query
for qid in valid_queries.keys():
if qid not in blast_stats:
no_blast_hit.append(qid)
# 移除無blast結(jié)果的query
for qid in no_blast_hit:
valid_queries.pop(qid, None)
print(f"? blastn結(jié)果解析完成:")
print(f" - 有有效比對:{len(blast_stats)} 條query")
print(f" - 無blast結(jié)果:{len(no_blast_hit)} 條query(已排除)")
return blast_stats, valid_queries
def integrate_results(valid_queries, subj_seqs, query_seqs, blast_stats, out_dir):
"""整合所有結(jié)果:query信息+基因組信息+blastn統(tǒng)計"""
print(f"\n?? 整合所有結(jié)果...")
# 1. 詳細(xì)整合文件
integrate_detail = os.path.join(out_dir, "blastn_integrated_details.txt")
# 2. 染色體匯總統(tǒng)計文件
chrom_summary = os.path.join(out_dir, "blastn_chrom_summary.txt")
# 寫入詳細(xì)整合結(jié)果
with open(integrate_detail, 'w') as f:
f.write(
"query_id\tgenome_chrom\tgenome_start\tgenome_end\tgenome_strand\t"
"query_length\tgenome_seq_length\tblastn_align_length\tblastn_align_rate(%)\t"
"blastn_pident(%)\tblastn_evalue\tstatus\n"
)
for qid, info in valid_queries.items():
q_len = info['query_len']
subj_seq = subj_seqs[qid]
subj_len = len(subj_seq)
blast = blast_stats[qid]
f.write(
f"{qid}\t{info['chrom']}\t{info['start']}\t{info['end']}\t{info['strand']}\t"
f"{q_len}\t{subj_len}\t{blast['align_len']}\t{blast['align_rate']:.2f}\t"
f"{blast['pident']:.2f}\t{blast['evalue']}\tsuccess\n"
)
# 按染色體分組統(tǒng)計blastn結(jié)果
chrom_stats = defaultdict(lambda: {
'query_count': 0,
'total_align_len': 0,
'total_align_rate': 0,
'total_pident': 0
})
for qid, info in valid_queries.items():
chrom = info['chrom']
blast = blast_stats[qid]
chrom_stats[chrom]['query_count'] += 1
chrom_stats[chrom]['total_align_len'] += blast['align_len']
chrom_stats[chrom]['total_align_rate'] += blast['align_rate']
chrom_stats[chrom]['total_pident'] += blast['pident']
# 寫入染色體匯總統(tǒng)計
with open(chrom_summary, 'w') as f:
f.write(
"chromosome\tquery_count\tavg_blastn_align_length(bp)\t"
"avg_blastn_align_rate(%)\tavg_blastn_pident(%)\n"
)
# 按query_count降序排序
sorted_chroms = sorted(chrom_stats.items(), key=lambda x: x[1]['query_count'], reverse=True)
for chrom, stats in sorted_chroms:
count = stats['query_count']
avg_align_len = stats['total_align_len'] / count if count > 0 else 0.0
avg_align_rate = stats['total_align_rate'] / count if count > 0 else 0.0
avg_pident = stats['total_pident'] / count if count > 0 else 0.0
f.write(
f"{chrom}\t{count}\t{avg_align_len:.1f}\t"
f"{avg_align_rate:.2f}\t{avg_pident:.2f}\n"
)
# 全局匯總統(tǒng)計
total_query = len(valid_queries)
if total_query > 0:
avg_all_align_len = sum(b['align_len'] for b in blast_stats.values()) / total_query
avg_all_align_rate = sum(b['align_rate'] for b in blast_stats.values()) / total_query
avg_all_pident = sum(b['pident'] for b in blast_stats.values()) / total_query
print(f"\n?? 全局blastn統(tǒng)計匯總:")
print(f" - 有效比對query總數(shù):{total_query}")
print(f" - 平均blastn比對長度:{avg_all_align_len:.1f}bp")
print(f" - 平均blastn比對率:{avg_all_align_rate:.2f}%")
print(f" - 平均blastn相似度(pident):{avg_all_pident:.2f}%")
print(f" - 涉及染色體數(shù):{len(chrom_stats)}")
print(f"\n? 整合結(jié)果保存完成:")
print(f" - 詳細(xì)結(jié)果:{integrate_detail}")
print(f" - 染色體匯總:{chrom_summary}")
return integrate_detail, chrom_summary
def main():
parser = argparse.ArgumentParser(description="提取基因組比對序列+blastn比對+統(tǒng)計整合")
parser.add_argument("--query", default="extracted_sequences.txt", help="原query序列文件(FASTA)")
parser.add_argument("--ref", default="H7L31.fixed.fa", help="基因組文件(FASTA)")
parser.add_argument("--query_detail", default="chrom_integrated_results/query_longest_chrom_detail.txt",
help="之前生成的query最長比對詳情文件")
parser.add_argument("--outdir", default="blastn_integrated_results", help="blastn整合結(jié)果輸出目錄")
parser.add_argument("--threads", type=int, default=4, help="blastn線程數(shù)")
args = parser.parse_args()
# 1. 檢查依賴(新增blastn、makeblastdb)
check_dependency(['samtools', 'minimap2', 'blastn', 'makeblastdb'])
# 2. 檢查輸入文件
required_files = [args.query, args.ref, args.query_detail]
for f in required_files:
if not os.path.exists(f):
print(f"? 輸入文件不存在:{f}")
exit(1)
# 3. 創(chuàng)建輸出目錄
os.makedirs(args.outdir, exist_ok=True)
print(f"? 輸出目錄已創(chuàng)建:{args.outdir}")
# 4. 核心流程執(zhí)行
print("\n===== 步驟1/5:解析query比對詳情 =====")
valid_queries = parse_query_detail(args.query_detail)
print("\n===== 步驟2/5:提取基因組比對區(qū)域序列 =====")
subj_fasta, subj_seqs, valid_queries = extract_genome_seqs(args.ref, valid_queries, args.outdir)
print("\n===== 步驟3/5:準(zhǔn)備blastn輸入文件 =====")
blast_query_fasta, query_seqs = prepare_blast_input(args.query, valid_queries, args.outdir)
print("\n===== 步驟4/5:運(yùn)行blastn比對 =====")
blast_out = run_blastn(blast_query_fasta, subj_fasta, args.outdir, args.threads)
print("\n===== 步驟5/5:解析并整合結(jié)果 =====")
blast_stats, valid_queries = parse_blastn(blast_out, valid_queries)
integrate_detail, chrom_summary = integrate_results(valid_queries, subj_seqs, query_seqs, blast_stats, args.outdir)
print("\n?? 全流程完成!核心結(jié)果文件:")
print(f" 1. 基因組提取序列:{subj_fasta}")
print(f" 2. blastn比對結(jié)果:{blast_out}")
print(f" 3. 整合詳細(xì)結(jié)果:{integrate_detail}")
print(f" 4. 染色體匯總統(tǒng)計:{chrom_summary}")
if __name__ == "__main__":
main()


結(jié)果如上
另具體信息:
import os
import subprocess
import argparse
import shutil
import re
from collections import defaultdict
def check_dependency(tools):
"""檢查必需工具是否安裝(minimap2、samtools)"""
missing = [tool for tool in tools if not shutil.which(tool)]
if missing:
print(f"? 缺少必需工具:{', '.join(missing)}")
print(" 安裝建議(conda):conda install -c bioconda minimap2 samtools")
exit(1)
def build_minimap2_index(ref_fasta):
"""構(gòu)建參考基因組minimap2索引(若不存在)"""
index_file = f"{ref_fasta}.mmi"
if os.path.exists(index_file):
print(f"? 基因組索引已存在:{index_file}")
return index_file
print(f"?? 構(gòu)建基因組索引({ref_fasta})...")
cmd = ["minimap2", "-d", index_file, ref_fasta]
try:
subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
print(f"? 索引構(gòu)建完成:{index_file}")
return index_file
except subprocess.CalledProcessError as e:
err_msg = e.stderr.strip() if e.stderr else "未知錯誤"
print(f"? 索引構(gòu)建失?。簕err_msg}")
exit(1)
def run_minimap2_alignment(ref_index, query_file, output_bam, threads=4):
"""運(yùn)行minimap2比對,輸出排序BAM(保留高置信度比對信息)"""
if os.path.exists(output_bam) and os.path.exists(f"{output_bam}.bai"):
print(f"? 比對結(jié)果BAM已存在:{output_bam}")
return
temp_sam = output_bam.replace(".bam", ".temp.sam")
try:
print("?? 開始序列與基因組比對...")
cmd = [
"minimap2", "-a", "-x", "map-ont",
"--end-bonus", "10", "--MD", # 保留MD標(biāo)簽用于變異解析
"-t", str(threads),
ref_index, query_file
]
with open(temp_sam, "w") as f:
subprocess.run(cmd, stdout=f, check=True, stderr=subprocess.PIPE, text=True)
print("?? 轉(zhuǎn)換為排序BAM并構(gòu)建索引...")
subprocess.run(
["samtools", "sort", "-@", str(threads), "-o", output_bam, temp_sam],
check=True, stderr=subprocess.PIPE, text=True
)
subprocess.run(["samtools", "index", output_bam], check=True)
os.remove(temp_sam)
print(f"? 比對完成:{output_bam}")
except subprocess.CalledProcessError as e:
err_msg = e.stderr.strip() if e.stderr else "未知錯誤"
print(f"? 比對失?。簕err_msg}")
if os.path.exists(temp_sam):
os.remove(temp_sam)
exit(1)
def parse_cigar_and_md(cigar, md_tag, query_seq, ref_start):
"""解析CIGAR和MD標(biāo)簽,提取變異信息(錯配、插入、缺失)"""
variants = []
query_pos = 0 # query序列當(dāng)前位置(0-based)
ref_pos = ref_start # 參考序列當(dāng)前位置(1-based)
md_parts = re.findall(r"(\d+|[ATCGatcg]|\^[ATCGatcg]+)", md_tag)
# 處理CIGAR,記錄插入/缺失
for length, op in re.findall(r"(\d+)([MIDNSHPX=])", cigar):
length = int(length)
if op in ["M", "=", "X"]:
query_pos += length
ref_pos += length
elif op == "I": # 插入(query有,ref無)
variants.append({
"type": "insertion",
"pos": ref_pos,
"ref_base": "-",
"query_base": query_seq[query_pos:query_pos+length],
"length": length
})
query_pos += length
elif op == "D": # 缺失(ref有,query無)
variants.append({
"type": "deletion",
"pos": ref_pos,
"ref_base": "", # 后續(xù)用MD標(biāo)簽補(bǔ)充
"query_base": "-",
"length": length
})
ref_pos += length
elif op in ["S", "H"]:
query_pos += length
elif op in ["N", "P"]:
ref_pos += length
# 處理MD標(biāo)簽,補(bǔ)充錯配和缺失的堿基
current_ref_pos = ref_start
for part in md_parts:
if part.isdigit():
current_ref_pos += int(part)
elif part.startswith("^"): # 缺失的堿基
del_bases = part[1:].upper()
for var in variants:
if var["type"] == "deletion" and var["pos"] == current_ref_pos:
var["ref_base"] = del_bases
break
current_ref_pos += len(del_bases)
else: # 錯配的堿基
ref_base = part.upper()
query_mismatch_pos = current_ref_pos - ref_start
if 0 <= query_mismatch_pos < len(query_seq):
query_base = query_seq[query_mismatch_pos].upper()
variants.append({
"type": "mismatch",
"pos": current_ref_pos,
"ref_base": ref_base,
"query_base": query_base,
"length": 1
})
current_ref_pos += 1
return variants
def extract_and_integrate_by_chrom(bam_file, query_file, ref_fasta, output_dir):
"""提取比對信息,按染色體整合統(tǒng)計所有數(shù)據(jù)(query信息放最左側(cè))"""
# 1. 讀取query序列(ID: (序列字符串, 長度))
print("\n?? 讀取query序列...")
query_dict = {}
with open(query_file, "r") as f:
current_id = None
current_seq_lines = []
for line in f:
line = line.strip()
if not line:
continue
if line.startswith(">"):
if current_id is not None and current_seq_lines:
full_seq = "".join(current_seq_lines).upper()
query_dict[current_id] = (full_seq, len(full_seq))
current_id = line[1:].split()[0]
current_seq_lines = []
else:
current_seq_lines.append(line)
if current_id is not None and current_seq_lines:
full_seq = "".join(current_seq_lines).upper()
query_dict[current_id] = (full_seq, len(full_seq))
total_queries = len(query_dict)
print(f"?? 共讀取 {total_queries} 條query序列")
# 2. 初始化數(shù)據(jù)結(jié)構(gòu)(按染色體分組統(tǒng)計)
chrom_stats = defaultdict(lambda: {
"query_count": 0, # 該染色體上的比對query數(shù)
"total_query_length": 0, # 總query長度
"total_align_length": 0, # 總比對長度
"total_align_rate": 0.0, # 總比對率
"total_mismatch": 0, # 總錯配數(shù)
"total_insertion": 0, # 總插入數(shù)
"total_deletion": 0, # 總?cè)笔?shù)
"total_insertion_length": 0,# 插入總長度
"total_deletion_length": 0, # 缺失總長度
"queries": [] # 比對到該染色體的query ID列表
})
query_detail = [] # 單條query的詳細(xì)信息
# 3. 提取BAM中的高置信度比對
print("?? 提取比對信息并按染色體整合...")
view_cmd = ["samtools", "view", "-q", "20", "-F", "4", bam_file]
try:
result = subprocess.run(view_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True)
except subprocess.CalledProcessError as e:
err_msg = e.stderr.strip() if e.stderr else "未知錯誤"
print(f"? 讀取BAM失?。簕err_msg}")
exit(1)
# 4. 解析每條比對結(jié)果,更新統(tǒng)計
for line in result.stdout.splitlines():
fields = line.split("\t")
qid = fields[0]
if qid not in query_dict:
continue
query_seq, qlen = query_dict[qid]
# 解析BAM基礎(chǔ)字段
flag = int(fields[1])
chrom = fields[2]
ref_start = int(fields[3])
cigar = fields[5]
strand = "-" if (flag & 16) else "+"
md_tag = next((f.split(":")[-1] for f in fields if f.startswith("MD:Z:")), "")
# 計算比對指標(biāo)
align_length = 0
for length, op in re.findall(r"(\d+)([MIDNSHPX=])", cigar):
if op in ["M", "D", "N", "X", "="]:
align_length += int(length)
align_rate = (align_length / qlen) * 100 if qlen > 0 else 0.0
ref_end = ref_start + align_length - 1 if align_length > 0 else ref_start
# 解析變異并統(tǒng)計
variants = parse_cigar_and_md(cigar, md_tag, query_seq, ref_start)
mismatch_count = len([v for v in variants if v["type"] == "mismatch"])
insertion_count = len([v for v in variants if v["type"] == "insertion"])
deletion_count = len([v for v in variants if v["type"] == "deletion"])
insertion_length = sum(v["length"] for v in variants if v["type"] == "insertion")
deletion_length = sum(v["length"] for v in variants if v["type"] == "deletion")
# 記錄query明細(xì)(query信息優(yōu)先)
query_detail.append({
"query_id": qid,
"query_length": qlen, # query信息放前面
"chrom": chrom,
"ref_start": ref_start,
"ref_end": ref_end,
"strand": strand,
"align_length": align_length,
"align_rate": align_rate,
"mismatch": mismatch_count,
"insertion": insertion_count,
"deletion": deletion_count,
"insertion_length": insertion_length,
"deletion_length": deletion_length,
"variants": variants
})
# 更新染色體統(tǒng)計
chrom_stats[chrom]["query_count"] += 1
chrom_stats[chrom]["total_query_length"] += qlen
chrom_stats[chrom]["total_align_length"] += align_length
chrom_stats[chrom]["total_align_rate"] += align_rate
chrom_stats[chrom]["total_mismatch"] += mismatch_count
chrom_stats[chrom]["total_insertion"] += insertion_count
chrom_stats[chrom]["total_deletion"] += deletion_count
chrom_stats[chrom]["total_insertion_length"] += insertion_length
chrom_stats[chrom]["total_deletion_length"] += deletion_length
chrom_stats[chrom]["queries"].append(qid)
# 5. 補(bǔ)充未比對的query
aligned_qids = {d["query_id"] for d in query_detail}
for qid in query_dict.keys():
if qid not in aligned_qids:
qlen = query_dict[qid][1]
query_detail.append({
"query_id": qid,
"query_length": qlen, # query信息放前面
"chrom": "-",
"ref_start": "-",
"ref_end": "-",
"strand": "-",
"align_length": "-",
"align_rate": "-",
"mismatch": "-",
"insertion": "-",
"deletion": "-",
"insertion_length": "-",
"deletion_length": "-",
"variants": []
})
# 6. 計算染色體平均指標(biāo)
for chrom in chrom_stats:
stats = chrom_stats[chrom]
count = stats["query_count"]
if count == 0:
continue
stats["avg_align_rate"] = stats["total_align_rate"] / count
stats["avg_query_length"] = stats["total_query_length"] / count
stats["avg_align_length"] = stats["total_align_length"] / count
stats["avg_mismatch"] = stats["total_mismatch"] / count
stats["avg_insertion"] = stats["total_insertion"] / count
stats["avg_deletion"] = stats["total_deletion"] / count
stats["avg_insertion_length"] = (stats["total_insertion_length"] / stats["total_insertion"]) if stats["total_insertion"] > 0 else 0
stats["avg_deletion_length"] = (stats["total_deletion_length"] / stats["total_deletion"]) if stats["total_deletion"] > 0 else 0
# 7. 輸出結(jié)果文件(query信息放最左側(cè))
# 7.1 單條query詳細(xì)信息(query_id、query_length居左)
detail_file = os.path.join(output_dir, "query_alignment_details.txt")
with open(detail_file, "w") as f:
# 表頭順序:query信息 → 染色體信息 → 比對/變異信息
f.write(
"query_id\tquery_length\tchromosome\tref_start\tref_end\tstrand\t"
"align_length\talign_rate(%)\t"
"mismatch_count\tinsertion_count\tdeletion_count\t"
"insertion_total_length\tdeletion_total_length\tvariant_type\t"
"variant_pos\tref_base\tquery_base\tvariant_length\n"
)
for q in query_detail:
if not q["variants"]:
f.write(
f"{q['query_id']}\t{q['query_length']}\t{q['chrom']}\t{q['ref_start']}\t{q['ref_end']}\t{q['strand']}\t"
f"{q['align_length']}\t{q['align_rate']}\t"
f"{q['mismatch']}\t{q['insertion']}\t{q['deletion']}\t"
f"{q['insertion_length']}\t{q['deletion_length']}\tno_variant\t-\t-\t-\t-\n"
)
else:
for var in q["variants"]:
f.write(
f"{q['query_id']}\t{q['query_length']}\t{q['chrom']}\t{q['ref_start']}\t{q['ref_end']}\t{q['strand']}\t"
f"{q['align_length']}\t{q['align_rate']:.2f}\t"
f"{q['mismatch']}\t{q['insertion']}\t{q['deletion']}\t"
f"{q['insertion_length']}\t{q['deletion_length']}\t{var['type']}\t"
f"{var['pos']}\t{var['ref_base']}\t{var['query_base']}\t{var['length']}\n"
)
# 7.2 按染色體整合統(tǒng)計(關(guān)聯(lián)的query列表放右側(cè),不影響左側(cè)染色體信息閱讀)
chrom_file = os.path.join(output_dir, "chromosome_integrated_stats.txt")
with open(chrom_file, "w") as f:
f.write(
"chromosome\tquery_count\tavg_query_length(bp)\t" # query相關(guān)統(tǒng)計緊隨染色體
"total_align_length(bp)\tavg_align_length(bp)\tavg_align_rate(%)\t"
"total_mismatch\tavg_mismatch\t"
"total_insertion\tavg_insertion\tavg_insertion_length(bp)\t"
"total_deletion\tavg_deletion\tavg_deletion_length(bp)\t"
"matched_queries\n" # query列表放最右側(cè)
)
for chrom, stats in sorted(chrom_stats.items(), key=lambda x: x[1]["query_count"], reverse=True):
query_list = stats["queries"]
query_str = ", ".join(query_list) if len(query_list) <= 10 else \
f"{', '.join(query_list[:10])}...(共{len(query_list)}條)"
f.write(
f"{chrom}\t{stats['query_count']}\t{stats['avg_query_length']:.1f}\t"
f"{stats['total_align_length']}\t{stats['avg_align_length']:.1f}\t{stats['avg_align_rate']:.2f}\t"
f"{stats['total_mismatch']}\t{stats['avg_mismatch']:.1f}\t"
f"{stats['total_insertion']}\t{stats['avg_insertion']:.1f}\t{stats['avg_insertion_length']:.1f}\t"
f"{stats['total_deletion']}\t{stats['avg_deletion']:.1f}\t{stats['avg_deletion_length']:.1f}\t"
f"{query_str}\n"
)
# 7.3 全局匯總統(tǒng)計(保持原有邏輯)
global_file = os.path.join(output_dir, "global_summary.txt")
with open(global_file, "w") as f:
aligned_count = len(aligned_qids)
unaligned_count = total_queries - aligned_count
f.write(f"總query序列數(shù): {total_queries}\n")
f.write(f"成功比對的query數(shù): {aligned_count} ({aligned_count/total_queries*100:.1f}%)\n")
f.write(f"未比對的query數(shù): {unaligned_count} ({unaligned_count/total_queries*100:.1f}%)\n")
f.write(f"涉及的染色體數(shù): {len(chrom_stats)}\n\n")
if chrom_stats:
top_chrom_by_count = max(chrom_stats.items(), key=lambda x: x[1]["query_count"])[0]
valid_chroms = [c for c in chrom_stats.items() if c[1]["query_count"] >= 3]
top_chrom_by_rate = max(valid_chroms, key=lambda x: x[1]["avg_align_rate"])[0] if valid_chroms else "N/A"
top_chrom_by_variant = max(chrom_stats.items(), key=lambda x: x[1]["total_mismatch"] + x[1]["total_insertion"] + x[1]["total_deletion"])[0]
f.write(f"比對query數(shù)最多的染色體: {top_chrom_by_count}({chrom_stats[top_chrom_by_count]['query_count']}條)\n")
f.write(f"平均比對率最高的染色體(≥3條query): {top_chrom_by_rate}({chrom_stats[top_chrom_by_rate]['avg_align_rate']:.2f}%)\n" if valid_chroms else "平均比對率最高的染色體: 無足夠數(shù)據(jù)(單染色體query數(shù)<3)\n")
f.write(f"總變異數(shù)最多的染色體: {top_chrom_by_variant}({chrom_stats[top_chrom_by_variant]['total_mismatch'] + chrom_stats[top_chrom_by_variant]['total_insertion'] + chrom_stats[top_chrom_by_variant]['total_deletion']}個變異)\n")
# 8. 輸出完成信息
print(f"\n? 結(jié)果文件保存完成:")
print(f" - 單條query詳細(xì)信息(query信息居左):{detail_file}")
print(f" - 按染色體整合統(tǒng)計:{chrom_file}")
print(f" - 全局匯總統(tǒng)計:{global_file}")
def main():
parser = argparse.ArgumentParser(description="序列比對+按染色體整合(query信息放最左側(cè))")
parser.add_argument("--query", default="extracted_rice_sequences.txt", help="query序列文件(FASTA)")
parser.add_argument("--ref", default="H7L31.fixed.fa", help="參考基因組文件(FASTA)")
parser.add_argument("--outdir", default="chrom_integrated_final_results", help="結(jié)果輸出目錄")
parser.add_argument("--threads", type=int, default=4, help="比對線程數(shù)")
args = parser.parse_args()
# 檢查依賴和輸入文件
check_dependency(["minimap2", "samtools"])
for file in [args.query, args.ref]:
if not os.path.exists(file):
print(f"? 輸入文件不存在:{file}")
exit(1)
# 創(chuàng)建輸出目錄
os.makedirs(args.outdir, exist_ok=True)
bam_file = os.path.join(args.outdir, "query_vs_genome.bam")
# 執(zhí)行流程
print("===== 步驟1/3:構(gòu)建基因組索引 =====")
ref_index = build_minimap2_index(args.ref)
print("\n===== 步驟2/3:運(yùn)行minimap2比對 =====")
run_minimap2_alignment(ref_index, args.query, bam_file, args.threads)
print("\n===== 步驟3/3:按染色體整合所有信息 =====")
extract_and_integrate_by_chrom(bam_file, args.query, args.ref, args.outdir)
print("\n?? 全流程完成!核心結(jié)果目錄:", args.outdir)
if __name__ == "__main__":
main()


有具體的差異(SNP/INDEL)信息
all
import os
import subprocess
import argparse
import shutil
import re
from collections import defaultdict
def check_dependency(tools):
"""檢查必需工具是否安裝(minimap2、samtools)"""
missing = [tool for tool in tools if not shutil.which(tool)]
if missing:
print(f"? 缺少必需工具:{', '.join(missing)}")
print(" 安裝建議(conda):conda install -c bioconda minimap2 samtools")
exit(1)
def get_ref_samples(ref_dir):
"""獲取參考文件夾(fq)下的所有樣本(每個FA文件對應(yīng)一個樣本)"""
if not os.path.exists(ref_dir):
print(f"? 參考樣本文件夾不存在:{ref_dir}")
exit(1)
ref_samples = [] # 存儲格式:(樣本名, FA文件路徑, 樣本文件夾路徑)
for filename in os.listdir(ref_dir):
if filename.startswith('.'):
continue # 跳過隱藏文件
if filename.endswith('.fa') or filename.endswith('.fasta'):
# 樣本名:去掉后綴(如“ref1.fa”→“ref1”)
sample_name = os.path.splitext(filename)[0]
# FA文件絕對路徑
fa_path = os.path.abspath(os.path.join(ref_dir, filename))
# 樣本專屬文件夾路徑(輸出根目錄/樣本名)
sample_dir = os.path.join(args.outdir, sample_name)
ref_samples.append((sample_name, fa_path, sample_dir))
if not ref_samples:
print(f"? 在參考文件夾 {ref_dir} 中未找到任何.fa/.fasta樣本文件")
exit(1)
print(f"? 共識別 {len(ref_samples)} 個參考樣本:")
for idx, (sample_name, fa_path, sample_dir) in enumerate(ref_samples, 1):
print(f" {idx}. 樣本名:{sample_name} → FA路徑:{fa_path} → 結(jié)果文件夾:{sample_dir}")
return ref_samples
def init_sample_dir(sample_dir):
"""初始化樣本專屬文件夾(若不存在則創(chuàng)建)"""
if not os.path.exists(sample_dir):
os.makedirs(sample_dir, exist_ok=True)
print(f"?? 創(chuàng)建樣本文件夾:{sample_dir}")
else:
print(f"?? 樣本文件夾已存在:{sample_dir}")
return sample_dir
def build_sample_index(sample_name, fa_path, sample_dir):
"""為單個樣本構(gòu)建minimap2索引(存入樣本文件夾)"""
# 索引文件名:樣本名_index.mmi(避免跨樣本沖突)
index_file = os.path.join(sample_dir, f"{sample_name}_index.mmi")
if os.path.exists(index_file):
print(f"? 樣本 {sample_name} 的索引已存在:{os.path.basename(index_file)}")
return index_file
print(f"?? 為樣本 {sample_name} 構(gòu)建基因組索引...")
cmd = ["minimap2", "-d", index_file, fa_path]
try:
subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
print(f"? 樣本 {sample_name} 的索引構(gòu)建完成:{os.path.basename(index_file)}")
return index_file
except subprocess.CalledProcessError as e:
err_msg = e.stderr.strip() if e.stderr else "未知錯誤"
print(f"? 樣本 {sample_name} 的索引構(gòu)建失?。簕err_msg}")
exit(1)
def run_sample_alignment(sample_name, index_file, query_file, sample_dir, threads=4):
"""為單個樣本運(yùn)行minimap2比對(BAM存入樣本文件夾)"""
# BAM文件名:樣本名_query_vs_ref.bam(明確標(biāo)識樣本)
bam_file = os.path.join(sample_dir, f"{sample_name}_query_vs_ref.bam")
if os.path.exists(bam_file) and os.path.exists(f"{bam_file}.bai"):
print(f"? 樣本 {sample_name} 的比對BAM已存在:{os.path.basename(bam_file)}")
return bam_file
temp_sam = bam_file.replace('.bam', '.temp.sam')
try:
print(f"?? 開始query與樣本 {sample_name} 的比對...")
cmd = [
"minimap2", "-a", "-x", "map-ont",
"--end-bonus", "10", "--MD", # 保留MD標(biāo)簽用于變異解析
"-t", str(threads),
index_file, query_file
]
with open(temp_sam, "w") as f:
subprocess.run(cmd, stdout=f, check=True, stderr=subprocess.PIPE, text=True)
print(f"?? 轉(zhuǎn)換樣本 {sample_name} 的比對結(jié)果為BAM...")
subprocess.run(
["samtools", "sort", "-@", str(threads), "-o", bam_file, temp_sam],
check=True, stderr=subprocess.PIPE, text=True
)
subprocess.run(["samtools", "index", bam_file], check=True)
os.remove(temp_sam)
print(f"? 樣本 {sample_name} 的比對完成:{os.path.basename(bam_file)}")
return bam_file
except subprocess.CalledProcessError as e:
err_msg = e.stderr.strip() if e.stderr else "未知錯誤"
print(f"? 樣本 {sample_name} 的比對失?。簕err_msg}")
if os.path.exists(temp_sam):
os.remove(temp_sam)
exit(1)
def parse_cigar_and_md(cigar, md_tag, query_seq, ref_start):
"""解析CIGAR和MD標(biāo)簽,提取變異信息(錯配、插入、缺失)"""
variants = []
query_pos = 0 # query序列當(dāng)前位置(0-based)
ref_pos = ref_start # 參考序列當(dāng)前位置(1-based)
md_parts = re.findall(r"(\d+|[ATCGatcg]|\^[ATCGatcg]+)", md_tag)
for length, op in re.findall(r"(\d+)([MIDNSHPX=])", cigar):
length = int(length)
if op in ["M", "=", "X"]:
query_pos += length
ref_pos += length
elif op == "I": # 插入(query有,ref無)
variants.append({
"type": "insertion",
"pos": ref_pos,
"ref_base": "-",
"query_base": query_seq[query_pos:query_pos+length],
"length": length
})
query_pos += length
elif op == "D": # 缺失(ref有,query無)
variants.append({
"type": "deletion",
"pos": ref_pos,
"ref_base": "", # 后續(xù)用MD標(biāo)簽補(bǔ)充
"query_base": "-",
"length": length
})
ref_pos += length
elif op in ["S", "H"]:
query_pos += length
elif op in ["N", "P"]:
ref_pos += length
# 補(bǔ)充MD標(biāo)簽信息
current_ref_pos = ref_start
for part in md_parts:
if part.isdigit():
current_ref_pos += int(part)
elif part.startswith("^"): # 缺失的堿基
del_bases = part[1:].upper()
for var in variants:
if var["type"] == "deletion" and var["pos"] == current_ref_pos:
var["ref_base"] = del_bases
break
current_ref_pos += len(del_bases)
else: # 錯配的堿基
ref_base = part.upper()
query_mismatch_pos = current_ref_pos - ref_start
if 0 <= query_mismatch_pos < len(query_seq):
query_base = query_seq[query_mismatch_pos].upper()
variants.append({
"type": "mismatch",
"pos": current_ref_pos,
"ref_base": ref_base,
"query_base": query_base,
"length": 1
})
current_ref_pos += 1
return variants
def extract_sample_results(sample_name, bam_file, query_file, sample_dir):
"""提取單個樣本的比對結(jié)果(存入樣本文件夾),并返回核心統(tǒng)計用于全局匯總"""
# 1. 讀取query序列
query_dict = {}
with open(query_file, "r") as f:
current_id, current_seq_lines = None, []
for line in f:
line = line.strip()
if line.startswith(">"):
if current_id and current_seq_lines:
query_dict[current_id] = (''.join(current_seq_lines).upper(), len(current_seq_lines))
current_id = line[1:].split()[0]
current_seq_lines = []
else:
current_seq_lines.append(line)
if current_id and current_seq_lines:
query_dict[current_id] = (''.join(current_seq_lines).upper(), len(current_seq_lines))
# 2. 提取BAM比對信息
view_cmd = ["samtools", "view", "-q", "20", "-F", "4", bam_file]
try:
result = subprocess.run(view_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True)
except subprocess.CalledProcessError as e:
err_msg = e.stderr.strip() if e.stderr else "未知錯誤"
print(f"?? 讀取樣本 {sample_name} 的BAM失?。簕err_msg},跳過該樣本統(tǒng)計")
return None
# 3. 解析比對結(jié)果并生成樣本內(nèi)統(tǒng)計文件
sample_detail_file = os.path.join(sample_dir, f"{sample_name}_query_details.txt") # 樣本內(nèi)query詳情
sample_chrom_file = os.path.join(sample_dir, f"{sample_name}_chrom_stats.txt") # 樣本內(nèi)染色體統(tǒng)計
sample_stats = {
"sample_name": sample_name,
"total_query": len(query_dict),
"aligned_query": 0,
"total_align_length": 0,
"total_align_rate": 0.0,
"total_mismatch": 0
}
chrom_stats = defaultdict(lambda: {"query_count": 0, "total_align_length": 0})
with open(sample_detail_file, "w") as detail_f, open(sample_chrom_file, "w") as chrom_f:
# 樣本內(nèi)query詳情表頭
detail_f.write(
"query_id\tquery_length\tchromosome\tref_start\tref_end\tstrand\t"
"align_length\talign_rate(%)\tmismatch_count\tinsertion_count\tdeletion_count\n"
)
# 樣本內(nèi)染色體統(tǒng)計表頭
chrom_f.write(
"chromosome\tquery_count\ttotal_align_length(bp)\tavg_align_length(bp)\n"
)
aligned_qids = set()
for line in result.stdout.splitlines():
fields = line.split("\t")
qid = fields[0]
if qid not in query_dict:
continue
aligned_qids.add(qid)
query_seq, qlen = query_dict[qid]
# 解析BAM字段
flag = int(fields[1])
chrom = fields[2]
ref_start = int(fields[3])
cigar = fields[5]
strand = "-" if (flag & 16) else "+"
md_tag = next((f.split(":")[-1] for f in fields if f.startswith("MD:Z:")), "")
# 計算比對指標(biāo)
align_length = sum(int(l) for l, op in re.findall(r"(\d+)([MIDNSHPX=])", cigar) if op in ["M", "D", "N", "X", "="])
align_rate = (align_length / qlen) * 100 if qlen > 0 else 0.0
ref_end = ref_start + align_length - 1 if align_length > 0 else ref_start
# 解析變異
variants = parse_cigar_and_md(cigar, md_tag, query_seq, ref_start)
mismatch = len([v for v in variants if v["type"] == "mismatch"])
insertion = len([v for v in variants if v["type"] == "insertion"])
deletion = len([v for v in variants if v["type"] == "deletion"])
# 寫入樣本內(nèi)query詳情
detail_f.write(
f"{qid}\t{qlen}\t{chrom}\t{ref_start}\t{ref_end}\t{strand}\t"
f"{align_length}\t{align_rate:.2f}\t{mismatch}\t{insertion}\t{deletion}\n"
)
# 更新樣本內(nèi)染色體統(tǒng)計
chrom_stats[chrom]["query_count"] += 1
chrom_stats[chrom]["total_align_length"] += align_length
# 更新樣本核心統(tǒng)計
sample_stats["aligned_query"] += 1
sample_stats["total_align_length"] += align_length
sample_stats["total_align_rate"] += align_rate
sample_stats["total_mismatch"] += mismatch
# 補(bǔ)充未比對的query到樣本內(nèi)詳情
for qid in query_dict.keys():
if qid not in aligned_qids:
qlen = query_dict[qid][1]
detail_f.write(
f"{qid}\t{qlen}\t-\t-\t-\t-\t-\t-\t-\t-\t-\n"
)
# 寫入樣本內(nèi)染色體統(tǒng)計
for chrom, stats in chrom_stats.items():
avg_align_len = stats["total_align_length"] / stats["query_count"] if stats["query_count"] > 0 else 0.0
chrom_f.write(
f"{chrom}\t{stats['query_count']}\t{stats['total_align_length']}\t{avg_align_len:.1f}\n"
)
# 計算樣本平均比對率
sample_stats["avg_align_rate"] = sample_stats["total_align_rate"] / sample_stats["aligned_query"] if sample_stats["aligned_query"] > 0 else 0.0
print(f"? 樣本 {sample_name} 結(jié)果已保存至:")
print(f" - query詳情:{os.path.basename(sample_detail_file)}")
print(f" - 染色體統(tǒng)計:{os.path.basename(sample_chrom_file)}")
return sample_stats
def generate_global_summary(all_sample_stats, outdir):
"""生成全局匯總文件(輸出根目錄),匯總所有樣本的核心指標(biāo)"""
global_file = os.path.join(outdir, "global_sample_summary.txt")
with open(global_file, "w") as f:
f.write("樣本名\t總query數(shù)\t比對query數(shù)\t比對率(%)\t平均比對率(%)\t總錯配數(shù)\n")
total_global_query = 0
total_global_aligned = 0
for stats in all_sample_stats:
if not stats:
continue
# 計算樣本的“query比對率”(比對query數(shù)/總query數(shù))
sample_query_rate = (stats["aligned_query"] / stats["total_query"]) * 100 if stats["total_query"] > 0 else 0.0
f.write(
f"{stats['sample_name']}\t{stats['total_query']}\t{stats['aligned_query']}\t"
f"{sample_query_rate:.1f}\t{stats['avg_align_rate']:.2f}\t{stats['total_mismatch']}\n"
)
total_global_query = stats["total_query"] # 所有樣本的總query數(shù)一致(同一query文件)
total_global_aligned += stats["aligned_query"]
# 全局總統(tǒng)計
f.write(f"\n全局匯總:\n")
f.write(f"總樣本數(shù):{len(all_sample_stats)}\n")
f.write(f"總query數(shù):{total_global_query}\n")
f.write(f"所有樣本合計比對query數(shù):{total_global_aligned}\n")
f.write(f"全局平均query比對率:{(total_global_aligned / (total_global_query * len(all_sample_stats))) * 100:.1f}%\n")
print(f"\n? 全局樣本匯總文件已保存:{os.path.basename(global_file)}")
return global_file
def main():
global args # 全局參數(shù),方便子函數(shù)訪問輸出根目錄
parser = argparse.ArgumentParser(description="按樣本生成獨(dú)立文件夾+多參考樣本比對與統(tǒng)計")
parser.add_argument("--query", default="extracted_sequences.txt", help="query序列文件(FASTA)")
parser.add_argument("--ref_dir", default="fq", help="參考樣本文件夾(默認(rèn):fq,每個FA文件對應(yīng)一個樣本)")
parser.add_argument("--outdir", default="sample_based_results", help="結(jié)果輸出根目錄(默認(rèn):sample_based_results)")
parser.add_argument("--threads", type=int, default=4, help="比對線程數(shù)")
args = parser.parse_args()
# 1. 基礎(chǔ)檢查
check_dependency(["minimap2", "samtools"])
if not os.path.exists(args.query):
print(f"? query文件不存在:{args.query}")
exit(1)
os.makedirs(args.outdir, exist_ok=True)
print(f"?? 結(jié)果輸出根目錄:{args.outdir}\n")
# 2. 核心流程:按樣本處理
print("===== 步驟1/5:識別參考樣本 =====")
ref_samples = get_ref_samples(args.ref_dir) # (樣本名, FA路徑, 樣本文件夾路徑)
print("\n===== 步驟2/5:初始化樣本文件夾 =====")
for sample_name, fa_path, sample_dir in ref_samples:
init_sample_dir(sample_dir)
print("\n===== 步驟3/5:為所有樣本構(gòu)建索引 =====")
sample_index_list = [] # (樣本名, 索引路徑, 樣本文件夾路徑)
for sample_name, fa_path, sample_dir in ref_samples:
index_path = build_sample_index(sample_name, fa_path, sample_dir)
sample_index_list.append((sample_name, index_path, sample_dir))
print("\n===== 步驟4/5:為所有樣本運(yùn)行比對并提取結(jié)果 =====")
all_sample_stats = [] # 存儲所有樣本的核心統(tǒng)計,用于全局匯總
for sample_name, index_path, sample_dir in sample_index_list:
bam_path = run_sample_alignment(sample_name, index_path, args.query, sample_dir, args.threads)
sample_stats = extract_sample_results(sample_name, bam_path, args.query, sample_dir)
all_sample_stats.append(sample_stats)
print("\n===== 步驟5/5:生成全局樣本匯總 =====")
generate_global_summary(all_sample_stats, args.outdir)
print(f"\n?? 全流程完成!所有樣本結(jié)果已按文件夾隔離,根目錄:{args.outdir}")
if __name__ == "__main__":
main()