Chromium Code Reviews

Side by Side Diff: client/isolate_storage.py

Issue 2953253003: Replace custom blob gRPC API with ByteStream (Closed)
Patch Set: Import ndb directly to test code (created 3 years, 5 months ago)
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # Copyright 2016 The LUCI Authors. All rights reserved. 2 # Copyright 2016 The LUCI Authors. All rights reserved.
3 # Use of this source code is governed under the Apache License, Version 2.0 3 # Use of this source code is governed under the Apache License, Version 2.0
4 # that can be found in the LICENSE file. 4 # that can be found in the LICENSE file.
5 5
6 """A low-level blob storage/retrieval interface to the Isolate server""" 6 """A low-level blob storage/retrieval interface to the Isolate server"""
7 7
8 import base64 8 import base64
9 import binascii 9 import binascii
10 import collections 10 import collections
11 import logging 11 import logging
12 import os
12 import re 13 import re
13 import sys 14 import sys
14 import threading 15 import threading
15 import time 16 import time
16 import types 17 import types
18 import uuid
17 19
18 from utils import file_path 20 from utils import file_path
19 from utils import net 21 from utils import net
20 22
21 import isolated_format 23 import isolated_format
22 24
23 # gRPC may not be installed on the worker machine. This is fine, as long as 25 # gRPC may not be installed on the worker machine. This is fine, as long as
24 # the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__). 26 # the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__).
27 # Full external requirements are: grpcio, certifi.
25 try: 28 try:
26 import grpc 29 import grpc
27 from proto import isolate_bot_pb2 30 from google import auth as google_auth
28 except ImportError: 31 from google.auth.transport import grpc as google_auth_transport_grpc
32 from google.auth.transport import requests as google_auth_transport_requests
33 from proto import bytestream_pb2
34 except ImportError as err:
29 grpc = None 35 grpc = None
30 isolate_bot_pb2 = None 36 bytestream_pb2 = None
31 37
38 # If gRPC is installed, at least give a warning if certifi is not. This is not
39 # actually used anywhere in this module, but if certifi is missing,
40 # google.auth.transport will fail; see
41 # https://stackoverflow.com/questions/24973326
42 if grpc is not None:
43 try:
44 import certifi
45 except ImportError as err:
46 logging.warning('could not import certifi; gRPC HTTPS connections may fail')
32 47
33 # Chunk size to use when reading from network stream. 48 # Chunk size to use when reading from network stream.
34 NET_IO_FILE_CHUNK = 16 * 1024 49 NET_IO_FILE_CHUNK = 16 * 1024
35 50
36 51
37 # Read timeout in seconds for downloads from isolate storage. If there's no 52 # Read timeout in seconds for downloads from isolate storage. If there's no
38 # response from the server within this timeout whole download will be aborted. 53 # response from the server within this timeout whole download will be aborted.
39 DOWNLOAD_READ_TIMEOUT = 60 54 DOWNLOAD_READ_TIMEOUT = 60
40 55
41 56
(...skipping 457 matching lines...)
499 class IsolateServerGrpc(StorageApi): 514 class IsolateServerGrpc(StorageApi):
500 """StorageApi implementation that downloads and uploads to a gRPC service. 515 """StorageApi implementation that downloads and uploads to a gRPC service.
501 516
502 Limitations: only works for the default-gzip namespace, and with zero offsets 517 Limitations: only works for the default-gzip namespace, and with zero offsets
503 while fetching. 518 while fetching.
504 """ 519 """
505 520
506 def __init__(self, server, namespace): 521 def __init__(self, server, namespace):
507 super(IsolateServerGrpc, self).__init__() 522 super(IsolateServerGrpc, self).__init__()
508 logging.info('Using gRPC for Isolate') 523 logging.info('Using gRPC for Isolate')
524 self._server = server
525 self._lock = threading.Lock()
526 self._memory_use = 0
527 self._num_pushes = 0
528 self._already_exists = 0
509 529
510 # Make sure grpc was successfully imported 530 # Make sure grpc was successfully imported
511 assert grpc 531 assert grpc
512 assert isolate_bot_pb2 532 assert bytestream_pb2
533 # Proxies only support the default-gzip namespace for now.
534 # TODO(aludwin): support other namespaces if necessary
535 assert namespace == 'default-gzip'
513 536
514 # Proxies only support the default-gzip namespace for now. 537 proxy = os.environ.get('ISOLATED_GRPC_PROXY', '')
515 # TODO(aludwin): support other namespaces 538 roots = os.environ.get('ISOLATED_GRPC_PROXY_TLS_ROOTS')
516 assert namespace == 'default-gzip' 539 overd = os.environ.get('ISOLATED_GRPC_PROXY_TLS_OVERRIDE')
517 self._server = server 540
518 self._channel = grpc.insecure_channel(server) 541 # The "proxy" envvar must be of the form:
519 self._stub = isolate_bot_pb2.FileServiceStub(self._channel) 542 # http[s]://<server>[:port][/prefix]
520 self._lock = threading.Lock() 543 m = re.search('^(https?):\/\/([^\/]+)/?(.*)$', proxy)
521 self._memory_use = 0 544 if not m:
522 logging.info('...gRPC successfully initialized') 545 raise ValueError(('gRPC proxy must have the form: '
546 'http[s]://<server>[:port][/prefix] '
547 '(given: %s)') % proxy)
548 transport = m.group(1)
549 host = m.group(2)
550 prefix = m.group(3)
551 if not prefix.endswith('/'):
552 prefix = prefix + '/'
553 logging.info('gRPC proxy: transport %s, host %s, prefix %s',
554 transport, host, prefix)
555 self._prefix = prefix
556
557 if transport == 'http':
558 self._channel = grpc.insecure_channel(host)
559 elif transport == 'https':
560 # Using cloud container builder scopes for testing:
561 scopes = ('https://www.googleapis.com/auth/cloud-build-service',)
562 credentials, _ = google_auth.default(scopes=scopes)
563 request = google_auth_transport_requests.Request()
564 options = ()
565 root_certs = None
566 if roots is not None:
567 logging.info('Using root CA %s', roots)
568 with open(roots) as f:
569 root_certs = f.read()
570 if overd is not None:
571 logging.info('Using TLS server override %s', overd)
572 options=(('grpc.ssl_target_name_override', overd),)
573 ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs)
574 self._channel = google_auth_transport_grpc.secure_authorized_channel(
575 credentials, request, host, ssl_creds, options=options)
576 else:
577 raise ValueError('unknown transport %s (should be http[s])' % transport)
578 self._stub = bytestream_pb2.ByteStreamStub(self._channel)
523 579
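
For reference, a minimal standalone sketch of the proxy-URL parsing above, with the regex written as a raw string (equivalent to the patch's escaped form); the endpoint value is hypothetical:

    import re

    def parse_proxy(proxy):
        # Expected form: http[s]://<server>[:port][/prefix]
        m = re.search(r'^(https?)://([^/]+)/?(.*)$', proxy)
        if not m:
            raise ValueError('gRPC proxy must have the form: '
                             'http[s]://<server>[:port][/prefix] (given: %s)' % proxy)
        transport, host, prefix = m.group(1), m.group(2), m.group(3)
        # Normalize the prefix so resource names can be appended directly.
        if not prefix.endswith('/'):
            prefix += '/'
        return transport, host, prefix

    parse_proxy('https://grpcproxy.example.com:9000/exec')
    # -> ('https', 'grpcproxy.example.com:9000', 'exec/')
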
524 @property 580 @property
525 def location(self): 581 def location(self):
526 return self._server 582 return self._server
527 583
528 @property 584 @property
529 def namespace(self): 585 def namespace(self):
530 # This is used to determine if the data is compressed, but gRPC proxies 586 # This is used to determine if the data is compressed, but gRPC proxies
531 # don't have concepts of 'namespaces' and natively compress all messages 587 # don't have concepts of 'namespaces' and natively compress all messages
532 # before transmission. So return an unlikely-to-be-used name so that 588 # before transmission. So return an unlikely-to-be-used name so that
533 # isolateserver doesn't try to compress anything. 589 # isolateserver doesn't try to compress anything.
534 return 'grpc-proxy' 590 return 'grpc-proxy'
535 591
536 def fetch(self, digest, offset=0): 592 def fetch(self, digest, offset=0):
537 # The gRPC APIs only work with an offset of 0 593 # The gRPC APIs only work with an offset of 0
538 assert offset == 0 594 assert offset == 0
539 request = isolate_bot_pb2.FetchBlobsRequest() 595 request = bytestream_pb2.ReadRequest()
540 req_digest = request.digest.add() 596 # TODO(aludwin): send the expected size of the item
541 # Convert the utf-8 encoded hexadecimal string (like '012abc') to a byte 597 request.resource_name = '%sblobs/%s/0' % (
542 # array (like [0x01, 0x2a, 0xbc]). 598 self._prefix, digest)
543 req_digest.digest = binascii.unhexlify(digest)
544 expected_offset = 0
545 try: 599 try:
546 for response in self._stub.FetchBlobs(request, 600 for response in self._stub.Read(request, timeout=DOWNLOAD_READ_TIMEOUT):
547 timeout=DOWNLOAD_READ_TIMEOUT): 601 yield response.data
548 if not response.status.succeeded:
549 raise IOError(
550 'Error while fetching %s: %s' % (digest, response.status))
551 if not expected_offset == response.data.offset:
552 raise IOError(
553 'Error while fetching %s: expected offset %d, got %d' % (
554 digest, expected_offset, response.data.offset))
555 expected_offset += len(response.data.data)
556 yield response.data.data
557 except grpc.RpcError as g: 602 except grpc.RpcError as g:
558 logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g) 603 logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g)
559 raise IOError(g) 604 raise IOError(g)
560 605
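
The read path maps one-to-one onto the ByteStream Read RPC. A minimal sketch of an equivalent standalone reader, assuming a bytestream_pb2 module generated from google/bytestream/bytestream.proto and an already-connected stub (the trailing /0 mirrors the TODO above: the expected size is not sent yet):

    def read_blob(stub, prefix, digest, timeout=DOWNLOAD_READ_TIMEOUT):
        # Resource name convention used by the proxy: <prefix>blobs/<digest>/<size>.
        request = bytestream_pb2.ReadRequest()
        request.resource_name = '%sblobs/%s/0' % (prefix, digest)
        # Read is a server-streaming RPC; each response carries one chunk.
        for response in stub.Read(request, timeout=timeout):
            yield response.data
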
561 def push(self, item, push_state, content=None): 606 def push(self, item, push_state, content=None):
562 assert isinstance(item, Item) 607 assert isinstance(item, Item)
563 assert item.digest is not None 608 assert item.digest is not None
564 assert item.size is not None 609 assert item.size is not None
565 assert isinstance(push_state, _IsolateServerGrpcPushState) 610 assert isinstance(push_state, _IsolateServerGrpcPushState)
566 611
567 # Default to item.content(). 612 # Default to item.content().
568 content = item.content() if content is None else content 613 content = item.content() if content is None else content
569 guard_memory_use(self, content, item.size) 614 guard_memory_use(self, content, item.size)
615 self._num_pushes += 1
570 616
571 try: 617 try:
572 def chunker(): 618 def chunker():
573 # Returns one bit of content at a time 619 # Returns one bit of content at a time
574 if (isinstance(content, str) 620 if (isinstance(content, str)
575 or not isinstance(content, collections.Iterable)): 621 or not isinstance(content, collections.Iterable)):
576 yield content 622 yield content
577 else: 623 else:
578 for chunk in content: 624 for chunk in content:
579 yield chunk 625 yield chunk
580 def slicer(): 626 def slicer():
581 # Ensures every bit of content is under the gRPC max size; yields 627 # Ensures every bit of content is under the gRPC max size; yields
582 # proto messages to send via gRPC. 628 # proto messages to send via gRPC.
583 request = isolate_bot_pb2.PushBlobsRequest() 629 request = bytestream_pb2.WriteRequest()
584 request.data.digest.digest = binascii.unhexlify(item.digest) 630 u = uuid.uuid4()
585 request.data.digest.size_bytes = item.size 631 request.resource_name = '%suploads/%s/blobs/%s/%d' % (
586 request.data.offset = 0 632 self._prefix, u, item.digest, item.size)
633 request.write_offset = 0
587 for chunk in chunker(): 634 for chunk in chunker():
588 # Make sure we send at least one chunk for zero-length blobs 635 # Make sure we send at least one chunk for zero-length blobs
589 has_sent_anything = False 636 has_sent_anything = False
590 while chunk or not has_sent_anything: 637 while chunk or not has_sent_anything:
638 has_sent_anything = True
591 slice_len = min(len(chunk), NET_IO_FILE_CHUNK) 639 slice_len = min(len(chunk), NET_IO_FILE_CHUNK)
592 request.data.data = chunk[:slice_len] 640 request.data = chunk[:slice_len]
641 if request.write_offset + slice_len == item.size:
642 request.finish_write = True
593 yield request 643 yield request
594 has_sent_anything = True 644 request.write_offset += slice_len
595 request.data.offset += slice_len
596 # The proxy only expects the first chunk to have the digest
597 request.data.ClearField("digest")
598 chunk = chunk[slice_len:] 645 chunk = chunk[slice_len:]
599 646
600 # TODO(aludwin): batch up several requests to reuse TCP connections
601 try: 647 try:
602 response = self._stub.PushBlobs(slicer()) 648 response = self._stub.Write(slicer())
603 except grpc.RpcError as g: 649 except grpc.Call as c:
604 logging.error('gRPC error during push: re-throwing as IOError (%s)' % g) 650 # You might think that errors from gRPC would be grpc.RpcError. You'd
605 raise IOError(g) 651 # be... right... but it's *also* an instance of grpc.Call, and that's
652 # where the status code actually lives.
653 if c.code() == grpc.StatusCode.ALREADY_EXISTS:
654 # This is legit - we didn't check before we pushed so no problem if
655 # it's already there.
656 self._already_exists += 1
657 if self._already_exists % 100 == 0:
658 logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % (
659 self._already_exists, self._num_pushes,
660 100.0 * self._already_exists / self._num_pushes))
661 else:
662 logging.error('gRPC error during push: throwing as IOError (%s)' % c)
663 raise IOError(c)
664 except Exception as e:
665 logging.error('error during push: throwing as IOError (%s)' % e)
666 raise IOError(e)
606 667
607 if not response.status.succeeded: 668 if response.committed_size != item.size:
608 raise IOError( 669 raise IOError('%s/%d: incorrect size written (%d)' % (
609 'Error while uploading %s: %s' % ( 670 item.digest, item.size, response.committed_size))
610 item.digest, response.status.error_detail))
611 671
612 finally: 672 finally:
613 with self._lock: 673 with self._lock:
614 self._memory_use -= item.size 674 self._memory_use -= item.size
615 675
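
The upload side is a client-streaming Write. A minimal sketch of the request stream that slicer() produces, written with a fresh WriteRequest per chunk instead of mutating one object in place (digest, size, and data are assumptions; NET_IO_FILE_CHUNK is the module constant above):

    import uuid

    def write_requests(prefix, digest, size, data, max_chunk=NET_IO_FILE_CHUNK):
        # One upload session per blob: uploads/<uuid>/blobs/<digest>/<size>.
        resource = '%suploads/%s/blobs/%s/%d' % (prefix, uuid.uuid4(), digest, size)
        offset = 0
        sent_anything = False
        # Loop at least once so zero-length blobs still send one request.
        while data or not sent_anything:
            sent_anything = True
            chunk, data = data[:max_chunk], data[max_chunk:]
            request = bytestream_pb2.WriteRequest()
            request.resource_name = resource
            request.write_offset = offset
            request.data = chunk
            offset += len(chunk)
            # finish_write marks the last request of the stream.
            request.finish_write = (offset == size)
            yield request

Note that the call site catches grpc.Call rather than grpc.RpcError: the exception raised by a failed RPC is an instance of both, but the status-code accessor code() lives on grpc.Call.
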
616 def contains(self, items): 676 def contains(self, items):
617 """Returns the set of all missing items.""" 677 """Returns the set of all missing items."""
678 # TODO(aludwin): this isn't supported directly in Bytestream, so for now
679 # assume that nothing is present in the cache.
618 # Ensure all items were initialized with 'prepare' call. Storage does that. 680 # Ensure all items were initialized with 'prepare' call. Storage does that.
619 assert all(i.digest is not None and i.size is not None for i in items) 681 assert all(i.digest is not None and i.size is not None for i in items)
620 request = isolate_bot_pb2.ContainsRequest() 682 # Assume all Items are missing, and attach _PushState to them. The gRPC
621 items_by_digest = {} 683 # implementation doesn't actually have a push state, we just attach empty
684 # objects to satisfy the StorageApi interface.
685 missing_items = {}
622 for item in items: 686 for item in items:
623 cd = request.digest.add()
624 cd.digest = binascii.unhexlify(item.digest)
625 items_by_digest[cd.digest] = item
626 try:
627 response = self._stub.Contains(request)
628 except grpc.RpcError as g:
629 logging.error('gRPC error during contains: re-throwing as IOError (%s)'
630 % g)
631 raise IOError(g)
632
633 # If everything's present, return the empty set.
634 if response.status.succeeded:
635 return {}
636
637 if not response.status.error == isolate_bot_pb2.BlobStatus.MISSING_DIGEST:
638 raise IOError('Unknown response during lookup: %s' % response.status)
639
640 # Pick Items that are missing, attach _PushState to them. The gRPC
641 # implementation doesn't actually have a push state, we just attach
642 # empty objects to satisfy the StorageApi interface.
643 missing_items = {}
644 for missing in response.status.missing_digest:
645 item = items_by_digest[missing.digest]
646 missing_items[item] = _IsolateServerGrpcPushState() 687 missing_items[item] = _IsolateServerGrpcPushState()
647
648 logging.info('Queried %d files, %d cache hit',
649 len(items), len(items) - len(missing_items))
650 return missing_items 688 return missing_items
651 689
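
Since ByteStream has no lookup RPC, contains() now reports every item as missing and leaves deduplication to the server. The resulting caller flow is roughly (storage and items are hypothetical):

    # Every item comes back "missing", so every item gets pushed; blobs that
    # are already stored surface as ALREADY_EXISTS and are counted in push().
    missing = storage.contains(items)  # {item: _IsolateServerGrpcPushState()}
    for item, push_state in missing.items():
        storage.push(item, push_state)
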
652 690
653 def set_storage_api_class(cls): 691 def set_storage_api_class(cls):
654 """Replaces StorageApi implementation used by default.""" 692 """Replaces StorageApi implementation used by default."""
655 global _storage_api_cls 693 global _storage_api_cls
656 assert _storage_api_cls is None 694 assert _storage_api_cls is None
657 assert issubclass(cls, StorageApi) 695 assert issubclass(cls, StorageApi)
658 _storage_api_cls = cls 696 _storage_api_cls = cls
659 697
660 698
661 def get_storage_api(url, namespace): 699 def get_storage_api(url, namespace):
662 """Returns an object that implements low-level StorageApi interface. 700 """Returns an object that implements low-level StorageApi interface.
663 701
664 It is used by Storage to work with single isolate |namespace|. It should 702 It is used by Storage to work with single isolate |namespace|. It should
665 rarely be used directly by clients, see 'get_storage' for 703 rarely be used directly by clients, see 'get_storage' for
666 a better alternative. 704 a better alternative.
667 705
668 Arguments: 706 Arguments:
669 url: URL of isolate service to use shared cloud based storage. 707 url: URL of isolate service to use shared cloud based storage.
670 namespace: isolate namespace to operate in, also defines hashing and 708 namespace: isolate namespace to operate in, also defines hashing and
671 compression scheme used, i.e. namespace names that end with '-gzip' 709 compression scheme used, i.e. namespace names that end with '-gzip'
672 store compressed data. 710 store compressed data.
673 711
674 Returns: 712 Returns:
675 Instance of StorageApi subclass. 713 Instance of StorageApi subclass.
676 """ 714 """
677 cls = _storage_api_cls or IsolateServer 715 cls = _storage_api_cls or IsolateServer
678 return cls(url, namespace) 716 return cls(url, namespace)
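
A hypothetical wiring example for the gRPC path, under the patch's constraint that only the default-gzip namespace is accepted (proxy address and digest are made up):

    import os

    # The channel target comes from the envvar, not from the url argument.
    os.environ['ISOLATED_GRPC_PROXY'] = 'https://grpcproxy.example.com:9000/exec'
    set_storage_api_class(IsolateServerGrpc)
    storage = get_storage_api('https://isolateserver.appspot.com', 'default-gzip')
    data = ''.join(storage.fetch('a' * 40))  # 40-hex-char digest
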