Coverage for application/image_review/tator/tator_localization_processor.py: 12%
160 statements · coverage.py v7.9.1, created at 2025-06-23 02:22 +0000
import datetime
import json
import os
import sys

import pandas as pd
import requests
import tator
from flask import session

from application.util.constants import KNOWN_ANNOTATORS, TERM_RED, TERM_NORMAL
from application.util.functions import flatten_taxa_tree
from application.util.tator_localization_type import TatorLocalizationType

WORMS_REST_URL = 'https://www.marinespecies.org/rest'

class TatorLocalizationProcessor:
    """
    Fetches all localization information for a given project/section/deployment list from Tator.
    Processes and sorts the data for display on the image review pages.
    """
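
    # Typical usage (a sketch; the session contents and api handle are
    # assumptions inferred from the methods below, not a documented interface):
    #
    #   api = tator.get_api(tator_url, token)  # tator-py client
    #   processor = TatorLocalizationProcessor(
    #       project_id=1,
    #       section_id=2,
    #       api=api,
    #       deployment_list=['DOEX0087_NIU_dscm_01'],  # hypothetical deployment name
    #       tator_url=tator_url,
    #   )
    #   processor.load_phylogeny()
    #   processor.fetch_localizations()   # requires an active Flask session
    #   processor.process_records(get_timestamp=True)
    #   # processor.final_records now holds records formatted for the review page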

22 

23 def __init__( 

24 self, 

25 project_id: int, 

26 section_id: int, 

27 api: tator.api, 

28 deployment_list: list, 

29 tator_url: str, 

30 darc_review_url: str = None, 

31 ): 

32 self.project_id = project_id 

33 self.section_id = section_id 

34 self.api = api 

35 self.deployments = deployment_list 

36 self.tator_url = tator_url 

37 self.darc_review_url = darc_review_url 

38 self.localizations = [] # list of raw localizations from tator 

39 self.final_records = [] # final list formatted for review page 

40 self.deployment_media_dict = {} # dict of all relevant media ids and their dep names 

41 self.bottom_times = {deployment: '' for deployment in deployment_list} 

42 self.phylogeny = {} 

43 self.section_name = self.api.get_section(section_id).name 

44 

    def load_phylogeny(self):
        try:
            with open(os.path.join('cache', 'phylogeny.json'), 'r') as f:
                self.phylogeny = json.load(f)
        except FileNotFoundError:
            self.phylogeny = {}

    def save_phylogeny(self):
        os.makedirs('cache', exist_ok=True)
        with open(os.path.join('cache', 'phylogeny.json'), 'w') as f:
            json.dump(self.phylogeny, f, indent=2)

    def fetch_worms_phylogeny(self, scientific_name: str) -> bool:
        """
        Fetches the phylogeny of a given scientific name from WoRMS. Returns True if successful, False otherwise.
        """
        print(f'Fetching phylogeny for "{scientific_name}"')
        worms_id_res = requests.get(url=f'{WORMS_REST_URL}/AphiaIDByName/{scientific_name}?marine_only=true')
        if worms_id_res.status_code == 200 and worms_id_res.json() != -999:  # -999 means more than one matching record
            aphia_id = worms_id_res.json()
            worms_tree_res = requests.get(url=f'{WORMS_REST_URL}/AphiaClassificationByAphiaID/{aphia_id}')
            if worms_tree_res.status_code == 200:
                self.phylogeny[scientific_name] = flatten_taxa_tree(worms_tree_res.json(), {})
                self.phylogeny[scientific_name]['aphia_id'] = aphia_id
        else:
            worms_name_res = requests.get(url=f'{WORMS_REST_URL}/AphiaRecordsByName/{scientific_name}?like=false&marine_only=true&offset=1')
            if worms_name_res.status_code == 200 and len(worms_name_res.json()) > 0:
                # just take the first accepted record
                for record in worms_name_res.json():
                    if record['status'] == 'accepted':
                        worms_tree_res_2 = requests.get(url=f'{WORMS_REST_URL}/AphiaClassificationByAphiaID/{record["AphiaID"]}')
                        if worms_tree_res_2.status_code == 200:
                            self.phylogeny[scientific_name] = flatten_taxa_tree(worms_tree_res_2.json(), {})
                            self.phylogeny[scientific_name]['aphia_id'] = record['AphiaID']
                        break
                else:
                    print(f'{TERM_RED}No accepted record found for concept name "{scientific_name}"{TERM_NORMAL}')
                    return False
        # only report success if an entry was actually stored, so failed
        # classification requests don't fall through as a false positive
        return scientific_name in self.phylogeny
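
    # Rough shape of a cached entry written above (an assumption based on how
    # flatten_taxa_tree results are consumed in process_records, not a spec):
    #   self.phylogeny['Some species'] = {
    #       'phylum': '...', 'class': '...', 'order': '...', 'family': '...',
    #       'genus': '...', 'species': '...', 'aphia_id': 123456,
    #   }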

    def fetch_localizations(self):
        print('Fetching localizations...', end='')
        sys.stdout.flush()
        media_ids = []
        for deployment in self.deployments:
            for media_id in session[f'{self.project_id}_{self.section_id}'][deployment]:
                self.deployment_media_dict[media_id] = deployment
            media_ids += session[f'{self.project_id}_{self.section_id}'][deployment]
        # REST is much faster than Python API for large queries
        # adding too many media ids results in a query that is too long, so we have to break it up
        for i in range(0, len(media_ids), 300):
            chunk = media_ids[i:i + 300]
            res = requests.get(
                url=f'{self.tator_url}/rest/Localizations/{self.project_id}?media_id={",".join(map(str, chunk))}',
                headers={
                    'Content-Type': 'application/json',
                    'Authorization': f'Token {session["tator_token"]}',
                },
            )
            self.localizations += res.json()
        print(f'fetched {len(self.localizations)} localizations!')
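
    # Example of the REST call issued above (hypothetical ids, for illustration):
    #   GET {tator_url}/rest/Localizations/{project_id}?media_id=101,102,103
    #   Authorization: Token <session tator_token>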

    def process_records(
        self,
        no_match_records: set = None,
        get_timestamp: bool = False,
        get_ctd: bool = False,
        get_substrates: bool = False,
    ):
        print('Processing localizations...', end='')
        sys.stdout.flush()
        formatted_localizations = []
        expedition_fieldbook = []
        deployment_substrates = {}

        if not no_match_records:
            no_match_records = set()

        if get_ctd:
            fieldbook = requests.get(
                url=f'{self.darc_review_url}/dropcam-fieldbook/{self.section_id}',
                headers={'API-Key': os.environ.get('DARC_REVIEW_API_KEY')},
            )
            if fieldbook.status_code == 200:
                expedition_fieldbook = fieldbook.json()['deployments']
            else:
                print(f'{TERM_RED}Error fetching expedition fieldbook.{TERM_NORMAL}')

        if get_substrates:
            for deployment in self.deployments:
                # '$name' appears to target the built-in media name field (Tator attribute-filter syntax)
                deployment_substrates[deployment] = self.api.get_media_list(
                    project=self.project_id,
                    section=self.section_id,
                    attribute_contains=[f'$name::{deployment}'],
                    stop=1,
                )[0].attributes

        for localization in self.localizations:
            if localization['type'] not in [TatorLocalizationType.BOX.value, TatorLocalizationType.DOT.value]:
                continue  # we only care about boxes and dots
            scientific_name = localization['attributes'].get('Scientific Name')
            cached_phylogeny = self.phylogeny.get(scientific_name)
            if (cached_phylogeny is None or 'aphia_id' not in cached_phylogeny.keys()) \
                    and scientific_name not in no_match_records:
                if not self.fetch_worms_phylogeny(scientific_name):
                    no_match_records.add(scientific_name)
            localization_dict = {
                'elemental_id': localization['elemental_id'],
                'all_localizations': {
                    'id': localization['id'],
                    'elemental_id': localization['elemental_id'],
                    'version': localization['version'],
                    'type': localization['type'],
                    'points': [round(localization['x'], 5), round(localization['y'], 5)],
                    'dimensions': [localization['width'], localization['height']]
                    if localization['type'] == TatorLocalizationType.BOX.value
                    else None,
                },
                'type': localization['type'],
                'video_sequence_name': self.deployment_media_dict[localization['media']],
                'scientific_name': scientific_name,
                'count': 0 if localization['type'] == TatorLocalizationType.BOX.value else 1,
                'attracted': localization['attributes'].get('Attracted'),
                'categorical_abundance': localization['attributes'].get('Categorical Abundance'),
                'identification_remarks': localization['attributes'].get('IdentificationRemarks'),
                'identified_by': localization['attributes'].get('Identified By'),
                'notes': localization['attributes'].get('Notes'),
                'qualifier': localization['attributes'].get('Qualifier'),
                'reason': localization['attributes'].get('Reason'),
                'morphospecies': localization['attributes'].get('Morphospecies'),
                'tentative_id': localization['attributes'].get('Tentative ID'),
                'good_image': bool(localization['attributes'].get('Good Image')),
                'annotator': KNOWN_ANNOTATORS.get(localization['created_by'], f'Unknown Annotator (#{localization["created_by"]})'),
                'frame': localization['frame'],
                'frame_url': f'/tator/frame/{localization["media"]}/{localization["frame"]}',
                'media_id': localization['media'],
                'problems': localization.get('problems'),
                'do_temp_c': localization['attributes'].get('DO Temperature (celsius)'),
                'do_concentration_salin_comp_mol_L': localization['attributes'].get('DO Concentration Salin Comp (mol per L)'),
                'depth_m': localization['attributes'].get('Depth'),
            }

            if localization_dict['categorical_abundance'] and localization_dict['categorical_abundance'] != '--':
                # map each abundance bucket to a representative count
                match localization_dict['categorical_abundance']:
                    case '1-19':
                        localization_dict['count'] = 10
                    case '20-49':
                        localization_dict['count'] = 35
                    case '50-99':
                        localization_dict['count'] = 75
                    case '100-999':
                        localization_dict['count'] = 500
                    case '1000+':
                        localization_dict['count'] = 1000
                    case _:
                        print(f'{TERM_RED}Unknown categorical abundance: {localization_dict["categorical_abundance"]}{TERM_NORMAL}')

            if get_timestamp:
                if localization['media'] in session['media_timestamps'].keys():
                    camera_bottom_arrival = datetime.datetime.strptime(
                        self.bottom_times[self.deployment_media_dict[localization['media']]],
                        '%Y-%m-%d %H:%M:%SZ',
                    ).replace(tzinfo=datetime.timezone.utc)
                    video_start_timestamp = datetime.datetime.fromisoformat(session['media_timestamps'][localization['media']])
                    # convert the frame number to elapsed time, assuming 30 fps video
                    observation_timestamp = video_start_timestamp + datetime.timedelta(seconds=localization['frame'] / 30)
                    time_diff = observation_timestamp - camera_bottom_arrival
                    localization_dict['timestamp'] = observation_timestamp.strftime('%Y-%m-%d %H:%M:%SZ')
                    localization_dict['camera_seafloor_arrival'] = camera_bottom_arrival.strftime('%Y-%m-%d %H:%M:%SZ')
                    localization_dict['animal_arrival'] = str(datetime.timedelta(
                        days=time_diff.days,
                        seconds=time_diff.seconds,
                    )) if observation_timestamp > camera_bottom_arrival else '00:00:00'

            if get_ctd and expedition_fieldbook:
                # fieldbook deployment names use underscores, e.g. DOEX0087_NIU-dscm-02 -> DOEX0087_NIU_dscm_02
                deployment_name = self.deployment_media_dict[localization['media']].replace('-', '_')
                deployment_ctd = next((x for x in expedition_fieldbook if x['deployment_name'] == deployment_name), None)
                if deployment_ctd:
                    localization_dict['lat'] = deployment_ctd['lat']
                    localization_dict['long'] = deployment_ctd['long']
                    localization_dict['bait_type'] = deployment_ctd['bait_type']
            if get_substrates and deployment_substrates:
                substrates = deployment_substrates[self.deployment_media_dict[localization['media']]]
                localization_dict['primary_substrate'] = substrates.get('Primary Substrate')
                localization_dict['secondary_substrate'] = substrates.get('Secondary Substrate')
                localization_dict['bedforms'] = substrates.get('Bedforms')
                localization_dict['relief'] = substrates.get('Relief')
                localization_dict['substrate_notes'] = substrates.get('Substrate Notes')
                localization_dict['deployment_notes'] = substrates.get('Deployment Notes')
            if scientific_name in self.phylogeny.keys():
                for key in self.phylogeny[scientific_name].keys():
                    # split to account for worms 'Phylum (Division)' case
                    localization_dict[key.split(' ')[0]] = self.phylogeny[scientific_name][key]
            formatted_localizations.append(localization_dict)

        if not formatted_localizations:
            print('no records to process!')
            return
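
        # Normalize into a DataFrame so the records can be grouped and sorted;
        # columns a record lacks become NaN and are stripped from the final
        # dicts later (see is_populated below).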

        localization_df = pd.DataFrame(formatted_localizations, columns=[
            'elemental_id', 'timestamp', 'camera_seafloor_arrival', 'animal_arrival',
            'all_localizations', 'type', 'video_sequence_name', 'scientific_name',
            'count', 'attracted', 'categorical_abundance', 'identification_remarks',
            'identified_by', 'notes', 'qualifier', 'morphospecies', 'reason',
            'tentative_id', 'good_image', 'annotator', 'frame', 'frame_url',
            'media_id', 'problems', 'lat', 'long', 'depth_m', 'do_temp_c',
            'do_concentration_salin_comp_mol_L', 'bait_type', 'primary_substrate',
            'secondary_substrate', 'bedforms', 'relief', 'substrate_notes',
            'deployment_notes', 'phylum', 'class', 'subclass', 'order', 'suborder',
            'family', 'subfamily', 'genus', 'subgenus', 'species', 'subspecies',
            'aphia_id',
        ])

        def collect_localizations(items):
            return list(items)

        def first_if_all_same(series):
            # collapse a grouped column to a single value, flagging disagreements
            if len(series.unique()) == 1:
                return series.iloc[0]
            return f'Non-uniform values across dots: {series.unique()}'.replace("'", '"')
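
        # One review record per unique (media, frame, name, tentative ID,
        # morphospecies, type) combination: dot counts are summed, and
        # attribute columns that should agree across the grouped localizations
        # go through first_if_all_same so conflicts surface on the review page.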

        localization_df = localization_df.groupby([
            'media_id', 'frame', 'scientific_name', 'tentative_id', 'morphospecies', 'type',
        ], dropna=False).agg({
            'elemental_id': 'first',
            'timestamp': 'first',
            'camera_seafloor_arrival': 'first',
            'animal_arrival': 'first',
            'all_localizations': collect_localizations,
            'count': 'sum',
            'attracted': first_if_all_same,
            'categorical_abundance': first_if_all_same,
            'identification_remarks': first_if_all_same,
            'identified_by': first_if_all_same,
            'notes': first_if_all_same,
            'qualifier': first_if_all_same,
            'reason': first_if_all_same,
            'good_image': 'first',
            'video_sequence_name': 'first',
            'annotator': 'first',
            'frame_url': 'first',
            'problems': 'first',
            'lat': 'first',
            'long': 'first',
            'depth_m': 'first',
            'do_temp_c': 'first',
            'do_concentration_salin_comp_mol_L': 'first',
            'bait_type': 'first',
            'primary_substrate': 'first',
            'secondary_substrate': 'first',
            'bedforms': 'first',
            'relief': 'first',
            'substrate_notes': 'first',
            'deployment_notes': 'first',
            'phylum': 'first',
            'class': 'first',
            'subclass': 'first',
            'order': 'first',
            'suborder': 'first',
            'family': 'first',
            'subfamily': 'first',
            'genus': 'first',
            'subgenus': 'first',
            'species': 'first',
            'subspecies': 'first',
            'aphia_id': 'first',
        }).reset_index()

        localization_df = localization_df.sort_values(by=[
            'phylum', 'class', 'subclass', 'order', 'suborder', 'family',
            'subfamily', 'genus', 'species', 'scientific_name', 'tentative_id',
            'media_id', 'frame',
        ])

        def is_populated(val):
            # treat list-valued cells (e.g. all_localizations) as populated
            # only if every element is non-null
            if isinstance(val, (list, pd.Series)):
                return pd.notnull(val).all()
            return pd.notnull(val)

        for _, row in localization_df.iterrows():
            record = {
                'observation_uuid': row['elemental_id'],
                'timestamp': row['timestamp'],
                'camera_seafloor_arrival': row['camera_seafloor_arrival'],
                'animal_arrival': row['animal_arrival'],
                'all_localizations': row['all_localizations'],
                'media_id': row['media_id'],
                'frame': row['frame'],
                'frame_url': row['frame_url'],
                'annotator': row['annotator'],
                'type': row['type'],
                'scientific_name': row['scientific_name'] if row['scientific_name'] != '' else '--',
                'section_id': self.section_id,
                'video_sequence_name': row['video_sequence_name'],
                'count': row['count'],
                'attracted': row['attracted'],
                'categorical_abundance': row['categorical_abundance'],
                'identification_remarks': row['identification_remarks'],
                'identified_by': row['identified_by'],
                'notes': row['notes'],
                'qualifier': row['qualifier'],
                'reason': row['reason'],
                'tentative_id': row['tentative_id'],
                'morphospecies': row['morphospecies'],
                'good_image': row['good_image'],
                'problems': row['problems'],
                'lat': row['lat'],
                'long': row['long'],
                'depth_m': row['depth_m'],
                'do_temp_c': row['do_temp_c'],
                'do_concentration_salin_comp_mol_L': row['do_concentration_salin_comp_mol_L'],
                'bait_type': row['bait_type'],
                'primary_substrate': row['primary_substrate'],
                'secondary_substrate': row['secondary_substrate'],
                'bedforms': row['bedforms'],
                'relief': row['relief'],
                'substrate_notes': row['substrate_notes'],
                'deployment_notes': row['deployment_notes'],
                'phylum': row['phylum'],
                'class': row['class'],
                'subclass': row['subclass'],
                'order': row['order'],
                'suborder': row['suborder'],
                'family': row['family'],
                'subfamily': row['subfamily'],
                'genus': row['genus'],
                'subgenus': row['subgenus'],
                'species': row['species'],
                'subspecies': row['subspecies'],
                'aphia_id': row['aphia_id'],
            }
            # keep only populated fields so the review page never sees NaN values
            self.final_records.append({key: val for key, val in record.items() if is_populated(val)})
        self.save_phylogeny()
        print('processed!')
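
    # Rough shape of one self.final_records entry after process_records() runs
    # (illustrative values only; keys vary because is_populated drops empty fields):
    #   {
    #       'observation_uuid': '...', 'scientific_name': 'Grimpoteuthis',
    #       'count': 2, 'annotator': '...', 'frame_url': '/tator/frame/123/4567',
    #       'all_localizations': [{'id': 1, 'points': [0.5, 0.5], ...}],
    #       'phylum': 'Mollusca', 'aphia_id': 123456,
    #   }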