Coverage for application / vars / annosaurus.py: 97%
94 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-23 05:22 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-23 05:22 +0000
1import requests
2import json
5class AuthenticationError(Exception):
6 """
7 Exception raised for errors during authentication.
8 """
10 def __init__(self, message):
11 """
12 :param str message: explanation of the error
13 """
14 self.message = message
17class JWTAuthentication(object):
19 def __init__(self, base_url: str):
20 if base_url.endswith('/'):
21 base_url = base_url[0:-1]
22 self.base_url = base_url
24 def authorize(self, client_secret: str = None, jwt: str = None) -> str:
25 """
26 Fetch a JWT authentication token if needed
27 """
28 if jwt:
29 pass
30 elif client_secret:
31 jwt = self.authenticate(client_secret)
32 else:
33 raise AuthenticationError('No jwt or client_secret were provided')
35 if not jwt:
36 raise AuthenticationError('Failed to authenticate with your client_secret')
37 return jwt
39 def authenticate(self, client_secret: str) -> str:
40 """
41 Call the authentication endpoint to retrieve a JWT token as a string
42 """
44 url = f'{self.base_url}/auth'
45 res = requests.post(
46 url=url,
47 headers={'Authorization': f'APIKEY {client_secret}'},
48 )
49 try:
50 auth_response = res.json()
51 return auth_response['access_token']
52 except json.decoder.JSONDecodeError:
53 print(f'-- BAD Authentication: {url} returned: \n{res.text}')
54 return ''
56 def _auth_header(self, jwt: str) -> dict:
57 """
58 Format
59 """
60 return {'Authorization': f'Bearer {jwt}'}
63class Annosaurus(JWTAuthentication):
64 """
65 Encapsulate REST calls to the annotation service
66 """
68 def __init__(self, base_url: str):
69 JWTAuthentication.__init__(self, base_url)
71 def create_association(self,
72 observation_uuid: str,
73 association: dict,
74 client_secret: str = None,
75 jwt: str = None) -> dict:
77 if 'link_name' not in association:
78 raise ValueError('association dict missing key "link_name"')
79 jwt = self.authorize(client_secret, jwt)
80 association['observation_uuid'] = observation_uuid
81 if "link_value" not in association or association["link_value"] is None:
82 association["link_value"] = "nil"
83 if "to_concept" not in association or association["to_concept"] is None:
84 association["to_concept"] = "self"
85 res = requests.post(
86 url=f'{self.base_url}/associations',
87 data=association,
88 headers=self._auth_header(jwt),
89 )
90 # print(association)
91 # print(res.text)
92 return {'status': res.status_code, 'json': res.json()}
94 def update_association(self,
95 association_uuid: str,
96 association: dict,
97 client_secret: str = None,
98 jwt: str = None) -> dict:
100 jwt = self.authorize(client_secret, jwt)
101 res = requests.put(
102 url=f'{self.base_url}/associations/{association_uuid}',
103 data=association,
104 headers=self._auth_header(jwt),
105 )
106 return {'status': res.status_code, 'json': res.json()}
108 def delete_association(self,
109 association_uuid: str,
110 client_secret: str = None,
111 jwt: str = None) -> dict:
113 jwt = self.authorize(client_secret, jwt)
114 res = requests.delete(
115 url=f'{self.base_url}/associations/{association_uuid}',
116 headers=self._auth_header(jwt),
117 )
118 return {'status': res.status_code, 'json': {}}
120 def update_concept_name(self,
121 observation_uuid: str,
122 concept: str,
123 client_secret: str = None,
124 jwt: str = None) -> dict:
126 jwt = self.authorize(client_secret, jwt)
127 res = requests.put(
128 url=f'{self.base_url}/annotations/{observation_uuid}',
129 data={'concept': concept},
130 headers=self._auth_header(jwt),
131 )
132 return {'status': res.status_code, 'json': res.json()}
134 def update_annotation_comment(self,
135 observation_uuid: str,
136 reviewers: list,
137 client_secret: str = None,
138 jwt: str = None) -> dict:
139 jwt = self.authorize(client_secret, jwt)
140 res = requests.get(url=f'{self.base_url}/observations/{observation_uuid}')
141 if res.status_code != 200:
142 print(f'Unable to find annotation with observation uuid of {observation_uuid}')
143 return {'status': res.status_code, 'json': res.json()}
145 comment_association = next((item for item in res.json()['associations'] if item['link_name'] == 'comment'), None)
146 if comment_association:
147 # there's already a comment
148 old_comment = comment_association['link_value'].split('; ')
149 old_comment = [cmt for cmt in old_comment if 'send to' not in cmt.lower()] # get rid of 'send to expert' notes
150 old_comment = [cmt for cmt in old_comment if 'added for review' not in cmt.lower()] # get rid of old 'added for review' notes
151 old_comment = '; '.join(old_comment)
152 if old_comment:
153 if reviewers: # add reviewers to the current comment
154 new_comment = f'{old_comment}; Added for review: {", ".join(reviewers)}'
155 else: # remove reviewers from the comment
156 new_comment = old_comment
157 elif reviewers: # create a new comment with reviewers
158 new_comment = f'Added for review: {", ".join(reviewers)}'
159 else: # remove the comment
160 new_comment = ''
162 new_association = {'link_value': new_comment}
163 if new_comment == '':
164 # delete the comment
165 deleted = self.delete_association(
166 association_uuid=comment_association['uuid'],
167 jwt=jwt,
168 )
169 if deleted['status'] != 204:
170 print('Error deleting comment')
171 else:
172 print('Deleted comment')
173 return deleted
174 else:
175 updated = self.update_association(
176 association_uuid=comment_association['uuid'],
177 association=new_association,
178 jwt=jwt,
179 )
180 if updated['status'] != 200:
181 print('Error updating comment')
182 else:
183 print('Updated comment')
184 return updated
185 else:
186 # make a new comment
187 new_comment = f'Added for review: {", ".join(reviewers)}'
188 comment_association = {
189 'link_name': 'comment',
190 'link_value': new_comment
191 }
192 created = self.create_association(
193 observation_uuid=observation_uuid,
194 association=comment_association,
195 jwt=jwt
196 )
197 if created['status'] != 200:
198 print('Error creating comment')
199 else:
200 print('Created comment')
201 return created