Coverage for application / vars / annosaurus.py: 97%

94 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-23 05:22 +0000

1import requests 

2import json 

3 

4 

5class AuthenticationError(Exception): 

6 """ 

7 Exception raised for errors during authentication. 

8 """ 

9 

10 def __init__(self, message): 

11 """ 

12 :param str message: explanation of the error 

13 """ 

14 self.message = message 

15 

16 

17class JWTAuthentication(object): 

18 

19 def __init__(self, base_url: str): 

20 if base_url.endswith('/'): 

21 base_url = base_url[0:-1] 

22 self.base_url = base_url 

23 

24 def authorize(self, client_secret: str = None, jwt: str = None) -> str: 

25 """ 

26 Fetch a JWT authentication token if needed 

27 """ 

28 if jwt: 

29 pass 

30 elif client_secret: 

31 jwt = self.authenticate(client_secret) 

32 else: 

33 raise AuthenticationError('No jwt or client_secret were provided') 

34 

35 if not jwt: 

36 raise AuthenticationError('Failed to authenticate with your client_secret') 

37 return jwt 

38 

39 def authenticate(self, client_secret: str) -> str: 

40 """ 

41 Call the authentication endpoint to retrieve a JWT token as a string 

42 """ 

43 

44 url = f'{self.base_url}/auth' 

45 res = requests.post( 

46 url=url, 

47 headers={'Authorization': f'APIKEY {client_secret}'}, 

48 ) 

49 try: 

50 auth_response = res.json() 

51 return auth_response['access_token'] 

52 except json.decoder.JSONDecodeError: 

53 print(f'-- BAD Authentication: {url} returned: \n{res.text}') 

54 return '' 

55 

56 def _auth_header(self, jwt: str) -> dict: 

57 """ 

58 Format 

59 """ 

60 return {'Authorization': f'Bearer {jwt}'} 

61 

62 

63class Annosaurus(JWTAuthentication): 

64 """ 

65 Encapsulate REST calls to the annotation service 

66 """ 

67 

68 def __init__(self, base_url: str): 

69 JWTAuthentication.__init__(self, base_url) 

70 

71 def create_association(self, 

72 observation_uuid: str, 

73 association: dict, 

74 client_secret: str = None, 

75 jwt: str = None) -> dict: 

76 

77 if 'link_name' not in association: 

78 raise ValueError('association dict missing key "link_name"') 

79 jwt = self.authorize(client_secret, jwt) 

80 association['observation_uuid'] = observation_uuid 

81 if "link_value" not in association or association["link_value"] is None: 

82 association["link_value"] = "nil" 

83 if "to_concept" not in association or association["to_concept"] is None: 

84 association["to_concept"] = "self" 

85 res = requests.post( 

86 url=f'{self.base_url}/associations', 

87 data=association, 

88 headers=self._auth_header(jwt), 

89 ) 

90 # print(association) 

91 # print(res.text) 

92 return {'status': res.status_code, 'json': res.json()} 

93 

94 def update_association(self, 

95 association_uuid: str, 

96 association: dict, 

97 client_secret: str = None, 

98 jwt: str = None) -> dict: 

99 

100 jwt = self.authorize(client_secret, jwt) 

101 res = requests.put( 

102 url=f'{self.base_url}/associations/{association_uuid}', 

103 data=association, 

104 headers=self._auth_header(jwt), 

105 ) 

106 return {'status': res.status_code, 'json': res.json()} 

107 

108 def delete_association(self, 

109 association_uuid: str, 

110 client_secret: str = None, 

111 jwt: str = None) -> dict: 

112 

113 jwt = self.authorize(client_secret, jwt) 

114 res = requests.delete( 

115 url=f'{self.base_url}/associations/{association_uuid}', 

116 headers=self._auth_header(jwt), 

117 ) 

118 return {'status': res.status_code, 'json': {}} 

119 

120 def update_concept_name(self, 

121 observation_uuid: str, 

122 concept: str, 

123 client_secret: str = None, 

124 jwt: str = None) -> dict: 

125 

126 jwt = self.authorize(client_secret, jwt) 

127 res = requests.put( 

128 url=f'{self.base_url}/annotations/{observation_uuid}', 

129 data={'concept': concept}, 

130 headers=self._auth_header(jwt), 

131 ) 

132 return {'status': res.status_code, 'json': res.json()} 

133 

134 def update_annotation_comment(self, 

135 observation_uuid: str, 

136 reviewers: list, 

137 client_secret: str = None, 

138 jwt: str = None) -> dict: 

139 jwt = self.authorize(client_secret, jwt) 

140 res = requests.get(url=f'{self.base_url}/observations/{observation_uuid}') 

141 if res.status_code != 200: 

142 print(f'Unable to find annotation with observation uuid of {observation_uuid}') 

143 return {'status': res.status_code, 'json': res.json()} 

144 

145 comment_association = next((item for item in res.json()['associations'] if item['link_name'] == 'comment'), None) 

146 if comment_association: 

147 # there's already a comment 

148 old_comment = comment_association['link_value'].split('; ') 

149 old_comment = [cmt for cmt in old_comment if 'send to' not in cmt.lower()] # get rid of 'send to expert' notes 

150 old_comment = [cmt for cmt in old_comment if 'added for review' not in cmt.lower()] # get rid of old 'added for review' notes 

151 old_comment = '; '.join(old_comment) 

152 if old_comment: 

153 if reviewers: # add reviewers to the current comment 

154 new_comment = f'{old_comment}; Added for review: {", ".join(reviewers)}' 

155 else: # remove reviewers from the comment 

156 new_comment = old_comment 

157 elif reviewers: # create a new comment with reviewers 

158 new_comment = f'Added for review: {", ".join(reviewers)}' 

159 else: # remove the comment 

160 new_comment = '' 

161 

162 new_association = {'link_value': new_comment} 

163 if new_comment == '': 

164 # delete the comment 

165 deleted = self.delete_association( 

166 association_uuid=comment_association['uuid'], 

167 jwt=jwt, 

168 ) 

169 if deleted['status'] != 204: 

170 print('Error deleting comment') 

171 else: 

172 print('Deleted comment') 

173 return deleted 

174 else: 

175 updated = self.update_association( 

176 association_uuid=comment_association['uuid'], 

177 association=new_association, 

178 jwt=jwt, 

179 ) 

180 if updated['status'] != 200: 

181 print('Error updating comment') 

182 else: 

183 print('Updated comment') 

184 return updated 

185 else: 

186 # make a new comment 

187 new_comment = f'Added for review: {", ".join(reviewers)}' 

188 comment_association = { 

189 'link_name': 'comment', 

190 'link_value': new_comment 

191 } 

192 created = self.create_association( 

193 observation_uuid=observation_uuid, 

194 association=comment_association, 

195 jwt=jwt 

196 ) 

197 if created['status'] != 200: 

198 print('Error creating comment') 

199 else: 

200 print('Created comment') 

201 return created