Coverage for application / vars / vars_annotation_processor.py: 98%

107 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-23 05:22 +0000

1import datetime 

2 

3import pandas as pd 

4import requests 

5import sys 

6 

7from application.util.constants import TERM_YELLOW, TERM_NORMAL 

8from application.util.functions import format_annotator, parse_datetime 

9from application.util.phylogeny_cache import PhylogenyCache 

10 

11 

class VarsAnnotationProcessor:
    """
    Fetches annotation information from the VARS db on HURLSTOR given a list of sequences. Cleans, formats, and sorts
    the annotation data for display on the image review pages.

    Calling ``process_sequences`` appends the formatted, sorted annotations of every sequence to ``final_records``.
    """

    # taxonomic ranks, in the order they are used as sort keys; also the phylogeny columns of the sorting DataFrame
    _PHYLOGENY_RANKS = [
        'phylum',
        'subphylum',
        'superclass',
        'class',
        'subclass',
        'superorder',
        'order',
        'suborder',
        'infraorder',
        'superfamily',
        'family',
        'subfamily',
        'genus',
        'species',
    ]

    # non-phylogeny keys of a formatted record, in DataFrame column order
    _RECORD_FIELDS = [
        'observation_uuid',
        'concept',
        'associations',
        'identity_reference',
        'image_url',
        'video_url',
        'recorded_timestamp',
        'video_sequence_name',
        'annotator',
        'activity',
        'depth',
    ]

    def __init__(self, sequence_names: list, vars_charybdis_url: str, vars_kb_url: str):
        """
        :param sequence_names: names of the video sequences to fetch annotations for
        :param vars_charybdis_url: base url of the service queried for media and annotations
        :param vars_kb_url: url handed to the phylogeny cache when a concept has to be looked up
        """
        self.sequence_names = sequence_names
        self.vars_charybdis_url = vars_charybdis_url
        self.vars_kb_url = vars_kb_url
        self.phylogeny = PhylogenyCache()
        self.videos = []
        self.working_records = []  # all the annotations that have images
        self.final_records = []  # the final list of annotations
        self.highest_id_ref = 0
        # vessel name = first sequence name without its last whitespace-separated token; slicing instead of
        # pop() keeps an empty list or a blank name from raising IndexError
        name_tokens = sequence_names[0].split() if sequence_names else []
        self.vessel_name = ' '.join(name_tokens[:-1])

    def process_sequences(self) -> None:
        """
        Fetches, formats, and sorts the annotations of every sequence, then saves the phylogeny cache.
        """
        for name in self.sequence_names:
            print(f'Fetching annotations for sequence {name} from VARS...', end='', flush=True)
            self.working_records = self.fetch_media_and_annotations(name, images_only=True)
            print('fetched!')
            print('Processing annotations...', end='', flush=True)
            self.sort_records(self.process_working_records())
            print('done!')
        self.phylogeny.save()

    @staticmethod
    def _has_image_and_capitalized_concept(annotation: dict) -> bool:
        """
        Returns True if the annotation has at least one image reference and its concept name starts with an
        uppercase letter. An empty concept name counts as not capitalized.
        """
        # [:1] instead of [0] so an empty concept string yields False rather than IndexError
        return bool(annotation['image_references']) and annotation['concept'][:1].isupper()

    def fetch_media_and_annotations(self, sequence_name: str, images_only: bool) -> list:
        """
        Fetches all annotations and all video uris/start times for one sequence from VARS.

        Every video of the sequence (media entries that are not image collections) is added to ``self.videos``,
        which is kept sorted by start timestamp.

        :param sequence_name: name of the sequence (dive) to query
        :param images_only: if True, only return annotations that have images and a capitalized concept name
        :return: list of annotation records as returned by the server
        """
        response = requests.get(url=f'{self.vars_charybdis_url}/query/dive/{sequence_name.replace(" ", "%20")}').json()

        # get list of video links and start timestamps
        for media in response['media']:
            if 'urn:imagecollection:org' in media['uri']:
                continue
            self.videos.append({
                'start_timestamp': parse_datetime(media['start_timestamp']),
                'uri': media['uri'].replace('http://hurlstor.soest.hawaii.edu/videoarchive', 'https://hurlvideo.soest.hawaii.edu'),
                'sequence_name': media['video_sequence_name'],
                'video_reference_uuid': media['video_reference_uuid'],
                'duration_millis': media['duration_millis'],
            })

        self.videos.sort(key=lambda x: x['start_timestamp'])

        if not images_only:
            return response['annotations']  # return all annotations

        # only return annotations that have images
        return [
            annotation for annotation in response['annotations']
            if self._has_image_and_capitalized_concept(annotation)
        ]

    @staticmethod
    def get_image_url(annotation: dict) -> str:
        """
        Gets the correct image url from the given annotation record. Preferentially selects a png image if available
        (higher quality). Returns an empty string if the annotation has no image references.
        """
        image_references = annotation['image_references']
        if not image_references:
            return ''
        # the first reference is the fallback; only the later references are searched for a png
        image_url = next(
            (reference['url'] for reference in image_references[1:] if '.png' in reference['url']),
            image_references[0]['url'],
        )
        return image_url.replace('http://hurlstor.soest.hawaii.edu/imagearchive', 'https://hurlimage.soest.hawaii.edu')

    def get_video(self, annotation: dict) -> dict:
        """
        Gets the video url and sequence name for the given annotation record. Selects the video from the list of
        sequence videos that contains the annotation and adds offset to the video url.

        :return: dict with keys 'uri' and 'sequence_name', or an empty dict if the annotation has no recorded
                 timestamp, no videos are loaded, or the timestamp falls outside the selected video
        """
        # nothing to match against: bail out before parsing the timestamp
        if 'recorded_timestamp' not in annotation or not self.videos:
            return {}
        # we use timestamps instead of annotation['video_reference_uuid'] to match videos because some video_reference_uuids are incorrect
        timestamp = parse_datetime(annotation['recorded_timestamp'])
        matching_video = self.videos[0]  # default to first video (videos are sorted by timestamp)
        for video in self.videos:
            if video['start_timestamp'] > timestamp:
                break
            matching_video = video
        time_diff = timestamp - matching_video['start_timestamp']
        offset_seconds = time_diff.total_seconds()
        # if the annotation timestamp is before the start of the video or after the end of the video, return empty dict (no video)
        if offset_seconds < 0 or offset_seconds * 1000 > matching_video['duration_millis']:
            first_video_start_time = self.videos[0]['start_timestamp']
            last_video_end_time = self.videos[-1]['start_timestamp'] + datetime.timedelta(milliseconds=self.videos[-1]['duration_millis'])
            print(f'{TERM_YELLOW}WARNING: Unable to find video for annotation {annotation["observation_uuid"]} (concept name "{annotation["concept"]}"){TERM_NORMAL}')
            print(f'Annotation timestamp is {annotation["recorded_timestamp"]}, but videos start at '
                  f'{first_video_start_time.isoformat()}Z and end at {last_video_end_time.isoformat()}Z')
            return {}
        return {
            'uri': f'{matching_video["uri"]}#t={int(offset_seconds)}',
            'sequence_name': matching_video['sequence_name'],
        }

    def process_working_records(self) -> list:
        """
        Cleans and formats the working records into a list of dicts.

        Concepts missing from the phylogeny cache are fetched along the way, and ``self.highest_id_ref`` is raised
        to the largest identity reference seen.

        :return: one dict per working record, with phylogeny keys added for concepts found in the cache
        """
        formatted_records = []
        no_match_records = set()

        for record in self.working_records:
            concept_name = record['concept']

            if concept_name not in self.phylogeny.data and concept_name != 'none' and concept_name not in no_match_records:
                self.phylogeny.fetch_vars(concept_name, self.vars_kb_url, no_match_records)

            video = self.get_video(record)

            # a record without an 'associations' key is treated as having none instead of raising KeyError
            associations = record.get('associations', [])
            identity_reference = None
            for association in associations or []:
                if association['link_name'] == 'identity-reference':
                    identity_reference = association['link_value']
                    self.highest_id_ref = max(self.highest_id_ref, int(identity_reference))
                    break  # only the first identity reference is used

            depth = None
            ancillary_data = record.get('ancillary_data') or {}
            # a null depth is left as None rather than passed to int()
            if ancillary_data.get('depth_meters') is not None:
                depth = int(ancillary_data['depth_meters'])

            annotation_dict = {
                'observation_uuid': record['observation_uuid'],
                'concept': concept_name,
                'associations': associations,
                'identity_reference': identity_reference,
                'image_url': self.get_image_url(record),
                'video_url': video.get('uri'),
                'recorded_timestamp': record['recorded_timestamp'],
                'video_sequence_name': video.get('sequence_name'),
                'annotator': format_annotator(record['observer']),
                'activity': record.get('activity'),
                'depth': depth,
            }

            if concept_name in self.phylogeny.data:
                for rank, value in self.phylogeny.data[concept_name].items():
                    # split to account for worms 'Phylum (Division)' case
                    annotation_dict[rank.split(' ')[0]] = value
            formatted_records.append(annotation_dict)
        return formatted_records

    def sort_records(self, formatted_records: list) -> None:
        """
        Uses pandas to sort the formatted records by phylogeny, then by concept, identity reference, and recorded
        timestamp. Appends the sorted records to ``self.final_records`` with the timestamp reformatted for display.

        :param formatted_records: list of dicts as produced by ``process_working_records``
        """
        annotation_df = pd.DataFrame(formatted_records, columns=self._RECORD_FIELDS + self._PHYLOGENY_RANKS)

        annotation_df = annotation_df.sort_values(
            by=self._PHYLOGENY_RANKS + ['concept', 'identity_reference', 'recorded_timestamp'],
        )

        # missing values come out of pandas as NaN; hand them on as None
        annotation_df = annotation_df.replace({float('nan'): None})

        for _, row in annotation_df.iterrows():
            self.final_records.append({
                'observation_uuid': row['observation_uuid'],
                'concept': row['concept'],
                'associations': row['associations'],
                'activity': row['activity'],
                'annotator': row['annotator'],
                'depth': row['depth'],
                'phylum': row['phylum'],
                'class': row['class'],
                'order': row['order'],
                'family': row['family'],
                'genus': row['genus'],
                'species': row['species'],
                'identity_reference': row['identity_reference'],
                'image_url': row['image_url'],
                'video_url': row['video_url'],
                'recorded_timestamp': parse_datetime(row['recorded_timestamp']).strftime('%d %b %y %H:%M:%S UTC'),
                'video_sequence_name': row['video_sequence_name'],
            })