"""
PySpark supports custom serializers for transferring data; this can improve
performance.

By default, PySpark uses L{PickleSerializer} to serialize objects using Python's
C{cPickle} serializer, which can serialize nearly any Python object.
Other serializers, like L{MarshalSerializer}, support fewer datatypes but can be
faster.

The serializer is chosen when creating L{SparkContext}:

>>> from pyspark.context import SparkContext
>>> from pyspark.serializers import MarshalSerializer
>>> sc = SparkContext('local', 'test', serializer=MarshalSerializer())
>>> sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10)
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
>>> sc.stop()

By default, PySpark serializes objects in batches; the batch size can be
controlled through SparkContext's C{batchSize} parameter
(the default size is 1024 objects):

>>> sc = SparkContext('local', 'test', batchSize=2)
>>> rdd = sc.parallelize(range(16), 4).map(lambda x: x)

Behind the scenes, this creates a JavaRDD with four partitions, each of
which contains two batches of two objects:

>>> rdd.glom().collect()
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
>>> rdd._jrdd.count()
8L
>>> sc.stop()

A batch size of -1 uses an unlimited batch size, and a size of 1 disables
batching:

>>> sc = SparkContext('local', 'test', batchSize=1)
>>> rdd = sc.parallelize(range(16), 4).map(lambda x: x)
>>> rdd.glom().collect()
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
>>> rdd._jrdd.count()
16L
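>>> sc.stop()

With an unlimited batch size (-1), each partition is serialized as a single
batch (an illustrative example; the expected count follows from
C{BatchedSerializer}'s unlimited batching below):

>>> sc = SparkContext('local', 'test', batchSize=-1)
>>> rdd = sc.parallelize(range(16), 4).map(lambda x: x)
>>> rdd.glom().collect()
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
>>> rdd._jrdd.count()
4L
>>> sc.stop()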
"""

import cPickle
from itertools import chain, izip, product
import marshal
import struct
import sys
from pyspark import cloudpickle


__all__ = ["PickleSerializer", "MarshalSerializer"]


class SpecialLengths(object):
    END_OF_DATA_SECTION = -1
    PYTHON_EXCEPTION_THROWN = -2
    TIMING_DATA = -3


class Serializer(object):

    def dump_stream(self, iterator, stream):
        """
        Serialize an iterator of objects to the output stream.
        """
        raise NotImplementedError

    def load_stream(self, stream):
        """
        Return an iterator of deserialized objects from the input stream.
        """
        raise NotImplementedError

    def _load_stream_without_unbatching(self, stream):
        return self.load_stream(stream)

    # Note: our notion of "equality" is that output generated by
    # equal serializers can be deserialized using the same serializer.

    # This default implementation handles the simple cases;
    # subclasses should override __eq__ as appropriate.
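    #
    # For example (illustrative):
    #   PickleSerializer() == PickleSerializer()   -> True
    #   PickleSerializer() == MarshalSerializer()  -> False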

    def __eq__(self, other):
        return isinstance(other, self.__class__)

    def __ne__(self, other):
        return not self.__eq__(other)


class FramedSerializer(Serializer):
    """
    Serializer that writes objects as a stream of (length, data) pairs,
    where C{length} is a 32-bit integer and data is C{length} bytes.
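
    For example, a round trip through L{PickleSerializer}, a subclass that
    supplies C{dumps}/C{loads} (an illustrative example):

    >>> from StringIO import StringIO
    >>> buf = StringIO()
    >>> ser = PickleSerializer()
    >>> ser.dump_stream(["hello", "world"], buf)
    >>> buf.seek(0)
    >>> list(ser.load_stream(buf))
    ['hello', 'world']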
    """

    def __init__(self):
        # On Python 2.6, we can't write bytearrays to streams, so we need
        # to convert them to strings first.
        self._only_write_strings = sys.version_info[0:2] <= (2, 6)

    def dump_stream(self, iterator, stream):
        for obj in iterator:
            self._write_with_length(obj, stream)

    def load_stream(self, stream):
        while True:
            try:
                yield self._read_with_length(stream)
            except EOFError:
                return

    def _write_with_length(self, obj, stream):
        serialized = self.dumps(obj)
        write_int(len(serialized), stream)
        if self._only_write_strings:
            stream.write(str(serialized))
        else:
            stream.write(serialized)

    def _read_with_length(self, stream):
        length = read_int(stream)
        obj = stream.read(length)
        if obj == "":
            raise EOFError
        return self.loads(obj)

    def dumps(self, obj):
        """
        Serialize an object into a byte array.
        When batching is used, this will be called with an array of objects.
        """
        raise NotImplementedError

    def loads(self, obj):
        """
        Deserialize an object from a byte array.
        """
        raise NotImplementedError


class BatchedSerializer(Serializer):
    """
    Serializes a stream of objects in batches by calling its wrapped
    Serializer with streams of objects.
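
    The grouping itself is done by C{_batched}; for example (illustrative):

    >>> ser = BatchedSerializer(PickleSerializer(), 2)
    >>> list(ser._batched(iter(range(5))))
    [[0, 1], [2, 3], [4]]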
    """

    UNLIMITED_BATCH_SIZE = -1

    def __init__(self, serializer, batchSize=UNLIMITED_BATCH_SIZE):
        self.serializer = serializer
        self.batchSize = batchSize

    def _batched(self, iterator):
        if self.batchSize == self.UNLIMITED_BATCH_SIZE:
            yield list(iterator)
        else:
            items = []
            count = 0
            for item in iterator:
                items.append(item)
                count += 1
                if count == self.batchSize:
                    yield items
                    items = []
                    count = 0
            if items:
                yield items

    def dump_stream(self, iterator, stream):
        self.serializer.dump_stream(self._batched(iterator), stream)

    def load_stream(self, stream):
        return chain.from_iterable(self._load_stream_without_unbatching(stream))

    def _load_stream_without_unbatching(self, stream):
        return self.serializer.load_stream(stream)

    def __eq__(self, other):
        return isinstance(other, BatchedSerializer) and \
               other.serializer == self.serializer

    def __str__(self):
        return "BatchedSerializer<%s>" % str(self.serializer)


class CartesianDeserializer(FramedSerializer):
    """
    Deserializes the JavaRDD cartesian() of two PythonRDDs.
    """

    def __init__(self, key_ser, val_ser):
        self.key_ser = key_ser
        self.val_ser = val_ser

    def prepare_keys_values(self, stream):
        key_stream = self.key_ser._load_stream_without_unbatching(stream)
        val_stream = self.val_ser._load_stream_without_unbatching(stream)
        key_is_batched = isinstance(self.key_ser, BatchedSerializer)
        val_is_batched = isinstance(self.val_ser, BatchedSerializer)
        for (keys, vals) in izip(key_stream, val_stream):
            keys = keys if key_is_batched else [keys]
            vals = vals if val_is_batched else [vals]
            yield (keys, vals)

    def load_stream(self, stream):
        for (keys, vals) in self.prepare_keys_values(stream):
            for pair in product(keys, vals):
                yield pair

    def __eq__(self, other):
        return isinstance(other, CartesianDeserializer) and \
               self.key_ser == other.key_ser and self.val_ser == other.val_ser

    def __str__(self):
        return "CartesianDeserializer<%s, %s>" % \
               (str(self.key_ser), str(self.val_ser))


class PairDeserializer(CartesianDeserializer):
    """
    Deserializes the JavaRDD zip() of two PythonRDDs.
    """

    def __init__(self, key_ser, val_ser):
        self.key_ser = key_ser
        self.val_ser = val_ser

    def load_stream(self, stream):
        for (keys, vals) in self.prepare_keys_values(stream):
            for pair in izip(keys, vals):
                yield pair

    def __eq__(self, other):
        return isinstance(other, PairDeserializer) and \
               self.key_ser == other.key_ser and self.val_ser == other.val_ser

    def __str__(self):
        return "PairDeserializer<%s, %s>" % \
               (str(self.key_ser), str(self.val_ser))


class NoOpSerializer(FramedSerializer):

    def loads(self, obj):
        return obj

    def dumps(self, obj):
        return obj


class PickleSerializer(FramedSerializer):
    """
    Serializes objects using Python's cPickle serializer:

        http://docs.python.org/2/library/pickle.html

    This serializer supports nearly any Python object, but may
    not be as fast as more specialized serializers.
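
    For example (an illustrative round trip through C{dumps}/C{loads}):

    >>> ser = PickleSerializer()
    >>> ser.loads(ser.dumps({'a': 1}))
    {'a': 1}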
    """

    def dumps(self, obj):
        return cPickle.dumps(obj, 2)

    loads = cPickle.loads


class CloudPickleSerializer(PickleSerializer):

    def dumps(self, obj):
        return cloudpickle.dumps(obj, 2)


class MarshalSerializer(FramedSerializer):
    """
    Serializes objects using Python's Marshal serializer:

        http://docs.python.org/2/library/marshal.html

    This serializer is faster than PickleSerializer but supports fewer datatypes.
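
    For example (illustrative; marshal handles builtin types such as lists):

    >>> ser = MarshalSerializer()
    >>> ser.loads(ser.dumps([1, 2, "three"]))
    [1, 2, 'three']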
    """

    dumps = marshal.dumps
    loads = marshal.loads


class UTF8Deserializer(Serializer):
    """
    Deserializes streams written by String.getBytes.
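
    Each string is framed as a 4-byte big-endian length followed by UTF-8
    bytes; for example (illustrative):

    >>> from StringIO import StringIO
    >>> UTF8Deserializer().loads(StringIO(struct.pack("!i", 5) + "hello"))
    u'hello'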
    """

    def loads(self, stream):
        length = read_int(stream)
        return stream.read(length).decode('utf8')

    def load_stream(self, stream):
        while True:
            try:
                yield self.loads(stream)
            except struct.error:
                return
            except EOFError:
                return


# Low-level helpers for the length-prefixed, big-endian ("!") framing used
# by the serializers above.

def read_long(stream):
    length = stream.read(8)
    if length == "":
        raise EOFError
    return struct.unpack("!q", length)[0]


def write_long(value, stream):
    stream.write(struct.pack("!q", value))


def pack_long(value):
    return struct.pack("!q", value)


def read_int(stream):
    length = stream.read(4)
    if length == "":
        raise EOFError
    return struct.unpack("!i", length)[0]


def write_int(value, stream):
    stream.write(struct.pack("!i", value))


def write_with_length(obj, stream):
    write_int(len(obj), stream)
    stream.write(obj)