Coverage for application/vars/annosaurus.py: 97%
95 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-23 02:22 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-23 02:22 +0000
1import requests
2import json
4from typing import Dict
class AuthenticationError(Exception):
    """
    Exception raised for errors during authentication.
    """

    def __init__(self, message):
        """
        :param str message: explanation of the error
        """
        # Forward the message to Exception so str(exc) and exc.args carry it
        # even when the caller passes it by keyword (message='...').
        super().__init__(message)
        self.message = message
class JWTAuthentication:
    """
    Obtain JWT tokens from a service's ``/auth`` endpoint and format them
    as request headers.
    """

    def __init__(self, base_url: str):
        """
        :param str base_url: root URL of the service; a single trailing '/'
            is dropped so endpoint paths can be appended as ``/path``
        """
        if base_url.endswith('/'):
            base_url = base_url[0:-1]
        self.base_url = base_url

    def authorize(self, client_secret: str = None, jwt: str = None) -> str:
        """
        Fetch a JWT authentication token if needed

        :param str client_secret: API key exchanged for a token when no jwt is given
        :param str jwt: existing token; returned unchanged when truthy
        :return: the JWT to use for subsequent requests
        :raises AuthenticationError: if neither argument is provided, or if
            authenticating with client_secret yields no token
        """
        if jwt:
            pass
        elif client_secret:
            jwt = self.authenticate(client_secret)
        else:
            raise AuthenticationError('No jwt or client_secret were provided')

        if not jwt:
            raise AuthenticationError('Failed to authenticate with your client_secret')
        return jwt

    def authenticate(self, client_secret: str) -> str:
        """
        Call the authentication endpoint to retrieve a JWT token as a string

        :param str client_secret: API key sent as ``Authorization: APIKEY <secret>``
        :return: the token, or '' when the response does not contain one
        """
        url = f'{self.base_url}/auth'
        res = requests.post(
            url=url,
            headers={'Authorization': f'APIKEY {client_secret}'},
        )
        return self._token_from_response(res, url)

    @staticmethod
    def _token_from_response(res, url: str) -> str:
        """
        Extract ``access_token`` from an authentication response.

        :param res: response object exposing ``json()`` and ``text``
        :param str url: endpoint that produced the response (used in the log line)
        :return: the token, or '' if the body is not JSON or has no token
        """
        try:
            auth_response = res.json()
        except ValueError:
            # JSON decode errors from both the stdlib json module and
            # simplejson derive from ValueError, so this covers either backend.
            auth_response = None

        # A JSON error body (e.g. a rejected API key) has no 'access_token';
        # report it as a failed authentication instead of raising KeyError.
        token = auth_response.get('access_token') if isinstance(auth_response, dict) else None
        if not token:
            print(f'-- BAD Authentication: {url} returned: \n{res.text}')
            return ''
        return token

    def _auth_header(self, jwt: str) -> Dict:
        """
        Build the ``Authorization: Bearer <jwt>`` header dict for a request

        :param str jwt: token to embed in the header
        """
        return {'Authorization': f'Bearer {jwt}'}
class Annosaurus(JWTAuthentication):
    """
    Encapsulate REST calls to the annotation service

    Every mutating call accepts either a ``client_secret`` (exchanged for a
    token) or an existing ``jwt`` and returns a dict of the form
    ``{'status': <HTTP status code>, 'json': <response body>}``.
    """

    def __init__(self, base_url: str):
        """
        :param str base_url: root URL of the annotation service
        """
        super().__init__(base_url)

    def create_association(self,
                           observation_uuid: str,
                           association: Dict,
                           client_secret: str = None,
                           jwt: str = None) -> dict:
        """
        POST a new association for an observation.

        :param str observation_uuid: observation the association is attached to
        :param Dict association: association fields; must contain 'link_name'.
            Missing or None 'link_value' / 'to_concept' are sent as 'nil' /
            'self'. The caller's dict is not modified.
        :param str client_secret: API key used to obtain a token if jwt is not given
        :param str jwt: existing token
        :return: dict with the response 'status' code and parsed 'json' body
        :raises ValueError: if 'link_name' is missing from association
        """
        if 'link_name' not in association:
            raise ValueError('association dict missing key "link_name"')
        jwt = self.authorize(client_secret, jwt)

        # Fill in defaults on a copy so the caller's dict is left untouched.
        payload = dict(association)
        payload['observation_uuid'] = observation_uuid
        if payload.get('link_value') is None:
            payload['link_value'] = 'nil'
        if payload.get('to_concept') is None:
            payload['to_concept'] = 'self'

        res = requests.post(
            url=f'{self.base_url}/associations',
            data=payload,
            headers=self._auth_header(jwt),
        )
        return {'status': res.status_code, 'json': res.json()}

    def update_association(self,
                           association_uuid: str,
                           association: Dict,
                           client_secret: str = None,
                           jwt: str = None) -> dict:
        """
        PUT updated fields for an existing association.

        :param str association_uuid: association to update
        :param Dict association: fields to send as the request body
        :param str client_secret: API key used to obtain a token if jwt is not given
        :param str jwt: existing token
        :return: dict with the response 'status' code and parsed 'json' body
        """
        jwt = self.authorize(client_secret, jwt)
        res = requests.put(
            url=f'{self.base_url}/associations/{association_uuid}',
            data=association,
            headers=self._auth_header(jwt),
        )
        return {'status': res.status_code, 'json': res.json()}

    def delete_association(self,
                           association_uuid: str,
                           client_secret: str = None,
                           jwt: str = None) -> dict:
        """
        DELETE an existing association.

        :param str association_uuid: association to delete
        :param str client_secret: API key used to obtain a token if jwt is not given
        :param str jwt: existing token
        :return: dict with the response 'status' code; 'json' is always {}
            because the response body is not parsed
        """
        jwt = self.authorize(client_secret, jwt)
        res = requests.delete(
            url=f'{self.base_url}/associations/{association_uuid}',
            headers=self._auth_header(jwt),
        )
        return {'status': res.status_code, 'json': {}}

    def update_concept_name(self,
                            observation_uuid: str,
                            concept: str,
                            client_secret: str = None,
                            jwt: str = None) -> dict:
        """
        PUT a new concept name for an annotation.

        :param str observation_uuid: annotation to update
        :param str concept: new concept name
        :param str client_secret: API key used to obtain a token if jwt is not given
        :param str jwt: existing token
        :return: dict with the response 'status' code and parsed 'json' body
        """
        jwt = self.authorize(client_secret, jwt)
        res = requests.put(
            url=f'{self.base_url}/annotations/{observation_uuid}',
            data={'concept': concept},
            headers=self._auth_header(jwt),
        )
        return {'status': res.status_code, 'json': res.json()}

    @staticmethod
    def _merge_review_note(existing_comment: str, reviewers: list) -> str:
        """
        Drop stale review notes from a '; '-separated comment and append a
        fresh 'Added for review' note when reviewers is non-empty.

        :return: the new comment text, or '' when nothing is left
        """
        kept_parts = [
            part for part in existing_comment.split('; ')
            if 'send to' not in part.lower()  # get rid of 'send to expert' notes
            and 'added for review' not in part.lower()  # get rid of old 'added for review' notes
        ]
        remaining = '; '.join(kept_parts)
        review_note = f'Added for review: {", ".join(reviewers)}' if reviewers else ''
        return '; '.join(piece for piece in (remaining, review_note) if piece)

    def update_annotation_comment(self,
                                  observation_uuid: str,
                                  reviewers: list,
                                  client_secret: str = None,
                                  jwt: str = None) -> dict:
        """
        Make an observation's 'comment' association reflect the given reviewers.

        If a comment association exists, its old review notes are replaced by
        a note listing ``reviewers``; the association is deleted when that
        leaves the comment empty. If none exists, a new comment association
        is created.

        :param str observation_uuid: observation whose comment is updated
        :param list reviewers: reviewer names to list in the comment
        :param str client_secret: API key used to obtain a token if jwt is not given
        :param str jwt: existing token
        :return: the result dict of the lookup (when it fails) or of the
            create / update / delete call that was made
        """
        jwt = self.authorize(client_secret, jwt)
        res = requests.get(url=f'{self.base_url}/observations/{observation_uuid}')
        if res.status_code != 200:
            print(f'Unable to find annotation with observation uuid of {observation_uuid}')
            return {'status': res.status_code, 'json': res.json()}

        observation = res.json()  # parse the body once
        comment_association = next(
            (item for item in observation['associations'] if item['link_name'] == 'comment'),
            None,
        )

        if not comment_association:
            # make a new comment
            # NOTE(review): with an empty reviewers list this still creates a
            # comment reading 'Added for review: ' — confirm whether callers
            # rely on that before changing it.
            created = self.create_association(
                observation_uuid=observation_uuid,
                association={
                    'link_name': 'comment',
                    'link_value': f'Added for review: {", ".join(reviewers)}',
                },
                jwt=jwt,
            )
            if created['status'] != 200:
                print('Error creating comment')
            else:
                print('Created comment')
            return created

        # there's already a comment
        new_comment = self._merge_review_note(comment_association['link_value'], reviewers)

        if new_comment == '':
            # delete the comment
            deleted = self.delete_association(
                association_uuid=comment_association['uuid'],
                jwt=jwt,
            )
            if deleted['status'] != 204:
                print('Error deleting comment')
            else:
                print('Deleted comment')
            return deleted

        updated = self.update_association(
            association_uuid=comment_association['uuid'],
            association={'link_value': new_comment},
            jwt=jwt,
        )
        if updated['status'] != 200:
            print('Error updating comment')
        else:
            print('Updated comment')
        return updated