Coverage for application/tator/tator_localization_processor.py: 12%

161 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-23 05:22 +0000

1import datetime 

2import os 

3 

4import pandas as pd 

5import requests 

6import sys 

7import tator 

8 

9from flask import session 

10from application.util.constants import TERM_RED, TERM_NORMAL 

11from application.tator.tator_type import TatorLocalizationType 

12from application.util.phylogeny_cache import PhylogenyCache 

13from application.tator.tator_rest_client import TatorRestClient 

14 

15 

class Section:
    """Metadata for one Tator section (deployment) plus the localizations fetched for it."""

    def __init__(self, section_id: str, api: tator.api):
        # Fetch the section record once; everything else is derived from it.
        info = api.get_section(int(section_id))
        self.section_id = section_id
        self.deployment_name = info.name
        # Section paths look like '<expedition>.<...>'; keep the leading expedition part.
        self.expedition_name = info.path.partition('.')[0]
        self.localizations = []  # populated later by TatorLocalizationProcessor.fetch_localizations
        self.bottom_time = None  # camera seafloor-arrival time; set externally before timestamping

24 

25 

26class TatorLocalizationProcessor: 

27 """ 

28 Fetches all localization information for a given project/section/deployment list from Tator. Processes 

29 and sorts data for display on the image review pages. 

30 """ 

31 

32 BOTTOM_TIME_FORMAT = '%Y-%m-%d %H:%M:%SZ' 

33 

34 def __init__( 

35 self, 

36 project_id: int, 

37 section_ids: list[str], 

38 api: tator.api, 

39 tator_url: str, 

40 darc_review_url: str = None, 

41 transect_media_ids: list[int] = None, 

42 ): 

43 self.project_id = project_id 

44 self.tator_url = tator_url 

45 self.darc_review_url = darc_review_url 

46 self.sections = [Section(section_id, api) for section_id in section_ids] 

47 self.api = api 

48 self.tator_client = TatorRestClient(tator_url, session['tator_token']) 

49 self.final_records: list[dict]|dict = [] # final list formatted for review page 

50 self.phylogeny = PhylogenyCache() 

51 self.transect_media_ids = set(media_id for media_id in transect_media_ids) if transect_media_ids else None 

52 

53 def fetch_localizations(self): 

54 print('Fetching localizations...') 

55 sys.stdout.flush() 

56 if self.transect_media_ids: # list of transects, fetch by media IDs instead of section 

57 section_map = {int(section.section_id): section for section in self.sections} 

58 media_id_list = list(self.transect_media_ids) 

59 for i in range(0, len(media_id_list), 50): 

60 batch = media_id_list[i:i + 50] 

61 for localization in self.tator_client.get_localizations(self.project_id, media_id=batch): 

62 section = section_map.get(localization.get('master_section'), self.sections[0]) 

63 section.localizations.append(localization) 

64 for section in self.sections: 

65 print(f'Fetched {len(section.localizations)} localizations for deployment {section.deployment_name}') 

66 else: 

67 for section in self.sections: 

68 section.localizations = self.tator_client.get_localizations(self.project_id, section=section.section_id) 

69 print(f'Fetched {len(section.localizations)} localizations for deployment {section.deployment_name}') 

70 

71 def process_records( 

72 self, 

73 no_match_records: set = None, 

74 get_timestamp: bool = False, 

75 get_ctd: bool = False, 

76 get_substrates: bool = False, 

77 ): 

78 print('Processing localizations...', end='') 

79 sys.stdout.flush() 

80 formatted_localizations = [] 

81 expedition_fieldbook = {} # {section_id: deployments[]} 

82 media_substrates = {} # {media_id: substrates} 

83 if 'media_fps' not in session: 

84 session['media_fps'] = {} 

85 

86 if not no_match_records: 

87 no_match_records = set() 

88 

89 for section in self.sections: 

90 for localization in section.localizations: 

91 if not TatorLocalizationType.is_relevant(localization['type']): 

92 continue # we only care about boxes and dots 

93 scientific_name = localization['attributes'].get('Scientific Name') 

94 cached_phylogeny = self.phylogeny.data.get(scientific_name) 

95 if (cached_phylogeny is None or 'aphia_id' not in cached_phylogeny.keys())\ 

96 and scientific_name not in no_match_records: 

97 if not self.phylogeny.fetch_worms(scientific_name): 

98 no_match_records.add(scientific_name) 

99 localization_dict = { 

100 'elemental_id': localization['elemental_id'], 

101 'section_id': section.section_id, 

102 'all_localizations': { 

103 'id': localization['id'], 

104 'elemental_id': localization['elemental_id'], 

105 'version': localization['version'], 

106 'type': localization['type'], 

107 'points': [round(localization['x'], 5), round(localization['y'], 5)], 

108 'dimensions': [localization['width'], localization['height']] if TatorLocalizationType.is_box(localization['type']) else None, 

109 }, 

110 'type': localization['type'], 

111 'video_sequence_name': section.deployment_name, 

112 'scientific_name': scientific_name, 

113 'count': 0 if TatorLocalizationType.is_box(localization['type']) else 1, 

114 'attracted': localization['attributes'].get('Attracted'), 

115 'upon': localization['attributes'].get('Upon'), 

116 'categorical_abundance': localization['attributes'].get('Categorical Abundance'), 

117 'identification_remarks': localization['attributes'].get('IdentificationRemarks'), 

118 'identified_by': localization['attributes'].get('Identified By'), 

119 'notes': localization['attributes'].get('Notes'), 

120 'qualifier': localization['attributes'].get('Qualifier'), 

121 'reason': localization['attributes'].get('Reason'), 

122 'morphospecies': localization['attributes'].get('Morphospecies'), 

123 'tentative_id': localization['attributes'].get('Tentative ID'), 

124 'good_image': True if localization['attributes'].get('Good Image') else False, 

125 'annotator': self._get_annotator_name(localization['created_by']), 

126 'frame': localization['frame'], 

127 'frame_url': f'/tator/frame/{localization["media"]}/{localization["frame"]}', 

128 'media_id': localization['media'], 

129 'problems': localization['problems'] if 'problems' in localization.keys() else None, 

130 'do_temp_c': localization['attributes'].get('DO Temperature (celsius)'), 

131 'do_concentration_salin_comp_mol_L': localization['attributes'].get('DO Concentration Salin Comp (mol per L)'), 

132 'depth_m': localization['attributes'].get('Depth'), 

133 } 

134 if localization_dict['categorical_abundance'] and localization_dict['categorical_abundance'] != '--': 

135 match localization_dict['categorical_abundance']: 

136 case '1-19': 

137 localization_dict['count'] = 10 

138 case '20-49': 

139 localization_dict['count'] = 35 

140 case '50-99': 

141 localization_dict['count'] = 75 

142 case '100-999': 

143 localization_dict['count'] = 500 

144 case '1000+': 

145 localization_dict['count'] = 1000 

146 case _: 

147 print(f'{TERM_RED}Unknown categorical abundance: {localization_dict["categorical_abundance"]}{TERM_NORMAL}') 

148 if get_timestamp: 

149 if section.bottom_time is None: 

150 raise ValueError(f'No Arrival time found for section {section.deployment_name}. Cannot calculate timestamps.') 

151 media_id = localization['media'] 

152 if media_id in session['media_timestamps'].keys(): 

153 if media_id not in session['media_fps'].keys(): 

154 session['media_fps'][media_id] = self.api.get_media(media_id).fps 

155 session.modified = True 

156 media_fps = session['media_fps'][media_id] or 30 

157 camera_bottom_arrival = datetime.datetime.strptime(section.bottom_time, self.BOTTOM_TIME_FORMAT).replace(tzinfo=datetime.timezone.utc) 

158 video_start_timestamp = datetime.datetime.fromisoformat(session['media_timestamps'][media_id]).astimezone(datetime.timezone.utc) 

159 observation_timestamp = video_start_timestamp + datetime.timedelta(seconds=localization['frame'] / media_fps) 

160 time_diff = observation_timestamp - camera_bottom_arrival 

161 localization_dict['timestamp'] = observation_timestamp.strftime(self.BOTTOM_TIME_FORMAT) 

162 localization_dict['camera_seafloor_arrival'] = camera_bottom_arrival.strftime(self.BOTTOM_TIME_FORMAT) 

163 localization_dict['animal_arrival'] = str(datetime.timedelta( 

164 days=time_diff.days, 

165 seconds=time_diff.seconds 

166 )) if observation_timestamp > camera_bottom_arrival else '00:00:00' 

167 if get_ctd: 

168 if not expedition_fieldbook.get(section.section_id): 

169 fieldbook_res = requests.get( 

170 url=f'{self.darc_review_url}/dropcam-fieldbook/{section.section_id}', 

171 headers={'API-Key': os.environ.get('DARC_REVIEW_API_KEY')}, 

172 ) 

173 if fieldbook_res.status_code == 200: 

174 expedition_fieldbook[section.section_id] = fieldbook_res.json()['deployments'] 

175 else: 

176 print(f'{TERM_RED}Error fetching expedition fieldbook.{TERM_NORMAL}') 

177 print(fieldbook_res.text) 

178 deployment_name = section.deployment_name.replace('-', '_') # for DOEX0087_NIU-dscm-02 

179 if section.section_id not in expedition_fieldbook.keys(): 

180 print(f'{TERM_RED}No fieldbook data found for section {section.section_id}{TERM_NORMAL}') 

181 raise ValueError(f'No fieldbook data found for section {section.section_id}') 

182 deployment_ctd = next((x for x in expedition_fieldbook[section.section_id] if x['deployment_name'] == deployment_name), None) 

183 if deployment_ctd: 

184 localization_dict['lat'] = deployment_ctd['lat'] 

185 localization_dict['long'] = deployment_ctd['long'] 

186 localization_dict['bait_type'] = deployment_ctd['bait_type'] 

187 localization_dict['depth_m'] = localization_dict['depth_m'] or deployment_ctd['depth_m'] 

188 if get_substrates: 

189 media_id = localization['media'] 

190 if not media_substrates.get(media_id): 

191 media_substrates[media_id] = self.api.get_media(media_id).attributes 

192 localization_dict['primary_substrate'] = media_substrates[media_id].get('Primary Substrate') 

193 localization_dict['secondary_substrate'] = media_substrates[media_id].get('Secondary Substrate') 

194 localization_dict['bedforms'] = media_substrates[media_id].get('Bedforms') 

195 localization_dict['relief'] = media_substrates[media_id].get('Relief') 

196 localization_dict['substrate_notes'] = media_substrates[media_id].get('Substrate Notes') 

197 localization_dict['deployment_notes'] = media_substrates[media_id].get('Deployment Notes') 

198 if scientific_name in self.phylogeny.data: 

199 for key in self.phylogeny.data[scientific_name].keys(): 

200 # split to account for worms 'Phylum (Division)' case 

201 localization_dict[key.split(' ')[0]] = self.phylogeny.data[scientific_name][key] 

202 formatted_localizations.append(localization_dict) 

203 

204 if not formatted_localizations: 

205 print('no records to process!') 

206 return 

207 

208 localization_df = pd.DataFrame(formatted_localizations, columns=[ 

209 'elemental_id', 

210 'section_id', 

211 'timestamp', 

212 'camera_seafloor_arrival', 

213 'animal_arrival', 

214 'all_localizations', 

215 'type', 

216 'video_sequence_name', 

217 'scientific_name', 

218 'count', 

219 'attracted', 

220 'upon', 

221 'categorical_abundance', 

222 'identification_remarks', 

223 'identified_by', 

224 'notes', 

225 'qualifier', 

226 'morphospecies', 

227 'reason', 

228 'tentative_id', 

229 'good_image', 

230 'annotator', 

231 'frame', 

232 'frame_url', 

233 'media_id', 

234 'problems', 

235 'lat', 

236 'long', 

237 'depth_m', 

238 'do_temp_c', 

239 'do_concentration_salin_comp_mol_L', 

240 'bait_type', 

241 'primary_substrate', 

242 'secondary_substrate', 

243 'bedforms', 

244 'relief', 

245 'substrate_notes', 

246 'deployment_notes', 

247 'phylum', 

248 'class', 

249 'subclass', 

250 'order', 

251 'suborder', 

252 'family', 

253 'subfamily', 

254 'genus', 

255 'subgenus', 

256 'species', 

257 'subspecies', 

258 'aphia_id', 

259 ]) 

260 

261 def collect_localizations(items): 

262 return [item for item in items] 

263 

264 def first_if_all_same(series): 

265 return series.iloc[0] if len(series.unique()) == 1 else f'Non-uniform values across dots: {series.unique()}'.replace("'", '"') 

266 

267 localization_df = localization_df.groupby([ 

268 'media_id', 

269 'frame', 

270 'scientific_name', 

271 'tentative_id', 

272 'morphospecies', 

273 'type', 

274 ], dropna=False).agg({ 

275 'elemental_id': 'first', 

276 'section_id': 'first', 

277 'timestamp': 'first', 

278 'camera_seafloor_arrival': 'first', 

279 'animal_arrival': 'first', 

280 'all_localizations': collect_localizations, 

281 'count': 'sum', 

282 'attracted': first_if_all_same, 

283 'upon': first_if_all_same, 

284 'categorical_abundance': first_if_all_same, 

285 'identification_remarks': first_if_all_same, 

286 'identified_by': first_if_all_same, 

287 'notes': first_if_all_same, 

288 'qualifier': first_if_all_same, 

289 'reason': first_if_all_same, 

290 'good_image': 'first', 

291 'video_sequence_name': 'first', 

292 'annotator': 'first', 

293 'frame_url': 'first', 

294 'problems': 'first', 

295 'lat': 'first', 

296 'long': 'first', 

297 'depth_m': 'first', 

298 'do_temp_c': 'first', 

299 'do_concentration_salin_comp_mol_L': 'first', 

300 'bait_type': 'first', 

301 'primary_substrate': 'first', 

302 'secondary_substrate': 'first', 

303 'bedforms': 'first', 

304 'relief': 'first', 

305 'substrate_notes': 'first', 

306 'deployment_notes': 'first', 

307 'phylum': 'first', 

308 'class': 'first', 

309 'subclass': 'first', 

310 'order': 'first', 

311 'suborder': 'first', 

312 'family': 'first', 

313 'subfamily': 'first', 

314 'genus': 'first', 

315 'subgenus': 'first', 

316 'species': 'first', 

317 'subspecies': 'first', 

318 'aphia_id': 'first', 

319 }).reset_index() 

320 

321 localization_df = localization_df.sort_values(by=[ 

322 'phylum', 

323 'class', 

324 'subclass', 

325 'order', 

326 'suborder', 

327 'family', 

328 'subfamily', 

329 'genus', 

330 'species', 

331 'scientific_name', 

332 'tentative_id', 

333 'media_id', 

334 'frame', 

335 ]) 

336 

337 def is_populated(val): 

338 if isinstance(val, (list, pd.Series)): 

339 return pd.notnull(val).all() 

340 return pd.notnull(val) 

341 

342 for index, row in localization_df.iterrows(): 

343 record = { 

344 'observation_uuid': row['elemental_id'], 

345 'timestamp': row['timestamp'], 

346 'camera_seafloor_arrival': row['camera_seafloor_arrival'], 

347 'animal_arrival': row['animal_arrival'], 

348 'all_localizations': row['all_localizations'], 

349 'media_id': row['media_id'], 

350 'frame': row['frame'], 

351 'frame_url': row['frame_url'], 

352 'annotator': row['annotator'], 

353 'type': row['type'], 

354 'scientific_name': row['scientific_name'] if row['scientific_name'] != '' else '--', 

355 'section_id': row['section_id'], 

356 'video_sequence_name': row['video_sequence_name'], 

357 'count': row['count'], 

358 'attracted': row['attracted'], 

359 'upon': row['upon'], 

360 'categorical_abundance': row['categorical_abundance'], 

361 'identification_remarks': row['identification_remarks'], 

362 'identified_by': row['identified_by'], 

363 'notes': row['notes'], 

364 'qualifier': row['qualifier'], 

365 'reason': row['reason'], 

366 'tentative_id': row['tentative_id'], 

367 'morphospecies': row['morphospecies'], 

368 'good_image': row['good_image'], 

369 'problems': row['problems'], 

370 'lat': row['lat'], 

371 'long': row['long'], 

372 'depth_m': row['depth_m'], 

373 'do_temp_c': row['do_temp_c'], 

374 'do_concentration_salin_comp_mol_L': row['do_concentration_salin_comp_mol_L'], 

375 'bait_type': row['bait_type'], 

376 'primary_substrate': row['primary_substrate'], 

377 'secondary_substrate': row['secondary_substrate'], 

378 'bedforms': row['bedforms'], 

379 'relief': row['relief'], 

380 'substrate_notes': row['substrate_notes'], 

381 'deployment_notes': row['deployment_notes'], 

382 'phylum': row['phylum'], 

383 'class': row['class'], 

384 'subclass': row['subclass'], 

385 'order': row['order'], 

386 'suborder': row['suborder'], 

387 'family': row['family'], 

388 'subfamily': row['subfamily'], 

389 'genus': row['genus'], 

390 'subgenus': row['subgenus'], 

391 'species': row['species'], 

392 'subspecies': row['subspecies'], 

393 'aphia_id': row['aphia_id'], 

394 } 

395 self.final_records.append({key: val for key, val in record.items() if is_populated(val)}) 

396 self.phylogeny.save() 

397 print('processed!') 

398 

399 def _get_annotator_name(self, user_id: int) -> str: 

400 if 'tator_usernames' not in session.keys(): 

401 session['tator_usernames'] = {} 

402 if user_id not in session['tator_usernames']: 

403 print(f'Fetching annotator name for user ID {user_id} from Tator...') 

404 res_json = self.tator_client.get_user(user_id) 

405 if 'first_name' not in res_json: 

406 print(f'{TERM_RED}Error fetching annotator name for user ID {user_id}{TERM_NORMAL}') 

407 return f'Unknown annotator (#{user_id})' 

408 annotator_name = f'{res_json["first_name"]} {res_json["last_name"]}' 

409 print(f'Annotator name for user ID {user_id} is "{annotator_name}"') 

410 session['tator_usernames'][user_id] = annotator_name 

411 session.modified = True 

412 return session['tator_usernames'][user_id]