Coverage for application/vars/routes.py: 24%

74 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-23 02:22 +0000

1""" 

2General endpoints for VARS annotations/associations that are used throughout the application. 

3 

4/vars/annotation/<observation_uuid> [GET] 

5/vars/annotation/concept [PATCH] 

6/vars/association [POST] 

7/vars/association/<uuid> [PATCH] 

8/vars/association/<uuid> [DELETE] 

9/vars/video-frame [GET] 

10""" 

11 

12from io import BytesIO 

13from pathlib import Path 

14 

15import requests 

16from flask import current_app, request, send_file 

17 

18from . import vars_bp 

19from application.vars.annosaurus import Annosaurus 

20 

21 

22# get a single VARS annotation 

23@vars_bp.get('/annotation/<observation_uuid>') 

24def get_current_associations(observation_uuid): 

25 res = requests.get(url=f'{current_app.config.get("VARS_ANNOTATION_URL")}/{observation_uuid}') 

26 if res.status_code != 200: 

27 return {}, res.status_code 

28 return res.json(), 200 

29 

30 

31# updates annotation with a new concept name 

32@vars_bp.patch('/annotation/concept') 

33def update_annotation(): 

34 annosaurus = Annosaurus(current_app.config.get('ANNOSAURUS_URL')) 

35 updated_response = annosaurus.update_concept_name( 

36 observation_uuid=request.values.get('observation_uuid'), 

37 concept=request.values.get('concept'), 

38 client_secret=current_app.config.get('ANNOSAURUS_CLIENT_SECRET'), 

39 ) 

40 return updated_response['json'], updated_response['status'] 

41 

42 

43# creates a new association for a VARS annotation 

44@vars_bp.post('/association') 

45def create_association(): 

46 annosaurus = Annosaurus(current_app.config.get('ANNOSAURUS_URL')) 

47 new_association = { 

48 'link_name': request.values.get('link_name'), 

49 'link_value': request.values.get('link_value'), 

50 'to_concept': request.values.get('to_concept'), 

51 } 

52 created_response = annosaurus.create_association( 

53 observation_uuid=request.values.get('observation_uuid'), 

54 association=new_association, 

55 client_secret=current_app.config.get('ANNOSAURUS_CLIENT_SECRET'), 

56 ) 

57 if created_response['status'] == 200: 

58 created_response['status'] = 201 

59 return created_response['json'], created_response['status'] 

60 

61 

62# updates a VARS association 

63@vars_bp.patch('/association/<uuid>') 

64def update_association(uuid): 

65 annosaurus = Annosaurus(current_app.config.get('ANNOSAURUS_URL')) 

66 updated_association = { 

67 'link_name': request.values.get('link_name'), 

68 'link_value': request.values.get('link_value'), 

69 'to_concept': request.values.get('to_concept'), 

70 } 

71 updated_response = annosaurus.update_association( 

72 association_uuid=uuid, 

73 association=updated_association, 

74 client_secret=current_app.config.get('ANNOSAURUS_CLIENT_SECRET'), 

75 ) 

76 return updated_response['json'], updated_response['status'] 

77 

78 

79# deletes a VARS association 

80@vars_bp.delete('/association/<uuid>') 

81def delete_association(uuid): 

82 annosaurus = Annosaurus(current_app.config.get('ANNOSAURUS_URL')) 

83 deleted = annosaurus.delete_association( 

84 association_uuid=uuid, 

85 client_secret=current_app.config.get('ANNOSAURUS_CLIENT_SECRET'), 

86 ) 

87 return deleted['json'], deleted['status'] 

88 

89 

90# gets a frame from a VARS video for annotations without an image. caches images locally 

91@vars_bp.get('/video-frame') 

92def video_frame(): 

93 try: 

94 import cv2 

95 from PIL import Image 

96 except ModuleNotFoundError: 

97 print('To display video frames, install cv2 and PIL by running the command "pip install -r requirements.txt"') 

98 return {}, 500 

99 video_url = request.args.get('url') 

100 timestamp = int(request.args.get('time', 0)) 

101 if not video_url: 

102 print('Missing video URL') 

103 return {}, 400 

104 cache_dir = Path('cache', 'vars_frames') 

105 cache_dir.mkdir(exist_ok=True) 

106 cache_path = cache_dir / f'{video_url.split("/")[-1].split(".")[0]}__{timestamp}.jpeg' 

107 if cache_path.exists(): 

108 with open(cache_path, 'rb') as f: 

109 return send_file( 

110 BytesIO(f.read()), 

111 mimetype='image/jpeg', 

112 as_attachment=False 

113 ) 

114 cap = cv2.VideoCapture(video_url) 

115 if not cap.isOpened(): 

116 print('Could not open video file') 

117 return {}, 500 

118 frame_number = timestamp * cap.get(cv2.CAP_PROP_FPS) # calc frame number 

119 cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number) # set to frame 

120 ret, frame = cap.read() # get frame 

121 cap.release() # release video 

122 if not ret: 

123 print(f'Could not read frame at timestamp {timestamp}') 

124 return {}, 500 

125 frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # Convert BGR to RGB 

126 img = Image.fromarray(frame) 

127 img_byte_arr = BytesIO() 

128 img.save(img_byte_arr, format='JPEG') 

129 frame_data = img_byte_arr.getvalue() 

130 with open(cache_path, 'wb') as f: 

131 f.write(frame_data) 

132 return send_file( 

133 BytesIO(frame_data), 

134 mimetype='image/jpeg', 

135 as_attachment=False 

136 )