Coverage for annotation/annotation_row.py: 89%
340 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-12 17:57 +0000
1import os
2import requests
4from datetime import datetime, timezone
5from typing import Dict
7from annotation.timestamp_processor import TimestampProcessor
8from util.constants import NULL_VAL_STRING, HEADERS, NULL_VAL_INT
9from util.functions import get_association, convert_username_to_name, get_associations_list, add_meters, \
10 translate_substrate_code, grain_size
11from util.terminal_output import Color
class AnnotationRow:
    """
    Stores information for a specific annotation. See util.constants.HEADERS for a list of all the columns.

    Each ``set_*`` method fills in a group of entries of ``self.columns`` from the annotation object. Methods that
    take a ``warning_messages`` list append ``[SampleID, concept, observation_uuid, message]`` entries to it.
    """

    # Columns blanked together when latitude/longitude are unavailable.
    _LOCATION_COLUMNS = ('Latitude', 'Longitude', 'VerbatimLatitude', 'VerbatimLongitude')

    # Remaining ancillary columns; blanked together with the location columns when there is no ancillary data.
    _SENSOR_COLUMNS = ('DepthInMeters', 'MinimumDepthInMeters', 'MaximumDepthInMeters',
                       'Temperature', 'Salinity', 'Oxygen')

    # Taxonomy columns copied from a concept's 'taxon_ranks' dict when the rank is present.
    _TAXON_RANK_COLUMNS = ('Kingdom', 'Phylum', 'Class', 'Subclass', 'Order', 'Suborder', 'Family',
                           'Subfamily', 'Genus', 'Subgenus', 'Species', 'Subspecies')

    # Note added to 'OccurrenceComments' whenever a size was recorded.
    _SIZE_REMARK = 'size is estimated greatest length of individual in cm.' \
                   ' Size estimations placed into size category bins'

    _IMAGE_ARCHIVE_URL = 'https://hurlimage.soest.hawaii.edu'

    # Upper bound (seconds) on a highlight-image download so that a stalled server cannot hang the script.
    _IMAGE_DOWNLOAD_TIMEOUT_SECONDS = 30

    def __init__(self, annotation: Dict, reporter: str, reporter_email: str):
        """
        :param dict annotation: A VARS annotation object retrieved from the HURL server.
        :param str reporter: Value written to the 'Reporter' column.
        :param str reporter_email: Value written to the 'ReporterEmail' column.
        """
        self.columns = dict.fromkeys(HEADERS, NULL_VAL_STRING)  # every header starts out as the null string
        self.annotation = annotation
        self.reporter = reporter
        self.reporter_email = reporter_email
        self.recorded_time = TimestampProcessor(self.annotation['recorded_timestamp'])
        self.observation_time = TimestampProcessor(self.annotation['observation_timestamp'])

    def _add_warning(self, warning_messages: list, message: str) -> None:
        """
        Appends a warning entry for this record: [SampleID, concept, observation_uuid, message].

        :param list warning_messages: The list of warning messages to display at the end of the script.
        :param str message: The text of the warning.
        """
        warning_messages.append([
            self.columns['SampleID'],
            self.annotation['concept'],
            self.annotation['observation_uuid'],
            message,
        ])

    @staticmethod
    def _append_piped(existing: str, addition: str) -> str:
        """
        Joins two values with ' | ', or returns only the addition when the existing value is the null string.

        :param str existing: The current column/remark value.
        :param str addition: The text to add.
        """
        if existing == NULL_VAL_STRING:
            return addition
        return f'{existing} | {addition}'

    def _ancillary_value(self, key: str):
        """
        Returns ancillary_data[key], or None when the ancillary data, the key, or the value is missing.

        :param str key: The ancillary data key to read.
        """
        ancillary = self.annotation.get('ancillary_data') or {}
        return ancillary.get(key)

    def _set_measurement(self, column: str, key: str, digits: int, message: str, warning_messages: list) -> None:
        """
        Copies one rounded ancillary measurement into a column, or blanks the column and records a warning.

        :param str column: The column to populate.
        :param str key: The ancillary data key holding the measurement.
        :param int digits: Number of decimal places to round to.
        :param str message: Warning text used when the measurement is missing.
        :param list warning_messages: The list of warning messages to display at the end of the script.
        """
        value = self._ancillary_value(key)
        if value is None:
            self.columns[column] = NULL_VAL_INT
            self._add_warning(warning_messages, message)
        else:
            self.columns[column] = round(value, digits)

    def set_simple_static_data(self):
        """
        Sets columns values to simple annotation data directly from the annotation JSON object.
        """
        observed = self.observation_time.timestamp
        recorded = self.recorded_time.timestamp
        self.columns.update({
            'VARSConceptName': self.annotation['concept'],
            'TrackingID': self.annotation['observation_uuid'],
            'AphiaID': NULL_VAL_INT,
            'IdentifiedBy': convert_username_to_name(self.annotation['observer']),
            'IdentificationDate': observed.strftime('%Y-%m-%d'),
            'IdentificationVerificationStatus': 1,
            'DepthMethod': 'reported',
            'ObservationDate': recorded.strftime('%Y-%m-%d'),
            'ObservationTime': recorded.strftime('%H:%M:%S'),
            'OtherData': 'CTD',
            'Modified': datetime.now(timezone.utc).strftime('%Y-%m-%d'),
            'Reporter': self.reporter,
            'ReporterEmail': self.reporter_email,
            'EntryDate': '',  # this column is left blank, to be filled by DSCRTP admin
        })

        # these values are hardcoded for now, keeping columns in case of future update
        for column in ('SampleAreaInSquareMeters', 'Density', 'Cover', 'WeightInKg'):
            self.columns[column] = NULL_VAL_INT

    def set_ancillary_data(self, warning_messages: list):
        """
        Sets ancillary data from annotation object. Adds a warning message if ancillary data or lat/long data is
        missing.

        :param list warning_messages: The list of warning messages to display at the end of the script.
        """
        ancillary = self.annotation.get('ancillary_data')
        if ancillary is None:
            self._add_warning(warning_messages, f'{Color.RED}No ancillary data found for this record{Color.END}')
            for column in self._LOCATION_COLUMNS + self._SENSOR_COLUMNS:
                self.columns[column] = NULL_VAL_INT
            return

        latitude = ancillary.get('latitude')
        longitude = ancillary.get('longitude')
        if latitude is not None and longitude is not None:
            self.columns['Latitude'] = round(latitude, 8)
            self.columns['Longitude'] = round(longitude, 8)
            self.columns['VerbatimLatitude'] = latitude
            self.columns['VerbatimLongitude'] = longitude
        else:
            # a position needs both coordinates; a null coordinate is treated the same as an absent one
            for column in self._LOCATION_COLUMNS:
                self.columns[column] = NULL_VAL_INT
            self._add_warning(warning_messages, f'{Color.RED}No location data found for this record{Color.END}')

        self.set_depth(warning_messages=warning_messages)
        self.set_temperature(warning_messages=warning_messages)
        self.set_salinity(warning_messages=warning_messages)
        self.set_oxygen(warning_messages=warning_messages)

    def set_sample_id(self, dive_name: str):
        """
        Sets the SampleID column with the properly formatted SampleID: [DIVE_NAME]_[TIMESTAMP]

        :param str dive_name: The name of the dive, e.g. 'Deep Discoverer 14040201'.
        """
        dive_label = dive_name.replace(' ', '_')
        self.columns['SampleID'] = f'{dive_label}_{self.recorded_time.get_formatted_timestamp()}'

    def set_dive_info(self, dive_info: dict):
        """
        Sets dive-related annotation data from passed dive_info dict.

        :param dict dive_info: A dictionary of information about the dive (imported from Dives.csv).
        """
        self.columns['Citation'] = dive_info['Citation'] if dive_info['Citation'] != NULL_VAL_STRING else ''
        # only the first ';'-separated data provider is used as the repository
        first_provider = dive_info['DataProvider'].split(';')[0]
        self.columns['Repository'] = f'{first_provider} | University of Hawaii Deep-sea Animal Research Center'
        self.columns['Locality'] = dive_info['Locality'].replace(',', ' |')

        # columns copied verbatim from the dive record
        for column in ('Ocean', 'LargeMarineEcosystem', 'Country', 'FishCouncilRegion', 'SurveyID', 'Vessel', 'PI',
                       'PIAffiliation', 'Purpose', 'Station', 'EventID', 'SamplingEquipment', 'VehicleName'):
            self.columns[column] = dive_info[column]

        accuracy = dive_info['LocationAccuracy']
        self.columns['LocationAccuracy'] = add_meters(accuracy) if accuracy != 'NA' else ''
        self.columns['NavType'] = 'USBL' if dive_info['Vessel'] in ('Okeanos Explorer', 'Nautilus') else 'NA'

        for column in ('WebSite', 'DataProvider', 'DataContact'):
            self.columns[column] = dive_info[column]

    def set_concept_info(self, concepts: dict, warning_messages: list):
        """
        Sets annotation's concept info from saved concept dict.

        :param dict concepts: Dictionary of all locally saved concepts.
        :param list warning_messages: The list of warning messages to display at the end of the script.
        """
        concept_name = self.annotation['concept']
        concept = concepts[concept_name]
        scientific_name = concept['scientific_name']
        aphia_id = concept['aphia_id']
        taxon_ranks = concept['taxon_ranks']

        self.columns['ScientificName'] = scientific_name
        self.columns['VernacularName'] = concept['vernacular_name']
        self.columns['TaxonRank'] = concept['taxon_rank']
        self.columns['AphiaID'] = aphia_id

        if scientific_name == NULL_VAL_STRING:
            self._add_warning(
                warning_messages,
                f'{Color.RED}Concept name {concept_name} is {NULL_VAL_STRING} (no WoRMS match found){Color.END}'
            )

        if aphia_id != NULL_VAL_INT:
            self.columns['LifeScienceIdentifier'] = f'urn:lsid:marinespecies.org:taxname:{aphia_id}'

        # Fill out the taxonomy from the taxon ranks (skipped when the ranks are empty or missing)
        if taxon_ranks:
            for rank in self._TAXON_RANK_COLUMNS:
                if rank in taxon_ranks:
                    self.columns[rank] = taxon_ranks[rank]

        self.columns['ScientificNameAuthorship'] = concept['authorship']
        self.columns['CombinedNameID'] = scientific_name

        if concept['descriptors']:
            morphospecies = ' '.join(concept['descriptors'])
            self.columns['Morphospecies'] = morphospecies
            if scientific_name != NULL_VAL_STRING:
                self.columns['CombinedNameID'] = f'{scientific_name} {morphospecies}'
            else:
                self.columns['CombinedNameID'] = morphospecies

        synonyms = concept['synonyms']
        self.columns['Synonyms'] = ' | '.join(synonyms) if synonyms else NULL_VAL_STRING

        if '/' in concept_name:
            self.columns['IdentificationComments'] = concept_name.replace('/', ' or ')

    def set_media_type(self, media_type: str):
        """
        Populates the 'RecordType' column with the passed media type. If the record has a scientific name, also
        populates 'IdentificationQualifier' according to the media type.

        :param str media_type: The type of media for the annotation record ('still image' or 'video observation').
        """
        self.columns['RecordType'] = media_type
        if self.columns['ScientificName'] != NULL_VAL_STRING:
            source = 'video' if media_type == 'video observation' else 'image'
            self.columns['IdentificationQualifier'] = f'ID by expert from {source}'

    def set_id_comments(self):
        """
        Populates the 'IdentificationComments' column from the annotation's 'identity-certainty' association. A
        'maybe' entry is not copied to the comments; it adds 'ID Uncertain' to 'IdentificationQualifier' instead.
        """
        certainty = get_association(self.annotation, 'identity-certainty')
        if not certainty:
            return
        comments = certainty['link_value'].split('; ')
        if 'maybe' in comments:
            # _append_piped avoids emitting '<null> | ID Uncertain' when no qualifier has been set yet
            self.columns['IdentificationQualifier'] = \
                self._append_piped(self.columns['IdentificationQualifier'], 'ID Uncertain')
            comments.remove('maybe')
        joined = ' | '.join(comments)
        self.columns['IdentificationComments'] = joined if joined != '' else NULL_VAL_STRING

    def set_indv_count_and_cat_abundance(self):
        """
        Populates the 'IndividualCount' and 'CategoricalAbundance' columns from annotation object.
        """
        pop_quantity = get_association(self.annotation, 'population-quantity')
        if pop_quantity:
            self.columns['IndividualCount'] = pop_quantity['link_value']
        elif self.columns['ScientificName'] != NULL_VAL_STRING:
            self.columns['IndividualCount'] = '1'  # an identified organism without a count is a single individual
        else:
            self.columns['IndividualCount'] = NULL_VAL_INT

        cat_abundance = get_association(self.annotation, 'categorical-abundance')
        if cat_abundance:
            # a categorical abundance replaces the individual count
            self.columns['CategoricalAbundance'] = cat_abundance['link_value']
            self.columns['IndividualCount'] = NULL_VAL_INT

    def set_size(self, warning_messages: list):
        """
        Populates columns related to size ('VerbatimSize', 'MinimumSize', and 'MaximumSize') with the size from the
        annotation object. Saves a warning message if the size in the annotation object does not match one of the
        expected size categories.

        :param list warning_messages: The list of warning messages to display at the end of the script.
        """
        min_size = NULL_VAL_INT
        max_size = NULL_VAL_INT
        size_str = NULL_VAL_STRING
        parsed = True
        size_category = get_association(self.annotation, 'size')
        old_size_category = get_association(self.annotation, 'length-centimeters')  # old VARS data

        if size_category:
            # some old VARS records used 'link_value' instead of 'to_concept'
            use_concept = size_category['to_concept'] != 'nil'
            size_str = size_category['to_concept'] if use_concept else size_category['link_value']
            if size_str == 'greater than 100 cm':
                min_size = '101'
            elif '-' in size_str and 'cm' in size_str:
                # turn a 'size category' into a maximum and minimum size
                bounds = size_str.replace(' ', '-').split('-')
                min_size, max_size = bounds[0], bounds[1]
            else:
                parsed = False
        elif old_size_category:
            size_str = old_size_category['link_value']
            bounds = size_str.split('-')
            if len(bounds) == 2:
                min_size, max_size = bounds
            else:
                parsed = False

        if not parsed:
            self._add_warning(warning_messages, f'Unable to parse size string: {Color.BOLD}"{size_str}"{Color.END}')

        self.columns['VerbatimSize'] = size_str
        self.columns['MinimumSize'] = min_size
        self.columns['MaximumSize'] = max_size

    def set_condition_comment(self, warning_messages: list):
        """
        Populates the 'Condition' column with information from the annotation object. Assumes all organisms are 'Live'
        unless otherwise noted. Saves a warning message if a dead animal is reported.

        :param list warning_messages: The list of warning messages to display at the end of the script.
        """
        condition_comment = get_association(self.annotation, 'condition-comment')
        if not condition_comment:
            has_name = self.columns['ScientificName'] != NULL_VAL_STRING
            self.columns['Condition'] = 'Live' if has_name else NULL_VAL_STRING
        elif condition_comment['link_value'] in ('dead', 'Dead'):
            self._add_warning(warning_messages, 'Dead animal reported')
            self.columns['Condition'] = 'Dead'
        else:
            # any other condition comment is reported as damage
            self.columns['Condition'] = 'Damaged'

    def set_comments_and_sample(self):
        """
        Populates 'OccurrenceComments' column with information from the annotation object. If there is a sample, appends
        the sample ID to the 'TrackingID' column and appends a note to 'OccurrenceComments'.
        """
        # build occurrence remark string
        remark_string = NULL_VAL_STRING
        occurrence_remarks = get_associations_list(self.annotation, 'occurrence-remark')
        if occurrence_remarks:
            remark_string = ' | '.join(remark['link_value'] for remark in occurrence_remarks)

        if self.columns['VerbatimSize'] != NULL_VAL_STRING:
            remark_string = self._append_piped(remark_string, self._SIZE_REMARK)

        # old VARS data
        observation_notes = get_association(self.annotation, 'observation notes')
        if observation_notes:
            remark_string = self._append_piped(remark_string, f'notes: {observation_notes["link_value"]}')

        # old VARS data
        habitat_comment = get_association(self.annotation, 'habitat-comment')
        if habitat_comment:
            remark_string = self._append_piped(remark_string, f'comment: {habitat_comment["link_value"]}')

        sampled_by = get_association(self.annotation, 'sampled-by')
        if sampled_by and 'to_concept' in sampled_by:
            remark_string = self._append_piped(remark_string, f'sampled by {sampled_by["to_concept"]}')
            # the sample reference is only looked up for records that were sampled
            sample_ref = get_association(self.annotation, 'sample-reference')
            if sample_ref:
                self.columns['TrackingID'] += f' | {sample_ref["link_value"]}'

        self.columns['OccurrenceComments'] = remark_string

    def set_cmecs_geo(self, cmecs_geo: str):
        """
        Sets the 'CMECSGeoForm' column to the value passed in the function call.

        :param str cmecs_geo: The current habitat.
        """
        self.columns['CMECSGeoForm'] = cmecs_geo

    def set_habitat(self, warning_messages):
        """
        Populates the 'Habitat' with information from the annotation object. Adds a warning message if one of the
        habitats is missing or cannot be parsed.

        :param list warning_messages: The list of warning messages to display at the end of the script.
        """
        s1 = get_association(self.annotation, 's1')
        if s1:
            # old VARS stores the substrate code in 'link_value' rather than 'to_concept'
            primary = translate_substrate_code(s1['to_concept']) or translate_substrate_code(s1['link_value'])
            if primary:
                self.columns['Habitat'] = f'primarily: {primary}'
            else:
                self._add_warning(
                    warning_messages,
                    f'{Color.RED}Missing s1 or could not parse substrate code:{Color.END} '
                    f'{Color.BOLD}to_concept: {s1["to_concept"]}, link_value: {s1["link_value"]}{Color.END}'
                )
        elif self.columns['ScientificName'] != NULL_VAL_STRING:
            self._add_warning(warning_messages, f'{Color.RED}Missing s1{Color.END}')

        s2_records = get_associations_list(self.annotation, 's2')
        if s2_records:
            s2_codes = []
            for s2 in s2_records:
                if s2['to_concept'] in ('nil', 'self'):
                    # this is old VARS data, formatted as one record separated by semicolons
                    s2_codes = s2['link_value'].replace(',', ';').replace('; ', ';').replace(';;', ';')\
                        .replace(' ', ';').replace(':', ';').replace("'", ';').split(';')
                elif s2['to_concept'] not in s2_codes:  # skip duplicates
                    s2_codes.append(s2['to_concept'])
            s2_codes.sort(key=grain_size)

            secondary = []
            failures = []
            for code in s2_codes:
                translated = translate_substrate_code(code)
                if translated:
                    secondary.append(translated)
                else:
                    failures.append(code)
            if failures:
                self._add_warning(
                    warning_messages,
                    f'Could not parse s2 substrate codes {Color.BOLD}{failures}{Color.END}'
                )
            # NOTE(review): when no primary habitat was set, this appends to the column's null value - confirm
            # that is the intended output
            self.columns['Habitat'] = self.columns['Habitat'] + f' / secondary: {"; ".join(secondary)}'

        habitat_comment = get_association(self.annotation, 'habitat-comment')
        if habitat_comment:
            self.columns['Habitat'] = self.columns['Habitat'] + f' / comments: {habitat_comment["link_value"]}'

    def set_upon(self):
        """
        Sets the 'Substrate' and 'UponIsCreature' columns from the 'upon' record in the annotation object, if there
        is one. 'UponIsCreature' is True when the 'upon' concept is not a known substrate code.
        """
        upon = get_association(self.annotation, 'upon')
        self.columns['UponIsCreature'] = False
        if not upon:
            return

        host = upon['to_concept']
        substrate = translate_substrate_code(host)
        if substrate:
            self.columns['Substrate'] = substrate
            return

        # the item in 'upon' is not in the substrate list, so it must be upon another creature
        self.columns['UponIsCreature'] = True

        # NOTE(review): 'orgsp'/'orgcr' are handled as mutually exclusive special cases with the raw concept as
        # the fallback, and the 'on <host>' comment override applies to 'orgcr' only - confirm against the data
        if host == 'orgsp':
            self.columns['Substrate'] = 'Porifera'
            notes = get_association(self.annotation, 'observation notes')
            if notes and 'dead' in notes['link_value']:
                self.columns['Substrate'] += ' (dead)'
        elif host == 'orgcr':
            self.columns['Substrate'] = 'Crustacea'
            comment = get_association(self.annotation, 'comment')
            if comment:
                words = comment['link_value'].split(';')[0].split(' ')
                # a comment of the form 'on <host> ...' names the host; a bare 'on' has no second word
                if len(words) > 1 and words[0] == 'on':
                    self.columns['Substrate'] = words[1]
        else:
            self.columns['Substrate'] = host

    def set_id_ref(self, warning_messages: list):
        """
        Sets the 'IdentityReference' column with the value pulled from the annotation object. ID reference is populated
        when there are multiple annotations with the exact same animal. The column is set to -1 when there is no
        usable reference.

        :param list warning_messages: The list of warning messages to display at the end of the script.
        """
        self.columns['IdentityReference'] = -1
        identity_reference = get_association(self.annotation, 'identity-reference')
        if not identity_reference:
            return

        value = identity_reference['link_value']
        if value == '':
            self._add_warning(
                warning_messages,
                f'{Color.YELLOW}An identity-reference exists for this record, but it is empty{Color.END}'
            )
            return
        try:
            self.columns['IdentityReference'] = int(value)
        except (TypeError, ValueError):
            # a malformed reference is reported instead of aborting the whole run
            self._add_warning(
                warning_messages,
                f'{Color.YELLOW}The identity-reference for this record is not an integer:{Color.END} '
                f'{Color.BOLD}"{value}"{Color.END}'
            )

    def set_depth(self, warning_messages: list):
        """
        Sets depth based on data from annotation object. Adds a warning message if depth is missing. A depth of 0 is
        treated as missing.

        :param list warning_messages: The list of warning messages to display at the end of the script.
        """
        depth = self._ancillary_value('depth_meters')
        if depth is not None and depth != 0:
            self.columns['DepthInMeters'] = round(depth, 3)
        else:
            self.columns['DepthInMeters'] = NULL_VAL_INT
            self._add_warning(warning_messages, f'{Color.YELLOW}No depth data found for this record{Color.END}')
        # a single reported depth doubles as both ends of the depth range
        self.columns['MinimumDepthInMeters'] = self.columns['DepthInMeters']
        self.columns['MaximumDepthInMeters'] = self.columns['DepthInMeters']

    def set_temperature(self, warning_messages: list):
        """
        Sets temperature based on data from annotation object. Adds a warning message if temperature is missing.

        :param list warning_messages: The list of warning messages to display at the end of the script.
        """
        self._set_measurement(
            column='Temperature',
            key='temperature_celsius',
            digits=4,
            message='No temperature measurement included in this record',
            warning_messages=warning_messages,
        )

    def set_salinity(self, warning_messages: list):
        """
        Sets salinity based on data from annotation object. Adds a warning message if salinity is missing.

        :param list warning_messages: The list of warning messages to display at the end of the script.
        """
        self._set_measurement(
            column='Salinity',
            key='salinity',
            digits=4,
            message='No salinity measurement included in this record',
            warning_messages=warning_messages,
        )

    def set_oxygen(self, warning_messages: list):
        """
        Populates the 'Oxygen' column with data from the annotation object. Adds a warning message if oxygen data is
        missing.

        :param list warning_messages: The list of warning messages to display at the end of the script.
        """
        self._set_measurement(
            column='Oxygen',
            key='oxygen_ml_l',
            digits=4,
            message='No oxygen measurement included in this record',
            warning_messages=warning_messages,
        )

    def _download_highlight_image(self, output_file_path: str, warning_messages: list) -> None:
        """
        Downloads the image at 'ImageFilePath' into output_file_path, recording a warning if the download fails.

        :param str output_file_path: Directory in which to save the image.
        :param list warning_messages: The list of warning messages to display at the end of the script.
        """
        url = self.columns['ImageFilePath']
        try:
            res = requests.get(url, timeout=self._IMAGE_DOWNLOAD_TIMEOUT_SECONDS)
        except requests.exceptions.RequestException:
            # covers connection errors, timeouts and malformed URLs
            self._add_warning(warning_messages, 'Error downloading image')
            return
        if res.status_code != 200:
            self._add_warning(warning_messages, 'Error downloading image')
            return
        # write via an explicit path rather than os.chdir, which would change the working directory of the
        # whole process and break relative output paths on the next download
        with open(os.path.join(output_file_path, url.split('/')[-1]), 'wb') as file:
            file.write(res.content)

    def set_image_paths(self, download_highlight_images: bool, output_file_path: str, warning_messages: list):
        """
        Populates the 'ImageFilePath' and 'HighlightImageFilePath' columns with information from the annotation object.

        :param download_highlight_images: whether or not to download the highlight images and save to local machine.
        :param output_file_path: where to save the images
        :param list warning_messages: The list of warning messages to display at the end of the script.
        """
        image_paths = [
            image['url'].replace('http://hurlstor.soest.hawaii.edu/imagearchive', self._IMAGE_ARCHIVE_URL)
            for image in self.annotation.get('image_references') or []
        ]
        if len(image_paths) == 1:
            self.columns['ImageFilePath'] = image_paths[0]
        elif len(image_paths) > 1:
            # prefer the first reference when it is a .png, otherwise fall back to the second
            self.columns['ImageFilePath'] = image_paths[0] if '.png' in image_paths[0] else image_paths[1]

        # for old VARS: one ';'-separated record whose first entry carries the directory for all the others
        photo_reference = get_association(self.annotation, 'photo-reference')
        if photo_reference:
            names = photo_reference['link_value'].split(';')
            first = names[0].replace('http://max5kn1.soest.hawaii.edu/imagearchive/', '')
            directory = '/'.join(first.split('/')[:-1])
            urls = [f'{self._IMAGE_ARCHIVE_URL}/{first}']
            urls.extend(f'{self._IMAGE_ARCHIVE_URL}/{directory}/{name}' for name in names[1:])
            for url in urls:
                self.columns['ImageFilePath'] = self._append_piped(self.columns['ImageFilePath'], url)

        highlight_image = get_association(self.annotation, 'guide-photo')
        if highlight_image and highlight_image['to_concept'] in ('1 best', '2 good'):
            self.columns['HighlightImageFilePath'] = self.columns['ImageFilePath']
            if self.columns['ImageFilePath'] == NULL_VAL_STRING:
                self._add_warning(
                    warning_messages,
                    'guide-photo for this annotation has a to_concept of "1 best" or "2 good", '
                    'but the annotation has no image references'
                )
            elif download_highlight_images:
                self._download_highlight_image(output_file_path, warning_messages)

        population_density = get_association(self.annotation, 'population-density')
        if population_density and population_density['link_value'] == 'dense':
            self.columns['HighlightImageFilePath'] = self.columns['ImageFilePath']

    def set_bounding_box_uuid(self):
        """
        Sets the 'BoundingBoxID' column with the value pulled from the annotation object.
        """
        bounding_box = get_association(self.annotation, 'bounding box')
        if bounding_box:
            self.columns['BoundingBoxID'] = bounding_box['uuid']