Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(105)

Side by Side Diff: client/third_party/google/auth/crypt.py

Issue 2953253003: Replace custom blob gRPC API with ByteStream (Closed)
Patch Set: Import ndb directly to test code Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2016 Google Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Cryptography helpers for verifying and signing messages.
16
17 Uses the ``rsa``, ``pyasn1`` and ``pyasn1_modules`` packages
18 to parse PEM files storing PKCS#1 or PKCS#8 keys as well as
19 certificates. There is no support for p12 files.
20
21 The simplest way to verify signatures is using :func:`verify_signature`::
22
23 cert = open('certs.pem').read()
24 valid = crypt.verify_signature(message, signature, cert)
25
26 If you're going to verify many messages with the same certificate, you can use
27 :class:`RSAVerifier`::
28
29 cert = open('certs.pem').read()
30 verifier = crypt.RSAVerifier.from_string(cert)
31 valid = verifier.verify(message, signature)
32
33
34 To sign messages use :class:`RSASigner` with a private key::
35
36 private_key = open('private_key.pem').read()
37 signer = crypt.RSASigner(private_key)
38 signature = signer.sign(message)
39
40 """
41 import abc
42 import io
43 import json
44
45 from pyasn1.codec.der import decoder
46 from pyasn1_modules import pem
47 from pyasn1_modules.rfc2459 import Certificate
48 from pyasn1_modules.rfc5208 import PrivateKeyInfo
49 import rsa
50 import six
51
52 from google.auth import _helpers
53
54 _POW2 = (128, 64, 32, 16, 8, 4, 2, 1)
55 _CERTIFICATE_MARKER = b'-----BEGIN CERTIFICATE-----'
56 _PKCS1_MARKER = ('-----BEGIN RSA PRIVATE KEY-----',
57 '-----END RSA PRIVATE KEY-----')
58 _PKCS8_MARKER = ('-----BEGIN PRIVATE KEY-----',
59 '-----END PRIVATE KEY-----')
60 _PKCS8_SPEC = PrivateKeyInfo()
61 _JSON_FILE_PRIVATE_KEY = 'private_key'
62 _JSON_FILE_PRIVATE_KEY_ID = 'private_key_id'
63
64
65 def _bit_list_to_bytes(bit_list):
66 """Converts an iterable of 1s and 0s to bytes.
67
68 Combines the list 8 at a time, treating each group of 8 bits
69 as a single byte.
70
71 Args:
72 bit_list (Sequence): Sequence of 1s and 0s.
73
74 Returns:
75 bytes: The decoded bytes.
76 """
77 num_bits = len(bit_list)
78 byte_vals = bytearray()
79 for start in six.moves.xrange(0, num_bits, 8):
80 curr_bits = bit_list[start:start + 8]
81 char_val = sum(
82 val * digit for val, digit in six.moves.zip(_POW2, curr_bits))
83 byte_vals.append(char_val)
84 return bytes(byte_vals)
85
86
87 @six.add_metaclass(abc.ABCMeta)
88 class Verifier(object):
89 """Abstract base class for crytographic signature verifiers."""
90
91 @abc.abstractmethod
92 def verify(self, message, signature):
93 """Verifies a message against a cryptographic signature.
94
95 Args:
96 message (Union[str, bytes]): The message to verify.
97 signature (Union[str, bytes]): The cryptography signature to check.
98
99 Returns:
100 bool: True if message was signed by the private key associated
101 with the public key that this object was constructed with.
102 """
103 # pylint: disable=missing-raises-doc,redundant-returns-doc
104 # (pylint doesn't recognize that this is abstract)
105 raise NotImplementedError('Verify must be implemented')
106
107
108 class RSAVerifier(Verifier):
109 """Verifies RSA cryptographic signatures using public keys.
110
111 Args:
112 public_key (rsa.key.PublicKey): The public key used to verify
113 signatures.
114 """
115
116 def __init__(self, public_key):
117 self._pubkey = public_key
118
119 @_helpers.copy_docstring(Verifier)
120 def verify(self, message, signature):
121 message = _helpers.to_bytes(message)
122 try:
123 return rsa.pkcs1.verify(message, signature, self._pubkey)
124 except (ValueError, rsa.pkcs1.VerificationError):
125 return False
126
127 @classmethod
128 def from_string(cls, public_key):
129 """Construct an Verifier instance from a public key or public
130 certificate string.
131
132 Args:
133 public_key (Union[str, bytes]): The public key in PEM format or the
134 x509 public key certificate.
135
136 Returns:
137 Verifier: The constructed verifier.
138
139 Raises:
140 ValueError: If the public_key can't be parsed.
141 """
142 public_key = _helpers.to_bytes(public_key)
143 is_x509_cert = _CERTIFICATE_MARKER in public_key
144
145 # If this is a certificate, extract the public key info.
146 if is_x509_cert:
147 der = rsa.pem.load_pem(public_key, 'CERTIFICATE')
148 asn1_cert, remaining = decoder.decode(der, asn1Spec=Certificate())
149 if remaining != b'':
150 raise ValueError('Unused bytes', remaining)
151
152 cert_info = asn1_cert['tbsCertificate']['subjectPublicKeyInfo']
153 key_bytes = _bit_list_to_bytes(cert_info['subjectPublicKey'])
154 pubkey = rsa.PublicKey.load_pkcs1(key_bytes, 'DER')
155 else:
156 pubkey = rsa.PublicKey.load_pkcs1(public_key, 'PEM')
157 return cls(pubkey)
158
159
160 def verify_signature(message, signature, certs):
161 """Verify an RSA cryptographic signature.
162
163 Checks that the provided ``signature`` was generated from ``bytes`` using
164 the private key associated with the ``cert``.
165
166 Args:
167 message (Union[str, bytes]): The plaintext message.
168 signature (Union[str, bytes]): The cryptographic signature to check.
169 certs (Union[Sequence, str, bytes]): The certificate or certificates
170 to use to check the signature.
171
172 Returns:
173 bool: True if the signature is valid, otherwise False.
174 """
175 if isinstance(certs, (six.text_type, six.binary_type)):
176 certs = [certs]
177
178 for cert in certs:
179 verifier = RSAVerifier.from_string(cert)
180 if verifier.verify(message, signature):
181 return True
182 return False
183
184
185 @six.add_metaclass(abc.ABCMeta)
186 class Signer(object):
187 """Abstract base class for cryptographic signers."""
188
189 @abc.abstractproperty
190 def key_id(self):
191 """Optional[str]: The key ID used to identify this private key."""
192 raise NotImplementedError('Key id must be implemented')
193
194 @abc.abstractmethod
195 def sign(self, message):
196 """Signs a message.
197
198 Args:
199 message (Union[str, bytes]): The message to be signed.
200
201 Returns:
202 bytes: The signature of the message.
203 """
204 # pylint: disable=missing-raises-doc,redundant-returns-doc
205 # (pylint doesn't recognize that this is abstract)
206 raise NotImplementedError('Sign must be implemented')
207
208
209 class RSASigner(Signer):
210 """Signs messages with an RSA private key.
211
212 Args:
213 private_key (rsa.key.PrivateKey): The private key to sign with.
214 key_id (str): Optional key ID used to identify this private key. This
215 can be useful to associate the private key with its associated
216 public key or certificate.
217 """
218
219 def __init__(self, private_key, key_id=None):
220 self._key = private_key
221 self._key_id = key_id
222
223 @property
224 @_helpers.copy_docstring(Signer)
225 def key_id(self):
226 return self._key_id
227
228 @_helpers.copy_docstring(Signer)
229 def sign(self, message):
230 message = _helpers.to_bytes(message)
231 return rsa.pkcs1.sign(message, self._key, 'SHA-256')
232
233 @classmethod
234 def from_string(cls, key, key_id=None):
235 """Construct an Signer instance from a private key in PEM format.
236
237 Args:
238 key (str): Private key in PEM format.
239 key_id (str): An optional key id used to identify the private key.
240
241 Returns:
242 google.auth.crypt.Signer: The constructed signer.
243
244 Raises:
245 ValueError: If the key cannot be parsed as PKCS#1 or PKCS#8 in
246 PEM format.
247 """
248 key = _helpers.from_bytes(key) # PEM expects str in Python 3
249 marker_id, key_bytes = pem.readPemBlocksFromFile(
250 six.StringIO(key), _PKCS1_MARKER, _PKCS8_MARKER)
251
252 # Key is in pkcs1 format.
253 if marker_id == 0:
254 private_key = rsa.key.PrivateKey.load_pkcs1(
255 key_bytes, format='DER')
256 # Key is in pkcs8.
257 elif marker_id == 1:
258 key_info, remaining = decoder.decode(
259 key_bytes, asn1Spec=_PKCS8_SPEC)
260 if remaining != b'':
261 raise ValueError('Unused bytes', remaining)
262 private_key_info = key_info.getComponentByName('privateKey')
263 private_key = rsa.key.PrivateKey.load_pkcs1(
264 private_key_info.asOctets(), format='DER')
265 else:
266 raise ValueError('No key could be detected.')
267
268 return cls(private_key, key_id=key_id)
269
270 @classmethod
271 def from_service_account_info(cls, info):
272 """Creates a Signer instance instance from a dictionary containing
273 service account info in Google format.
274
275 Args:
276 info (Mapping[str, str]): The service account info in Google
277 format.
278
279 Returns:
280 google.auth.crypt.Signer: The constructed signer.
281
282 Raises:
283 ValueError: If the info is not in the expected format.
284 """
285 if _JSON_FILE_PRIVATE_KEY not in info:
286 raise ValueError(
287 'The private_key field was not found in the service account '
288 'info.')
289
290 return cls.from_string(
291 info[_JSON_FILE_PRIVATE_KEY],
292 info.get(_JSON_FILE_PRIVATE_KEY_ID))
293
294 @classmethod
295 def from_service_account_file(cls, filename):
296 """Creates a Signer instance from a service account .json file
297 in Google format.
298
299 Args:
300 filename (str): The path to the service account .json file.
301
302 Returns:
303 google.auth.crypt.Signer: The constructed signer.
304 """
305 with io.open(filename, 'r', encoding='utf-8') as json_file:
306 data = json.load(json_file)
307
308 return cls.from_service_account_info(data)
OLDNEW
« no previous file with comments | « client/third_party/google/auth/credentials.py ('k') | client/third_party/google/auth/environment_vars.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698