
位置信息
import os
import sys
import argparse
from Bio import SeqIO
def read_cer_file(cer_path):
"""讀取cer文件,返回有效樣本信息(樣本名、染色體、區(qū)間)"""
samples = []
# 檢查cer文件是否存在
if not os.path.exists(cer_path):
print(f"錯誤:cer文件不存在 → {cer_path}")
sys.exit(1)
with open(cer_path, 'r') as f:
header = f.readline() # 跳過表頭(若無需跳過可刪除此句)
line_num = 1
for line in f:
line_num += 1
line = line.strip()
if not line:
continue # 跳過空行
parts = line.split()
if len(parts) < 5:
print(f"警告:行 {line_num} 列數(shù)不足(需≥5列),跳過 → {line}")
continue
# 解析并校驗數(shù)值(起始、終止、染色體總長)
try:
start = int(parts[2])
end = int(parts[3])
chrom_length = int(parts[4])
except ValueError as e:
print(f"警告:行 {line_num} 數(shù)值無效({e}),跳過 → {line}")
continue
# 確保起始位置小于終止位置
if start >= end:
print(f"警告:行 {line_num} 起始≥終止,已交換 → 原{start}-{end} → 新{end}-{start}")
start, end = end, start
samples.append({
'sample_name': parts[0],
'chromosome': parts[1],
'start': start,
'end': end,
'chrom_length': chrom_length
})
if not samples:
print(f"錯誤:從cer文件中未讀取到有效樣本信息 → {cer_path}")
sys.exit(1)
return samples
def extract_sequence(record, start, end):
"""提取序列片段(處理區(qū)間超出序列長度的情況)"""
seq_total_len = len(record.seq)
# 修正區(qū)間:確保不超出序列實際長度(1-based)
actual_start = max(1, start)
actual_end = min(seq_total_len, end)
# 若修正后區(qū)間無效(起始>終止),返回失敗
if actual_start > actual_end:
print(f"警告:序列區(qū)間無效 → 原{start}-{end},序列總長{seq_total_len},無法提取")
return None, (start, end, False)
# 提取子序列(Python切片為0-based,需減1)
sub_seq = record.seq[actual_start - 1:actual_end]
return sub_seq, (actual_start, actual_end, True)
def extract_and_merge_sequences(samples, fa_dir, output_single_dir, output_merged_dir):
"""
提取樣本序列:
1. 為每個染色體生成單獨序列文件(output_single_dir)
2. 將同一樣本的所有染色體序列合并到一個文件(output_merged_dir)
"""
# 創(chuàng)建輸出目錄(不存在則新建)
os.makedirs(output_single_dir, exist_ok=True)
os.makedirs(output_merged_dir, exist_ok=True)
# 用于分組存儲同一樣本的所有染色體序列(鍵:樣本名,值:SeqRecord列表)
sample_seq_groups = {}
for sample in samples:
s_name = sample['sample_name']
chrom = sample['chromosome']
start = sample['start']
end = sample['end']
# 1. 查找當前樣本的FASTA文件(支持多種常見命名格式)
possible_fasta_names = [
f"{s_name}.fasta", f"{s_name}.fa", # 樣本整體FASTA(含所有染色體)
f"{s_name}_{chrom}.fasta", f"{s_name}_{chrom}.fa" # 樣本-染色體單獨FASTA
]
fasta_path = None
for fn in possible_fasta_names:
test_path = os.path.join(fa_dir, fn)
if os.path.exists(test_path):
fasta_path = test_path
break
# 若未找到FASTA文件,跳過當前樣本-染色體
if not fasta_path:
print(f"警告:未找到樣本 {s_name} 的FASTA文件(已嘗試{len(possible_fasta_names)}種格式),跳過染色體 {chrom}")
continue
# 2. 加載FASTA文件并匹配染色體
try:
# 讀取所有序列,用字典存儲(鍵:序列ID,值:SeqRecord)
seq_records = {rec.id: rec for rec in SeqIO.parse(fasta_path, 'fasta')}
except Exception as e:
print(f"錯誤:加載FASTA文件失敗 → {fasta_path},{e},跳過染色體 {chrom}")
continue
# 匹配染色體(優(yōu)先精確匹配,其次模糊匹配)
chrom_id = None
# 精確匹配:染色體名完全等于序列ID
if chrom in seq_records:
chrom_id = chrom
# 模糊匹配:序列ID中包含染色體名(如"Chr01"在"IR64_Chr01"中)
else:
for rec_id in seq_records:
if chrom in rec_id:
chrom_id = rec_id
break
# 若未匹配到染色體,跳過
if not chrom_id:
print(f"警告:樣本 {s_name} 的FASTA中未找到染色體 {chrom},跳過")
continue
# 3. 提取染色體的目標區(qū)間序列
full_seq_rec = seq_records[chrom_id]
sub_seq, (actual_start, actual_end, is_valid) = extract_sequence(full_seq_rec, start, end)
if not is_valid:
continue
# 4. 生成單個染色體的序列文件
sub_seq_len = len(sub_seq)
single_fn = f"{s_name}_{chrom}_{actual_start}-{actual_end}.fasta"
single_path = os.path.join(output_single_dir, single_fn)
# 寫入FASTA(描述包含實際區(qū)間和長度)
with open(single_path, 'w') as f:
desc = f"{s_name}_{chrom} | 實際區(qū)間:{actual_start}-{actual_end} | 長度:{sub_seq_len}bp"
f.write(f">{desc}\n")
# 按80bp換行(符合FASTA標準格式)
for i in range(0, sub_seq_len, 80):
f.write(str(sub_seq[i:i+80]) + "\n")
print(f"已生成單獨序列文件 → {single_path}")
# 5. 將當前染色體序列加入樣本分組(用于后續(xù)合并)
# 創(chuàng)建SeqRecord對象(包含完整元信息)
merged_rec = SeqIO.SeqRecord(
seq=sub_seq,
id=f"{s_name}_{chrom}_{actual_start}-{actual_end}", # 唯一ID
description=f"染色體:{chrom} | 原始區(qū)間:{start}-{end} | 實際區(qū)間:{actual_start}-{actual_end} | 長度:{sub_seq_len}bp"
)
# 加入分組(若樣本首次出現(xiàn),初始化列表)
if s_name not in sample_seq_groups:
sample_seq_groups[s_name] = []
sample_seq_groups[s_name].append(merged_rec)
# 6. 合并同一樣本的所有染色體序列并生成文件
if not sample_seq_groups:
print("警告:未收集到任何可合并的樣本序列")
return
for s_name, seq_records in sample_seq_groups.items():
merged_fn = f"{s_name}_merged.fasta"
merged_path = os.path.join(output_merged_dir, merged_fn)
# 寫入合并文件
SeqIO.write(seq_records, merged_path, "fasta")
print(f"已生成樣本合并文件 → {merged_path}(含{len(seq_records)}條染色體序列)")
print(f"\n=== 序列提取完成 ===")
print(f"單獨染色體文件總數(shù):{sum(len(recs) for recs in sample_seq_groups.values())}")
print(f"合并樣本文件總數(shù):{len(sample_seq_groups)}")
print(f"單獨文件目錄:{output_single_dir}")
print(f"合并文件目錄:{output_merged_dir}")
def main():
# 解析命令行參數(shù)
parser = argparse.ArgumentParser(description='僅提取樣本序列并合并同一樣本到單個文件')
# 必選參數(shù)
parser.add_argument('cer_file', help='cer文件路徑(格式:樣本名 染色體 起始 終止 染色體總長)')
parser.add_argument('fa_dir', help='樣本FASTA文件所在文件夾路徑')
# 可選參數(shù)(輸出目錄)
parser.add_argument('-o', '--output', default='sequence_extraction_results',
help='總輸出目錄(默認:sequence_extraction_results)')
args = parser.parse_args()
# 定義子輸出目錄(總目錄下分兩個子目錄,結(jié)構(gòu)更清晰)
output_single_dir = os.path.join(args.output, 'single_chromosome_sequences') # 單個染色體文件
output_merged_dir = os.path.join(args.output, 'merged_sample_sequences') # 樣本合并文件
# 核心流程:讀取cer → 提取并合并序列
print("=== 步驟1:讀取cer文件 ===")
samples = read_cer_file(args.cer_file)
print(f"成功讀取 {len(samples)} 個有效樣本-染色體區(qū)間\n")
print("=== 步驟2:提取序列并合并同一樣本 ===")
extract_and_merge_sequences(samples, args.fa_dir, output_single_dir, output_merged_dir)
if __name__ == "__main__":
main()