Coverage for format_and_output.py: 0%

199 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-12 17:57 +0000

1""" 

2This script is used to retrieve all annotations for a specified list of dives from HURLSTOR and reformat them into 

3Deep Sea Corals Research and Technology Program's submission format. 

4 

5The basic program structure: 

6 

71) For each dive in the specified list, get the dive information from Dives.csv. 

82) For each dive in the specified list, get every annotation for the dive from HURLSTOR. 

93) For each annotation in the dive, load the annotation data and update the taxon info from WoRMS (optional). 

104) For each annotation, merge the annotation data, the dive information, and the WoRMS information. 

115) Perform merging and checks (e.g. remove duplicate records and populate 'associated taxa' fields). 

126) Output a formatted .tsv file. 

13""" 

14 

15import json 

16import csv 

17import os 

18import errno 

19 

20from util.functions import * 

21from annotation.annotation_row import AnnotationRow 

22from concept.concept_handler import * 

23from util.terminal_output import Color, Messages 

24 

25OUTPUT_FILE_NAME = '' 

26OUTPUT_FILE_PATH = '' 

27SEQUENCE_NAMES_PATH = '' 

28SAVE_HIGHLIGHT_IMAGES = False 

29REPORTER = 'Bingo, Sarah' 

30REPORTER_EMAIL = 'sarahr6@hawaii.edu' 

31 

32"""##################################################################################################################### 

33If you need to run this script multiple times (e.g. for testing or troubleshooting), you can hardcode names and file 

34paths here so you don't have to enter them in the CLI every time. If these are left blank, the script will prompt you 

35to enter this information at runtime. If you don't want to use the hardcoded values, simply comment out this block of 

36code. """ 

37 

38# the name of the output file without the .tsv extension, e.g. 'NA134' 

39# OUTPUT_FILE_NAME = 'test' 

40# path where you want the output file to be saved, e.g. '/Volumes/maxarray2/varsadditional/AnnotationExtracts' 

41# OUTPUT_FILE_PATH = '/Users/darc/Desktop' 

42# path to a csv of the sequence names, e.g. '/Users/darc/Documents/GitHub/Format-Output/reference/test_sequences.csv' 

43# SEQUENCE_NAMES_PATH = '/Users/darc/Documents/Github/Format-Output/reference/test_sequences.csv' 

44 

45"""##################################################################################################################""" 

46 

# Initialization: locate (and create if needed) the per-user cache directory.
# (Note: does not consider Linux file structure — any non-Windows OS falls through to the macOS path.)
current_folder = os.getcwd()  # remembered so the rest of the script still runs relative to the launch directory

if os.name == 'nt':
    # Windows: per-user local application data
    save_folder = os.getenv('LOCALAPPDATA')
else:
    # Mac. os.path.expanduser('~') is used instead of os.getenv('HOME'), which would raise a
    # TypeError on concatenation if HOME were unset.
    save_folder = os.path.join(os.path.expanduser('~'), 'Library', 'Application Support')

save_folder = os.path.join(save_folder, 'CTDProcess')
# Create the cache directory if it does not already exist. This replaces the previous
# chdir -> mkdir -> catch-EEXIST -> chdir-back sequence; the working directory is never changed,
# which has the same net effect (the original restored the cwd afterwards).
os.makedirs(save_folder, exist_ok=True)

print(f'\n{Color.BOLD}Saved cache files located in:{Color.END} {Color.UNDERLINE}{save_folder}{Color.END}')

72 

dive_info = []  # the script's copy of Dives.csv, one list per data row

# Load info from Dives.csv: Dives.csv must be in the same directory that the script was called.
# This file must be up to date with all video sequences listed in the input file.
with open('reference/Dives.csv', 'r', encoding='utf-8') as dive_csv:
    dive_reader = csv.reader(dive_csv)
    dive_info_headers = next(dive_reader)  # first row holds the column names
    dive_info.extend(dive_reader)          # remaining rows are the dive records

82 

# Gather runtime configuration; each hardcoded value above short-circuits its CLI prompt.
# (Fixed the unbalanced parenthesis in the first prompt.)
output_file_name = OUTPUT_FILE_NAME or input('Name of output file (without the .tsv file extension): ')
output_file_path = OUTPUT_FILE_PATH or input('Path to folder of output files: ')
sequence_names_path = SEQUENCE_NAMES_PATH or input('Path to a list of sequence names: ')
save_highlight_images = SAVE_HIGHLIGHT_IMAGES or input('Download highlight images? (y/n): ').lower() in ['y', 'yes']

# Decide whether to output only localizations or only regular annotations.
# NOTE: any answer other than 'r' (including typos) is treated as 'localizations'.
output_type = input('Output regular annotations or localizations? (enter "r" for regular or "l" for localizations): ').lower()

# Decide whether to load or overwrite concepts
load_concepts = input(Messages.LOAD_CONCEPTS_PROMPT).lower() in ['y', 'yes']

concepts = {}  # cache of concept info keyed by concept name (loaded from disk or fetched from WoRMS)

if load_concepts:
    try:
        os.chdir(save_folder)
        with open('concepts.json') as file:
            concepts = json.load(file)
    except FileNotFoundError:
        print('No concepts file found, using WoRMS instead.')

sequence_names = []  # list of video sequence numbers to query VARS API

with open(sequence_names_path, 'r') as seq_names_file:
    seq_reader = csv.reader(seq_names_file)
    next(seq_reader)  # skip the header row (previously bound to an unused variable)
    for row in seq_reader:
        sequence_names.append(row[1])  # sequence name expected in the second column

# GeoForm: declaring here so val is saved across multiple annotations AND multiple dives
# (this is only updated once per major change in VARS)
current_cmecs_geo_form = NULL_VAL_STRING

full_report_records = []  # list of every concept formatted for final output
warning_messages = []  # list of items to review (QA/QC)

if load_concepts:
    print(Messages.DIVE_HEADER)

121 

###################################################################
# Outer loop: iterates over each dive listed in the input CSV file
###################################################################
for dive_name in sequence_names:
    first_round = True  # to print the WoRMS header in the terminal only once per dive
    report_records = []  # list of formatted records for this dive
    concepts_from_worms = 0  # count of how many concepts were loaded from WoRMS

    if load_concepts:
        print(f'{Color.BOLD}%-35s{Color.END}' % dive_name, end='')
        sys.stdout.flush()
    else:
        print(f'\nFetching annotations for {Color.CYAN}{dive_name}{Color.END}')

    if save_highlight_images:  # create folder for highlight images
        # NOTE(review): the chdir is kept deliberately in case downstream helpers rely on the cwd;
        # the mkdir/EEXIST dance is replaced with makedirs(exist_ok=True).
        os.chdir(output_file_path)
        os.makedirs('highlight-images', exist_ok=True)

    # fetch every annotation for this dive from HURLSTOR
    with requests.get(f'http://hurlstor.soest.hawaii.edu:8086/query/dive/{dive_name.replace(" ", "%20")}') as r:
        report_json = r.json()

    # Tries to get the current dive from Dives.csv, links information from Dives.csv to the current dive
    dive_row = next((row for row in dive_info if row[0] in dive_name or dive_name in row[0]), None)
    if not dive_row:
        print(Messages.dive_not_found(dive_name=dive_name))
        # NOTE(review): 'break' aborts ALL remaining dives when one is missing from Dives.csv;
        # 'continue' (skip just this dive) may be the intended behavior — confirm before changing.
        break

    # Set all blank values to the null val string. Mutated in place so the shared row inside
    # dive_info stays updated too.
    for i, val in enumerate(dive_row):
        if val == '':
            dive_row[i] = NULL_VAL_STRING
    dive_dict = dict(zip(dive_info_headers, dive_row))

    if load_concepts:
        print('%-30s' % len(report_json['annotations']), end='')
        sys.stdout.flush()
    else:
        print(f'{len(report_json["annotations"])} annotations found')

    # sort objects by uuid first, then (stable sort) by time: the uuid pass only exists so the
    # final output can match the expected output for easier testing
    report_json['annotations'].sort(key=extract_uuid)
    report_json['annotations'].sort(key=extract_time)

    if dive_dict['LocationAccuracy'] == NULL_VAL_STRING:
        warning_messages.append([
            dive_name, 'NA', 'NA',
            f'{Color.YELLOW}No location accuracy found{Color.END} - Add to {Color.UNDERLINE}Dives.csv{Color.END}'
        ])

    if dive_dict['WebSite'] == NULL_VAL_STRING:
        warning_messages.append([
            dive_name, 'NA', 'NA',
            f'{Color.YELLOW}No website found{Color.END} - Add to {Color.UNDERLINE}Dives.csv{Color.END}'
        ])

    # get start time and end time of each video (used later to check whether an annotation falls
    # inside a video's time window)
    dive_video_timestamps = []
    for media in report_json['media']:
        # NOTE(review): the duration filter keeps only videos longer than 10 minutes; the original
        # carried a 'remove me' marker on this check — confirm whether shorter clips should count.
        if 'image' not in media['video_name'] and media['duration_millis'] > 600000:  # 600000 millis = 10 mins
            start_time = parse_datetime(media['start_timestamp'])
            dive_video_timestamps.append([start_time, start_time + timedelta(milliseconds=media['duration_millis'])])

    #############################################################################################################
    # Main inner loop: iterates through all annotations for the dive and fills out the fields required by DSCRTP
    #############################################################################################################
    for annotation in report_json['annotations']:
        is_localization = annotation.get('group') == 'localization'
        if output_type == 'r':
            if is_localization:
                continue  # regular output: skip annotations in the 'localization' group
        elif not is_localization:
            continue  # localization output: skip annotations not in the 'localization' group
        concept_name = annotation['concept']

        annotation_row = AnnotationRow(
            annotation=annotation,
            reporter=REPORTER,
            reporter_email=REPORTER_EMAIL
        )  # object to store all annotation information

        # populate simple data from annotation & Dives.csv
        annotation_row.set_sample_id(dive_name=dive_name)
        annotation_row.set_simple_static_data()
        annotation_row.set_ancillary_data(warning_messages=warning_messages)
        annotation_row.set_dive_info(dive_info=dive_dict)

        # get concept info: check WoRMS if concept info is missing from the loaded cache
        if concept_name != 'none':
            if concept_name not in concepts:  # not in the saved concepts file, so search WoRMS
                if first_round:  # print the WoRMS header once per dive
                    first_round = False
                    print(Messages.WORMS_HEADER)
                concept = Concept(concept_name=concept_name)
                cons_handler = ConceptHandler(concept=concept)
                cons_handler.fetch_worms()
                cons_handler.fetch_vars_synonyms(warning_messages=warning_messages)
                concepts[concept_name] = {
                    'scientific_name': concept.scientific_name,
                    'aphia_id': concept.aphia_id,
                    'authorship': concept.authorship,
                    'synonyms': concept.synonyms,
                    'taxon_rank': concept.taxon_rank,
                    'taxon_ranks': concept.taxon_ranks,
                    'descriptors': concept.descriptors,
                    'vernacular_name': concept.vernacular_names
                }

            # populate the annotation row object with the (cached or freshly fetched) concept info
            annotation_row.set_concept_info(concepts=concepts, warning_messages=warning_messages)

        # check whether the recorded timestamp falls inside any retrieved video's time range
        media_type = 'still image'
        for video_start, video_end in dive_video_timestamps:
            if video_start <= annotation_row.recorded_time.timestamp <= video_end:
                media_type = 'video observation'
                break

        # update megahabitat (GeoForm persists across annotations and dives by design)
        megahabitat = get_association(annotation, 'megahabitat')
        if megahabitat:
            current_cmecs_geo_form = megahabitat['to_concept']
        # update habitat: keep the megahabitat part (before the comma), replace the rest
        habitat = get_association(annotation, 'habitat')
        if habitat:
            current_cmecs_geo_form = f'{current_cmecs_geo_form.split(",")[0]}, {habitat["to_concept"]}'

        # populate the rest of the annotation data
        annotation_row.set_media_type(media_type=media_type)
        annotation_row.set_id_comments()
        annotation_row.set_indv_count_and_cat_abundance()
        annotation_row.set_size(warning_messages=warning_messages)
        annotation_row.set_condition_comment(warning_messages=warning_messages)
        annotation_row.set_comments_and_sample()
        annotation_row.set_cmecs_geo(cmecs_geo=current_cmecs_geo_form)
        annotation_row.set_habitat(warning_messages=warning_messages)
        annotation_row.set_upon()
        annotation_row.set_id_ref(warning_messages=warning_messages)
        annotation_row.set_image_paths(
            download_highlight_images=save_highlight_images,
            output_file_path=os.path.join(output_file_path, 'highlight-images'),
            warning_messages=warning_messages,
        )
        annotation_row.set_bounding_box_uuid()

        record = [annotation_row.columns[x] for x in HEADERS]  # flatten the row object into a list
        report_records.append(record)  # append annotation to the list of all annotations from this dive

    # find associates and hosts
    find_associated_taxa(report_records=report_records, concepts=concepts, warning_messages=warning_messages)

    # remove duplicates (i.e. records with matching id reference numbers)
    dupes_removed = collapse_id_records(report_records=report_records)

    if load_concepts:
        print('%-30s' % str(dupes_removed), end='')
        sys.stdout.flush()
    else:
        print(f'\n{str(dupes_removed)} duplicate records removed')

    # translate substrate (upon) names - this must be done after finding the associated taxa
    # (relies on the raw concept name)
    for record in report_records:
        if record[SUBSTRATE] == 'organism (dead)':
            record[SUBSTRATE] = 'dead organism'
        elif record[SUBSTRATE] in concepts:
            saved = concepts[record[SUBSTRATE]]
            record[SUBSTRATE] = saved['scientific_name']
            if saved['descriptors']:
                record[SUBSTRATE] += f' ({" ".join(saved["descriptors"])})'

    # Add this formatted dive to the full list of report records
    full_report_records += report_records
    print(f'{Color.GREEN}Complete{Color.END}')

302 

# Save everything to the output file
print('\nSaving output file...')
os.chdir(save_folder)

# persist the concept cache so later runs can answer 'load concepts?' with yes and skip WoRMS
with open('concepts.json', 'w') as file:
    json.dump(concepts, file)
os.chdir(output_file_path)

# the last 3 columns of HEADERS are internal working fields and are excluded from the report
num_output_cols = len(HEADERS) - 3  # hoisted: previously recomputed for every record
with open(output_file_name + '.tsv', 'w', newline='', encoding='utf-8') as file:
    csv_writer = csv.writer(file, delimiter='\t')
    csv_writer.writerow(HEADERS[:num_output_cols])
    csv_writer.writerows(record[:num_output_cols] for record in full_report_records)

print(f'\n{Color.BOLD}Output file saved to:{Color.END} {Color.UNDERLINE}{output_file_path}/{output_file_name}.tsv{Color.END}')
if save_highlight_images:
    print(f'{Color.BOLD}Highlight images saved to:{Color.END} {Color.UNDERLINE}{output_file_path}/highlight-images/{Color.END}')
print(f'\n{Color.YELLOW}There are {len(warning_messages)} warning messages.{Color.END}\n')

321 

# Save and optionally display the QA/QC warning messages
if len(warning_messages) > 0:
    # newline='' is required by the csv module (prevents blank lines / stray \r on Windows);
    # encoding matches the main .tsv output. Written to the cwd, which is output_file_path here.
    with open('warnings.tsv', 'w', newline='', encoding='utf-8') as file:
        csv_writer = csv.writer(file, delimiter='\t')
        csv_writer.writerow(['Sample ID', 'Concept Name', 'UUID', 'Message'])
        color_codes = (Color.BOLD, Color.END, Color.YELLOW, Color.RED, Color.UNDERLINE)
        for message in warning_messages:
            raw_message = []
            for col in message:
                for code in color_codes:  # strip terminal color codes for the file copy
                    col = col.replace(code, '')
                raw_message.append(col)
            csv_writer.writerow(raw_message)

    # report the path actually written to (was OUTPUT_FILE_PATH, the hardcoded constant,
    # which is blank unless set at the top of the script)
    print(f'{Color.BOLD}Warnings saved to: {Color.END}{Color.UNDERLINE}{output_file_path}/warnings.tsv{Color.END}')

    print(f'\nView messages in console?')
    view_messages = input('\nEnter "y" to view, or press enter to skip >> ').lower() in ['y', 'yes']

    if view_messages:
        print(Messages.WARNINGS_HEADER)
        for message in warning_messages:
            # truncate long concept names and uuids so the columns line up
            if len(message[1]) > 22:
                message[1] = f'{message[1][:22]}...'
            message[2] = message[2][:37]
            print("%-37s%-25s%-40s%-s" % (message[0], message[1], message[2], message[3]))