Apache Spark og Amazon S3 - Gotchas og beste praksis

S3 er et objektlager og ikke et filsystem, derav problemene som oppstår som følge av en eventuell konsistens, ikke-atomnavn må håndteres i applikasjonskoden. Katalogserveren i et filsystem er erstattet av en hash-algoritme til filnavnet. Dette er dårlig for å liste opp ting, katalogoperasjoner, slette og gi nytt navn (kopiere og slette ettersom teknisk sett ikke er noe nytt navn i objektbutikker)

Begynn å bruke S3A (URI-skjema: s3a: //) - Hadoop 2.7+. S3a er den anbefalte S3-klienten for Hadoop 2.7 og senere S3a er mer performant og støtter større filer (opp til 5 TB) og har støtte for flerstegsopplasting. Alle objekter som er tilgjengelige fra s3n: // URLs, bør også være tilgjengelige fra s3a ganske enkelt ved å erstatte URL-skjemaet. De fleste feilrapporter mot S3N vil bli lukket som WONTFIX

Gjør Spark 2.0.1 til å fungere med S3a For Spark 2.0.1 bruk hadoop-aws-2.7.3.jar, aws-java-sdk-1.7.4.jar, joda-time-2.9.3.jar på din klassespor; ikke glem å oppdatere spark-default.conf med AWS-tastene og S3A FileSystemClass

Spark.hadoop.fs.s3a.access.key XXXXXXX
spark.hadoop.fs.s3a.secret.key XXXXXXX
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem

Definitivt bruk Dataframes som ombestilling av søk og predikat-trykk ned er tilgjengelig utenfor boksen, og derfor blir mindre data hentet til slutt påskynde spørsmålene dine

Hvis du leser de samme dataene flere ganger, kan du prøve å bruke .cache eller s3distcp for å overføre filene til din lokale EMR-klynge for å dra nytte av den bedre filleserytelsen til et ekte filsystem. GroupBy-alternativet til s3distcp er et flott alternativ for å løse den lille filproblemet ved å slå sammen et stort antall små filer.

Noe som bringer meg til spørsmålet om å lese et stort antall små filer. Hvis det ikke er et alternativ å slå sammen filene ved å bruke et verktøy, kan du prøve følgende kode som effektivt fungerer rundt den langsomme flaskehalsen i S3-kataloglisten

import com.amazonaws.services.s3._, model._
    import com.amazonaws.auth.BasicAWS-bevis

    val forespørsel = new ListObjectsRequest ()
    request.setBucketName (bøtte)
    request.setPrefix (prefiks)
    request.setMaxKeys (pageLength)
    def s3 = new AmazonS3Client (nye BasicAWSCredentials (nøkkel, hemmelighet))

    val objs = s3.listObjects (forespørsel) // Merk at denne metoden returnerer avkortede data hvis lengre enn "sidelengden" ovenfor. Du må kanskje takle det.
    sc.parallelize (objs.getObjectSummaries.map (_. getKey) .toList)
        .flatMap {key => Source.fromInputStream (s3.getObject (bøtte, nøkkel) .getObjectContent: InputStream) .getLines}

Forsikre deg om at spark.sql.parquet.filterPushdown-alternativet er sant og spark.sql.parquet.mergeSchema er usant (for å unngå at skjema smelter sammen under skrivinger som virkelig bremser du skriver scenen). Heldigvis har Spark 2.0 rett standard

Har du lurt på hvorfor akkurat på det tidspunktet en jobb er i ferd med å fullføre, ingenting blir skrevet til loggene og alle gnistoperasjoner ser ut til å ha stoppet opp, men resultatene er ennå ikke i utdatakatalogen til S3 ... hva skjer? Vel hver gang eksekutørene skriver resultatet av jobben, skriver hver av dem til en midlertidig katalog utenfor hovedkatalogen der filene måtte skrives, og når alle eksekutørene er ferdig, gjøres et nytt navn for å få atomeksklusivitet. Dette er helt greit i et standard filsystem som hdfs der navn er øyeblikkelig, men i et objektlager som S3, er dette ikke gunstig, ettersom navn på S3 gjøres med 6 MB / s.

Hvis mulig, skriv utdataene fra jobbene til EMR hdfs (for å utnytte de nesten øyeblikkelige navnene og bedre filen IO til lokale hdfs) og legg til et dstcp-trinn for å flytte filene til S3, for å spare deg selv alle problemer med å håndtere innerdene til en objektbutikk som prøver å være et filsystem. Hvis du skriver til lokale hfs, kan du også aktivere spekulasjoner for å kontrollere løpende oppgaver uten å falle i dødpunktfellene tilknyttet DirectOutputCommiter.

Hvis du må bruke S3 som utdatakatalog, må du sørge for at følgende gnistkonfigurasjoner er angitt

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.speculation falsk

Merk: DirectParquetOutputCommitter blir fjernet fra Spark 2.0 på grunn av sjansen for tap av data. Dessverre til vi har forbedret konsistensen fra S3a, må vi jobbe med løsningene. Ting forbedrer seg med Hadoop 2.8

Unngå keynames i leksikografisk rekkefølge. Man kan bruke hashing / tilfeldige prefikser eller omvendt dato for å komme seg rundt. Trikset er å navngi tastene dine hierarkisk og plassere de vanligste tingene du filtrerer etter på venstre side av nøkkelen. Og har aldri understrekinger i bøttenavn på grunn av DNS-problemer.

Aktiverer fs.s3a.fast.upload last opp deler av en enkelt fil parallelt til Amazon S3

Vel, det var hjernedumpen av problemer i produksjonen som jeg har løst nylig for å få Spark til å fungere med S3. Følg med for mer om dette når jeg graver dypere i neste innlegg ...