Coverage for application/image_review/tator/tator_localization_processor.py: 14% (161 statements)

coverage.py v7.13.1, created at 2026-01-07 06:46 +0000

import datetime
import json
import os
import sys
from typing import List

import pandas as pd
import requests
import tator

from flask import session

from application.util.constants import KNOWN_ANNOTATORS, TERM_RED, TERM_NORMAL, TERM_YELLOW
from application.util.tator_localization_type import TatorLocalizationType
from application.util.functions import flatten_taxa_tree

WORMS_REST_URL = 'https://www.marinespecies.org/rest'


class Section:
    def __init__(self, section_id: str, api: tator.api):
        section_data = api.get_section(int(section_id))
        self.section_id = section_id
        self.deployment_name = section_data.name
        self.expedition_name = section_data.path.split('.')[0]
        self.localizations = []
        self.bottom_time = None
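
# A minimal usage sketch (hypothetical host, token, and section ID; assumes the
# caller already holds an authenticated tator.api client):
#
#     api = tator.get_api(host='https://tator.example.org', token='<token>')
#     section = Section('123', api)
#     print(section.deployment_name, section.expedition_name)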


class TatorLocalizationProcessor:
    """
    Fetches all localization information for a given project/section/deployment list from Tator.
    Processes and sorts data for display on the image review pages.
    """

    BOTTOM_TIME_FORMAT = '%Y-%m-%d %H:%M:%SZ'

    def __init__(
            self,
            project_id: int,
            section_ids: List[str],
            api: tator.api,
            tator_url: str,
            darc_review_url: str = None,
    ):
        self.project_id = project_id
        self.tator_url = tator_url
        self.darc_review_url = darc_review_url
        self.sections = [Section(section_id, api) for section_id in section_ids]
        self.api = api
        self.final_records = []  # final list formatted for review page
        self.phylogeny = {}
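
    # A minimal end-to-end sketch (hypothetical IDs and URL; assumes a Flask
    # request context with 'tator_token' set in the session):
    #
    #     processor = TatorLocalizationProcessor(
    #         project_id=1,
    #         section_ids=['123', '456'],
    #         api=api,
    #         tator_url='https://tator.example.org',
    #     )
    #     processor.load_phylogeny()
    #     processor.fetch_localizations()
    #     processor.process_records()
    #     records = processor.final_records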


    def load_phylogeny(self):
        try:
            with open(os.path.join('cache', 'phylogeny.json'), 'r') as f:
                self.phylogeny = json.load(f)
        except FileNotFoundError:
            self.phylogeny = {}

    def save_phylogeny(self):
        os.makedirs('cache', exist_ok=True)  # the cache directory may not exist on first save
        with open(os.path.join('cache', 'phylogeny.json'), 'w') as f:
            json.dump(self.phylogeny, f, indent=2)
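
    # The cache maps each scientific name to its flattened taxon ranks plus the
    # WoRMS AphiaID. A hypothetical cache/phylogeny.json entry:
    #
    #     {
    #       "Actinoscyphia": {
    #         "phylum": "Cnidaria",
    #         "class": "Anthozoa",
    #         "order": "Actiniaria",
    #         "genus": "Actinoscyphia",
    #         "aphia_id": 123456
    #       }
    #     }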


    def fetch_worms_phylogeny(self, scientific_name: str) -> bool:
        """
        Fetches the phylogeny of a given scientific name from WoRMS. Returns True if successful, False otherwise.
        """
        print(f'Fetching phylogeny for "{scientific_name}"')
        worms_id_res = requests.get(url=f'{WORMS_REST_URL}/AphiaIDByName/{scientific_name}?marine_only=true')
        if worms_id_res.status_code == 200 and worms_id_res.json() != -999:  # -999 means more than one matching record
            aphia_id = worms_id_res.json()
            worms_tree_res = requests.get(url=f'{WORMS_REST_URL}/AphiaClassificationByAphiaID/{aphia_id}')
            if worms_tree_res.status_code == 200:
                self.phylogeny[scientific_name] = flatten_taxa_tree(worms_tree_res.json(), {})
                self.phylogeny[scientific_name]['aphia_id'] = aphia_id
        else:
            worms_name_res = requests.get(url=f'{WORMS_REST_URL}/AphiaRecordsByName/{scientific_name}?like=false&marine_only=true&offset=1')
            if worms_name_res.status_code == 200 and len(worms_name_res.json()) > 0:
                # just take the first accepted record
                for record in worms_name_res.json():
                    if record['status'] == 'accepted':
                        worms_tree_res_2 = requests.get(url=f'{WORMS_REST_URL}/AphiaClassificationByAphiaID/{record["AphiaID"]}')
                        if worms_tree_res_2.status_code == 200:
                            self.phylogeny[scientific_name] = flatten_taxa_tree(worms_tree_res_2.json(), {})
                            self.phylogeny[scientific_name]['aphia_id'] = record['AphiaID']
                        break
                else:
                    print(f'{TERM_RED}No accepted record found for concept name "{scientific_name}"{TERM_NORMAL}')
                    return False
            else:
                # the name lookup failed entirely; report failure instead of falling through
                print(f'{TERM_YELLOW}No WoRMS records found for concept name "{scientific_name}"{TERM_NORMAL}')
                return False
        return True
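
    # The lookup is two-step: AphiaIDByName resolves the name to a single AphiaID
    # (-999 signals an ambiguous match), then AphiaClassificationByAphiaID returns
    # a nested taxon tree that flatten_taxa_tree collapses into {rank: name} pairs.
    # A hypothetical, abridged classification response:
    #
    #     {"AphiaID": 2, "rank": "Kingdom", "scientificname": "Animalia",
    #      "child": {"AphiaID": 1821, "rank": "Phylum", "scientificname": "Chordata",
    #                "child": {...}}}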


    def fetch_localizations(self):
        print('Fetching localizations...')
        sys.stdout.flush()
        for section in self.sections:
            # REST is much faster than Python API for large queries
            res = requests.get(
                url=f'{self.tator_url}/rest/Localizations/{self.project_id}?section={section.section_id}',
                headers={
                    'Content-Type': 'application/json',
                    'Authorization': f'Token {session["tator_token"]}',
                },
            )
            section.localizations = res.json()
            print(f'Fetched {len(section.localizations)} localizations for deployment {section.deployment_name}')
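
    # Each element of the response is a Tator localization dict. The fields used
    # below, in a hypothetical, abridged example:
    #
    #     {"id": 111, "elemental_id": "a1b2-...", "version": 45, "type": 48,
    #      "media": 222, "frame": 900, "x": 0.41, "y": 0.37, "width": 0.1,
    #      "height": 0.08, "created_by": 7,
    #      "attributes": {"Scientific Name": "Actinoscyphia", "Attracted": "Yes"}}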


    def process_records(
            self,
            no_match_records: set = None,
            get_timestamp: bool = False,
            get_ctd: bool = False,
            get_substrates: bool = False,
    ):
        print('Processing localizations...', end='')
        sys.stdout.flush()
        formatted_localizations = []
        expedition_fieldbook = {}  # {section_id: deployments[]}
        media_substrates = {}  # {media_id: substrates}

        if not no_match_records:
            no_match_records = set()

        for section in self.sections:
            for localization in section.localizations:
                if localization['type'] not in [TatorLocalizationType.BOX.value, TatorLocalizationType.DOT.value]:
                    continue  # we only care about boxes and dots
                scientific_name = localization['attributes'].get('Scientific Name')
                cached_phylogeny = self.phylogeny.get(scientific_name)
                if (cached_phylogeny is None or 'aphia_id' not in cached_phylogeny.keys()) \
                        and scientific_name not in no_match_records:
                    if not self.fetch_worms_phylogeny(scientific_name):
                        no_match_records.add(scientific_name)

                localization_dict = {
                    'elemental_id': localization['elemental_id'],
                    'section_id': section.section_id,
                    'all_localizations': {
                        'id': localization['id'],
                        'elemental_id': localization['elemental_id'],
                        'version': localization['version'],
                        'type': localization['type'],
                        'points': [round(localization['x'], 5), round(localization['y'], 5)],
                        'dimensions': [localization['width'], localization['height']] if localization['type'] == TatorLocalizationType.BOX.value else None,
                    },
                    'type': localization['type'],
                    'video_sequence_name': section.deployment_name,
                    'scientific_name': scientific_name,
                    'count': 0 if localization['type'] == TatorLocalizationType.BOX.value else 1,
                    'attracted': localization['attributes'].get('Attracted'),
                    'categorical_abundance': localization['attributes'].get('Categorical Abundance'),
                    'identification_remarks': localization['attributes'].get('IdentificationRemarks'),
                    'identified_by': localization['attributes'].get('Identified By'),
                    'notes': localization['attributes'].get('Notes'),
                    'qualifier': localization['attributes'].get('Qualifier'),
                    'reason': localization['attributes'].get('Reason'),
                    'morphospecies': localization['attributes'].get('Morphospecies'),
                    'tentative_id': localization['attributes'].get('Tentative ID'),
                    'good_image': bool(localization['attributes'].get('Good Image')),
                    'annotator': KNOWN_ANNOTATORS.get(localization['created_by'], f'Unknown Annotator (#{localization["created_by"]})'),
                    'frame': localization['frame'],
                    'frame_url': f'/tator/frame/{localization["media"]}/{localization["frame"]}',
                    'media_id': localization['media'],
                    'problems': localization.get('problems'),
                    'do_temp_c': localization['attributes'].get('DO Temperature (celsius)'),
                    'do_concentration_salin_comp_mol_L': localization['attributes'].get('DO Concentration Salin Comp (mol per L)'),
                    'depth_m': localization['attributes'].get('Depth'),
                }

                if localization_dict['categorical_abundance'] and localization_dict['categorical_abundance'] != '--':
                    match localization_dict['categorical_abundance']:
                        case '1-19':
                            localization_dict['count'] = 10
                        case '20-49':
                            localization_dict['count'] = 35
                        case '50-99':
                            localization_dict['count'] = 75
                        case '100-999':
                            localization_dict['count'] = 500
                        case '1000+':
                            localization_dict['count'] = 1000
                        case _:
                            print(f'{TERM_RED}Unknown categorical abundance: {localization_dict["categorical_abundance"]}{TERM_NORMAL}')
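                # Each bin collapses to a single representative count (roughly the
                # bin midpoint, or the floor for the open-ended '1000+' bin) so that
                # categorical abundances can be summed with exact dot counts in the
                # groupby aggregation below.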

                if get_timestamp:
                    if localization['media'] in session['media_timestamps'].keys():
                        camera_bottom_arrival = datetime.datetime.strptime(section.bottom_time, self.BOTTOM_TIME_FORMAT).replace(tzinfo=datetime.timezone.utc)
                        video_start_timestamp = datetime.datetime.fromisoformat(session['media_timestamps'][localization['media']])
                        observation_timestamp = video_start_timestamp + datetime.timedelta(seconds=localization['frame'] / 30)  # assumes 30 fps video
                        time_diff = observation_timestamp - camera_bottom_arrival
                        localization_dict['timestamp'] = observation_timestamp.strftime(self.BOTTOM_TIME_FORMAT)
                        localization_dict['camera_seafloor_arrival'] = camera_bottom_arrival.strftime(self.BOTTOM_TIME_FORMAT)
                        localization_dict['animal_arrival'] = str(datetime.timedelta(
                            days=time_diff.days,
                            seconds=time_diff.seconds,
                        )) if observation_timestamp > camera_bottom_arrival else '00:00:00'

                if get_ctd:
                    if not expedition_fieldbook.get(section.section_id):
                        fieldbook_res = requests.get(
                            url=f'{self.darc_review_url}/dropcam-fieldbook/{section.section_id}',
                            headers={'API-Key': os.environ.get('DARC_REVIEW_API_KEY')},
                        )
                        if fieldbook_res.status_code == 200:
                            expedition_fieldbook[section.section_id] = fieldbook_res.json()['deployments']
                        else:
                            print(f'{TERM_RED}Error fetching expedition fieldbook.{TERM_NORMAL}')
                            print(fieldbook_res.text)
                    deployment_name = section.deployment_name.replace('-', '_')  # fieldbook names use underscores, e.g. for DOEX0087_NIU-dscm-02
                    deployment_ctd = next((x for x in expedition_fieldbook.get(section.section_id, []) if x['deployment_name'] == deployment_name), None)
                    if deployment_ctd:
                        localization_dict['lat'] = deployment_ctd['lat']
                        localization_dict['long'] = deployment_ctd['long']
                        localization_dict['bait_type'] = deployment_ctd['bait_type']
                        localization_dict['depth_m'] = localization_dict['depth_m'] or deployment_ctd['depth_m']

                if get_substrates:
                    media_id = localization['media']
                    if not media_substrates.get(media_id):
                        media_substrates[media_id] = self.api.get_media(media_id).attributes
                    localization_dict['primary_substrate'] = media_substrates[media_id].get('Primary Substrate')
                    localization_dict['secondary_substrate'] = media_substrates[media_id].get('Secondary Substrate')
                    localization_dict['bedforms'] = media_substrates[media_id].get('Bedforms')
                    localization_dict['relief'] = media_substrates[media_id].get('Relief')
                    localization_dict['substrate_notes'] = media_substrates[media_id].get('Substrate Notes')
                    localization_dict['deployment_notes'] = media_substrates[media_id].get('Deployment Notes')
                if scientific_name in self.phylogeny.keys():
                    for key in self.phylogeny[scientific_name].keys():
                        # split to account for the WoRMS 'Phylum (Division)' case
                        localization_dict[key.split(' ')[0]] = self.phylogeny[scientific_name][key]
                formatted_localizations.append(localization_dict)

        if not formatted_localizations:
            print('no records to process!')
            return

        localization_df = pd.DataFrame(formatted_localizations, columns=[
            'elemental_id',
            'section_id',
            'timestamp',
            'camera_seafloor_arrival',
            'animal_arrival',
            'all_localizations',
            'type',
            'video_sequence_name',
            'scientific_name',
            'count',
            'attracted',
            'categorical_abundance',
            'identification_remarks',
            'identified_by',
            'notes',
            'qualifier',
            'morphospecies',
            'reason',
            'tentative_id',
            'good_image',
            'annotator',
            'frame',
            'frame_url',
            'media_id',
            'problems',
            'lat',
            'long',
            'depth_m',
            'do_temp_c',
            'do_concentration_salin_comp_mol_L',
            'bait_type',
            'primary_substrate',
            'secondary_substrate',
            'bedforms',
            'relief',
            'substrate_notes',
            'deployment_notes',
            'phylum',
            'class',
            'subclass',
            'order',
            'suborder',
            'family',
            'subfamily',
            'genus',
            'subgenus',
            'species',
            'subspecies',
            'aphia_id',
        ])

        def collect_localizations(items):
            return list(items)

        def first_if_all_same(series):
            return series.iloc[0] if len(series.unique()) == 1 else f'Non-uniform values across dots: {series.unique()}'.replace("'", '"')
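
        # Hypothetical example: three dots on one frame with Attracted values
        # ['yes', 'yes', 'no'] aggregate to 'Non-uniform values across dots:
        # ["yes" "no"]', flagging the disagreement for the reviewer instead of
        # silently picking one value.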


        localization_df = localization_df.groupby([
            'media_id',
            'frame',
            'scientific_name',
            'tentative_id',
            'morphospecies',
            'type',
        ], dropna=False).agg({
            'elemental_id': 'first',
            'section_id': 'first',
            'timestamp': 'first',
            'camera_seafloor_arrival': 'first',
            'animal_arrival': 'first',
            'all_localizations': collect_localizations,
            'count': 'sum',
            'attracted': first_if_all_same,
            'categorical_abundance': first_if_all_same,
            'identification_remarks': first_if_all_same,
            'identified_by': first_if_all_same,
            'notes': first_if_all_same,
            'qualifier': first_if_all_same,
            'reason': first_if_all_same,
            'good_image': 'first',
            'video_sequence_name': 'first',
            'annotator': 'first',
            'frame_url': 'first',
            'problems': 'first',
            'lat': 'first',
            'long': 'first',
            'depth_m': 'first',
            'do_temp_c': 'first',
            'do_concentration_salin_comp_mol_L': 'first',
            'bait_type': 'first',
            'primary_substrate': 'first',
            'secondary_substrate': 'first',
            'bedforms': 'first',
            'relief': 'first',
            'substrate_notes': 'first',
            'deployment_notes': 'first',
            'phylum': 'first',
            'class': 'first',
            'subclass': 'first',
            'order': 'first',
            'suborder': 'first',
            'family': 'first',
            'subfamily': 'first',
            'genus': 'first',
            'subgenus': 'first',
            'species': 'first',
            'subspecies': 'first',
            'aphia_id': 'first',
        }).reset_index()
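
        # One row now represents every localization of a given type that shares a
        # media ID, frame, and identification (dropna=False keeps groups whose
        # tentative_id or morphospecies is empty). 'count' sums across the grouped
        # dots, while first_if_all_same surfaces any disagreement between them.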


        localization_df = localization_df.sort_values(by=[
            'phylum',
            'class',
            'subclass',
            'order',
            'suborder',
            'family',
            'subfamily',
            'genus',
            'species',
            'scientific_name',
            'tentative_id',
            'media_id',
            'frame',
        ])

        def is_populated(val):
            if isinstance(val, (list, pd.Series)):
                return pd.notnull(val).all()
            return pd.notnull(val)
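
        # e.g. is_populated(None) -> False, is_populated('Cnidaria') -> True,
        # is_populated([1.0, None]) -> False; used below to drop empty fields from
        # each record before it reaches the review page.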


        for index, row in localization_df.iterrows():
            record = {
                'observation_uuid': row['elemental_id'],
                'timestamp': row['timestamp'],
                'camera_seafloor_arrival': row['camera_seafloor_arrival'],
                'animal_arrival': row['animal_arrival'],
                'all_localizations': row['all_localizations'],
                'media_id': row['media_id'],
                'frame': row['frame'],
                'frame_url': row['frame_url'],
                'annotator': row['annotator'],
                'type': row['type'],
                'scientific_name': row['scientific_name'] if row['scientific_name'] != '' else '--',
                'section_id': row['section_id'],
                'video_sequence_name': row['video_sequence_name'],
                'count': row['count'],
                'attracted': row['attracted'],
                'categorical_abundance': row['categorical_abundance'],
                'identification_remarks': row['identification_remarks'],
                'identified_by': row['identified_by'],
                'notes': row['notes'],
                'qualifier': row['qualifier'],
                'reason': row['reason'],
                'tentative_id': row['tentative_id'],
                'morphospecies': row['morphospecies'],
                'good_image': row['good_image'],
                'problems': row['problems'],
                'lat': row['lat'],
                'long': row['long'],
                'depth_m': row['depth_m'],
                'do_temp_c': row['do_temp_c'],
                'do_concentration_salin_comp_mol_L': row['do_concentration_salin_comp_mol_L'],
                'bait_type': row['bait_type'],
                'primary_substrate': row['primary_substrate'],
                'secondary_substrate': row['secondary_substrate'],
                'bedforms': row['bedforms'],
                'relief': row['relief'],
                'substrate_notes': row['substrate_notes'],
                'deployment_notes': row['deployment_notes'],
                'phylum': row['phylum'],
                'class': row['class'],
                'subclass': row['subclass'],
                'order': row['order'],
                'suborder': row['suborder'],
                'family': row['family'],
                'subfamily': row['subfamily'],
                'genus': row['genus'],
                'subgenus': row['subgenus'],
                'species': row['species'],
                'subspecies': row['subspecies'],
                'aphia_id': row['aphia_id'],
            }
            self.final_records.append({key: val for key, val in record.items() if is_populated(val)})
        self.save_phylogeny()
        print('processed!')
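
# After process_records() completes, final_records holds one dict per grouped
# observation containing only its populated fields, e.g. (hypothetical):
#
#     {'observation_uuid': 'a1b2-...', 'scientific_name': 'Actinoscyphia',
#      'count': 3, 'annotator': 'Jane Doe', 'frame_url': '/tator/frame/222/900',
#      'phylum': 'Cnidaria', 'class': 'Anthozoa', ...}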