-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclean-seqs.py
executable file
·96 lines (74 loc) · 2.55 KB
/
clean-seqs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#!/usr/bin/env python
import datetime
import click
import pandas as pd
import numpy as np
import skbio
def tgen_id_to_datetime(s):
# this is very specific to one data set, and probably
# shouldn't be used widely
fields = s.split('|')
date_field = fields[-1]
if '/' in date_field:
year, month, day = date_field.split('/')
elif '.' in date_field:
month, day, year = date_field.split('.')
else:
raise ValueError("Can't parse date from id: %s" % s)
return '-'.join([year, month, day])
def fix_ua(s):
if s == 'UA|SARS-COV-2|171|PIMA':
return '2020-03-19'
raise Exception("Unknown date for UA sequence")
@click.command()
@click.option('--remove', help='fasta IDs to remove',
type=click.File(mode='r'), required=False)
@click.option('--fix-date', help='fix tgen/ua dates to match gisaid',
type=bool, required=False)
@click.argument('input', type=click.Path())
@click.argument('output', type=click.Path())
def main(input, output, remove, fix_date):
if remove:
remove = set(l.strip() for l in remove)
else:
remove = set()
total = 0
duplicates = 0
removed = 0
renamed = 0
seen = set()
def generator():
for seq in skbio.io.read(input, format='fasta', constructor=skbio.DNA):
nonlocal total
nonlocal duplicates
nonlocal removed
nonlocal renamed
total += 1
if seq.metadata['id'] in seen:
duplicates += 1
continue
seen.add(seq.metadata['id'])
if seq.metadata['id'] in remove:
removed += 1
continue
if fix_date:
change = False
try:
start, *mid, date = seq.metadata['id'].split('|')
except ValueError:
start = ""
if 'TGEN' in start:
date = tgen_id_to_datetime(seq.metadata['id'])
change = True
elif 'UA' in start:
date = fix_ua(seq.metadata['id'])
change = True
if change:
seq.metadata['id'] = '|'.join([start, *mid, date])
renamed += 1
yield seq
skbio.io.write(generator(), format='fasta', into=output)
print("%d sequences were duplicated, %d were removed, and %d were renamed"
" out of %d total." % (duplicates, removed, renamed, total))
if __name__ == '__main__':
main()