from loguru import logger
import requests
from bs4 import BeautifulSoup
from pymongo import MongoClient
from pymongo.errors import BulkWriteError
from time import sleep
from datetime import datetime
from dgi.utils import define_ambiente_de_processamento
from dgi.info import DB_PROPS
from dgi.urls import URL_BUSCA, URL_BUSCA_RETANGULO
from dgi.catalogo.utils import formata_lista_de_imagens, recupera_informacoes_da_listagem
from dgi.decoradores import adiciona_log
from dgi.excecoes import ErroAoAcessarLink, DefinicaoInvalida
from dgi.banco import GeradorDeConexoesMongo
NUVEM_PROPS = {"q1": "", "q2": "", "q3": "", "q4": ""}
[docs]class CatalogoDGI:
"""Catálogo de imagens do DGI
Args:
db_props (:obt: `dict`, optional): Dicionário de configurações do acesso ao banco, possui as seguintes chaves:
host: ``str``: Endereço de IP do banco de dados
porta: ``int``: Porta do banco de dados
usuario: ``str``: Usuário para utilizar o banco (Opcional)
senha: ``str``: Senha para utilizar o banco (Opcional)
"""
def __init__(self, db_props = DB_PROPS):
self.__db_props = db_props
self.__ambiente_de_execucao = define_ambiente_de_processamento()
def __lthread(self, nworker, url, inicio, passo, limite, nome_do_local=""):
"""Método criado para a execução paralela da busca de imagens
"""
colecao = GeradorDeConexoesMongo.recupera_conexao(self.__db_props).dgi_dumps.catalogo
while True:
if inicio > limite:
break
res = requests.get(url.replace("*", str(inicio)))
soup = BeautifulSoup(res.content, features="lxml")
try:
# Salva os resultados da página atual
colecao.insert_many(formata_lista_de_imagens(soup, inicio, nome_do_local), ordered=False)
except BulkWriteError as bwe:
logger.error(bwe)
logger.info(f"Página {inicio} recuperada com sucesso")
inicio += passo
@adiciona_log("/tmp/listagem_dgi_regiao_{}.log")
def lista_imagens_dgi_regiao(self, satelite, instrumento, data, localizacoes, nuvem_props=NUVEM_PROPS) -> dict:
"""Método para listar os dados do DGI INPE levando em consideração um retângulo de referência
Args:
satelite (str): Nome do satelite
instrumento (str): Nome do instrumento
data (dict): Dicionário de datas, que contém as seguintes chaves
inicial: ``str``: Data no formato DD-MM-AAAA
final: ``str``: Data no formato DD-MM-AAAA
localizacoes (dict): Dicionário de localizações, que contém as seguintes chaves
norte: ``str``: Ponto do canto norte-leste
sul: ``str``: Ponto do canto sul-oeste
leste: ``str``: Ponto do canto norte-leste
oeste: ``str``: Ponto do canto sul-oeste
nuvem_props (dict): Dicionário com as propriedades de núvem em cada quadrante das imagens, este que contém as seguintes chaves
q1: ``str``: Porcentagem de núvem possível no quadrante 1
q2: ``str``: Porcentagem de núvem possível no quadrante 2
q3: ``str``: Porcentagem de núvem possível no quadrante 3
q4: ``str``: Porcentagem de núvem possível no quadrante 4
Returns:
dict: Dicionário contendo as informações gerais do processo
"""
try:
url_inicial = URL_BUSCA_RETANGULO.format(
"1", satelite, instrumento, data["inicial"], data["final"],
nuvem_props["q1"], nuvem_props["q2"], nuvem_props["q3"], nuvem_props["q4"],
localizacoes["norte"], localizacoes["sul"], localizacoes["leste"], localizacoes["oeste"]
)
logger.info(f"Buscando dados em: {url_inicial}")
propriedades_da_pagina = recupera_informacoes_da_listagem(url_inicial)
except RuntimeError as error:
logger.error(f"Erro ao tentar recuperar as informações do link {url_inicial}")
raise ErroAoAcessarLink(str(error))
# Definindo a quantidade de processos de acordo com a quantidade de páginas
if propriedades_da_pagina["paginas"] == 1:
workers = 1
elif propriedades_da_pagina["paginas"] < 5:
workers = 2
elif propriedades_da_pagina["paginas"] >= 5:
workers = 5
URL_BASE = URL_BUSCA_RETANGULO.format(
"*", satelite, instrumento, data["inicial"], data["final"],
nuvem_props["q1"], nuvem_props["q2"], nuvem_props["q3"], nuvem_props["q4"],
localizacoes["norte"], localizacoes["sul"], localizacoes["leste"], localizacoes["oeste"]
)
for i in range(1, workers + 1):
p = self.__ambiente_de_execucao(target=self.__lthread,
args=(i, URL_BASE, i, workers, propriedades_da_pagina["paginas"], ))
p.start()
p.join()
logger.info("O processo de listagem foi finalizado!")
return {
"propriedades_da_busca": {
**propriedades_da_pagina
},
"datas_pesquisadas": {
"inicio": data["inicial"],
"fim": data["final"]
}
}
@adiciona_log("/tmp/listagem_dgi_ponto_{}.log")
def lista_imagens_dgi_ponto(self, satelite, instrumento, data,
localizacoes, nome_padrao="", nomes_dos_locais=[], nuvem_props=NUVEM_PROPS) -> dict:
"""Método para listar os dados do DGI INPE levando em consideração um ponto de referência
Args:
satelite (str): Nome do satelite
instrumento (str): Nome do instrumento
data (dict): Dicionário de datas, que contém as seguintes chaves
inicial: ``str``: Data no formato DD-MM-AAAA
final: ``str``: Data no formato DD-MM-AAAA
localizacoes (dict): Dicionário de localizações, que contém as seguintes chaves
lat: ``str``: Latitude da localização pesquisada
long: ``str``: Longitude da localização pesquisada
nome_padrao (str): Template com o padrão de nomenclatura que deve ser utilizado na geração automática de nomes
Neste caso, caso seja inserido a string `PONTO_{}`, todos os pontos terão nome PONTO_1...PONTO_N
nome_dos_locais (list): Nome dos locais que estão sendo buscados (localizações)
nuvem_props (dict): Dicionário com as propriedades de núvem em cada quadrante das imagens, este que contém as seguintes chaves
q1: ``str``: Porcentagem de núvem possível no quadrante 1
q2: ``str``: Porcentagem de núvem possível no quadrante 2
q3: ``str``: Porcentagem de núvem possível no quadrante 3
q4: ``str``: Porcentagem de núvem possível no quadrante 4
Returns:
dict: Dicionário contendo as informações gerais do processo
"""
if len(nomes_dos_locais) != len(localizacoes):
if nome_padrao != "":
nomes_dos_locais = [nome_padrao.format(str(i)) for i in range(0, localizacoes)]
assert len(nomes_dos_locais) == len(localizacoes)
else:
raise DefinicaoInvalida("Seu input deve conter o nome de cada ponto inserido, ou uma template para a criação automática de nomes para os pontos")
logger.info("Iniciando trabalhos com até 3 workers")
for local, localizacao in zip(nomes_dos_locais, localizacoes):
logger.info(f"Iniciando busca de {local}")
# Realizando busca inicial (Aquisição de informações gerais)
url_inicial = URL_BUSCA.format(1, satelite, instrumento, data["inicial"], data["final"],
nuvem_props["q1"], nuvem_props["q2"], nuvem_props["q3"], nuvem_props["q4"],
localizacao["lat"], localizacao["long"])
try:
propriedades_da_pagina = recupera_informacoes_da_listagem(url_inicial)
except RuntimeError:
logger.error(f"Erro ao tentar recuperar as informações do link {url_inicial}"); sleep(10)
continue
# Iniciando os processos de busca para o local atual
URL_BASE = URL_BUSCA.format("*", satelite, instrumento, data["inicial"], data["final"],
nuvem_props["q1"], nuvem_props["q2"], nuvem_props["q3"], nuvem_props["q4"],
localizacao["lat"], localizacao["long"])
# Definindo a quantidade de processos de acordo com a quantidade de páginas
if propriedades_da_pagina["paginas"] == 1:
workers = 1
elif propriedades_da_pagina["paginas"] == 2:
workers = 2
else:
workers = 3
for i in range(1, workers + 1):
p = self.__ambiente_de_execucao(target=self.__lthread, args=(i, URL_BASE, i, workers, propriedades_da_pagina["paginas"], local, ))
p.start()
logger.info(f" Esperando a busca pelos dados de {local} acabarem")
p.join()
logger.info(f"Busca de {local} finalizada\n")
return {
"propriedades_da_busca": {
**propriedades_da_pagina
},
"datas_pesquisadas": {
"inicio": data["inicial"],
"fim": data["final"]
}
}
[docs] def recupera_data_ultima_insercao(self) -> datetime:
"""Recupera a data da última inserção feita no banco de dados
Returns:
datetime: Data da última inserção no banco de dados
"""
from pymongo import DESCENDING
colecao = GeradorDeConexoesMongo.recupera_conexao(self.__db_props).dgi_dumps.catalogo
ultima_pesquisa = list(colecao.find().sort('$natural', DESCENDING).limit(1))
if len(ultima_pesquisa) > 0:
return ultima_pesquisa[0]["data_insercao_no_banco"]
return []