Coverage for application/image_review/tator/tator_localization_processor.py: 12%
160 statements
import datetime
import json
import os
import pandas as pd
import requests
import sys
import tator

from flask import session

from application.util.constants import KNOWN_ANNOTATORS, TERM_RED, TERM_NORMAL
from application.util.tator_localization_type import TatorLocalizationType
from application.util.functions import flatten_taxa_tree

WORMS_REST_URL = 'https://www.marinespecies.org/rest'
class TatorLocalizationProcessor:
    """
    Fetches all localization information for a given project/section/deployment list from Tator. Processes
    and sorts the data for display on the image review pages.
    """

    def __init__(
            self,
            project_id: int,
            section_id: int,
            api: tator.api,
            deployment_list: list,
            tator_url: str,
            darc_review_url: str = None,
    ):
        self.project_id = project_id
        self.section_id = section_id
        self.api = api
        self.deployments = deployment_list
        self.tator_url = tator_url
        self.darc_review_url = darc_review_url
        self.localizations = []  # list of raw localizations from tator
        self.final_records = []  # final list formatted for review page
        self.deployment_media_dict = {}  # dict of all relevant media ids and their dep names
        self.bottom_times = {deployment: '' for deployment in deployment_list}
        self.phylogeny = {}
        self.section_name = self.api.get_section(section_id).name
    def load_phylogeny(self):
        try:
            with open(os.path.join('cache', 'phylogeny.json'), 'r') as f:
                self.phylogeny = json.load(f)
        except FileNotFoundError:
            self.phylogeny = {}
    def save_phylogeny(self):
        # ensure the cache directory exists before writing
        os.makedirs('cache', exist_ok=True)
        with open(os.path.join('cache', 'phylogeny.json'), 'w') as f:
            json.dump(self.phylogeny, f, indent=2)
    def fetch_worms_phylogeny(self, scientific_name: str) -> bool:
        """
        Fetches the phylogeny of a given scientific name from WoRMS. Returns True if successful, False otherwise.
        """
        print(f'Fetching phylogeny for "{scientific_name}"')
        worms_id_res = requests.get(url=f'{WORMS_REST_URL}/AphiaIDByName/{scientific_name}?marine_only=true')
        if worms_id_res.status_code == 200 and worms_id_res.json() != -999:  # -999 means more than one matching record
            aphia_id = worms_id_res.json()
            worms_tree_res = requests.get(url=f'{WORMS_REST_URL}/AphiaClassificationByAphiaID/{aphia_id}')
            if worms_tree_res.status_code == 200:
                self.phylogeny[scientific_name] = flatten_taxa_tree(worms_tree_res.json(), {})
                self.phylogeny[scientific_name]['aphia_id'] = aphia_id
        else:
            worms_name_res = requests.get(url=f'{WORMS_REST_URL}/AphiaRecordsByName/{scientific_name}?like=false&marine_only=true&offset=1')
            if worms_name_res.status_code == 200 and len(worms_name_res.json()) > 0:
                # just take the first accepted record
                for record in worms_name_res.json():
                    if record['status'] == 'accepted':
                        worms_tree_res_2 = requests.get(url=f'{WORMS_REST_URL}/AphiaClassificationByAphiaID/{record["AphiaID"]}')
                        if worms_tree_res_2.status_code == 200:
                            self.phylogeny[scientific_name] = flatten_taxa_tree(worms_tree_res_2.json(), {})
                            self.phylogeny[scientific_name]['aphia_id'] = record['AphiaID']
                        break
            else:
                print(f'{TERM_RED}No accepted record found for concept name "{scientific_name}"{TERM_NORMAL}')
                return False
        return True
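    # Cached phylogeny entry shape (illustrative sketch, not from the original source): after a
    # successful fetch, self.phylogeny[scientific_name] holds the flattened WoRMS ranks plus the
    # AphiaID, roughly:
    #     {'phylum': '...', 'class': '...', ..., 'genus': '...', 'species': '...', 'aphia_id': 12345}
    # The exact rank keys come from flatten_taxa_tree(); all values shown here are hypothetical.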
    def fetch_localizations(self):
        print('Fetching localizations...', end='')
        sys.stdout.flush()
        media_ids = []
        for deployment in self.deployments:
            for media_id in session[f'{self.project_id}_{self.section_id}'][deployment]:
                self.deployment_media_dict[media_id] = deployment
            media_ids += session[f'{self.project_id}_{self.section_id}'][deployment]
        # REST is much faster than the Python API for large queries
        # adding too many media ids results in a query that is too long, so we have to break it up
        for i in range(0, len(media_ids), 300):
            chunk = media_ids[i:i + 300]
            res = requests.get(
                url=f'{self.tator_url}/rest/Localizations/{self.project_id}?media_id={",".join(map(str, chunk))}',
                headers={
                    'Content-Type': 'application/json',
                    'Authorization': f'Token {session["tator_token"]}',
                })
            self.localizations += res.json()
        print(f'fetched {len(self.localizations)} localizations!')
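    # Raw localization shape (illustrative sketch, not from the original source): each element of
    # self.localizations is a dict from Tator's REST /Localizations endpoint; the fields consumed
    # by process_records() below are roughly:
    #     {'id': ..., 'elemental_id': ..., 'version': ..., 'type': ..., 'x': ..., 'y': ...,
    #      'width': ..., 'height': ..., 'frame': ..., 'media': ..., 'created_by': ..., 'attributes': {...}}
    # This list is inferred from the accesses below and is not a complete schema.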
    def process_records(
            self,
            no_match_records: set = None,
            get_timestamp: bool = False,
            get_ctd: bool = False,
            get_substrates: bool = False,
    ):
        print('Processing localizations...', end='')
        sys.stdout.flush()
        formatted_localizations = []
        expedition_fieldbook = []
        deployment_substrates = {}

        if not no_match_records:
            no_match_records = set()

        if get_ctd:
            fieldbook = requests.get(
                url=f'{self.darc_review_url}/dropcam-fieldbook/{self.section_id}',
                headers={'API-Key': os.environ.get('DARC_REVIEW_API_KEY')},
            )
            if fieldbook.status_code == 200:
                expedition_fieldbook = fieldbook.json()['deployments']
            else:
                print(f'{TERM_RED}Error fetching expedition fieldbook.{TERM_NORMAL}')

        if get_substrates:
            for deployment in self.deployments:
                deployment_substrates[deployment] = self.api.get_media_list(
                    project=self.project_id,
                    section=self.section_id,
                    attribute_contains=[f'$name::{deployment}'],
                    stop=1,
                )[0].attributes

        for localization in self.localizations:
            if localization['type'] not in [TatorLocalizationType.BOX.value, TatorLocalizationType.DOT.value]:
                continue  # we only care about boxes and dots
            scientific_name = localization['attributes'].get('Scientific Name')
            cached_phylogeny = self.phylogeny.get(scientific_name)
            if (cached_phylogeny is None or 'aphia_id' not in cached_phylogeny) \
                    and scientific_name not in no_match_records:
                if not self.fetch_worms_phylogeny(scientific_name):
                    no_match_records.add(scientific_name)
            localization_dict = {
                'elemental_id': localization['elemental_id'],
                'all_localizations': {
                    'id': localization['id'],
                    'elemental_id': localization['elemental_id'],
                    'version': localization['version'],
                    'type': localization['type'],
                    'points': [round(localization['x'], 5), round(localization['y'], 5)],
                    'dimensions': [localization['width'], localization['height']]
                    if localization['type'] == TatorLocalizationType.BOX.value
                    else None,
                },
                'type': localization['type'],
                'video_sequence_name': self.deployment_media_dict[localization['media']],
                'scientific_name': scientific_name,
                'count': 0 if localization['type'] == TatorLocalizationType.BOX.value else 1,
                'attracted': localization['attributes'].get('Attracted'),
                'categorical_abundance': localization['attributes'].get('Categorical Abundance'),
                'identification_remarks': localization['attributes'].get('IdentificationRemarks'),
                'identified_by': localization['attributes'].get('Identified By'),
                'notes': localization['attributes'].get('Notes'),
                'qualifier': localization['attributes'].get('Qualifier'),
                'reason': localization['attributes'].get('Reason'),
                'morphospecies': localization['attributes'].get('Morphospecies'),
                'tentative_id': localization['attributes'].get('Tentative ID'),
                'good_image': bool(localization['attributes'].get('Good Image')),
                'annotator': KNOWN_ANNOTATORS.get(
                    localization['created_by'],
                    f'Unknown Annotator (#{localization["created_by"]})',
                ),
                'frame': localization['frame'],
                'frame_url': f'/tator/frame/{localization["media"]}/{localization["frame"]}',
                'media_id': localization['media'],
                'problems': localization.get('problems'),
                'do_temp_c': localization['attributes'].get('DO Temperature (celsius)'),
                'do_concentration_salin_comp_mol_L': localization['attributes'].get('DO Concentration Salin Comp (mol per L)'),
                'depth_m': localization['attributes'].get('Depth'),
            }
            if localization_dict['categorical_abundance'] and localization_dict['categorical_abundance'] != '--':
                # map each categorical abundance bin to a representative count
                match localization_dict['categorical_abundance']:
                    case '1-19':
                        localization_dict['count'] = 10
                    case '20-49':
                        localization_dict['count'] = 35
                    case '50-99':
                        localization_dict['count'] = 75
                    case '100-999':
                        localization_dict['count'] = 500
                    case '1000+':
                        localization_dict['count'] = 1000
                    case _:
                        print(f'{TERM_RED}Unknown categorical abundance: {localization_dict["categorical_abundance"]}{TERM_NORMAL}')
            if get_timestamp and localization['media'] in session['media_timestamps']:
                camera_bottom_arrival = datetime.datetime.strptime(
                    self.bottom_times[self.deployment_media_dict[localization['media']]],
                    '%Y-%m-%d %H:%M:%SZ'
                ).replace(tzinfo=datetime.timezone.utc)
                video_start_timestamp = datetime.datetime.fromisoformat(session['media_timestamps'][localization['media']])
                observation_timestamp = video_start_timestamp + datetime.timedelta(seconds=localization['frame'] / 30)  # assumes 30 fps video
                time_diff = observation_timestamp - camera_bottom_arrival
                localization_dict['timestamp'] = observation_timestamp.strftime('%Y-%m-%d %H:%M:%SZ')
                localization_dict['camera_seafloor_arrival'] = camera_bottom_arrival.strftime('%Y-%m-%d %H:%M:%SZ')
                localization_dict['animal_arrival'] = str(datetime.timedelta(
                    days=time_diff.days,
                    seconds=time_diff.seconds
                )) if observation_timestamp > camera_bottom_arrival else '00:00:00'
            if get_ctd and expedition_fieldbook:
                # fieldbook deployment names use underscores, e.g. for DOEX0087_NIU-dscm-02
                deployment_name = self.deployment_media_dict[localization['media']].replace('-', '_')
                deployment_ctd = next((x for x in expedition_fieldbook if x['deployment_name'] == deployment_name), None)
                if deployment_ctd:
                    localization_dict['lat'] = deployment_ctd['lat']
                    localization_dict['long'] = deployment_ctd['long']
                    localization_dict['bait_type'] = deployment_ctd['bait_type']
            if get_substrates and deployment_substrates:
                substrates = deployment_substrates[self.deployment_media_dict[localization['media']]]
                localization_dict['primary_substrate'] = substrates.get('Primary Substrate')
                localization_dict['secondary_substrate'] = substrates.get('Secondary Substrate')
                localization_dict['bedforms'] = substrates.get('Bedforms')
                localization_dict['relief'] = substrates.get('Relief')
                localization_dict['substrate_notes'] = substrates.get('Substrate Notes')
                localization_dict['deployment_notes'] = substrates.get('Deployment Notes')
            if scientific_name in self.phylogeny:
                for key in self.phylogeny[scientific_name].keys():
                    # split to account for the worms 'Phylum (Division)' case
                    localization_dict[key.split(' ')[0]] = self.phylogeny[scientific_name][key]
            formatted_localizations.append(localization_dict)
        if not formatted_localizations:
            print('no records to process!')
            return

        localization_df = pd.DataFrame(formatted_localizations, columns=[
            'elemental_id',
            'timestamp',
            'camera_seafloor_arrival',
            'animal_arrival',
            'all_localizations',
            'type',
            'video_sequence_name',
            'scientific_name',
            'count',
            'attracted',
            'categorical_abundance',
            'identification_remarks',
            'identified_by',
            'notes',
            'qualifier',
            'morphospecies',
            'reason',
            'tentative_id',
            'good_image',
            'annotator',
            'frame',
            'frame_url',
            'media_id',
            'problems',
            'lat',
            'long',
            'depth_m',
            'do_temp_c',
            'do_concentration_salin_comp_mol_L',
            'bait_type',
            'primary_substrate',
            'secondary_substrate',
            'bedforms',
            'relief',
            'substrate_notes',
            'deployment_notes',
            'phylum',
            'class',
            'subclass',
            'order',
            'suborder',
            'family',
            'subfamily',
            'genus',
            'subgenus',
            'species',
            'subspecies',
            'aphia_id',
        ])
        def collect_localizations(items):
            return list(items)

        def first_if_all_same(series):
            return series.iloc[0] if len(series.unique()) == 1 else f'Non-uniform values across dots: {series.unique()}'.replace("'", '"')
        # merge localizations that share a frame and identification into one record; counts are summed
        localization_df = localization_df.groupby([
            'media_id',
            'frame',
            'scientific_name',
            'tentative_id',
            'morphospecies',
            'type',
        ], dropna=False).agg({
            'elemental_id': 'first',
            'timestamp': 'first',
            'camera_seafloor_arrival': 'first',
            'animal_arrival': 'first',
            'all_localizations': collect_localizations,
            'count': 'sum',
            'attracted': first_if_all_same,
            'categorical_abundance': first_if_all_same,
            'identification_remarks': first_if_all_same,
            'identified_by': first_if_all_same,
            'notes': first_if_all_same,
            'qualifier': first_if_all_same,
            'reason': first_if_all_same,
            'good_image': 'first',
            'video_sequence_name': 'first',
            'annotator': 'first',
            'frame_url': 'first',
            'problems': 'first',
            'lat': 'first',
            'long': 'first',
            'depth_m': 'first',
            'do_temp_c': 'first',
            'do_concentration_salin_comp_mol_L': 'first',
            'bait_type': 'first',
            'primary_substrate': 'first',
            'secondary_substrate': 'first',
            'bedforms': 'first',
            'relief': 'first',
            'substrate_notes': 'first',
            'deployment_notes': 'first',
            'phylum': 'first',
            'class': 'first',
            'subclass': 'first',
            'order': 'first',
            'suborder': 'first',
            'family': 'first',
            'subfamily': 'first',
            'genus': 'first',
            'subgenus': 'first',
            'species': 'first',
            'subspecies': 'first',
            'aphia_id': 'first',
        }).reset_index()

        localization_df = localization_df.sort_values(by=[
            'phylum',
            'class',
            'subclass',
            'order',
            'suborder',
            'family',
            'subfamily',
            'genus',
            'species',
            'scientific_name',
            'tentative_id',
            'media_id',
            'frame',
        ])
        def is_populated(val):
            if isinstance(val, (list, pd.Series)):
                return pd.notnull(val).all()
            return pd.notnull(val)

        for _, row in localization_df.iterrows():
            record = {
                'observation_uuid': row['elemental_id'],
                'timestamp': row['timestamp'],
                'camera_seafloor_arrival': row['camera_seafloor_arrival'],
                'animal_arrival': row['animal_arrival'],
                'all_localizations': row['all_localizations'],
                'media_id': row['media_id'],
                'frame': row['frame'],
                'frame_url': row['frame_url'],
                'annotator': row['annotator'],
                'type': row['type'],
                'scientific_name': row['scientific_name'] if row['scientific_name'] != '' else '--',
                'section_id': self.section_id,
                'video_sequence_name': row['video_sequence_name'],
                'count': row['count'],
                'attracted': row['attracted'],
                'categorical_abundance': row['categorical_abundance'],
                'identification_remarks': row['identification_remarks'],
                'identified_by': row['identified_by'],
                'notes': row['notes'],
                'qualifier': row['qualifier'],
                'reason': row['reason'],
                'tentative_id': row['tentative_id'],
                'morphospecies': row['morphospecies'],
                'good_image': row['good_image'],
                'problems': row['problems'],
                'lat': row['lat'],
                'long': row['long'],
                'depth_m': row['depth_m'],
                'do_temp_c': row['do_temp_c'],
                'do_concentration_salin_comp_mol_L': row['do_concentration_salin_comp_mol_L'],
                'bait_type': row['bait_type'],
                'primary_substrate': row['primary_substrate'],
                'secondary_substrate': row['secondary_substrate'],
                'bedforms': row['bedforms'],
                'relief': row['relief'],
                'substrate_notes': row['substrate_notes'],
                'deployment_notes': row['deployment_notes'],
                'phylum': row['phylum'],
                'class': row['class'],
                'subclass': row['subclass'],
                'order': row['order'],
                'suborder': row['suborder'],
                'family': row['family'],
                'subfamily': row['subfamily'],
                'genus': row['genus'],
                'subgenus': row['subgenus'],
                'species': row['species'],
                'subspecies': row['subspecies'],
                'aphia_id': row['aphia_id'],
            }
            # drop keys with missing values so the review page only sees populated fields
            self.final_records.append({key: val for key, val in record.items() if is_populated(val)})
        self.save_phylogeny()
        print('processed!')
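
# Example driver (illustrative sketch, not part of the original source). The call order follows the
# methods defined above; the host, token, ids, and deployment names are hypothetical placeholders,
# and a Flask request context must be active so that `session` is populated.
#
#     api = tator.get_api(host='https://tator.example.org', token='<tator-token>')
#     processor = TatorLocalizationProcessor(
#         project_id=1,
#         section_id=2,
#         api=api,
#         deployment_list=['EX2306_dscm-01'],
#         tator_url='https://tator.example.org',
#     )
#     processor.load_phylogeny()
#     processor.fetch_localizations()
#     processor.process_records(get_timestamp=True)
#     records = processor.final_records  # formatted rows for the review page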