Coverage for api/proscai/auth.py: 17%
120 statements
« prev ^ index » next coverage.py v6.4.4, created at 2024-01-12 11:16 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2024-01-12 11:16 -0600
1import requests
2import base64
3import json
4from time import sleep
5from apps.local_models.models import JWTToken
6from django.utils.timezone import now
7from datetime import datetime
8from .errors import send_email, log_error
12from Crypto.Cipher import AES
13from Crypto.Util.Padding import pad, unpad
14from django.conf import settings
# Connection identifier and shared secret are read from Django settings;
# either one is None when the corresponding setting is not defined.
PROSCAI_CONN = getattr(settings, "PROSCAI_CONN", None)
PROSCAI_SECRET = getattr(settings, "PROSCAI_SECRET", None)

# Endpoint that receives the encrypted company payload and answers with an
# encrypted token (see BaseAPIClient.fetch_info / fetch_token_broker).
PROSCAI_TOKEN_BROKER = 'https://tokenbroker.proscai.com/v1/auth/caderapi'
class BaseAPIClient:
    """Client that obtains a JWT through the PROSCAI authorization flow.

    The flow, as implemented below, has three steps:

    1. ``post_auth`` POSTs to ``<BASE_URL>//autorizacion?dataId=<CONN>``.
    2. ``fetch_info`` decodes that response, re-packs a subset of its fields,
       AES-encrypts the result and POSTs it to ``PROSCAI_TOKEN_BROKER``.
    3. ``fetch_token_broker`` decrypts the broker response, stores it as a
       ``JWTToken`` row and records the ``exp`` claim found in its payload.

    Constructing an instance triggers the flow whenever the most recent
    stored token is missing, has no ``exp_date`` or is already expired.
    """

    # Retry policy shared by both HTTP calls: at most RETRY_LIMIT attempts,
    # waiting RETRY_INITIAL_DELAY_MIN minutes after the first failure and
    # doubling the wait each time up to RETRY_MAX_DELAY_MIN minutes.
    RETRY_LIMIT = 20
    RETRY_INITIAL_DELAY_MIN = 0.5
    RETRY_MAX_DELAY_MIN = 10

    # Keys copied from the nested "fcia" document into the broker payload.
    # The order matters: it fixes the JSON text that gets encrypted.
    _FCIA_KEYS = (
        'CIARAIZ',
        'CIACOLORUNICO',
        'CIATALLA',
        'CIANAME',
        'CIAALIAS',
        'CIACOMPORTA',
    )

    def __init__(self, *args, **kwargs):
        """Set up crypto/HTTP configuration and make sure a live token exists.

        Positional and keyword arguments are accepted and ignored.
        """
        self.SECRET_KEY = PROSCAI_SECRET
        self.CONN = PROSCAI_CONN
        self.BASE_URL = 'https://pas.proscai.com'
        self.HEADERS = {"Content-Type": "application/json"}
        self.ENCODING = 'mac-roman'
        self.BS = 16  # Block Size
        self.KEY_STR = self.SECRET_KEY
        self.KEY = base64.b64decode(self.KEY_STR)
        # All-zero IV of one block.
        # NOTE(review): a fixed IV is presumably mandated by the remote
        # service; confirm before changing.
        self.IV = bytearray(self.BS)
        self.TOKEN = JWTToken.objects.last()

        exp_date = getattr(self.TOKEN, 'exp_date', None)
        if not exp_date or exp_date < now():
            self.post_auth()
            # post_auth() stores the new token as a fresh JWTToken row; reload
            # it so the instance does not keep the stale/absent token.
            self.TOKEN = JWTToken.objects.last()

    def encrypt(self, raw: str) -> str:
        """Return *raw* AES-CBC encrypted and base64 encoded.

        The text is encoded with ``self.ENCODING`` and padded to the block
        size before encryption.
        """
        padded = pad(raw.encode(self.ENCODING), self.BS)
        cipher = AES.new(self.KEY, AES.MODE_CBC, iv=self.IV)
        return base64.b64encode(cipher.encrypt(padded)).decode(self.ENCODING)

    def decrypt(self, raw: str) -> str:
        """Return the plaintext of base64 encoded AES-CBC ciphertext *raw*.

        *raw* may also be ``bytes`` (``fetch_token_broker`` passes
        ``response.content``). The unpadded plaintext is decoded with the
        default codec of ``bytes.decode``.
        """
        ciphertext = base64.b64decode(raw)
        cipher = AES.new(self.KEY, AES.MODE_CBC, self.IV)
        return unpad(cipher.decrypt(ciphertext), self.BS).decode()

    def _post_with_retry(self, url, **request_kwargs):
        """POST to *url* until a response is OK or RETRY_LIMIT is reached.

        Every failed response is passed to ``log_error`` and ``send_email``.
        Returns the successful response, or ``None`` when every attempt
        failed.
        """
        delay_minutes = self.RETRY_INITIAL_DELAY_MIN
        for attempt in range(1, self.RETRY_LIMIT + 1):
            # NOTE(review): no timeout is passed, so a stalled connection
            # blocks here; consider adding one.
            response = requests.post(url, **request_kwargs)
            if response.ok:
                return response
            log_error(response)
            send_email(response)
            # No point in waiting once the last attempt has failed.
            if attempt < self.RETRY_LIMIT:
                sleep(delay_minutes * 60)
                delay_minutes = min(delay_minutes * 2, self.RETRY_MAX_DELAY_MIN)
        return None

    def post_auth(self, *args, **kwargs):
        """Run the whole authorization flow and return the new token.

        Returns the decrypted token string, or ``None`` when either remote
        call kept failing for ``RETRY_LIMIT`` attempts. Extra arguments are
        accepted and ignored.
        """
        self.path = '/autorizacion?dataId={}'.format(self.CONN)
        # self.path already starts with '/', so the URL carries '//' after
        # the host; kept exactly as before.
        # NOTE(review): confirm the server expects this form.
        url = f"{self.BASE_URL}/{self.path}"
        response = self._post_with_retry(url)
        if response is None:
            return None
        return self.fetch_info(response)

    def _build_broker_payload(self, content):
        """Return the JSON text sent (after encryption) to the token broker.

        *content* is the base64 encoded body of the authorization response.
        Once decoded with ``self.ENCODING`` it must be a JSON object with the
        keys ``ts``, ``rfc``, ``dbid`` and ``fcia``, where ``fcia`` is itself
        a JSON document serialized as a string.

        Raises ``KeyError`` when an expected key is missing.
        """
        general_data = json.loads(base64.b64decode(content).decode(self.ENCODING))
        cia_data = json.loads(general_data['fcia'])
        payload = {
            'ts': general_data['ts'],
            'rfc': general_data['rfc'],
            'dbid': general_data['dbid'],
            'fcia': {key: cia_data[key] for key in self._FCIA_KEYS},
        }
        return json.dumps(payload)

    def fetch_info(self, response):
        """Exchange the authorization *response* for a token at the broker.

        Returns the decrypted token string, or ``None`` when the broker kept
        failing for ``RETRY_LIMIT`` attempts.
        """
        encrypted = self.encrypt(self._build_broker_payload(response.content))
        # The ciphertext is wrapped in double quotes before being sent with a
        # JSON content type.
        body = '"{}"'.format(encrypted)
        headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        }
        broker_response = self._post_with_retry(
            PROSCAI_TOKEN_BROKER, data=body, headers=headers
        )
        if broker_response is None:
            return None
        return self.fetch_token_broker(broker_response)

    @staticmethod
    def _decode_jwt_payload(token):
        """Return the claims dict stored in the second segment of *token*.

        JWT segments are base64url encoded without padding, so the padding is
        restored to a multiple of four and the URL-safe alphabet is used. The
        standard alphabet would silently drop '-' and '_' characters.
        """
        segment = token.split('.')[1]
        segment += "=" * (-len(segment) % 4)
        return json.loads(base64.urlsafe_b64decode(segment))

    def fetch_token_broker(self, response):
        """Decrypt the broker *response*, persist it and return the token.

        A ``JWTToken`` row is created with the decrypted token and then
        updated with the JSON-serialized claims, the ``exp`` claim and the
        matching ``exp_date``.
        """
        token = self.decrypt(response.content)
        token_obj = JWTToken.objects.create(
            token=token
        )
        claims = self._decode_jwt_payload(token_obj.token)
        token_obj.token_decoded = json.dumps(claims)
        exp = claims['exp']
        token_obj.exp = exp
        # NOTE(review): fromtimestamp() without tz yields a naive local
        # datetime, while __init__ compares exp_date with django's now();
        # confirm this matches the project's USE_TZ setting.
        token_obj.exp_date = datetime.fromtimestamp(exp)
        token_obj.save()
        return token