Coverage for application/image_review/external_review/comment_processor.py: 11%

151 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-23 05:22 +0000

1import json 

2import requests 

3import sys 

4from concurrent.futures import ThreadPoolExecutor, as_completed 

5 

6import pandas as pd 

7import numpy as np 

8from json import JSONDecodeError 

9 

10from flask import current_app 

11 

12from application.util.functions import format_annotator, parse_datetime 

13from application.util.constants import TERM_RED, TERM_NORMAL 

14from application.util.phylogeny_cache import PhylogenyCache 

15from application.tator.tator_rest_client import TatorRestClient 

16 

17 

class CommentProcessor:
    """
    Fetches annotation information from the VARS db on HURLSTOR and Tator given a dict of comments (key = uuid). Merges
    fetched annotations with the data in the comment dict into a sorted list of dicts (self.distilled_records).

    A comment whose 'all_localizations' field is missing, None, or '' is treated as a VARS annotation and looked up in
    annosaurus; every other comment is treated as a Tator localization. Comments whose annotation cannot be found are
    collected in self.missing_records instead of self.distilled_records.
    """

    # Maximum number of media IDs sent in a single Tator localization request.
    TATOR_MEDIA_CHUNK_SIZE = 300
    # Number of worker threads used to fetch VARS annotations concurrently.
    VARS_FETCH_WORKERS = 20
    # Seconds to wait on annosaurus. requests has no default timeout, so without one a stalled connection would hang a
    # worker thread (and therefore load_comments) forever.
    VARS_REQUEST_TIMEOUT = 30

    # Columns kept in each distilled record; keys on a formatted comment that are not listed here are dropped.
    _RECORD_COLUMNS = (
        'observation_uuid',
        'concept',
        'scientific_name',
        'associations',
        'all_localizations',
        'attracted',
        'categorical_abundance',
        'identification_remarks',
        'identified_by',
        'notes',
        'qualifier',
        'reason',
        'morphospecies',
        'tentative_id',
        'identity_certainty',
        'identity_reference',
        'guide_photo',
        'good_image',
        'media_id',
        'frame',
        'comment',
        'image_url',
        'frame_url',
        'video_url',
        'upon',
        'recorded_timestamp',
        'video_sequence_name',
        'annotator',
        'depth',
        'phylum',
        'subphylum',
        'superclass',
        'class',
        'subclass',
        'superorder',
        'order',
        'suborder',
        'infraorder',
        'superfamily',
        'family',
        'subfamily',
        'genus',
        'species',
    )
    # Sort keys for the distilled records: taxonomy first, then concept/identity, then time.
    _SORT_COLUMNS = (
        'phylum',
        'subphylum',
        'superclass',
        'class',
        'subclass',
        'superorder',
        'order',
        'suborder',
        'infraorder',
        'superfamily',
        'family',
        'subfamily',
        'genus',
        'species',
        'concept',
        'identity_reference',
        'identity_certainty',
        'recorded_timestamp',
    )

    def __init__(self, comments: dict, annosaurus_url: str, vars_kb_url: str, tator_url: str, tator_token: str = None):
        self.comments = comments
        self.annosaurus_url = annosaurus_url
        self.vars_kb_url = vars_kb_url
        # Tator lookups are skipped entirely when no token is supplied.
        self.tator_client = TatorRestClient(tator_url, tator_token) if tator_token else None
        # Formatted, sorted records with None-valued fields removed (filled by load_comments).
        self.distilled_records = []
        # Comment dicts whose VARS/Tator annotation could not be found.
        self.missing_records = []
        # Concept names to skip when fetching WoRMS phylogeny; not populated in this class (presumably filled
        # elsewhere — verify).
        self.no_match_records = set()
        self.phylogeny = PhylogenyCache()
        self.load_comments()

    def load_comments(self):
        """
        Fetch the annotation behind every comment, merge it with the comment data, attach phylogeny, and store the
        sorted result in self.distilled_records. Also saves the phylogeny cache when done.
        """
        print(f'Processing {len(self.comments)} comments...', end='')
        sys.stdout.flush()

        localizations_by_id = self._fetch_tator_localizations()
        vars_annotations = self._fetch_vars_annotations()

        formatted_comments = []
        for uuid in self.comments:
            if self.is_vars_annotation(uuid):
                result = self._format_vars_comment(uuid, vars_annotations.get(uuid))
            else:
                result = self._format_tator_comment(uuid, localizations_by_id)
            if result is None:
                continue  # annotation missing; already recorded in self.missing_records
            comment_dict, concept_name = result
            self._add_phylogeny(comment_dict, concept_name)
            formatted_comments.append(comment_dict)

        self.distilled_records.extend(self._sort_and_distill(formatted_comments))
        print('processed!')
        self.phylogeny.save()

    def _fetch_tator_localizations(self) -> dict:
        """Return the Tator localizations for all commented media, keyed by elemental_id (first occurrence wins)."""
        if not self.tator_client:
            return {}
        # The media id is not stored as its own field, so parse it out of the video url.
        media_ids = list({
            self.comments[uuid]['video_url'].split('/')[-1].split('&')[0]
            for uuid in self.comments
            if not self.is_vars_annotation(uuid)
        })
        if not media_ids:
            return {}  # avoid touching current_app when there is nothing to request
        project_id = current_app.config.get('TATOR_PROJECT_ID')
        localizations_by_id = {}
        for start in range(0, len(media_ids), self.TATOR_MEDIA_CHUNK_SIZE):
            chunk = media_ids[start:start + self.TATOR_MEDIA_CHUNK_SIZE]
            for localization in self.tator_client.get_localizations(project_id, media_id=chunk):
                # setdefault keeps the first match, like a linear scan would
                localizations_by_id.setdefault(localization['elemental_id'], localization)
        return localizations_by_id

    def _fetch_vars_annotations(self) -> dict:
        """Fetch every VARS annotation in parallel; returns {uuid: annotation json or None}."""
        vars_uuids = [uuid for uuid in self.comments if self.is_vars_annotation(uuid)]
        vars_annotations = {}
        with ThreadPoolExecutor(max_workers=self.VARS_FETCH_WORKERS) as executor:
            futures = [executor.submit(self.fetch_vars_annotation, uuid) for uuid in vars_uuids]
            for future in as_completed(futures):
                uuid, annotation = future.result()
                vars_annotations[uuid] = annotation
        return vars_annotations

    def _base_comment_dict(self, uuid) -> dict:
        """Fields shared by VARS and Tator records, taken straight from the comment."""
        comment = self.comments[uuid]
        return {
            'observation_uuid': uuid,
            'image_url': comment.get('image_url'),
            'video_url': comment.get('video_url'),
            'video_sequence_name': comment['sequence'],
        }

    def _report_missing(self, uuid, problem_comment, source):
        """Log a comment whose annotation could not be found and add it to self.missing_records."""
        print(f'{TERM_RED}ERROR: Could not find annotation with UUID {uuid} in {source} ({problem_comment["sequence"]}, {problem_comment["timestamp"]}){TERM_NORMAL}')
        self.missing_records.append(problem_comment)

    def _format_vars_comment(self, uuid, annotation):
        """
        Merge a VARS annotation into a record for comment `uuid`.

        Returns (comment_dict, concept_name), or None (after recording the comment as missing) when the annotation
        is None or has no 'concept'.
        """
        comment_dict = self._base_comment_dict(uuid)
        try:
            concept_name = annotation['concept']
        except (TypeError, KeyError):
            self._report_missing(uuid, self.comments[uuid], 'VARS')
            return None

        identity_certainty = identity_reference = guide_photo = upon = vars_comment = None
        for association in annotation.get('associations') or []:
            link_name = association['link_name']
            if link_name == 'identity-certainty':
                identity_certainty = association['link_value']
            elif link_name == 'identity-reference':
                identity_reference = association['link_value']
            elif link_name == 'guide-photo':
                guide_photo = association['to_concept']
            elif link_name == 'upon':
                upon = association['to_concept']
            elif link_name == 'comment':
                vars_comment = association['link_value']
        # ctd depth, when the annotation carries ancillary data
        depth = (annotation.get('ancillary_data') or {}).get('depth_meters')

        comment_dict['concept'] = concept_name
        comment_dict['recorded_timestamp'] = parse_datetime(annotation['recorded_timestamp']).strftime('%d %b %y %H:%M:%S UTC') if 'recorded_timestamp' in annotation else None
        comment_dict['annotator'] = format_annotator(annotation['observer']) if 'observer' in annotation else self.comments[uuid]['annotator']
        comment_dict['associations'] = annotation.get('associations')
        comment_dict['identity_reference'] = identity_reference
        # key must match the 'guide_photo' output column, otherwise the value is dropped
        comment_dict['guide_photo'] = guide_photo
        comment_dict['upon'] = upon
        comment_dict['identity_certainty'] = identity_certainty
        # `is not None` so a depth of exactly 0 is kept rather than discarded
        comment_dict['depth'] = round(depth) if depth is not None else None
        comment_dict['comment'] = vars_comment
        return comment_dict, concept_name

    def _format_tator_comment(self, uuid, localizations_by_id):
        """
        Merge a Tator localization into a record for comment `uuid`.

        Returns (comment_dict, concept_name). It returns None (after recording the comment as missing) when the
        localization is absent or variant-deleted. Without a Tator client, it returns a bare record and no concept.
        """
        comment_dict = self._base_comment_dict(uuid)
        if not self.tator_client:
            comment_dict['all_localizations'] = [{}]
            return comment_dict, None

        annotation = localizations_by_id.get(uuid)
        if annotation is None:
            # copy so the caller's comment dict is not modified
            self._report_missing(uuid, {**self.comments[uuid], 'timestamp': 'No timestamp available'}, 'Tator')
            return None
        if annotation['variant_deleted']:
            location = f'Media ID: {annotation["media"]}, Frame: {annotation["frame"]}'
            self._report_missing(uuid, {**self.comments[uuid], 'timestamp': location}, 'Tator')
            return None

        attributes = annotation['attributes']
        concept_name = attributes.get('Scientific Name')
        comment_dict['good_image'] = bool(attributes.get('Good Image'))
        comment_dict['all_localizations'] = json.loads(self.comments[uuid].get('all_localizations'))
        comment_dict['scientific_name'] = concept_name
        comment_dict['media_id'] = annotation['media']
        comment_dict['frame'] = annotation['frame']
        comment_dict['recorded_timestamp'] = parse_datetime(annotation['recorded_timestamp']).strftime('%d %b %y %H:%M:%S UTC') if 'recorded_timestamp' in annotation else None
        comment_dict['annotator'] = format_annotator(annotation['observer']) if 'observer' in annotation else self.comments[uuid]['annotator']
        if attributes:
            comment_dict['attracted'] = attributes.get('Attracted')
            comment_dict['frame_url'] = f'/tator/frame/{annotation["media"]}/{annotation["frame"]}'
            comment_dict['categorical_abundance'] = attributes.get('Categorical Abundance')
            comment_dict['identification_remarks'] = attributes.get('IdentificationRemarks')
            comment_dict['morphospecies'] = attributes.get('Morphospecies')
            comment_dict['identified_by'] = attributes.get('Identified By')
            comment_dict['notes'] = attributes.get('Notes')
            comment_dict['qualifier'] = attributes.get('Qualifier')
            comment_dict['reason'] = attributes.get('Reason')
            comment_dict['tentative_id'] = attributes.get('Tentative ID')
        return comment_dict, concept_name

    def _add_phylogeny(self, comment_dict, concept_name):
        """Copy the cached phylogeny for `concept_name` into comment_dict, fetching it from WoRMS first if needed."""
        if concept_name and concept_name not in self.phylogeny.data and concept_name not in self.no_match_records:
            self.phylogeny.fetch_worms(concept_name)
        if concept_name in self.phylogeny.data:
            for rank, value in self.phylogeny.data[concept_name].items():
                # split to account for worms 'Phylum (Division)' case
                comment_dict[rank.split(' ')[0]] = value

    def _sort_and_distill(self, formatted_comments) -> list:
        """Sort the records taxonomically and drop every None/NaN field from each one."""
        annotation_df = pd.DataFrame(formatted_comments, columns=list(self._RECORD_COLUMNS))
        annotation_df = annotation_df.sort_values(by=list(self._SORT_COLUMNS))
        annotation_df = annotation_df.replace({pd.NA: None, np.nan: None})
        return [
            {key: value for key, value in record.items() if value is not None}
            for record in annotation_df.to_dict(orient='records')
        ]

    def is_vars_annotation(self, comment):
        """True when the comment has no Tator localizations (field missing, None, or '')."""
        return self.comments[comment].get('all_localizations') in (None, '')

    def fetch_vars_annotation(self, uuid):
        """
        GET the annotation with `uuid` from annosaurus.

        Returns (uuid, parsed json), or (uuid, None) if the response body is not valid JSON. Network errors
        (including a timeout after VARS_REQUEST_TIMEOUT seconds) propagate to the caller.
        """
        res = requests.get(url=f'{self.annosaurus_url}/annotations/{uuid}', timeout=self.VARS_REQUEST_TIMEOUT)
        print(f'Fetched VARS annotation {uuid}')
        try:
            return uuid, res.json()
        except ValueError:
            # requests may raise simplejson's JSONDecodeError, which subclasses ValueError but not
            # json.JSONDecodeError, so catch the common base class.
            return uuid, None