Coverage for application / tator / tator_dropcam_qaqc_processor.py: 9%

286 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-23 05:22 +0000

1import datetime 

2import math 

3import sys 

4from io import BytesIO 

5 

6import requests 

7import tator 

8from flask import session 

9from pptx import Presentation 

10from pptx.dml.color import RGBColor 

11from pptx.enum.text import PP_ALIGN 

12from pptx.util import Inches, Pt 

13 

14from application.tator.tator_base_qaqc_processor import TatorBaseQaqcProcessor 

15from application.util.constants import TERM_NORMAL, TERM_RED 

16from application.tator.tator_type import TatorLocalizationType 

17 

18 

19class TatorDropcamQaqcProcessor(TatorBaseQaqcProcessor): 

20 def __init__( 

21 self, 

22 project_id: int, 

23 section_ids: list[str], 

24 api: tator.api, 

25 tator_url: str, 

26 darc_review_url: str = None, 

27 transect_media_ids: list[int] = None, 

28 ): 

29 super().__init__( 

30 project_id=project_id, 

31 section_ids=section_ids, 

32 api=api, 

33 darc_review_url=darc_review_url, 

34 tator_url=tator_url, 

35 transect_media_ids=transect_media_ids, 

36 ) 

37 

38 def check_attracted_not_attracted(self, attracted_dict: dict): 

39 """ 

40 Finds all records that are marked as "attracted" but are saved as "not attracted" in the attracted_dict, and 

41 vice versa. Also flags all records with taxa that are marked as "attracted/not attracted" in the attracted_dict. 

42 """ 

43 for section in self.sections: 

44 records_of_interest = [] 

45 for localization in section.localizations: 

46 scientific_name = localization['attributes'].get('Scientific Name') 

47 if scientific_name not in attracted_dict.keys() or attracted_dict[scientific_name] == 2: 

48 localization['problems'] = 'Scientific Name, Attracted' 

49 records_of_interest.append(localization) 

50 elif localization['attributes'].get('Attracted') == 'Attracted' and attracted_dict[scientific_name] == 0: 

51 localization['problems'] = 'Scientific Name, Attracted' 

52 records_of_interest.append(localization) 

53 elif localization['attributes'].get('Attracted') == 'Not Attracted' and attracted_dict[scientific_name] == 1: 

54 localization['problems'] = 'Scientific Name, Attracted' 

55 records_of_interest.append(localization) 

56 section.localizations = records_of_interest 

57 self.process_records() 

58 

59 def check_same_name_qualifier(self): 

60 """ 

61 Finds records that have the same scientific name/tentative ID combo but a different qualifier. 

62 """ 

63 scientific_name_qualifiers = {} 

64 problem_scientific_names = set() 

65 for section in self.sections: 

66 # first pass: build dict of scientific name/tentative ID combos and their qualifiers, add to problem set if mismatch 

67 for localization in section.localizations: 

68 scientific_name = f'{localization["attributes"].get("Scientific Name")}{" (" + localization["attributes"]["Tentative ID"] + "?)" if localization["attributes"].get("Tentative ID") else ""}' 

69 if scientific_name not in scientific_name_qualifiers.keys(): 

70 scientific_name_qualifiers[scientific_name] = localization['attributes'].get('Qualifier') 

71 else: 

72 if scientific_name_qualifiers[scientific_name] != localization['attributes'].get('Qualifier'): 

73 problem_scientific_names.add(scientific_name) 

74 for section in self.sections: 

75 # second pass: add records with problem scientific names to records of interest 

76 records_of_interest = [] 

77 for localization in section.localizations: 

78 scientific_name = f'{localization["attributes"].get("Scientific Name")}{" (" + localization["attributes"]["Tentative ID"] + "?)" if localization["attributes"].get("Tentative ID") else ""}' 

79 if scientific_name in problem_scientific_names: 

80 localization['problems'] = 'Scientific Name, Qualifier' 

81 records_of_interest.append(localization) 

82 section.localizations = records_of_interest 

83 self.process_records() 

84 

85 def check_non_target_not_attracted(self): 

86 """ 

87 Finds records that are marked as "non-target" but are marked as "attracted". 

88 """ 

89 for section in self.sections: 

90 records_of_interest = [] 

91 for localization in section.localizations: 

92 attracted = localization['attributes'].get('Attracted') 

93 reason = localization['attributes'].get('Reason') 

94 if 'Non-target' in reason and attracted != 'Not Attracted': 

95 localization['problems'] = 'Attracted, Reason' 

96 records_of_interest.append(localization) 

97 section.localizations = records_of_interest 

98 self.process_records() 

99 

100 def check_exists_in_image_references(self, image_refs: dict): 

101 """ 

102 Finds records that do not exist in the image references db (combo scientific name, tentative ID, 

103 and morphospecies). Also flags records with both tentative ID and morphospecies set. 

104 """ 

105 for section in self.sections: 

106 records_of_interest = [] 

107 for localization in section.localizations: 

108 image_ref_key = localization['attributes'].get('Scientific Name') 

109 tentative_id = localization['attributes'].get('Tentative ID') 

110 morphospecies = localization['attributes'].get('Morphospecies') 

111 if tentative_id and morphospecies: 

112 localization['problems'] = 'Tentative ID, Morphospecies' 

113 records_of_interest.append(localization) 

114 continue 

115 if tentative_id and tentative_id != '': 

116 image_ref_key += f'~tid={tentative_id}' 

117 if morphospecies and morphospecies != '': 

118 image_ref_key += f'~m={morphospecies}' 

119 if image_ref_key not in image_refs: 

120 records_of_interest.append(localization) 

121 section.localizations = records_of_interest 

122 self.process_records() 

123 

124 def get_unique_taxa(self): 

125 self.fetch_start_times() 

126 self.process_records(get_timestamp=True) 

127 unique_taxa = {} 

128 for record in self.final_records: 

129 scientific_name = record.get('scientific_name') 

130 tentative_id = record.get('tentative_id', '') 

131 morphospecies = record.get('morphospecies', '') 

132 key = f'{scientific_name}:{tentative_id}:{morphospecies}' 

133 if key not in unique_taxa.keys(): 

134 # add new unique taxa to dict 

135 unique_taxa[key] = { 

136 'scientific_name': scientific_name, 

137 'tentative_id': tentative_id, 

138 'morphospecies': morphospecies, 

139 'box_count': 0, 

140 'dot_count': 0, 

141 'first_box': '', 

142 'first_dot': '', 

143 } 

144 for localization in record['all_localizations']: 

145 # increment box/dot counts, set first box/dot and TOFA 

146 if TatorLocalizationType.is_box(localization['type']): 

147 unique_taxa[key]['box_count'] += 1 

148 if not record.get('timestamp'): 

149 continue 

150 first_box = unique_taxa[key]['first_box'] 

151 observed_timestamp = datetime.datetime.strptime(record['timestamp'], self.BOTTOM_TIME_FORMAT) 

152 if not first_box or observed_timestamp < datetime.datetime.strptime(first_box, self.BOTTOM_TIME_FORMAT): 

153 unique_taxa[key]['first_box'] = record['timestamp'] 

154 unique_taxa[key]['first_box_url'] = f'{self.tator_url}/{self.project_id}/annotation/{record["media_id"]}?frame={record["frame"]}&selected_entity={localization["elemental_id"]}' 

155 elif TatorLocalizationType.is_dot(localization['type']): 

156 unique_taxa[key]['dot_count'] += 1 

157 if not record.get('timestamp'): 

158 continue 

159 first_dot = unique_taxa[key]['first_dot'] 

160 observed_timestamp = datetime.datetime.strptime(record['timestamp'], self.BOTTOM_TIME_FORMAT) 

161 if not first_dot or observed_timestamp < datetime.datetime.strptime(first_dot, self.BOTTOM_TIME_FORMAT): 

162 unique_taxa[key]['first_dot'] = record['timestamp'] 

163 unique_taxa[key]['first_dot_url'] = f'{self.tator_url}/{self.project_id}/annotation/{record["media_id"]}?frame={record["frame"]}&selected_entity={localization["elemental_id"]}' 

164 self.final_records = unique_taxa 

165 

166 def get_max_n(self): 

167 """ 

168 Finds the highest dot count for each unique scientific name, tentative ID, and morphospecies combo per 

169 deployment. Ignores non-attracted taxa. 

170 """ 

171 self.process_records(get_ctd=True) 

172 deployment_taxa = {} 

173 unique_taxa = {} 

174 for record in self.final_records: 

175 scientific_name = record.get('scientific_name') 

176 tentative_id_suffix = f' ({record["tentative_id"]}?)' if record.get('tentative_id') else '' 

177 morphospecies_suffix = f' ({record["morphospecies"]})' if record.get('morphospecies') else '' 

178 unique_name = f'{scientific_name}{tentative_id_suffix}{morphospecies_suffix}' 

179 if record.get('count', 0) < 1 or record.get('attracted') == 'Not Attracted': 

180 continue 

181 if unique_name not in unique_taxa.keys(): 

182 unique_taxa[unique_name] = { 

183 'unique_name': unique_name, 

184 'phylum': record.get('phylum'), 

185 'class': record.get('class'), 

186 'order': record.get('order'), 

187 'family': record.get('family'), 

188 'genus': record.get('genus'), 

189 'species': record.get('species'), 

190 } 

191 if record['video_sequence_name'] not in deployment_taxa.keys(): 

192 deployment_taxa[record['video_sequence_name']] = { 

193 'depth_m': record.get('depth_m'), 

194 'max_n_dict': {}, 

195 } 

196 if unique_name not in deployment_taxa[record['video_sequence_name']]['max_n_dict'].keys(): 

197 # add new unique taxa to dict 

198 deployment_taxa[record['video_sequence_name']]['max_n_dict'][unique_name] = { 

199 'max_n': record['count'], 

200 'max_n_url': f'{self.tator_url}/{self.project_id}/annotation/{record["media_id"]}?frame={record["frame"]}', 

201 } 

202 else: 

203 # check for new max N 

204 if record['count'] > deployment_taxa[record['video_sequence_name']]['max_n_dict'][unique_name]['max_n']: 

205 deployment_taxa[record['video_sequence_name']]['max_n_dict'][unique_name]['max_n'] = record['count'] 

206 deployment_taxa[record['video_sequence_name']]['max_n_dict'][unique_name]['max_n_url'] = f'{self.tator_url}/{self.project_id}/annotation/{record["media_id"]}?frame={record["frame"]}' 

207 # convert unique taxa to list for sorting 

208 unique_taxa_list = list(unique_taxa.values()) 

209 unique_taxa_list.sort(key=lambda x: ( 

210 x['phylum'] if x.get('phylum') else '', 

211 x['class'] if x.get('class') else '', 

212 x['order'] if x.get('order') else '', 

213 x['family'] if x.get('family') else '', 

214 x['genus'] if x.get('genus') else '', 

215 x['species'] if x.get('species') else '', 

216 )) 

217 self.final_records = { 

218 'deployments': deployment_taxa, 

219 'unique_taxa': [taxa['unique_name'] for taxa in unique_taxa_list], 

220 } 

221 

222 def get_tofa(self): 

223 """ 

224 Finds the time of first arrival for each unique scientific name, tentative ID, and morphospecies combo per 

225 deployment. Also shows species accumulation curve. Ignores non-attracted taxa. 

226 """ 

227 self.fetch_start_times() 

228 self.process_records(get_timestamp=True, get_ctd=True) 

229 deployment_taxa = {} 

230 unique_taxa = {} 

231 unique_taxa_first_seen = {} 

232 section_id_indices = {section.section_id: index for index, section in enumerate(self.sections)} 

233 bottom_time = None 

234 latest_timestamp = datetime.datetime.fromtimestamp(0) # to find the duration of the deployment 

235 for record in self.final_records: 

236 scientific_name = record.get('scientific_name') 

237 tentative_id_suffix = f' ({record["tentative_id"]}?)' if record.get('tentative_id') else '' 

238 morphospecies_suffix = f' ({record["morphospecies"]})' if record.get('morphospecies') else '' 

239 unique_name = f'{scientific_name}{tentative_id_suffix}{morphospecies_suffix}' 

240 if not record.get('timestamp'): 

241 continue 

242 observed_timestamp = datetime.datetime.strptime(record['timestamp'], self.BOTTOM_TIME_FORMAT) 

243 this_section = self.sections[section_id_indices[record['section_id']]] 

244 bottom_time = datetime.datetime.strptime(this_section.bottom_time, self.BOTTOM_TIME_FORMAT) 

245 if record.get('count', 0) < 1 or record.get('attracted') == 'Not Attracted': 

246 continue 

247 if observed_timestamp > latest_timestamp: 

248 latest_timestamp = observed_timestamp 

249 if unique_name not in unique_taxa_first_seen.keys(): 

250 unique_taxa_first_seen[unique_name] = observed_timestamp 

251 else: 

252 if observed_timestamp < unique_taxa_first_seen[unique_name]: 

253 unique_taxa_first_seen[unique_name] = observed_timestamp 

254 if unique_name not in unique_taxa.keys(): 

255 unique_taxa[unique_name] = { 

256 'unique_name': unique_name, 

257 'phylum': record.get('phylum'), 

258 'class': record.get('class'), 

259 'order': record.get('order'), 

260 'family': record.get('family'), 

261 'genus': record.get('genus'), 

262 'species': record.get('species'), 

263 } 

264 if record['video_sequence_name'] not in deployment_taxa.keys(): 

265 deployment_taxa[record['video_sequence_name']] = { 

266 'depth_m': record.get('depth_m'), 

267 'tofa_dict': {}, 

268 } 

269 time_diff = observed_timestamp - bottom_time if observed_timestamp > bottom_time else datetime.timedelta(0) 

270 if unique_name not in deployment_taxa[record['video_sequence_name']]['tofa_dict'].keys(): 

271 # add new unique taxa to dict 

272 deployment_taxa[record['video_sequence_name']]['tofa_dict'][unique_name] = { 

273 'tofa': str(time_diff), 

274 'tofa_seconds': time_diff.total_seconds(), 

275 'tofa_url': f'{self.tator_url}/{self.project_id}/annotation/{record["media_id"]}?frame={record["frame"]}', 

276 } 

277 else: 

278 # check for new tofa 

279 if time_diff.total_seconds() < deployment_taxa[record['video_sequence_name']]['tofa_dict'][unique_name]['tofa_seconds']: 

280 deployment_taxa[record['video_sequence_name']]['tofa_dict'][unique_name]['tofa'] = str(time_diff) 

281 deployment_taxa[record['video_sequence_name']]['tofa_dict'][unique_name]['tofa_seconds'] = time_diff.total_seconds() 

282 deployment_taxa[record['video_sequence_name']]['tofa_dict'][unique_name]['tofa_url'] = \ 

283 f'{self.tator_url}/{self.project_id}/annotation/{record["media_id"]}?frame={record["frame"]}' 

284 # convert unique taxa to list for sorting 

285 unique_taxa_list = list(unique_taxa.values()) 

286 unique_taxa_list.sort(key=lambda x: ( 

287 x['phylum'] if x.get('phylum') else '', 

288 x['class'] if x.get('class') else '', 

289 x['order'] if x.get('order') else '', 

290 x['family'] if x.get('family') else '', 

291 x['genus'] if x.get('genus') else '', 

292 x['species'] if x.get('species') else '', 

293 )) 

294 if len(unique_taxa_list) == 0: 

295 print(f'{TERM_RED}ERROR: Unable to calculate TOFA. Missing start times?{TERM_NORMAL}') 

296 self.final_records = { 

297 'deployments': deployment_taxa, 

298 'unique_taxa': [], 

299 'deployment_time': 0, 

300 'accumulation_data': [], 

301 } 

302 return 

303 # rounding up to nearest hour 

304 deployment_time = datetime.timedelta(hours=math.ceil((latest_timestamp - bottom_time).total_seconds() / 3600)) 

305 accumulation_data = [] # just a list of the number of unique taxa seen at each hour 

306 for hour in range(1, deployment_time.seconds // 3600 + 1): 

307 accumulation_data.append(len([ 

308 taxa for taxa in unique_taxa_first_seen.values() if taxa < bottom_time + datetime.timedelta(hours=hour) 

309 ])) 

310 self.final_records = { 

311 'deployments': deployment_taxa, 

312 'unique_taxa': [taxa['unique_name'] for taxa in unique_taxa_list], 

313 'deployment_time': deployment_time.seconds // 3600, 

314 'accumulation_data': accumulation_data, 

315 } 

316 

317 def get_summary(self): 

318 self.fetch_start_times() 

319 for section in self.sections: 

320 section.localizations = [ 

321 localization for localization in section.localizations if not TatorLocalizationType.is_box(localization['type']) 

322 ] 

323 self.process_records(get_timestamp=True, get_ctd=True, get_substrates=True) 

324 

325 def download_image_guide(self, app) -> Presentation: 

326 for section in self.sections: 

327 records_of_interest = [] 

328 for localization in section.localizations: 

329 if localization['attributes'].get('Good Image'): 

330 records_of_interest.append(localization) 

331 section.localizations = records_of_interest 

332 self.process_records() 

333 pres = Presentation() 

334 image_slide_layout = pres.slide_layouts[6] 

335 

336 i = 0 

337 while i < len(self.final_records): 

338 slide = pres.slides.add_slide(image_slide_layout) 

339 current_phylum = self.final_records[i].get('phylum') 

340 if current_phylum is None: 

341 current_phylum = 'UNKNOWN PHYLUM' 

342 phylum_text_box = slide.shapes.add_textbox(Inches(0.5), Inches(0.5), Inches(9), Inches(0.5)) 

343 phylum_text_frame = phylum_text_box.text_frame 

344 phylum_paragraph = phylum_text_frame.paragraphs[0] 

345 phylum_paragraph.alignment = PP_ALIGN.CENTER 

346 phylum_run = phylum_paragraph.add_run() 

347 phylum_run.text = ' '.join(list(current_phylum.upper())) 

348 phylum_font = phylum_run.font 

349 phylum_font.name = 'Arial' 

350 phylum_font.size = Pt(32) 

351 phylum_font.color.rgb = RGBColor(0, 0, 0) 

352 for j in range(4): 

353 # add four images to slide 

354 localization = self.final_records[i] 

355 if localization['phylum'] != current_phylum and current_phylum != 'UNKNOWN PHYLUM': 

356 break 

357 localization_id = localization['all_localizations'][0]['id'] 

358 response = requests.get(f'{app.config.get("LOCAL_APP_URL")}/tator/localization-image/{localization_id}?token={session["tator_token"]}') 

359 if response.status_code != 200: 

360 print(f'Error fetching image for record {localization["observation_uuid"]}') 

361 continue 

362 image_data = BytesIO(response.content) 

363 top = Inches(1.5 if j < 2 else 4) 

364 left = Inches(1 if j % 2 == 0 else 5) 

365 picture = slide.shapes.add_picture(image_data, left, top, height=Inches(2.5)) 

366 line = picture.line 

367 line.color.rgb = RGBColor(0, 0, 0) 

368 line.width = Pt(1.5) 

369 # add text box 

370 width = Inches(2) 

371 height = Inches(1) 

372 text_box = slide.shapes.add_textbox(left, top, width, height) 

373 text_frame = text_box.text_frame 

374 paragraph = text_frame.paragraphs[0] 

375 run = paragraph.add_run() 

376 run.text = f'{localization["scientific_name"]}{" (" + localization["tentative_id"] + "?)" if localization.get("tentative_id") else ""}' 

377 font = run.font 

378 font.name = 'Arial' 

379 font.size = Pt(18) 

380 font.color.rgb = RGBColor(0xff, 0xff, 0xff) 

381 font.italic = True 

382 if localization['attracted'] == 'Not Attracted': 

383 text_frame.add_paragraph() 

384 paragraph = text_frame.paragraphs[1] 

385 run_2 = paragraph.add_run() 

386 run_2.text = 'NOT ATTRACTED' 

387 font = run_2.font 

388 font.name = 'Arial' 

389 font.size = Pt(18) 

390 font.color.rgb = RGBColor(0xff, 0x0, 0x0) 

391 font.italic = False 

392 i += 1 

393 if i >= len(self.final_records): 

394 break 

395 return pres 

396 

397 def fetch_start_times(self): 

398 if 'media_timestamps' not in session.keys(): 

399 session['media_timestamps'] = {} 

400 for section in self.sections: 

401 print(f'Fetching media start times for deployment "{section.deployment_name}"...', end='') 

402 sys.stdout.flush() 

403 for media in self.tator_client.get_medias_for_section(self.project_id, section=section.section_id): 

404 # get media start times 

405 if media['id'] not in session['media_timestamps'].keys(): 

406 if 'Start Time' in media['attributes'].keys(): 

407 session['media_timestamps'][media['id']] = media['attributes']['Start Time'] 

408 session.modified = True 

409 else: 

410 print(f'{TERM_RED}Warning:{TERM_NORMAL} No start time found for media {media["id"]}') 

411 continue 

412 # get deployment bottom time 

413 media_arrival_attribute = media['attributes'].get('Arrival') 

414 if media_arrival_attribute and media_arrival_attribute.strip() != '': 

415 video_start_timestamp = datetime.datetime.fromisoformat(media['attributes']['Start Time']).astimezone(datetime.timezone.utc) 

416 if 'not observed' in media_arrival_attribute.lower(): 

417 arrival_frame = 0 

418 else: 

419 try: 

420 arrival_frame = int(media_arrival_attribute.strip().split(' ')[0]) 

421 except ValueError: 

422 error_message = (f'Could not parse Arrival value for media "{media["name"]}". ' 

423 f'Expected format like "1234" or "not observed" but got "{media["attributes"]["Arrival"]}".') 

424 print(f'\n\n{TERM_RED}ERROR: {error_message}{TERM_NORMAL}') 

425 raise ValueError(error_message) 

426 media_fps = media.get('fps') or 30 

427 deployment_bottom_time = video_start_timestamp + datetime.timedelta(seconds=arrival_frame / media_fps) 

428 section.bottom_time = deployment_bottom_time.strftime(self.BOTTOM_TIME_FORMAT) 

429 print('fetched!')