Coverage for application / vars / vars_annotation_processor.py: 98%
107 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-23 05:22 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-23 05:22 +0000
1import datetime
3import pandas as pd
4import requests
5import sys
7from application.util.constants import TERM_YELLOW, TERM_NORMAL
8from application.util.functions import format_annotator, parse_datetime
9from application.util.phylogeny_cache import PhylogenyCache
class VarsAnnotationProcessor:
    """
    Fetches annotation information from the VARS db on HURLSTOR given a list of sequences. Cleans, formats, and sorts
    the annotation data for display on the image review pages.
    """

    def __init__(self, sequence_names: list, vars_charybdis_url: str, vars_kb_url: str):
        """
        :param sequence_names: VARS dive sequence names, e.g. 'Deep Discoverer 23060001'
            (vessel name followed by a dive-number token).
        :param vars_charybdis_url: Base URL of the VARS charybdis query service.
        :param vars_kb_url: Base URL of the VARS knowledge base (used for phylogeny lookups).
        """
        self.sequence_names = sequence_names
        self.vars_charybdis_url = vars_charybdis_url
        self.vars_kb_url = vars_kb_url
        self.phylogeny = PhylogenyCache()
        self.videos = []
        self.working_records = []  # all the annotations that have images
        self.final_records = []  # the final list of annotations
        self.highest_id_ref = 0
        # vessel name = first sequence name minus its trailing dive-number token,
        # e.g. 'Deep Discoverer 23060001' -> 'Deep Discoverer'
        if sequence_names:
            self.vessel_name = ' '.join(sequence_names[0].split()[:-1])
        else:
            # guard: the original indexed sequence_names[0] and crashed on an empty list
            self.vessel_name = ''
31 def process_sequences(self):
32 for name in self.sequence_names:
33 print(f'Fetching annotations for sequence {name} from VARS...', end='')
34 sys.stdout.flush()
35 self.working_records = self.fetch_media_and_annotations(name, images_only=True)
36 print('fetched!')
37 print('Processing annotations...', end='')
38 sys.stdout.flush()
39 self.sort_records(self.process_working_records())
40 print('done!')
41 self.phylogeny.save()
43 def fetch_media_and_annotations(self, sequence_name: str, images_only: bool):
44 """
45 Fetches all annotations that have images and all video uris/start times from VARS.
46 """
47 response = requests.get(url=f'{self.vars_charybdis_url}/query/dive/{sequence_name.replace(" ", "%20")}').json()
49 # get list of video links and start timestamps
50 for video in response['media']:
51 if 'urn:imagecollection:org' not in video['uri']:
52 self.videos.append({
53 'start_timestamp': parse_datetime(video['start_timestamp']),
54 'uri': video['uri'].replace('http://hurlstor.soest.hawaii.edu/videoarchive', 'https://hurlvideo.soest.hawaii.edu'),
55 'sequence_name': video['video_sequence_name'],
56 'video_reference_uuid': video['video_reference_uuid'],
57 'duration_millis': video['duration_millis'],
58 })
60 self.videos.sort(key=lambda x: x['start_timestamp'])
62 if not images_only:
63 return response['annotations'] # return all annotations
65 # only return annotations that have images
66 annotations_with_images = []
67 for annotation in response['annotations']:
68 concept_name = annotation['concept']
69 if annotation['image_references'] and concept_name[0].isupper():
70 annotations_with_images.append(annotation)
71 return annotations_with_images
73 @staticmethod
74 def get_image_url(annotation: dict) -> str:
75 """
76 Gets the correct image url from the given annotation record. Preferentially selects a png image if available
77 (higher quality).
78 """
79 if len(annotation['image_references']) == 0:
80 return ''
81 image_url = annotation['image_references'][0]['url']
82 for i in range(1, len(annotation['image_references'])):
83 if '.png' in annotation['image_references'][i]['url']:
84 image_url = annotation['image_references'][i]['url']
85 break
86 return image_url.replace('http://hurlstor.soest.hawaii.edu/imagearchive', 'https://hurlimage.soest.hawaii.edu')
88 def get_video(self, annotation: dict) -> dict:
89 """
90 Gets the video url and sequence name for the given annotation record. Selects the video from the list of
91 sequence videos that contains the annotation and adds offset to the video url.
92 """
93 if 'recorded_timestamp' not in annotation.keys():
94 return {}
95 # we use timestamps instead of annotation['video_reference_uuid'] to match videos because some video_reference_uuids are incorrect
96 timestamp = parse_datetime(annotation['recorded_timestamp'])
97 if len(self.videos) == 0:
98 return {}
99 matching_video = self.videos[0] # default to first video (videos are sorted by timestamp)
100 for video in self.videos:
101 if video['start_timestamp'] > timestamp:
102 break
103 matching_video = video
104 time_diff = timestamp - matching_video['start_timestamp']
105 # if the annotation timestamp is before the start of the video or after the end of the video, return empty dict (no video)
106 if time_diff.total_seconds() < 0 or time_diff.total_seconds() * 1000 > matching_video['duration_millis']:
107 first_video_start_time = self.videos[0]['start_timestamp']
108 last_video_end_time = self.videos[-1]['start_timestamp'] + datetime.timedelta(milliseconds=self.videos[-1]['duration_millis'])
109 print(f'{TERM_YELLOW}WARNING: Unable to find video for annotation {annotation["observation_uuid"]} (concept name "{annotation["concept"]}"){TERM_NORMAL}')
110 print(f'Annotation timestamp is {annotation["recorded_timestamp"]}, but videos start at '
111 f'{first_video_start_time.isoformat()}Z and end at {last_video_end_time.isoformat()}Z')
112 return {}
113 return {
114 'uri': f'{matching_video["uri"]}#t={int(time_diff.total_seconds())}',
115 'sequence_name': matching_video['sequence_name'],
116 }
118 def process_working_records(self):
119 """
120 Cleans and formats the working records into a list of dicts.
121 """
122 formatted_records = []
123 no_match_records = set()
125 for record in self.working_records:
126 concept_name = record['concept']
127 identity_reference = None
128 depth = None
130 if concept_name not in self.phylogeny.data and concept_name != 'none' and concept_name not in no_match_records:
131 self.phylogeny.fetch_vars(concept_name, self.vars_kb_url, no_match_records)
133 video = self.get_video(record)
135 if record.get('associations'):
136 for association in record['associations']:
137 if association['link_name'] == 'identity-reference':
138 identity_reference = association['link_value']
139 if int(identity_reference) > self.highest_id_ref:
140 self.highest_id_ref = int(identity_reference)
141 break
143 if record.get('ancillary_data'):
144 for key in record['ancillary_data'].keys():
145 if key == 'depth_meters':
146 depth = int(record['ancillary_data']['depth_meters'])
148 annotation_dict = {
149 'observation_uuid': record['observation_uuid'],
150 'concept': concept_name,
151 'associations': record['associations'],
152 'identity_reference': identity_reference,
153 'image_url': self.get_image_url(record),
154 'video_url': video.get('uri'),
155 'recorded_timestamp': record['recorded_timestamp'],
156 'video_sequence_name': video.get('sequence_name'),
157 'annotator': format_annotator(record['observer']),
158 'activity': record['activity'] if 'activity' in record.keys() else None,
159 'depth': depth,
160 }
162 if concept_name in self.phylogeny.data:
163 for key in self.phylogeny.data[concept_name].keys():
164 # split to account for worms 'Phylum (Division)' case
165 annotation_dict[key.split(' ')[0]] = self.phylogeny.data[concept_name][key]
166 formatted_records.append(annotation_dict)
167 return formatted_records
169 def sort_records(self, formatted_records: list):
170 """
171 Uses pandas to sort the formatted images by phylogeny and other attributes. Adds the sorted records to the
172 distilled records list.
173 """
174 annotation_df = pd.DataFrame(formatted_records, columns=[
175 'observation_uuid',
176 'concept',
177 'associations',
178 'identity_reference',
179 'image_url',
180 'video_url',
181 'recorded_timestamp',
182 'video_sequence_name',
183 'annotator',
184 'activity',
185 'depth',
186 'phylum',
187 'subphylum',
188 'superclass',
189 'class',
190 'subclass',
191 'superorder',
192 'order',
193 'suborder',
194 'infraorder',
195 'superfamily',
196 'family',
197 'subfamily',
198 'genus',
199 'species',
200 ])
202 annotation_df = annotation_df.sort_values(by=[
203 'phylum',
204 'subphylum',
205 'superclass',
206 'class',
207 'subclass',
208 'superorder',
209 'order',
210 'suborder',
211 'infraorder',
212 'superfamily',
213 'family',
214 'subfamily',
215 'genus',
216 'species',
217 'concept',
218 'identity_reference',
219 'recorded_timestamp',
220 ])
222 annotation_df = annotation_df.replace({float('nan'): None})
224 for index, row in annotation_df.iterrows():
225 self.final_records.append({
226 'observation_uuid': row['observation_uuid'],
227 'concept': row['concept'],
228 'associations': row['associations'],
229 'activity': row['activity'],
230 'annotator': row['annotator'],
231 'depth': row['depth'],
232 'phylum': row['phylum'],
233 'class': row['class'],
234 'order': row['order'],
235 'family': row['family'],
236 'genus': row['genus'],
237 'species': row['species'],
238 'identity_reference': row['identity_reference'],
239 'image_url': row['image_url'],
240 'video_url': row['video_url'],
241 'recorded_timestamp': parse_datetime(row['recorded_timestamp']).strftime('%d %b %y %H:%M:%S UTC'),
242 'video_sequence_name': row['video_sequence_name'],
243 })