1 |
<?php |
<?php |
2 |
|
|
3 |
|
/** |
4 |
|
* Ce fichier permet de créer un consumeur des messages stockées dans la |
5 |
|
* pile de RabbitMQ réservée au ERP. |
6 |
|
* |
7 |
|
* @package openfoncier |
8 |
|
*/ |
9 |
|
|
10 |
include_once('../../core/om_debug.inc.php'); |
include_once('../../core/om_debug.inc.php'); |
11 |
include_once('../../dyn/services.inc.php'); |
include_once('../../dyn/services.inc.php'); |
12 |
|
|
14 |
|
|
15 |
include_once('../../core/om_logger.class.php'); |
include_once('../../core/om_logger.class.php'); |
16 |
|
|
17 |
|
// inclusion de la librairie php-amqplib |
18 |
require_once('../php/php-amqplib/vendor/autoload.php'); |
require_once('../php/php-amqplib/vendor/autoload.php'); |
19 |
use PhpAmqpLib\Connection\AMQPConnection; |
use PhpAmqpLib\Connection\AMQPConnection; |
20 |
|
|
26 |
} |
} |
27 |
|
|
28 |
|
|
29 |
|
/** Utilise pour le test de retour de requete curl |
30 |
|
*/ |
31 |
define('HTTP_RESULT_OK', '200'); |
define('HTTP_RESULT_OK', '200'); |
32 |
|
|
33 |
|
|
34 |
|
/** |
35 |
|
* Fait le parse d'une chaine de characters recu comme |
36 |
|
* reponse a une requette curl |
37 |
|
* @param string $return La chaine de caracteres contenant un tableau |
38 |
|
* en format JSON |
39 |
|
* @return mixed Tableau associative contenant ce qui se trouve en |
40 |
|
* format JSON dans l'entree |
41 |
|
*/ |
42 |
function parse($return) |
function parse($return) |
43 |
{ |
{ |
44 |
// remplace les '{' et '}' par chaine vide |
// remplace les '{' et '}' par chaine vide |
45 |
$return = str_replace('{', '', $return); |
$return = str_replace('{', '', $return); |
46 |
$return = str_replace('}', '', $return); |
$return = str_replace('}', '', $return); |
47 |
|
// trimme les espaces et tabs |
48 |
$return = trim($return, " \t"); |
$return = trim($return, " \t"); |
49 |
// remplace les retours de ligne |
// remplace les retours de ligne |
50 |
$return = preg_replace('/\s\s+/', '', $return); |
$return = preg_replace('/\s\s+/', '', $return); |
51 |
|
// remplace les '"' par chaine vide |
52 |
$return = str_replace("\"", "", $return); |
$return = str_replace("\"", "", $return); |
53 |
|
// remplace ": " par ":" |
54 |
$return = str_replace(": ", ":", $return); |
$return = str_replace(": ", ":", $return); |
55 |
|
|
56 |
$result = array(); |
$result = array(); |
57 |
|
// obtiens les (clef => valeur) paires |
58 |
$resparray = explode(',', $return); |
$resparray = explode(',', $return); |
59 |
|
|
60 |
|
// si on a trouve des (clef => valeur) paires |
61 |
if ($resparray) { |
if ($resparray) { |
62 |
foreach ($resparray as $resp) { |
foreach ($resparray as $resp) { |
63 |
$keyvalue = explode(':', $resp); |
// obtiens le clef et la valeur |
64 |
|
$keyvalue = explode(':', $resp); |
65 |
|
// stocke le clef et la valeur dans le $result |
66 |
$result[trim(str_replace('"', '',$keyvalue[0]))] = |
$result[trim(str_replace('"', '',$keyvalue[0]))] = |
67 |
trim(str_replace('"', '', $keyvalue[1])); |
trim(str_replace('"', '', $keyvalue[1])); |
68 |
} |
} |
69 |
} |
} |
70 |
|
// retourne le tableau associative |
71 |
return $result; |
return $result; |
72 |
} |
} |
73 |
|
|
74 |
|
|
75 |
|
/** |
76 |
|
* Utilise curl pour faire POST a la service/messages d'ERP, et |
77 |
|
* log le resultat dans le fichier de log |
78 |
|
* @param string $msg Objet de la classe AMQPMessage |
79 |
|
*/ |
80 |
function postViaCurl($msg) { |
function postViaCurl($msg) { |
81 |
$data = $msg->body; |
$data = $msg->body; |
82 |
|
|
83 |
global $ERP_URL_MESSAGES; |
global $ERP_URL_MESSAGES; |
84 |
|
// si l'url de resource services/messages d'ERP est vide, ne traite pas |
85 |
|
// le message recu |
86 |
if (empty($ERP_URL_MESSAGES)) { |
if (empty($ERP_URL_MESSAGES)) { |
87 |
logger::instance()->cleanLog(); |
logger::instance()->cleanLog(); |
88 |
logger::instance()->log("Le URL de destination manque", EXTRA_VERBOSE_MODE); |
logger::instance()->log("Le URL de destination manque", EXTRA_VERBOSE_MODE); |
90 |
return; |
return; |
91 |
} |
} |
92 |
|
|
93 |
|
// cree une resource curl et set les options pour l'envoi d'un POST |
94 |
$curl_resource = curl_init($ERP_URL_MESSAGES); |
$curl_resource = curl_init($ERP_URL_MESSAGES); |
95 |
|
// set la methode d'envoi, POST |
96 |
curl_setopt($curl_resource, CURLOPT_CUSTOMREQUEST, "POST"); |
curl_setopt($curl_resource, CURLOPT_CUSTOMREQUEST, "POST"); |
97 |
|
// set le message a envoyer |
98 |
curl_setopt($curl_resource, CURLOPT_POSTFIELDS, $msg->body); |
curl_setopt($curl_resource, CURLOPT_POSTFIELDS, $msg->body); |
99 |
|
// une reponse est attendue |
100 |
curl_setopt($curl_resource, CURLOPT_RETURNTRANSFER, true); |
curl_setopt($curl_resource, CURLOPT_RETURNTRANSFER, true); |
101 |
|
// set le header |
102 |
curl_setopt($curl_resource, CURLOPT_HTTPHEADER, array( |
curl_setopt($curl_resource, CURLOPT_HTTPHEADER, array( |
103 |
'Content-Type: application/json', |
'Content-Type: application/json', |
104 |
'Content-Length: ' . strlen($msg->body)) |
'Content-Length: ' . strlen($msg->body)) |
105 |
); |
); |
106 |
|
|
107 |
|
// execute la requete curl et obtiens le resultat de la requete |
108 |
$result = curl_exec($curl_resource); |
$result = curl_exec($curl_resource); |
109 |
|
// ferme la resource curl |
110 |
curl_close($curl_resource); |
curl_close($curl_resource); |
111 |
|
|
112 |
|
|
113 |
// le resultat recu par curl est une chaine des caracteres qui contient |
// le resultat recu par curl est une chaine des caracteres qui contient |
114 |
// un tableau |
// un tableau JSON, donc appelle parse pour obtenir un tableau associative |
115 |
$result = parse($result); |
$result = parse($result); |
116 |
|
|
117 |
// log le retour |
// log le retour de curl dans le fichier de log |
118 |
logger::instance()->cleanLog(); |
logger::instance()->cleanLog(); |
119 |
logger::instance()->log($result['http_code_message'] . ': ' . |
logger::instance()->log(date('d/m/Y H:i'). ': '. |
120 |
|
$result['http_code_message'] . ' ' . |
121 |
$result['message'].' Message: '. |
$result['message'].' Message: '. |
122 |
$msg->body, EXTRA_VERBOSE_MODE); |
$msg->body, EXTRA_VERBOSE_MODE); |
123 |
logger::instance()->writeLogToFile(); |
logger::instance()->writeLogToFile(); |
124 |
|
|
125 |
|
// si le resultat du curl est OK, fais une accuse d'acception pour le message |
126 |
if ($result['http_code'] == HTTP_RESULT_OK) { |
if ($result['http_code'] == HTTP_RESULT_OK) { |
127 |
// accuse de reception du message |
// accuse de reception du message |
128 |
$msg->delivery_info['channel']-> |
$msg->delivery_info['channel']-> |
129 |
basic_ack($msg->delivery_info['delivery_tag']); |
basic_ack($msg->delivery_info['delivery_tag']); |
130 |
} else { |
} else { |
131 |
|
// le resultat de l'execution de curl indique un erreur |
132 |
|
// dors un peu avant de refuser le message |
133 |
sleep(2); |
sleep(2); |
134 |
// accuse de reception du message |
// refuse le message |
135 |
$msg->delivery_info['channel']->basic_reject( |
$msg->delivery_info['channel']->basic_reject( |
136 |
$msg->delivery_info['delivery_tag'], true); |
$msg->delivery_info['delivery_tag'], true); |
137 |
|
|
138 |
} |
} |
139 |
// Envoie un message avec la chaine "quit" pour canceler le consumeur. |
// envoie un message avec la chaine "quit" pour ce consumeur |
140 |
if ($msg->body === 'quit') { |
if ($msg->body === 'quit') { |
141 |
$msg->delivery_info['channel']-> |
$msg->delivery_info['channel']-> |
142 |
basic_cancel($msg->delivery_info['consumer_tag']); |
basic_cancel($msg->delivery_info['consumer_tag']); |
146 |
|
|
147 |
|
|
148 |
|
|
149 |
// pour des besoins de logging il faut setter $_SERVER["REQUEST_URI"] |
// pour des besoins de log il faut set $_SERVER["REQUEST_URI"] |
150 |
if (!isset($_SERVER["REQUEST_URI"])) { |
if (!isset($_SERVER["REQUEST_URI"])) { |
151 |
$_SERVER["REQUEST_URI"] = __FILE__; |
$_SERVER["REQUEST_URI"] = __FILE__; |
152 |
} |
} |
163 |
exit; |
exit; |
164 |
} |
} |
165 |
|
|
166 |
|
// un exchange fait le routage des messages vers les piles |
167 |
$exchange = 'router'; |
$exchange = 'router'; |
168 |
|
// on va utiliser la pile $ERP_QUEUE |
169 |
$queue = $ERP_QUEUE; |
$queue = $ERP_QUEUE; |
170 |
|
// l'identifiant unique de consumeur ; il est utilise par RabbitMQ |
171 |
$consumer_tag = 'consumer'.$suffix; |
$consumer_tag = 'consumer'.$suffix; |
172 |
|
|
173 |
|
// etablis une connexion avec RabbitMQ instance |
174 |
$conn = new AMQPConnection($ERP_CONNECTION_HOST, $ERP_CONNECTIOn_PORT, |
$conn = new AMQPConnection($ERP_CONNECTION_HOST, $ERP_CONNECTIOn_PORT, |
175 |
$ERP_CONNECTION_USER, $ERP_CONNECTION_PASSWORD, |
$ERP_CONNECTION_USER, $ERP_CONNECTION_PASSWORD, |
176 |
$ERP_CONNECTION_VHOST); |
$ERP_CONNECTION_VHOST); |
177 |
|
|
178 |
|
// obtiens un canal de la connexion |
179 |
$ch = $conn->channel(); |
$ch = $conn->channel(); |
180 |
|
// si le canal n'est pas disponible arretes l'execution |
181 |
if (is_null($ch)) { |
if (is_null($ch)) { |
182 |
// probleme avec la connexion, donc log le fait |
// probleme avec la connexion, log le fait et arrete l'execution |
183 |
logger::instance()->log('Probleme dans l\'etablisement de la connexion', |
logger::instance()->log('Probleme dans l\'etablisement de la connexion', |
184 |
EXTRA_VERBOSE_MODE); |
EXTRA_VERBOSE_MODE); |
185 |
logger::instance()->writeLogToFile(); |
logger::instance()->writeLogToFile(); |
186 |
|
exit; |
187 |
} |
} |
188 |
|
|
189 |
|
// declare une pile (la creer s'il faut) |
190 |
$ch->queue_declare($queue, false, true, false, false); |
$ch->queue_declare($queue, false, true, false, false); |
191 |
|
|
192 |
|
// declare un exchange qui va diriger les messages |
193 |
$ch->exchange_declare($exchange, 'direct', false, true, false); |
$ch->exchange_declare($exchange, 'direct', false, true, false); |
194 |
|
|
195 |
|
// relier la pile d'ERP au exchange |
196 |
$ch->queue_bind($queue, $exchange); |
$ch->queue_bind($queue, $exchange); |
197 |
|
|
198 |
|
// registre le callback pour ce consumer aupres de RabbitMQ |
199 |
$ch->basic_consume($queue, $consumer_tag, false, false, false, false, 'postViaCurl'); |
$ch->basic_consume($queue, $consumer_tag, false, false, false, false, 'postViaCurl'); |
200 |
|
|
201 |
function shutdown($ch, $conn) { |
// loop as long as the channel has callbacks registered |
|
$ch->close(); |
|
|
$conn->close(); |
|
|
} |
|
|
register_shutdown_function('shutdown', $ch, $conn); |
|
|
|
|
|
// Loop as long as the channel has callbacks registered |
|
202 |
while (count($ch->callbacks)) { |
while (count($ch->callbacks)) { |
203 |
$ch->wait(); |
$ch->wait(); |
204 |
} |
} |