Coverage for util/functions.py: 91%

159 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-12 17:57 +0000

1""" 

2This file contains functions that are used throughout the formatting process and WoRMS check. 

3""" 

4 

5from datetime import datetime, timedelta 

6from typing import Dict, Optional 

7 

8from util.constants import * 

9from util.terminal_output import Color 

10 

11 

def get_association(annotation: Dict, link_name: str) -> dict:
    """
    Looks up a single association on an annotation by its link name.

    :param Dict annotation: The complete annotation dictionary; its 'associations' entry is searched in order.
    :param str link_name: The link name of the association we are looking for.
    :return dict: The first association whose 'link_name' matches, or an empty dict when none match.
    """
    # lazily walk the associations so the search stops at the first hit
    candidates = (entry for entry in annotation['associations'] if entry['link_name'] == link_name)
    return next(candidates, {})

24 

25 

def get_associations_list(annotation: Dict, link_name: str) -> list:
    """
    Collects every association on an annotation that carries the given link name (for when there can be more
    than one).

    :param Dict annotation: The complete annotation dictionary; its 'associations' entry is searched in order.
    :param str link_name: The link name of the associations we are looking for.
    :return list: All matching association dicts, in their original order (empty if there are none).
    """
    return [entry for entry in annotation['associations'] if entry['link_name'] == link_name]

40 

41 

def grain_size(sub: list) -> int:
    """
    Ranks a substrate concept by the position of the first ROOTS entry it contains.

    :param list sub: The substrate.
    :return int: The index in ROOTS of the first root found in the substrate, or len(ROOTS) if none is found.
    """
    for position, root in enumerate(ROOTS):
        if root in sub:
            return position
    # no root matched: rank after every known root
    return len(ROOTS)

53 

54 

def get_date_and_time(record: Dict) -> datetime:
    """
    Builds a datetime from the observation date and observation time fields of a completed annotation record.

    :param Dict record: The annotation record after it has been converted from an AnnotationRow to a list.
    :return datetime: A datetime object of the observation date/time.
    """
    date_part = record[OBSERVATION_DATE]
    time_part = record[OBSERVATION_TIME]
    # the two fields are joined without a separator, so the format string has none either
    return datetime.strptime(date_part + time_part, '%Y-%m-%d%H:%M:%S')

63 

64 

def parse_datetime(timestamp: str) -> datetime:
    """
    Parses a 'YYYY-MM-DDTHH:MM:SSZ' timestamp string, with or without fractional seconds.

    :param str timestamp: The timestamp to parse.
    :return datetime: The timestamp parsed as a datetime object.
    """
    # a '.' means the timestamp carries fractional seconds
    layout = '%Y-%m-%dT%H:%M:%S.%fZ' if '.' in timestamp else '%Y-%m-%dT%H:%M:%SZ'
    return datetime.strptime(timestamp, layout)

75 

76 

def extract_time(json_object: Dict) -> Optional[datetime]:
    """
    Sort key for json objects: the object's 'recorded_timestamp', rounded to the nearest whole second.

    :param Dict json_object: A json object with the time we want to sort by.
    :return datetime: The timestamp from the json object as a datetime, or None if the object is empty/None.
    """
    if not json_object:
        return None
    raw_timestamp = json_object['recorded_timestamp']
    if '.' not in raw_timestamp:
        # no fractional seconds, nothing to round
        return datetime.strptime(raw_timestamp, '%Y-%m-%dT%H:%M:%SZ')
    parsed = datetime.strptime(raw_timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')
    whole_second = parsed.replace(microsecond=0)
    if parsed.microsecond < 500000:
        return whole_second
    # half a second or more rounds up to the next second
    return whole_second + timedelta(seconds=1)

92 

93 

def extract_uuid(json_object: Dict) -> str:
    """
    Sort key for annotations: the annotation's observation UUID (for testing).

    :param Dict json_object: A json object with the UUID we want to sort by.
    :return str: The value stored under 'observation_uuid'.
    """
    observation_uuid = json_object['observation_uuid']
    return observation_uuid

102 

103 

def add_meters(accuracy: str) -> str:
    """
    Makes sure the given accuracy string ends in 'm', appending one only when it is missing.

    :param str accuracy: The accuracy string, e.g. '50m' or '50'.
    :return str: The string with an 'm' on the end.
    """
    return accuracy if accuracy.endswith('m') else accuracy + 'm'

114 

115 

def convert_username_to_name(vars_username: str) -> str:
    """
    Converts format of VARS username: [FirstnameLastname] -> [Lastname, Firstname]
    Assumes VARS usernames are formatted 'FirstnameLastname'; the name is split at the first uppercase letter
    after the first character. Usernames without such a letter are returned unchanged.
    A few old all-lowercase VARS usernames are handled as explicit exceptions.

    :param str vars_username: VARS username, e.g. 'SarahBingo'.
    :return str: The converted name string, e.g. 'Bingo, Sarah'.
    """
    # old VARS usernames that cannot be split on capitalization
    # NOTE(review): the username is spelled 'kelley' but the returned surname is 'Kelly' - confirm the spelling
    legacy_names = {
        'christopherkelley': 'Kelly, Christopher',
        'janeculp': 'Culp, Jane',
    }
    if vars_username in legacy_names:
        return legacy_names[vars_username]
    for position, character in enumerate(vars_username):
        # position 0 is skipped: the first name's own capital is not a split point
        if position and character.isupper():
            return vars_username[position:] + ', ' + vars_username[:position]
    return vars_username

133 

134 

def translate_substrate_code(code: str) -> str:
    """
    Translates substrate codes into human language.

    The stripped code is matched against the module-level substrate tables: a root from ROOTS is looked up
    first, then every affix from ALL_AFFIXES found in what is left of the code. Each matched piece is removed
    from the code and its SUB_CONCEPTS translation is added to the word list. A translation is only returned
    when the entire code has been consumed.

    :param str code: The VARS code of the substrate, e.g. 'peb'.
    :return str: The translated code, e.g. 'pebble'. An empty string is returned if some part of the code is left
        over after all roots/affixes have been removed.
    """
    code = code.strip()
    # codes listed in SAMES are returned as-is
    if code in SAMES:
        return code
    if code == 'hp':  # condition for old VARS, where hp was not only a suffix
        return 'hydrothermal precipitate'
    substrate_word_list = []  # translated words, in output order
    r = ''  # translated root word ('' if no root matched)
    man_or_forms = []  # affixes matched from SUFFIXES_FORMS / SUFFIXES_MAN
    for root in ROOTS:
        if root in code:
            substrate_word_list.append(SUB_CONCEPTS[root])
            r = SUB_CONCEPTS[root]
            code = code.replace(root, '')
            if code == '':
                # the code was just a root, nothing else to translate
                if r == 'man-made':
                    return 'man-made object'
                else:
                    return r
            # NOTE(review): indentation reconstructed - this break is assumed to stop the search after the
            # first matching root; confirm against the original file
            break
    for affix in ALL_AFFIXES:
        if affix in code:
            if affix == 'pi':
                # SUB_CONCEPTS['pi'] holds two wordings: [0] goes in front for bedrock/block, [1] goes at the end
                if r == 'bedrock' or r == 'block':
                    substrate_word_list.insert(0, SUB_CONCEPTS[affix][0])
                else:
                    substrate_word_list.append(SUB_CONCEPTS[affix][1])
            elif affix in SUFFIXES and r in substrate_word_list:
                # plain suffixes go directly after the root word
                substrate_word_list.insert(substrate_word_list.index(r) + 1, SUB_CONCEPTS[affix])
            elif affix in SUFFIXES_FORMS or affix in SUFFIXES_MAN:
                substrate_word_list.append(SUB_CONCEPTS[affix])
                man_or_forms.append(affix)
            elif affix in SUFFIXES_DEAD:
                substrate_word_list.append(SUB_CONCEPTS[affix])
            elif affix in PREFIXES and r in substrate_word_list:
                # prefixes go directly before the root word
                substrate_word_list.insert(substrate_word_list.index(r), SUB_CONCEPTS[affix])
            code = code.replace(affix, '')
    if code == '':
        # everything in the code was recognized
        if len(man_or_forms) >= 2:
            # two or more form/man-made words: put 'and' before the last word in the list
            substrate_word_list.insert(-1, 'and')
        subs = ' '.join(substrate_word_list)
        if subs[:4] == 'dead':
            # move a leading 'dead' to a trailing '(dead)'
            subs = f'{subs[5:]} (dead)'
        return subs
    # part of the code was not recognized
    return ''

186 

187 

def collapse_id_records(report_records: list) -> int:
    """
    Collapses records with the same identity-reference. Returns number of records collapsed.

    The first record seen for each identity reference is kept, and every later record with the same reference
    is merged into it and removed from the list. Records whose identity reference is -1 are never collapsed.
    The list is modified in place, and the kept records are modified in place.

    Merge rules for a duplicate record:
      - text fields and min/max size: only fill in the kept record's value if it is currently null
      - image path / highlight image / bounding box id: accumulate distinct values, separated by ' | '
      - individual count: keep the larger of the two counts

    :param list report_records: A list of annotation rows (i.e., a list of every annotation in a dive).
    :return int: The number of records collapsed.
    """
    identity_references = {}  # identity reference -> first (kept) record with that reference
    kept_records = []
    for record in report_records:
        id_ref = record[IDENTITY_REF]
        if id_ref == -1:
            # no identity reference, nothing to collapse
            kept_records.append(record)
            continue
        if id_ref not in identity_references:
            # first record with this identity reference: later duplicates collapse into it
            identity_references[id_ref] = record
            kept_records.append(record)
            continue

        # collapse the values in the current record into the kept record
        primary = identity_references[id_ref]
        for field in [ID_COMMENTS, HABITAT, SUBSTRATE, INDV_COUNT, VERBATIM_SIZE, OCCURRENCE_COMMENTS,
                      CMECS_GEO_FORM]:
            if primary[field] == NULL_VAL_STRING and record[field] != NULL_VAL_STRING:
                primary[field] = record[field]
        for field in [MIN_SIZE, MAX_SIZE]:
            if primary[field] == NULL_VAL_INT and record[field] != NULL_VAL_INT:
                primary[field] = record[field]
        for field in [IMAGE_PATH, HIGHLIGHT_IMAGE, BOUNDING_BOX_ID]:
            incoming = record[field]
            if incoming == NULL_VAL_STRING:
                continue
            if primary[field] == NULL_VAL_STRING:
                primary[field] = incoming
            elif incoming not in primary[field]:
                primary[field] += f' | {incoming}'
            # otherwise the value is already present: leave the accumulated value alone (previously it was
            # overwritten with the single incoming value, dropping anything collected from earlier duplicates)
        if int(primary[INDV_COUNT]) < int(record[INDV_COUNT]):
            primary[INDV_COUNT] = record[INDV_COUNT]

    dupes_removed = len(report_records) - len(kept_records)
    # slice assignment so the caller's list object is the one that shrinks
    report_records[:] = kept_records
    return dupes_removed

230 

231 

def _append_pipe_delimited(record, field, value: str) -> None:
    """
    Adds a value to a ' | '-delimited text field of a record, unless the field already contains it.

    :param record: The annotation row to update in place.
    :param field: The key/index of the field to update.
    :param str value: The text to add.
    """
    if record[field] == NULL_VAL_STRING:
        record[field] = value
    elif value not in record[field]:
        record[field] += f' | {value}'


def find_associated_taxa(report_records: list, concepts: Dict, warning_messages: list) -> None:
    """
    Fills in the AssociatedTaxa fields: retrieves records from the output table that have another VARS concept listed
    as a substrate.

    For every record whose 'upon' is a creature, the records are searched backward for the most recent host record
    with a matching concept name in the same dive. When a host is found, the associate's name is added to the
    host's associated taxa and 'associate touching host' is added to the host's occurrence comments. Host records
    are modified in place; problems are reported by appending to warning_messages.

    :param list report_records: A list of annotation rows (i.e., a list of every annotation in a dive).
    :param Dict concepts: Dictionary of all locally saved concepts.
    :param list warning_messages: The list of warning messages to display at the end of the script.
    """
    last_index = len(report_records) - 1
    for i, associate_record in enumerate(report_records):
        if not associate_record[UPON_IS_CREATURE]:
            continue
        # the associate's 'upon' is indeed a creature
        host_concept_name = associate_record[SUBSTRATE]  # VARS name for host
        if host_concept_name not in concepts:
            # flag error
            warning_messages.append([
                associate_record[SAMPLE_ID],
                associate_record[VARS_CONCEPT_NAME],
                associate_record[TRACKING_ID],
                f'{Color.RED}"{associate_record[SUBSTRATE]}" is host for this record, but that concept name '
                f'was not found in concepts.{Color.END}'
            ])
            continue

        # host concept is in local concepts file
        observation_time = get_date_and_time(associate_record)  # timestamp at which the associate was recorded
        found = False
        # Checks backward, looking for the most recent host w/ matching name. We start at i + 10 because there
        # can be multiple records with the exact same timestamp, and one of those records could be the 'upon'.
        # The start index is clamped to the last record so we never index past the end of the list.
        for j in range(min(i + 10, last_index), -1, -1):
            host_record = report_records[j]
            host_time = get_date_and_time(host_record)
            if i == j or host_time > observation_time:
                # host record won't be recorded after associate record, so ignore this record
                # i == j: record shouldn't be associated with itself, ignore
                continue
            if host_record[SAMPLE_ID][:-9] != associate_record[SAMPLE_ID][:-9]:
                # dive names don't match, stop the search
                break
            if host_record[VARS_CONCEPT_NAME] != host_concept_name:
                continue

            # the host record's name is equal to the host concept name (associate's 'upon' name)
            _append_pipe_delimited(host_record, ASSOCIATED_TAXA, associate_record[COMBINED_NAME_ID])
            _append_pipe_delimited(host_record, OCCURRENCE_COMMENTS, 'associate touching host')
            # total_seconds() rather than .seconds: .seconds drops whole days, which would hide very long gaps
            elapsed_seconds = int((observation_time - host_time).total_seconds())
            if elapsed_seconds > 300:
                # flag warning
                warning_messages.append([
                    associate_record[SAMPLE_ID],
                    associate_record[VARS_CONCEPT_NAME],
                    associate_record[TRACKING_ID],
                    f'{Color.RED}Time between record and upon record greater than 5 minutes {Color.END}'
                    f'({elapsed_seconds} seconds)'
                ])
            elif elapsed_seconds > 60:
                # flag for review
                warning_messages.append([
                    associate_record[SAMPLE_ID],
                    associate_record[VARS_CONCEPT_NAME],
                    associate_record[TRACKING_ID],
                    f'{Color.YELLOW}Time between record and upon record greater than 1 minute {Color.END}'
                    f'({elapsed_seconds} seconds)'
                ])
            found = True
            break

        if not found:
            # flag error
            warning_messages.append([
                associate_record[SAMPLE_ID],
                associate_record[VARS_CONCEPT_NAME],
                associate_record[TRACKING_ID],
                f'{Color.RED}Upon not found in previous records{Color.END}'
            ])