Coverage for util / functions.py: 91%

161 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-12 01:11 +0000

1""" 

2This file contains functions that are used throughout the formatting process and WoRMS check. 

3""" 

4 

5from datetime import datetime, timedelta 

6from typing import Dict, Optional 

7 

8from util.constants import * 

9from util.terminal_output import Color 

10 

11 

def get_association(annotation: Dict, link_name: str) -> dict:
    """
    Looks up the first association in an annotation whose link name matches the one requested.

    :param Dict annotation: The complete annotation dictionary (its 'associations' entry is searched).
    :param str link_name: The link name of the association we want.
    :return dict: The first matching association dict, or an empty dict when nothing matches.
    """
    # lazily scan in list order so that only the first hit is ever produced
    candidates = (entry for entry in annotation['associations'] if entry['link_name'] == link_name)
    return next(candidates, {})

24 

25 

def get_associations_list(annotation: Dict, link_name: str) -> list:
    """
    Collects every association in an annotation whose link name matches the one requested (for when an annotation
    carries more than one association of the same kind).

    :param Dict annotation: The complete annotation dictionary (its 'associations' entry is searched).
    :param str link_name: The link name of the associations we want.
    :return list: All matching association dicts, in their original order (empty if there are none).
    """
    return [entry for entry in annotation['associations'] if entry['link_name'] == link_name]

40 

41 

def grain_size(sub: list) -> int:
    """
    Ranks a substrate concept by the position of its root within ROOTS.

    :param list sub: The substrate.
    :return int: The index of the first ROOTS entry contained in the substrate, or len(ROOTS) if none is.
    """
    for position, root in enumerate(ROOTS):
        if root in sub:
            return position
    # no known root is present: rank it after every entry in ROOTS
    return len(ROOTS)

53 

54 

def get_date_and_time(record: Dict) -> datetime:
    """
    Builds a datetime from the observation date and observation time fields of a completed annotation record.

    :param Dict record: The annotation record after it has been converted from an AnnotationRow to a list.
    :return datetime: A datetime object of the observation date/time.
    """
    date_part = record[OBSERVATION_DATE]
    time_part = record[OBSERVATION_TIME]
    # the two fields are joined with no separator, so the format string has none either
    return datetime.strptime(date_part + time_part, '%Y-%m-%d%H:%M:%S')

63 

64 

def parse_datetime(timestamp: str) -> datetime:
    """
    Parses a 'YYYY-MM-DDTHH:MM:SSZ' timestamp string, with or without a fractional-seconds part.

    :param str timestamp: The timestamp to parse.
    :return datetime: The timestamp parsed as a datetime object.
    """
    # a '.' means the timestamp carries fractional seconds
    layout = '%Y-%m-%dT%H:%M:%S.%fZ' if '.' in timestamp else '%Y-%m-%dT%H:%M:%SZ'
    return datetime.strptime(timestamp, layout)

75 

76 

def extract_time(json_object: Dict) -> Optional[datetime]:
    """
    Sort key for json objects: the object's 'recorded_timestamp' as a datetime, rounded to the nearest whole second.

    :param Dict json_object: A json object with the time we want to sort by.
    :return datetime: A datetime object of the timestamp from the json object, or None if the object is empty.
    """
    if not json_object:
        return None
    if '.' not in json_object['recorded_timestamp']:
        # no fractional part, nothing to round
        return datetime.strptime(json_object['recorded_timestamp'], '%Y-%m-%dT%H:%M:%SZ')
    parsed = datetime.strptime(json_object['recorded_timestamp'], '%Y-%m-%dT%H:%M:%S.%fZ')
    whole_second = parsed.replace(microsecond=0)
    # half a second or more rounds up to the next second
    if parsed.microsecond >= 500000:
        return whole_second + timedelta(seconds=1)
    return whole_second

92 

93 

def extract_uuid(json_object: Dict) -> str:
    """
    Sort key for annotations: the annotation's observation UUID (used for testing).

    :param Dict json_object: A json object with the UUID we want to sort by.
    :return str: The value stored under 'observation_uuid'.
    """
    observation_uuid = json_object['observation_uuid']
    return observation_uuid

102 

103 

def add_meters(accuracy: str) -> str:
    """
    Makes sure an accuracy string ends with the unit suffix 'm'.

    :param str accuracy: The accuracy string, e.g. '50m' or '50'.
    :return str: The same string, with an 'm' appended only if it did not already end with one.
    """
    return accuracy if accuracy.endswith('m') else accuracy + 'm'

114 

115 

def convert_username_to_name(vars_username: str) -> str:
    """
    Converts a VARS username into a display name: [FirstnameLastname] -> [Lastname, Firstname]
    The username is split at the first uppercase letter after the first character.
    A few old all-lowercase VARS usernames are mapped explicitly.

    :param str vars_username: VARS username, e.g. 'SarahBingo'.
    :return str: The converted name string, e.g. 'Bingo, Sarah'. The username is returned unchanged when it has no
        uppercase letter after its first character.
    """
    # NOTE(review): the username ends in 'kelley' but the mapped surname is 'Kelly' - confirm the intended spelling
    legacy_usernames = {
        'christopherkelley': 'Kelly, Christopher',
        'janeculp': 'Culp, Jane',
        'hcarlson': 'Carlson, Harold',
    }
    if vars_username in legacy_usernames:
        return legacy_usernames[vars_username]
    # index 0 is skipped: the capital that starts the first name is not a split point
    split_at = next((pos for pos in range(1, len(vars_username)) if vars_username[pos].isupper()), None)
    if split_at is None:
        return vars_username
    first_name = vars_username[0:split_at]
    last_name = vars_username[split_at:]
    return last_name + ', ' + first_name

135 

136 

def translate_substrate_code(code: str) -> str:
    """
    Translates substrate codes into human language.

    The code is consumed piece by piece: first the root (the first entry of ROOTS found in the code), then every
    entry of ALL_AFFIXES found in what remains. Each piece is translated through SUB_CONCEPTS and placed in the
    word list relative to the root word.

    :param str code: The VARS code of the substrate, e.g. 'peb'.
    :return str: The translated code, e.g. 'pebble'. An empty string is returned when part of the code is left over
        after all roots and affixes have been removed (i.e., the code could not be fully translated).
    """
    code = code.strip()
    if code in SAMES:
        # codes listed in SAMES are returned as-is
        return code
    if code == 'hp':  # condition for old VARS, where hp was not only a suffix
        return 'hydrothermal precipitate'
    substrate_word_list = []  # translated words, in output order
    r = ''  # translated root word ('' if no root is found)
    man_or_forms = []  # affixes matched from SUFFIXES_FORMS / SUFFIXES_MAN
    for root in ROOTS:
        if root in code:
            substrate_word_list.append(SUB_CONCEPTS[root])
            r = SUB_CONCEPTS[root]
            code = code.replace(root, '')
            if code == '':
                # the code was nothing but a root
                if r == 'man-made':
                    return 'man-made object'
                else:
                    return r
            # only the first matching root (in ROOTS order) is used
            break
    for affix in ALL_AFFIXES:
        if affix in code:
            if affix == 'pi':
                # SUB_CONCEPTS['pi'] holds two wordings: [0] goes in front for bedrock/block, [1] goes at the end
                if r == 'bedrock' or r == 'block':
                    substrate_word_list.insert(0, SUB_CONCEPTS[affix][0])
                else:
                    substrate_word_list.append(SUB_CONCEPTS[affix][1])
            elif affix in SUFFIXES and r in substrate_word_list:
                # regular suffixes go directly after the root word
                substrate_word_list.insert(substrate_word_list.index(r) + 1, SUB_CONCEPTS[affix])
            elif affix in SUFFIXES_FORMS or affix in SUFFIXES_MAN:
                # appended at the end and tracked, so 'and' can be added if there is more than one
                substrate_word_list.append(SUB_CONCEPTS[affix])
                man_or_forms.append(affix)
            elif affix in SUFFIXES_DEAD:
                substrate_word_list.append(SUB_CONCEPTS[affix])
            elif affix in PREFIXES and r in substrate_word_list:
                # prefixes go directly before the root word
                substrate_word_list.insert(substrate_word_list.index(r), SUB_CONCEPTS[affix])
            code = code.replace(affix, '')
            if code == '':
                # the whole code has been consumed: assemble the final string
                if len(man_or_forms) >= 2:
                    # insert(-1, ...) places 'and' just before the last word
                    substrate_word_list.insert(-1, 'and')
                subs = ' '.join(substrate_word_list)
                if subs[:4] == 'dead':
                    # move a leading 'dead ' to a trailing ' (dead)'
                    subs = f'{subs[5:]} (dead)'
                return subs
    # some part of the code matched no root or affix
    return ''

188 

189 

def collapse_id_records(report_records: list) -> int:
    """
    Collapses records with the same identity-reference. Returns number of records collapsed.

    The first record seen with a given identity-reference is kept; every later record with the same reference is
    merged into it and removed from report_records. The list is modified in place. Records whose
    identity-reference is -1 are never collapsed.

    Merge rules for a duplicate record:
      - text and size fields that are null on the kept record are filled in from the duplicate
      - image path, highlight image, and bounding box id values are accumulated, separated by ' | '
      - the larger of the two individual counts is kept

    :param list report_records: A list of annotation rows (i.e., a list of every annotation in a dive).
    :return int: The number of records collapsed.
    """
    fill_if_null_string = [ID_COMMENTS, HABITAT, SUBSTRATE, INDV_COUNT, VERBATIM_SIZE, OCCURRENCE_COMMENTS,
                           CMECS_GEO_FORM]
    fill_if_null_int = [MIN_SIZE, MAX_SIZE]
    accumulated = [IMAGE_PATH, HIGHLIGHT_IMAGE, BOUNDING_BOX_ID]

    identity_references = {}  # identity-reference -> the kept record for that reference
    kept_records = []
    for record in report_records:
        id_ref = record[IDENTITY_REF]
        if id_ref == -1:
            kept_records.append(record)
            continue
        if id_ref not in identity_references:
            # first record with this identity-reference: this is the one that is kept
            identity_references[id_ref] = record
            kept_records.append(record)
            continue

        # duplicate: collapse its values into the kept record and drop it
        primary = identity_references[id_ref]
        for field in fill_if_null_string:
            if primary[field] == NULL_VAL_STRING and record[field] != NULL_VAL_STRING:
                primary[field] = record[field]
        for field in fill_if_null_int:
            if primary[field] == NULL_VAL_INT and record[field] != NULL_VAL_INT:
                primary[field] = record[field]
        for field in accumulated:
            incoming = record[field]
            if incoming == NULL_VAL_STRING:
                continue
            if primary[field] == NULL_VAL_STRING:
                primary[field] = incoming
            elif incoming not in primary[field]:
                primary[field] += f' | {incoming}'
            # otherwise the value is already present: leave the accumulated string untouched (overwriting it with
            # the incoming value here would discard the other values that were already collected)
        if int(primary[INDV_COUNT]) < int(record[INDV_COUNT]):
            primary[INDV_COUNT] = record[INDV_COUNT]

    dupes_removed = len(report_records) - len(kept_records)
    # slice assignment keeps the caller's list object and removes all duplicates in one pass
    report_records[:] = kept_records
    return dupes_removed

232 

233 

def find_associated_taxa(report_records: list, concepts: Dict, warning_messages: list):
    """
    Fills in the AssociatedTaxa fields: retrieves records from the output table that have another VARS concept listed
    as a substrate.

    For every record whose 'upon' is a creature (the associate), the records are searched backward for the most
    recent record of the host concept in the same dive. When one is found, the associate's name is added to the
    host's associated taxa and 'associate touching host' is added to the host's occurrence comments. Host records are
    modified in place.

    A warning is appended to warning_messages when:
      - the host concept is not in the local concepts
      - no host record is found
      - the host was recorded more than 1 minute (yellow) or more than 5 minutes (red) before the associate

    :param list report_records: A list of annotation rows (i.e., a list of every annotation in a dive).
    :param Dict concepts: Dictionary of all locally saved concepts.
    :param list warning_messages: The list of warning messages to display at the end of the script.
    """
    def flag(record, message):
        # every warning row has the same leading columns identifying the associate record
        warning_messages.append([
            record[SAMPLE_ID],
            record[VARS_CONCEPT_NAME],
            record[TRACKING_ID],
            message
        ])

    last_index = len(report_records) - 1
    for i, associate_record in enumerate(report_records):
        if not associate_record[UPON_IS_CREATURE]:
            continue
        # the associate's 'upon' is indeed a creature
        host_concept_name = associate_record[SUBSTRATE]  # VARS name for host
        if host_concept_name not in concepts:
            flag(
                associate_record,
                f'{Color.RED}"{host_concept_name}" is host for this record, but that concept name '
                f'was not found in concepts.{Color.END}'
            )
            continue

        # host concept is in local concepts file
        observation_time = get_date_and_time(associate_record)  # timestamp at which the associate was recorded
        found = False
        # Checks backward, looking for the most recent host w/ matching name. We start at i + 10 because there can
        # be multiple records with the exact same timestamp, and one of those records could be the 'upon'. The start
        # is clamped to the last valid index so the search never runs past the end of the list.
        for j in range(min(i + 10, last_index), -1, -1):
            if j == i:
                # record shouldn't be associated with itself, ignore
                continue
            host_record = report_records[j]
            host_time = get_date_and_time(host_record)
            if host_time > observation_time:
                # host record won't be recorded after associate record, so ignore this record
                continue
            if host_record[SAMPLE_ID][:-9] != associate_record[SAMPLE_ID][:-9]:
                # dive names don't match, stop the search
                break
            if host_record[VARS_CONCEPT_NAME] != host_concept_name:
                continue

            # the host record's name is equal to the host concept name (associate's 'upon' name)
            if host_record[ASSOCIATED_TAXA] == NULL_VAL_STRING:
                # if the host's 'associated taxa' field is blank, add the associate's concept name
                host_record[ASSOCIATED_TAXA] = associate_record[COMBINED_NAME_ID]
            elif associate_record[COMBINED_NAME_ID] not in host_record[ASSOCIATED_TAXA]:
                # otherwise, append the concept name if it's not already there
                host_record[ASSOCIATED_TAXA] += f' | {associate_record[COMBINED_NAME_ID]}'
            if host_record[OCCURRENCE_COMMENTS] == NULL_VAL_STRING:
                # add touch to occurrence comments
                host_record[OCCURRENCE_COMMENTS] = 'associate touching host'
            elif 'associate touching host' not in host_record[OCCURRENCE_COMMENTS]:
                host_record[OCCURRENCE_COMMENTS] += ' | associate touching host'

            # total_seconds() covers the whole gap; timedelta.seconds alone drops whole days, which would let a
            # host recorded more than 24 hours earlier slip past both checks below
            elapsed_seconds = int((observation_time - host_time).total_seconds())
            if elapsed_seconds > 300:
                # flag warning
                flag(
                    associate_record,
                    f'{Color.RED}Time between record and upon record greater than 5 minutes {Color.END}'
                    f'({elapsed_seconds} seconds)'
                )
            elif elapsed_seconds > 60:
                # flag for review
                flag(
                    associate_record,
                    f'{Color.YELLOW}Time between record and upon record greater than 1 minute {Color.END}'
                    f'({elapsed_seconds} seconds)'
                )
            found = True
            break

        if not found:
            # flag error
            flag(associate_record, f'{Color.RED}Upon not found in previous records{Color.END}')