Index: client/isolate_storage.py |
diff --git a/client/isolate_storage.py b/client/isolate_storage.py |
index 63234b0a1295d85e9e700aa057373ff5516f05be..7dc92b03db0b318288265c6b18322ed0626aa4fb 100644 |
--- a/client/isolate_storage.py |
+++ b/client/isolate_storage.py |
@@ -9,11 +9,13 @@ import base64 |
import binascii |
import collections |
import logging |
+import os |
import re |
import sys |
import threading |
import time |
import types |
+import uuid |
from utils import file_path |
from utils import net |
@@ -22,13 +24,26 @@ import isolated_format |
# gRPC may not be installed on the worker machine. This is fine, as long as |
# the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__). |
+# Full external requirements are: grpcio, certifi. |
try: |
import grpc |
- from proto import isolate_bot_pb2 |
-except ImportError: |
+ from google import auth as google_auth |
+ from google.auth.transport import grpc as google_auth_transport_grpc |
+ from google.auth.transport import requests as google_auth_transport_requests |
+ from proto import bytestream_pb2 |
+except ImportError as err: |
grpc = None |
- isolate_bot_pb2 = None |
- |
+ bytestream_pb2 = None |
+ |
+# If gRPC is installed, at least give a warning if certifi is not. This is not |
+# actually used anywhere in this module, but if certifi is missing, |
+# google.auth.transport will fail with |
+# https://stackoverflow.com/questions/24973326 |
+if grpc is not None: |
+ try: |
+ import certifi |
+ except ImportError as err: |
+ logging.warning('could not import certifi; gRPC HTTPS connections may fail') |
# Chunk size to use when reading from network stream. |
NET_IO_FILE_CHUNK = 16 * 1024 |
@@ -506,20 +521,61 @@ class IsolateServerGrpc(StorageApi): |
def __init__(self, server, namespace): |
super(IsolateServerGrpc, self).__init__() |
logging.info('Using gRPC for Isolate') |
+ self._server = server |
+ self._lock = threading.Lock() |
+ self._memory_use = 0 |
+ self._num_pushes = 0 |
+ self._already_exists = 0 |
# Make sure grpc was successfully imported |
assert grpc |
- assert isolate_bot_pb2 |
- |
+ assert bytestream_pb2 |
# Proxies only support the default-gzip namespace for now. |
- # TODO(aludwin): support other namespaces |
+ # TODO(aludwin): support other namespaces if necessary |
assert namespace == 'default-gzip' |
- self._server = server |
- self._channel = grpc.insecure_channel(server) |
- self._stub = isolate_bot_pb2.FileServiceStub(self._channel) |
- self._lock = threading.Lock() |
- self._memory_use = 0 |
- logging.info('...gRPC successfully initialized') |
+ |
+ proxy = os.environ.get('ISOLATED_GRPC_PROXY', '') |
+ roots = os.environ.get('ISOLATED_GRPC_PROXY_TLS_ROOTS') |
+ overd = os.environ.get('ISOLATED_GRPC_PROXY_TLS_OVERRIDE') |
+ |
+ # The "proxy" envvar must be of the form: |
+ # http[s]://<server>[:port][/prefix] |
+ m = re.search('^(https?):\/\/([^\/]+)/?(.*)$', proxy) |
+ if not m: |
+ raise ValueError(('gRPC proxy must have the form: ' |
+ 'http[s]://<server>[:port][/prefix] ' |
+ '(given: %s)') % proxy) |
+ transport = m.group(1) |
+ host = m.group(2) |
+ prefix = m.group(3) |
+ if not prefix.endswith('/'): |
+ prefix = prefix + '/' |
+ logging.info('gRPC proxy: transport %s, host %s, prefix %s', |
+ transport, host, prefix) |
+ self._prefix = prefix |
+ |
+ if transport == 'http': |
+ self._channel = grpc.insecure_channel(host) |
+ elif transport == 'https': |
+ # Using cloud container builder scopes for testing: |
+ scopes = ('https://www.googleapis.com/auth/cloud-build-service',) |
+ credentials, _ = google_auth.default(scopes=scopes) |
+ request = google_auth_transport_requests.Request() |
+ options = () |
+ root_certs = None |
+ if roots is not None: |
+ logging.info('Using root CA %s', roots) |
+ with open(roots) as f: |
+ root_certs = f.read() |
+ if overd is not None: |
+ logging.info('Using TLS server override %s', overd) |
+ options=(('grpc.ssl_target_name_override', overd),) |
+ ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs) |
+ self._channel = google_auth_transport_grpc.secure_authorized_channel( |
+ credentials, request, host, ssl_creds, options=options) |
+ else: |
+ raise ValueError('unknown transport %s (should be http[s])' % transport) |
+ self._stub = bytestream_pb2.ByteStreamStub(self._channel) |
@property |
def location(self): |
@@ -536,24 +592,13 @@ class IsolateServerGrpc(StorageApi): |
def fetch(self, digest, offset=0): |
# The gRPC APIs only work with an offset of 0 |
assert offset == 0 |
- request = isolate_bot_pb2.FetchBlobsRequest() |
- req_digest = request.digest.add() |
- # Convert the utf-8 encoded hexidecimal string (like '012abc') to a byte |
- # array (like [0x01, 0x2a, 0xbc]). |
- req_digest.digest = binascii.unhexlify(digest) |
- expected_offset = 0 |
+ request = bytestream_pb2.ReadRequest() |
+    # TODO(aludwin): send the expected size of the item |
+ request.resource_name = '%sblobs/%s/0' % ( |
+ self._prefix, digest) |
try: |
- for response in self._stub.FetchBlobs(request, |
- timeout=DOWNLOAD_READ_TIMEOUT): |
- if not response.status.succeeded: |
- raise IOError( |
- 'Error while fetching %s: %s' % (digest, response.status)) |
- if not expected_offset == response.data.offset: |
- raise IOError( |
- 'Error while fetching %s: expected offset %d, got %d' % ( |
- digest, expected_offset, response.data.offset)) |
- expected_offset += len(response.data.data) |
- yield response.data.data |
+ for response in self._stub.Read(request, timeout=DOWNLOAD_READ_TIMEOUT): |
+ yield response.data |
except grpc.RpcError as g: |
logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g) |
raise IOError(g) |
@@ -567,6 +612,7 @@ class IsolateServerGrpc(StorageApi): |
# Default to item.content(). |
content = item.content() if content is None else content |
guard_memory_use(self, content, item.size) |
+ self._num_pushes += 1 |
try: |
def chunker(): |
@@ -580,34 +626,48 @@ class IsolateServerGrpc(StorageApi): |
def slicer(): |
# Ensures every bit of content is under the gRPC max size; yields |
# proto messages to send via gRPC. |
- request = isolate_bot_pb2.PushBlobsRequest() |
- request.data.digest.digest = binascii.unhexlify(item.digest) |
- request.data.digest.size_bytes = item.size |
- request.data.offset = 0 |
+ request = bytestream_pb2.WriteRequest() |
+ u = uuid.uuid4() |
+ request.resource_name = '%suploads/%s/blobs/%s/%d' % ( |
+ self._prefix, u, item.digest, item.size) |
+ request.write_offset = 0 |
for chunk in chunker(): |
# Make sure we send at least one chunk for zero-length blobs |
has_sent_anything = False |
while chunk or not has_sent_anything: |
+ has_sent_anything = True |
slice_len = min(len(chunk), NET_IO_FILE_CHUNK) |
- request.data.data = chunk[:slice_len] |
+ request.data = chunk[:slice_len] |
+ if request.write_offset + slice_len == item.size: |
+ request.finish_write = True |
yield request |
- has_sent_anything = True |
- request.data.offset += slice_len |
- # The proxy only expects the first chunk to have the digest |
- request.data.ClearField("digest") |
+ request.write_offset += slice_len |
chunk = chunk[slice_len:] |
- # TODO(aludwin): batch up several requests to reuse TCP connections |
try: |
- response = self._stub.PushBlobs(slicer()) |
- except grpc.RpcError as g: |
- logging.error('gRPC error during push: re-throwing as IOError (%s)' % g) |
- raise IOError(g) |
+ response = self._stub.Write(slicer()) |
+ except grpc.Call as c: |
+        # You might think that errors from gRPC would be grpc.RpcError. You'd |
+ # be... right... but it's *also* an instance of grpc.Call, and that's |
+ # where the status code actually lives. |
+ if c.code() == grpc.StatusCode.ALREADY_EXISTS: |
+ # This is legit - we didn't check before we pushed so no problem if |
+ # it's already there. |
+ self._already_exists += 1 |
+ if self._already_exists % 100 == 0: |
+ logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % ( |
+ self._already_exists, self._num_pushes, |
+ 100.0 * self._already_exists / self._num_pushes)) |
+ else: |
+ logging.error('gRPC error during push: throwing as IOError (%s)' % c) |
+ raise IOError(c) |
+ except Exception as e: |
+ logging.error('error during push: throwing as IOError (%s)' % e) |
+ raise IOError(e) |
- if not response.status.succeeded: |
- raise IOError( |
- 'Error while uploading %s: %s' % ( |
- item.digest, response.status.error_detail)) |
+ if response.committed_size != item.size: |
+ raise IOError('%s/%d: incorrect size written (%d)' % ( |
+ item.digest, item.size, response.committed_size)) |
finally: |
with self._lock: |
@@ -615,38 +675,16 @@ class IsolateServerGrpc(StorageApi): |
def contains(self, items): |
"""Returns the set of all missing items.""" |
+ # TODO(aludwin): this isn't supported directly in Bytestream, so for now |
+ # assume that nothing is present in the cache. |
# Ensure all items were initialized with 'prepare' call. Storage does that. |
assert all(i.digest is not None and i.size is not None for i in items) |
- request = isolate_bot_pb2.ContainsRequest() |
- items_by_digest = {} |
- for item in items: |
- cd = request.digest.add() |
- cd.digest = binascii.unhexlify(item.digest) |
- items_by_digest[cd.digest] = item |
- try: |
- response = self._stub.Contains(request) |
- except grpc.RpcError as g: |
- logging.error('gRPC error during contains: re-throwing as IOError (%s)' |
- % g) |
- raise IOError(g) |
- |
- # If everything's present, return the empty set. |
- if response.status.succeeded: |
- return {} |
- |
- if not response.status.error == isolate_bot_pb2.BlobStatus.MISSING_DIGEST: |
- raise IOError('Unknown response during lookup: %s' % response.status) |
- |
- # Pick Items that are missing, attach _PushState to them. The gRPC |
- # implementation doesn't actually have a push state, we just attach |
- # empty objects to satisfy the StorageApi interface. |
+ # Assume all Items are missing, and attach _PushState to them. The gRPC |
+ # implementation doesn't actually have a push state, we just attach empty |
+ # objects to satisfy the StorageApi interface. |
missing_items = {} |
- for missing in response.status.missing_digest: |
- item = items_by_digest[missing.digest] |
+ for item in items: |
missing_items[item] = _IsolateServerGrpcPushState() |
- |
- logging.info('Queried %d files, %d cache hit', |
- len(items), len(items) - len(missing_items)) |
return missing_items |