指定位置序列的提取

位置信息
import os
import sys
import argparse
from Bio import SeqIO


def read_cer_file(cer_path):
    """讀取cer文件,返回有效樣本信息(樣本名、染色體、區(qū)間)"""
    samples = []
    # 檢查cer文件是否存在
    if not os.path.exists(cer_path):
        print(f"錯誤:cer文件不存在 → {cer_path}")
        sys.exit(1)
    
    with open(cer_path, 'r') as f:
        header = f.readline()  # 跳過表頭(若無需跳過可刪除此句)
        line_num = 1
        for line in f:
            line_num += 1
            line = line.strip()
            if not line:
                continue  # 跳過空行
            
            parts = line.split()
            if len(parts) < 5:
                print(f"警告:行 {line_num} 列數(shù)不足(需≥5列),跳過 → {line}")
                continue
            
            # 解析并校驗數(shù)值(起始、終止、染色體總長)
            try:
                start = int(parts[2])
                end = int(parts[3])
                chrom_length = int(parts[4])
            except ValueError as e:
                print(f"警告:行 {line_num} 數(shù)值無效({e}),跳過 → {line}")
                continue
            
            # 確保起始位置小于終止位置
            if start >= end:
                print(f"警告:行 {line_num} 起始≥終止,已交換 → 原{start}-{end} → 新{end}-{start}")
                start, end = end, start
            
            samples.append({
                'sample_name': parts[0],
                'chromosome': parts[1],
                'start': start,
                'end': end,
                'chrom_length': chrom_length
            })
    
    if not samples:
        print(f"錯誤:從cer文件中未讀取到有效樣本信息 → {cer_path}")
        sys.exit(1)
    return samples


def extract_sequence(record, start, end):
    """提取序列片段(處理區(qū)間超出序列長度的情況)"""
    seq_total_len = len(record.seq)
    # 修正區(qū)間:確保不超出序列實際長度(1-based)
    actual_start = max(1, start)
    actual_end = min(seq_total_len, end)
    
    # 若修正后區(qū)間無效(起始>終止),返回失敗
    if actual_start > actual_end:
        print(f"警告:序列區(qū)間無效 → 原{start}-{end},序列總長{seq_total_len},無法提取")
        return None, (start, end, False)
    
    # 提取子序列(Python切片為0-based,需減1)
    sub_seq = record.seq[actual_start - 1:actual_end]
    return sub_seq, (actual_start, actual_end, True)


def extract_and_merge_sequences(samples, fa_dir, output_single_dir, output_merged_dir):
    """
    提取樣本序列:
    1. 為每個染色體生成單獨序列文件(output_single_dir)
    2. 將同一樣本的所有染色體序列合并到一個文件(output_merged_dir)
    """
    # 創(chuàng)建輸出目錄(不存在則新建)
    os.makedirs(output_single_dir, exist_ok=True)
    os.makedirs(output_merged_dir, exist_ok=True)
    
    # 用于分組存儲同一樣本的所有染色體序列(鍵:樣本名,值:SeqRecord列表)
    sample_seq_groups = {}
    
    for sample in samples:
        s_name = sample['sample_name']
        chrom = sample['chromosome']
        start = sample['start']
        end = sample['end']
        
        # 1. 查找當前樣本的FASTA文件(支持多種常見命名格式)
        possible_fasta_names = [
            f"{s_name}.fasta", f"{s_name}.fa",  # 樣本整體FASTA(含所有染色體)
            f"{s_name}_{chrom}.fasta", f"{s_name}_{chrom}.fa"  # 樣本-染色體單獨FASTA
        ]
        fasta_path = None
        for fn in possible_fasta_names:
            test_path = os.path.join(fa_dir, fn)
            if os.path.exists(test_path):
                fasta_path = test_path
                break
        
        # 若未找到FASTA文件,跳過當前樣本-染色體
        if not fasta_path:
            print(f"警告:未找到樣本 {s_name} 的FASTA文件(已嘗試{len(possible_fasta_names)}種格式),跳過染色體 {chrom}")
            continue
        
        # 2. 加載FASTA文件并匹配染色體
        try:
            # 讀取所有序列,用字典存儲(鍵:序列ID,值:SeqRecord)
            seq_records = {rec.id: rec for rec in SeqIO.parse(fasta_path, 'fasta')}
        except Exception as e:
            print(f"錯誤:加載FASTA文件失敗 → {fasta_path},{e},跳過染色體 {chrom}")
            continue
        
        # 匹配染色體(優(yōu)先精確匹配,其次模糊匹配)
        chrom_id = None
        # 精確匹配:染色體名完全等于序列ID
        if chrom in seq_records:
            chrom_id = chrom
        # 模糊匹配:序列ID中包含染色體名(如"Chr01"在"IR64_Chr01"中)
        else:
            for rec_id in seq_records:
                if chrom in rec_id:
                    chrom_id = rec_id
                    break
        
        # 若未匹配到染色體,跳過
        if not chrom_id:
            print(f"警告:樣本 {s_name} 的FASTA中未找到染色體 {chrom},跳過")
            continue
        
        # 3. 提取染色體的目標區(qū)間序列
        full_seq_rec = seq_records[chrom_id]
        sub_seq, (actual_start, actual_end, is_valid) = extract_sequence(full_seq_rec, start, end)
        if not is_valid:
            continue
        
        # 4. 生成單個染色體的序列文件
        sub_seq_len = len(sub_seq)
        single_fn = f"{s_name}_{chrom}_{actual_start}-{actual_end}.fasta"
        single_path = os.path.join(output_single_dir, single_fn)
        # 寫入FASTA(描述包含實際區(qū)間和長度)
        with open(single_path, 'w') as f:
            desc = f"{s_name}_{chrom} | 實際區(qū)間:{actual_start}-{actual_end} | 長度:{sub_seq_len}bp"
            f.write(f">{desc}\n")
            # 按80bp換行(符合FASTA標準格式)
            for i in range(0, sub_seq_len, 80):
                f.write(str(sub_seq[i:i+80]) + "\n")
        print(f"已生成單獨序列文件 → {single_path}")
        
        # 5. 將當前染色體序列加入樣本分組(用于后續(xù)合并)
        # 創(chuàng)建SeqRecord對象(包含完整元信息)
        merged_rec = SeqIO.SeqRecord(
            seq=sub_seq,
            id=f"{s_name}_{chrom}_{actual_start}-{actual_end}",  # 唯一ID
            description=f"染色體:{chrom} | 原始區(qū)間:{start}-{end} | 實際區(qū)間:{actual_start}-{actual_end} | 長度:{sub_seq_len}bp"
        )
        # 加入分組(若樣本首次出現(xiàn),初始化列表)
        if s_name not in sample_seq_groups:
            sample_seq_groups[s_name] = []
        sample_seq_groups[s_name].append(merged_rec)
    
    # 6. 合并同一樣本的所有染色體序列并生成文件
    if not sample_seq_groups:
        print("警告:未收集到任何可合并的樣本序列")
        return
    
    for s_name, seq_records in sample_seq_groups.items():
        merged_fn = f"{s_name}_merged.fasta"
        merged_path = os.path.join(output_merged_dir, merged_fn)
        # 寫入合并文件
        SeqIO.write(seq_records, merged_path, "fasta")
        print(f"已生成樣本合并文件 → {merged_path}(含{len(seq_records)}條染色體序列)")
    
    print(f"\n=== 序列提取完成 ===")
    print(f"單獨染色體文件總數(shù):{sum(len(recs) for recs in sample_seq_groups.values())}")
    print(f"合并樣本文件總數(shù):{len(sample_seq_groups)}")
    print(f"單獨文件目錄:{output_single_dir}")
    print(f"合并文件目錄:{output_merged_dir}")


def main():
    # 解析命令行參數(shù)
    parser = argparse.ArgumentParser(description='僅提取樣本序列并合并同一樣本到單個文件')
    # 必選參數(shù)
    parser.add_argument('cer_file', help='cer文件路徑(格式:樣本名 染色體 起始 終止 染色體總長)')
    parser.add_argument('fa_dir', help='樣本FASTA文件所在文件夾路徑')
    # 可選參數(shù)(輸出目錄)
    parser.add_argument('-o', '--output', default='sequence_extraction_results', 
                        help='總輸出目錄(默認:sequence_extraction_results)')
    args = parser.parse_args()

    # 定義子輸出目錄(總目錄下分兩個子目錄,結(jié)構(gòu)更清晰)
    output_single_dir = os.path.join(args.output, 'single_chromosome_sequences')  # 單個染色體文件
    output_merged_dir = os.path.join(args.output, 'merged_sample_sequences')      # 樣本合并文件

    # 核心流程:讀取cer → 提取并合并序列
    print("=== 步驟1:讀取cer文件 ===")
    samples = read_cer_file(args.cer_file)
    print(f"成功讀取 {len(samples)} 個有效樣本-染色體區(qū)間\n")

    print("=== 步驟2:提取序列并合并同一樣本 ===")
    extract_and_merge_sequences(samples, args.fa_dir, output_single_dir, output_merged_dir)


if __name__ == "__main__":
    main()
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容