I need some help for my dag.
I have to monitoring a network folder to get a .pdf file, than i need to move this files to another folder like “temp folder”. the second step is call an API service to validate this PDF. If is valid i have to upload this file to S3.
Third step: Call another API to integrate this file on ERP program.
All this steps are stored in a Mongodb
Today i have a Python code with this all routine working well, but i want to use a Airflow Scheduler to control all this situation.
This is the code.
from Libs.Config.AppConfig import appConfig
from Libs.DB import MongoDB
from Libs.Objetos.Classes import ArquivoUpload, LoteUpload
from Libs.Utilitarios.IntegracaoMAX import ApiMAX
from Libs.Config.Logs import Logs
from Libs.Utilitarios.Uteis import *
#Logs objeto
log = Logs.RetornaLog("–MonitoraPasta–")
log.info(“Iniciando operação”)
log.info(“Abrindo arquivo de configuração”)
#Config Objeto
data = appConfig()
_origem = data.retorna_pasta_origem()
_erro = data.retorna_pasta_destino_erro()
_processados = data.retorna_pasta_destino_processadas()
Uteis.criapasta(_erro)
Uteis.criapasta(_processados)
lote_processos =
numero_processos_tarefa =
#Conexão Mongo
db = MongoDB.Mongo(‘MAX’,‘ArquivoUpload’,False, False)
lts = MongoDB.Mongo(‘MAX’, ‘Sequencias’, False, False)
lt = MongoDB.Mongo(‘MAX’, ‘LoteUpload’, False, False)
def arquivos_existentes(caminho):
path = caminho
files = Uteis.listar_arquivos_pasta(path)
log.info(f’Aquivos encontrados na pasta’)
id_lote = lts.incrementar_sequencia(‘LoteUpload’)
for f in files:
try:
if not Uteis.testa_diretorio(caminho + f):
origem = caminho + f
arquivo_analisado = _origem + f
arquivo = ArquivoUpload()
arquivo.caminhodocumento = arquivo_analisado
arquivo.status = 'fila'
arquivo.nomedocumento = f
arquivo.numeroprocesso = Uteis.ret_nome_arquivo_s_ext(origem)
api = ApiMAX()
pastamax = api.consulta_pasta(arquivo.numeroprocesso)
if pastamax:
arquivo.pastamax = pastamax
log.info(f"Pasta encontrada na API do MAX: {pastamax}.")
db.atualiza_mongo({"numeroprocesso": arquivo.numeroprocesso}, arquivo)
upload_completo = Uteis.upload_to_aws(arquivo.numeroprocesso)
if upload_completo:
cad_ok = api.cadastra_documentos(arquivo.nomedocumento, arquivo.pastamax)
if cad_ok:
Uteis.mover_arquivo(arquivo_analisado, _processados + arquivo.nomedocumento)
p = {'numeroprocesso':arquivo.numeroprocesso , 'status': True}
lote_processos.append(p)
else:
#!Alterando o caminho da gravação dentro do banco de dados.
arquivo_analisado = _erro + f
arquivo.caminhodocumento = arquivo_analisado
arquivo.status = 'Pasta não encontrada na API do MAX.'
log.info(f"Pasta não encontrada na API do MAX. Movendo arquivo para pasta de erros")
atualiza = db.atualiza_mongo({"numeroprocesso": arquivo.numeroprocesso}, arquivo)
p = {'numeroprocesso':arquivo.numeroprocesso , 'status': False}
lote_processos.append(p)
if atualiza:
Uteis.mover_arquivo(caminho, _erro, False, arquivo.nomedocumento)
except Exception as e:
log.info(e)
lote = LoteUpload()
lote._id = id_lote
lote.numeroprocesso = lote_processos
lt.atualiza_mongo({"_id":lote._id}, lote)
Uteis.email_notificacao_monitoramento_lote(lote, ['cadastro.csc@ernestoborges.com.br','tiago.ramos@ernestoborges.com.br'])
#Classe Main da aplicação
if name == “main”:
#Caminho da pasta monitorada
log.info(f"Iniciando Monitoramento na pasta: {_origem}")
arquivos = arquivos_existentes(_origem)
I need some help to start in Airflow…
Someone please help me in this thread?