Coverage for application/vars/routes.py: 24%
74 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-23 02:22 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-23 02:22 +0000
1"""
2General endpoints for VARS annotations/associations that are used throughout the application.
4/vars/annotation/<observation_uuid> [GET]
5/vars/annotation/concept [PATCH]
6/vars/association [POST]
7/vars/association/<uuid> [PATCH]
8/vars/association/<uuid> [DELETE]
9/vars/video-frame [GET]
10"""
12from io import BytesIO
13from pathlib import Path
15import requests
16from flask import current_app, request, send_file
18from . import vars_bp
19from application.vars.annosaurus import Annosaurus
# get a single VARS annotation
@vars_bp.get('/annotation/<observation_uuid>')
def get_current_associations(observation_uuid):
    """
    Fetch one annotation from the service configured as VARS_ANNOTATION_URL.

    :param observation_uuid: value of the route's <observation_uuid> segment; appended to the configured URL.
    :return: (upstream JSON body, 200) when the upstream answers 200;
        ({}, upstream status code) for any other upstream status;
        ({}, 502) when the upstream request fails or times out.
    """
    try:
        res = requests.get(
            url=f'{current_app.config.get("VARS_ANNOTATION_URL")}/{observation_uuid}',
            # requests waits indefinitely by default; bound the wait so a stalled upstream cannot pin a worker
            timeout=30,
        )
    except requests.RequestException as e:
        print(f'Could not reach VARS annotation service: {e}')
        return {}, 502
    if res.status_code != 200:
        return {}, res.status_code
    return res.json(), 200
# updates annotation with a new concept name
@vars_bp.patch('/annotation/concept')
def update_annotation():
    """
    Change the concept name of an annotation through Annosaurus.

    Reads 'observation_uuid' and 'concept' from the request values and passes them,
    together with the configured client secret, to Annosaurus.update_concept_name.

    :return: the 'json' and 'status' entries of the Annosaurus response, as (body, status code).
    """
    config = current_app.config
    client = Annosaurus(config.get('ANNOSAURUS_URL'))
    params = request.values
    result = client.update_concept_name(
        observation_uuid=params.get('observation_uuid'),
        concept=params.get('concept'),
        client_secret=config.get('ANNOSAURUS_CLIENT_SECRET'),
    )
    return result['json'], result['status']
# creates a new association for a VARS annotation
@vars_bp.post('/association')
def create_association():
    """
    Attach a new association to an annotation through Annosaurus.

    The association is built from the 'link_name', 'link_value' and 'to_concept'
    request values; the target annotation comes from 'observation_uuid'.

    :return: the 'json' and 'status' entries of the Annosaurus response, as (body, status code).
        A status of 200 is reported to the caller as 201.
    """
    config = current_app.config
    client = Annosaurus(config.get('ANNOSAURUS_URL'))
    params = request.values
    association = {
        field: params.get(field)
        for field in ('link_name', 'link_value', 'to_concept')
    }
    result = client.create_association(
        observation_uuid=params.get('observation_uuid'),
        association=association,
        client_secret=config.get('ANNOSAURUS_CLIENT_SECRET'),
    )
    if result['status'] == 200:
        # a successful creation is surfaced as 201 rather than 200
        result['status'] = 201
    return result['json'], result['status']
# updates a VARS association
@vars_bp.patch('/association/<uuid>')
def update_association(uuid):
    """
    Overwrite an existing association through Annosaurus.

    The replacement values are read from the 'link_name', 'link_value' and
    'to_concept' request values.

    :param uuid: value of the route's <uuid> segment, passed on as association_uuid.
    :return: the 'json' and 'status' entries of the Annosaurus response, as (body, status code).
    """
    config = current_app.config
    client = Annosaurus(config.get('ANNOSAURUS_URL'))
    params = request.values
    association = {
        field: params.get(field)
        for field in ('link_name', 'link_value', 'to_concept')
    }
    result = client.update_association(
        association_uuid=uuid,
        association=association,
        client_secret=config.get('ANNOSAURUS_CLIENT_SECRET'),
    )
    return result['json'], result['status']
# deletes a VARS association
@vars_bp.delete('/association/<uuid>')
def delete_association(uuid):
    """
    Remove an association through Annosaurus.

    :param uuid: value of the route's <uuid> segment, passed on as association_uuid.
    :return: the 'json' and 'status' entries of the Annosaurus response, as (body, status code).
    """
    config = current_app.config
    client = Annosaurus(config.get('ANNOSAURUS_URL'))
    result = client.delete_association(
        association_uuid=uuid,
        client_secret=config.get('ANNOSAURUS_CLIENT_SECRET'),
    )
    return result['json'], result['status']
# gets a frame from a VARS video for annotations without an image. caches images locally
@vars_bp.get('/video-frame')
def video_frame():
    """
    Return one frame of a video as a JPEG, caching the encoded frame on disk.

    Query parameters:
        url: location of the video, handed to cv2.VideoCapture (required).
        time: integer offset multiplied by the video's FPS to choose the frame
            (presumably seconds; defaults to 0).

    :return: the JPEG image on success;
        ({}, 400) when 'url' is missing or 'time' is not an integer;
        ({}, 500) when cv2/PIL are not installed, the video cannot be opened,
        or the requested frame cannot be read.
    """
    # imported lazily so the rest of the blueprint works without the imaging dependencies
    try:
        import cv2
        from PIL import Image
    except ModuleNotFoundError:
        print('To display video frames, install cv2 and PIL by running the command "pip install -r requirements.txt"')
        return {}, 500

    video_url = request.args.get('url')
    if not video_url:
        print('Missing video URL')
        return {}, 400
    # a malformed 'time' is a client error, not a server fault
    try:
        timestamp = int(request.args.get('time', 0))
    except ValueError:
        print('Invalid timestamp')
        return {}, 400

    cache_dir = Path('cache', 'vars_frames')
    # parents=True: the top-level 'cache' directory may not exist yet
    cache_dir.mkdir(parents=True, exist_ok=True)
    # NOTE(review): only the URL's base name goes into the cache key, so two videos with the
    # same file name at different locations would share cached frames - confirm this is acceptable
    cache_path = cache_dir / f'{video_url.split("/")[-1].split(".")[0]}__{timestamp}.jpeg'
    if cache_path.exists():
        return send_file(
            BytesIO(cache_path.read_bytes()),
            mimetype='image/jpeg',
            as_attachment=False
        )

    # NOTE(review): video_url is caller-supplied and passed through unchanged; consider
    # restricting it to known video hosts
    cap = cv2.VideoCapture(video_url)
    try:
        if not cap.isOpened():
            print('Could not open video file')
            return {}, 500
        frame_number = timestamp * cap.get(cv2.CAP_PROP_FPS)  # calc frame number
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)  # set to frame
        ret, frame = cap.read()  # get frame
    finally:
        # release the capture on every path, including when seeking/reading raises
        cap.release()
    if not ret:
        print(f'Could not read frame at timestamp {timestamp}')
        return {}, 500

    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # Convert BGR to RGB
    img_byte_arr = BytesIO()
    Image.fromarray(frame).save(img_byte_arr, format='JPEG')
    frame_data = img_byte_arr.getvalue()
    cache_path.write_bytes(frame_data)
    return send_file(
        BytesIO(frame_data),
        mimetype='image/jpeg',
        as_attachment=False
    )