Coverage for application/tator/tator_localization_processor.py: 12% (161 statements)
import datetime
import os

import pandas as pd
import requests
import sys
import tator

from flask import session
from application.util.constants import TERM_RED, TERM_NORMAL
from application.tator.tator_type import TatorLocalizationType
from application.util.phylogeny_cache import PhylogenyCache
from application.tator.tator_rest_client import TatorRestClient

class Section:
    def __init__(self, section_id: str, api: tator.api):
        section_data = api.get_section(int(section_id))
        self.section_id = section_id
        self.deployment_name = section_data.name
        self.expedition_name = section_data.path.split('.')[0]
        self.localizations = []
        self.bottom_time = None

class TatorLocalizationProcessor:
    """
    Fetches all localization information for a given project/section/deployment list from Tator. Processes
    and sorts data for display on the image review pages.
    """

    BOTTOM_TIME_FORMAT = '%Y-%m-%d %H:%M:%SZ'

    def __init__(
        self,
        project_id: int,
        section_ids: list[str],
        api: tator.api,
        tator_url: str,
        darc_review_url: str = None,
        transect_media_ids: list[int] = None,
    ):
        self.project_id = project_id
        self.tator_url = tator_url
        self.darc_review_url = darc_review_url
        self.sections = [Section(section_id, api) for section_id in section_ids]
        self.api = api
        self.tator_client = TatorRestClient(tator_url, session['tator_token'])
        self.final_records: list[dict] | dict = []  # final list formatted for review page
        self.phylogeny = PhylogenyCache()
        self.transect_media_ids = set(media_id for media_id in transect_media_ids) if transect_media_ids else None

    def fetch_localizations(self):
        print('Fetching localizations...')
        sys.stdout.flush()
        if self.transect_media_ids:  # list of transects, fetch by media IDs instead of section
            section_map = {int(section.section_id): section for section in self.sections}
            media_id_list = list(self.transect_media_ids)
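            # fetch localizations in batches of 50 media IDs, routing each result to its section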
            for i in range(0, len(media_id_list), 50):
                batch = media_id_list[i:i + 50]
                for localization in self.tator_client.get_localizations(self.project_id, media_id=batch):
                    section = section_map.get(localization.get('master_section'), self.sections[0])
                    section.localizations.append(localization)
            for section in self.sections:
                print(f'Fetched {len(section.localizations)} localizations for deployment {section.deployment_name}')
        else:
            for section in self.sections:
                section.localizations = self.tator_client.get_localizations(self.project_id, section=section.section_id)
                print(f'Fetched {len(section.localizations)} localizations for deployment {section.deployment_name}')

    def process_records(
        self,
        no_match_records: set = None,
        get_timestamp: bool = False,
        get_ctd: bool = False,
        get_substrates: bool = False,
    ):
        print('Processing localizations...', end='')
        sys.stdout.flush()
        formatted_localizations = []
        expedition_fieldbook = {}  # {section_id: deployments[]}
        media_substrates = {}  # {media_id: substrates}
        if 'media_fps' not in session:
            session['media_fps'] = {}

        if not no_match_records:
            no_match_records = set()

        for section in self.sections:
            for localization in section.localizations:
                if not TatorLocalizationType.is_relevant(localization['type']):
                    continue  # we only care about boxes and dots
                scientific_name = localization['attributes'].get('Scientific Name')
                cached_phylogeny = self.phylogeny.data.get(scientific_name)
                if (cached_phylogeny is None or 'aphia_id' not in cached_phylogeny.keys()) \
                        and scientific_name not in no_match_records:
                    if not self.phylogeny.fetch_worms(scientific_name):
                        no_match_records.add(scientific_name)
                localization_dict = {
                    'elemental_id': localization['elemental_id'],
                    'section_id': section.section_id,
                    'all_localizations': {
                        'id': localization['id'],
                        'elemental_id': localization['elemental_id'],
                        'version': localization['version'],
                        'type': localization['type'],
                        'points': [round(localization['x'], 5), round(localization['y'], 5)],
                        'dimensions': [localization['width'], localization['height']] if TatorLocalizationType.is_box(localization['type']) else None,
                    },
                    'type': localization['type'],
                    'video_sequence_name': section.deployment_name,
                    'scientific_name': scientific_name,
                    'count': 0 if TatorLocalizationType.is_box(localization['type']) else 1,
                    'attracted': localization['attributes'].get('Attracted'),
                    'upon': localization['attributes'].get('Upon'),
                    'categorical_abundance': localization['attributes'].get('Categorical Abundance'),
                    'identification_remarks': localization['attributes'].get('IdentificationRemarks'),
                    'identified_by': localization['attributes'].get('Identified By'),
                    'notes': localization['attributes'].get('Notes'),
                    'qualifier': localization['attributes'].get('Qualifier'),
                    'reason': localization['attributes'].get('Reason'),
                    'morphospecies': localization['attributes'].get('Morphospecies'),
                    'tentative_id': localization['attributes'].get('Tentative ID'),
                    'good_image': True if localization['attributes'].get('Good Image') else False,
                    'annotator': self._get_annotator_name(localization['created_by']),
                    'frame': localization['frame'],
                    'frame_url': f'/tator/frame/{localization["media"]}/{localization["frame"]}',
                    'media_id': localization['media'],
                    'problems': localization['problems'] if 'problems' in localization.keys() else None,
                    'do_temp_c': localization['attributes'].get('DO Temperature (celsius)'),
                    'do_concentration_salin_comp_mol_L': localization['attributes'].get('DO Concentration Salin Comp (mol per L)'),
                    'depth_m': localization['attributes'].get('Depth'),
                }
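                # map categorical abundance bins to representative counts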
                if localization_dict['categorical_abundance'] and localization_dict['categorical_abundance'] != '--':
                    match localization_dict['categorical_abundance']:
                        case '1-19':
                            localization_dict['count'] = 10
                        case '20-49':
                            localization_dict['count'] = 35
                        case '50-99':
                            localization_dict['count'] = 75
                        case '100-999':
                            localization_dict['count'] = 500
                        case '1000+':
                            localization_dict['count'] = 1000
                        case _:
                            print(f'{TERM_RED}Unknown categorical abundance: {localization_dict["categorical_abundance"]}{TERM_NORMAL}')
                if get_timestamp:
                    if section.bottom_time is None:
                        raise ValueError(f'No Arrival time found for section {section.deployment_name}. Cannot calculate timestamps.')
                    media_id = localization['media']
                    if media_id in session['media_timestamps'].keys():
                        if media_id not in session['media_fps'].keys():
                            session['media_fps'][media_id] = self.api.get_media(media_id).fps
                            session.modified = True
                        media_fps = session['media_fps'][media_id] or 30
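                        # observation time = video start (UTC) + frame offset at the media's fps; arrival delta is measured from the camera's bottom time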
                        camera_bottom_arrival = datetime.datetime.strptime(section.bottom_time, self.BOTTOM_TIME_FORMAT).replace(tzinfo=datetime.timezone.utc)
                        video_start_timestamp = datetime.datetime.fromisoformat(session['media_timestamps'][media_id]).astimezone(datetime.timezone.utc)
                        observation_timestamp = video_start_timestamp + datetime.timedelta(seconds=localization['frame'] / media_fps)
                        time_diff = observation_timestamp - camera_bottom_arrival
                        localization_dict['timestamp'] = observation_timestamp.strftime(self.BOTTOM_TIME_FORMAT)
                        localization_dict['camera_seafloor_arrival'] = camera_bottom_arrival.strftime(self.BOTTOM_TIME_FORMAT)
                        localization_dict['animal_arrival'] = str(datetime.timedelta(
                            days=time_diff.days,
                            seconds=time_diff.seconds
                        )) if observation_timestamp > camera_bottom_arrival else '00:00:00'
                if get_ctd:
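                    # fieldbook position/bait/depth data is fetched once per section from the DARC review server and cached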
                    if not expedition_fieldbook.get(section.section_id):
                        fieldbook_res = requests.get(
                            url=f'{self.darc_review_url}/dropcam-fieldbook/{section.section_id}',
                            headers={'API-Key': os.environ.get('DARC_REVIEW_API_KEY')},
                        )
                        if fieldbook_res.status_code == 200:
                            expedition_fieldbook[section.section_id] = fieldbook_res.json()['deployments']
                        else:
                            print(f'{TERM_RED}Error fetching expedition fieldbook.{TERM_NORMAL}')
                            print(fieldbook_res.text)
                    deployment_name = section.deployment_name.replace('-', '_')  # for DOEX0087_NIU-dscm-02
                    if section.section_id not in expedition_fieldbook.keys():
                        print(f'{TERM_RED}No fieldbook data found for section {section.section_id}{TERM_NORMAL}')
                        raise ValueError(f'No fieldbook data found for section {section.section_id}')
                    deployment_ctd = next((x for x in expedition_fieldbook[section.section_id] if x['deployment_name'] == deployment_name), None)
                    if deployment_ctd:
                        localization_dict['lat'] = deployment_ctd['lat']
                        localization_dict['long'] = deployment_ctd['long']
                        localization_dict['bait_type'] = deployment_ctd['bait_type']
                        localization_dict['depth_m'] = localization_dict['depth_m'] or deployment_ctd['depth_m']
                if get_substrates:
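                    # substrate and deployment notes live on the media record; fetch each media's attributes once and reuse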
                    media_id = localization['media']
                    if not media_substrates.get(media_id):
                        media_substrates[media_id] = self.api.get_media(media_id).attributes
                    localization_dict['primary_substrate'] = media_substrates[media_id].get('Primary Substrate')
                    localization_dict['secondary_substrate'] = media_substrates[media_id].get('Secondary Substrate')
                    localization_dict['bedforms'] = media_substrates[media_id].get('Bedforms')
                    localization_dict['relief'] = media_substrates[media_id].get('Relief')
                    localization_dict['substrate_notes'] = media_substrates[media_id].get('Substrate Notes')
                    localization_dict['deployment_notes'] = media_substrates[media_id].get('Deployment Notes')
                if scientific_name in self.phylogeny.data:
                    for key in self.phylogeny.data[scientific_name].keys():
                        # split to account for worms 'Phylum (Division)' case
                        localization_dict[key.split(' ')[0]] = self.phylogeny.data[scientific_name][key]
                formatted_localizations.append(localization_dict)

        if not formatted_localizations:
            print('no records to process!')
            return

        localization_df = pd.DataFrame(formatted_localizations, columns=[
            'elemental_id',
            'section_id',
            'timestamp',
            'camera_seafloor_arrival',
            'animal_arrival',
            'all_localizations',
            'type',
            'video_sequence_name',
            'scientific_name',
            'count',
            'attracted',
            'upon',
            'categorical_abundance',
            'identification_remarks',
            'identified_by',
            'notes',
            'qualifier',
            'morphospecies',
            'reason',
            'tentative_id',
            'good_image',
            'annotator',
            'frame',
            'frame_url',
            'media_id',
            'problems',
            'lat',
            'long',
            'depth_m',
            'do_temp_c',
            'do_concentration_salin_comp_mol_L',
            'bait_type',
            'primary_substrate',
            'secondary_substrate',
            'bedforms',
            'relief',
            'substrate_notes',
            'deployment_notes',
            'phylum',
            'class',
            'subclass',
            'order',
            'suborder',
            'family',
            'subfamily',
            'genus',
            'subgenus',
            'species',
            'subspecies',
            'aphia_id',
        ])

        def collect_localizations(items):
            return [item for item in items]

        def first_if_all_same(series):
            return series.iloc[0] if len(series.unique()) == 1 else f'Non-uniform values across dots: {series.unique()}'.replace("'", '"')

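        # collapse localizations that share media, frame, name, tentative ID, morphospecies, and type into one record;
        # counts are summed, single-valued attributes keep one value (or a non-uniform marker)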
        localization_df = localization_df.groupby([
            'media_id',
            'frame',
            'scientific_name',
            'tentative_id',
            'morphospecies',
            'type',
        ], dropna=False).agg({
            'elemental_id': 'first',
            'section_id': 'first',
            'timestamp': 'first',
            'camera_seafloor_arrival': 'first',
            'animal_arrival': 'first',
            'all_localizations': collect_localizations,
            'count': 'sum',
            'attracted': first_if_all_same,
            'upon': first_if_all_same,
            'categorical_abundance': first_if_all_same,
            'identification_remarks': first_if_all_same,
            'identified_by': first_if_all_same,
            'notes': first_if_all_same,
            'qualifier': first_if_all_same,
            'reason': first_if_all_same,
            'good_image': 'first',
            'video_sequence_name': 'first',
            'annotator': 'first',
            'frame_url': 'first',
            'problems': 'first',
            'lat': 'first',
            'long': 'first',
            'depth_m': 'first',
            'do_temp_c': 'first',
            'do_concentration_salin_comp_mol_L': 'first',
            'bait_type': 'first',
            'primary_substrate': 'first',
            'secondary_substrate': 'first',
            'bedforms': 'first',
            'relief': 'first',
            'substrate_notes': 'first',
            'deployment_notes': 'first',
            'phylum': 'first',
            'class': 'first',
            'subclass': 'first',
            'order': 'first',
            'suborder': 'first',
            'family': 'first',
            'subfamily': 'first',
            'genus': 'first',
            'subgenus': 'first',
            'species': 'first',
            'subspecies': 'first',
            'aphia_id': 'first',
        }).reset_index()

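        # sort taxonomically, then by scientific name, tentative ID, media, and frame for display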
        localization_df = localization_df.sort_values(by=[
            'phylum',
            'class',
            'subclass',
            'order',
            'suborder',
            'family',
            'subfamily',
            'genus',
            'species',
            'scientific_name',
            'tentative_id',
            'media_id',
            'frame',
        ])

        def is_populated(val):
            if isinstance(val, (list, pd.Series)):
                return pd.notnull(val).all()
            return pd.notnull(val)

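        # build the final record dicts, dropping keys whose values are not populated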
        for index, row in localization_df.iterrows():
            record = {
                'observation_uuid': row['elemental_id'],
                'timestamp': row['timestamp'],
                'camera_seafloor_arrival': row['camera_seafloor_arrival'],
                'animal_arrival': row['animal_arrival'],
                'all_localizations': row['all_localizations'],
                'media_id': row['media_id'],
                'frame': row['frame'],
                'frame_url': row['frame_url'],
                'annotator': row['annotator'],
                'type': row['type'],
                'scientific_name': row['scientific_name'] if row['scientific_name'] != '' else '--',
                'section_id': row['section_id'],
                'video_sequence_name': row['video_sequence_name'],
                'count': row['count'],
                'attracted': row['attracted'],
                'upon': row['upon'],
                'categorical_abundance': row['categorical_abundance'],
                'identification_remarks': row['identification_remarks'],
                'identified_by': row['identified_by'],
                'notes': row['notes'],
                'qualifier': row['qualifier'],
                'reason': row['reason'],
                'tentative_id': row['tentative_id'],
                'morphospecies': row['morphospecies'],
                'good_image': row['good_image'],
                'problems': row['problems'],
                'lat': row['lat'],
                'long': row['long'],
                'depth_m': row['depth_m'],
                'do_temp_c': row['do_temp_c'],
                'do_concentration_salin_comp_mol_L': row['do_concentration_salin_comp_mol_L'],
                'bait_type': row['bait_type'],
                'primary_substrate': row['primary_substrate'],
                'secondary_substrate': row['secondary_substrate'],
                'bedforms': row['bedforms'],
                'relief': row['relief'],
                'substrate_notes': row['substrate_notes'],
                'deployment_notes': row['deployment_notes'],
                'phylum': row['phylum'],
                'class': row['class'],
                'subclass': row['subclass'],
                'order': row['order'],
                'suborder': row['suborder'],
                'family': row['family'],
                'subfamily': row['subfamily'],
                'genus': row['genus'],
                'subgenus': row['subgenus'],
                'species': row['species'],
                'subspecies': row['subspecies'],
                'aphia_id': row['aphia_id'],
            }
            self.final_records.append({key: val for key, val in record.items() if is_populated(val)})
        self.phylogeny.save()
        print('processed!')

    def _get_annotator_name(self, user_id: int) -> str:
        if 'tator_usernames' not in session.keys():
            session['tator_usernames'] = {}
        if user_id not in session['tator_usernames']:
            print(f'Fetching annotator name for user ID {user_id} from Tator...')
            res_json = self.tator_client.get_user(user_id)
            if 'first_name' not in res_json:
                print(f'{TERM_RED}Error fetching annotator name for user ID {user_id}{TERM_NORMAL}')
                return f'Unknown annotator (#{user_id})'
            annotator_name = f'{res_json["first_name"]} {res_json["last_name"]}'
            print(f'Annotator name for user ID {user_id} is "{annotator_name}"')
            session['tator_usernames'][user_id] = annotator_name
            session.modified = True
        return session['tator_usernames'][user_id]
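

# Usage sketch (illustrative only, not part of the original module): a minimal example of how this
# processor might be driven, assuming a Flask request context (for `session`), a valid Tator token,
# and placeholder project/section identifiers.
#
#   api = tator.get_api(host='https://tator.example.org', token=session['tator_token'])
#   processor = TatorLocalizationProcessor(
#       project_id=1,                # placeholder project ID
#       section_ids=['123', '456'],  # placeholder section IDs
#       api=api,
#       tator_url='https://tator.example.org',
#   )
#   processor.fetch_localizations()
#   processor.process_records(get_timestamp=False, get_ctd=False, get_substrates=False)
#   review_records = processor.final_records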