Minor mods to playout via avformat and miracle unit generation on an xfer
[melted] / src / miracle / miracle_server.c
1 /*
2 * miracle_server.c -- DV Server
3 * Copyright (C) 2002-2003 Ushodaya Enterprises Limited
4 * Author: Charles Yates <charles.yates@pandora.be>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software Foundation,
18 * Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
19 */
20
21 /* System header files */
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #include <signal.h>
26
27 #include <fcntl.h>
28 #include <pthread.h>
29 #include <signal.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32
33 #include <string.h>
34 #include <netinet/in.h>
35 #include <netdb.h>
36 #include <errno.h>
37 #include <arpa/inet.h>
38
39 /* Application header files */
40 #include "miracle_server.h"
41 #include "miracle_connection.h"
42 #include "miracle_local.h"
43 #include "miracle_log.h"
44 #include "miracle_commands.h"
45 #include <valerie/valerie_remote.h>
46 #include <valerie/valerie_tokeniser.h>
47
48 #define VERSION "0.0.1"
49
50 static void miracle_command_received( mlt_listener listener, mlt_properties owner, miracle_server this, void **args )
51 {
52 if ( listener != NULL )
53 listener( owner, this, ( valerie_response ** )args[ 0 ], ( char * )args[ 1 ] );
54 }
55
56 static void miracle_doc_received( mlt_listener listener, mlt_properties owner, miracle_server this, void **args )
57 {
58 if ( listener != NULL )
59 listener( owner, this, ( valerie_response ** )args[ 0 ], ( char * )args[ 1 ], ( char * )args[ 2 ] );
60 }
61
62 static void miracle_push_received( mlt_listener listener, mlt_properties owner, miracle_server this, void **args )
63 {
64 if ( listener != NULL )
65 listener( owner, this, ( valerie_response ** )args[ 0 ], ( char * )args[ 1 ], ( mlt_service )args[ 2 ] );
66 }
67
68 /** Initialise a server structure.
69 */
70
71 miracle_server miracle_server_init( char *id )
72 {
73 miracle_server server = malloc( sizeof( miracle_server_t ) );
74 if ( server != NULL )
75 memset( server, 0, sizeof( miracle_server_t ) );
76 if ( server != NULL && mlt_properties_init( &server->parent, server ) == 0 )
77 {
78 server->id = id;
79 server->port = DEFAULT_TCP_PORT;
80 server->socket = -1;
81 server->shutdown = 1;
82 mlt_events_init( &server->parent );
83 mlt_events_register( &server->parent, "command-received", ( mlt_transmitter )miracle_command_received );
84 mlt_events_register( &server->parent, "doc-received", ( mlt_transmitter )miracle_doc_received );
85 mlt_events_register( &server->parent, "push-received", ( mlt_transmitter )miracle_push_received );
86 }
87 return server;
88 }
89
90 const char *miracle_server_id( miracle_server server )
91 {
92 return server != NULL && server->id != NULL ? server->id : "miracle";
93 }
94
95 void miracle_server_set_config( miracle_server server, char *config )
96 {
97 if ( server != NULL )
98 {
99 free( server->config );
100 server->config = config != NULL ? strdup( config ) : NULL;
101 }
102 }
103
104 /** Set the port of the server.
105 */
106
107 void miracle_server_set_port( miracle_server server, int port )
108 {
109 server->port = port;
110 }
111
112 void miracle_server_set_proxy( miracle_server server, char *proxy )
113 {
114 valerie_tokeniser tokeniser = valerie_tokeniser_init( );
115 server->proxy = 1;
116 server->remote_port = DEFAULT_TCP_PORT;
117 valerie_tokeniser_parse_new( tokeniser, proxy, ":" );
118 strcpy( server->remote_server, valerie_tokeniser_get_string( tokeniser, 0 ) );
119 if ( valerie_tokeniser_count( tokeniser ) == 2 )
120 server->remote_port = atoi( valerie_tokeniser_get_string( tokeniser, 1 ) );
121 valerie_tokeniser_close( tokeniser );
122 }
123
124 /** Wait for a connection.
125 */
126
127 static int miracle_server_wait_for_connect( miracle_server server )
128 {
129 struct timeval tv;
130 fd_set rfds;
131
132 /* Wait for a 1 second. */
133 tv.tv_sec = 1;
134 tv.tv_usec = 0;
135
136 FD_ZERO( &rfds );
137 FD_SET( server->socket, &rfds );
138
139 return select( server->socket + 1, &rfds, NULL, NULL, &tv);
140 }
141
142 /** Run the server thread.
143 */
144
145 static void *miracle_server_run( void *arg )
146 {
147 miracle_server server = arg;
148 pthread_t cmd_parse_info;
149 connection_t *tmp = NULL;
150 pthread_attr_t thread_attributes;
151 int socksize;
152
153 socksize = sizeof( struct sockaddr );
154
155 miracle_log( LOG_NOTICE, "%s version %s listening on port %i", server->id, VERSION, server->port );
156
157 /* Create the initial thread. We want all threads to be created detached so
158 their resources get freed automatically. (CY: ... hmmph...) */
159 pthread_attr_init( &thread_attributes );
160 pthread_attr_setdetachstate( &thread_attributes, PTHREAD_CREATE_DETACHED );
161 pthread_attr_init( &thread_attributes );
162 pthread_attr_setinheritsched( &thread_attributes, PTHREAD_INHERIT_SCHED );
163 /* pthread_attr_setschedpolicy( &thread_attributes, SCHED_RR ); */
164
165 while ( !server->shutdown )
166 {
167 /* Wait for a new connection. */
168 if ( miracle_server_wait_for_connect( server ) )
169 {
170 /* Create a new block of data to hold a copy of the incoming connection for
171 our server thread. The thread should free this when it terminates. */
172
173 tmp = (connection_t*) malloc( sizeof(connection_t) );
174 tmp->owner = &server->parent;
175 tmp->parser = server->parser;
176 tmp->fd = accept( server->socket, (struct sockaddr*) &(tmp->sin), &socksize );
177
178 /* Pass the connection to a parser thread :-/ */
179 if ( tmp->fd != -1 )
180 pthread_create( &cmd_parse_info, &thread_attributes, parser_thread, tmp );
181 }
182 }
183
184 miracle_log( LOG_NOTICE, "%s version %s server terminated.", server->id, VERSION );
185
186 return NULL;
187 }
188
189 /** Execute the server thread.
190 */
191
192 int miracle_server_execute( miracle_server server )
193 {
194 int error = 0;
195 valerie_response response = NULL;
196 int index = 0;
197 struct sockaddr_in ServerAddr;
198 int flag = 1;
199
200 server->shutdown = 0;
201
202 ServerAddr.sin_family = AF_INET;
203 ServerAddr.sin_port = htons( server->port );
204 ServerAddr.sin_addr.s_addr = INADDR_ANY;
205
206 /* Create socket, and bind to port. Listen there. Backlog = 5
207 should be sufficient for listen (). */
208 server->socket = socket( AF_INET, SOCK_STREAM, 0 );
209
210 if ( server->socket == -1 )
211 {
212 server->shutdown = 1;
213 perror( "socket" );
214 miracle_log( LOG_ERR, "%s unable to create socket.", server->id );
215 return -1;
216 }
217
218 setsockopt( server->socket, SOL_SOCKET, SO_REUSEADDR, (char *)&flag, sizeof( int ) );
219
220 if ( bind( server->socket, (struct sockaddr *) &ServerAddr, sizeof (ServerAddr) ) != 0 )
221 {
222 server->shutdown = 1;
223 perror( "bind" );
224 miracle_log( LOG_ERR, "%s unable to bind to port %d.", server->id, server->port );
225 return -1;
226 }
227
228 if ( listen( server->socket, 5 ) != 0 )
229 {
230 server->shutdown = 1;
231 perror( "listen" );
232 miracle_log( LOG_ERR, "%s unable to listen on port %d.", server->id, server->port );
233 return -1;
234 }
235
236 fcntl( server->socket, F_SETFL, O_NONBLOCK );
237
238 if ( !server->proxy )
239 {
240 miracle_log( LOG_NOTICE, "Starting server on %d.", server->port );
241 server->parser = miracle_parser_init_local( );
242 }
243 else
244 {
245 miracle_log( LOG_NOTICE, "Starting proxy for %s:%d on %d.", server->remote_server, server->remote_port, server->port );
246 server->parser = valerie_parser_init_remote( server->remote_server, server->remote_port );
247 }
248
249 response = valerie_parser_connect( server->parser );
250
251 if ( response != NULL && valerie_response_get_error_code( response ) == 100 )
252 {
253 /* read configuration file */
254 if ( response != NULL && !server->proxy && server->config != NULL )
255 {
256 valerie_response_close( response );
257 response = valerie_parser_run( server->parser, server->config );
258
259 if ( valerie_response_count( response ) > 1 )
260 {
261 if ( valerie_response_get_error_code( response ) > 299 )
262 miracle_log( LOG_ERR, "Error evaluating server configuration. Processing stopped." );
263 for ( index = 0; index < valerie_response_count( response ); index ++ )
264 miracle_log( LOG_DEBUG, "%4d: %s", index, valerie_response_get_line( response, index ) );
265 }
266 }
267
268 if ( response != NULL )
269 {
270 pthread_attr_t attr;
271 int result;
272 pthread_attr_init( &attr );
273 pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE );
274 pthread_attr_setinheritsched( &attr, PTHREAD_EXPLICIT_SCHED );
275 pthread_attr_setschedpolicy( &attr, SCHED_FIFO );
276 pthread_attr_setscope( &attr, PTHREAD_SCOPE_SYSTEM );
277 valerie_response_close( response );
278 result = pthread_create( &server->thread, &attr, miracle_server_run, server );
279 if ( result )
280 {
281 miracle_log( LOG_WARNING, "Failed to schedule realtime (%s)", strerror(errno) );
282 pthread_attr_setschedpolicy( &attr, SCHED_OTHER );
283 result = pthread_create( &server->thread, &attr, miracle_server_run, server );
284 if ( result )
285 {
286 miracle_log( LOG_CRIT, "Failed to launch TCP listener thread" );
287 error = -1;
288 }
289 }
290 }
291 }
292 else
293 {
294 miracle_log( LOG_ERR, "Error connecting to parser. Processing stopped." );
295 server->shutdown = 1;
296 error = -1;
297 }
298
299 return error;
300 }
301
302 /** Fetch a units properties
303 */
304
305 mlt_properties miracle_server_fetch_unit( miracle_server server, int index )
306 {
307 miracle_unit unit = miracle_get_unit( index );
308 return unit != NULL ? unit->properties : NULL;
309 }
310
311 /** Shutdown the server.
312 */
313
314 void miracle_server_shutdown( miracle_server server )
315 {
316 if ( server != NULL && !server->shutdown )
317 {
318 server->shutdown = 1;
319 pthread_join( server->thread, NULL );
320 miracle_server_set_config( server, NULL );
321 valerie_parser_close( server->parser );
322 server->parser = NULL;
323 close( server->socket );
324 }
325 }
326
327 /** Close the server.
328 */
329
330 void miracle_server_close( miracle_server server )
331 {
332 if ( server != NULL && mlt_properties_dec_ref( &server->parent ) <= 0 )
333 {
334 mlt_properties_close( &server->parent );
335 miracle_server_shutdown( server );
336 free( server );
337 }
338 }