OLD | NEW |
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 # Copyright 2016 The LUCI Authors. All rights reserved. | 2 # Copyright 2016 The LUCI Authors. All rights reserved. |
3 # Use of this source code is governed under the Apache License, Version 2.0 | 3 # Use of this source code is governed under the Apache License, Version 2.0 |
4 # that can be found in the LICENSE file. | 4 # that can be found in the LICENSE file. |
5 | 5 |
6 """A low-level blob storage/retrieval interface to the Isolate server""" | 6 """A low-level blob storage/retrieval interface to the Isolate server""" |
7 | 7 |
8 import base64 | 8 import base64 |
9 import binascii | 9 import binascii |
10 import collections | 10 import collections |
11 import logging | 11 import logging |
| 12 import os |
12 import re | 13 import re |
13 import sys | 14 import sys |
14 import threading | 15 import threading |
15 import time | 16 import time |
16 import types | 17 import types |
| 18 import uuid |
17 | 19 |
18 from utils import file_path | 20 from utils import file_path |
19 from utils import net | 21 from utils import net |
20 | 22 |
21 import isolated_format | 23 import isolated_format |
22 | 24 |
23 # gRPC may not be installed on the worker machine. This is fine, as long as | 25 # gRPC may not be installed on the worker machine. This is fine, as long as |
24 # the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__). | 26 # the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__). |
| 27 # Full external requirements are: grpcio, certifi. |
25 try: | 28 try: |
26 import grpc | 29 import grpc |
27 from proto import isolate_bot_pb2 | 30 from google import auth as google_auth |
28 except ImportError: | 31 from google.auth.transport import grpc as google_auth_transport_grpc |
| 32 from google.auth.transport import requests as google_auth_transport_requests |
| 33 from proto import bytestream_pb2 |
| 34 except ImportError: |
29 grpc = None | 35 grpc = None |
30 isolate_bot_pb2 = None | 36 bytestream_pb2 = None |
31 | 37 |
| 38 # If gRPC is installed, at least give a warning if certifi is not. certifi is |
| 39 # not actually used anywhere in this module, but if it is missing, |
| 40 # google.auth.transport will fail with the error described in |
| 41 # https://stackoverflow.com/questions/24973326 |
| 42 if grpc is not None: |
| 43 try: |
| 44 import certifi |
| 45 except ImportError: |
| 46 logging.warning('could not import certifi; gRPC HTTPS connections may fail') |
32 | 47 |
33 # Chunk size to use when reading from network stream. | 48 # Chunk size to use when reading from network stream. |
34 NET_IO_FILE_CHUNK = 16 * 1024 | 49 NET_IO_FILE_CHUNK = 16 * 1024 |
35 | 50 |
36 | 51 |
37 # Read timeout in seconds for downloads from isolate storage. If there's no | 52 # Read timeout in seconds for downloads from isolate storage. If there's no |
38 # response from the server within this timeout whole download will be aborted. | 53 # response from the server within this timeout whole download will be aborted. |
39 DOWNLOAD_READ_TIMEOUT = 60 | 54 DOWNLOAD_READ_TIMEOUT = 60 |
40 | 55 |
41 | 56 |
(...skipping 457 matching lines...)
499 class IsolateServerGrpc(StorageApi): | 514 class IsolateServerGrpc(StorageApi): |
500 """StorageApi implementation that downloads and uploads to a gRPC service. | 515 """StorageApi implementation that downloads and uploads to a gRPC service. |
501 | 516 |
502 Limitations: only works for the default-gzip namespace, and with zero offsets | 517 Limitations: only works for the default-gzip namespace, and with zero offsets |
503 while fetching. | 518 while fetching. |
504 """ | 519 """ |
505 | 520 |
506 def __init__(self, server, namespace): | 521 def __init__(self, server, namespace): |
507 super(IsolateServerGrpc, self).__init__() | 522 super(IsolateServerGrpc, self).__init__() |
508 logging.info('Using gRPC for Isolate') | 523 logging.info('Using gRPC for Isolate') |
| 524 self._server = server |
| 525 self._lock = threading.Lock() |
| 526 self._memory_use = 0 |
| 527 self._num_pushes = 0 |
| 528 self._already_exists = 0 |
509 | 529 |
510 # Make sure grpc was successfully imported | 530 # Make sure grpc was successfully imported |
511 assert grpc | 531 assert grpc |
512 assert isolate_bot_pb2 | 532 assert bytestream_pb2 |
| 533 # Proxies only support the default-gzip namespace for now. |
| 534 # TODO(aludwin): support other namespaces if necessary |
| 535 assert namespace == 'default-gzip' |
513 | 536 |
514 # Proxies only support the default-gzip namespace for now. | 537 proxy = os.environ.get('ISOLATED_GRPC_PROXY', '') |
515 # TODO(aludwin): support other namespaces | 538 roots = os.environ.get('ISOLATED_GRPC_PROXY_TLS_ROOTS') |
516 assert namespace == 'default-gzip' | 539 overd = os.environ.get('ISOLATED_GRPC_PROXY_TLS_OVERRIDE') |
517 self._server = server | 540 |
518 self._channel = grpc.insecure_channel(server) | 541 # The "proxy" envvar must be of the form: |
519 self._stub = isolate_bot_pb2.FileServiceStub(self._channel) | 542 # http[s]://<server>[:port][/prefix] |
520 self._lock = threading.Lock() | 543 m = re.search(r'^(https?)://([^/]+)/?(.*)$', proxy) |
521 self._memory_use = 0 | 544 if not m: |
522 logging.info('...gRPC successfully initialized') | 545 raise ValueError(('gRPC proxy must have the form: ' |
| 546 'http[s]://<server>[:port][/prefix] ' |
| 547 '(given: %s)') % proxy) |
| 548 transport = m.group(1) |
| 549 host = m.group(2) |
| 550 prefix = m.group(3) |
| 551 if not prefix.endswith('/'): |
| 552 prefix = prefix + '/' |
| 553 logging.info('gRPC proxy: transport %s, host %s, prefix %s', |
| 554 transport, host, prefix) |
| 555 self._prefix = prefix |
| 556 |
| 557 if transport == 'http': |
| 558 self._channel = grpc.insecure_channel(host) |
| 559 elif transport == 'https': |
| 560 # Using cloud container builder scopes for testing: |
| 561 scopes = ('https://www.googleapis.com/auth/cloud-build-service',) |
| 562 credentials, _ = google_auth.default(scopes=scopes) |
| 563 request = google_auth_transport_requests.Request() |
| 564 options = () |
| 565 root_certs = None |
| 566 if roots is not None: |
| 567 logging.info('Using root CA %s', roots) |
| 568 with open(roots) as f: |
| 569 root_certs = f.read() |
| 570 if overd is not None: |
| 571 logging.info('Using TLS server override %s', overd) |
| 572 options = (('grpc.ssl_target_name_override', overd),) |
| 573 ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs) |
| 574 self._channel = google_auth_transport_grpc.secure_authorized_channel( |
| 575 credentials, request, host, ssl_creds, options=options) |
| 576 else: |
| 577 raise ValueError('unknown transport %s (should be http[s])' % transport) |
| 578 self._stub = bytestream_pb2.ByteStreamStub(self._channel) |
523 | 579 |
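A minimal sketch of how the ISOLATED_GRPC_PROXY value is decomposed by the
regex above; the proxy URL here is a hypothetical example:

    import re
    proxy = 'https://grpcproxy.example.com:9000/exec'  # hypothetical value
    m = re.search(r'^(https?)://([^/]+)/?(.*)$', proxy)
    transport, host, prefix = m.group(1), m.group(2), m.group(3)
    # transport == 'https', host == 'grpcproxy.example.com:9000',
    # prefix == 'exec' (a trailing '/' is appended before use)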
524 @property | 580 @property |
525 def location(self): | 581 def location(self): |
526 return self._server | 582 return self._server |
527 | 583 |
528 @property | 584 @property |
529 def namespace(self): | 585 def namespace(self): |
530 # This is used to determine if the data is compressed, but gRPC proxies | 586 # This is used to determine if the data is compressed, but gRPC proxies |
531 # don't have concepts of 'namespaces' and natively compress all messages | 587 # don't have concepts of 'namespaces' and natively compress all messages |
532 # before transmission. So return an unlikely-to-be-used name so that | 588 # before transmission. So return an unlikely-to-be-used name so that |
533 # isolateserver doesn't try to compress anything. | 589 # isolateserver doesn't try to compress anything. |
534 return 'grpc-proxy' | 590 return 'grpc-proxy' |
535 | 591 |
536 def fetch(self, digest, offset=0): | 592 def fetch(self, digest, offset=0): |
537 # The gRPC APIs only work with an offset of 0 | 593 # The gRPC APIs only work with an offset of 0 |
538 assert offset == 0 | 594 assert offset == 0 |
539 request = isolate_bot_pb2.FetchBlobsRequest() | 595 request = bytestream_pb2.ReadRequest() |
540 req_digest = request.digest.add() | 596 # TODO(aludwin): send the expected size of the item |
541 # Convert the utf-8 encoded hexidecimal string (like '012abc') to a byte | 597 request.resource_name = '%sblobs/%s/0' % ( |
542 # array (like [0x01, 0x2a, 0xbc]). | 598 self._prefix, digest) |
543 req_digest.digest = binascii.unhexlify(digest) | |
544 expected_offset = 0 | |
545 try: | 599 try: |
546 for response in self._stub.FetchBlobs(request, | 600 for response in self._stub.Read(request, timeout=DOWNLOAD_READ_TIMEOUT): |
547 timeout=DOWNLOAD_READ_TIMEOUT): | 601 yield response.data |
548 if not response.status.succeeded: | |
549 raise IOError( | |
550 'Error while fetching %s: %s' % (digest, response.status)) | |
551 if not expected_offset == response.data.offset: | |
552 raise IOError( | |
553 'Error while fetching %s: expected offset %d, got %d' % ( | |
554 digest, expected_offset, response.data.offset)) | |
555 expected_offset += len(response.data.data) | |
556 yield response.data.data | |
557 except grpc.RpcError as g: | 602 except grpc.RpcError as g: |
558 logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g) | 603 logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g) |
559 raise IOError(g) | 604 raise IOError(g) |
560 | 605 |
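A hedged usage sketch for the new fetch() path: callers iterate over the
returned generator and concatenate the chunks. The server address and digest
below are made up, and ISOLATED_GRPC_PROXY must already be set:

    storage = IsolateServerGrpc('grpcproxy.example.com:9000', 'default-gzip')
    digest = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'  # SHA-1 of empty input
    data = ''.join(storage.fetch(digest))  # any offset other than 0 asserts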
561 def push(self, item, push_state, content=None): | 606 def push(self, item, push_state, content=None): |
562 assert isinstance(item, Item) | 607 assert isinstance(item, Item) |
563 assert item.digest is not None | 608 assert item.digest is not None |
564 assert item.size is not None | 609 assert item.size is not None |
565 assert isinstance(push_state, _IsolateServerGrpcPushState) | 610 assert isinstance(push_state, _IsolateServerGrpcPushState) |
566 | 611 |
567 # Default to item.content(). | 612 # Default to item.content(). |
568 content = item.content() if content is None else content | 613 content = item.content() if content is None else content |
569 guard_memory_use(self, content, item.size) | 614 guard_memory_use(self, content, item.size) |
| 615 self._num_pushes += 1 |
570 | 616 |
571 try: | 617 try: |
572 def chunker(): | 618 def chunker(): |
573 # Returns one bit of content at a time | 619 # Returns one bit of content at a time |
574 if (isinstance(content, str) | 620 if (isinstance(content, str) |
575 or not isinstance(content, collections.Iterable)): | 621 or not isinstance(content, collections.Iterable)): |
576 yield content | 622 yield content |
577 else: | 623 else: |
578 for chunk in content: | 624 for chunk in content: |
579 yield chunk | 625 yield chunk |
580 def slicer(): | 626 def slicer(): |
581 # Ensures every bit of content is under the gRPC max size; yields | 627 # Ensures every bit of content is under the gRPC max size; yields |
582 # proto messages to send via gRPC. | 628 # proto messages to send via gRPC. |
583 request = isolate_bot_pb2.PushBlobsRequest() | 629 request = bytestream_pb2.WriteRequest() |
584 request.data.digest.digest = binascii.unhexlify(item.digest) | 630 u = uuid.uuid4() |
585 request.data.digest.size_bytes = item.size | 631 request.resource_name = '%suploads/%s/blobs/%s/%d' % ( |
586 request.data.offset = 0 | 632 self._prefix, u, item.digest, item.size) |
| 633 request.write_offset = 0 |
587 for chunk in chunker(): | 634 for chunk in chunker(): |
588 # Make sure we send at least one chunk for zero-length blobs | 635 # Make sure we send at least one chunk for zero-length blobs |
589 has_sent_anything = False | 636 has_sent_anything = False |
590 while chunk or not has_sent_anything: | 637 while chunk or not has_sent_anything: |
| 638 has_sent_anything = True |
591 slice_len = min(len(chunk), NET_IO_FILE_CHUNK) | 639 slice_len = min(len(chunk), NET_IO_FILE_CHUNK) |
592 request.data.data = chunk[:slice_len] | 640 request.data = chunk[:slice_len] |
| 641 if request.write_offset + slice_len == item.size: |
| 642 request.finish_write = True |
593 yield request | 643 yield request |
594 has_sent_anything = True | 644 request.write_offset += slice_len |
595 request.data.offset += slice_len | |
596 # The proxy only expects the first chunk to have the digest | |
597 request.data.ClearField("digest") | |
598 chunk = chunk[slice_len:] | 645 chunk = chunk[slice_len:] |
599 | 646 |
600 # TODO(aludwin): batch up several requests to reuse TCP connections | |
601 try: | 647 try: |
602 response = self._stub.PushBlobs(slicer()) | 648 response = self._stub.Write(slicer()) |
603 except grpc.RpcError as g: | 649 except grpc.Call as c: |
604 logging.error('gRPC error during push: re-throwing as IOError (%s)' % g) | 650 # You might think that errors from gRPC would be grpc.RpcError. You'd |
605 raise IOError(g) | 651 # be... right... but it's *also* an instance of grpc.Call, and that's |
| 652 # where the status code actually lives. |
| 653 if c.code() == grpc.StatusCode.ALREADY_EXISTS: |
| 654 # This is legit - we didn't check before we pushed, so no harm done. |
| 655 self._already_exists += 1 |
| 656 if self._already_exists % 100 == 0: |
| 657 logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % ( |
| 658 self._already_exists, self._num_pushes, |
| 659 100.0 * self._already_exists / self._num_pushes)) |
| 660 return # No response to verify; the finally clause still runs. |
| 661 else: |
| 662 logging.error('gRPC error during push: throwing as IOError (%s)' % c) |
| 663 raise IOError(c) |
| 664 except Exception as e: |
| 665 logging.error('error during push: throwing as IOError (%s)' % e) |
| 666 raise IOError(e) |
606 | 667 |
607 if not response.status.succeeded: | 668 if response.committed_size != item.size: |
608 raise IOError( | 669 raise IOError('%s/%d: incorrect size written (%d)' % ( |
609 'Error while uploading %s: %s' % ( | 670 item.digest, item.size, response.committed_size)) |
610 item.digest, response.status.error_detail)) | |
611 | 671 |
612 finally: | 672 finally: |
613 with self._lock: | 673 with self._lock: |
614 self._memory_use -= item.size | 674 self._memory_use -= item.size |
615 | 675 |
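For reference, a sketch of the ByteStream upload resource name that slicer()
builds; prefix, digest, and size are illustrative values:

    import uuid
    prefix = 'exec/'  # parsed from ISOLATED_GRPC_PROXY
    digest = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'
    size = 0
    name = '%suploads/%s/blobs/%s/%d' % (prefix, uuid.uuid4(), digest, size)
    # e.g. 'exec/uploads/<random-uuid>/blobs/da39a3ee.../0'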
616 def contains(self, items): | 676 def contains(self, items): |
617 """Returns the set of all missing items.""" | 677 """Returns the set of all missing items.""" |
| 678 # TODO(aludwin): this isn't supported directly in ByteStream, so for now |
| 679 # assume that nothing is present in the cache. |
618 # Ensure all items were initialized with 'prepare' call. Storage does that. | 680 # Ensure all items were initialized with 'prepare' call. Storage does that. |
619 assert all(i.digest is not None and i.size is not None for i in items) | 681 assert all(i.digest is not None and i.size is not None for i in items) |
620 request = isolate_bot_pb2.ContainsRequest() | 682 # Assume all Items are missing, and attach _PushState to them. The gRPC |
621 items_by_digest = {} | 683 # implementation doesn't actually have a push state; we just attach empty |
| 684 # objects to satisfy the StorageApi interface. |
| 685 missing_items = {} |
622 for item in items: | 686 for item in items: |
623 cd = request.digest.add() | |
624 cd.digest = binascii.unhexlify(item.digest) | |
625 items_by_digest[cd.digest] = item | |
626 try: | |
627 response = self._stub.Contains(request) | |
628 except grpc.RpcError as g: | |
629 logging.error('gRPC error during contains: re-throwing as IOError (%s)' | |
630 % g) | |
631 raise IOError(g) | |
632 | |
633 # If everything's present, return the empty set. | |
634 if response.status.succeeded: | |
635 return {} | |
636 | |
637 if not response.status.error == isolate_bot_pb2.BlobStatus.MISSING_DIGEST: | |
638 raise IOError('Unknown response during lookup: %s' % response.status) | |
639 | |
640 # Pick Items that are missing, attach _PushState to them. The gRPC | |
641 # implementation doesn't actually have a push state, we just attach | |
642 # empty objects to satisfy the StorageApi interface. | |
643 missing_items = {} | |
644 for missing in response.status.missing_digest: | |
645 item = items_by_digest[missing.digest] | |
646 missing_items[item] = _IsolateServerGrpcPushState() | 687 missing_items[item] = _IsolateServerGrpcPushState() |
647 | |
648 logging.info('Queried %d files, %d cache hit', | |
649 len(items), len(items) - len(missing_items)) | |
650 return missing_items | 688 return missing_items |
651 | 689 |
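A sketch of the resulting contains()/push() contract as Storage drives it;
'items' is a hypothetical list of prepared Items:

    missing = storage.contains(items)  # now maps every item to an empty state
    for item, push_state in missing.items():
      storage.push(item, push_state)   # duplicates come back ALREADY_EXISTS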
652 | 690 |
653 def set_storage_api_class(cls): | 691 def set_storage_api_class(cls): |
654 """Replaces StorageApi implementation used by default.""" | 692 """Replaces StorageApi implementation used by default.""" |
655 global _storage_api_cls | 693 global _storage_api_cls |
656 assert _storage_api_cls is None | 694 assert _storage_api_cls is None |
657 assert issubclass(cls, StorageApi) | 695 assert issubclass(cls, StorageApi) |
658 _storage_api_cls = cls | 696 _storage_api_cls = cls |
659 | 697 |
660 | 698 |
661 def get_storage_api(url, namespace): | 699 def get_storage_api(url, namespace): |
662 """Returns an object that implements low-level StorageApi interface. | 700 """Returns an object that implements low-level StorageApi interface. |
663 | 701 |
664 It is used by Storage to work with a single isolate |namespace|. It should | 702 It is used by Storage to work with a single isolate |namespace|. It should |
665 rarely be used directly by clients, see 'get_storage' for | 703 rarely be used directly by clients, see 'get_storage' for |
666 a better alternative. | 704 a better alternative. |
667 | 705 |
668 Arguments: | 706 Arguments: |
669 url: URL of isolate service to use shared cloud based storage. | 707 url: URL of isolate service to use shared cloud based storage. |
670 namespace: isolate namespace to operate in, also defines hashing and | 708 namespace: isolate namespace to operate in, also defines hashing and |
671 compression scheme used, i.e. namespace names that end with '-gzip' | 709 compression scheme used, i.e. namespace names that end with '-gzip' |
672 store compressed data. | 710 store compressed data. |
673 | 711 |
674 Returns: | 712 Returns: |
675 Instance of StorageApi subclass. | 713 Instance of StorageApi subclass. |
676 """ | 714 """ |
677 cls = _storage_api_cls or IsolateServer | 715 cls = _storage_api_cls or IsolateServer |
678 return cls(url, namespace) | 716 return cls(url, namespace) |
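Finally, a hedged sketch of wiring the gRPC implementation in through these
helpers (callers normally do this; the server address is illustrative):

    set_storage_api_class(IsolateServerGrpc)
    api = get_storage_api('grpcproxy.example.com:9000', 'default-gzip')
    assert api.namespace == 'grpc-proxy'  # see the namespace property above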
OLD | NEW |