Coverage for annotation/annotation_row.py: 89%

340 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-12 17:57 +0000

1import os 

2import requests 

3 

4from datetime import datetime, timezone 

5from typing import Dict 

6 

7from annotation.timestamp_processor import TimestampProcessor 

8from util.constants import NULL_VAL_STRING, HEADERS, NULL_VAL_INT 

9from util.functions import get_association, convert_username_to_name, get_associations_list, add_meters, \ 

10 translate_substrate_code, grain_size 

11from util.terminal_output import Color 

12 

13 

14class AnnotationRow: 

15 """ 

16 Stores information for a specific annotation. See util.constants.HEADERS for a list of all the columns. 

17 """ 

18 

def __init__(self, annotation: Dict, reporter: str, reporter_email: str):
    """
    Creates an empty output row for one annotation and parses the annotation's two timestamps.

    :param dict annotation: A VARS annotation object retrieved from the HURL server.
    :param str reporter: Stored on the instance; written to the 'Reporter' column by set_simple_static_data.
    :param str reporter_email: Stored on the instance; written to the 'ReporterEmail' column by
        set_simple_static_data.
    """
    # one entry per header, each starting out as the null string until a setter fills it in
    self.columns = dict.fromkeys(HEADERS, NULL_VAL_STRING)
    self.annotation = annotation
    self.reporter = reporter
    self.reporter_email = reporter_email
    self.recorded_time = TimestampProcessor(annotation['recorded_timestamp'])
    self.observation_time = TimestampProcessor(annotation['observation_timestamp'])

29 

def set_simple_static_data(self):
    """
    Sets column values that are copied directly from the annotation JSON object, formatted from the
    annotation's recorded/observation timestamps, or fixed constants.
    """
    self.columns['VARSConceptName'] = self.annotation['concept']
    self.columns['TrackingID'] = self.annotation['observation_uuid']
    self.columns['AphiaID'] = NULL_VAL_INT
    self.columns['IdentifiedBy'] = convert_username_to_name(self.annotation['observer'])
    self.columns['IdentificationDate'] = self.observation_time.timestamp.strftime('%Y-%m-%d')
    self.columns['IdentificationVerificationStatus'] = 1
    self.columns['DepthMethod'] = 'reported'
    self.columns['ObservationDate'] = self.recorded_time.timestamp.strftime('%Y-%m-%d')
    self.columns['ObservationTime'] = self.recorded_time.timestamp.strftime('%H:%M:%S')
    self.columns['OtherData'] = 'CTD'
    self.columns['Modified'] = datetime.now(timezone.utc).strftime('%Y-%m-%d')
    self.columns['Reporter'] = self.reporter
    self.columns['ReporterEmail'] = self.reporter_email

    self.columns['EntryDate'] = ''  # this column is left blank, to be filled by DSCRTP admin

    # these four values are hardcoded for now, keeping columns in case of future update
    # (the previous version assigned three of them twice; each is now set exactly once)
    for column in ('SampleAreaInSquareMeters', 'Density', 'Cover', 'WeightInKg'):
        self.columns[column] = NULL_VAL_INT

58 

def set_ancillary_data(self, warning_messages: list):
    """
    Fills the position, depth, temperature, salinity, and oxygen columns from the annotation's
    'ancillary_data' entry. Records a warning when the entry is absent or when it lacks latitude/longitude.

    :param list warning_messages: The list of warning messages to display at the end of the script.
    """
    position_columns = ('Latitude', 'Longitude', 'VerbatimLatitude', 'VerbatimLongitude')
    measurement_columns = (
        'DepthInMeters', 'MinimumDepthInMeters', 'MaximumDepthInMeters', 'Temperature', 'Salinity', 'Oxygen',
    )

    if 'ancillary_data' not in self.annotation:
        # nothing to read from: warn once and null out every ancillary column
        warning_messages.append([
            self.columns['SampleID'],
            self.annotation['concept'],
            self.annotation['observation_uuid'],
            f'{Color.RED}No ancillary data found for this record{Color.END}'
        ])
        for column in position_columns + measurement_columns:
            self.columns[column] = NULL_VAL_INT
        return

    ancillary = self.annotation['ancillary_data']
    if 'latitude' in ancillary and 'longitude' in ancillary:
        # rounded values go in the main columns, untouched values in the verbatim columns
        self.columns['Latitude'] = round(ancillary['latitude'], 8)
        self.columns['Longitude'] = round(ancillary['longitude'], 8)
        self.columns['VerbatimLatitude'] = ancillary['latitude']
        self.columns['VerbatimLongitude'] = ancillary['longitude']
    else:
        for column in position_columns:
            self.columns[column] = NULL_VAL_INT
        # flag warning
        warning_messages.append([
            self.columns['SampleID'],
            self.annotation['concept'],
            self.annotation['observation_uuid'],
            f'{Color.RED}No location data found for this record{Color.END}'
        ])

    # each of these setters reads its own key from 'ancillary_data' and adds its own warning if missing
    for setter in (self.set_depth, self.set_temperature, self.set_salinity, self.set_oxygen):
        setter(warning_messages=warning_messages)

105 

def set_sample_id(self, dive_name: str):
    """
    Writes the 'SampleID' column in the form [DIVE_NAME]_[TIMESTAMP], where spaces in the dive name are
    replaced with underscores and the timestamp comes from the recorded-time processor.

    :param str dive_name: The name of the dive, e.g. 'Deep Discoverer 14040201'.
    """
    underscored_dive = dive_name.replace(' ', '_')
    formatted_time = self.recorded_time.get_formatted_timestamp()
    self.columns['SampleID'] = '_'.join((underscored_dive, formatted_time))

114 

def set_dive_info(self, dive_info: dict):
    """
    Fills the dive-level columns from the passed dive_info dict. Most values are copied unchanged; a few
    are reformatted (see inline comments).

    :param dict dive_info: A dictionary of information about the dive (imported from Dives.csv).
    """
    citation = dive_info['Citation']
    self.columns['Citation'] = '' if citation == NULL_VAL_STRING else citation

    # only the first ';'-separated data provider is used, followed by a fixed suffix
    first_provider = dive_info['DataProvider'].split(';')[0]
    self.columns['Repository'] = first_provider + ' | University of Hawaii Deep-sea Animal Research Center'

    self.columns['Locality'] = dive_info['Locality'].replace(',', ' |')

    # columns whose value is taken from dive_info under the same key, unchanged
    for column in (
        'Ocean', 'LargeMarineEcosystem', 'Country', 'FishCouncilRegion', 'SurveyID', 'Vessel', 'PI',
        'PIAffiliation', 'Purpose', 'Station', 'EventID', 'SamplingEquipment', 'VehicleName',
    ):
        self.columns[column] = dive_info[column]

    accuracy = dive_info['LocationAccuracy']
    self.columns['LocationAccuracy'] = add_meters(accuracy) if accuracy != 'NA' else ''

    if dive_info['Vessel'] in ('Okeanos Explorer', 'Nautilus'):
        self.columns['NavType'] = 'USBL'
    else:
        self.columns['NavType'] = 'NA'

    for column in ('WebSite', 'DataProvider', 'DataContact'):
        self.columns[column] = dive_info[column]

145 

def set_concept_info(self, concepts: dict, warning_messages: list):
    """
    Fills the taxonomy-related columns for this annotation's concept from the saved concept dict. Records
    a warning when the concept's scientific name is the null string.

    :param dict concepts: Dictionary of all locally saved concepts.
    :param list warning_messages: The list of warning messages to display at the end of the script.
    """
    concept_name = self.annotation['concept']
    concept = concepts[concept_name]
    scientific_name = concept['scientific_name']
    aphia_id = concept['aphia_id']
    taxon_ranks = concept['taxon_ranks']

    self.columns['ScientificName'] = scientific_name
    self.columns['VernacularName'] = concept['vernacular_name']
    self.columns['TaxonRank'] = concept['taxon_rank']
    self.columns['AphiaID'] = aphia_id

    if scientific_name == NULL_VAL_STRING:
        warning_messages.append([
            self.columns['SampleID'],
            self.annotation['concept'],
            self.annotation['observation_uuid'],
            f'{Color.RED}Concept name {concept_name} is {NULL_VAL_STRING} (no WoRMS match found){Color.END}'
        ])

    if aphia_id != NULL_VAL_INT:
        self.columns['LifeScienceIdentifier'] = f'urn:lsid:marinespecies.org:taxname:{aphia_id}'

    # Fill out the taxonomy from the taxon ranks (ranks absent from the dict keep their current value)
    for rank in ('Kingdom', 'Phylum', 'Class', 'Subclass', 'Order', 'Suborder', 'Family',
                 'Subfamily', 'Genus', 'Subgenus', 'Species', 'Subspecies'):
        if rank in taxon_ranks:
            self.columns[rank] = taxon_ranks[rank]

    self.columns['ScientificNameAuthorship'] = concept['authorship']
    self.columns['CombinedNameID'] = scientific_name

    descriptors = concept['descriptors']
    if descriptors:
        morphospecies = ' '.join(descriptors)
        self.columns['Morphospecies'] = morphospecies
        # the combined name is "<scientific name> <descriptors>", or just the descriptors without a name
        if scientific_name != NULL_VAL_STRING:
            self.columns['CombinedNameID'] = scientific_name + ' ' + morphospecies
        else:
            self.columns['CombinedNameID'] = morphospecies

    synonyms = concept['synonyms']
    self.columns['Synonyms'] = ' | '.join(synonyms) if synonyms else NULL_VAL_STRING

    if '/' in concept_name:
        self.columns['IdentificationComments'] = concept_name.replace('/', ' or ')

196 

def set_media_type(self, media_type: str):
    """
    Writes the passed media type to the 'RecordType' column. When the row has a scientific name, also
    writes the matching 'IdentificationQualifier' text.

    :param str media_type: The type of media for the annotation record ('still image' or 'video observation').
    """
    self.columns['RecordType'] = media_type
    if self.columns['ScientificName'] == NULL_VAL_STRING:
        return
    if media_type == 'video observation':
        qualifier = 'ID by expert from video'
    else:
        qualifier = 'ID by expert from image'
    self.columns['IdentificationQualifier'] = qualifier

207 

def set_id_comments(self):
    """
    Reads the 'identity-certainty' association, if any. A 'maybe' entry appends ' | ID Uncertain' to
    'IdentificationQualifier'; the remaining entries are written to 'IdentificationComments'.
    """
    certainty = get_association(self.annotation, 'identity-certainty')
    if not certainty:
        return

    entries = certainty['link_value'].split('; ')
    if 'maybe' in entries:
        # NOTE(review): this appends even when the qualifier still holds the null string - confirm intended
        self.columns['IdentificationQualifier'] = self.columns['IdentificationQualifier'] + ' | ID Uncertain'
        entries.remove('maybe')

    remaining = ' | '.join(entries)
    self.columns['IdentificationComments'] = remaining or NULL_VAL_STRING

221 

def set_indv_count_and_cat_abundance(self):
    """
    Fills 'IndividualCount' and 'CategoricalAbundance' from the annotation's 'population-quantity' and
    'categorical-abundance' associations.
    """
    quantity = get_association(self.annotation, 'population-quantity')
    if quantity:
        count = quantity['link_value']
    else:
        # no explicit quantity: rows with a scientific name count as one individual
        count = '1' if self.columns['ScientificName'] != NULL_VAL_STRING else NULL_VAL_INT
    self.columns['IndividualCount'] = count

    abundance = get_association(self.annotation, 'categorical-abundance')
    if abundance:
        # a categorical abundance replaces the individual count
        self.columns['CategoricalAbundance'] = abundance['link_value']
        self.columns['IndividualCount'] = NULL_VAL_INT

237 

def set_size(self, warning_messages: list):
    """
    Fills 'VerbatimSize', 'MinimumSize', and 'MaximumSize' from the annotation's 'size' association, or
    from 'length-centimeters' when there is no 'size' association. Records a warning when the size text
    cannot be split into a minimum and maximum.

    :param list warning_messages: The list of warning messages to display at the end of the script.
    """
    smallest = NULL_VAL_INT
    largest = NULL_VAL_INT
    verbatim = NULL_VAL_STRING
    parsed = True

    size = get_association(self.annotation, 'size')
    legacy_size = get_association(self.annotation, 'length-centimeters')  # old VARS data

    if size:
        # another old VARS used 'link_value' instead of 'to_concept' :)
        verbatim = size['to_concept'] if size['to_concept'] != 'nil' else size['link_value']
        if verbatim == 'greater than 100 cm':
            smallest = '101'
        elif '-' in verbatim and 'cm' in verbatim:
            # turn a 'size category' into a maximum and minimum size
            bounds = verbatim.replace(' ', '-').split('-')
            smallest, largest = bounds[0], bounds[1]
        else:
            parsed = False
    elif legacy_size:
        verbatim = legacy_size['link_value']
        bounds = verbatim.split('-') if '-' in verbatim else []
        if len(bounds) == 2:
            smallest, largest = bounds
        else:
            parsed = False

    if not parsed:
        warning_messages.append([
            self.columns['SampleID'],
            self.annotation['concept'],
            self.annotation['observation_uuid'],
            f'Unable to parse size string: {Color.BOLD}"{verbatim}"{Color.END}'
        ])

    self.columns['VerbatimSize'] = verbatim
    self.columns['MinimumSize'] = smallest
    self.columns['MaximumSize'] = largest

288 

def set_condition_comment(self, warning_messages: list):
    """
    Fills the 'Condition' column. With no 'condition-comment' association, rows that have a scientific
    name are 'Live'. A comment of 'dead'/'Dead' gives 'Dead' and records a warning; any other comment
    gives 'Damaged'.

    :param list warning_messages: The list of warning messages to display at the end of the script.
    """
    comment = get_association(self.annotation, 'condition-comment')
    if not comment:
        has_name = self.columns['ScientificName'] != NULL_VAL_STRING
        self.columns['Condition'] = 'Live' if has_name else NULL_VAL_STRING
        return

    if comment['link_value'] not in ('dead', 'Dead'):
        self.columns['Condition'] = 'Damaged'
        return

    # flag warning
    warning_messages.append([
        self.columns['SampleID'],
        self.annotation['concept'],
        self.annotation['observation_uuid'],
        'Dead animal reported',
    ])
    self.columns['Condition'] = 'Dead'

311 

def set_comments_and_sample(self):
    """
    Builds the 'OccurrenceComments' column from several associations on the annotation object, joined
    with ' | '. If there is a sample, appends the sample reference to the 'TrackingID' column and appends
    a 'sampled by' note to 'OccurrenceComments'.
    """
    def _append(existing, addition):
        # the null string is only a placeholder, so the first real piece replaces it
        if existing == NULL_VAL_STRING:
            return addition
        return existing + ' | ' + addition

    size_note = ('size is estimated greatest length of individual in cm.'
                 ' Size estimations placed into size category bins')

    # build occurrence remark string
    remarks = get_associations_list(self.annotation, 'occurrence-remark')
    comments = NULL_VAL_STRING
    if remarks:
        comments = ' | '.join([remark['link_value'] for remark in remarks])
    if self.columns['VerbatimSize'] != NULL_VAL_STRING:
        comments = _append(comments, size_note)

    # old VARS data
    notes = get_association(self.annotation, 'observation notes')
    if notes:
        comments = _append(comments, f'notes: {notes["link_value"]}')

    # old VARS data
    habitat_comment = get_association(self.annotation, 'habitat-comment')
    if habitat_comment:
        comments = _append(comments, f'comment: {habitat_comment["link_value"]}')

    sampled_by = get_association(self.annotation, 'sampled-by')
    if sampled_by and 'to_concept' in sampled_by:
        comments = _append(comments, f'sampled by {sampled_by["to_concept"]}')
        # NOTE(review): this lookup is placed inside the 'sampled-by' branch based on the docstring;
        # the listing this was restored from had lost its indentation - confirm against the original
        sample_ref = get_association(self.annotation, 'sample-reference')
        if sample_ref:
            self.columns['TrackingID'] += f' | {sample_ref["link_value"]}'

    self.columns['OccurrenceComments'] = comments

360 

def set_cmecs_geo(self, cmecs_geo: str):
    """
    Stores the passed value, unchanged, in the 'CMECSGeoForm' column.

    :param str cmecs_geo: The current habitat.
    """
    geo_form = cmecs_geo
    self.columns['CMECSGeoForm'] = geo_form

368 

def set_habitat(self, warning_messages):
    """
    Builds the 'Habitat' column from the annotation's 's1' (primary) and 's2' (secondary) substrate
    associations plus any 'habitat-comment'. Records a warning when s1 is missing for a row with a
    scientific name, or when a substrate code cannot be translated.

    :param list warning_messages: The list of warning messages to display at the end of the script.
    """
    secondary = []

    s1 = get_association(self.annotation, 's1')
    if s1:
        # try 'to_concept' first; 'link_value' is how the data is stored in old VARS
        primary = translate_substrate_code(s1['to_concept']) or translate_substrate_code(s1['link_value'])
        if primary:
            self.columns['Habitat'] = f'primarily: {primary}'
        else:
            # flag warning
            warning_messages.append([
                self.columns['SampleID'],
                self.annotation['concept'],
                self.annotation['observation_uuid'],
                f'{Color.RED}Missing s1 or could not parse substrate code:{Color.END} '
                f'{Color.BOLD}to_concept: {s1["to_concept"]}, link_value: {s1["link_value"]}{Color.END}'
            ])
    elif self.columns['ScientificName'] != NULL_VAL_STRING:
        # flag warning
        warning_messages.append([
            self.columns['SampleID'],
            self.annotation['concept'],
            self.annotation['observation_uuid'],
            f'{Color.RED}Missing s1{Color.END}'
        ])

    s2_records = get_associations_list(self.annotation, 's2')
    if len(s2_records) != 0:
        codes = []
        failures = []
        for record in s2_records:  # remove duplicates
            if record['to_concept'] in ('nil', 'self'):
                # this is old VARS data, formatted as one record separated by semicolons;
                # the separators are normalised to ';' one after another, in this exact order
                raw = record['link_value']
                for separator in (',', '; ', ';;', ' ', ':', "'"):
                    raw = raw.replace(separator, ';')
                codes = raw.split(';')
            elif record['to_concept'] not in codes:
                codes.append(record['to_concept'])
        codes.sort(key=grain_size)

        for code in codes:
            translated = translate_substrate_code(code)
            if translated:
                secondary.append(translated)
            else:
                failures.append(code)
        if failures:
            warning_messages.append([
                self.columns['SampleID'],
                self.annotation['concept'],
                self.annotation['observation_uuid'],
                f'Could not parse s2 substrate codes {Color.BOLD}{failures}{Color.END}'
            ])
        self.columns['Habitat'] = self.columns['Habitat'] + f' / secondary: {"; ".join(secondary)}'

    # NOTE(review): treated as independent of the s2 block; the listing this was restored from had lost
    # its indentation - confirm against the original
    habitat_comment = get_association(self.annotation, 'habitat-comment')
    if habitat_comment:
        self.columns['Habitat'] = self.columns['Habitat'] + f' / comments: {habitat_comment["link_value"]}'

431 

432 def set_upon(self): 

433 """ 

434 Sets 'UponIsCreature' (False unless the 'upon' concept is not a known substrate code) and, if there is an 'upon' record in the annotation object, the 'Substrate' column. 

435 """ 

# NOTE(review): indentation was lost in this listing, so which `if` the final `else` below belongs to
# cannot be determined from here - confirm against the original file before restructuring this method.

436 upon = get_association(self.annotation, 'upon') 

437 self.columns['UponIsCreature'] = False 

438 if upon: 

439 subs = translate_substrate_code(upon['to_concept']) 

440 if subs: 

441 self.columns['Substrate'] = subs 

442 else: 

443 # the item in 'upon' is not in the substrate list, so it must be upon another creature 

444 self.columns['UponIsCreature'] = True 

445 

# 'orgsp' is written out as 'Porifera', with ' (dead)' appended when the 'observation notes' mention 'dead'
446 if upon['to_concept'] == 'orgsp': 

447 self.columns['Substrate'] = 'Porifera' 

448 notes = get_association(self.annotation, 'observation notes') 

449 if notes and 'dead' in notes['link_value']: 

450 self.columns['Substrate'] += ' (dead)' 

# 'orgcr' is written out as 'Crustacea'; a 'comment' starting with "on <word>" supplies <word> instead
451 if upon['to_concept'] == 'orgcr': 

452 self.columns['Substrate'] = 'Crustacea' 

453 comment = get_association(self.annotation, 'comment') 

454 if comment: 

455 comment = comment['link_value'].split(';')[0].split(' ') 

456 if comment[0] == 'on': 

457 self.columns['Substrate'] = comment[1] 

458 else: 

459 self.columns['Substrate'] = upon['to_concept'] 

460 

def set_id_ref(self, warning_messages: list):
    """
    Sets the 'IdentityReference' column with the integer value pulled from the annotation's
    'identity-reference' association. ID reference is populated when there are multiple annotations with
    the exact same animal. The column is set to -1 when there is no association, and also (with a warning)
    when the association's value is empty or is not an integer.

    :param list warning_messages: The list of warning messages to display at the end of the script.
    """
    identity_reference = get_association(self.annotation, 'identity-reference')
    if not identity_reference:
        self.columns['IdentityReference'] = -1
        return

    link_value = identity_reference['link_value']
    if link_value == '':
        self.columns['IdentityReference'] = -1
        warning_messages.append([
            self.columns['SampleID'],
            self.annotation['concept'],
            self.annotation['observation_uuid'],
            f'{Color.YELLOW}An identity-reference exists for this record, but it is empty{Color.END}'
        ])
        return

    try:
        self.columns['IdentityReference'] = int(link_value)
    except (TypeError, ValueError):
        # a malformed reference on one record should be reported like other bad data, not abort the run
        self.columns['IdentityReference'] = -1
        warning_messages.append([
            self.columns['SampleID'],
            self.annotation['concept'],
            self.annotation['observation_uuid'],
            f'{Color.YELLOW}Unable to parse identity-reference value: "{link_value}"{Color.END}'
        ])

480 

def set_depth(self, warning_messages: list):
    """
    Fills 'DepthInMeters' from the annotation's ancillary data, then copies that value to the minimum and
    maximum depth columns. A missing depth, or a depth equal to 0, is treated as no data and records a
    warning.

    :param list warning_messages: The list of warning messages to display at the end of the script.
    """
    ancillary = self.annotation['ancillary_data']
    if 'depth_meters' not in ancillary or ancillary['depth_meters'] == 0:
        self.columns['DepthInMeters'] = NULL_VAL_INT
        warning_messages.append([
            self.columns['SampleID'],
            self.annotation['concept'],
            self.annotation['observation_uuid'],
            f'{Color.YELLOW}No depth data found for this record{Color.END}'
        ])
    else:
        self.columns['DepthInMeters'] = round(ancillary['depth_meters'], 3)

    # only a single depth is available, so minimum and maximum both mirror it
    self.columns['MinimumDepthInMeters'] = self.columns['MaximumDepthInMeters'] = self.columns['DepthInMeters']

500 

def set_temperature(self, warning_messages: list):
    """
    Fills 'Temperature' with the ancillary 'temperature_celsius' value rounded to 4 decimal places.
    Records a warning and uses the null value when that key is absent.

    :param list warning_messages: The list of warning messages to display at the end of the script.
    """
    ancillary = self.annotation['ancillary_data']
    if 'temperature_celsius' not in ancillary:
        self.columns['Temperature'] = NULL_VAL_INT
        # flag warning
        warning_messages.append([
            self.columns['SampleID'],
            self.annotation['concept'],
            self.annotation['observation_uuid'],
            'No temperature measurement included in this record'
        ])
        return
    self.columns['Temperature'] = round(ancillary['temperature_celsius'], 4)

518 

def set_salinity(self, warning_messages: list):
    """
    Fills 'Salinity' with the ancillary 'salinity' value rounded to 4 decimal places. Records a warning
    and uses the null value when that key is absent.

    :param list warning_messages: The list of warning messages to display at the end of the script.
    """
    ancillary = self.annotation['ancillary_data']
    if 'salinity' not in ancillary:
        self.columns['Salinity'] = NULL_VAL_INT
        # flag warning
        warning_messages.append([
            self.columns['SampleID'],
            self.annotation['concept'],
            self.annotation['observation_uuid'],
            'No salinity measurement included in this record'
        ])
        return
    self.columns['Salinity'] = round(ancillary['salinity'], 4)

536 

def set_oxygen(self, warning_messages: list):
    """
    Fills 'Oxygen' with the ancillary 'oxygen_ml_l' value rounded to 4 decimal places. Records a warning
    and uses the null value when that key is absent.

    :param list warning_messages: The list of warning messages to display at the end of the script.
    """
    ancillary = self.annotation['ancillary_data']
    if 'oxygen_ml_l' not in ancillary:
        self.columns['Oxygen'] = NULL_VAL_INT
        # flag warning
        warning_messages.append([
            self.columns['SampleID'],
            self.annotation['concept'],
            self.annotation['observation_uuid'],
            'No oxygen measurement included in this record'
        ])
        return
    self.columns['Oxygen'] = round(ancillary['oxygen_ml_l'], 4)

555 

def set_image_paths(self, download_highlight_images: bool, output_file_path: str, warning_messages: list):
    """
    Populates the 'ImageFilePath' and 'HighlightImageFilePath' columns with information from the annotation
    object, and optionally downloads the highlight image.

    Any failure of the download request (connection error, timeout, or other request error) is recorded
    as a warning instead of raising.

    :param download_highlight_images: whether or not to download the highlight images and save to local machine.
    :param output_file_path: where to save the images
    :param list warning_messages: The list of warning messages to display at the end of the script.
    """
    # without a timeout, requests waits indefinitely on an unresponsive server
    download_timeout_seconds = 30

    image_paths = [
        image['url'].replace(
            'http://hurlstor.soest.hawaii.edu/imagearchive',
            'https://hurlimage.soest.hawaii.edu')
        for image in self.annotation['image_references']
    ]
    if len(image_paths) == 1:
        self.columns['ImageFilePath'] = image_paths[0]
    elif len(image_paths) > 1:
        # with several references, use the first if its URL contains '.png', otherwise the second
        self.columns['ImageFilePath'] = image_paths[0] if '.png' in image_paths[0] else image_paths[1]

    # for old VARS :)
    photo_reference = get_association(self.annotation, 'photo-reference')
    if photo_reference:
        # ';'-separated list: the first entry carries the directory, later entries are placed in that
        # same directory
        first, *others = photo_reference['link_value'].split(';')
        first = first.replace('http://max5kn1.soest.hawaii.edu/imagearchive/', '')
        directory = f'https://hurlimage.soest.hawaii.edu/{"/".join(first.split("/")[:-1])}'
        legacy_paths = [f'https://hurlimage.soest.hawaii.edu/{first}']
        legacy_paths.extend(f'{directory}/{other}' for other in others)
        for image_path in legacy_paths:
            if self.columns['ImageFilePath'] != NULL_VAL_STRING:
                self.columns['ImageFilePath'] += f' | {image_path}'
            else:
                self.columns['ImageFilePath'] = image_path

    highlight_image = get_association(self.annotation, 'guide-photo')
    if highlight_image and highlight_image['to_concept'] in ('1 best', '2 good'):
        self.columns['HighlightImageFilePath'] = self.columns['ImageFilePath']
        if self.columns['ImageFilePath'] == NULL_VAL_STRING:
            warning_messages.append([
                self.columns['SampleID'],
                self.annotation['concept'],
                self.annotation['observation_uuid'],
                'guide-photo for this annotation has a to_concept of "1 best" or "2 good", but the annotation has no image references',
            ])
        elif download_highlight_images:
            download_failed = False
            try:
                res = requests.get(self.columns['ImageFilePath'], timeout=download_timeout_seconds)
                if res.status_code == 200:
                    # NOTE(review): this changes the process working directory as a side effect; kept
                    # because callers may rely on it
                    os.chdir(output_file_path)
                    with open(self.columns['ImageFilePath'].split('/')[-1], 'wb') as file:
                        file.write(res.content)
                else:
                    download_failed = True
            except requests.exceptions.RequestException:
                # base class of ConnectionError, Timeout, and the other request errors
                download_failed = True
            if download_failed:
                warning_messages.append([
                    self.columns['SampleID'],
                    self.annotation['concept'],
                    self.annotation['observation_uuid'],
                    'Error downloading image',
                ])

    population_density = get_association(self.annotation, 'population-density')
    if population_density and population_density['link_value'] == 'dense':
        self.columns['HighlightImageFilePath'] = self.columns['ImageFilePath']

630 

def set_bounding_box_uuid(self):
    """
    Copies the 'uuid' of the annotation's 'bounding box' association into the 'BoundingBoxID' column.
    Leaves the column untouched when there is no such association.
    """
    box = get_association(self.annotation, 'bounding box')
    if not box:
        return
    self.columns['BoundingBoxID'] = box['uuid']