Coverage for application/image_review/vars/vars_annotation_processor.py: 83%

126 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-23 02:22 +0000

1import json 

2import os 

3import pandas as pd 

4import requests 

5import sys 

6 

7from application.util.functions import * 

8from application.util.constants import TERM_RED, TERM_YELLOW, TERM_NORMAL 

9 

10 

class VarsAnnotationProcessor:
    """
    Fetches annotation information from the VARS db on HURLSTOR given a list of sequences. Cleans, formats, and sorts
    the annotation data for display on the image review pages.
    """

    # Taxonomic rank columns, in the order used as sort keys in sort_records.
    _TAXON_RANKS = (
        'phylum',
        'subphylum',
        'superclass',
        'class',
        'subclass',
        'superorder',
        'order',
        'suborder',
        'infraorder',
        'superfamily',
        'family',
        'subfamily',
        'genus',
        'species',
    )

    # Non-taxonomic columns of the intermediate DataFrame built in sort_records.
    _BASE_COLUMNS = (
        'observation_uuid',
        'concept',
        'associations',
        'identity_reference',
        'image_url',
        'video_url',
        'recorded_timestamp',
        'video_sequence_name',
        'annotator',
        'activity',
        'depth',
    )

    # Keys (and key order) of each dict appended to final_records.
    _FINAL_RECORD_FIELDS = (
        'observation_uuid',
        'concept',
        'associations',
        'activity',
        'annotator',
        'depth',
        'phylum',
        'class',
        'order',
        'family',
        'genus',
        'species',
        'identity_reference',
        'image_url',
        'video_url',
        'recorded_timestamp',
        'video_sequence_name',
    )

    # Number of first-child hops taken from the root of the knowledge-base response before ranks are read.
    _PHYLUM_DEPTH = 5

    def __init__(self, sequence_names: list, vars_dive_url: str, vars_phylogeny_url: str):
        """
        Stores the sequence names and service urls and initialises empty result containers.

        The vessel name is the first sequence name with its last whitespace-separated token removed.
        Raises IndexError if sequence_names is empty or its first entry contains no tokens.
        """
        self.sequence_names = sequence_names
        self.vars_dive_url = vars_dive_url
        self.vars_phylogeny_url = vars_phylogeny_url
        self.phylogeny = {}
        self.working_records = []  # all the annotations that have images
        self.final_records = []  # the final list of annotations
        self.highest_id_ref = 0
        name_tokens = sequence_names[0].split()
        name_tokens.pop()  # drop the trailing token; the remainder is the vessel name
        self.vessel_name = ' '.join(name_tokens)

    def process_sequences(self):
        """
        Runs the whole pipeline: loads the phylogeny cache, fetches every sequence, formats and sorts the
        annotations into final_records, then writes the phylogeny cache back to disk.
        """
        self.load_phylogeny()
        videos = []
        for name in self.sequence_names:
            # flush so the progress text shows up before the (blocking) request completes
            print(f'Fetching annotations for sequence {name} from VARS...', end='', flush=True)
            self.fetch_media(name, videos)
            print('fetched!')
        print('Processing annotations...', end='', flush=True)
        self.sort_records(self.process_working_records(videos))
        print('done!')
        self.save_phylogeny()

    def load_phylogeny(self):
        """
        Loads the phylogeny cache from cache/phylogeny.json, falling back to a default when the file is missing.
        """
        try:
            with open(os.path.join('cache', 'phylogeny.json'), 'r') as f:
                self.phylogeny = json.load(f)
        except FileNotFoundError:
            self.phylogeny = {'Animalia': {}}

    def save_phylogeny(self):
        """
        Writes the phylogeny cache to cache/phylogeny.json, creating the cache directory if needed.
        """
        os.makedirs('cache', exist_ok=True)
        with open(os.path.join('cache', 'phylogeny.json'), 'w') as f:
            json.dump(self.phylogeny, f, indent=2)

    @staticmethod
    def _is_reviewable_annotation(annotation: dict) -> bool:
        """
        Returns True when the annotation has at least one image reference and its concept name starts with an
        uppercase letter. An empty concept name is treated as not reviewable.
        """
        concept_name = annotation['concept']
        # slice instead of index so an empty concept name yields False rather than IndexError
        return bool(annotation['image_references']) and concept_name[:1].isupper()

    def fetch_media(self, sequence_name: str, videos: list):
        """
        Fetches all annotations that have images and all video uris/start times from VARS.

        Video entries are appended to the caller-supplied videos list; matching annotations are appended to
        working_records.
        """
        response = requests.get(url=f'{self.vars_dive_url}/{sequence_name.replace(" ", "%20")}').json()

        # get list of video links and start timestamps (image collections are not videos)
        videos.extend(
            {
                'start_timestamp': parse_datetime(media['start_timestamp']),
                'uri': media['uri'].replace('http://hurlstor.soest.hawaii.edu/videoarchive', 'https://hurlvideo.soest.hawaii.edu'),
                'sequence_name': media['video_sequence_name'],
                'video_reference_uuid': media['video_reference_uuid'],
            }
            for media in response['media']
            if 'urn:imagecollection:org' not in media['uri']
        )
        # get all annotations that have images
        self.working_records.extend(
            annotation for annotation in response['annotations'] if self._is_reviewable_annotation(annotation)
        )

    @classmethod
    def _descend_to_phylum(cls, tree: dict):
        """
        Follows the first child _PHYLUM_DEPTH times from the root of a knowledge-base response.

        Returns the node reached (per the original author's note, the phylum level), or None when the tree is
        too shallow: a missing 'children' key or an empty 'children' list.
        """
        node = tree
        try:
            for _ in range(cls._PHYLUM_DEPTH):
                node = node['children'][0]
        except (KeyError, IndexError):
            return None
        return node

    @staticmethod
    def _collect_ranks(node: dict) -> dict:
        """
        Walks the first-child chain starting at node and returns a {rank: name} dict for every node on the
        chain that has a 'rank' key. The walk ends at the first node with no (or empty) 'children'.
        """
        ranks = {}
        while node.get('children'):
            if 'rank' in node:  # sometimes it's not
                ranks[node['rank']] = node['name']
            node = node['children'][0]
        if 'rank' in node:
            ranks[node['rank']] = node['name']
        return ranks

    def fetch_vars_phylogeny(self, concept_name: str, no_match_records: set):
        """
        Fetches phylogeny for given concept from the VARS knowledge base.

        On success the concept's {rank: name} mapping is stored in self.phylogeny. When the response tree is too
        shallow, a warning is printed once per concept (tracked through no_match_records) and nothing is stored.
        A non-200 response prints an error and stores nothing.
        """
        vars_tax_res = requests.get(url=f'{self.vars_phylogeny_url}/{concept_name.replace("/", "%2F")}')
        if vars_tax_res.status_code != 200:
            print(f'\n{TERM_RED}Unable to find record for {concept_name}{TERM_NORMAL}')
            return

        phylum_tree = self._descend_to_phylum(vars_tax_res.json())
        if phylum_tree is None:
            if concept_name not in no_match_records:
                no_match_records.add(concept_name)
                print(f'{TERM_YELLOW}WARNING: Could not find phylogeny for concept "{concept_name}" in VARS knowledge base{TERM_NORMAL}')
            return

        self.phylogeny[concept_name] = self._collect_ranks(phylum_tree)

    def get_image_url(self, annotation: dict) -> str:
        """
        Gets the correct image url from the given annotation record. Preferentially selects a png image if available
        (higher quality).

        The png search covers the second reference onward; the first reference is the fallback. Returns an empty
        string when the annotation has no image references.
        """
        references = annotation['image_references']
        if not references:
            return ''
        image_url = next(
            (reference['url'] for reference in references[1:] if '.png' in reference['url']),
            references[0]['url'],
        )
        return image_url.replace('http://hurlstor.soest.hawaii.edu/imagearchive', 'https://hurlimage.soest.hawaii.edu')

    def get_video(self, annotation: dict, videos: list) -> dict:
        """
        Gets the video url and sequence name for the given annotation record. Selects the video from the list of
        sequence videos that contains the annotation and adds offset to the video url.

        Returns an empty dict when the annotation has no recorded timestamp or when there are no videos.
        """
        if 'recorded_timestamp' not in annotation or not videos:
            return {}
        timestamp = parse_datetime(annotation['recorded_timestamp'])
        matching_video = videos[0]  # default to first video
        # NOTE(review): the scan stops at the first video starting after the annotation, so it presumably
        # expects videos ordered by start_timestamp - confirm against the VARS response ordering.
        for video in videos:
            if video['start_timestamp'] > timestamp:
                break
            matching_video = video
        offset_seconds = int((timestamp - matching_video['start_timestamp']).total_seconds())
        return {
            'uri': f'{matching_video["uri"]}#t={offset_seconds}',
            'sequence_name': matching_video['sequence_name'],
        }

    def process_working_records(self, videos: list):
        """
        Cleans and formats the working records into a list of dicts.

        Also fetches phylogeny for concepts not yet cached and tracks the highest identity reference seen in
        highest_id_ref.
        """
        formatted_records = []
        no_match_records = set()

        for record in self.working_records:
            concept_name = record['concept']

            if concept_name not in self.phylogeny and concept_name != 'none':
                self.fetch_vars_phylogeny(concept_name, no_match_records)

            video = self.get_video(record, videos)

            # only the first identity-reference association is used
            associations = record.get('associations')
            identity_reference = None
            for association in associations or []:
                if association['link_name'] == 'identity-reference':
                    identity_reference = association['link_value']
                    self.highest_id_ref = max(self.highest_id_ref, int(identity_reference))
                    break

            depth = None
            ancillary_data = record.get('ancillary_data')
            if ancillary_data and 'depth_meters' in ancillary_data:
                depth = int(ancillary_data['depth_meters'])

            annotation_dict = {
                'observation_uuid': record['observation_uuid'],
                'concept': concept_name,
                'associations': associations,
                'identity_reference': identity_reference,
                'image_url': self.get_image_url(record),
                'video_url': video.get('uri'),
                'recorded_timestamp': record['recorded_timestamp'],
                'video_sequence_name': video.get('sequence_name'),
                'annotator': format_annotator(record['observer']),
                'activity': record.get('activity'),
                'depth': depth,
            }

            for rank, taxon in self.phylogeny.get(concept_name, {}).items():
                # split to account for worms 'Phylum (Division)' case
                annotation_dict[rank.split(' ')[0]] = taxon
            formatted_records.append(annotation_dict)
        return formatted_records

    def sort_records(self, formatted_records: list):
        """
        Uses pandas to sort the formatted images by phylogeny and other attributes. Appends the sorted records to
        final_records, with the recorded timestamp reformatted for display.
        """
        annotation_df = pd.DataFrame(
            formatted_records,
            columns=list(self._BASE_COLUMNS + self._TAXON_RANKS),
        )
        annotation_df = annotation_df.sort_values(
            by=list(self._TAXON_RANKS) + ['concept', 'identity_reference', 'recorded_timestamp'],
        )
        # missing values come out of pandas as NaN; downstream consumers get None instead
        annotation_df = annotation_df.replace({float('nan'): None})

        for _, row in annotation_df.iterrows():
            final_record = {field: row[field] for field in self._FINAL_RECORD_FIELDS}
            final_record['recorded_timestamp'] = parse_datetime(row['recorded_timestamp']).strftime('%d %b %y %H:%M:%S UTC')
            self.final_records.append(final_record)