#! /usr/bin/env python
import pyfaidx
import re
from Bio.Seq import reverse_complement
from typing import Generator, Tuple, Dict, Union, TextIO
from collections import Counter, defaultdict
import bisect
import numpy as np
[docs]class Ancestor(pyfaidx.Fasta):
r"""Ancestral state of a chromosome.
Args:
fasta: path to ancestral sequence FASTA
k: the size of the context window (default 3)
target: which position for the site within the kmer (default middle)
strand_file: path to bed file (or I/O object) with regions where
reverse strand defines mutation context, e.g. direction of
replication or transcription. Sites not in these regions
are assigned forward strand context. If not provided,
collapse by reverse complement, with the target base as ``A``
or ``C``. Note that bed file regions should be
`0-based and right-open
<https://en.wikipedia.org/wiki/BED_(file_format)#Coordinate_system>`_.
kwargs: additional keyword arguments passed to base class. Useful
ones are ``key_function`` (for chromosome name parsing),
``read_ahead`` (for buffering), and ``sequence_always_upper``
(to allow lowercase nucleotides to be considered ancestrally
identified)
"""
acgt = set("ACGT")
ac = set("AC")
def __init__(
self,
fasta: str,
k: int = 3,
target: int = None,
strand_file: Union[str, TextIO] = None,
**kwargs,
):
super(Ancestor, self).__init__(fasta, **kwargs)
if target is None:
assert k % 2 == 1, f"k = {k} must be odd for default middle target"
target = k // 2
else:
raise NotImplementedError("target must be None (default)")
assert 0 <= target < k
self.target = target
self.k = k
if strand_file is None:
self._revcomp_func = self._AC
if self.target != self.k // 2:
raise ValueError(
f"non-central target {self.target} requires " "strand_file"
)
else:
self.strandedness = defaultdict(list)
if isinstance(strand_file, str):
bed = open(strand_file, "r")
for line in bed:
chrom, start, end = line.rstrip().split("\t")
bisect.insort(self.strandedness[chrom], [int(start), int(end)])
for chrom in self.strandedness:
self.strandedness[chrom] = np.array(self.strandedness[chrom])
self._revcomp_func = self._reverse_strand
def _reverse_strand(self, chrom: str, pos: int):
r"""Return ``True`` if ``strand_file`` indicates reverse
complementation at this site."""
closest_idx = bisect.bisect(self.strandedness[chrom][:, 0], pos) - 1
if closest_idx != -1 and pos < self.strandedness[chrom][closest_idx, 1]:
return True
return False
def _AC(self, chrom: str, pos: int):
r"""Return True if reverse complementation is needed at this site to
get state A or C."""
if self[chrom][pos].seq not in self.ac:
return True
return False
[docs] def mutation_type(
self, chrom: str, pos: int, ref: str, alt: str
) -> Tuple[str, str]:
r"""Mutation type of a given snp, oriented or collapsed by strand,
returns a tuple of ancestral and derived kmers.
Args:
chrom: FASTA record chromosome identifier
pos: position (0-based)
ref: reference allele (A, C, G, or T)
alt: alternative allele (A, C, G, or T)
"""
# ancestral state
anc = self[chrom][pos].seq
# derived state
if anc == ref:
der = alt
elif anc == alt:
der = ref
else:
# infinite sites violation
return None, None
start = pos - self.target
assert start >= 0
end = pos + self.k - self.target
assert start <= end
context = self[chrom][start:end]
anc_kmer = f"{context[:self.target]}{anc}{context[(self.target + 1):]}"
der_kmer = f"{context[:self.target]}{der}{context[(self.target + 1):]}"
if not re.match("^[ACGT]+$", anc_kmer) or not re.match("^[ACGT]+$", der_kmer):
return None, None
if not self._revcomp_func(chrom, pos):
return anc_kmer, der_kmer
else:
return (reverse_complement(anc_kmer), reverse_complement(der_kmer))
[docs] def region_contexts(
self, chrom: str, start: int = None, end: int = None
) -> Generator[str, None, None]:
r"""Ancestral context of each site in a BED style region (0-based, half-
open), oriented according to self.strandedness or collapsed by reverse
complementation (returns ``None`` if ancestral state at target not in
capital ACGT)
Args:
chrom: chromosome name
start: region start position (default to chromsome start)
end: region end position (default to chromsome end)
"""
if start is None:
start = 0
if end is None:
end = len(self[chrom])
for pos in range(start, end):
if self[chrom][pos].seq not in self.acgt:
yield None
continue
if not self._revcomp_func(chrom, pos):
context_start = pos - self.target
context_end = pos + self.k - self.target
if context_start < 0 or context_end > len(self[chrom]):
yield None
continue
else:
context = self[chrom][context_start:context_end].seq
else:
context_start = pos - self.k + self.target + 1
context_end = pos + self.target + 1
if context_start < 0 or context_end > len(self[chrom]):
yield None
continue
else:
context = reverse_complement(
self[chrom][context_start:context_end].seq
)
if not re.match("^[ACGT]+$", context):
context = None
yield context
[docs] def targets(self, bed: Union[str, TextIO] = None) -> Dict[str, int]:
r"""Return a dictionary of the number of sites of each k-mer.
Args:
bed: optional path to BED mask file, or I/O object
"""
sizes = Counter()
if bed is None:
for chrom in self.keys():
sizes.update(self.region_contexts(chrom))
else:
if isinstance(bed, str):
bed = open(bed, "r")
for line in bed:
chrom, start, end = line.rstrip().split("\t")
sizes.update(self.region_contexts(chrom, int(start), int(end)))
del sizes[None]
return sizes