Chromium Code Reviews

Unified Diff: client/isolate_storage.py

Issue 2953253003: Replace custom blob gRPC API with ByteStream (Closed)
Patch Set: Import ndb directly to test code (created 3 years, 6 months ago)
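At its core, the patch swaps the custom FileService RPCs (FetchBlobs, PushBlobs, Contains) for the standard ByteStream Read/Write RPCs, which address blobs by resource name instead of digest messages. As a reader's sketch of the naming scheme built below (the digest, uuid, and size are illustrative, not from the patch):

  reads:  <prefix>blobs/<digest>/0
  writes: <prefix>uploads/<uuid4>/blobs/<digest>/<size>

  e.g. isolate/blobs/deadbeef/0
       isolate/uploads/8f14e45f-ceea-467f-9f29-traits/blobs/deadbeef/40000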
Index: client/isolate_storage.py
diff --git a/client/isolate_storage.py b/client/isolate_storage.py
index 63234b0a1295d85e9e700aa057373ff5516f05be..7dc92b03db0b318288265c6b18322ed0626aa4fb 100644
--- a/client/isolate_storage.py
+++ b/client/isolate_storage.py
@@ -9,11 +9,13 @@ import base64
import binascii
import collections
import logging
+import os
import re
import sys
import threading
import time
import types
+import uuid

from utils import file_path
from utils import net
@@ -22,13 +24,26 @@ import isolated_format

# gRPC may not be installed on the worker machine. This is fine, as long as
# the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__).
+# Full external requirements are: grpcio, certifi.
try:
  import grpc
-  from proto import isolate_bot_pb2
-except ImportError:
+  from google import auth as google_auth
+  from google.auth.transport import grpc as google_auth_transport_grpc
+  from google.auth.transport import requests as google_auth_transport_requests
+  from proto import bytestream_pb2
+except ImportError as err:
  grpc = None
-  isolate_bot_pb2 = None
-
+  bytestream_pb2 = None
+
+# If gRPC is installed, at least give a warning if certifi is not. certifi is
+# not actually used anywhere in this module, but if it is missing,
+# google.auth.transport will fail with
+# https://stackoverflow.com/questions/24973326
+if grpc is not None:
+  try:
+    import certifi
+  except ImportError as err:
+    logging.warning(
+        'could not import certifi; gRPC HTTPS connections may fail')

# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024
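For illustration (not part of the patch, server URL hypothetical): with grpcio absent, grpc is None and any attempt to use the gRPC storage fails fast on the assert in IsolateServerGrpc.__init__ rather than at some later call site:

  storage = IsolateServerGrpc('http://isolate.example.com', 'default-gzip')
  # -> AssertionError from "assert grpc"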
@@ -506,20 +521,61 @@ class IsolateServerGrpc(StorageApi):

  def __init__(self, server, namespace):
    super(IsolateServerGrpc, self).__init__()
    logging.info('Using gRPC for Isolate')
+    self._server = server
+    self._lock = threading.Lock()
+    self._memory_use = 0
+    self._num_pushes = 0
+    self._already_exists = 0
    # Make sure grpc was successfully imported
    assert grpc
-    assert isolate_bot_pb2
-
+    assert bytestream_pb2
    # Proxies only support the default-gzip namespace for now.
-    # TODO(aludwin): support other namespaces
+    # TODO(aludwin): support other namespaces if necessary
    assert namespace == 'default-gzip'
-    self._server = server
-    self._channel = grpc.insecure_channel(server)
-    self._stub = isolate_bot_pb2.FileServiceStub(self._channel)
-    self._lock = threading.Lock()
-    self._memory_use = 0
-    logging.info('...gRPC successfully initialized')
+
+    proxy = os.environ.get('ISOLATED_GRPC_PROXY', '')
+    roots = os.environ.get('ISOLATED_GRPC_PROXY_TLS_ROOTS')
+    overd = os.environ.get('ISOLATED_GRPC_PROXY_TLS_OVERRIDE')
+
+    # The "proxy" envvar must be of the form:
+    # http[s]://<server>[:port][/prefix]
+    m = re.search(r'^(https?)://([^/]+)/?(.*)$', proxy)
+    if not m:
+      raise ValueError(('gRPC proxy must have the form: '
+                        'http[s]://<server>[:port][/prefix] '
+                        '(given: %s)') % proxy)
+    transport = m.group(1)
+    host = m.group(2)
+    prefix = m.group(3)
+    if not prefix.endswith('/'):
+      prefix = prefix + '/'
+    logging.info('gRPC proxy: transport %s, host %s, prefix %s',
+                 transport, host, prefix)
+    self._prefix = prefix
+
+    if transport == 'http':
+      self._channel = grpc.insecure_channel(host)
+    elif transport == 'https':
+      # Using cloud container builder scopes for testing:
+      scopes = ('https://www.googleapis.com/auth/cloud-build-service',)
+      credentials, _ = google_auth.default(scopes=scopes)
+      request = google_auth_transport_requests.Request()
+      options = ()
+      root_certs = None
+      if roots is not None:
+        logging.info('Using root CA %s', roots)
+        with open(roots) as f:
+          root_certs = f.read()
+      if overd is not None:
+        logging.info('Using TLS server override %s', overd)
+        options = (('grpc.ssl_target_name_override', overd),)
+      ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs)
+      self._channel = google_auth_transport_grpc.secure_authorized_channel(
+          credentials, request, host, ssl_creds, options=options)
+    else:
+      raise ValueError('unknown transport %s (should be http[s])' % transport)
+    self._stub = bytestream_pb2.ByteStreamStub(self._channel)

  @property
  def location(self):
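For a concrete (hypothetical) value of ISOLATED_GRPC_PROXY, the regex above splits the proxy URL like so:

  m = re.search(r'^(https?)://([^/]+)/?(.*)$',
                'https://proxy.example.com:9090/isolate')
  m.group(1)  # 'https'                   -> transport
  m.group(2)  # 'proxy.example.com:9090'  -> host
  m.group(3)  # 'isolate'                 -> prefix, stored as 'isolate/'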
@@ -536,24 +592,13 @@
  def fetch(self, digest, offset=0):
    # The gRPC APIs only work with an offset of 0
    assert offset == 0
-    request = isolate_bot_pb2.FetchBlobsRequest()
-    req_digest = request.digest.add()
-    # Convert the utf-8 encoded hexidecimal string (like '012abc') to a byte
-    # array (like [0x01, 0x2a, 0xbc]).
-    req_digest.digest = binascii.unhexlify(digest)
-    expected_offset = 0
+    request = bytestream_pb2.ReadRequest()
+    # TODO(aludwin): send the expected size of the item
+    request.resource_name = '%sblobs/%s/0' % (
+        self._prefix, digest)
    try:
-      for response in self._stub.FetchBlobs(request,
-                                            timeout=DOWNLOAD_READ_TIMEOUT):
-        if not response.status.succeeded:
-          raise IOError(
-              'Error while fetching %s: %s' % (digest, response.status))
-        if not expected_offset == response.data.offset:
-          raise IOError(
-              'Error while fetching %s: expected offset %d, got %d' % (
-                  digest, expected_offset, response.data.offset))
-        expected_offset += len(response.data.data)
-        yield response.data.data
+      for response in self._stub.Read(request, timeout=DOWNLOAD_READ_TIMEOUT):
+        yield response.data
    except grpc.RpcError as g:
      logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g)
      raise IOError(g)
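With prefix 'isolate/' and a hypothetical digest 'deadbeef', a fetch now issues one streaming Read for resource 'isolate/blobs/deadbeef/0' and yields each response's data field as-is; the offset and status bookkeeping of the old FetchBlobs protocol is handled by ByteStream itself. A caller sketch (storage and sink are hypothetical):

  for chunk in storage.fetch('deadbeef'):  # hypothetical digest
    sink.write(chunk)                      # chunks arrive in stream order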
@@ -567,6 +612,7 @@
    # Default to item.content().
    content = item.content() if content is None else content
    guard_memory_use(self, content, item.size)
+    self._num_pushes += 1

    try:
@@ -580,34 +626,48 @@
      def slicer():
        # Ensures every bit of content is under the gRPC max size; yields
        # proto messages to send via gRPC.
-        request = isolate_bot_pb2.PushBlobsRequest()
-        request.data.digest.digest = binascii.unhexlify(item.digest)
-        request.data.digest.size_bytes = item.size
-        request.data.offset = 0
+        request = bytestream_pb2.WriteRequest()
+        u = uuid.uuid4()
+        request.resource_name = '%suploads/%s/blobs/%s/%d' % (
+            self._prefix, u, item.digest, item.size)
+        request.write_offset = 0
        for chunk in chunker():
          # Make sure we send at least one chunk for zero-length blobs
          has_sent_anything = False
          while chunk or not has_sent_anything:
+            has_sent_anything = True
            slice_len = min(len(chunk), NET_IO_FILE_CHUNK)
-            request.data.data = chunk[:slice_len]
+            request.data = chunk[:slice_len]
+            if request.write_offset + slice_len == item.size:
+              request.finish_write = True
            yield request
-            has_sent_anything = True
-            request.data.offset += slice_len
-            # The proxy only expects the first chunk to have the digest
-            request.data.ClearField("digest")
+            request.write_offset += slice_len
            chunk = chunk[slice_len:]

-      # TODO(aludwin): batch up several requests to reuse TCP connections
      try:
-        response = self._stub.PushBlobs(slicer())
-      except grpc.RpcError as g:
-        logging.error('gRPC error during push: re-throwing as IOError (%s)' % g)
-        raise IOError(g)
+        response = self._stub.Write(slicer())
+      except grpc.Call as c:
+        # You might think that errors from gRPC would be grpc.RpcError. You'd
+        # be... right... but they're *also* instances of grpc.Call, and that's
+        # where the status code actually lives.
+        if c.code() == grpc.StatusCode.ALREADY_EXISTS:
+          # This is legit - we didn't check before we pushed, so it's no
+          # problem if it's already there.
+          self._already_exists += 1
+          if self._already_exists % 100 == 0:
+            logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % (
+                self._already_exists, self._num_pushes,
+                100.0 * self._already_exists / self._num_pushes))
+        else:
+          logging.error('gRPC error during push: throwing as IOError (%s)' % c)
+          raise IOError(c)
+      except Exception as e:
+        logging.error('error during push: throwing as IOError (%s)' % e)
+        raise IOError(e)

-      if not response.status.succeeded:
-        raise IOError(
-            'Error while uploading %s: %s' % (
-                item.digest, response.status.error_detail))
+      if response.committed_size != item.size:
+        raise IOError('%s/%d: incorrect size written (%d)' % (
+            item.digest, item.size, response.committed_size))

    finally:
      with self._lock:
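To make the slicer's bookkeeping concrete: assuming chunker() yields at least one (possibly empty) chunk, a hypothetical 40000-byte blob with NET_IO_FILE_CHUNK = 16384 produces three WriteRequests on the same resource name:

  write_offset     0   len(data) 16384   finish_write False
  write_offset 16384   len(data) 16384   finish_write False
  write_offset 32768   len(data)  7232   finish_write True   # 32768 + 7232 == 40000

A zero-length blob still sends one request (write_offset 0, empty data, finish_write True), which is what the has_sent_anything flag guarantees.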
@@ -615,38 +675,16 @@
  def contains(self, items):
    """Returns the set of all missing items."""
+    # TODO(aludwin): this isn't supported directly in ByteStream, so for now
+    # assume that nothing is present in the cache.
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)
-    request = isolate_bot_pb2.ContainsRequest()
-    items_by_digest = {}
-    for item in items:
-      cd = request.digest.add()
-      cd.digest = binascii.unhexlify(item.digest)
-      items_by_digest[cd.digest] = item
-    try:
-      response = self._stub.Contains(request)
-    except grpc.RpcError as g:
-      logging.error('gRPC error during contains: re-throwing as IOError (%s)'
-                    % g)
-      raise IOError(g)
-
-    # If everything's present, return the empty set.
-    if response.status.succeeded:
-      return {}
-
-    if not response.status.error == isolate_bot_pb2.BlobStatus.MISSING_DIGEST:
-      raise IOError('Unknown response during lookup: %s' % response.status)
-
-    # Pick Items that are missing, attach _PushState to them. The gRPC
-    # implementation doesn't actually have a push state, we just attach
-    # empty objects to satisfy the StorageApi interface.
+    # Assume all items are missing, and attach a _PushState to each. The gRPC
+    # implementation doesn't actually have a push state; we just attach empty
+    # objects to satisfy the StorageApi interface.
    missing_items = {}
-    for missing in response.status.missing_digest:
-      item = items_by_digest[missing.digest]
+    for item in items:
      missing_items[item] = _IsolateServerGrpcPushState()
-
-    logging.info('Queried %d files, %d cache hit',
-                 len(items), len(items) - len(missing_items))
    return missing_items
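Note the interplay with push(): since contains() now reports every item as missing, clients upload unconditionally and rely on the proxy answering ALREADY_EXISTS for blobs it already has, which is exactly what the _already_exists counter above is tracking.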