Coverage for util / functions.py: 91%
161 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-12 01:11 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-12 01:11 +0000
1"""
2This file contains functions that are used throughout the formatting process and WoRMS check.
3"""
5from datetime import datetime, timedelta
6from typing import Dict, Optional
8from util.constants import *
9from util.terminal_output import Color
def get_association(annotation: Dict, link_name: str) -> dict:
    """
    Looks up a single association on an annotation by its link name.

    :param Dict annotation: The complete annotation dictionary; its 'associations' entry is searched in order.
    :param str link_name: The link name of the association we want.
    :return dict: The first association whose 'link_name' matches, or an empty dict when none match.
    """
    matching = (entry for entry in annotation['associations'] if entry['link_name'] == link_name)
    # only the first match is wanted; fall back to an empty dict
    return next(matching, {})
def get_associations_list(annotation: Dict, link_name: str) -> list:
    """
    Collects every association on an annotation that carries the given link name (for when there is more than one
    association).

    :param Dict annotation: The complete annotation dictionary; its 'associations' entry is searched in order.
    :param str link_name: The link name of the associations we want.
    :return list: The matching association dicts, in their original order (empty if none match).
    """
    return [entry for entry in annotation['associations'] if entry['link_name'] == link_name]
def grain_size(sub: list) -> int:
    """
    Gets the relative grain size of a substrate concept.

    :param list sub: The substrate.
    :return int: The index of the first ROOTS entry that is contained in sub, or len(ROOTS) when none is.
    """
    positions = (position for position, root in enumerate(ROOTS) if root in sub)
    return next(positions, len(ROOTS))
def get_date_and_time(record: Dict) -> datetime:
    """
    Builds a datetime timestamp from a completed annotation record.

    :param Dict record: The annotation record after it has been converted from an AnnotationRow to a list.
    :return datetime: The observation date/time, parsed from the record's OBSERVATION_DATE and OBSERVATION_TIME
        values concatenated together ('%Y-%m-%d' followed directly by '%H:%M:%S').
    """
    date_part = record[OBSERVATION_DATE]
    time_part = record[OBSERVATION_TIME]
    return datetime.strptime(date_part + time_part, '%Y-%m-%d%H:%M:%S')
def parse_datetime(timestamp: str) -> datetime:
    """
    Parses a 'YYYY-MM-DDTHH:MM:SSZ' timestamp string, with or without fractional seconds.

    :param str timestamp: The timestamp to parse.
    :return datetime: The timestamp parsed as a (naive) datetime object.
    """
    # a '.' in the string means a fractional-seconds part is present
    layout = '%Y-%m-%dT%H:%M:%S.%fZ' if '.' in timestamp else '%Y-%m-%dT%H:%M:%SZ'
    return datetime.strptime(timestamp, layout)
def extract_time(json_object: Dict) -> Optional[datetime]:
    """
    Sort key for json objects: the object's 'recorded_timestamp', rounded to the nearest whole second.

    :param Dict json_object: A json object with the time we want to sort by.
    :return datetime: A datetime object of the timestamp from the json object, or None if the object is empty/None.
    """
    if not json_object:
        return None
    raw_timestamp = json_object['recorded_timestamp']
    if '.' not in raw_timestamp:
        # no fractional part, nothing to round
        return datetime.strptime(raw_timestamp, '%Y-%m-%dT%H:%M:%SZ')
    parsed = datetime.strptime(raw_timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')
    whole_second = parsed.replace(microsecond=0)
    if parsed.microsecond >= 500000:
        # half a second or more rounds up
        return whole_second + timedelta(seconds=1)
    return whole_second
def extract_uuid(json_object: Dict) -> str:
    """
    Sort key for annotations: the annotation's observation UUID (used for testing).

    :param Dict json_object: A json object with the UUID we want to sort by.
    :return str: The value stored under 'observation_uuid'.
    """
    observation_uuid = json_object['observation_uuid']
    return observation_uuid
def add_meters(accuracy: str) -> str:
    """
    Makes sure an accuracy string ends with the unit 'm', appending one if it is missing.

    :param str accuracy: The accuracy string, e.g. '50m' or '50'.
    :return str: The string with an 'm' on the end.
    """
    if accuracy.endswith('m'):
        return accuracy
    return accuracy + 'm'
def convert_username_to_name(vars_username: str) -> str:
    """
    Converts format of VARS username: [FirstnameLastname] -> [Lastname, FirstName]
    Assumes VARS usernames are formatted 'FirstnameLastname'
    A few old all-lowercase VARS usernames are mapped explicitly.

    :param str vars_username: VARS username, e.g. 'SarahBingo'.
    :return str: The converted name string, e.g. 'Bingo, Sarah'. The username is returned unchanged when it has no
        uppercase letter after its first character.
    """
    # NOTE(review): 'christopherkelley' maps to 'Kelly' (no second 'e') - confirm the intended spelling
    legacy_usernames = {
        'christopherkelley': 'Kelly, Christopher',
        'janeculp': 'Culp, Jane',
        'hcarlson': 'Carlson, Harold',
    }
    if vars_username in legacy_usernames:
        return legacy_usernames[vars_username]
    # the last name starts at the first uppercase letter after position 0
    split_at = next((pos for pos, char in enumerate(vars_username) if pos > 0 and char.isupper()), None)
    if split_at is None:
        return vars_username
    first_name, last_name = vars_username[:split_at], vars_username[split_at:]
    return last_name + ', ' + first_name
def translate_substrate_code(code: str) -> str:
    """
    Translates substrate codes into human language.

    The code is stripped and then resolved in this order: a code found in SAMES is returned unchanged; the bare code
    'hp' becomes 'hydrothermal precipitate'; otherwise the first entry of ROOTS found in the code is removed from it
    and its SUB_CONCEPTS value becomes the root word, after which every entry of ALL_AFFIXES found in what is left is
    removed in turn and its SUB_CONCEPTS word is placed relative to the root word.

    :param str code: The VARS code of the substrate, e.g. 'peb'.
    :return str: The translated code, e.g. 'pebble'. An empty string is returned when removing the root and affixes
        never reduces the code to an empty string.
    """
    code = code.strip()
    if code in SAMES:
        return code
    if code == 'hp':  # condition for old VARS, where hp was not only a suffix
        return 'hydrothermal precipitate'
    substrate_word_list = []  # translated words, kept in output order
    r = ''  # translated root word; stays '' when no root is found in the code
    man_or_forms = []  # affixes matched from SUFFIXES_FORMS / SUFFIXES_MAN
    for root in ROOTS:
        if root in code:
            substrate_word_list.append(SUB_CONCEPTS[root])
            r = SUB_CONCEPTS[root]
            code = code.replace(root, '')
            if code == '':
                # the code was nothing but the root
                if r == 'man-made':
                    return 'man-made object'
                else:
                    return r
            # only the first root found (in ROOTS order) is used
            break
    for affix in ALL_AFFIXES:
        if affix in code:
            if affix == 'pi':
                # SUB_CONCEPTS['pi'] holds two forms: the first goes in front for bedrock/block, the second is
                # appended for everything else
                if r == 'bedrock' or r == 'block':
                    substrate_word_list.insert(0, SUB_CONCEPTS[affix][0])
                else:
                    substrate_word_list.append(SUB_CONCEPTS[affix][1])
            elif affix in SUFFIXES and r in substrate_word_list:
                # goes directly after the root word
                substrate_word_list.insert(substrate_word_list.index(r) + 1, SUB_CONCEPTS[affix])
            elif affix in SUFFIXES_FORMS or affix in SUFFIXES_MAN:
                substrate_word_list.append(SUB_CONCEPTS[affix])
                man_or_forms.append(affix)
            elif affix in SUFFIXES_DEAD:
                substrate_word_list.append(SUB_CONCEPTS[affix])
            elif affix in PREFIXES and r in substrate_word_list:
                # goes directly before the root word
                substrate_word_list.insert(substrate_word_list.index(r), SUB_CONCEPTS[affix])
            code = code.replace(affix, '')
            if code == '':
                # every part of the code has been consumed, so build the final phrase
                if len(man_or_forms) >= 2:
                    # two or more form/man-made words: put 'and' before the last word in the list
                    substrate_word_list.insert(-1, 'and')
                subs = ' '.join(substrate_word_list)
                if subs[:4] == 'dead':
                    # move a leading 'dead' to a trailing '(dead)'
                    subs = f'{subs[5:]} (dead)'
                return subs
    # part of the code could not be matched to any root or affix
    return ''
def collapse_id_records(report_records: list) -> int:
    """
    Collapses records with the same identity-reference. Returns number of records collapsed.

    The first record seen for each identity-reference is kept; every later record with the same reference is merged
    into it and removed from ``report_records`` (the list is modified in place). Records whose identity-reference is
    -1 are never collapsed.

    Merge rules for a duplicate record:
      * comment/habitat/substrate/count/size/geoform fields and min/max size fill in the kept record only where the
        kept record still holds the null value;
      * image path, highlight image and bounding box ID are joined with ' | ' unless the value is already present;
      * the larger individual count wins.

    :param list report_records: A list of annotation rows (i.e., a list of every annotation in a dive).
    :return int: The number of records collapsed.
    """
    fill_if_null_string = [ID_COMMENTS, HABITAT, SUBSTRATE, INDV_COUNT, VERBATIM_SIZE, OCCURRENCE_COMMENTS,
                           CMECS_GEO_FORM]
    fill_if_null_int = [MIN_SIZE, MAX_SIZE]
    joined_fields = [IMAGE_PATH, HIGHLIGHT_IMAGE, BOUNDING_BOX_ID]

    identity_references = {}  # identity-reference -> first record seen with that reference
    kept_records = []
    for record in report_records:
        id_ref = record[IDENTITY_REF]
        if id_ref == -1:
            # -1 means the record has no identity-reference, so it is kept untouched
            kept_records.append(record)
            continue
        if id_ref not in identity_references:
            identity_references[id_ref] = record
            kept_records.append(record)
            continue

        # duplicate: collapse the values in the current record into the first record with this reference
        primary = identity_references[id_ref]
        for j in fill_if_null_string:
            if primary[j] == NULL_VAL_STRING and record[j] != NULL_VAL_STRING:
                primary[j] = record[j]
        for j in fill_if_null_int:
            if primary[j] == NULL_VAL_INT and record[j] != NULL_VAL_INT:
                primary[j] = record[j]
        for j in joined_fields:
            if record[j] == NULL_VAL_STRING:
                continue
            if primary[j] == NULL_VAL_STRING:
                primary[j] = record[j]
            elif record[j] not in primary[j]:
                primary[j] += f' | {record[j]}'
            # else: value already present in the kept record; leave the joined value as it is
        if int(primary[INDV_COUNT]) < int(record[INDV_COUNT]):
            primary[INDV_COUNT] = record[INDV_COUNT]

    dupes_removed = len(report_records) - len(kept_records)
    # replace the contents in place so callers holding this list see the collapsed records
    report_records[:] = kept_records
    return dupes_removed
def _taxa_warning(record, message: str) -> list:
    """Builds one warning row (sample ID, VARS concept name, tracking ID, message) for the given record."""
    return [record[SAMPLE_ID], record[VARS_CONCEPT_NAME], record[TRACKING_ID], message]


def find_associated_taxa(report_records: list, concepts: Dict, warning_messages: list):
    """
    Fills in the AssociatedTaxa fields: retrieves records from the output table that have another VARS concept listed
    as a substrate.

    For every record whose 'upon' is a creature, the records are searched backward (starting up to 10 records after
    the associate, because several records can share one timestamp) for the most recent record of the host concept
    in the same dive. The host found gets the associate's combined name added to its associated taxa and
    'associate touching host' added to its occurrence comments. Host records are modified in place.

    Warnings are appended to ``warning_messages`` when the host concept is not in ``concepts``, when no host record
    is found, or when the host was recorded more than 1 minute (yellow) or more than 5 minutes (red) before the
    associate. The gap is measured as the full elapsed time, including whole days.

    :param list report_records: A list of annotation rows (i.e., a list of every annotation in a dive).
    :param Dict concepts: Dictionary of all locally saved concepts.
    :param list warning_messages: The list of warning messages to display at the end of the script.
    """
    last_index = len(report_records) - 1
    for i, associate_record in enumerate(report_records):
        if not associate_record[UPON_IS_CREATURE]:
            continue
        # the associate's 'upon' is indeed a creature
        host_concept_name = associate_record[SUBSTRATE]  # VARS name for host
        if host_concept_name not in concepts:
            # flag error: host concept is not in local concepts file
            warning_messages.append(_taxa_warning(
                associate_record,
                f'{Color.RED}"{host_concept_name}" is host for this record, but that concept name '
                f'was not found in concepts.{Color.END}'
            ))
            continue

        observation_time = get_date_and_time(associate_record)  # timestamp at which the associate was recorded
        found = False
        # Checks backward, looking for the most recent host w/ matching name. We start at i + 10 because there can
        # be multiple records with the exact same timestamp, and one of those records could be the 'upon'. The
        # start is clamped to the last valid index so the look-ahead never runs off the end of the list.
        for j in range(min(i + 10, last_index), -1, -1):
            host_record = report_records[j]
            host_time = get_date_and_time(host_record)
            if i == j or host_time > observation_time:
                # host record won't be recorded after associate record, so ignore this record
                # i == j: record shouldn't be associated with itself, ignore
                continue
            if host_record[SAMPLE_ID][:-9] != associate_record[SAMPLE_ID][:-9]:
                # dive names don't match, stop the search
                break
            if host_record[VARS_CONCEPT_NAME] != host_concept_name:
                continue

            # the host record's name is equal to the host concept name (associate's 'upon' name)
            associate_name = associate_record[COMBINED_NAME_ID]
            if host_record[ASSOCIATED_TAXA] == NULL_VAL_STRING:
                # if the host's 'associated taxa' field is blank, add the associate's concept name
                host_record[ASSOCIATED_TAXA] = associate_name
            elif associate_name not in host_record[ASSOCIATED_TAXA]:
                # otherwise, append the concept name if it's not already there
                host_record[ASSOCIATED_TAXA] += f' | {associate_name}'
            if host_record[OCCURRENCE_COMMENTS] == NULL_VAL_STRING:
                # add touch to occurrence comments
                host_record[OCCURRENCE_COMMENTS] = 'associate touching host'
            elif 'associate touching host' not in host_record[OCCURRENCE_COMMENTS]:
                host_record[OCCURRENCE_COMMENTS] += ' | associate touching host'

            # total_seconds() counts whole days too; timedelta.seconds alone would wrap around every 24 hours
            seconds_apart = int((observation_time - host_time).total_seconds())
            if seconds_apart > 300:
                # flag warning
                warning_messages.append(_taxa_warning(
                    associate_record,
                    f'{Color.RED}Time between record and upon record greater than 5 minutes {Color.END}'
                    f'({seconds_apart} seconds)'
                ))
            elif seconds_apart > 60:
                # flag for review
                warning_messages.append(_taxa_warning(
                    associate_record,
                    f'{Color.YELLOW}Time between record and upon record greater than 1 minute {Color.END}'
                    f'({seconds_apart} seconds)'
                ))
            found = True
            break

        if not found:
            # flag error
            warning_messages.append(_taxa_warning(
                associate_record,
                f'{Color.RED}Upon not found in previous records{Color.END}'
            ))