Coverage for application/vars/annosaurus.py: 97%

95 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-23 02:22 +0000

1import requests 

2import json 

3 

4from typing import Dict 

5 

6 

class AuthenticationError(Exception):
    """
    Exception raised for errors during authentication.
    """

    def __init__(self, message):
        """
        :param str message: explanation of the error
        """
        # Forward the message to Exception so that ``args`` and ``str()`` carry
        # it even when the exception is constructed with a keyword argument
        # (``AuthenticationError(message=...)``), which otherwise leaves
        # ``args`` empty.
        super().__init__(message)
        self.message = message

17 

18 

class JWTAuthentication(object):
    """
    Obtain a JWT from a service's ``/auth`` endpoint and format it as a
    request header.
    """

    def __init__(self, base_url: str):
        """
        :param str base_url: root URL of the service; a single trailing slash
            is dropped so paths can be appended with ``/``
        """
        if base_url.endswith('/'):
            base_url = base_url[:-1]
        self.base_url = base_url

    def authorize(self, client_secret: str = None, jwt: str = None) -> str:
        """
        Fetch a JWT authentication token if needed

        :param str client_secret: API key exchanged for a token when no jwt is given
        :param str jwt: existing token; returned unchanged when truthy
        :return: a non-empty JWT string
        :raises AuthenticationError: if neither argument is provided, or if
            exchanging ``client_secret`` did not yield a token
        """
        if jwt:
            return jwt
        if not client_secret:
            raise AuthenticationError('No jwt or client_secret were provided')

        jwt = self.authenticate(client_secret)
        if not jwt:
            raise AuthenticationError('Failed to authenticate with your client_secret')
        return jwt

    def authenticate(self, client_secret: str) -> str:
        """
        Call the authentication endpoint to retrieve a JWT token as a string

        :param str client_secret: API key sent as ``Authorization: APIKEY <key>``
        :return: the ``access_token`` from the JSON response, or ``''`` when the
            response is not JSON or carries no token (the response text is
            printed in that case)
        """
        url = f'{self.base_url}/auth'
        res = requests.post(
            url=url,
            headers={'Authorization': f'APIKEY {client_secret}'},
        )

        token = None
        try:
            auth_response = res.json()
        except ValueError:
            # JSON decode errors are ValueError subclasses regardless of which
            # JSON backend produced them, so this also covers
            # json.decoder.JSONDecodeError.
            auth_response = None

        # An error payload may be valid JSON yet lack 'access_token' (or not be
        # an object at all); treat that as a failed authentication rather than
        # letting KeyError/TypeError escape.
        if isinstance(auth_response, dict):
            token = auth_response.get('access_token')

        if not token:
            print(f'-- BAD Authentication: {url} returned: \n{res.text}')
            return ''
        return token

    def _auth_header(self, jwt: str) -> Dict:
        """
        Build the ``Authorization`` header that carries ``jwt`` as a Bearer token

        :param str jwt: token to embed
        :return: ``{'Authorization': 'Bearer <jwt>'}``
        """
        return {'Authorization': f'Bearer {jwt}'}

63 

64 

class Annosaurus(JWTAuthentication):
    """
    Encapsulate REST calls to the annotation service

    Every mutating call accepts either a ``client_secret`` (exchanged for a
    token) or an existing ``jwt`` and returns a dict of the form
    ``{'status': <HTTP status code>, 'json': <decoded body>}``.
    """

    def __init__(self, base_url: str):
        """
        :param str base_url: root URL of the annotation service
        """
        super().__init__(base_url)

    @staticmethod
    def _result(res) -> dict:
        """
        Package a response as ``{'status', 'json'}``; ``json`` is ``{}`` when
        the body cannot be decoded as JSON (e.g. an empty or HTML error body).
        """
        try:
            body = res.json()
        except ValueError:
            body = {}
        return {'status': res.status_code, 'json': body}

    @staticmethod
    def _merge_review_comment(existing: str, reviewers: list) -> str:
        """
        Return ``existing`` with old 'send to' / 'added for review' notes
        removed and, when ``reviewers`` is non-empty, a fresh
        'Added for review: ...' note appended. May return ``''``.
        """
        kept = '; '.join(
            part for part in existing.split('; ')
            if 'send to' not in part.lower()  # drop 'send to expert' notes
            and 'added for review' not in part.lower()  # drop stale review notes
        )
        if not reviewers:
            return kept
        review_note = f'Added for review: {", ".join(reviewers)}'
        return f'{kept}; {review_note}' if kept else review_note

    def create_association(self,
                           observation_uuid: str,
                           association: Dict,
                           client_secret: str = None,
                           jwt: str = None) -> dict:
        """
        POST a new association for an observation

        :param str observation_uuid: observation the association belongs to
        :param Dict association: association fields; must contain ``link_name``.
            Missing or ``None`` ``link_value`` / ``to_concept`` are sent as
            ``'nil'`` / ``'self'``. The dict itself is not modified.
        :param str client_secret: API key used when no jwt is supplied
        :param str jwt: existing token
        :return: ``{'status': ..., 'json': ...}``
        :raises ValueError: if ``association`` has no ``link_name`` key
        :raises AuthenticationError: if no usable credentials are available
        """
        if 'link_name' not in association:
            raise ValueError('association dict missing key "link_name"')
        jwt = self.authorize(client_secret, jwt)

        # Work on a copy so the caller's dict is not mutated as a side effect.
        payload = dict(association)
        payload['observation_uuid'] = observation_uuid
        if payload.get('link_value') is None:
            payload['link_value'] = 'nil'
        if payload.get('to_concept') is None:
            payload['to_concept'] = 'self'

        res = requests.post(
            url=f'{self.base_url}/associations',
            data=payload,
            headers=self._auth_header(jwt),
        )
        return self._result(res)

    def update_association(self,
                           association_uuid: str,
                           association: Dict,
                           client_secret: str = None,
                           jwt: str = None) -> dict:
        """
        PUT updated fields for an existing association

        :param str association_uuid: association to update
        :param Dict association: fields to send as the request body
        :param str client_secret: API key used when no jwt is supplied
        :param str jwt: existing token
        :return: ``{'status': ..., 'json': ...}``
        :raises AuthenticationError: if no usable credentials are available
        """
        jwt = self.authorize(client_secret, jwt)
        res = requests.put(
            url=f'{self.base_url}/associations/{association_uuid}',
            data=association,
            headers=self._auth_header(jwt),
        )
        return self._result(res)

    def delete_association(self,
                           association_uuid: str,
                           client_secret: str = None,
                           jwt: str = None) -> dict:
        """
        DELETE an association

        :param str association_uuid: association to delete
        :param str client_secret: API key used when no jwt is supplied
        :param str jwt: existing token
        :return: ``{'status': ..., 'json': {}}`` (the body is never decoded)
        :raises AuthenticationError: if no usable credentials are available
        """
        jwt = self.authorize(client_secret, jwt)
        res = requests.delete(
            url=f'{self.base_url}/associations/{association_uuid}',
            headers=self._auth_header(jwt),
        )
        return {'status': res.status_code, 'json': {}}

    def update_concept_name(self,
                            observation_uuid: str,
                            concept: str,
                            client_secret: str = None,
                            jwt: str = None) -> dict:
        """
        PUT a new concept name for an annotation

        :param str observation_uuid: annotation to update
        :param str concept: new concept name
        :param str client_secret: API key used when no jwt is supplied
        :param str jwt: existing token
        :return: ``{'status': ..., 'json': ...}``
        :raises AuthenticationError: if no usable credentials are available
        """
        jwt = self.authorize(client_secret, jwt)
        res = requests.put(
            url=f'{self.base_url}/annotations/{observation_uuid}',
            data={'concept': concept},
            headers=self._auth_header(jwt),
        )
        return self._result(res)

    def update_annotation_comment(self,
                                  observation_uuid: str,
                                  reviewers: list,
                                  client_secret: str = None,
                                  jwt: str = None) -> dict:
        """
        Synchronise the 'Added for review: ...' note in an observation's
        ``comment`` association with the given reviewers

        If a comment exists, old review notes are stripped and a new one is
        appended when ``reviewers`` is non-empty; a comment that ends up empty
        is deleted. If no comment exists, one is created.

        :param str observation_uuid: observation whose comment is updated
        :param list reviewers: reviewer names to list; empty to remove the note
        :param str client_secret: API key used when no jwt is supplied
        :param str jwt: existing token
        :return: the ``{'status': ..., 'json': ...}`` result of the lookup
            (when it fails) or of the delete / update / create call performed
        :raises AuthenticationError: if no usable credentials are available
        """
        jwt = self.authorize(client_secret, jwt)
        res = requests.get(url=f'{self.base_url}/observations/{observation_uuid}')
        if res.status_code != 200:
            print(f'Unable to find annotation with observation uuid of {observation_uuid}')
            return self._result(res)

        observation = res.json()  # decode once; reused below
        comment_association = next(
            (item for item in observation['associations'] if item['link_name'] == 'comment'),
            None,
        )

        if not comment_association:
            # make a new comment
            # NOTE(review): with empty reviewers this still creates a comment
            # reading 'Added for review: ' - confirm whether callers can reach
            # this path and whether it should be a no-op instead.
            created = self.create_association(
                observation_uuid=observation_uuid,
                association={
                    'link_name': 'comment',
                    'link_value': f'Added for review: {", ".join(reviewers)}',
                },
                jwt=jwt,
            )
            if created['status'] != 200:
                print('Error creating comment')
            else:
                print('Created comment')
            return created

        # there's already a comment
        new_comment = self._merge_review_comment(comment_association['link_value'], reviewers)

        if new_comment == '':
            # nothing left once the review notes are stripped: delete the comment
            deleted = self.delete_association(
                association_uuid=comment_association['uuid'],
                jwt=jwt,
            )
            if deleted['status'] != 204:
                print('Error deleting comment')
            else:
                print('Deleted comment')
            return deleted

        updated = self.update_association(
            association_uuid=comment_association['uuid'],
            association={'link_value': new_comment},
            jwt=jwt,
        )
        if updated['status'] != 200:
            print('Error updating comment')
        else:
            print('Updated comment')
        return updated