Coverage for api/proscai/auth.py: 17%

120 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2024-01-12 11:16 -0600

1import requests 

2import base64 

3import json 

4from time import sleep 

5from apps.local_models.models import JWTToken 

6from django.utils.timezone import now 

7from datetime import datetime 

8from .errors import send_email, log_error 

9 

10 

11 

12from Crypto.Cipher import AES 

13from Crypto.Util.Padding import pad, unpad 

14from django.conf import settings 

15 

16 

17 

18PROSCAI_CONN = getattr(settings, "PROSCAI_CONN", None) 

19PROSCAI_SECRET = getattr(settings, "PROSCAI_SECRET", None) 

20PROSCAI_TOKEN_BROKER = 'https://tokenbroker.proscai.com/v1/auth/caderapi' 

21 

22class BaseAPIClient(): 

23 

24 def __init__(self, *args, **kwargs): 

25 self.SECRET_KEY = PROSCAI_SECRET 

26 self.CONN = PROSCAI_CONN 

27 self.BASE_URL = 'https://pas.proscai.com' 

28 self.HEADERS = {"Content-Type": "application/json"} 

29 self.ENCODING = 'mac-roman' 

30 self.BS = 16 # Block Size 

31 self.KEY_STR = self.SECRET_KEY 

32 self.KEY = base64.b64decode(self.KEY_STR) 

33 self.IV = bytearray(self.BS) 

34 self.TOKEN = JWTToken.objects.last() or None 

35 

36 

37 

38 if getattr(self.TOKEN, 'exp_date', None): 

39 if self.TOKEN.exp_date < now(): 

40 self.post_auth() 

41 else: 

42 pass 

43 else: 

44 self.post_auth() 

45 pass 

46 

47 def encrypt(self, raw: str) -> str: 

48 bs = self.BS 

49 key = self.KEY 

50 iv = self.IV 

51 data = raw 

52 data = data.encode(self.ENCODING) 

53 data = pad(data,self.BS) 

54 cipher = AES.new(key,AES.MODE_CBC, iv=iv) 

55 ciphertext = cipher.encrypt(data) 

56 result = base64.b64encode(ciphertext) 

57 result_decoded = result.decode(self.ENCODING) 

58 return result_decoded 

59 

60 def decrypt(self, raw: str) -> str: 

61 bs = self.BS 

62 key = self.KEY 

63 iv = self.IV 

64 data = raw 

65 data = base64.b64decode(data) 

66 cipher = AES.new(key, AES.MODE_CBC, self.IV) 

67 decrypted_text = unpad(cipher.decrypt(data), bs) 

68 decrypted_text_str = decrypted_text.decode() 

69 result = decrypted_text_str 

70 return result 

71 

72 

73 

74 

75 def post_auth(self, *args, **kwargs): 

76 self.path = '/autorizacion?dataId={}'.format(self.CONN) 

77 # super().post(*args, **kwargs) 

78 url = f"{self.BASE_URL}/{self.path}" 

79 

80 timmer = 0.5 #minutes 

81 counter = 1 # times 

82 limit = 20 

83 while counter <= limit: 

84 r = requests.post(url) 

85 if r.ok: 

86 info = self.fetch_info(r) 

87 return info 

88 else: 

89 log_error(r) 

90 send_email(r) 

91 sleep(timmer * 60) 

92 timmer = min(timmer * 2, 10) 

93 ++counter 

94 

95 

96 def fetch_info(self, response): 

97 a = response.content 

98 b = base64.b64decode(a) 

99 c = b.decode(self.ENCODING) 

100 general_data = json.loads(c) 

101 cia = general_data['fcia'] 

102 cia_data = json.loads(cia) 

103 

104 my_dict = {} 

105 my_dict['ts'] = general_data['ts'] 

106 my_dict['rfc'] = general_data['rfc'] 

107 my_dict['dbid'] = general_data['dbid'] 

108 my_dict['ts'] = general_data['ts'] 

109 my_dict['fcia'] = {} 

110 my_dict['fcia']['CIARAIZ'] = cia_data['CIARAIZ'] 

111 my_dict['fcia']['CIACOLORUNICO'] = cia_data['CIACOLORUNICO'] 

112 my_dict['fcia']['CIATALLA'] = cia_data['CIATALLA'] 

113 my_dict['fcia']['CIANAME'] = cia_data['CIANAME'] 

114 my_dict['fcia']['CIAALIAS'] = cia_data['CIAALIAS'] 

115 my_dict['fcia']['CIACOMPORTA'] = cia_data['CIACOMPORTA'] 

116 my_json = json.dumps(my_dict) 

117 

118 

119 enc = self.encrypt(my_json) 

120 enc = '"{}"'.format(enc) 

121 headers = { 

122 'Accept': 'application/json', 

123 'Content-Type': 'application/json' 

124 } 

125 

126 timmer = 0.5 #minutes 

127 counter = 1 # times 

128 limit = 20 

129 while counter <= limit: 

130 r = requests.post(PROSCAI_TOKEN_BROKER, data=enc, headers=headers) 

131 if r.ok: 

132 token = self.fetch_token_broker(r) 

133 return token 

134 else: 

135 log_error(r) 

136 send_email(r) 

137 sleep(timmer * 60) 

138 timmer = min(timmer * 2, 10) 

139 ++counter 

140 

141 

142 def fetch_token_broker(self, response): 

143 token = self.decrypt(response.content) 

144 token_obj = JWTToken.objects.create( 

145 token=token 

146 ) 

147 a = token_obj.token.split('.')[1] 

148 b = a + "="*divmod(len(a),4)[1] 

149 c = base64.b64decode(b) 

150 general_data = json.loads(c) 

151 token_obj.token_decoded = json.dumps(general_data) 

152 exp = general_data['exp'] 

153 token_obj.exp = exp 

154 token_obj.exp_date = datetime.fromtimestamp(exp) 

155 token_obj.save() 

156 return token 

157 

158 

159 

160 

161 

162 

163 

164 

165 

166 

167 

168 

169