Yahoo Audience Expansion:
Migration from Hadoop Streaming
to Spark
Gavin Li, Jaebong Kim, Andy Feng
Yahoo
Agenda
• Audience Expansion Spark Application
• Spark scalability: problems and our solutions
• Performance tuning
AUDIENCE EXPANSION
How we built audience expansion on Spark
Audience Expansion
• Train a model to find users who behave similarly to the sample users
• Find more potential “converters”
System
• Large-scale machine learning system
• Logistic Regression
• TBs of input data, up to TBs of intermediate data
• Hadoop pipeline uses 30,000+ mappers, 2,000 reducers, 16 hrs run time
• All Hadoop streaming, ~20 jobs
• Use Spark to reduce latency and cost
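For reference (not from the deck), the logistic regression model above scores each user's conversion probability from a feature vector $x$ and learned weights $w$:

$$P(\text{convert} \mid x) \;=\; \frac{1}{1 + e^{-w^\top x}}$$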
Pipeline
Labeling
• Label positive/negative samples
• 6-7 hrs, IO intensive, 17 TB intermediate IO in Hadoop
Feature Extraction
• Extract features from raw events
Model Training
• Logistic regression phase, CPU bound
Score/Analyze models
• Validate trained models and parameter combinations, select a new model
Validation/Metrics
• Validate and publish the new model
How to adopt Spark efficiently?
• Very complicated system
• 20+ Hadoop streaming map-reduce jobs
• 20k+ lines of code
• TBs of data; person-months needed for data validation
• It would take 6+ people and 3 quarters to rewrite the system from scratch in Scala
Our migration solution
• Build a transition layer that automatically converts Hadoop streaming jobs to Spark jobs
• No need to change any Hadoop streaming code
• 2 person-quarters
• Private Spark
ZIPPO
[Architecture diagram: the Audience Expansion Pipeline (20+ Hadoop Streaming jobs) calls the Hadoop Streaming interfaces, which ZIPPO (Hadoop Streaming over Spark) implements on top of Spark and HDFS]
ZIPPO
• A layer (ZIPPO) between Spark and the application
• Implemented all Hadoop Streaming interfaces
• Migrate the pipeline without code rewriting
• Can focus on rewriting perf bottlenecks
• Plan to open source
ZIPPO - Supported Features
• Partition related
  – Hadoop Partitioner class (-partitioner)
  – num.map.key.fields, num.map.partition.fields
• Distributed cache
  – -cacheArchive, -file, -cacheFile
• Independent working directory for each task instead of
each executor
• Hadoop Streaming Aggregation
• Input Data Combination (to mitigate many small files)
• Customized OutputFormat, InputFormat
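ZIPPO's own code is not shown in the deck. Purely as an illustration of the general idea (not ZIPPO's implementation), a Hadoop Streaming style mapper/reducer command can be run over Spark with the public RDD.pipe API; the commands, paths, partition count, and tab-separated key extraction below are assumptions:

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object StreamingOverSparkSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("zippo-style-sketch"))

    // Hypothetical streaming commands (in Hadoop Streaming these come from -mapper / -reducer).
    val mapperCmd  = Seq("/bin/cat")
    val reducerCmd = Seq("/bin/cat")

    val input  = sc.textFile("hdfs:///input/events")   // -input
    val mapOut = input.pipe(mapperCmd)                  // run the mapper as an external process

    // Key on the first tab-separated field, roughly what num.map.key.fields /
    // -partitioner control in Hadoop Streaming, then hash-partition like MapReduce.
    val keyed = mapOut
      .map(line => (line.split("\t", 2)(0), line))
      .partitionBy(new HashPartitioner(2000))           // e.g. the pipeline's 2000 reducers

    // A real shim would also sort within each partition before piping to the reducer.
    keyed.values.pipe(reducerCmd).saveAsTextFile("hdfs:///output")  // -output

    sc.stop()
  }
}
```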
Performance Comparison (1 TB data)
• ZIPPO Hadoop streaming
  – Spark cluster: 40 hosts, 1 hard drive each
  – Perf data: 1 hr 25 min
• Original Hadoop streaming
  – Hadoop cluster: 40 hosts, 1 hard drive each
  – Perf data: 3 hrs 5 min
SPARK SCALABILITY
Spark Shuffle
• The mapper side of the shuffle writes all of its output to disk (shuffle files)
• The data can be large scale, so it cannot all be held in memory
• Reducers transfer all the shuffle files for each partition, then process them
Spark Shuffle
[Diagram: each mapper (Mapper 1 … Mapper m) writes one shuffle file per reducer partition (Shuffle 1 … Shuffle n); each reducer partition (Reducer Partition 1 … n) fetches its shuffle file from every mapper]
On each Reducer
• Every partition needs to hold all the data from all the mappers
• The data sits in memory, in a hash map, uncompressed
[Diagram: Reducer i with 4 cores holds Partitions 1-4; each partition contains the shuffle output from every mapper (Shuffle mapper 1 … Shuffle mapper n)]
How many partitions?
• Partitions need to be small enough that each one fits entirely in memory
[Diagram: Partition 1 … Partition n spread across hosts, e.g. Host 1 (4 cores) and Host 2 (4 cores)]
Spark needs many partitions
• So a common pattern when using Spark is to run with a large number of partitions
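For example, the partition count is chosen explicitly where the shuffle is declared. A minimal sketch (the input path and key/value parsing are assumptions, not from the deck):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ManyPartitionsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("many-partitions"))

    // Hypothetical tab-separated key\tvalue input.
    val pairs = sc.textFile("hdfs:///input/features").map { line =>
      val fields = line.split("\t", 2)
      (fields(0), 1L)
    }

    // The reduce side is split into 46,080 partitions so that each partition's
    // uncompressed data fits within one core's share of memory.
    val counts = pairs.reduceByKey(_ + _, numPartitions = 46080)
    counts.saveAsTextFile("hdfs:///output/counts")

    sc.stop()
  }
}
```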
On each Reducer
• For a host with 64 GB of memory and 16 CPU cores
• Assuming a 30:1 compression ratio and 2x processing overhead
• To process 3 TB of data, we need 46,080 partitions
• To process 3 PB of data, we need ~46 million partitions
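A back-of-the-envelope reconstruction of the 46,080 figure from the numbers above (the intermediate steps are ours, not from the deck):

```scala
// Reconstructing the partition-count arithmetic from the slide's numbers.
object PartitionCountSketch extends App {
  val memPerHostGb = 64.0
  val coresPerHost = 16
  val overhead     = 2.0         // processing overhead factor
  val compression  = 30.0        // 30:1 compression ratio
  val dataGb       = 3.0 * 1024  // 3 TB

  val memPerCoreGb          = memPerHostGb / coresPerHost          // 4 GB per core
  val uncompressedPerPartGb = memPerCoreGb / overhead              // 2 GB uncompressed per partition
  val compressedPerPartGb   = uncompressedPerPartGb / compression  // ~0.067 GB of shuffle data per partition
  val partitions            = dataGb / compressedPerPartGb         // = 46,080

  println(f"partitions needed: $partitions%.0f")
}
```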
Not Scalable
• Not linearly scalable
• No matter how many hosts we have in total, we always need 46k partitions
Issues of huge number of partitions
• Issue 1: OOM on the mapper side
  – Each mapper core needs to write to 46k shuffle files simultaneously
  – 1 shuffle file = OutputStream + FastBufferStream + CompressionStream
  – Memory overhead:
    • FD and related kernel overhead
    • FastBufferStream (for turning random IO into sequential IO), default 100k buffer per stream
    • CompressionStream, default 64k buffer per stream
  – So by default, the total buffer size is:
    • 164k * 46k * 16 = 100+ GB
Issues of huge number of partitions
• Our solution to the mapper OOM
  – Set spark.shuffle.file.buffer.kb to 4k for FastBufferStream (the kernel block size)
  – Based on our contributed patch https://github.com/mesos/spark/pull/685:
    • Set spark.storage.compression.codec to spark.storage.SnappyCompressionCodec to enable Snappy and reduce the footprint
    • Set spark.snappy.block.size to 8192 to reduce the buffer size (while Snappy still achieves a good compression ratio)
  – Total buffer size after this:
    • 12k * 46k * 16 = ~10 GB
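In the Spark versions of that era these were set as Java system properties before the SparkContext was created. A minimal sketch, assuming the property names on the slide (the Snappy ones come from the pull request above and are not in stock Spark), with a hypothetical master and app name:

```scala
import org.apache.spark.SparkContext

object ShuffleTuningSketch {
  def main(args: Array[String]): Unit = {
    // Shrink the per-shuffle-file buffer so that 46k open shuffle streams per core fit in memory.
    System.setProperty("spark.shuffle.file.buffer.kb", "4")
    // Snappy compression for shuffle output (from the patch referenced above).
    System.setProperty("spark.storage.compression.codec",
                       "spark.storage.SnappyCompressionCodec")
    System.setProperty("spark.snappy.block.size", "8192")

    val sc = new SparkContext("local", "audience-expansion")
    // ... job body ...
    sc.stop()
  }
}
```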
Issues of huge number of partitions
• Issue 2: large number of small files
  – Each input split in a mapper is broken down into at least 46k shuffle files
  – The large number of small files causes lots of random R/W IO
  – When each shuffle file is less than 4k (the kernel block size), the overhead becomes significant
  – Significant metadata overhead in the FS layer
  – Example: just manually deleting the whole tmp directory can take 2 hours because there are so many small files
  – Especially bad when splits are not balanced
  – 5x slower than Hadoop
[Diagram: each input split (Input Split 1, Input Split 2, … Input Split n) fans out into shuffle files 1 through 46080]
Reduce side compression
• In the current shuffle, reducer-side data in memory is not compressed
• That can take 10-100 times more memory
• With our patch https://github.com/mesos/spark/pull/686, we reduced memory consumption by 30x, while the compression overhead is less than 3%
• Without this patch, Spark doesn't work for our case
• 5x-10x performance improvement
Reduce side compression
• On the reducer side:
  – With compression – 1.6k shuffle files
  – Without compression – 46k shuffle files
Reducer Side Spilling
[Diagram: the reducer aggregates into compression buckets (Compression Bucket 1 … Bucket n) and spills them to disk as Spill 1, Spill 2, … Spill n]
Reducer Side Spilling
• Spills over-size data in the aggregation hash table to disk
• Spilling – more IO overall, but more sequential IO and fewer seeks
• All in memory – less IO, but more random IO and more seeks
• Fundamentally resolves Spark's scalability issue
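The patch itself is not shown in the deck; the sketch below is only a rough illustration of the idea (a hypothetical aggregator, not the actual Spark change): accumulate into an in-memory hash map, spill it to disk whenever it grows past a threshold, and merge the spills at the end.

```scala
import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable

// Hypothetical reducer-side spilling aggregator (illustration only).
class SpillingAggregator[K, V](mergeValue: (V, V) => V, maxInMemory: Int = 1000000) {
  private val inMemory = mutable.HashMap.empty[K, V]
  private val spills = mutable.ArrayBuffer.empty[File]

  def insert(key: K, value: V): Unit = {
    inMemory(key) = inMemory.get(key).map(mergeValue(_, value)).getOrElse(value)
    if (inMemory.size >= maxInMemory) spill()  // over-size: move the hash table to disk
  }

  private def spill(): Unit = {
    val file = File.createTempFile("spill", ".bin")
    val out = new ObjectOutputStream(new FileOutputStream(file))
    try out.writeObject(inMemory.toMap) finally out.close()
    spills += file
    inMemory.clear()
  }

  // Merge all spills back with the in-memory map (a real implementation would
  // stream sorted runs instead of reloading whole maps).
  def result(): Map[K, V] = {
    val merged = mutable.HashMap.empty[K, V] ++= inMemory
    for (f <- spills) {
      val in = new ObjectInputStream(new FileInputStream(f))
      try {
        in.readObject().asInstanceOf[Map[K, V]].foreach { case (k, v) =>
          merged(k) = merged.get(k).map(mergeValue(_, v)).getOrElse(v)
        }
      } finally in.close()
    }
    merged.toMap
  }
}
```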
Align with previous Partition function
• Our input data come from another map-reduce job
• We use exactly the same hash function, to reduce the number of shuffle files
Align with previous Partition function
• A new hash function gives a more even distribution, but multiplies the shuffle files
[Diagram: the previous job generating the input data partitions keys with mod 4 (keys 0,4,8… / 1,5,9… / 2,6,10… / 3,7,11…); the Spark job repartitions with mod 5, so each of the 4 input partitions fans out into 5 shuffle files (shuffle file 0-4)]
Align with previous Partition function
• Use the same hash function
[Diagram: the Spark job also partitions with mod 4, so each input partition (keys 0,4,8… / 1,5,9… / 2,6,10… / 3,7,11…) maps to exactly 1 shuffle file]
Align with previous Hash function
• Our case:
  – Before alignment: 16M shuffle files, 62 KB on average (5-10x slower)
  – After alignment: 8k shuffle files, 125 MB on average
• We have several different input data sources
• We take the partition function from the major one
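A minimal sketch of what aligning with the upstream job's partition function can look like in Spark (the key type, the partition count, and the mod-based hash are illustrative assumptions, not the actual pipeline code):

```scala
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Hypothetical partitioner that reproduces the upstream MapReduce job's
// partition function (key.hashCode mod numPartitions, like Hadoop's default
// HashPartitioner), so records already grouped upstream map 1:1 onto Spark partitions.
class UpstreamAlignedPartitioner(val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int =
    (key.hashCode & Integer.MAX_VALUE) % numPartitions  // non-negative mod, as in Hadoop

  override def equals(other: Any): Boolean = other match {
    case p: UpstreamAlignedPartitioner => p.numPartitions == numPartitions
    case _ => false
  }
  override def hashCode: Int = numPartitions
}

// Usage: partition with the same function and count as the upstream job, so each
// input partition produces a single shuffle file instead of fanning out.
def alignWithUpstream(pairs: RDD[(String, String)]): RDD[(String, String)] =
  pairs.partitionBy(new UpstreamAlignedPartitioner(8192))
```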
PERFORMANCE TUNING
All About Resource Utilization
• Maximize resource utilization
• Use as much CPU, memory, disk, and network as possible
• Monitor with vmstat, iostat, sar
Resource Utilization
• Ideally CPU/IO should be fully utilized
• Mapper phase – IO bound
• Final reducer phase – CPU bound
Shuffle file transfer
• Spark transfers all shuffle files into reducer memory before it starts processing
• Non-streaming (very hard to change to streaming)
• To avoid poor resource utilization:
  – Make sure maxBytesInFlight is set big enough
  – Consider allocating 2x more threads than the number of physical cores
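maxBytesInFlight is the internal field name; in the Spark versions of that era the user-facing property was, to our knowledge, spark.reducer.maxMbInFlight (later renamed spark.reducer.maxSizeInFlight). Treat the key and value below as assumptions rather than settings from the deck; a minimal sketch:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object FetchTuningSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("fetch-tuning")
      // Raise the reducer's in-flight fetch budget (maxBytesInFlight internally).
      // Property name and value are assumptions for that Spark generation, where it
      // was expressed in MB; the default was much smaller.
      .set("spark.reducer.maxMbInFlight", "96")

    val sc = new SparkContext(conf)
    // Oversubscribing task slots (e.g. advertising 2x physical cores per worker via
    // SPARK_WORKER_CORES) is a cluster-side setting, so it is only noted here as a comment.
    // ... job body ...
    sc.stop()
  }
}
```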
Thanks.
Gavin Li [email protected]
Jaebong Kim [email protected]
Andrew Feng [email protected]
