Luftstrøm: Mindre kjente tips, triks og beste fremgangsmåter

Det er visse ting med alle verktøyene du bruker, som du ikke vet selv etter at du har brukt det i lang tid. Og når du først vet det, er du som "Jeg skulle ønske jeg visste dette før", fordi du allerede hadde fortalt klienten din at det ikke kan gjøres på noen bedre måte . Luftmengde som annet verktøy er ikke annerledes, det er noen skjulte perler som kan gjøre livet ditt enkelt og gjøre DAG-utvikling morsom.

Du kjenner kanskje allerede noen av dem, og hvis du kjenner dem alle - vel du er PRO da PRO.

(1) DAG med kontekstleder

Ble du irritert over deg selv da du glemte å legge dag = dag til oppgaven din og luftstrømfeilen ble feil? Ja, det er lett å glemme å legge det til for hver oppgave. Det er også overflødig å legge til samme parameter som vist i følgende eksempel (eksempel_dag.py-fil):

Eksemplet (eksempel_dag.py-fil) over har bare to oppgaver, men hvis du har 10 eller mer, blir redundansen tydeligere. For å unngå dette kan du bruke Airflow DAGs som kontekstadministratorer for automatisk å tilordne nye operatører til den DAG som vist i eksemplet ovenfor (eksempel_dag_with_context.py) ved bruk av setning.

(2) Bruke Liste til å angi oppgaveavhengigheter

Når du vil opprette DAG som ligner den som er vist på bildet nedenfor, må du gjenta oppgavenavn når du angir oppgaveavhengighet.

Som vist i ovennevnte kodebit, vil bruk av vår normale måte å stille inn oppgaveavhengigheter bety at oppgavetid og slutt blir gjentatt tre ganger. Dette kan erstattes ved hjelp av pythonlister for å oppnå samme resultat på en mer elegant måte.

(3) Bruk standardargumenter for å unngå å gjenta argumenter

Luftmengde som tillater passering av en ordbok med parametere som vil være tilgjengelig for alle oppgavene i den DAG.

På DataReply bruker vi for eksempel BigQuery for alle våre DataWareshouse-relaterte DAG-er, og i stedet for å sende parametere som labels, bigquery_conn_id til hver oppgave, passerer vi ganske enkelt den indefault_args-ordboken som vist i DAG nedenfor.

Dette er også nyttig når du vil ha varsler om individuelle oppgavefeil i stedet for bare DAG-feil som jeg allerede nevnte i mitt siste blogginnlegg om Integrering av slakkvarsler i luftstrøm.

(4) "Params" -argumentet

“Params” er en ordbok med parametere på DAG-nivå som blir gjort tilgjengelige i maler. Disse paramene kan overstyres på oppgavenivå.

Dette er et ekstremt nyttig argument, og jeg har personlig brukt det mye, da det kan nås i templated felt med jinja-templering ved hjelp av params.param_name. Et eksempel på bruk er som følger:

Det gjør det enkelt for deg å skrive parameteriserte DAG i stedet for hardkodingsverdier. Som vist i eksemplene ovenfor kan paramsordbok defineres på tre steder: (1) I DAG-objekt (2) I standard_args-ordbok (3) Hver oppgave.

(5) Lagre sensitive data i Connections

De fleste brukere er klar over dette, men jeg har fortsatt sett passord lagret i ren tekst inne i DAG. For godhetens skyld - ikke gjør det. Du bør skrive DAGene dine på en måte som du er trygg nok til å lagre DAGene dine i et offentlig depot.

Som standard vil Airflow lagre passordene for tilkoblingen i ren tekst i metadatadatabasen. Kryptopakken er sterkt anbefalt under Airflow-installasjon og kan ganske enkelt gjøres ved pip installere apache-airflow [crypto].

Du kan da enkelt få tilgang til det på følgende måte:

fra airflow.hooks.base_hook import BaseHook
slack_token = BaseHook.get_connection ('slack'). passord

(6) Begrens antall luftstrømvariabler i DAG-en din

Luftmengdevariabler lagres i Metadata-databasen, så enhver oppfordring til variabler vil bety en forbindelse til Metadata DB. DAG-filene dine blir analysert hvert X. sekund. Å bruke et stort antall variabler i DAG-en din (og verre i default_args) kan bety at du kan ende opp metningen av antall tillatte tilkoblinger til databasen.

For å unngå denne situasjonen, kan du enten bare bruke en enkelt luftstrømvariabel med JSON-verdi. Ettersom en luftstrømvariabel kan inneholde JSON-verdi, kan du lagre all DAG-konfigurasjonen din i en enkelt variabel som vist på bildet nedenfor:

Som vist på dette skjermbildet kan du enten lagre verdier i separate luftstrømvariabler eller under en enkelt luftstrømvariabel som et JSON-felt

Du kan deretter få tilgang til dem som vist nedenfor under anbefalt måte:

(7) "kontekst" ordboken

Brukere glemmer ofte innholdet i kontekstordboken når de bruker PythonOperator med en konverterbar funksjon.

Konteksten inneholder referanser til beslektede objekter til oppgaveinstansen og er dokumentert under makroseksjonen i APIen, da de også er tilgjengelige for templatfeltet.

{
      'dag': oppgave.dag,
      'ds': ds,
      'next_ds': next_ds,
      'next_ds_nodash': next_ds_nodash,
      'prev_ds': prev_ds,
      'prev_ds_nodash': prev_ds_nodash,
      'ds_nodash': ds_nodash,
      'ts': ts,
      'ts_nodash': ts_nodash,
      'ts_nodash_with_tz': ts_nodash_with_tz,
      'gårdag': gårdag,
      'gær_ds_nodash': gårsdag_ds_nodash,
      "morgendag": morgendag,
      'morgen_ds_nodash': morgen_ds_nodash,
      'END_DATE': ds,
      'end_date': ds,
      'dag_run': dag_run,
      'run_id': run_id,
      'henrettelsesdato': self.execution_date,
      'prev_execution_date': prev_execution_date,
      'next_execution_date': next_execution_date,
      'siste_dato': ds,
      "makroer": makroer,
      'params': params,
      'tabeller': tabeller,
      'oppgave': oppgave,
      'task_instance': self,
      'ti': selv,
      'task_instance_key_str': ti_key_str,
      'conf': konfigurasjon,
      'test_mode': self.test_mode,
      'var': {
          'verdi': VariableAccessor (),
          'json': VariableJsonAccessor ()
      }
      'inlets': task.inlets,
      'uttak': oppgave.utsalg,
}

(8) Generere dynamiske luftmengdeoppgaver

Jeg har svart på mange spørsmål på StackOverflow om hvordan lage dynamiske oppgaver. Svaret er enkelt, du trenger bare å generere unik task_id for alle oppgavene dine. Nedenfor er to eksempler på hvordan du oppnår dette:

(9) Kjør "luftstrøm oppgradert b" i stedet for "luftstrøm initdb"

Takk til Ash Berlin for dette tipset i foredraget hans i First Apache Airflow London Meetup.

airflow initdb vil opprette alle standardforbindelser, diagrammer osv. som vi kanskje ikke bruker og ikke vil ha i vår produksjonsdatabase. airflow upgradedb vil i stedet bare bruke eventuelle manglende migrasjoner i databasetabellen. (inkludert oppretting av manglende tabeller osv.) Det er også trygt å kjøre hver gang, det sporer hvilke migrasjoner som allerede er brukt (ved hjelp av Alembic-modulen).

Gi meg beskjed i kommentarfeltet nedenfor hvis du vet noe som vil være verdt å legge til i dette blogginnlegget. Happy Airflow’ing :-)