Coverage for application/image_review/tator/tator_localization_processor.py: 14%
161 statements
import datetime
import json
import os
import sys
from typing import List, Optional

import pandas as pd
import requests
import tator
from flask import session

from application.util.constants import KNOWN_ANNOTATORS, TERM_RED, TERM_NORMAL, TERM_YELLOW
from application.util.tator_localization_type import TatorLocalizationType
from application.util.functions import flatten_taxa_tree

WORMS_REST_URL = 'https://www.marinespecies.org/rest'
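
# WoRMS REST endpoints used by fetch_worms_phylogeny below:
#   GET /AphiaIDByName/{name}              -> a single AphiaID (-999 when more than
#                                             one record matches the name)
#   GET /AphiaClassificationByAphiaID/{id} -> nested classification tree
#   GET /AphiaRecordsByName/{name}         -> full records list (fallback lookup)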


class Section:
    def __init__(self, section_id: str, api: tator.api):
        section_data = api.get_section(int(section_id))
        self.section_id = section_id
        self.deployment_name = section_data.name
        self.expedition_name = section_data.path.split('.')[0]
        self.localizations = []
        self.bottom_time = None


class TatorLocalizationProcessor:
    """
    Fetches all localization information for a given project/section/deployment list from Tator. Processes
    and sorts data for display on the image review pages.
    """

    BOTTOM_TIME_FORMAT = '%Y-%m-%d %H:%M:%SZ'

    def __init__(
        self,
        project_id: int,
        section_ids: List[str],
        api: tator.api,
        tator_url: str,
        darc_review_url: Optional[str] = None,
    ):
        self.project_id = project_id
        self.tator_url = tator_url
        self.darc_review_url = darc_review_url
        self.sections = [Section(section_id, api) for section_id in section_ids]
        self.api = api
        self.final_records = []  # final list formatted for the review page
        self.phylogeny = {}

    def load_phylogeny(self):
        try:
            with open(os.path.join('cache', 'phylogeny.json'), 'r') as f:
                self.phylogeny = json.load(f)
        except FileNotFoundError:
            self.phylogeny = {}

    def save_phylogeny(self):
        try:
            with open(os.path.join('cache', 'phylogeny.json'), 'w') as f:
                json.dump(self.phylogeny, f, indent=2)
        except FileNotFoundError:
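            # opening for write raises FileNotFoundError when the 'cache'
            # directory is missing; create it and retry the write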
            os.makedirs('cache')
            with open(os.path.join('cache', 'phylogeny.json'), 'w') as f:
                json.dump(self.phylogeny, f, indent=2)

    def fetch_worms_phylogeny(self, scientific_name: str) -> bool:
        """
        Fetches the phylogeny of a given scientific name from WoRMS. Returns True if successful, False otherwise.
        """
        print(f'Fetching phylogeny for "{scientific_name}"')
        worms_id_res = requests.get(url=f'{WORMS_REST_URL}/AphiaIDByName/{scientific_name}?marine_only=true')
        if worms_id_res.status_code == 200 and worms_id_res.json() != -999:  # -999 means more than one matching record
            aphia_id = worms_id_res.json()
            worms_tree_res = requests.get(url=f'{WORMS_REST_URL}/AphiaClassificationByAphiaID/{aphia_id}')
            if worms_tree_res.status_code == 200:
                self.phylogeny[scientific_name] = flatten_taxa_tree(worms_tree_res.json(), {})
                self.phylogeny[scientific_name]['aphia_id'] = aphia_id
        else:
            worms_name_res = requests.get(url=f'{WORMS_REST_URL}/AphiaRecordsByName/{scientific_name}?like=false&marine_only=true&offset=1')
            if worms_name_res.status_code == 200 and len(worms_name_res.json()) > 0:
                # just take the first accepted record
                for record in worms_name_res.json():
                    if record['status'] == 'accepted':
                        worms_tree_res_2 = requests.get(url=f'{WORMS_REST_URL}/AphiaClassificationByAphiaID/{record["AphiaID"]}')
                        if worms_tree_res_2.status_code == 200:
                            self.phylogeny[scientific_name] = flatten_taxa_tree(worms_tree_res_2.json(), {})
                            self.phylogeny[scientific_name]['aphia_id'] = record['AphiaID']
                        break
            else:
                print(f'{TERM_RED}No accepted record found for concept name "{scientific_name}"{TERM_NORMAL}')
                return False
        return True

    def fetch_localizations(self):
        print('Fetching localizations...')
        sys.stdout.flush()
        for section in self.sections:
            # REST is much faster than the Python API for large queries
            res = requests.get(
                url=f'{self.tator_url}/rest/Localizations/{self.project_id}?section={section.section_id}',
                headers={
                    'Content-Type': 'application/json',
                    'Authorization': f'Token {session["tator_token"]}',
                })
            section.localizations = res.json()
            print(f'Fetched {len(section.localizations)} localizations for deployment {section.deployment_name}')

    def process_records(
        self,
        no_match_records: Optional[set] = None,
        get_timestamp: bool = False,
        get_ctd: bool = False,
        get_substrates: bool = False,
    ):
        print('Processing localizations...', end='')
        sys.stdout.flush()
        formatted_localizations = []
        expedition_fieldbook = {}  # {section_id: deployments[]}
        media_substrates = {}  # {media_id: substrates}

        if not no_match_records:
            no_match_records = set()

        for section in self.sections:
            for localization in section.localizations:
                if localization['type'] not in [TatorLocalizationType.BOX.value, TatorLocalizationType.DOT.value]:
                    continue  # we only care about boxes and dots
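                # resolve phylogeny from the local cache first; only query WoRMS for
                # names that are not cached and have not already failed to match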
                scientific_name = localization['attributes'].get('Scientific Name')
                cached_phylogeny = self.phylogeny.get(scientific_name)
                if (cached_phylogeny is None or 'aphia_id' not in cached_phylogeny) \
                        and scientific_name not in no_match_records:
                    if not self.fetch_worms_phylogeny(scientific_name):
                        no_match_records.add(scientific_name)
                localization_dict = {
                    'elemental_id': localization['elemental_id'],
                    'section_id': section.section_id,
                    'all_localizations': {
                        'id': localization['id'],
                        'elemental_id': localization['elemental_id'],
                        'version': localization['version'],
                        'type': localization['type'],
                        'points': [round(localization['x'], 5), round(localization['y'], 5)],
                        'dimensions': [localization['width'], localization['height']] if localization['type'] == TatorLocalizationType.BOX.value else None,
                    },
                    'type': localization['type'],
                    'video_sequence_name': section.deployment_name,
                    'scientific_name': scientific_name,
                    'count': 0 if localization['type'] == TatorLocalizationType.BOX.value else 1,
                    'attracted': localization['attributes'].get('Attracted'),
                    'categorical_abundance': localization['attributes'].get('Categorical Abundance'),
                    'identification_remarks': localization['attributes'].get('IdentificationRemarks'),
                    'identified_by': localization['attributes'].get('Identified By'),
                    'notes': localization['attributes'].get('Notes'),
                    'qualifier': localization['attributes'].get('Qualifier'),
                    'reason': localization['attributes'].get('Reason'),
                    'morphospecies': localization['attributes'].get('Morphospecies'),
                    'tentative_id': localization['attributes'].get('Tentative ID'),
                    'good_image': bool(localization['attributes'].get('Good Image')),
                    'annotator': KNOWN_ANNOTATORS.get(localization['created_by'], f'Unknown Annotator (#{localization["created_by"]})'),
                    'frame': localization['frame'],
                    'frame_url': f'/tator/frame/{localization["media"]}/{localization["frame"]}',
                    'media_id': localization['media'],
                    'problems': localization.get('problems'),
                    'do_temp_c': localization['attributes'].get('DO Temperature (celsius)'),
                    'do_concentration_salin_comp_mol_L': localization['attributes'].get('DO Concentration Salin Comp (mol per L)'),
                    'depth_m': localization['attributes'].get('Depth'),
                }
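                # map each categorical-abundance bucket to a single representative
                # count (roughly the bucket midpoint, capped at 1000 for '1000+')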
                if localization_dict['categorical_abundance'] and localization_dict['categorical_abundance'] != '--':
                    match localization_dict['categorical_abundance']:
                        case '1-19':
                            localization_dict['count'] = 10
                        case '20-49':
                            localization_dict['count'] = 35
                        case '50-99':
                            localization_dict['count'] = 75
                        case '100-999':
                            localization_dict['count'] = 500
                        case '1000+':
                            localization_dict['count'] = 1000
                        case _:
                            print(f'{TERM_RED}Unknown categorical abundance: {localization_dict["categorical_abundance"]}{TERM_NORMAL}')
                if get_timestamp:
                    if localization['media'] in session['media_timestamps']:
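                        # observation time = video start time + frame offset (the
                        # footage is assumed to run at 30 fps, hence frame / 30 seconds)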
                        camera_bottom_arrival = datetime.datetime.strptime(section.bottom_time, self.BOTTOM_TIME_FORMAT).replace(tzinfo=datetime.timezone.utc)
                        video_start_timestamp = datetime.datetime.fromisoformat(session['media_timestamps'][localization['media']])
                        observation_timestamp = video_start_timestamp + datetime.timedelta(seconds=localization['frame'] / 30)
                        time_diff = observation_timestamp - camera_bottom_arrival
                        localization_dict['timestamp'] = observation_timestamp.strftime(self.BOTTOM_TIME_FORMAT)
                        localization_dict['camera_seafloor_arrival'] = camera_bottom_arrival.strftime(self.BOTTOM_TIME_FORMAT)
                        localization_dict['animal_arrival'] = str(datetime.timedelta(
                            days=time_diff.days,
                            seconds=time_diff.seconds,
                        )) if observation_timestamp > camera_bottom_arrival else '00:00:00'
                if get_ctd:
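                    # lazily fetch each section's expedition fieldbook from the DARC
                    # review server the first time a localization from it is seen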
198 if not expedition_fieldbook.get(section.section_id):
199 fieldbook_res = requests.get(
200 url=f'{self.darc_review_url}/dropcam-fieldbook/{section.section_id}',
201 headers={'API-Key': os.environ.get('DARC_REVIEW_API_KEY')},
202 )
203 if fieldbook_res.status_code == 200:
204 expedition_fieldbook[section.section_id] = fieldbook_res.json()['deployments']
205 else:
206 print(f'{TERM_RED}Error fetching expedition fieldbook.{TERM_NORMAL}')
207 print(fieldbook_res.text)
208 deployment_name = section.deployment_name.replace('-', '_') # for DOEX0087_NIU-dscm-02
209 deployment_ctd = next((x for x in expedition_fieldbook[section.section_id] if x['deployment_name'] == deployment_name), None)
210 if deployment_ctd:
211 localization_dict['lat'] = deployment_ctd['lat']
212 localization_dict['long'] = deployment_ctd['long']
213 localization_dict['bait_type'] = deployment_ctd['bait_type']
214 localization_dict['depth_m'] = localization_dict['depth_m'] or deployment_ctd['depth_m']
215 if get_substrates:
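                    # substrate attributes live on the media object in Tator; fetch
                    # each media record once and cache it by media id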
                    media_id = localization['media']
                    if not media_substrates.get(media_id):
                        media_substrates[media_id] = self.api.get_media(media_id).attributes
                    localization_dict['primary_substrate'] = media_substrates[media_id].get('Primary Substrate')
                    localization_dict['secondary_substrate'] = media_substrates[media_id].get('Secondary Substrate')
                    localization_dict['bedforms'] = media_substrates[media_id].get('Bedforms')
                    localization_dict['relief'] = media_substrates[media_id].get('Relief')
                    localization_dict['substrate_notes'] = media_substrates[media_id].get('Substrate Notes')
                    localization_dict['deployment_notes'] = media_substrates[media_id].get('Deployment Notes')
                if scientific_name in self.phylogeny:
                    for key in self.phylogeny[scientific_name]:
                        # split to account for the WoRMS 'Phylum (Division)' case
                        localization_dict[key.split(' ')[0]] = self.phylogeny[scientific_name][key]
                formatted_localizations.append(localization_dict)

        if not formatted_localizations:
            print('no records to process!')
            return

        localization_df = pd.DataFrame(formatted_localizations, columns=[
            'elemental_id',
            'section_id',
            'timestamp',
            'camera_seafloor_arrival',
            'animal_arrival',
            'all_localizations',
            'type',
            'video_sequence_name',
            'scientific_name',
            'count',
            'attracted',
            'categorical_abundance',
            'identification_remarks',
            'identified_by',
            'notes',
            'qualifier',
            'morphospecies',
            'reason',
            'tentative_id',
            'good_image',
            'annotator',
            'frame',
            'frame_url',
            'media_id',
            'problems',
            'lat',
            'long',
            'depth_m',
            'do_temp_c',
            'do_concentration_salin_comp_mol_L',
            'bait_type',
            'primary_substrate',
            'secondary_substrate',
            'bedforms',
            'relief',
            'substrate_notes',
            'deployment_notes',
            'phylum',
            'class',
            'subclass',
            'order',
            'suborder',
            'family',
            'subfamily',
            'genus',
            'subgenus',
            'species',
            'subspecies',
            'aphia_id',
        ])

        def collect_localizations(items):
            return list(items)

        def first_if_all_same(series):
            return series.iloc[0] if len(series.unique()) == 1 else f'Non-uniform values across dots: {series.unique()}'.replace("'", '"')
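
        # collapse the individual dots/boxes that annotate the same taxon in the same
        # frame into one review record: counts are summed, fields that should agree
        # across dots are checked by first_if_all_same, everything else takes 'first'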
        localization_df = localization_df.groupby([
            'media_id',
            'frame',
            'scientific_name',
            'tentative_id',
            'morphospecies',
            'type',
        ], dropna=False).agg({
            'elemental_id': 'first',
            'section_id': 'first',
            'timestamp': 'first',
            'camera_seafloor_arrival': 'first',
            'animal_arrival': 'first',
            'all_localizations': collect_localizations,
            'count': 'sum',
            'attracted': first_if_all_same,
            'categorical_abundance': first_if_all_same,
            'identification_remarks': first_if_all_same,
            'identified_by': first_if_all_same,
            'notes': first_if_all_same,
            'qualifier': first_if_all_same,
            'reason': first_if_all_same,
            'good_image': 'first',
            'video_sequence_name': 'first',
            'annotator': 'first',
            'frame_url': 'first',
            'problems': 'first',
            'lat': 'first',
            'long': 'first',
            'depth_m': 'first',
            'do_temp_c': 'first',
            'do_concentration_salin_comp_mol_L': 'first',
            'bait_type': 'first',
            'primary_substrate': 'first',
            'secondary_substrate': 'first',
            'bedforms': 'first',
            'relief': 'first',
            'substrate_notes': 'first',
            'deployment_notes': 'first',
            'phylum': 'first',
            'class': 'first',
            'subclass': 'first',
            'order': 'first',
            'suborder': 'first',
            'family': 'first',
            'subfamily': 'first',
            'genus': 'first',
            'subgenus': 'first',
            'species': 'first',
            'subspecies': 'first',
            'aphia_id': 'first',
        }).reset_index()

        localization_df = localization_df.sort_values(by=[
            'phylum',
            'class',
            'subclass',
            'order',
            'suborder',
            'family',
            'subfamily',
            'genus',
            'species',
            'scientific_name',
            'tentative_id',
            'media_id',
            'frame',
        ])

        def is_populated(val):
            if isinstance(val, (list, pd.Series)):
                return pd.notnull(val).all()
            return pd.notnull(val)
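
        # build the final record list, dropping any keys whose values are missing
        # so the review page only renders populated fields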
        for _, row in localization_df.iterrows():
            record = {
                'observation_uuid': row['elemental_id'],
                'timestamp': row['timestamp'],
                'camera_seafloor_arrival': row['camera_seafloor_arrival'],
                'animal_arrival': row['animal_arrival'],
                'all_localizations': row['all_localizations'],
                'media_id': row['media_id'],
                'frame': row['frame'],
                'frame_url': row['frame_url'],
                'annotator': row['annotator'],
                'type': row['type'],
                'scientific_name': row['scientific_name'] if row['scientific_name'] != '' else '--',
                'section_id': row['section_id'],
                'video_sequence_name': row['video_sequence_name'],
                'count': row['count'],
                'attracted': row['attracted'],
                'categorical_abundance': row['categorical_abundance'],
                'identification_remarks': row['identification_remarks'],
                'identified_by': row['identified_by'],
                'notes': row['notes'],
                'qualifier': row['qualifier'],
                'reason': row['reason'],
                'tentative_id': row['tentative_id'],
                'morphospecies': row['morphospecies'],
                'good_image': row['good_image'],
                'problems': row['problems'],
                'lat': row['lat'],
                'long': row['long'],
                'depth_m': row['depth_m'],
                'do_temp_c': row['do_temp_c'],
                'do_concentration_salin_comp_mol_L': row['do_concentration_salin_comp_mol_L'],
                'bait_type': row['bait_type'],
                'primary_substrate': row['primary_substrate'],
                'secondary_substrate': row['secondary_substrate'],
                'bedforms': row['bedforms'],
                'relief': row['relief'],
                'substrate_notes': row['substrate_notes'],
                'deployment_notes': row['deployment_notes'],
                'phylum': row['phylum'],
                'class': row['class'],
                'subclass': row['subclass'],
                'order': row['order'],
                'suborder': row['suborder'],
                'family': row['family'],
                'subfamily': row['subfamily'],
                'genus': row['genus'],
                'subgenus': row['subgenus'],
                'species': row['species'],
                'subspecies': row['subspecies'],
                'aphia_id': row['aphia_id'],
            }
            self.final_records.append({key: val for key, val in record.items() if is_populated(val)})
        self.save_phylogeny()
        print('processed!')
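

# Example usage (a minimal sketch; the host, token, and IDs below are hypothetical,
# and fetch_localizations additionally assumes a Flask request context with
# 'tator_token' in the session):
#
#   api = tator.get_api(host='https://tator.example.org', token='my-token')
#   processor = TatorLocalizationProcessor(
#       project_id=1,
#       section_ids=['123', '456'],
#       api=api,
#       tator_url='https://tator.example.org',
#   )
#   processor.load_phylogeny()       # warm the phylogeny cache from disk
#   processor.fetch_localizations()  # pull raw localizations for each section
#   processor.process_records()      # group, enrich, and sort into final_records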