Coverage for util/functions.py: 91%
159 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-12 17:57 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-12 17:57 +0000
1"""
2This file contains functions that are used throughout the formatting process and WoRMS check.
3"""
5from datetime import datetime, timedelta
6from typing import Dict, Optional
8from util.constants import *
9from util.terminal_output import Color
def get_association(annotation: Dict, link_name: str) -> dict:
    """
    Looks up a single association in the annotation data structure by its link name.

    :param Dict annotation: The complete annotation dictionary; its 'associations' entry is searched in order.
    :param str link_name: The link name to look for.
    :return dict: The first association whose 'link_name' equals link_name, or an empty dict if none match.
    """
    matching = (entry for entry in annotation['associations'] if entry['link_name'] == link_name)
    # only the first hit is wanted; fall back to an empty dict when there is none
    return next(matching, {})
def get_associations_list(annotation: Dict, link_name: str) -> list:
    """
    Collects every association in the annotation data structure that has the given link name (for when there can be
    more than one association of the same kind).

    :param Dict annotation: The complete annotation dictionary; its 'associations' entry is searched in order.
    :param str link_name: The link name to look for.
    :return list: All associations whose 'link_name' equals link_name, in their original order (empty if none).
    """
    return [entry for entry in annotation['associations'] if entry['link_name'] == link_name]
def grain_size(sub: list) -> int:
    """
    Ranks a substrate concept by where its root appears in ROOTS.

    :param list sub: The substrate to rank; each entry of ROOTS is tested against it with `in`.
    :return int: The index of the first entry of ROOTS contained in sub, or len(ROOTS) if none is contained.
    """
    for position, root in enumerate(ROOTS):
        if root in sub:
            return position
    # no known root found: rank after every known root
    return len(ROOTS)
def get_date_and_time(record: Dict) -> datetime:
    """
    Builds a datetime from the observation date and observation time stored in a completed annotation record.

    :param Dict record: The annotation record after it has been converted from an AnnotationRow to a list.
    :return datetime: The observation date/time, parsed from the date and time values joined with no separator.
    """
    date_and_time = record[OBSERVATION_DATE] + record[OBSERVATION_TIME]
    return datetime.strptime(date_and_time, '%Y-%m-%d%H:%M:%S')
def parse_datetime(timestamp: str) -> datetime:
    """
    Parses a 'YYYY-MM-DDTHH:MM:SSZ' timestamp string, with or without fractional seconds, into a datetime.

    :param str timestamp: The timestamp to parse, e.g. '2020-01-02T03:04:05Z' or '2020-01-02T03:04:05.250Z'.
    :return datetime: The timestamp parsed as a (naive) datetime object.
    """
    # a '.' means the timestamp carries fractional seconds
    fmt = '%Y-%m-%dT%H:%M:%S.%fZ' if '.' in timestamp else '%Y-%m-%dT%H:%M:%SZ'
    return datetime.strptime(timestamp, fmt)
def extract_time(json_object: Dict) -> Optional[datetime]:
    """
    Sort key for json objects: the object's 'recorded_timestamp' as a datetime, rounded to the nearest whole second.

    :param Dict json_object: A json object with the 'recorded_timestamp' we want to sort by.
    :return datetime: The timestamp as a datetime object with no microseconds, or None if json_object is empty/None.
    """
    if not json_object:
        return None
    raw_timestamp = json_object['recorded_timestamp']
    if '.' not in raw_timestamp:
        # no fractional seconds, nothing to round
        return datetime.strptime(raw_timestamp, '%Y-%m-%dT%H:%M:%SZ')
    parsed = datetime.strptime(raw_timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')
    whole_second = parsed.replace(microsecond=0)
    if parsed.microsecond >= 500000:
        # half a second or more rounds up to the next second
        return whole_second + timedelta(seconds=1)
    return whole_second
def extract_uuid(json_object: Dict) -> str:
    """
    Sort key for annotations: the annotation's observation UUID (used for testing).

    :param Dict json_object: A json object with the 'observation_uuid' we want to sort by.
    :return str: The value stored under 'observation_uuid'.
    """
    observation_uuid = json_object['observation_uuid']
    return observation_uuid
def add_meters(accuracy: str) -> str:
    """
    Makes sure an accuracy string ends with the unit 'm', appending one only if it is missing.

    :param str accuracy: The accuracy string, e.g. '50m' or '50'.
    :return str: The accuracy string ending in 'm', e.g. '50m'.
    """
    return accuracy if accuracy.endswith('m') else f'{accuracy}m'
def convert_username_to_name(vars_username: str) -> str:
    """
    Converts a VARS username of the form 'FirstnameLastname' into 'Lastname, Firstname'.
    The name is split at the first uppercase letter after the first character. A few old, all-lowercase VARS
    usernames are mapped explicitly.

    :param str vars_username: VARS username, e.g. 'SarahBingo'.
    :return str: The converted name string, e.g. 'Bingo, Sarah'; the username unchanged if no split point is found.
    """
    # old VARS usernames that do not follow the 'FirstnameLastname' capitalization
    # NOTE(review): the 'christopherkelley' username maps to the spelling 'Kelly' - confirm this is intended
    legacy_names = {
        'christopherkelley': 'Kelly, Christopher',
        'janeculp': 'Culp, Jane',
    }
    if vars_username in legacy_names:
        return legacy_names[vars_username]
    for split_at, character in enumerate(vars_username):
        # the first character belongs to the first name, so it can never be the split point
        if split_at > 0 and character.isupper():
            return f'{vars_username[split_at:]}, {vars_username[:split_at]}'
    return vars_username
def translate_substrate_code(code: str) -> str:
    """
    Translates substrate codes into human language.

    The code is consumed piece by piece: first the root (the first entry of ROOTS found in the code), then every
    affix from ALL_AFFIXES found in what is left. A translation is only returned once the whole code has been
    consumed; otherwise an empty string is returned.

    :param str code: The VARS code of the substrate, e.g. 'peb'.
    :return str: The translated code, e.g. 'pebble', or '' if part of the code could not be matched.
    """
    code = code.strip()
    if code in SAMES:
        # codes listed in SAMES are returned unchanged
        return code
    if code == 'hp':  # condition for old VARS, where hp was not only a suffix
        return 'hydrothermal precipitate'
    substrate_word_list = []  # translated words, kept in output order
    r = ''  # translated root word ('' if no root was found)
    man_or_forms = []  # affixes matched from SUFFIXES_FORMS / SUFFIXES_MAN
    for root in ROOTS:
        if root in code:
            substrate_word_list.append(SUB_CONCEPTS[root])
            r = SUB_CONCEPTS[root]
            code = code.replace(root, '')
            if code == '':
                # the code was just a root, nothing else to translate
                if r == 'man-made':
                    return 'man-made object'
                else:
                    return r
            # only the first matching root is used
            break
    for affix in ALL_AFFIXES:
        if affix in code:
            if affix == 'pi':
                # SUB_CONCEPTS['pi'] holds two alternatives: [0] is placed in front for bedrock/block roots,
                # [1] is appended for everything else
                if r == 'bedrock' or r == 'block':
                    substrate_word_list.insert(0, SUB_CONCEPTS[affix][0])
                else:
                    substrate_word_list.append(SUB_CONCEPTS[affix][1])
            elif affix in SUFFIXES and r in substrate_word_list:
                # plain suffixes go directly after the root word
                substrate_word_list.insert(substrate_word_list.index(r) + 1, SUB_CONCEPTS[affix])
            elif affix in SUFFIXES_FORMS or affix in SUFFIXES_MAN:
                substrate_word_list.append(SUB_CONCEPTS[affix])
                man_or_forms.append(affix)
            elif affix in SUFFIXES_DEAD:
                substrate_word_list.append(SUB_CONCEPTS[affix])
            elif affix in PREFIXES and r in substrate_word_list:
                # prefixes go directly before the root word
                substrate_word_list.insert(substrate_word_list.index(r), SUB_CONCEPTS[affix])
            # the affix is removed from the code whether or not a word was added for it
            code = code.replace(affix, '')
            if code == '':
                # whole code consumed: assemble the final phrase
                if len(man_or_forms) >= 2:
                    # two or more form/man-made suffixes: put 'and' before the last word
                    substrate_word_list.insert(-1, 'and')
                subs = ' '.join(substrate_word_list)
                if subs[:4] == 'dead':
                    # move a leading 'dead' (and the character after it) to a trailing '(dead)'
                    subs = f'{subs[5:]} (dead)'
                return subs
    # part of the code was left unmatched
    return ''
def _merge_duplicate_record(primary, duplicate) -> None:
    """
    Folds the values of a duplicate record into the primary record that shares its identity-reference.

    :param primary: The first record seen for an identity-reference; modified in place.
    :param duplicate: A later record with the same identity-reference; only read.
    """
    # fill-in fields: copied only when the primary has no value yet
    for field in [ID_COMMENTS, HABITAT, SUBSTRATE, INDV_COUNT, VERBATIM_SIZE, OCCURRENCE_COMMENTS,
                  CMECS_GEO_FORM]:
        if primary[field] == NULL_VAL_STRING and duplicate[field] != NULL_VAL_STRING:
            primary[field] = duplicate[field]
    for field in [MIN_SIZE, MAX_SIZE]:
        if primary[field] == NULL_VAL_INT and duplicate[field] != NULL_VAL_INT:
            primary[field] = duplicate[field]
    # accumulating fields: every distinct value is kept, joined with ' | '
    for field in [IMAGE_PATH, HIGHLIGHT_IMAGE, BOUNDING_BOX_ID]:
        new_value = duplicate[field]
        if new_value == NULL_VAL_STRING:
            continue
        if primary[field] == NULL_VAL_STRING:
            primary[field] = new_value
        elif new_value not in primary[field].split(' | '):
            # compare against whole entries (not substrings) and never overwrite what was already collected
            primary[field] += f' | {new_value}'
    # the collapsed record keeps the largest individual count
    if int(primary[INDV_COUNT]) < int(duplicate[INDV_COUNT]):
        primary[INDV_COUNT] = duplicate[INDV_COUNT]


def collapse_id_records(report_records: list) -> int:
    """
    Collapses records with the same identity-reference. Returns number of records collapsed.

    The first record seen for each identity-reference is kept and every later record with the same reference is
    merged into it and removed from report_records. Records whose identity-reference is -1 are never collapsed.
    report_records is modified in place.

    :param list report_records: A list of annotation rows (i.e., a list of every annotation in a dive).
    :return int: The number of records collapsed.
    """
    identity_references = {}  # identity-reference -> first record seen with that reference
    kept_records = []
    for record in report_records:
        id_ref = record[IDENTITY_REF]
        if id_ref == -1:
            # no identity-reference: always kept as-is
            kept_records.append(record)
        elif id_ref not in identity_references:
            identity_references[id_ref] = record
            kept_records.append(record)
        else:
            _merge_duplicate_record(identity_references[id_ref], record)
    dupes_removed = len(report_records) - len(kept_records)
    # slice assignment keeps the caller's list object and drops the duplicates in one pass
    report_records[:] = kept_records
    return dupes_removed
def _append_upon_warning(warning_messages: list, record, message: str) -> None:
    """
    Appends a warning row (sample ID, concept name, tracking ID, message) for the given record.

    :param list warning_messages: The list of warning messages to append to.
    :param record: The annotation row the warning is about.
    :param str message: The warning text.
    """
    warning_messages.append([
        record[SAMPLE_ID],
        record[VARS_CONCEPT_NAME],
        record[TRACKING_ID],
        message,
    ])


def find_associated_taxa(report_records: list, concepts: Dict, warning_messages: list):
    """
    Fills in the AssociatedTaxa fields: retrieves records from the output table that have another VARS concept listed
    as a substrate.

    For every record whose 'upon' is a creature, the most recent earlier record of that host concept is searched for.
    When found, the associate's name is added to the host's associated taxa and 'associate touching host' is added to
    the host's occurrence comments. Warnings are appended when the host is more than 1 or 5 minutes older than the
    associate, when no host record is found, or when the host concept is not in concepts.

    :param list report_records: A list of annotation rows (i.e., a list of every annotation in a dive).
    :param Dict concepts: Dictionary of all locally saved concepts.
    :param list warning_messages: The list of warning messages to display at the end of the script.
    """
    last_index = len(report_records) - 1
    for i, associate_record in enumerate(report_records):
        if not associate_record[UPON_IS_CREATURE]:
            continue
        # the associate's 'upon' is indeed a creature
        host_concept_name = associate_record[SUBSTRATE]  # VARS name for host
        if host_concept_name not in concepts:
            # flag error
            _append_upon_warning(
                warning_messages,
                associate_record,
                f'{Color.RED}"{associate_record[SUBSTRATE]}" is host for this record, but that concept name '
                f'was not found in concepts.{Color.END}',
            )
            continue
        # host concept is in local concepts file
        observation_time = get_date_and_time(associate_record)  # timestamp at which the associate was recorded
        found = False
        # Checks backward, looking for the most recent host w/ matching name. We start at i + 10 because there can
        # be multiple records with the exact same timestamp, and one of those records could be the 'upon'. The
        # start index is clamped to the last record so it never runs past the end of the list.
        for j in range(min(i + 10, last_index), -1, -1):
            if j == i:
                # record shouldn't be associated with itself, ignore
                continue
            host_record = report_records[j]
            host_time = get_date_and_time(host_record)
            if host_time > observation_time:
                # host record won't be recorded after associate record, so ignore this record
                continue
            if host_record[SAMPLE_ID][:-9] != associate_record[SAMPLE_ID][:-9]:
                # dive names don't match, stop the search
                break
            if host_record[VARS_CONCEPT_NAME] != host_concept_name:
                continue
            # the host record's name is equal to the host concept name (associate's 'upon' name)
            if host_record[ASSOCIATED_TAXA] == NULL_VAL_STRING:
                # if the host's 'associated taxa' field is blank, add the associate's concept name
                host_record[ASSOCIATED_TAXA] = associate_record[COMBINED_NAME_ID]
            elif associate_record[COMBINED_NAME_ID] not in host_record[ASSOCIATED_TAXA]:
                # otherwise, append the concept name if it's not already there
                host_record[ASSOCIATED_TAXA] += f' | {associate_record[COMBINED_NAME_ID]}'
            if host_record[OCCURRENCE_COMMENTS] == NULL_VAL_STRING:
                # add touch to occurrence comments
                host_record[OCCURRENCE_COMMENTS] = 'associate touching host'
            elif 'associate touching host' not in host_record[OCCURRENCE_COMMENTS]:
                host_record[OCCURRENCE_COMMENTS] += ' | associate touching host'
            # total_seconds() includes whole days; timedelta.seconds alone would wrap around after 24 hours
            elapsed_seconds = int((observation_time - host_time).total_seconds())
            if elapsed_seconds > 300:
                # flag warning
                _append_upon_warning(
                    warning_messages,
                    associate_record,
                    f'{Color.RED}Time between record and upon record greater than 5 minutes {Color.END}'
                    f'({elapsed_seconds} seconds)',
                )
            elif elapsed_seconds > 60:
                # flag for review
                _append_upon_warning(
                    warning_messages,
                    associate_record,
                    f'{Color.YELLOW}Time between record and upon record greater than 1 minute {Color.END}'
                    f'({elapsed_seconds} seconds)',
                )
            found = True
            break
        if not found:
            # flag error
            _append_upon_warning(
                warning_messages,
                associate_record,
                f'{Color.RED}Upon not found in previous records{Color.END}',
            )