Coverage for application / image_review / external_review / comment_processor.py: 11%
151 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-23 05:22 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-23 05:22 +0000
1import json
2import requests
3import sys
4from concurrent.futures import ThreadPoolExecutor, as_completed
6import pandas as pd
7import numpy as np
8from json import JSONDecodeError
10from flask import current_app
12from application.util.functions import format_annotator, parse_datetime
13from application.util.constants import TERM_RED, TERM_NORMAL
14from application.util.phylogeny_cache import PhylogenyCache
15from application.tator.tator_rest_client import TatorRestClient
class CommentProcessor:
    """
    Fetches annotation information from the VARS db on HURLSTOR and Tator given a dict of comments (key = uuid). Merges
    fetched annotations with the data in the comment dict into an array of dicts (self.distilled_records).

    Comments whose annotation cannot be found (or, for Tator, whose localization is flagged ``variant_deleted``) are
    appended to self.missing_records instead of self.distilled_records.
    """

    # Number of media IDs sent to Tator per localization request
    TATOR_MEDIA_CHUNK_SIZE = 300
    # Number of threads used to fetch VARS annotations concurrently
    VARS_FETCH_WORKERS = 20
    # Timeout (in seconds) passed to requests when fetching an annotation from Annosaurus
    REQUEST_TIMEOUT_SECONDS = 30
    # strftime format used for the 'recorded_timestamp' output field
    TIMESTAMP_FORMAT = '%d %b %y %H:%M:%S UTC'

    # VARS association link name -> (output field, association key holding the value)
    _VARS_LINK_FIELDS = {
        'identity-certainty': ('identity_certainty', 'link_value'),
        'identity-reference': ('identity_reference', 'link_value'),
        'guide-photo': ('guide_photo', 'to_concept'),
        'upon': ('upon', 'to_concept'),
        'comment': ('comment', 'link_value'),
    }

    # Taxonomic ranks, used both as output columns and as the leading sort keys
    _TAXON_RANKS = (
        'phylum',
        'subphylum',
        'superclass',
        'class',
        'subclass',
        'superorder',
        'order',
        'suborder',
        'infraorder',
        'superfamily',
        'family',
        'subfamily',
        'genus',
        'species',
    )

    # Every key that may appear in a distilled record; any other key in a formatted comment is dropped
    _OUTPUT_COLUMNS = (
        'observation_uuid',
        'concept',
        'scientific_name',
        'associations',
        'all_localizations',
        'attracted',
        'categorical_abundance',
        'identification_remarks',
        'identified_by',
        'notes',
        'qualifier',
        'reason',
        'morphospecies',
        'tentative_id',
        'identity_certainty',
        'identity_reference',
        'guide_photo',
        'good_image',
        'media_id',
        'frame',
        'comment',
        'image_url',
        'frame_url',
        'video_url',
        'upon',
        'recorded_timestamp',
        'video_sequence_name',
        'annotator',
        'depth',
    ) + _TAXON_RANKS

    # Sort order of the distilled records
    _SORT_COLUMNS = _TAXON_RANKS + (
        'concept',
        'identity_reference',
        'identity_certainty',
        'recorded_timestamp',
    )

    def __init__(self, comments: dict, annosaurus_url: str, vars_kb_url: str, tator_url: str, tator_token: str = None):
        """
        Store the configuration and immediately process all comments (see load_comments).

        :param comments: Dict of comment dicts keyed by annotation uuid.
        :param annosaurus_url: Base URL used to fetch VARS annotations.
        :param vars_kb_url: Stored on the instance; not used elsewhere in this class.
        :param tator_url: URL handed to TatorRestClient.
        :param tator_token: Tator token. When falsy, no Tator client is created and Tator comments are not looked up.
        """
        self.comments = comments
        self.annosaurus_url = annosaurus_url
        self.vars_kb_url = vars_kb_url
        self.tator_client = TatorRestClient(tator_url, tator_token) if tator_token else None
        self.distilled_records = []
        self.missing_records = []
        # NOTE(review): nothing in this class adds to this set, so the check in _add_phylogeny never excludes
        # anything unless an outside caller populates it - confirm whether fetch_worms failures should be recorded.
        self.no_match_records = set()
        self.phylogeny = PhylogenyCache()
        self.load_comments()

    def load_comments(self):
        """
        Fetch the annotation behind every comment, merge it with the comment data and populate
        self.distilled_records (sorted, with empty values removed) and self.missing_records.

        Side effects: prints progress to stdout, sets a 'timestamp' entry on comment dicts whose Tator localization
        is missing or deleted, and saves the phylogeny cache when done.
        """
        print(f'Processing {len(self.comments)} comments...', end='')
        sys.stdout.flush()

        localizations = self._fetch_tator_localizations()
        vars_annotations = self._fetch_vars_annotations()

        formatted_comments = []
        for uuid, comment in self.comments.items():
            if self.is_vars_annotation(uuid):
                fields = self._format_vars_annotation(uuid, comment, vars_annotations.get(uuid))
                concept_key = 'concept'
            else:
                fields = self._format_tator_localization(uuid, comment, localizations)
                concept_key = 'scientific_name'
            if fields is None:
                # the comment has already been added to self.missing_records
                continue
            comment_dict = {
                'observation_uuid': uuid,
                'image_url': comment.get('image_url'),
                'video_url': comment.get('video_url'),
                'video_sequence_name': comment['sequence'],
                **fields,
            }
            self._add_phylogeny(comment_dict, fields.get(concept_key))
            formatted_comments.append(comment_dict)

        self._distill(formatted_comments)
        print('processed!')
        self.phylogeny.save()

    def _fetch_tator_localizations(self) -> dict:
        """Return the Tator localizations for all media referenced by Tator comments, keyed by elemental id."""
        if not self.tator_client:
            return {}
        # the media id is not stored as its own field, so pull it out of the video url
        media_ids = list({
            comment['video_url'].split('/')[-1].split('&')[0]
            for uuid, comment in self.comments.items()
            if not self.is_vars_annotation(uuid)
        })
        localizations = {}
        for start in range(0, len(media_ids), self.TATOR_MEDIA_CHUNK_SIZE):
            chunk = media_ids[start:start + self.TATOR_MEDIA_CHUNK_SIZE]
            fetched = self.tator_client.get_localizations(current_app.config.get('TATOR_PROJECT_ID'), media_id=chunk)
            for localization in fetched:
                # setdefault keeps the first localization seen for an elemental id
                localizations.setdefault(localization.get('elemental_id'), localization)
        return localizations

    def _fetch_vars_annotations(self) -> dict:
        """Fetch every VARS annotation in parallel and return them keyed by uuid (value is None on bad JSON)."""
        vars_uuids = [uuid for uuid in self.comments if self.is_vars_annotation(uuid)]
        vars_annotations = {}
        if not vars_uuids:
            return vars_annotations
        with ThreadPoolExecutor(max_workers=self.VARS_FETCH_WORKERS) as executor:
            futures = [executor.submit(self.fetch_vars_annotation, uuid) for uuid in vars_uuids]
            for future in as_completed(futures):
                uuid, annotation = future.result()
                vars_annotations[uuid] = annotation
        return vars_annotations

    def _report_missing(self, uuid, comment, source):
        """Print an error for a comment whose annotation could not be found and add it to self.missing_records."""
        print(f'{TERM_RED}ERROR: Could not find annotation with UUID {uuid} in {source} ({comment["sequence"]}, {comment.get("timestamp")}){TERM_NORMAL}')
        self.missing_records.append(comment)

    def _format_timestamp(self, annotation):
        """Return the annotation's recorded timestamp formatted for display, or None if it has none."""
        if 'recorded_timestamp' not in annotation:
            return None
        return parse_datetime(annotation['recorded_timestamp']).strftime(self.TIMESTAMP_FORMAT)

    @staticmethod
    def _resolve_annotator(annotation, comment):
        """Return the formatted annotation observer, falling back to the annotator stored on the comment."""
        if 'observer' in annotation:
            return format_annotator(annotation['observer'])
        return comment['annotator']

    def _format_vars_annotation(self, uuid, comment, annotation):
        """Return the output fields for a VARS annotation, or None (after recording it as missing) if not found."""
        try:
            concept_name = annotation['concept']
        except (TypeError, KeyError):
            # annotation is None (bad response) or the response has no concept
            self._report_missing(uuid, comment, 'VARS')
            return None

        fields = {
            'identity_certainty': None,
            'identity_reference': None,
            'guide_photo': None,
            'upon': None,
            'comment': None,
        }
        for association in annotation.get('associations') or []:
            target = self._VARS_LINK_FIELDS.get(association['link_name'])
            if target:
                field, value_key = target
                fields[field] = association[value_key]

        depth = None
        ancillary_data = annotation.get('ancillary_data')
        if isinstance(ancillary_data, dict):
            depth = ancillary_data.get('depth_meters')

        fields['concept'] = concept_name
        fields['recorded_timestamp'] = self._format_timestamp(annotation)
        fields['annotator'] = self._resolve_annotator(annotation, comment)
        fields['associations'] = annotation.get('associations')
        fields['depth'] = round(depth) if depth else None
        return fields

    def _format_tator_localization(self, uuid, comment, localizations):
        """Return the output fields for a Tator comment, or None (after recording it as missing) if not usable."""
        if not self.tator_client:
            # without a Tator client nothing can be looked up; keep a placeholder so the record is still emitted
            return {'all_localizations': [{}]}

        annotation = localizations.get(uuid)
        if annotation is None:
            comment['timestamp'] = 'No timestamp available'
            self._report_missing(uuid, comment, 'Tator')
            return None
        if annotation['variant_deleted']:
            comment['timestamp'] = f'Media ID: {annotation["media"]}, Frame: {annotation["frame"]}'
            self._report_missing(uuid, comment, 'Tator')
            return None

        attributes = annotation.get('attributes') or {}
        fields = {
            'good_image': bool(attributes.get('Good Image')),
            'all_localizations': json.loads(comment.get('all_localizations')),
            'scientific_name': attributes.get('Scientific Name'),
            'media_id': annotation['media'],
            'frame': annotation['frame'],
            'recorded_timestamp': self._format_timestamp(annotation),
            'annotator': self._resolve_annotator(annotation, comment),
        }
        if attributes:
            fields['attracted'] = attributes.get('Attracted')
            fields['frame_url'] = f'/tator/frame/{annotation["media"]}/{annotation["frame"]}'
            fields['categorical_abundance'] = attributes.get('Categorical Abundance')
            fields['identification_remarks'] = attributes.get('IdentificationRemarks')
            fields['morphospecies'] = attributes.get('Morphospecies')
            fields['identified_by'] = attributes.get('Identified By')
            fields['notes'] = attributes.get('Notes')
            fields['qualifier'] = attributes.get('Qualifier')
            fields['reason'] = attributes.get('Reason')
            fields['tentative_id'] = attributes.get('Tentative ID')
        return fields

    def _add_phylogeny(self, comment_dict, concept_name):
        """Copy the cached phylogeny of concept_name into comment_dict, fetching it from WoRMS first if needed."""
        if concept_name and concept_name not in self.phylogeny.data and concept_name not in self.no_match_records:
            self.phylogeny.fetch_worms(concept_name)
        if concept_name in self.phylogeny.data:
            for key, value in self.phylogeny.data[concept_name].items():
                # split to account for worms 'Phylum (Division)' case
                comment_dict[key.split(' ')[0]] = value

    def _distill(self, formatted_comments):
        """Sort the formatted comments and append them to self.distilled_records without their empty values."""
        # a dataframe is only used for the multi-column sort
        annotation_df = pd.DataFrame(formatted_comments, columns=list(self._OUTPUT_COLUMNS))
        annotation_df = annotation_df.sort_values(by=list(self._SORT_COLUMNS))
        annotation_df = annotation_df.replace({pd.NA: None, np.nan: None})
        for record in annotation_df.to_dict(orient='records'):
            self.distilled_records.append({key: value for key, value in record.items() if value is not None})

    def is_vars_annotation(self, comment):
        """Return True if the comment with the given uuid has no 'all_localizations' value (i.e. is not from Tator)."""
        return self.comments[comment].get('all_localizations') in (None, '')

    def fetch_vars_annotation(self, uuid):
        """
        Fetch a single annotation from Annosaurus.

        :param uuid: UUID of the annotation to fetch.
        :return: Tuple of (uuid, parsed JSON response), or (uuid, None) if the response body is not valid JSON.
        """
        res = requests.get(
            url=f'{self.annosaurus_url}/annotations/{uuid}',
            timeout=self.REQUEST_TIMEOUT_SECONDS,  # without a timeout a stalled server blocks the worker forever
        )
        try:
            annotation = res.json()
        except ValueError:
            # every JSON decode error raised by requests (json or simplejson backed) is a ValueError
            return uuid, None
        print(f'Fetched VARS annotation {uuid}')
        return uuid, annotation